From e777dbeba28c53757aee23c665bb386fd946b82b Mon Sep 17 00:00:00 2001 From: Xoconoch Date: Fri, 29 Aug 2025 09:32:37 -0600 Subject: [PATCH] feat: added librespotConcurrency, which determines the threadpool of librespot api processes --- routes/system/progress.py | 38 ++++++--- routes/utils/celery_config.py | 1 + routes/utils/get_info.py | 10 ++- routes/utils/track.py | 1 - .../src/components/config/ServerTab.tsx | 77 +++++++++++++++++++ .../src/contexts/SettingsProvider.tsx | 3 + spotizerr-ui/src/contexts/settings-context.ts | 1 + 7 files changed, 119 insertions(+), 12 deletions(-) diff --git a/routes/system/progress.py b/routes/system/progress.py index 113dcc6..6001762 100755 --- a/routes/system/progress.py +++ b/routes/system/progress.py @@ -59,7 +59,15 @@ class SSEBroadcaster: return # Add global task counts right before broadcasting - this is the single source of truth - enhanced_event_data = add_global_task_counts_to_event(event_data.copy()) + # Skip expensive count recomputation for high-frequency callback/progress updates + try: + trigger_reason = event_data.get("trigger_reason") + except Exception: + trigger_reason = None + if trigger_reason and trigger_reason in {"callback_update", "progress_update"}: + enhanced_event_data = event_data.copy() + else: + enhanced_event_data = add_global_task_counts_to_event(event_data.copy()) event_json = json.dumps(enhanced_event_data) sse_data = f"data: {event_json}\n\n" @@ -133,14 +141,26 @@ def start_sse_redis_subscriber(): ) elif event_type == "summary_update": # Task summary update - use standardized trigger - loop.run_until_complete( - trigger_sse_update( - task_id, event_data.get("reason", "update") + # Short-circuit if task no longer exists to avoid expensive processing + try: + if not get_task_info(task_id): + logger.debug( + f"SSE Redis Subscriber: summary_update for missing task {task_id}, skipping" + ) + else: + loop.run_until_complete( + trigger_sse_update( + task_id, event_data.get("reason", "update") + ) + ) + logger.debug( + f"SSE Redis Subscriber: Processed summary update for {task_id}" + ) + except Exception as _e: + logger.error( + f"SSE Redis Subscriber: Error handling summary_update for {task_id}: {_e}", + exc_info=True, ) - ) - logger.debug( - f"SSE Redis Subscriber: Processed summary update for {task_id}" - ) else: # Unknown event type - attempt to standardize and broadcast standardized = standardize_incoming_event(event_data) @@ -304,7 +324,7 @@ async def trigger_sse_update(task_id: str, reason: str = "task_update"): # Find the specific task that changed task_info = get_task_info(task_id) if not task_info: - logger.warning(f"SSE: Task {task_id} not found for update") + logger.debug(f"SSE: Task {task_id} not found for update") return last_status = get_last_task_status(task_id) diff --git a/routes/utils/celery_config.py b/routes/utils/celery_config.py index b93dce3..9a2cc86 100644 --- a/routes/utils/celery_config.py +++ b/routes/utils/celery_config.py @@ -41,6 +41,7 @@ DEFAULT_MAIN_CONFIG = { "saveCover": True, "maxConcurrentDownloads": 3, "utilityConcurrency": 1, + "librespotConcurrency": 2, "maxRetries": 3, "retryDelaySeconds": 5, "retryDelayIncrease": 5, diff --git a/routes/utils/get_info.py b/routes/utils/get_info.py index 99d04e0..24e9ed0 100644 --- a/routes/utils/get_info.py +++ b/routes/utils/get_info.py @@ -44,7 +44,11 @@ def get_client() -> LibrespotClient: _shared_client.close() except Exception: pass - _shared_client = LibrespotClient(stored_credentials_path=desired_blob) + cfg = get_config_params() or {} + max_workers = int(cfg.get("librespotConcurrency", 2) or 2) + _shared_client = LibrespotClient( + stored_credentials_path=desired_blob, max_workers=max_workers + ) _shared_blob_path = desired_blob return _shared_client @@ -59,7 +63,9 @@ def create_client(credentials_path: str) -> LibrespotClient: abs_path = os.path.abspath(credentials_path) if not os.path.isfile(abs_path): raise FileNotFoundError(f"Credentials file not found: {abs_path}") - return LibrespotClient(stored_credentials_path=abs_path) + cfg = get_config_params() or {} + max_workers = int(cfg.get("librespotConcurrency", 2) or 2) + return LibrespotClient(stored_credentials_path=abs_path, max_workers=max_workers) def close_client(client: LibrespotClient) -> None: diff --git a/routes/utils/track.py b/routes/utils/track.py index 76156fe..68b7569 100755 --- a/routes/utils/track.py +++ b/routes/utils/track.py @@ -171,7 +171,6 @@ def download_track( convert_to=convert_to, bitrate=bitrate, artist_separator=artist_separator, - spotify_metadata=spotify_metadata, pad_number_width=pad_number_width, ) print( diff --git a/spotizerr-ui/src/components/config/ServerTab.tsx b/spotizerr-ui/src/components/config/ServerTab.tsx index 37e2e17..c12732d 100644 --- a/spotizerr-ui/src/components/config/ServerTab.tsx +++ b/spotizerr-ui/src/components/config/ServerTab.tsx @@ -179,6 +179,77 @@ function UtilityConcurrencyForm() { ); } +function LibrespotConcurrencyForm() { + const queryClient = useQueryClient(); + const { data: configData, isLoading } = useQuery({ + queryKey: ["config"], + queryFn: () => authApiClient.getConfig(), + }); + + const { register, handleSubmit, reset, formState: { isDirty } } = useForm<{ librespotConcurrency: number }>(); + + useEffect(() => { + if (configData) { + reset({ librespotConcurrency: Number(configData.librespotConcurrency ?? 2) }); + } + }, [configData, reset]); + + const mutation = useMutation({ + mutationFn: (payload: { librespotConcurrency: number }) => authApiClient.updateConfig(payload), + onSuccess: () => { + toast.success("Librespot concurrency saved!"); + queryClient.invalidateQueries({ queryKey: ["config"] }); + }, + onError: (e) => { + toast.error(`Failed to save: ${(e as any).message}`); + }, + }); + + const onSubmit = (values: { librespotConcurrency: number }) => { + const raw = Number(values.librespotConcurrency || 2); + const safe = Math.max(1, Math.min(16, raw)); + mutation.mutate({ librespotConcurrency: safe }); + }; + + if (isLoading) return

