diff --git a/routes/prgs.py b/routes/prgs.py index d41d1bf..476918d 100755 --- a/routes/prgs.py +++ b/routes/prgs.py @@ -8,6 +8,7 @@ from routes.utils.celery_tasks import ( get_last_task_status, get_all_tasks, cancel_task, + ProgressState, ) # Configure logging @@ -17,6 +18,29 @@ prgs_bp = Blueprint("prgs", __name__, url_prefix="/api/prgs") # (Old .prg file system removed. Using new task system only.) +# Define active task states using ProgressState constants +ACTIVE_TASK_STATES = { + ProgressState.INITIALIZING, # "initializing" - task is starting up + ProgressState.PROCESSING, # "processing" - task is being processed + ProgressState.DOWNLOADING, # "downloading" - actively downloading + ProgressState.PROGRESS, # "progress" - album/playlist progress updates + ProgressState.TRACK_PROGRESS, # "track_progress" - real-time track progress + ProgressState.REAL_TIME, # "real_time" - real-time download progress + ProgressState.RETRYING, # "retrying" - task is retrying after error +} + +def is_task_active(task_status): + """ + Determine if a task is currently active (working/processing). + + Args: + task_status: The status string from the task + + Returns: + bool: True if the task is active, False otherwise + """ + return task_status in ACTIVE_TASK_STATES + def _build_error_callback_object(last_status): """ @@ -72,6 +96,171 @@ def _build_error_callback_object(last_status): return callback_object +def _build_task_response(task_info, last_status, task_id, current_time): + """ + Helper function to build a standardized task response object. + """ + # Dynamically construct original_url + dynamic_original_url = "" + download_type = task_info.get("download_type") + item_url = task_info.get("url") + + if download_type and item_url: + try: + item_id = item_url.split("/")[-1] + if item_id: + base_url = request.host_url.rstrip("/") + dynamic_original_url = ( + f"{base_url}/api/{download_type}/download/{item_id}" + ) + else: + logger.warning( + f"Could not extract item ID from URL: {item_url} for task {task_id}. Falling back for original_url." + ) + original_request_obj = task_info.get("original_request", {}) + dynamic_original_url = original_request_obj.get("original_url", "") + except Exception as e: + logger.error( + f"Error constructing dynamic original_url for task {task_id}: {e}", + exc_info=True, + ) + original_request_obj = task_info.get("original_request", {}) + dynamic_original_url = original_request_obj.get("original_url", "") + else: + logger.warning( + f"Missing download_type ('{download_type}') or item_url ('{item_url}') in task_info for task {task_id}. Falling back for original_url." + ) + original_request_obj = task_info.get("original_request", {}) + dynamic_original_url = original_request_obj.get("original_url", "") + + status_count = len(get_task_status(task_id)) + + # Determine last_line content + if last_status and "raw_callback" in last_status: + last_line_content = last_status["raw_callback"] + elif last_status and last_status.get("status") == "error": + last_line_content = _build_error_callback_object(last_status) + else: + last_line_content = last_status + + task_response = { + "original_url": dynamic_original_url, + "last_line": last_line_content, + "timestamp": last_status.get("timestamp") if last_status else current_time, + "task_id": task_id, + "status_count": status_count, + "created_at": task_info.get("created_at"), + "name": task_info.get("name"), + "artist": task_info.get("artist"), + "type": task_info.get("type"), + "download_type": task_info.get("download_type"), + } + if last_status and last_status.get("summary"): + task_response["summary"] = last_status["summary"] + + return task_response + + +def get_paginated_tasks(page=1, limit=20, active_only=False): + """ + Get paginated list of tasks. + """ + try: + all_tasks = get_all_tasks() + active_tasks = [] + other_tasks = [] + + # Task categorization counters + task_counts = { + "active": 0, + "queued": 0, + "completed": 0, + "error": 0, + "cancelled": 0, + "retrying": 0, + "skipped": 0 + } + + for task_summary in all_tasks: + task_id = task_summary.get("task_id") + if not task_id: + continue + + task_info = get_task_info(task_id) + if not task_info: + continue + + last_status = get_last_task_status(task_id) + task_status = last_status.get("status") if last_status else "unknown" + is_active_task = is_task_active(task_status) + + # Categorize tasks by status using ProgressState constants + if task_status == ProgressState.RETRYING: + task_counts["retrying"] += 1 + elif task_status in {ProgressState.QUEUED, "pending"}: # Keep "pending" for backward compatibility + task_counts["queued"] += 1 + elif task_status in {ProgressState.COMPLETE, ProgressState.DONE}: + task_counts["completed"] += 1 + elif task_status == ProgressState.ERROR: + task_counts["error"] += 1 + elif task_status == ProgressState.CANCELLED: + task_counts["cancelled"] += 1 + elif task_status == ProgressState.SKIPPED: + task_counts["skipped"] += 1 + elif is_active_task: + task_counts["active"] += 1 + + task_response = _build_task_response(task_info, last_status, task_id, time.time()) + + if is_active_task: + active_tasks.append(task_response) + else: + other_tasks.append(task_response) + + # Sort other tasks by creation time (newest first) + other_tasks.sort(key=lambda x: x.get("created_at", 0), reverse=True) + + if active_only: + paginated_tasks = active_tasks + pagination_info = { + "page": page, + "limit": limit, + "total_non_active": 0, + "has_more": False, + "returned_non_active": 0 + } + else: + # Apply pagination to non-active tasks + offset = (page - 1) * limit + paginated_other_tasks = other_tasks[offset:offset + limit] + paginated_tasks = active_tasks + paginated_other_tasks + + pagination_info = { + "page": page, + "limit": limit, + "total_non_active": len(other_tasks), + "has_more": len(other_tasks) > offset + limit, + "returned_non_active": len(paginated_other_tasks) + } + + response = { + "tasks": paginated_tasks, + "current_timestamp": time.time(), + "total_tasks": task_counts["active"] + task_counts["retrying"], # Only active/retrying tasks for counter + "all_tasks_count": len(all_tasks), # Total count of all tasks + "task_counts": task_counts, # Categorized counts + "active_tasks": len(active_tasks), + "updated_count": len(paginated_tasks), + "pagination": pagination_info + } + + return jsonify(response) + + except Exception as e: + logger.error(f"Error in get_paginated_tasks: {e}", exc_info=True) + return jsonify({"error": "Failed to retrieve paginated tasks"}), 500 + + @prgs_bp.route("/", methods=["GET"]) def get_task_details(task_id): """ @@ -177,12 +366,35 @@ def delete_task(task_id): @prgs_bp.route("/list", methods=["GET"]) def list_tasks(): """ - Retrieve a list of all tasks in the system. + Retrieve a paginated list of all tasks in the system. Returns a detailed list of task objects including status and metadata. + + Query parameters: + page (int): Page number for pagination (default: 1) + limit (int): Number of tasks per page (default: 50, max: 100) + active_only (bool): If true, only return active tasks (downloading, processing, etc.) """ try: + # Get query parameters + page = int(request.args.get('page', 1)) + limit = min(int(request.args.get('limit', 50)), 100) # Cap at 100 + active_only = request.args.get('active_only', '').lower() == 'true' + tasks = get_all_tasks() - detailed_tasks = [] + active_tasks = [] + other_tasks = [] + + # Task categorization counters + task_counts = { + "active": 0, + "queued": 0, + "completed": 0, + "error": 0, + "cancelled": 0, + "retrying": 0, + "skipped": 0 + } + for task_summary in tasks: task_id = task_summary.get("task_id") if not task_id: @@ -192,79 +404,89 @@ def list_tasks(): if not task_info: continue - # Dynamically construct original_url - dynamic_original_url = "" - download_type = task_info.get("download_type") - # The 'url' field in task_info stores the Spotify/Deezer URL of the item - # e.g., https://open.spotify.com/album/albumId or https://www.deezer.com/track/trackId - item_url = task_info.get("url") - - if download_type and item_url: - try: - # Extract the ID from the item_url (last part of the path) - item_id = item_url.split("/")[-1] - if item_id: # Ensure item_id is not empty - base_url = request.host_url.rstrip("/") - dynamic_original_url = ( - f"{base_url}/api/{download_type}/download/{item_id}" - ) - else: - logger.warning( - f"Could not extract item ID from URL: {item_url} for task {task_id}. Falling back for original_url." - ) - original_request_obj = task_info.get("original_request", {}) - dynamic_original_url = original_request_obj.get( - "original_url", "" - ) - except Exception as e: - logger.error( - f"Error constructing dynamic original_url for task {task_id}: {e}", - exc_info=True, - ) - original_request_obj = task_info.get("original_request", {}) - dynamic_original_url = original_request_obj.get( - "original_url", "" - ) # Fallback on any error - else: - logger.warning( - f"Missing download_type ('{download_type}') or item_url ('{item_url}') in task_info for task {task_id}. Falling back for original_url." - ) - original_request_obj = task_info.get("original_request", {}) - dynamic_original_url = original_request_obj.get("original_url", "") - last_status = get_last_task_status(task_id) - status_count = len(get_task_status(task_id)) - - # Determine last_line content - if last_status and "raw_callback" in last_status: - last_line_content = last_status["raw_callback"] - elif last_status and last_status.get("status") == "error": - last_line_content = _build_error_callback_object(last_status) + task_status = last_status.get("status") if last_status else "unknown" + is_active_task = is_task_active(task_status) + + # Categorize tasks by status using ProgressState constants + if task_status == ProgressState.RETRYING: + task_counts["retrying"] += 1 + elif task_status in {ProgressState.QUEUED, "pending"}: # Keep "pending" for backward compatibility + task_counts["queued"] += 1 + elif task_status in {ProgressState.COMPLETE, ProgressState.DONE}: + task_counts["completed"] += 1 + elif task_status == ProgressState.ERROR: + task_counts["error"] += 1 + elif task_status == ProgressState.CANCELLED: + task_counts["cancelled"] += 1 + elif task_status == ProgressState.SKIPPED: + task_counts["skipped"] += 1 + elif is_active_task: + task_counts["active"] += 1 + + task_response = _build_task_response(task_info, last_status, task_id, time.time()) + + if is_active_task: + active_tasks.append(task_response) else: - # Fallback for non-error, no raw_callback, or if last_status is None - last_line_content = last_status + other_tasks.append(task_response) - response = { - "original_url": dynamic_original_url, - "last_line": last_line_content, - "timestamp": last_status.get("timestamp") if last_status else time.time(), - "task_id": task_id, - "status_count": status_count, - "created_at": task_info.get("created_at"), - "name": task_info.get("name"), - "artist": task_info.get("artist"), - "type": task_info.get("type"), - "download_type": task_info.get("download_type"), + # Sort other tasks by creation time (newest first) + other_tasks.sort(key=lambda x: x.get("created_at", 0), reverse=True) + + if active_only: + # Return only active tasks without pagination + response_tasks = active_tasks + pagination_info = { + "page": page, + "limit": limit, + "total_items": len(active_tasks), + "total_pages": 1, + "has_more": False + } + else: + # Apply pagination to non-active tasks and combine with active tasks + offset = (page - 1) * limit + + # Always include active tasks at the top + if page == 1: + # For first page, include active tasks + first batch of other tasks + available_space = limit - len(active_tasks) + paginated_other_tasks = other_tasks[:max(0, available_space)] + response_tasks = active_tasks + paginated_other_tasks + else: + # For subsequent pages, only include other tasks + # Adjust offset to account for active tasks shown on first page + adjusted_offset = offset - len(active_tasks) + if adjusted_offset < 0: + adjusted_offset = 0 + paginated_other_tasks = other_tasks[adjusted_offset:adjusted_offset + limit] + response_tasks = paginated_other_tasks + + total_items = len(active_tasks) + len(other_tasks) + total_pages = ((total_items - 1) // limit) + 1 if total_items > 0 else 1 + + pagination_info = { + "page": page, + "limit": limit, + "total_items": total_items, + "total_pages": total_pages, + "has_more": page < total_pages, + "active_tasks": len(active_tasks), + "total_other_tasks": len(other_tasks) } - if last_status and last_status.get("summary"): - response["summary"] = last_status["summary"] - detailed_tasks.append(response) + response = { + "tasks": response_tasks, + "pagination": pagination_info, + "total_tasks": task_counts["active"] + task_counts["retrying"], # Only active/retrying tasks for counter + "all_tasks_count": len(tasks), # Total count of all tasks + "task_counts": task_counts, # Categorized counts + "active_tasks": len(active_tasks), + "timestamp": time.time() + } - # Sort tasks by creation time (newest first) - detailed_tasks.sort(key=lambda x: x.get("created_at", 0), reverse=True) - - return jsonify(detailed_tasks) + return jsonify(response) except Exception as e: logger.error(f"Error in /api/prgs/list: {e}", exc_info=True) return jsonify({"error": "Failed to retrieve task list"}), 500 @@ -329,3 +551,141 @@ def cancel_all_tasks(): except Exception as e: logger.error(f"Error in /api/prgs/cancel/all: {e}", exc_info=True) return jsonify({"error": "Failed to cancel all tasks"}), 500 + + +@prgs_bp.route("/updates", methods=["GET"]) +def get_task_updates(): + """ + Retrieve only tasks that have been updated since the specified timestamp. + This endpoint is optimized for polling to reduce unnecessary data transfer. + + Query parameters: + since (float): Unix timestamp - only return tasks updated after this time + page (int): Page number for pagination (default: 1) + limit (int): Number of queued/completed tasks per page (default: 20, max: 100) + active_only (bool): If true, only return active tasks (downloading, processing, etc.) + + Returns: + JSON object containing: + - tasks: Array of updated task objects + - current_timestamp: Current server timestamp for next poll + - total_tasks: Total number of tasks in system + - active_tasks: Number of active tasks + - pagination: Pagination info for queued/completed tasks + """ + try: + # Get query parameters + since_param = request.args.get('since') + page = int(request.args.get('page', 1)) + limit = min(int(request.args.get('limit', 20)), 100) # Cap at 100 + active_only = request.args.get('active_only', '').lower() == 'true' + + if not since_param: + # If no 'since' parameter, return paginated tasks (fallback behavior) + return get_paginated_tasks(page, limit, active_only) + + try: + since_timestamp = float(since_param) + except (ValueError, TypeError): + return jsonify({"error": "Invalid 'since' timestamp format"}), 400 + + # Get all tasks + all_tasks = get_all_tasks() + updated_tasks = [] + active_tasks = [] + current_time = time.time() + + # Task categorization counters + task_counts = { + "active": 0, + "queued": 0, + "completed": 0, + "error": 0, + "cancelled": 0, + "retrying": 0, + "skipped": 0 + } + + for task_summary in all_tasks: + task_id = task_summary.get("task_id") + if not task_id: + continue + + task_info = get_task_info(task_id) + if not task_info: + continue + + last_status = get_last_task_status(task_id) + + # Check if task has been updated since the given timestamp + task_timestamp = last_status.get("timestamp") if last_status else task_info.get("created_at", 0) + + # Determine task status and categorize + task_status = last_status.get("status") if last_status else "unknown" + is_active_task = is_task_active(task_status) + + # Categorize tasks by status using ProgressState constants + if task_status == ProgressState.RETRYING: + task_counts["retrying"] += 1 + elif task_status in {ProgressState.QUEUED, "pending"}: # Keep "pending" for backward compatibility + task_counts["queued"] += 1 + elif task_status in {ProgressState.COMPLETE, ProgressState.DONE}: + task_counts["completed"] += 1 + elif task_status == ProgressState.ERROR: + task_counts["error"] += 1 + elif task_status == ProgressState.CANCELLED: + task_counts["cancelled"] += 1 + elif task_status == ProgressState.SKIPPED: + task_counts["skipped"] += 1 + elif is_active_task: + task_counts["active"] += 1 + + # Always include active tasks in updates, apply filtering to others + should_include = is_active_task or (task_timestamp > since_timestamp and not active_only) + + if should_include: + # Construct the same detailed task object as in list_tasks() + task_response = _build_task_response(task_info, last_status, task_id, current_time) + + if is_active_task: + active_tasks.append(task_response) + else: + updated_tasks.append(task_response) + + # Apply pagination to non-active tasks + offset = (page - 1) * limit + paginated_updated_tasks = updated_tasks[offset:offset + limit] if not active_only else [] + + # Combine active tasks (always shown) with paginated updated tasks + all_returned_tasks = active_tasks + paginated_updated_tasks + + # Sort by priority (active first, then by creation time) + all_returned_tasks.sort(key=lambda x: ( + 0 if x.get("task_id") in [t["task_id"] for t in active_tasks] else 1, + -x.get("created_at", 0) + )) + + response = { + "tasks": all_returned_tasks, + "current_timestamp": current_time, + "total_tasks": task_counts["active"] + task_counts["retrying"], # Only active/retrying tasks for counter + "all_tasks_count": len(all_tasks), # Total count of all tasks + "task_counts": task_counts, # Categorized counts + "active_tasks": len(active_tasks), + "updated_count": len(updated_tasks), + "since_timestamp": since_timestamp, + "pagination": { + "page": page, + "limit": limit, + "total_non_active": len(updated_tasks), + "has_more": len(updated_tasks) > offset + limit, + "returned_non_active": len(paginated_updated_tasks) + } + } + + logger.debug(f"Returning {len(active_tasks)} active + {len(paginated_updated_tasks)} paginated tasks out of {len(all_tasks)} total") + return jsonify(response) + + except Exception as e: + logger.error(f"Error in /api/prgs/updates: {e}", exc_info=True) + return jsonify({"error": "Failed to retrieve task updates"}), 500 diff --git a/routes/utils/watch/manager.py b/routes/utils/watch/manager.py index baa0f49..92703c5 100644 --- a/routes/utils/watch/manager.py +++ b/routes/utils/watch/manager.py @@ -361,7 +361,7 @@ def check_watched_playlists(specific_playlist_id: str = None): logger.warning( f"Playlist Watch Manager: No tracks returned for playlist {playlist_spotify_id} at offset {offset}" ) - break + break batch_items = tracks_batch.get("items", []) if not batch_items: diff --git a/spotizerr-ui/pnpm-lock.yaml b/spotizerr-ui/pnpm-lock.yaml index b5a8dd6..d7dc7a9 100644 --- a/spotizerr-ui/pnpm-lock.yaml +++ b/spotizerr-ui/pnpm-lock.yaml @@ -47,6 +47,9 @@ importers: react-icons: specifier: ^5.5.0 version: 5.5.0(react@19.1.0) + socket.io-client: + specifier: ^4.8.1 + version: 4.8.1 sonner: specifier: ^2.0.5 version: 2.0.5(react-dom@19.1.0(react@19.1.0))(react@19.1.0) @@ -1191,6 +1194,9 @@ packages: cpu: [x64] os: [win32] + '@socket.io/component-emitter@3.1.2': + resolution: {integrity: sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==} + '@surma/rollup-plugin-off-main-thread@2.2.3': resolution: {integrity: sha512-lR8q/9W7hZpMWweNiAKU7NQerBnzQQLvi8qnTDU/fxItPhtZVMbPV3lbCwjhIlNBe9Bbr5V+KHshvWmVSG9cxQ==} @@ -1710,6 +1716,15 @@ packages: resolution: {integrity: sha512-BS8PfmtDGnrgYdOonGZQdLZslWIeCGFP9tpan0hi1Co2Zr2NKADsvGYA8XxuG/4UWgJ6Cjtv+YJnB6MM69QGlQ==} engines: {node: '>= 0.4'} + debug@4.3.7: + resolution: {integrity: sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==} + engines: {node: '>=6.0'} + peerDependencies: + supports-color: '*' + peerDependenciesMeta: + supports-color: + optional: true + debug@4.4.1: resolution: {integrity: sha512-KcKCqiftBJcZr++7ykoDIEwSa3XWowTfNPo92BYxjXiyYEVrUQh2aLyhxBCwww+heortUFxEJYcRzosstTEBYQ==} engines: {node: '>=6.0'} @@ -1763,6 +1778,13 @@ packages: electron-to-chromium@1.5.191: resolution: {integrity: sha512-xcwe9ELcuxYLUFqZZxL19Z6HVKcvNkIwhbHUz7L3us6u12yR+7uY89dSl570f/IqNthx8dAw3tojG7i4Ni4tDA==} + engine.io-client@6.6.3: + resolution: {integrity: sha512-T0iLjnyNWahNyv/lcjS2y4oE358tVS/SYQNxYXGAJ9/GLgH4VCvOQ/mhTjqU88mLZCQgiG8RIegFHYCdVC+j5w==} + + engine.io-parser@5.2.3: + resolution: {integrity: sha512-HqD3yTBfnBxIrbnM1DoD6Pcq8NECnh8d4As1Qgh0z5Gg3jRRIqijury0CL3ghu/edArpUYiYqQiDUQBIs4np3Q==} + engines: {node: '>=10.0.0'} + enhanced-resolve@5.18.1: resolution: {integrity: sha512-ZSW3ma5GkcQBIpwZTSRAI8N71Uuwgs93IezB7mf7R60tC8ZbJideoDNKjHn2O9KIlx6rkGTTEk1xUCK2E1Y2Yg==} engines: {node: '>=10.13.0'} @@ -2881,6 +2903,14 @@ packages: smob@1.5.0: resolution: {integrity: sha512-g6T+p7QO8npa+/hNx9ohv1E5pVCmWrVCUzUXJyLdMmftX6ER0oiWY/w9knEonLpnOp6b6FenKnMfR8gqwWdwig==} + socket.io-client@4.8.1: + resolution: {integrity: sha512-hJVXfu3E28NmzGk8o1sHhN3om52tRvwYeidbj7xKy2eIIse5IoKX3USlS6Tqt3BHAtflLIkCQBkzVrEEfWUyYQ==} + engines: {node: '>=10.0.0'} + + socket.io-parser@4.2.4: + resolution: {integrity: sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==} + engines: {node: '>=10.0.0'} + solid-js@1.9.7: resolution: {integrity: sha512-/saTKi8iWEM233n5OSi1YHCCuh66ZIQ7aK2hsToPe4tqGm7qAejU1SwNuTPivbWAYq7SjuHVVYxxuZQNRbICiw==} @@ -2904,6 +2934,7 @@ packages: source-map@0.8.0-beta.0: resolution: {integrity: sha512-2ymg6oRBpebeZi9UUNsgQ89bhx01TcTkmNTGnNO88imTmbSgy4nfujrgVEFKWpMTEGA11EDkTt7mqObTPdigIA==} engines: {node: '>= 8'} + deprecated: The work that was done in this beta branch won't be included in future versions sourcemap-codec@1.4.8: resolution: {integrity: sha512-9NykojV5Uih4lgo5So5dtw+f0JgJX30KCNI8gwhz2J9A15wD0Ml6tjHKwf6fTSa6fAdVBdZeNOs9eJ71qCk8vA==} @@ -3276,6 +3307,18 @@ packages: wrappy@1.0.2: resolution: {integrity: sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==} + ws@8.17.1: + resolution: {integrity: sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==} + engines: {node: '>=10.0.0'} + peerDependencies: + bufferutil: ^4.0.1 + utf-8-validate: '>=5.0.2' + peerDependenciesMeta: + bufferutil: + optional: true + utf-8-validate: + optional: true + xhr@2.6.0: resolution: {integrity: sha512-/eCGLb5rxjx5e3mF1A7s+pLlR6CGyqWN91fv1JgER5mVWg1MZmlhBvy9kjcsOdRk8RrIujotWyJamfyrp+WIcA==} @@ -3290,6 +3333,10 @@ packages: resolution: {integrity: sha512-fDlsI/kFEx7gLvbecc0/ohLG50fugQp8ryHzMTuW9vSa1GJ0XYWKnhsUx7oie3G98+r56aTQIUB4kht42R3JvA==} engines: {node: '>=4.0'} + xmlhttprequest-ssl@2.1.2: + resolution: {integrity: sha512-TEU+nJVUUnA4CYJFLvK5X9AOeH4KvDvhIfm0vV1GaQRtchnG0hgK5p8hw/xjv8cunWYCsiPCSDzObPyhEwq3KQ==} + engines: {node: '>=0.4.0'} + xtend@4.0.2: resolution: {integrity: sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==} engines: {node: '>=0.4'} @@ -4401,6 +4448,8 @@ snapshots: '@rollup/rollup-win32-x64-msvc@4.42.0': optional: true + '@socket.io/component-emitter@3.1.2': {} + '@surma/rollup-plugin-off-main-thread@2.2.3': dependencies: ejs: 3.1.10 @@ -4972,6 +5021,10 @@ snapshots: es-errors: 1.3.0 is-data-view: 1.0.2 + debug@4.3.7: + dependencies: + ms: 2.1.3 + debug@4.4.1: dependencies: ms: 2.1.3 @@ -5017,6 +5070,20 @@ snapshots: electron-to-chromium@1.5.191: {} + engine.io-client@6.6.3: + dependencies: + '@socket.io/component-emitter': 3.1.2 + debug: 4.3.7 + engine.io-parser: 5.2.3 + ws: 8.17.1 + xmlhttprequest-ssl: 2.1.2 + transitivePeerDependencies: + - bufferutil + - supports-color + - utf-8-validate + + engine.io-parser@5.2.3: {} + enhanced-resolve@5.18.1: dependencies: graceful-fs: 4.2.11 @@ -6239,6 +6306,24 @@ snapshots: smob@1.5.0: {} + socket.io-client@4.8.1: + dependencies: + '@socket.io/component-emitter': 3.1.2 + debug: 4.3.7 + engine.io-client: 6.6.3 + socket.io-parser: 4.2.4 + transitivePeerDependencies: + - bufferutil + - supports-color + - utf-8-validate + + socket.io-parser@4.2.4: + dependencies: + '@socket.io/component-emitter': 3.1.2 + debug: 4.3.7 + transitivePeerDependencies: + - supports-color + solid-js@1.9.7: dependencies: csstype: 3.1.3 @@ -6736,6 +6821,8 @@ snapshots: wrappy@1.0.2: {} + ws@8.17.1: {} + xhr@2.6.0: dependencies: global: 4.4.0 @@ -6752,6 +6839,8 @@ snapshots: xmlbuilder@11.0.1: {} + xmlhttprequest-ssl@2.1.2: {} + xtend@4.0.2: {} yallist@3.1.1: {} diff --git a/spotizerr-ui/src/components/Queue.tsx b/spotizerr-ui/src/components/Queue.tsx index eab4a18..0545eef 100644 --- a/spotizerr-ui/src/components/Queue.tsx +++ b/spotizerr-ui/src/components/Queue.tsx @@ -1,4 +1,4 @@ -import { useContext, useState, useRef } from "react"; +import { useContext, useState, useRef, useEffect } from "react"; import { FaTimes, FaSync, @@ -8,7 +8,7 @@ import { FaMusic, FaCompactDisc, } from "react-icons/fa"; -import { QueueContext, type QueueItem, type QueueStatus } from "@/contexts/queue-context"; +import { QueueContext, type QueueItem, type QueueStatus, isActiveTaskStatus } from "@/contexts/queue-context"; const isTerminalStatus = (status: QueueStatus) => ["completed", "error", "cancelled", "skipped", "done"].includes(status); @@ -101,6 +101,20 @@ const statusStyles: Record< borderColor: "border-info/30 dark:border-info/40", name: "Real-time Download", }, + progress: { + icon: , + color: "text-info", + bgColor: "bg-gradient-to-r from-blue-50 to-blue-100 dark:from-blue-900/20 dark:to-blue-800/30", + borderColor: "border-info/30 dark:border-info/40", + name: "Progress", + }, + track_progress: { + icon: , + color: "text-info", + bgColor: "bg-gradient-to-r from-blue-50 to-blue-100 dark:from-blue-900/20 dark:to-blue-800/30", + borderColor: "border-info/30 dark:border-info/40", + name: "Track Progress", + }, }; // Circular Progress Component @@ -227,7 +241,7 @@ const QueueItemCard = ({ item }: { item: QueueItem }) => { return null; } - if (actualStatus === "downloading" || actualStatus === "processing") { + if (actualStatus === "downloading" || actualStatus === "processing" || actualStatus === "progress" || actualStatus === "track_progress") { if (type === "track") { return progress !== undefined ? `${progress.toFixed(0)}%` : null; } @@ -456,10 +470,44 @@ export const Queue = () => { const [startY, setStartY] = useState(null); const [currentY, setCurrentY] = useState(null); const queueRef = useRef(null); + const scrollContainerRef = useRef(null); + // Extract values from context (with defaults to avoid crashes) + const { + items = [], + isVisible = false, + toggleVisibility = () => {}, + cancelAll = () => {}, + clearCompleted = () => {}, + hasMore = false, + isLoadingMore = false, + loadMoreTasks = () => {}, + totalTasks = 0 + } = context || {}; + + // Infinite scroll effect - MUST be called before any conditional returns + useEffect(() => { + if (!isVisible) return; // Early return if not visible + + const scrollContainer = scrollContainerRef.current; + if (!scrollContainer) return; + + const handleScroll = () => { + const { scrollTop, scrollHeight, clientHeight } = scrollContainer; + const scrollPercentage = (scrollTop + clientHeight) / scrollHeight; + + // Load more when user has scrolled 80% of the way down + if (scrollPercentage > 0.8 && hasMore && !isLoadingMore) { + loadMoreTasks(); + } + }; + + scrollContainer.addEventListener('scroll', handleScroll); + return () => scrollContainer.removeEventListener('scroll', handleScroll); + }, [isVisible, hasMore, isLoadingMore, loadMoreTasks]); + + // Early returns after all hooks if (!context) return null; - const { items, isVisible, toggleVisibility, cancelAll, clearCompleted } = context; - if (!isVisible) return null; const hasActive = items.some((item) => !isTerminalStatus(item.status)); @@ -524,7 +572,7 @@ export const Queue = () => { {/* Add drag indicator for mobile */}

