diff --git a/Dockerfile b/Dockerfile
index 09a719d..b464440 100755
--- a/Dockerfile
+++ b/Dockerfile
@@ -18,20 +18,29 @@ RUN uv pip install --target /python -r requirements.txt
 FROM debian:stable-slim AS ffmpeg
 ARG TARGETARCH
 RUN apt-get update && apt-get install -y --no-install-recommends \
-    ca-certificates curl xz-utils \
+    ca-certificates curl xz-utils jq \
     && rm -rf /var/lib/apt/lists/*
-RUN case "$TARGETARCH" in \
-    amd64) FFMPEG_PKG=ffmpeg-master-latest-linux64-gpl.tar.xz ;; \
-    arm64) FFMPEG_PKG=ffmpeg-master-latest-linuxarm64-gpl.tar.xz ;; \
+RUN set -euo pipefail; \
+    case "$TARGETARCH" in \
+    amd64) ARCH_SUFFIX=linux64 ;; \
+    arm64) ARCH_SUFFIX=linuxarm64 ;; \
     *) echo "Unsupported arch: $TARGETARCH" && exit 1 ;; \
-    esac && \
-    curl -fsSL -o /tmp/ffmpeg.tar.xz https://github.com/BtbN/FFmpeg-Builds/releases/latest/download/${FFMPEG_PKG} && \
-    tar -xJf /tmp/ffmpeg.tar.xz -C /tmp && \
+    esac; \
+    ASSET_URL=$(curl -fsSL https://api.github.com/repos/BtbN/FFmpeg-Builds/releases/latest \
+      | jq -r ".assets[] | select(.name | endswith(\"${ARCH_SUFFIX}-gpl.tar.xz\")) | .browser_download_url" \
+      | head -n1); \
+    if [ -z "$ASSET_URL" ]; then \
+      echo "Failed to resolve FFmpeg asset for arch ${ARCH_SUFFIX}" && exit 1; \
+    fi; \
+    echo "Fetching FFmpeg from: $ASSET_URL"; \
+    curl -fsSL -o /tmp/ffmpeg.tar.xz "$ASSET_URL"; \
+    tar -xJf /tmp/ffmpeg.tar.xz -C /tmp; \
     mv /tmp/ffmpeg-* /ffmpeg
 
 # Stage 4: Prepare world-writable runtime directories
 FROM busybox:1.36.1-musl AS runtime-dirs
 RUN mkdir -p /artifact/downloads /artifact/data/config /artifact/data/creds /artifact/data/watch /artifact/data/history /artifact/logs/tasks \
+    && touch /artifact/.cache \
     && chmod -R 0777 /artifact
 
 # Stage 5: Final application image (distroless)
@@ -44,6 +53,9 @@ WORKDIR /app
 # Ensure Python finds vendored site-packages and unbuffered output
 ENV PYTHONPATH=/python
 ENV PYTHONUNBUFFERED=1
+ENV PYTHONUTF8=1
+ENV LANG=C.UTF-8
+ENV LC_ALL=C.UTF-8
 
 # Copy application code
 COPY --chown=65532:65532 . .
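
Note: the new RUN step above replaces a hardcoded release asset name with a lookup against the GitHub releases API. For readers who want to reuse the pattern outside the Dockerfile, here is a minimal Python sketch of the same resolution logic; the BtbN/FFmpeg-Builds repo and the "<suffix>-gpl.tar.xz" naming convention are taken from the diff, while the function name and error handling are illustrative. The unauthenticated API is rate-limited, which applies equally to the curl call in the Dockerfile.

    import json
    import urllib.request

    API_URL = "https://api.github.com/repos/BtbN/FFmpeg-Builds/releases/latest"

    def resolve_ffmpeg_asset(arch_suffix: str) -> str:
        # Fetch latest-release metadata (unauthenticated, so rate-limited).
        with urllib.request.urlopen(API_URL) as resp:
            release = json.load(resp)
        # Mirror the jq filter: first asset whose name ends with "<suffix>-gpl.tar.xz".
        for asset in release.get("assets", []):
            if asset["name"].endswith(f"{arch_suffix}-gpl.tar.xz"):
                return asset["browser_download_url"]
        raise RuntimeError(f"Failed to resolve FFmpeg asset for arch {arch_suffix}")

    if __name__ == "__main__":
        print(resolve_ffmpeg_asset("linux64"))     # amd64, per the case statement
        print(resolve_ffmpeg_asset("linuxarm64"))  # arm64
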
diff --git a/requirements.txt b/requirements.txt index 74eea53..2ea3f67 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ fastapi==0.116.1 uvicorn[standard]==0.35.0 celery==5.5.3 -deezspot-spotizerr==2.7.4 +deezspot-spotizerr==2.7.6 httpx==0.28.1 bcrypt==4.2.1 PyJWT==2.10.1 diff --git a/routes/system/progress.py b/routes/system/progress.py index 465407d..016e3d9 100755 --- a/routes/system/progress.py +++ b/routes/system/progress.py @@ -1,10 +1,14 @@ from fastapi import APIRouter, HTTPException, Request, Depends -from fastapi.responses import JSONResponse, StreamingResponse +from fastapi.responses import StreamingResponse import logging import time import json import asyncio -from typing import Dict, Set +from typing import Set + +import redis +import threading +from routes.utils.celery_config import REDIS_URL from routes.utils.celery_tasks import ( get_task_info, @@ -12,47 +16,57 @@ from routes.utils.celery_tasks import ( get_last_task_status, get_all_tasks, cancel_task, + delete_task_data_and_log, ProgressState, ) # Import authentication dependencies -from routes.auth.middleware import require_auth_from_state, get_current_user_from_state, User +from routes.auth.middleware import ( + require_auth_from_state, + get_current_user_from_state, + User, +) # Configure logging logger = logging.getLogger(__name__) router = APIRouter() + # Global SSE Event Broadcaster class SSEBroadcaster: def __init__(self): self.clients: Set[asyncio.Queue] = set() - + async def add_client(self, queue: asyncio.Queue): """Add a new SSE client""" self.clients.add(queue) logger.info(f"SSE: Client connected (total: {len(self.clients)})") - + async def remove_client(self, queue: asyncio.Queue): """Remove an SSE client""" self.clients.discard(queue) logger.info(f"SSE: Client disconnected (total: {len(self.clients)})") - + async def broadcast_event(self, event_data: dict): """Broadcast an event to all connected clients""" - logger.debug(f"SSE Broadcaster: Attempting to broadcast to {len(self.clients)} clients") - + logger.debug( + f"SSE Broadcaster: Attempting to broadcast to {len(self.clients)} clients" + ) + if not self.clients: logger.debug("SSE Broadcaster: No clients connected, skipping broadcast") return - + # Add global task counts right before broadcasting - this is the single source of truth enhanced_event_data = add_global_task_counts_to_event(event_data.copy()) event_json = json.dumps(enhanced_event_data) sse_data = f"data: {event_json}\n\n" - - logger.debug(f"SSE Broadcaster: Broadcasting event: {enhanced_event_data.get('change_type', 'unknown')} with {enhanced_event_data.get('active_tasks', 0)} active tasks") - + + logger.debug( + f"SSE Broadcaster: Broadcasting event: {enhanced_event_data.get('change_type', 'unknown')} with {enhanced_event_data.get('active_tasks', 0)} active tasks" + ) + # Send to all clients, remove disconnected ones disconnected = set() sent_count = 0 @@ -60,64 +74,82 @@ class SSEBroadcaster: try: await client_queue.put(sse_data) sent_count += 1 - logger.debug(f"SSE: Successfully sent to client queue") + logger.debug("SSE: Successfully sent to client queue") except Exception as e: logger.error(f"SSE: Failed to send to client: {e}") disconnected.add(client_queue) - + # Clean up disconnected clients for client in disconnected: self.clients.discard(client) - - logger.info(f"SSE Broadcaster: Successfully sent to {sent_count} clients, removed {len(disconnected)} disconnected clients") + + logger.info( + f"SSE Broadcaster: Successfully sent to {sent_count} clients, removed 
{len(disconnected)} disconnected clients" + ) + # Global broadcaster instance sse_broadcaster = SSEBroadcaster() # Redis subscriber for cross-process SSE events -import redis -import threading -from routes.utils.celery_config import REDIS_URL # Redis client for SSE pub/sub sse_redis_client = redis.Redis.from_url(REDIS_URL) + def start_sse_redis_subscriber(): """Start Redis subscriber to listen for SSE events from Celery workers""" + def redis_subscriber_thread(): try: pubsub = sse_redis_client.pubsub() pubsub.subscribe("sse_events") logger.info("SSE Redis Subscriber: Started listening for events") - + for message in pubsub.listen(): - if message['type'] == 'message': + if message["type"] == "message": try: - event_data = json.loads(message['data'].decode('utf-8')) - event_type = event_data.get('event_type', 'unknown') - task_id = event_data.get('task_id', 'unknown') - - logger.debug(f"SSE Redis Subscriber: Received {event_type} for task {task_id}") - + event_data = json.loads(message["data"].decode("utf-8")) + event_type = event_data.get("event_type", "unknown") + task_id = event_data.get("task_id", "unknown") + + logger.debug( + f"SSE Redis Subscriber: Received {event_type} for task {task_id}" + ) + # Handle different event types - if event_type == 'progress_update': + if event_type == "progress_update": # Transform callback data into task format expected by frontend loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: - broadcast_data = loop.run_until_complete(transform_callback_to_task_format(task_id, event_data)) + broadcast_data = loop.run_until_complete( + transform_callback_to_task_format( + task_id, event_data + ) + ) if broadcast_data: - loop.run_until_complete(sse_broadcaster.broadcast_event(broadcast_data)) - logger.debug(f"SSE Redis Subscriber: Broadcasted callback to {len(sse_broadcaster.clients)} clients") + loop.run_until_complete( + sse_broadcaster.broadcast_event(broadcast_data) + ) + logger.debug( + f"SSE Redis Subscriber: Broadcasted callback to {len(sse_broadcaster.clients)} clients" + ) finally: loop.close() - elif event_type == 'summary_update': + elif event_type == "summary_update": # Task summary update - use existing trigger_sse_update logic loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: - loop.run_until_complete(trigger_sse_update(task_id, event_data.get('reason', 'update'))) - logger.debug(f"SSE Redis Subscriber: Processed summary update for {task_id}") + loop.run_until_complete( + trigger_sse_update( + task_id, event_data.get("reason", "update") + ) + ) + logger.debug( + f"SSE Redis Subscriber: Processed summary update for {task_id}" + ) finally: loop.close() else: @@ -125,50 +157,58 @@ def start_sse_redis_subscriber(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: - loop.run_until_complete(sse_broadcaster.broadcast_event(event_data)) - logger.debug(f"SSE Redis Subscriber: Broadcasted {event_type} to {len(sse_broadcaster.clients)} clients") + loop.run_until_complete( + sse_broadcaster.broadcast_event(event_data) + ) + logger.debug( + f"SSE Redis Subscriber: Broadcasted {event_type} to {len(sse_broadcaster.clients)} clients" + ) finally: loop.close() - + except Exception as e: - logger.error(f"SSE Redis Subscriber: Error processing message: {e}", exc_info=True) - + logger.error( + f"SSE Redis Subscriber: Error processing message: {e}", + exc_info=True, + ) + except Exception as e: logger.error(f"SSE Redis Subscriber: Fatal error: {e}", exc_info=True) - + # Start Redis subscriber in background thread thread = 
threading.Thread(target=redis_subscriber_thread, daemon=True) thread.start() logger.info("SSE Redis Subscriber: Background thread started") + async def transform_callback_to_task_format(task_id: str, event_data: dict) -> dict: """Transform callback event data into the task format expected by frontend""" try: # Import here to avoid circular imports - from routes.utils.celery_tasks import get_task_info, get_all_tasks - + from routes.utils.celery_tasks import get_task_info + # Get task info to build complete task object task_info = get_task_info(task_id) if not task_info: logger.warning(f"SSE Transform: No task info found for {task_id}") return None - + # Extract callback data - callback_data = event_data.get('callback_data', {}) - + callback_data = event_data.get("callback_data", {}) + # Build task object in the format expected by frontend task_object = { "task_id": task_id, "original_url": f"http://localhost:7171/api/{task_info.get('download_type', 'track')}/download/{task_info.get('url', '').split('/')[-1] if task_info.get('url') else ''}", "last_line": callback_data, # This is what frontend expects for callback data - "timestamp": event_data.get('timestamp', time.time()), - "download_type": task_info.get('download_type', 'track'), - "type": task_info.get('type', task_info.get('download_type', 'track')), - "name": task_info.get('name', 'Unknown'), - "artist": task_info.get('artist', ''), - "created_at": task_info.get('created_at'), + "timestamp": event_data.get("timestamp", time.time()), + "download_type": task_info.get("download_type", "track"), + "type": task_info.get("type", task_info.get("download_type", "track")), + "name": task_info.get("name", "Unknown"), + "artist": task_info.get("artist", ""), + "created_at": task_info.get("created_at"), } - + # Build minimal event data - global counts will be added at broadcast time return { "change_type": "update", # Use "update" so it gets processed by existing frontend logic @@ -176,90 +216,98 @@ async def transform_callback_to_task_format(task_id: str, event_data: dict) -> d "current_timestamp": time.time(), "updated_count": 1, "since_timestamp": time.time(), - "trigger_reason": "callback_update" + "trigger_reason": "callback_update", } - + except Exception as e: - logger.error(f"SSE Transform: Error transforming callback for task {task_id}: {e}", exc_info=True) + logger.error( + f"SSE Transform: Error transforming callback for task {task_id}: {e}", + exc_info=True, + ) return None + # Start the Redis subscriber when module loads start_sse_redis_subscriber() + async def trigger_sse_update(task_id: str, reason: str = "task_update"): """Trigger an immediate SSE update for a specific task""" try: current_time = time.time() - + # Find the specific task that changed task_info = get_task_info(task_id) if not task_info: logger.warning(f"SSE: Task {task_id} not found for update") return - + last_status = get_last_task_status(task_id) - + # Create a dummy request for the _build_task_response function - from fastapi import Request class DummyRequest: def __init__(self): self.base_url = "http://localhost:7171" - + dummy_request = DummyRequest() - task_response = _build_task_response(task_info, last_status, task_id, current_time, dummy_request) - + task_response = _build_task_response( + task_info, last_status, task_id, current_time, dummy_request + ) + # Create minimal event data - global counts will be added at broadcast time event_data = { "tasks": [task_response], "current_timestamp": current_time, "since_timestamp": current_time, "change_type": 
"realtime", - "trigger_reason": reason + "trigger_reason": reason, } - + await sse_broadcaster.broadcast_event(event_data) logger.debug(f"SSE: Broadcast update for task {task_id} (reason: {reason})") - + except Exception as e: logger.error(f"SSE: Failed to trigger update for task {task_id}: {e}") + # Define active task states using ProgressState constants ACTIVE_TASK_STATES = { - ProgressState.INITIALIZING, # "initializing" - task is starting up - ProgressState.PROCESSING, # "processing" - task is being processed - ProgressState.DOWNLOADING, # "downloading" - actively downloading - ProgressState.PROGRESS, # "progress" - album/playlist progress updates - ProgressState.TRACK_PROGRESS, # "track_progress" - real-time track progress - ProgressState.REAL_TIME, # "real_time" - real-time download progress - ProgressState.RETRYING, # "retrying" - task is retrying after error - "real-time", # "real-time" - real-time download progress (hyphenated version) - ProgressState.QUEUED, # "queued" - task is queued and waiting - "pending", # "pending" - legacy queued status + ProgressState.INITIALIZING, # "initializing" - task is starting up + ProgressState.PROCESSING, # "processing" - task is being processed + ProgressState.DOWNLOADING, # "downloading" - actively downloading + ProgressState.PROGRESS, # "progress" - album/playlist progress updates + ProgressState.TRACK_PROGRESS, # "track_progress" - real-time track progress + ProgressState.REAL_TIME, # "real_time" - real-time download progress + ProgressState.RETRYING, # "retrying" - task is retrying after error + "real-time", # "real-time" - real-time download progress (hyphenated version) + ProgressState.QUEUED, # "queued" - task is queued and waiting + "pending", # "pending" - legacy queued status } # Define terminal task states that should be included when recently completed TERMINAL_TASK_STATES = { - ProgressState.COMPLETE, # "complete" - task completed successfully - ProgressState.DONE, # "done" - task finished processing - ProgressState.ERROR, # "error" - task failed - ProgressState.CANCELLED, # "cancelled" - task was cancelled - ProgressState.SKIPPED, # "skipped" - task was skipped + ProgressState.COMPLETE, # "complete" - task completed successfully + ProgressState.DONE, # "done" - task finished processing + ProgressState.ERROR, # "error" - task failed + ProgressState.CANCELLED, # "cancelled" - task was cancelled + ProgressState.SKIPPED, # "skipped" - task was skipped } + def get_task_status_from_last_status(last_status): """ Extract the task status from last_status, checking both possible locations. Uses improved priority logic to handle real-time downloads correctly. 
- + Args: last_status: The last status dict from get_last_task_status() - + Returns: str: The task status string """ if not last_status: return "unknown" - + # For real-time downloads, prioritize status_info.status as it contains the actual progress state status_info = last_status.get("status_info", {}) if isinstance(status_info, dict) and "status" in status_info: @@ -267,10 +315,10 @@ def get_task_status_from_last_status(last_status): # If status_info contains an active status, use it regardless of top-level status if status_info_status in ACTIVE_TASK_STATES: return status_info_status - + # Fall back to top-level status top_level_status = last_status.get("status", "unknown") - + # If both exist but neither is active, prefer the more recent one (usually top-level) # For active states, we already handled status_info above return top_level_status @@ -279,10 +327,10 @@ def get_task_status_from_last_status(last_status): def is_task_active(task_status): """ Determine if a task is currently active (working/processing). - + Args: task_status: The status string from the task - + Returns: bool: True if the task is active, False otherwise """ @@ -295,24 +343,24 @@ def get_global_task_counts(): """ Get comprehensive task counts for ALL tasks in Redis. This is called right before sending SSE events to ensure accurate counts. - + Returns: dict: Task counts by status """ task_counts = { "active": 0, - "queued": 0, + "queued": 0, "completed": 0, "error": 0, "cancelled": 0, "retrying": 0, - "skipped": 0 + "skipped": 0, } - + try: # Get ALL tasks from Redis - this is the source of truth all_tasks = get_all_tasks() - + for task_summary in all_tasks: task_id = task_summary.get("task_id") if not task_id: @@ -325,7 +373,7 @@ def get_global_task_counts(): last_status = get_last_task_status(task_id) task_status = get_task_status_from_last_status(last_status) is_active_task = is_task_active(task_status) - + # Categorize tasks by status using ProgressState constants if task_status == ProgressState.RETRYING: task_counts["retrying"] += 1 @@ -341,12 +389,14 @@ def get_global_task_counts(): task_counts["skipped"] += 1 elif is_active_task: task_counts["active"] += 1 - - logger.debug(f"Global task counts: {task_counts} (total: {len(all_tasks)} tasks)") - + + logger.debug( + f"Global task counts: {task_counts} (total: {len(all_tasks)} tasks)" + ) + except Exception as e: logger.error(f"Error getting global task counts: {e}", exc_info=True) - + return task_counts @@ -354,26 +404,28 @@ def add_global_task_counts_to_event(event_data): """ Add global task counts to any SSE event data right before broadcasting. This ensures all SSE events have accurate, up-to-date counts of ALL tasks. 
- + Args: event_data: The event data dictionary to be sent via SSE - + Returns: dict: Enhanced event data with global task counts """ try: # Get fresh counts of ALL tasks right before sending global_task_counts = get_global_task_counts() - + # Add/update the counts in the event data event_data["task_counts"] = global_task_counts event_data["active_tasks"] = global_task_counts["active"] event_data["all_tasks_count"] = sum(global_task_counts.values()) - + return event_data - + except Exception as e: - logger.error(f"Error adding global task counts to SSE event: {e}", exc_info=True) + logger.error( + f"Error adding global task counts to SSE event: {e}", exc_info=True + ) return event_data @@ -397,10 +449,9 @@ def _build_error_callback_object(last_status): callback_object["album"] = { "type": "album", "title": name, - "artists": [{ - "type": "artistAlbum", - "name": artist_or_owner - }] if artist_or_owner else [], + "artists": [{"type": "artistAlbum", "name": artist_or_owner}] + if artist_or_owner + else [], } elif download_type == "playlist": playlist_payload = {"type": "playlist", "title": name} @@ -411,10 +462,9 @@ def _build_error_callback_object(last_status): callback_object["track"] = { "type": "track", "title": name, - "artists": [{ - "type": "artistTrack", - "name": artist_or_owner - }] if artist_or_owner else [], + "artists": [{"type": "artistTrack", "name": artist_or_owner}] + if artist_or_owner + else [], } else: # Fallback for unknown types to avoid breaking the client, returning a basic error structure. @@ -431,7 +481,9 @@ def _build_error_callback_object(last_status): return callback_object -def _build_task_response(task_info, last_status, task_id, current_time, request: Request): +def _build_task_response( + task_info, last_status, task_id, current_time, request: Request +): """ Helper function to build a standardized task response object. """ @@ -465,6 +517,27 @@ def _build_task_response(task_info, last_status, task_id, current_time, request: logger.warning( f"Missing download_type ('{download_type}') or item_url ('{item_url}') in task_info for task {task_id}. Falling back for original_url." 
) + # Auto-delete faulty task data to keep the queue clean + try: + delete_task_data_and_log( + task_id, + reason="Auto-cleaned: Missing download_type or url in task_info.", + ) + # Trigger SSE so clients refresh their task lists + try: + # Avoid circular import at top-level + import asyncio as _asyncio + + # Fire-and-forget; if no event loop available, ignore + loop = _asyncio.get_event_loop() + if loop.is_running(): + _asyncio.create_task( + trigger_sse_update(task_id, "auto_deleted_faulty") + ) + except Exception: + pass + except Exception as _e: + logger.error(f"Auto-delete failed for faulty task {task_id}: {_e}") original_request_obj = task_info.get("original_request", {}) dynamic_original_url = original_request_obj.get("original_url", "") @@ -478,13 +551,18 @@ def _build_task_response(task_info, last_status, task_id, current_time, request: else: last_line_content = last_status + # Normalize created_at to a numeric timestamp + created_at_value = task_info.get("created_at") + if not isinstance(created_at_value, (int, float)): + created_at_value = current_time + task_response = { "original_url": dynamic_original_url, "last_line": last_line_content, "timestamp": last_status.get("timestamp") if last_status else current_time, "task_id": task_id, "status_count": status_count, - "created_at": task_info.get("created_at"), + "created_at": created_at_value, "name": task_info.get("name"), "artist": task_info.get("artist"), "type": task_info.get("type"), @@ -496,19 +574,21 @@ def _build_task_response(task_info, last_status, task_id, current_time, request: return task_response -async def get_paginated_tasks(page=1, limit=20, active_only=False, request: Request = None): +async def get_paginated_tasks( + page=1, limit=20, active_only=False, request: Request = None +): """ Get paginated list of tasks. 
""" try: all_tasks = get_all_tasks() - - # Get global task counts + + # Get global task counts task_counts = get_global_task_counts() - + active_tasks = [] other_tasks = [] - + # Process tasks for pagination and response building for task_summary in all_tasks: task_id = task_summary.get("task_id") @@ -522,17 +602,19 @@ async def get_paginated_tasks(page=1, limit=20, active_only=False, request: Requ last_status = get_last_task_status(task_id) task_status = get_task_status_from_last_status(last_status) is_active_task = is_task_active(task_status) - - task_response = _build_task_response(task_info, last_status, task_id, time.time(), request) - + + task_response = _build_task_response( + task_info, last_status, task_id, time.time(), request + ) + if is_active_task: active_tasks.append(task_response) else: other_tasks.append(task_response) # Sort other tasks by creation time (newest first) - other_tasks.sort(key=lambda x: x.get("created_at", 0), reverse=True) - + other_tasks.sort(key=lambda x: (x.get("created_at") or 0.0), reverse=True) + if active_only: paginated_tasks = active_tasks pagination_info = { @@ -540,49 +622,55 @@ async def get_paginated_tasks(page=1, limit=20, active_only=False, request: Requ "limit": limit, "total_non_active": 0, "has_more": False, - "returned_non_active": 0 + "returned_non_active": 0, } else: # Apply pagination to non-active tasks offset = (page - 1) * limit - paginated_other_tasks = other_tasks[offset:offset + limit] + paginated_other_tasks = other_tasks[offset : offset + limit] paginated_tasks = active_tasks + paginated_other_tasks - + pagination_info = { "page": page, "limit": limit, "total_non_active": len(other_tasks), "has_more": len(other_tasks) > offset + limit, - "returned_non_active": len(paginated_other_tasks) + "returned_non_active": len(paginated_other_tasks), } response = { "tasks": paginated_tasks, "current_timestamp": time.time(), - "total_tasks": task_counts["active"] + task_counts["retrying"], # Only active/retrying tasks for counter + "total_tasks": task_counts["active"] + + task_counts["retrying"], # Only active/retrying tasks for counter "all_tasks_count": len(all_tasks), # Total count of all tasks "task_counts": task_counts, # Categorized counts "active_tasks": len(active_tasks), "updated_count": len(paginated_tasks), - "pagination": pagination_info + "pagination": pagination_info, } - + return response - + except Exception as e: logger.error(f"Error in get_paginated_tasks: {e}", exc_info=True) - raise HTTPException(status_code=500, detail={"error": "Failed to retrieve paginated tasks"}) + raise HTTPException( + status_code=500, detail={"error": "Failed to retrieve paginated tasks"} + ) # IMPORTANT: Specific routes MUST come before parameterized routes in FastAPI # Otherwise "updates" gets matched as a {task_id} parameter! + @router.get("/list") -async def list_tasks(request: Request, current_user: User = Depends(require_auth_from_state)): +async def list_tasks( + request: Request, current_user: User = Depends(require_auth_from_state) +): """ Retrieve a paginated list of all tasks in the system. Returns a detailed list of task objects including status and metadata. 
- + Query parameters: page (int): Page number for pagination (default: 1) limit (int): Number of tasks per page (default: 50, max: 100) @@ -590,25 +678,25 @@ async def list_tasks(request: Request, current_user: User = Depends(require_auth """ try: # Get query parameters - page = int(request.query_params.get('page', 1)) - limit = min(int(request.query_params.get('limit', 50)), 100) # Cap at 100 - active_only = request.query_params.get('active_only', '').lower() == 'true' - + page = int(request.query_params.get("page", 1)) + limit = min(int(request.query_params.get("limit", 50)), 100) # Cap at 100 + active_only = request.query_params.get("active_only", "").lower() == "true" + tasks = get_all_tasks() active_tasks = [] other_tasks = [] - + # Task categorization counters task_counts = { "active": 0, - "queued": 0, + "queued": 0, "completed": 0, "error": 0, "cancelled": 0, "retrying": 0, - "skipped": 0 + "skipped": 0, } - + for task_summary in tasks: task_id = task_summary.get("task_id") if not task_id: @@ -621,11 +709,14 @@ async def list_tasks(request: Request, current_user: User = Depends(require_auth last_status = get_last_task_status(task_id) task_status = get_task_status_from_last_status(last_status) is_active_task = is_task_active(task_status) - + # Categorize tasks by status using ProgressState constants if task_status == ProgressState.RETRYING: task_counts["retrying"] += 1 - elif task_status in {ProgressState.QUEUED, "pending"}: # Keep "pending" for backward compatibility + elif task_status in { + ProgressState.QUEUED, + "pending", + }: # Keep "pending" for backward compatibility task_counts["queued"] += 1 elif task_status in {ProgressState.COMPLETE, ProgressState.DONE}: task_counts["completed"] += 1 @@ -637,17 +728,19 @@ async def list_tasks(request: Request, current_user: User = Depends(require_auth task_counts["skipped"] += 1 elif is_active_task: task_counts["active"] += 1 - - task_response = _build_task_response(task_info, last_status, task_id, time.time(), request) - + + task_response = _build_task_response( + task_info, last_status, task_id, time.time(), request + ) + if is_active_task: active_tasks.append(task_response) else: other_tasks.append(task_response) # Sort other tasks by creation time (newest first) - other_tasks.sort(key=lambda x: x.get("created_at", 0), reverse=True) - + other_tasks.sort(key=lambda x: (x.get("created_at") or 0.0), reverse=True) + if active_only: # Return only active tasks without pagination response_tasks = active_tasks @@ -656,17 +749,17 @@ async def list_tasks(request: Request, current_user: User = Depends(require_auth "limit": limit, "total_items": len(active_tasks), "total_pages": 1, - "has_more": False + "has_more": False, } else: # Apply pagination to non-active tasks and combine with active tasks offset = (page - 1) * limit - + # Always include active tasks at the top if page == 1: # For first page, include active tasks + first batch of other tasks available_space = limit - len(active_tasks) - paginated_other_tasks = other_tasks[:max(0, available_space)] + paginated_other_tasks = other_tasks[: max(0, available_space)] response_tasks = active_tasks + paginated_other_tasks else: # For subsequent pages, only include other tasks @@ -674,12 +767,14 @@ async def list_tasks(request: Request, current_user: User = Depends(require_auth adjusted_offset = offset - len(active_tasks) if adjusted_offset < 0: adjusted_offset = 0 - paginated_other_tasks = other_tasks[adjusted_offset:adjusted_offset + limit] + paginated_other_tasks = other_tasks[ + 
adjusted_offset : adjusted_offset + limit + ] response_tasks = paginated_other_tasks - + total_items = len(active_tasks) + len(other_tasks) total_pages = ((total_items - 1) // limit) + 1 if total_items > 0 else 1 - + pagination_info = { "page": page, "limit": limit, @@ -687,37 +782,42 @@ async def list_tasks(request: Request, current_user: User = Depends(require_auth "total_pages": total_pages, "has_more": page < total_pages, "active_tasks": len(active_tasks), - "total_other_tasks": len(other_tasks) + "total_other_tasks": len(other_tasks), } response = { "tasks": response_tasks, "pagination": pagination_info, - "total_tasks": task_counts["active"] + task_counts["retrying"], # Only active/retrying tasks for counter + "total_tasks": task_counts["active"] + + task_counts["retrying"], # Only active/retrying tasks for counter "all_tasks_count": len(tasks), # Total count of all tasks "task_counts": task_counts, # Categorized counts "active_tasks": len(active_tasks), - "timestamp": time.time() + "timestamp": time.time(), } return response except Exception as e: logger.error(f"Error in /api/prgs/list: {e}", exc_info=True) - raise HTTPException(status_code=500, detail={"error": "Failed to retrieve task list"}) + raise HTTPException( + status_code=500, detail={"error": "Failed to retrieve task list"} + ) @router.get("/updates") -async def get_task_updates(request: Request, current_user: User = Depends(require_auth_from_state)): +async def get_task_updates( + request: Request, current_user: User = Depends(require_auth_from_state) +): """ Retrieve only tasks that have been updated since the specified timestamp. This endpoint is optimized for polling to reduce unnecessary data transfer. - + Query parameters: since (float): Unix timestamp - only return tasks updated after this time page (int): Page number for pagination (default: 1) limit (int): Number of queued/completed tasks per page (default: 20, max: 100) active_only (bool): If true, only return active tasks (downloading, processing, etc.) 
- + Returns: JSON object containing: - tasks: Array of updated task objects @@ -728,31 +828,33 @@ async def get_task_updates(request: Request, current_user: User = Depends(requir """ try: # Get query parameters - since_param = request.query_params.get('since') - page = int(request.query_params.get('page', 1)) - limit = min(int(request.query_params.get('limit', 20)), 100) # Cap at 100 - active_only = request.query_params.get('active_only', '').lower() == 'true' - + since_param = request.query_params.get("since") + page = int(request.query_params.get("page", 1)) + limit = min(int(request.query_params.get("limit", 20)), 100) # Cap at 100 + active_only = request.query_params.get("active_only", "").lower() == "true" + if not since_param: # If no 'since' parameter, return paginated tasks (fallback behavior) response = await get_paginated_tasks(page, limit, active_only, request) return response - + try: since_timestamp = float(since_param) except (ValueError, TypeError): - raise HTTPException(status_code=400, detail={"error": "Invalid 'since' timestamp format"}) - + raise HTTPException( + status_code=400, detail={"error": "Invalid 'since' timestamp format"} + ) + # Get all tasks all_tasks = get_all_tasks() current_time = time.time() - + # Get global task counts task_counts = get_global_task_counts() - + updated_tasks = [] active_tasks = [] - + # Process tasks for filtering and response building for task_summary in all_tasks: task_id = task_summary.get("task_id") @@ -766,19 +868,31 @@ async def get_task_updates(request: Request, current_user: User = Depends(requir last_status = get_last_task_status(task_id) task_status = get_task_status_from_last_status(last_status) is_active_task = is_task_active(task_status) - + # Check if task has been updated since the given timestamp - task_timestamp = last_status.get("timestamp") if last_status else task_info.get("created_at", 0) - + task_timestamp = ( + last_status.get("timestamp") + if last_status + else task_info.get("created_at", 0) + ) + # Always include active tasks in updates, apply filtering to others # Also include recently completed/terminal tasks to ensure "done" status gets sent - is_recently_terminal = task_status in TERMINAL_TASK_STATES and task_timestamp > since_timestamp - should_include = is_active_task or (task_timestamp > since_timestamp and not active_only) or is_recently_terminal - + is_recently_terminal = ( + task_status in TERMINAL_TASK_STATES and task_timestamp > since_timestamp + ) + should_include = ( + is_active_task + or (task_timestamp > since_timestamp and not active_only) + or is_recently_terminal + ) + if should_include: # Construct the same detailed task object as in list_tasks() - task_response = _build_task_response(task_info, last_status, task_id, current_time, request) - + task_response = _build_task_response( + task_info, last_status, task_id, current_time, request + ) + if is_active_task: active_tasks.append(task_response) else: @@ -786,21 +900,26 @@ async def get_task_updates(request: Request, current_user: User = Depends(requir # Apply pagination to non-active tasks offset = (page - 1) * limit - paginated_updated_tasks = updated_tasks[offset:offset + limit] if not active_only else [] - + paginated_updated_tasks = ( + updated_tasks[offset : offset + limit] if not active_only else [] + ) + # Combine active tasks (always shown) with paginated updated tasks all_returned_tasks = active_tasks + paginated_updated_tasks - + # Sort by priority (active first, then by creation time) - all_returned_tasks.sort(key=lambda x: ( - 0 
if x.get("task_id") in [t["task_id"] for t in active_tasks] else 1, - -(x.get("created_at") or 0) - )) + all_returned_tasks.sort( + key=lambda x: ( + 0 if x.get("task_id") in [t["task_id"] for t in active_tasks] else 1, + -(x.get("created_at") or 0), + ) + ) response = { "tasks": all_returned_tasks, "current_timestamp": current_time, - "total_tasks": task_counts["active"] + task_counts["retrying"], # Only active/retrying tasks for counter + "total_tasks": task_counts["active"] + + task_counts["retrying"], # Only active/retrying tasks for counter "all_tasks_count": len(all_tasks), # Total count of all tasks "task_counts": task_counts, # Categorized counts "active_tasks": len(active_tasks), @@ -811,18 +930,22 @@ async def get_task_updates(request: Request, current_user: User = Depends(requir "limit": limit, "total_non_active": len(updated_tasks), "has_more": len(updated_tasks) > offset + limit, - "returned_non_active": len(paginated_updated_tasks) - } + "returned_non_active": len(paginated_updated_tasks), + }, } - - logger.debug(f"Returning {len(active_tasks)} active + {len(paginated_updated_tasks)} paginated tasks out of {len(all_tasks)} total") + + logger.debug( + f"Returning {len(active_tasks)} active + {len(paginated_updated_tasks)} paginated tasks out of {len(all_tasks)} total" + ) return response - + except HTTPException: raise except Exception as e: logger.error(f"Error in /api/prgs/updates: {e}", exc_info=True) - raise HTTPException(status_code=500, detail={"error": "Failed to retrieve task updates"}) + raise HTTPException( + status_code=500, detail={"error": "Failed to retrieve task updates"} + ) @router.post("/cancel/all") @@ -855,11 +978,15 @@ async def cancel_all_tasks(current_user: User = Depends(require_auth_from_state) return response except Exception as e: logger.error(f"Error in /api/prgs/cancel/all: {e}", exc_info=True) - raise HTTPException(status_code=500, detail={"error": "Failed to cancel all tasks"}) + raise HTTPException( + status_code=500, detail={"error": "Failed to cancel all tasks"} + ) @router.post("/cancel/{task_id}") -async def cancel_task_endpoint(task_id: str, current_user: User = Depends(require_auth_from_state)): +async def cancel_task_endpoint( + task_id: str, current_user: User = Depends(require_auth_from_state) +): """ Cancel a running or queued task. @@ -888,7 +1015,7 @@ async def cancel_task_endpoint(task_id: str, current_user: User = Depends(requir detail={ "status": "error", "message": "Cancellation for old system is not supported in the new API. Please use the new task ID format.", - } + }, ) except HTTPException: raise @@ -897,7 +1024,9 @@ async def cancel_task_endpoint(task_id: str, current_user: User = Depends(requir @router.delete("/delete/{task_id}") -async def delete_task(task_id: str, current_user: User = Depends(require_auth_from_state)): +async def delete_task( + task_id: str, current_user: User = Depends(require_auth_from_state) +): """ Delete a task's information and history. @@ -916,52 +1045,60 @@ async def delete_task(task_id: str, current_user: User = Depends(require_auth_fr @router.get("/stream") -async def stream_task_updates(request: Request, current_user: User = Depends(get_current_user_from_state)): +async def stream_task_updates( + request: Request, current_user: User = Depends(get_current_user_from_state) +): """ Stream real-time task updates via Server-Sent Events (SSE). Now uses event-driven architecture for true real-time updates. Uses optional authentication to avoid breaking SSE connections. 
- + Query parameters: active_only (bool): If true, only stream active tasks (downloading, processing, etc.) - + Returns: Server-Sent Events stream with task update data in JSON format """ - + # Get query parameters - active_only = request.query_params.get('active_only', '').lower() == 'true' - + active_only = request.query_params.get("active_only", "").lower() == "true" + async def event_generator(): # Create a queue for this client client_queue = asyncio.Queue() - + try: # Register this client with the broadcaster - logger.info(f"SSE Stream: New client connecting...") + logger.info("SSE Stream: New client connecting...") await sse_broadcaster.add_client(client_queue) - logger.info(f"SSE Stream: Client registered successfully, total clients: {len(sse_broadcaster.clients)}") - + logger.info( + f"SSE Stream: Client registered successfully, total clients: {len(sse_broadcaster.clients)}" + ) + # Send initial data immediately upon connection - initial_data = await generate_task_update_event(time.time(), active_only, request) + initial_data = await generate_task_update_event( + time.time(), active_only, request + ) yield initial_data - + # Also send any active tasks as callback-style events to newly connected clients all_tasks = get_all_tasks() for task_summary in all_tasks: task_id = task_summary.get("task_id") if not task_id: continue - + task_info = get_task_info(task_id) if not task_info: continue - + last_status = get_last_task_status(task_id) task_status = get_task_status_from_last_status(last_status) - + # Send recent callback data for active or recently completed tasks - if is_task_active(task_status) or (last_status and last_status.get("timestamp", 0) > time.time() - 30): + if is_task_active(task_status) or ( + last_status and last_status.get("timestamp", 0) > time.time() - 30 + ): if last_status and "raw_callback" in last_status: callback_event = { "task_id": task_id, @@ -969,21 +1106,25 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get "timestamp": last_status.get("timestamp", time.time()), "change_type": "callback", "event_type": "progress_update", - "replay": True # Mark as replay for client + "replay": True, # Mark as replay for client } event_json = json.dumps(callback_event) yield f"data: {event_json}\n\n" - logger.info(f"SSE Stream: Sent replay callback for task {task_id}") - + logger.info( + f"SSE Stream: Sent replay callback for task {task_id}" + ) + # Send periodic heartbeats and listen for real-time events last_heartbeat = time.time() heartbeat_interval = 30.0 - + while True: try: # Wait for either an event or timeout for heartbeat try: - event_data = await asyncio.wait_for(client_queue.get(), timeout=heartbeat_interval) + event_data = await asyncio.wait_for( + client_queue.get(), timeout=heartbeat_interval + ) # Send the real-time event yield event_data last_heartbeat = time.time() @@ -993,8 +1134,16 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get if current_time - last_heartbeat >= heartbeat_interval: # Generate current task counts for heartbeat all_tasks = get_all_tasks() - task_counts = {"active": 0, "queued": 0, "completed": 0, "error": 0, "cancelled": 0, "retrying": 0, "skipped": 0} - + task_counts = { + "active": 0, + "queued": 0, + "completed": 0, + "error": 0, + "cancelled": 0, + "retrying": 0, + "skipped": 0, + } + for task_summary in all_tasks: task_id = task_summary.get("task_id") if not task_id: @@ -1003,13 +1152,18 @@ async def stream_task_updates(request: Request, current_user: User = 
Depends(get if not task_info: continue last_status = get_last_task_status(task_id) - task_status = get_task_status_from_last_status(last_status) - + task_status = get_task_status_from_last_status( + last_status + ) + if task_status == ProgressState.RETRYING: task_counts["retrying"] += 1 elif task_status in {ProgressState.QUEUED, "pending"}: task_counts["queued"] += 1 - elif task_status in {ProgressState.COMPLETE, ProgressState.DONE}: + elif task_status in { + ProgressState.COMPLETE, + ProgressState.DONE, + }: task_counts["completed"] += 1 elif task_status == ProgressState.ERROR: task_counts["error"] += 1 @@ -1019,25 +1173,32 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get task_counts["skipped"] += 1 elif is_task_active(task_status): task_counts["active"] += 1 - + heartbeat_data = { "current_timestamp": current_time, - "total_tasks": task_counts["active"] + task_counts["retrying"], + "total_tasks": task_counts["active"] + + task_counts["retrying"], "task_counts": task_counts, - "change_type": "heartbeat" + "change_type": "heartbeat", } - + event_json = json.dumps(heartbeat_data) yield f"data: {event_json}\n\n" last_heartbeat = current_time - + except Exception as e: logger.error(f"Error in SSE event streaming: {e}", exc_info=True) # Send error event and continue - error_data = json.dumps({"error": "Internal server error", "timestamp": time.time(), "change_type": "error"}) + error_data = json.dumps( + { + "error": "Internal server error", + "timestamp": time.time(), + "change_type": "error", + } + ) yield f"data: {error_data}\n\n" await asyncio.sleep(1) - + except asyncio.CancelledError: logger.info("SSE client disconnected") return @@ -1056,12 +1217,14 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get "Connection": "keep-alive", "Content-Type": "text/event-stream", "Access-Control-Allow-Origin": "*", - "Access-Control-Allow-Headers": "Cache-Control" - } + "Access-Control-Allow-Headers": "Cache-Control", + }, ) -async def generate_task_update_event(since_timestamp: float, active_only: bool, request: Request) -> str: +async def generate_task_update_event( + since_timestamp: float, active_only: bool, request: Request +) -> str: """ Generate initial task update event for SSE connection. This replicates the logic from get_task_updates but for SSE format. 
@@ -1070,10 +1233,10 @@ async def generate_task_update_event(since_timestamp: float, active_only: bool, # Get all tasks for filtering all_tasks = get_all_tasks() current_time = time.time() - + updated_tasks = [] active_tasks = [] - + # Process tasks for filtering only - no counting here for task_summary in all_tasks: task_id = task_summary.get("task_id") @@ -1087,19 +1250,31 @@ async def generate_task_update_event(since_timestamp: float, active_only: bool, last_status = get_last_task_status(task_id) task_status = get_task_status_from_last_status(last_status) is_active_task = is_task_active(task_status) - + # Check if task has been updated since the given timestamp - task_timestamp = last_status.get("timestamp") if last_status else task_info.get("created_at", 0) - + task_timestamp = ( + last_status.get("timestamp") + if last_status + else task_info.get("created_at", 0) + ) + # Always include active tasks in updates, apply filtering to others # Also include recently completed/terminal tasks to ensure "done" status gets sent - is_recently_terminal = task_status in TERMINAL_TASK_STATES and task_timestamp > since_timestamp - should_include = is_active_task or (task_timestamp > since_timestamp and not active_only) or is_recently_terminal - + is_recently_terminal = ( + task_status in TERMINAL_TASK_STATES and task_timestamp > since_timestamp + ) + should_include = ( + is_active_task + or (task_timestamp > since_timestamp and not active_only) + or is_recently_terminal + ) + if should_include: # Construct the same detailed task object as in updates endpoint - task_response = _build_task_response(task_info, last_status, task_id, current_time, request) - + task_response = _build_task_response( + task_info, last_status, task_id, current_time, request + ) + if is_active_task: active_tasks.append(task_response) else: @@ -1107,37 +1282,45 @@ async def generate_task_update_event(since_timestamp: float, active_only: bool, # Combine active tasks (always shown) with updated tasks all_returned_tasks = active_tasks + updated_tasks - + # Sort by priority (active first, then by creation time) - all_returned_tasks.sort(key=lambda x: ( - 0 if x.get("task_id") in [t["task_id"] for t in active_tasks] else 1, - -(x.get("created_at") or 0) - )) + all_returned_tasks.sort( + key=lambda x: ( + 0 if x.get("task_id") in [t["task_id"] for t in active_tasks] else 1, + -(x.get("created_at") or 0), + ) + ) initial_data = { "tasks": all_returned_tasks, "current_timestamp": current_time, "updated_count": len(updated_tasks), "since_timestamp": since_timestamp, - "initial": True # Mark as initial load + "initial": True, # Mark as initial load } - + # Add global task counts since this bypasses the broadcaster enhanced_data = add_global_task_counts_to_event(initial_data) - + event_data = json.dumps(enhanced_data) return f"data: {event_data}\n\n" - + except Exception as e: logger.error(f"Error generating initial SSE event: {e}", exc_info=True) - error_data = json.dumps({"error": "Failed to load initial data", "timestamp": time.time()}) + error_data = json.dumps( + {"error": "Failed to load initial data", "timestamp": time.time()} + ) return f"data: {error_data}\n\n" # IMPORTANT: This parameterized route MUST come AFTER all specific routes # Otherwise FastAPI will match specific routes like "/updates" as task_id parameters @router.get("/{task_id}") -async def get_task_details(task_id: str, request: Request, current_user: User = Depends(require_auth_from_state)): +async def get_task_details( + task_id: str, + request: Request, + 
current_user: User = Depends(require_auth_from_state),
+):
     """
     Return a JSON object with the resource type, its name (title),
     the last progress update, and, if available, the original request parameters.
diff --git a/routes/utils/album.py b/routes/utils/album.py
index be67078..b6fb6e5 100755
--- a/routes/utils/album.py
+++ b/routes/utils/album.py
@@ -101,7 +101,7 @@ def download_album(
     )
     dl.download_albumspo(
         link_album=url,  # Spotify URL
-        output_dir="./downloads",
+        output_dir="/app/downloads",
         quality_download=quality,  # Deezer quality
         recursive_quality=recursive_quality,
         recursive_download=False,
@@ -159,7 +159,7 @@ def download_album(
     )
     spo.download_album(
         link_album=url,  # Spotify URL
-        output_dir="./downloads",
+        output_dir="/app/downloads",
         quality_download=fall_quality,  # Spotify quality
         recursive_quality=recursive_quality,
         recursive_download=False,
@@ -216,7 +216,7 @@ def download_album(
     )
     spo.download_album(
         link_album=url,
-        output_dir="./downloads",
+        output_dir="/app/downloads",
         quality_download=quality,
         recursive_quality=recursive_quality,
         recursive_download=False,
@@ -260,7 +260,7 @@ def download_album(
     )
     dl.download_albumdee(  # Deezer URL, download via Deezer
         link_album=url,
-        output_dir="./downloads",
+        output_dir="/app/downloads",
         quality_download=quality,
         recursive_quality=recursive_quality,
         recursive_download=False,
diff --git a/routes/utils/celery_manager.py b/routes/utils/celery_manager.py
index faebe95..08b9727 100644
--- a/routes/utils/celery_manager.py
+++ b/routes/utils/celery_manager.py
@@ -2,6 +2,7 @@ import subprocess
 import logging
 import time
 import threading
+import sys
 
 # Import Celery task utilities
 from .celery_config import get_config_params, MAX_CONCURRENT_DL
@@ -46,6 +47,8 @@ class CeleryManager:
         # %h is replaced by celery with the actual hostname.
hostname = f"worker_{worker_name_suffix}@%h" command = [ + sys.executable, + "-m", "celery", "-A", self.app_name, @@ -73,11 +76,14 @@ class CeleryManager: log_method = logger.info # Default log method if error: # This is a stderr stream - if " - ERROR - " in line_stripped or " - CRITICAL - " in line_stripped: + if ( + " - ERROR - " in line_stripped + or " - CRITICAL - " in line_stripped + ): log_method = logger.error elif " - WARNING - " in line_stripped: log_method = logger.warning - + log_method(f"{log_prefix}: {line_stripped}") elif ( self.stop_event.is_set() @@ -151,7 +157,7 @@ class CeleryManager: queues="utility_tasks,default", # Listen to utility and default concurrency=5, # Increased concurrency for SSE updates and utility tasks worker_name_suffix="utw", # Utility Worker - log_level="ERROR" # Reduce log verbosity for utility worker (only errors) + log_level="ERROR", # Reduce log verbosity for utility worker (only errors) ) logger.info( f"Starting Celery Utility Worker with command: {' '.join(utility_cmd)}" diff --git a/routes/utils/playlist.py b/routes/utils/playlist.py index efdec27..b19bd7c 100755 --- a/routes/utils/playlist.py +++ b/routes/utils/playlist.py @@ -98,7 +98,7 @@ def download_playlist( ) dl.download_playlistspo( link_playlist=url, # Spotify URL - output_dir="./downloads", + output_dir="/app/downloads", quality_download=quality, # Deezer quality recursive_quality=recursive_quality, recursive_download=False, @@ -161,7 +161,7 @@ def download_playlist( ) spo.download_playlist( link_playlist=url, # Spotify URL - output_dir="./downloads", + output_dir="/app/downloads", quality_download=fall_quality, # Spotify quality recursive_quality=recursive_quality, recursive_download=False, @@ -224,7 +224,7 @@ def download_playlist( ) spo.download_playlist( link_playlist=url, - output_dir="./downloads", + output_dir="/app/downloads", quality_download=quality, recursive_quality=recursive_quality, recursive_download=False, @@ -268,7 +268,7 @@ def download_playlist( ) dl.download_playlistdee( # Deezer URL, download via Deezer link_playlist=url, - output_dir="./downloads", + output_dir="/app/downloads", quality_download=quality, recursive_quality=recursive_quality, # Usually False for playlists to get individual track qualities recursive_download=False, diff --git a/routes/utils/track.py b/routes/utils/track.py index e1f8b4a..7499d31 100755 --- a/routes/utils/track.py +++ b/routes/utils/track.py @@ -94,7 +94,7 @@ def download_track( # download_trackspo means: Spotify URL, download via Deezer dl.download_trackspo( link_track=url, # Spotify URL - output_dir="./downloads", + output_dir="/app/downloads", quality_download=quality, # Deezer quality recursive_quality=recursive_quality, recursive_download=False, @@ -153,7 +153,7 @@ def download_track( ) spo.download_track( link_track=url, # Spotify URL - output_dir="./downloads", + output_dir="/app/downloads", quality_download=fall_quality, # Spotify quality recursive_quality=recursive_quality, recursive_download=False, @@ -169,7 +169,7 @@ def download_track( convert_to=convert_to, bitrate=bitrate, artist_separator=artist_separator, - real_time_multiplier=real_time_multiplier, + spotify_metadata=spotify_metadata, pad_number_width=pad_number_width, ) print( @@ -211,7 +211,7 @@ def download_track( ) spo.download_track( link_track=url, - output_dir="./downloads", + output_dir="/app/downloads", quality_download=quality, recursive_quality=recursive_quality, recursive_download=False, @@ -254,7 +254,7 @@ def download_track( ) dl.download_trackdee( # 
Deezer URL, download via Deezer link_track=url, - output_dir="./downloads", + output_dir="/app/downloads", quality_download=quality, recursive_quality=recursive_quality, recursive_download=False, diff --git a/routes/utils/watch/manager.py b/routes/utils/watch/manager.py index 4a10e78..a4e7aa6 100644 --- a/routes/utils/watch/manager.py +++ b/routes/utils/watch/manager.py @@ -1098,7 +1098,7 @@ def update_playlist_m3u_file(playlist_spotify_id: str): # Get configuration settings output_dir = ( - "./downloads" # This matches the output_dir used in download functions + "/app/downloads" # This matches the output_dir used in download functions ) # Get all tracks for the playlist @@ -1125,14 +1125,14 @@ def update_playlist_m3u_file(playlist_spotify_id: str): skipped_missing_final_path = 0 for track in tracks: - # Use final_path from deezspot summary and convert from ./downloads to ../ relative path + # Use final_path from deezspot summary and convert from /app/downloads to ../ relative path final_path = track.get("final_path") if not final_path: skipped_missing_final_path += 1 continue normalized = str(final_path).replace("\\", "/") - if normalized.startswith("./downloads/"): - relative_path = normalized.replace("./downloads/", "../", 1) + if normalized.startswith("/app/downloads/"): + relative_path = normalized.replace("/app/downloads/", "../", 1) elif "/downloads/" in normalized.lower(): idx = normalized.lower().rfind("/downloads/") relative_path = "../" + normalized[idx + len("/downloads/") :]
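
Note: to make the m3u path mapping above concrete, here is a minimal sketch of the normalization update_playlist_m3u_file now applies, assuming (as the "../" prefix implies) that the playlist m3u files live one directory level below the downloads root. The helper name is hypothetical, and the fallback when neither branch matches is not visible in the hunk, so the sketch returns None there.

    def to_m3u_relative(final_path):
        # Hypothetical helper mirroring the branch logic in the hunk above.
        if not final_path:
            return None  # caller counts these via skipped_missing_final_path
        normalized = str(final_path).replace("\\", "/")
        if normalized.startswith("/app/downloads/"):
            return normalized.replace("/app/downloads/", "../", 1)
        if "/downloads/" in normalized.lower():
            # Case-insensitive search, but slice the original to keep its casing.
            idx = normalized.lower().rfind("/downloads/")
            return "../" + normalized[idx + len("/downloads/") :]
        return None  # fallback branch not shown in the diff

    # e.g. "/app/downloads/Artist/Album/01 - Song.flac" -> "../Artist/Album/01 - Song.flac"
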