Loading server settings...

; + + return ( +
+
+
+ +
+
+ +
+ + +

Controls worker threads used by the Librespot client. 1–16 is recommended.

+
+
+ ); +} + // --- Components --- function WebhookForm() { const queryClient = useQueryClient(); @@ -299,6 +370,12 @@ export function ServerTab() {
+
+

Librespot

+

Adjust Librespot client worker threads.

+ +
+

Webhooks

diff --git a/spotizerr-ui/src/contexts/SettingsProvider.tsx b/spotizerr-ui/src/contexts/SettingsProvider.tsx index 4f45ab3..8b42e56 100644 --- a/spotizerr-ui/src/contexts/SettingsProvider.tsx +++ b/spotizerr-ui/src/contexts/SettingsProvider.tsx @@ -33,6 +33,7 @@ export type FlatAppSettings = { deezerQuality: "MP3_128" | "MP3_320" | "FLAC"; maxConcurrentDownloads: number; utilityConcurrency: number; + librespotConcurrency: number; realTime: boolean; fallback: boolean; convertTo: "MP3" | "AAC" | "OGG" | "OPUS" | "FLAC" | "WAV" | "ALAC" | ""; @@ -74,6 +75,7 @@ const defaultSettings: FlatAppSettings = { deezerQuality: "MP3_128", maxConcurrentDownloads: 3, utilityConcurrency: 1, + librespotConcurrency: 2, realTime: false, fallback: false, convertTo: "", @@ -138,6 +140,7 @@ const fetchSettings = async (): Promise => { recursiveQuality: Boolean((camelData as any).recursiveQuality ?? false), realTimeMultiplier: Number((camelData as any).realTimeMultiplier ?? 0), utilityConcurrency: Number((camelData as any).utilityConcurrency ?? 1), + librespotConcurrency: Number((camelData as any).librespotConcurrency ?? 2), // Ensure watch subkeys default if missing watch: { ...(camelData.watch as any), diff --git a/spotizerr-ui/src/contexts/settings-context.ts b/spotizerr-ui/src/contexts/settings-context.ts index ee3ec6b..2dae240 100644 --- a/spotizerr-ui/src/contexts/settings-context.ts +++ b/spotizerr-ui/src/contexts/settings-context.ts @@ -9,6 +9,7 @@ export interface AppSettings { deezerQuality: "MP3_128" | "MP3_320" | "FLAC"; maxConcurrentDownloads: number; utilityConcurrency: number; + librespotConcurrency: number; realTime: boolean; fallback: boolean; convertTo: "MP3" | "AAC" | "OGG" | "OPUS" | "FLAC" | "WAV" | "ALAC" | "";