- Download Queue ({items.length}) + Download Queue ({totalTasks})

-
+
{items.length === 0 ? (
@@ -562,7 +613,80 @@ export const Queue = () => {

Downloads will appear here

) : ( - items.map((item) => ) + (() => { + // Sort items by priority hierarchy + const sortedItems = [...items].sort((a, b) => { + // Extract actual status for both items + const statusA = (a.last_line?.status_info?.status as QueueStatus) || + (a.last_line?.status as QueueStatus) || + a.status; + const statusB = (b.last_line?.status_info?.status as QueueStatus) || + (b.last_line?.status as QueueStatus) || + b.status; + + // Define priority groups (lower number = higher priority) + const getPriority = (status: QueueStatus) => { + switch (status) { + case "real-time": return 1; + case "downloading": return 2; + case "processing": return 3; + case "initializing": return 4; + case "retrying": return 5; + case "queued": return 6; + case "pending": return 7; + case "completed": + case "done": return 8; + case "error": return 9; + case "cancelled": return 10; + case "skipped": return 11; + default: return 12; + } + }; + + const priorityA = getPriority(statusA); + const priorityB = getPriority(statusB); + + // First sort by priority + if (priorityA !== priorityB) { + return priorityA - priorityB; + } + + // Within same priority group, maintain original order (FIFO) + // Assuming items have some sort of timestamp or creation order + return 0; + }); + + return ( + <> + {sortedItems.map((item) => )} + + {/* Loading indicator for infinite scroll */} + {isLoadingMore && ( +
+
+
+ Loading more tasks... +
+
+ )} + + {/* Load More Button (fallback for manual loading) */} + {hasMore && !isLoadingMore && ( +
+ +
+ )} + + ); + })() )}
diff --git a/spotizerr-ui/src/contexts/QueueProvider.tsx b/spotizerr-ui/src/contexts/QueueProvider.tsx index b9799b8..ef0df48 100644 --- a/spotizerr-ui/src/contexts/QueueProvider.tsx +++ b/spotizerr-ui/src/contexts/QueueProvider.tsx @@ -5,6 +5,7 @@ import { type QueueItem, type DownloadType, type QueueStatus, + isActiveTaskStatus, } from "./queue-context"; import { toast } from "sonner"; import { v4 as uuidv4 } from "uuid"; @@ -41,6 +42,18 @@ export function QueueProvider({ children }: { children: ReactNode }) { const [isVisible, setIsVisible] = useState(false); const pollingIntervals = useRef>({}); const cancelledRemovalTimers = useRef>({}); + + // Smart polling state + const smartPollingInterval = useRef(null); + const lastUpdateTimestamp = useRef(0); + const isInitialized = useRef(false); + + // Pagination state + const [currentPage, setCurrentPage] = useState(1); + const [hasMore, setHasMore] = useState(true); + const [isLoadingMore, setIsLoadingMore] = useState(false); + const [totalTasks, setTotalTasks] = useState(0); + const pageSize = 20; // Number of non-active tasks per page // Calculate active downloads count const activeCount = useMemo(() => { @@ -130,50 +143,175 @@ export function QueueProvider({ children }: { children: ReactNode }) { return updatedItem; }, [scheduleCancelledTaskRemoval]); + const startSmartPolling = useCallback(() => { + if (smartPollingInterval.current) return; // Already polling + + console.log("Starting smart polling"); + + const intervalId = window.setInterval(async () => { + try { + const response = await apiClient.get<{ + tasks: any[]; + current_timestamp: number; + total_tasks: number; + active_tasks: number; + updated_count: number; + }>(`/prgs/updates?since=${lastUpdateTimestamp.current}&active_only=true`); + + const { tasks: updatedTasks, current_timestamp, total_tasks } = response.data; + + // Update the last timestamp for next poll + lastUpdateTimestamp.current = current_timestamp; + + // Update total tasks count + setTotalTasks(total_tasks || 0); + + if (updatedTasks.length > 0) { + console.log(`Smart polling: ${updatedTasks.length} tasks updated (${response.data.active_tasks} active) out of ${response.data.total_tasks} total`); + + // Create a map of updated tasks by task_id for efficient lookup + const updatedTasksMap = new Map(updatedTasks.map(task => [task.task_id, task])); + + setItems(prev => { + // Update existing items with new data, and add any new active tasks + const updatedItems = prev.map(item => { + const updatedTaskData = updatedTasksMap.get(item.taskId || item.id); + if (updatedTaskData) { + return updateItemFromPrgs(item, updatedTaskData); + } + return item; + }); + + // Only add new active tasks that aren't in our current items and aren't in terminal state + const currentTaskIds = new Set(prev.map(item => item.taskId || item.id)); + const newActiveTasks = updatedTasks + .filter(task => { + const isNew = !currentTaskIds.has(task.task_id); + const status = task.last_line?.status_info?.status || task.last_line?.status || "unknown"; + const isActive = isActiveTaskStatus(status); + const isTerminal = ["completed", "error", "cancelled", "skipped", "done"].includes(status); + return isNew && isActive && !isTerminal; + }) + .map(task => { + const spotifyId = task.original_url?.split("/").pop() || ""; + const baseItem: QueueItem = { + id: task.task_id, + taskId: task.task_id, + name: task.name || "Unknown", + type: task.download_type || "track", + spotifyId: spotifyId, + status: "initializing", + artist: task.artist, + }; + return updateItemFromPrgs(baseItem, task); + }); + + return newActiveTasks.length > 0 ? [...newActiveTasks, ...updatedItems] : updatedItems; + }); + } + } catch (error) { + console.error("Smart polling failed:", error); + } + }, 2000); // Poll every 2 seconds + + smartPollingInterval.current = intervalId; + }, [updateItemFromPrgs]); + + const stopSmartPolling = useCallback(() => { + if (smartPollingInterval.current) { + console.log("Stopping smart polling"); + clearInterval(smartPollingInterval.current); + smartPollingInterval.current = null; + } + }, []); + + const loadMoreTasks = useCallback(async () => { + if (!hasMore || isLoadingMore) return; + + setIsLoadingMore(true); + try { + const nextPage = currentPage + 1; + const response = await apiClient.get<{ + tasks: any[]; + pagination: { + has_more: boolean; + }; + }>(`/prgs/list?page=${nextPage}&limit=${pageSize}`); + + const { tasks: newTasks, pagination } = response.data; + + if (newTasks.length > 0) { + // Add new tasks to the end of the list (avoiding duplicates and filtering out terminal state tasks) + setItems(prev => { + const existingTaskIds = new Set(prev.map(item => item.taskId || item.id)); + const uniqueNewTasks = newTasks + .filter(task => { + // Skip if already exists + if (existingTaskIds.has(task.task_id)) return false; + + // Filter out terminal state tasks + const status = task.last_line?.status_info?.status || task.last_line?.status || "unknown"; + const isTerminal = ["completed", "error", "cancelled", "skipped", "done"].includes(status); + return !isTerminal; + }) + .map(task => { + const spotifyId = task.original_url?.split("/").pop() || ""; + const baseItem: QueueItem = { + id: task.task_id, + taskId: task.task_id, + name: task.name || "Unknown", + type: task.download_type || "track", + spotifyId: spotifyId, + status: "initializing", + artist: task.artist, + }; + return updateItemFromPrgs(baseItem, task); + }); + + return [...prev, ...uniqueNewTasks]; + }); + + setCurrentPage(nextPage); + } + + setHasMore(pagination.has_more); + } catch (error) { + console.error("Failed to load more tasks:", error); + toast.error("Failed to load more tasks"); + } finally { + setIsLoadingMore(false); + } + }, [hasMore, isLoadingMore, currentPage, pageSize, updateItemFromPrgs]); + const startPolling = useCallback( (taskId: string) => { - if (pollingIntervals.current[taskId]) return; - - const intervalId = window.setInterval(async () => { - try { - const response = await apiClient.get(`/prgs/${taskId}`); - setItems(prev => - prev.map(item => { - if (item.taskId !== taskId) return item; - const updatedItem = updateItemFromPrgs(item, response.data); - if (isTerminalStatus(updatedItem.status as QueueStatus)) { - stopPolling(taskId); - } - return updatedItem; - }), - ); - } catch (error) { - console.error(`Polling failed for task ${taskId}:`, error); - stopPolling(taskId); - setItems(prev => - prev.map(i => - i.taskId === taskId - ? { ...i, status: "error", error: "Connection lost" } - : i, - ), - ); - } - }, 2000); - - pollingIntervals.current[taskId] = intervalId; + // Legacy function - now just ensures smart polling is active + startSmartPolling(); }, - [stopPolling, updateItemFromPrgs, scheduleCancelledTaskRemoval], + [startSmartPolling], ); useEffect(() => { const fetchQueue = async () => { try { - const response = await apiClient.get("/prgs/list"); - const backendItems = response.data + console.log("Fetching initial queue with pagination"); + const response = await apiClient.get<{ + tasks: any[]; + pagination: { + has_more: boolean; + }; + total_tasks: number; + timestamp: number; + }>(`/prgs/list?page=1&limit=${pageSize}`); + + const { tasks, pagination, total_tasks, timestamp } = response.data; + + const backendItems = tasks .filter((task: any) => { - // Filter out cancelled tasks on initial fetch + // Filter out terminal state tasks on initial fetch const status = task.last_line?.status_info?.status || task.last_line?.status || task.status; - return status !== "cancelled"; + const isTerminal = ["completed", "error", "cancelled", "skipped", "done"].includes(status); + return !isTerminal; }) .map((task: any) => { const spotifyId = task.original_url?.split("/").pop() || ""; @@ -190,12 +328,15 @@ export function QueueProvider({ children }: { children: ReactNode }) { }); setItems(backendItems); + setHasMore(pagination.has_more); + setTotalTasks(total_tasks || 0); + + // Set initial timestamp to current time + lastUpdateTimestamp.current = timestamp; + isInitialized.current = true; - backendItems.forEach((item: QueueItem) => { - if (item.taskId && !isTerminalStatus(item.status)) { - startPolling(item.taskId); - } - }); + // Start smart polling for real-time updates + startSmartPolling(); } catch (error) { console.error("Failed to fetch queue from backend:", error); toast.error("Could not load queue. Please refresh the page."); @@ -203,6 +344,17 @@ export function QueueProvider({ children }: { children: ReactNode }) { }; fetchQueue(); + + // Cleanup function to stop polling when component unmounts + return () => { + stopSmartPolling(); + // Clean up any remaining individual polling intervals (legacy cleanup) + Object.values(pollingIntervals.current).forEach(clearInterval); + pollingIntervals.current = {}; + // Clean up removal timers + Object.values(cancelledRemovalTimers.current).forEach(clearTimeout); + cancelledRemovalTimers.current = {}; + }; // eslint-disable-next-line react-hooks/exhaustive-deps }, []); @@ -230,7 +382,8 @@ export function QueueProvider({ children }: { children: ReactNode }) { ), ); - startPolling(taskId); + // Ensure smart polling is active for the new task + startSmartPolling(); } catch (error: any) { console.error(`Failed to start download for ${item.name}:`, error); toast.error(`Failed to start download for ${item.name}`); @@ -247,7 +400,7 @@ export function QueueProvider({ children }: { children: ReactNode }) { ); } }, - [isVisible, startPolling], + [isVisible, startSmartPolling], ); const removeItem = useCallback((id: string) => { @@ -319,11 +472,12 @@ export function QueueProvider({ children }: { children: ReactNode }) { : i, ), ); - startPolling(item.taskId); + // Ensure smart polling is active for the retry + startSmartPolling(); toast.info(`Retrying download: ${item.name}`); } }, - [items, startPolling], + [items, startSmartPolling], ); const toggleVisibility = useCallback(() => { @@ -389,88 +543,7 @@ export function QueueProvider({ children }: { children: ReactNode }) { } }, [items, stopPolling, scheduleCancelledTaskRemoval]); - const clearAllPolls = useCallback(() => { - Object.values(pollingIntervals.current).forEach(clearInterval); - }, []); - useEffect(() => { - interface PrgsListEntry { - task_id: string; - name?: string; - download_type?: string; - status?: string; - original_request?: { url?: string }; - last_status_obj?: { - progress?: number; - current_track?: number; - total_tracks?: number; - error?: string; - can_retry?: boolean; - }; - summary?: SummaryObject; - } - - const syncActiveTasks = async () => { - try { - const response = await apiClient.get("/prgs/list"); - const activeTasks: QueueItem[] = response.data - .filter((task) => { - const status = task.status?.toLowerCase(); - return status && !isTerminalStatus(status as QueueStatus); - }) - .map((task) => { - const url = task.original_request?.url || ""; - const spotifyId = url.includes("spotify.com") ? url.split("/").pop() || "" : ""; - let type: DownloadType = "track"; - if (task.download_type === "album") type = "album"; - if (task.download_type === "playlist") type = "playlist"; - if (task.download_type === "artist") type = "artist"; - - const queueItem: QueueItem = { - id: task.task_id, - taskId: task.task_id, - name: task.name || "Unknown", - type, - spotifyId, - status: (task.status?.toLowerCase() || "pending") as QueueStatus, - progress: task.last_status_obj?.progress, - currentTrackNumber: task.last_status_obj?.current_track, - totalTracks: task.last_status_obj?.total_tracks, - error: task.last_status_obj?.error, - canRetry: task.last_status_obj?.can_retry, - summary: task.summary, - }; - return queueItem; - }); - - setItems((prevItems) => { - const newItems = [...prevItems]; - activeTasks.forEach((task) => { - const existingIndex = newItems.findIndex((item) => item.id === task.id); - if (existingIndex === -1) { - newItems.push(task); - } else { - newItems[existingIndex] = { ...newItems[existingIndex], ...task }; - } - if (task.taskId && !isTerminalStatus(task.status)) { - if (task.taskId && !isTerminalStatus(task.status)) { - startPolling(task.taskId); - } - } - }); - return newItems; - }); - } catch (error) { - console.error("Failed to sync active tasks:", error); - } - }; - - syncActiveTasks(); - return () => { - clearAllPolls(); - Object.values(cancelledRemovalTimers.current).forEach(clearTimeout); - }; - }, [startPolling, clearAllPolls]); const value = { items, @@ -483,6 +556,11 @@ export function QueueProvider({ children }: { children: ReactNode }) { clearCompleted, cancelAll, cancelItem, + // Pagination + hasMore, + isLoadingMore, + loadMoreTasks, + totalTasks, }; return {children}; diff --git a/spotizerr-ui/src/contexts/queue-context.ts b/spotizerr-ui/src/contexts/queue-context.ts index 39c8780..1f54c06 100644 --- a/spotizerr-ui/src/contexts/queue-context.ts +++ b/spotizerr-ui/src/contexts/queue-context.ts @@ -14,7 +14,28 @@ export type QueueStatus = | "done" | "queued" | "retrying" - | "real-time"; + | "real-time" + | "progress" + | "track_progress"; + +// Active task statuses - tasks that are currently working/processing +// This matches the ACTIVE_TASK_STATES constant in the backend +export const ACTIVE_TASK_STATUSES: Set = new Set([ + "initializing", // task is starting up + "processing", // task is being processed + "downloading", // actively downloading + "progress", // album/playlist progress updates + "track_progress", // real-time track progress + "real-time", // real-time download progress + "retrying", // task is retrying after error +]); + +/** + * Determine if a task status represents an active (working/processing) task + */ +export function isActiveTaskStatus(status: string): boolean { + return ACTIVE_TASK_STATUSES.has(status as QueueStatus); +} export interface QueueItem { id: string; @@ -81,6 +102,11 @@ export interface QueueContextType { clearCompleted: () => void; cancelAll: () => void; cancelItem: (id: string) => void; + // Pagination + hasMore: boolean; + isLoadingMore: boolean; + loadMoreTasks: () => void; + totalTasks: number; } export const QueueContext = createContext(undefined); diff --git a/spotizerr-ui/src/routes/root.tsx b/spotizerr-ui/src/routes/root.tsx index 585d110..5d54f12 100644 --- a/spotizerr-ui/src/routes/root.tsx +++ b/spotizerr-ui/src/routes/root.tsx @@ -77,7 +77,7 @@ function ThemeToggle() { } function AppLayout() { - const { toggleVisibility, activeCount } = useContext(QueueContext) || {}; + const { toggleVisibility, activeCount, totalTasks } = useContext(QueueContext) || {}; return (
@@ -100,9 +100,9 @@ function AppLayout() { @@ -151,9 +151,9 @@ function AppLayout() {