feat: added librespotConcurrency, which determines the threadpool of librespot api processes
This commit is contained in:
@@ -59,7 +59,15 @@ class SSEBroadcaster:
|
|||||||
return
|
return
|
||||||
|
|
||||||
# Add global task counts right before broadcasting - this is the single source of truth
|
# Add global task counts right before broadcasting - this is the single source of truth
|
||||||
enhanced_event_data = add_global_task_counts_to_event(event_data.copy())
|
# Skip expensive count recomputation for high-frequency callback/progress updates
|
||||||
|
try:
|
||||||
|
trigger_reason = event_data.get("trigger_reason")
|
||||||
|
except Exception:
|
||||||
|
trigger_reason = None
|
||||||
|
if trigger_reason and trigger_reason in {"callback_update", "progress_update"}:
|
||||||
|
enhanced_event_data = event_data.copy()
|
||||||
|
else:
|
||||||
|
enhanced_event_data = add_global_task_counts_to_event(event_data.copy())
|
||||||
event_json = json.dumps(enhanced_event_data)
|
event_json = json.dumps(enhanced_event_data)
|
||||||
sse_data = f"data: {event_json}\n\n"
|
sse_data = f"data: {event_json}\n\n"
|
||||||
|
|
||||||
@@ -133,14 +141,26 @@ def start_sse_redis_subscriber():
|
|||||||
)
|
)
|
||||||
elif event_type == "summary_update":
|
elif event_type == "summary_update":
|
||||||
# Task summary update - use standardized trigger
|
# Task summary update - use standardized trigger
|
||||||
loop.run_until_complete(
|
# Short-circuit if task no longer exists to avoid expensive processing
|
||||||
trigger_sse_update(
|
try:
|
||||||
task_id, event_data.get("reason", "update")
|
if not get_task_info(task_id):
|
||||||
|
logger.debug(
|
||||||
|
f"SSE Redis Subscriber: summary_update for missing task {task_id}, skipping"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
loop.run_until_complete(
|
||||||
|
trigger_sse_update(
|
||||||
|
task_id, event_data.get("reason", "update")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
logger.debug(
|
||||||
|
f"SSE Redis Subscriber: Processed summary update for {task_id}"
|
||||||
|
)
|
||||||
|
except Exception as _e:
|
||||||
|
logger.error(
|
||||||
|
f"SSE Redis Subscriber: Error handling summary_update for {task_id}: {_e}",
|
||||||
|
exc_info=True,
|
||||||
)
|
)
|
||||||
)
|
|
||||||
logger.debug(
|
|
||||||
f"SSE Redis Subscriber: Processed summary update for {task_id}"
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
# Unknown event type - attempt to standardize and broadcast
|
# Unknown event type - attempt to standardize and broadcast
|
||||||
standardized = standardize_incoming_event(event_data)
|
standardized = standardize_incoming_event(event_data)
|
||||||
@@ -304,7 +324,7 @@ async def trigger_sse_update(task_id: str, reason: str = "task_update"):
|
|||||||
# Find the specific task that changed
|
# Find the specific task that changed
|
||||||
task_info = get_task_info(task_id)
|
task_info = get_task_info(task_id)
|
||||||
if not task_info:
|
if not task_info:
|
||||||
logger.warning(f"SSE: Task {task_id} not found for update")
|
logger.debug(f"SSE: Task {task_id} not found for update")
|
||||||
return
|
return
|
||||||
|
|
||||||
last_status = get_last_task_status(task_id)
|
last_status = get_last_task_status(task_id)
|
||||||
|
|||||||
@@ -41,6 +41,7 @@ DEFAULT_MAIN_CONFIG = {
|
|||||||
"saveCover": True,
|
"saveCover": True,
|
||||||
"maxConcurrentDownloads": 3,
|
"maxConcurrentDownloads": 3,
|
||||||
"utilityConcurrency": 1,
|
"utilityConcurrency": 1,
|
||||||
|
"librespotConcurrency": 2,
|
||||||
"maxRetries": 3,
|
"maxRetries": 3,
|
||||||
"retryDelaySeconds": 5,
|
"retryDelaySeconds": 5,
|
||||||
"retryDelayIncrease": 5,
|
"retryDelayIncrease": 5,
|
||||||
|
|||||||
@@ -44,7 +44,11 @@ def get_client() -> LibrespotClient:
|
|||||||
_shared_client.close()
|
_shared_client.close()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
_shared_client = LibrespotClient(stored_credentials_path=desired_blob)
|
cfg = get_config_params() or {}
|
||||||
|
max_workers = int(cfg.get("librespotConcurrency", 2) or 2)
|
||||||
|
_shared_client = LibrespotClient(
|
||||||
|
stored_credentials_path=desired_blob, max_workers=max_workers
|
||||||
|
)
|
||||||
_shared_blob_path = desired_blob
|
_shared_blob_path = desired_blob
|
||||||
return _shared_client
|
return _shared_client
|
||||||
|
|
||||||
@@ -59,7 +63,9 @@ def create_client(credentials_path: str) -> LibrespotClient:
|
|||||||
abs_path = os.path.abspath(credentials_path)
|
abs_path = os.path.abspath(credentials_path)
|
||||||
if not os.path.isfile(abs_path):
|
if not os.path.isfile(abs_path):
|
||||||
raise FileNotFoundError(f"Credentials file not found: {abs_path}")
|
raise FileNotFoundError(f"Credentials file not found: {abs_path}")
|
||||||
return LibrespotClient(stored_credentials_path=abs_path)
|
cfg = get_config_params() or {}
|
||||||
|
max_workers = int(cfg.get("librespotConcurrency", 2) or 2)
|
||||||
|
return LibrespotClient(stored_credentials_path=abs_path, max_workers=max_workers)
|
||||||
|
|
||||||
|
|
||||||
def close_client(client: LibrespotClient) -> None:
|
def close_client(client: LibrespotClient) -> None:
|
||||||
|
|||||||
@@ -171,7 +171,6 @@ def download_track(
|
|||||||
convert_to=convert_to,
|
convert_to=convert_to,
|
||||||
bitrate=bitrate,
|
bitrate=bitrate,
|
||||||
artist_separator=artist_separator,
|
artist_separator=artist_separator,
|
||||||
spotify_metadata=spotify_metadata,
|
|
||||||
pad_number_width=pad_number_width,
|
pad_number_width=pad_number_width,
|
||||||
)
|
)
|
||||||
print(
|
print(
|
||||||
|
|||||||
@@ -179,6 +179,77 @@ function UtilityConcurrencyForm() {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function LibrespotConcurrencyForm() {
|
||||||
|
const queryClient = useQueryClient();
|
||||||
|
const { data: configData, isLoading } = useQuery({
|
||||||
|
queryKey: ["config"],
|
||||||
|
queryFn: () => authApiClient.getConfig<any>(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const { register, handleSubmit, reset, formState: { isDirty } } = useForm<{ librespotConcurrency: number }>();
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
if (configData) {
|
||||||
|
reset({ librespotConcurrency: Number(configData.librespotConcurrency ?? 2) });
|
||||||
|
}
|
||||||
|
}, [configData, reset]);
|
||||||
|
|
||||||
|
const mutation = useMutation({
|
||||||
|
mutationFn: (payload: { librespotConcurrency: number }) => authApiClient.updateConfig(payload),
|
||||||
|
onSuccess: () => {
|
||||||
|
toast.success("Librespot concurrency saved!");
|
||||||
|
queryClient.invalidateQueries({ queryKey: ["config"] });
|
||||||
|
},
|
||||||
|
onError: (e) => {
|
||||||
|
toast.error(`Failed to save: ${(e as any).message}`);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const onSubmit = (values: { librespotConcurrency: number }) => {
|
||||||
|
const raw = Number(values.librespotConcurrency || 2);
|
||||||
|
const safe = Math.max(1, Math.min(16, raw));
|
||||||
|
mutation.mutate({ librespotConcurrency: safe });
|
||||||
|
};
|
||||||
|
|
||||||
|
if (isLoading) return <p className="text-content-muted dark:text-content-muted-dark">Loading server settings...</p>;
|
||||||
|
|
||||||
|
return (
|
||||||
|
<form onSubmit={handleSubmit(onSubmit)} className="space-y-4">
|
||||||
|
<div className="flex items-center justify-end mb-2">
|
||||||
|
<div className="flex items-center gap-3">
|
||||||
|
<button
|
||||||
|
type="submit"
|
||||||
|
disabled={mutation.isPending || !isDirty}
|
||||||
|
className="px-4 py-2 bg-button-primary hover:bg-button-primary-hover text-button-primary-text rounded-md disabled:opacity-50"
|
||||||
|
title="Save Librespot Concurrency"
|
||||||
|
>
|
||||||
|
{mutation.isPending ? (
|
||||||
|
<img src="/spinner.svg" alt="Saving" className="w-5 h-5 animate-spin logo" />
|
||||||
|
) : (
|
||||||
|
<img src="/save.svg" alt="Save" className="w-5 h-5 logo" />
|
||||||
|
)}
|
||||||
|
</button>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<div className="flex flex-col gap-2">
|
||||||
|
<label htmlFor="librespotConcurrency" className="text-content-primary dark:text-content-primary-dark">Librespot Concurrency</label>
|
||||||
|
<input
|
||||||
|
id="librespotConcurrency"
|
||||||
|
type="number"
|
||||||
|
min={1}
|
||||||
|
max={16}
|
||||||
|
step={1}
|
||||||
|
{...register("librespotConcurrency", { valueAsNumber: true })}
|
||||||
|
className="block w-full p-2 border bg-input-background dark:bg-input-background-dark border-input-border dark:border-input-border-dark rounded-md focus:outline-none focus:ring-2 focus:ring-input-focus"
|
||||||
|
placeholder="2"
|
||||||
|
/>
|
||||||
|
<p className="text-xs text-content-secondary dark:text-content-secondary-dark">Controls worker threads used by the Librespot client. 1–16 is recommended.</p>
|
||||||
|
</div>
|
||||||
|
</form>
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// --- Components ---
|
// --- Components ---
|
||||||
function WebhookForm() {
|
function WebhookForm() {
|
||||||
const queryClient = useQueryClient();
|
const queryClient = useQueryClient();
|
||||||
@@ -299,6 +370,12 @@ export function ServerTab() {
|
|||||||
<UtilityConcurrencyForm />
|
<UtilityConcurrencyForm />
|
||||||
</div>
|
</div>
|
||||||
<hr className="border-border dark:border-border-dark" />
|
<hr className="border-border dark:border-border-dark" />
|
||||||
|
<div>
|
||||||
|
<h3 className="text-xl font-semibold text-content-primary dark:text-content-primary-dark">Librespot</h3>
|
||||||
|
<p className="text-sm text-content-muted dark:text-content-muted-dark mt-1">Adjust Librespot client worker threads.</p>
|
||||||
|
<LibrespotConcurrencyForm />
|
||||||
|
</div>
|
||||||
|
<hr className="border-border dark:border-border-dark" />
|
||||||
<div>
|
<div>
|
||||||
<h3 className="text-xl font-semibold text-content-primary dark:text-content-primary-dark">Webhooks</h3>
|
<h3 className="text-xl font-semibold text-content-primary dark:text-content-primary-dark">Webhooks</h3>
|
||||||
<p className="text-sm text-content-muted dark:text-content-muted-dark mt-1">
|
<p className="text-sm text-content-muted dark:text-content-muted-dark mt-1">
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ export type FlatAppSettings = {
|
|||||||
deezerQuality: "MP3_128" | "MP3_320" | "FLAC";
|
deezerQuality: "MP3_128" | "MP3_320" | "FLAC";
|
||||||
maxConcurrentDownloads: number;
|
maxConcurrentDownloads: number;
|
||||||
utilityConcurrency: number;
|
utilityConcurrency: number;
|
||||||
|
librespotConcurrency: number;
|
||||||
realTime: boolean;
|
realTime: boolean;
|
||||||
fallback: boolean;
|
fallback: boolean;
|
||||||
convertTo: "MP3" | "AAC" | "OGG" | "OPUS" | "FLAC" | "WAV" | "ALAC" | "";
|
convertTo: "MP3" | "AAC" | "OGG" | "OPUS" | "FLAC" | "WAV" | "ALAC" | "";
|
||||||
@@ -74,6 +75,7 @@ const defaultSettings: FlatAppSettings = {
|
|||||||
deezerQuality: "MP3_128",
|
deezerQuality: "MP3_128",
|
||||||
maxConcurrentDownloads: 3,
|
maxConcurrentDownloads: 3,
|
||||||
utilityConcurrency: 1,
|
utilityConcurrency: 1,
|
||||||
|
librespotConcurrency: 2,
|
||||||
realTime: false,
|
realTime: false,
|
||||||
fallback: false,
|
fallback: false,
|
||||||
convertTo: "",
|
convertTo: "",
|
||||||
@@ -138,6 +140,7 @@ const fetchSettings = async (): Promise<FlatAppSettings> => {
|
|||||||
recursiveQuality: Boolean((camelData as any).recursiveQuality ?? false),
|
recursiveQuality: Boolean((camelData as any).recursiveQuality ?? false),
|
||||||
realTimeMultiplier: Number((camelData as any).realTimeMultiplier ?? 0),
|
realTimeMultiplier: Number((camelData as any).realTimeMultiplier ?? 0),
|
||||||
utilityConcurrency: Number((camelData as any).utilityConcurrency ?? 1),
|
utilityConcurrency: Number((camelData as any).utilityConcurrency ?? 1),
|
||||||
|
librespotConcurrency: Number((camelData as any).librespotConcurrency ?? 2),
|
||||||
// Ensure watch subkeys default if missing
|
// Ensure watch subkeys default if missing
|
||||||
watch: {
|
watch: {
|
||||||
...(camelData.watch as any),
|
...(camelData.watch as any),
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ export interface AppSettings {
|
|||||||
deezerQuality: "MP3_128" | "MP3_320" | "FLAC";
|
deezerQuality: "MP3_128" | "MP3_320" | "FLAC";
|
||||||
maxConcurrentDownloads: number;
|
maxConcurrentDownloads: number;
|
||||||
utilityConcurrency: number;
|
utilityConcurrency: number;
|
||||||
|
librespotConcurrency: number;
|
||||||
realTime: boolean;
|
realTime: boolean;
|
||||||
fallback: boolean;
|
fallback: boolean;
|
||||||
convertTo: "MP3" | "AAC" | "OGG" | "OPUS" | "FLAC" | "WAV" | "ALAC" | "";
|
convertTo: "MP3" | "AAC" | "OGG" | "OPUS" | "FLAC" | "WAV" | "ALAC" | "";
|
||||||
|
|||||||
Reference in New Issue
Block a user