diff --git a/routes/prgs.py b/routes/prgs.py index 62fc0a9..1f25f97 100755 --- a/routes/prgs.py +++ b/routes/prgs.py @@ -1,6 +1,8 @@ -from flask import Blueprint, abort, jsonify +from flask import Blueprint, abort, jsonify, Response, stream_with_context import os import json +import logging +import time from routes.utils.celery_tasks import ( get_task_info, @@ -8,9 +10,14 @@ from routes.utils.celery_tasks import ( get_last_task_status, get_all_tasks, cancel_task, - retry_task + retry_task, + ProgressState, + redis_client ) +# Configure logging +logger = logging.getLogger(__name__) + prgs_bp = Blueprint('prgs', __name__, url_prefix='/api/prgs') # The old path for PRG files (keeping for backward compatibility during transition) @@ -35,9 +42,18 @@ def get_prg_file(task_id): if task_info: # This is a task ID in the new system original_request = task_info.get("original_request", {}) - last_status = get_last_task_status(task_id) - return jsonify({ + # Get the latest status update for this task + last_status = get_last_task_status(task_id) + logger.debug(f"API: Got last_status for {task_id}: {json.dumps(last_status) if last_status else None}") + + # Get all status updates for debugging + all_statuses = get_task_status(task_id) + status_count = len(all_statuses) + logger.debug(f"API: Task {task_id} has {status_count} status updates") + + # Prepare the response with basic info + response = { "type": task_info.get("type", ""), "name": task_info.get("name", ""), "artist": task_info.get("artist", ""), @@ -45,8 +61,84 @@ def get_prg_file(task_id): "original_request": original_request, "display_title": original_request.get("display_title", task_info.get("name", "")), "display_type": original_request.get("display_type", task_info.get("type", "")), - "display_artist": original_request.get("display_artist", task_info.get("artist", "")) - }) + "display_artist": original_request.get("display_artist", task_info.get("artist", "")), + "status_count": status_count + } + + # Handle different status types + if last_status: + status_type = last_status.get("status", "unknown") + + # For terminal statuses (complete, error, cancelled) + if status_type in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED]: + response["progress_message"] = last_status.get("message", f"Download {status_type}") + + # For progress status with track information + elif status_type == "progress" and last_status.get("track"): + # Add explicit track progress fields to the top level for easy access + response["current_track"] = last_status.get("track", "") + response["track_number"] = last_status.get("parsed_current_track", 0) + response["total_tracks"] = last_status.get("parsed_total_tracks", 0) + response["progress_percent"] = last_status.get("overall_progress", 0) + response["album"] = last_status.get("album", "") + + # Format a nice progress message for display + track_info = last_status.get("track", "") + current = last_status.get("parsed_current_track", 0) + total = last_status.get("parsed_total_tracks", 0) + progress = last_status.get("overall_progress", 0) + + if current and total: + response["progress_message"] = f"Downloading track {current}/{total} ({progress}%): {track_info}" + elif track_info: + response["progress_message"] = f"Downloading: {track_info}" + + # For initializing status + elif status_type == "initializing": + album = last_status.get("album", "") + if album: + response["progress_message"] = f"Initializing download for {album}" + else: + response["progress_message"] = "Initializing download..." + + # For processing status (default) + elif status_type == "processing": + # Search for the most recent track progress in all statuses + has_progress = False + for status in reversed(all_statuses): + if status.get("status") == "progress" and status.get("track"): + # Use this track progress information + track_info = status.get("track", "") + current_raw = status.get("current_track", "") + response["current_track"] = track_info + + # Try to parse track numbers if available + if isinstance(current_raw, str) and "/" in current_raw: + try: + parts = current_raw.split("/") + current = int(parts[0]) + total = int(parts[1]) + response["track_number"] = current + response["total_tracks"] = total + response["progress_percent"] = min(int((current / total) * 100), 100) + response["progress_message"] = f"Processing track {current}/{total}: {track_info}" + except (ValueError, IndexError): + response["progress_message"] = f"Processing: {track_info}" + else: + response["progress_message"] = f"Processing: {track_info}" + + has_progress = True + break + + if not has_progress: + # Just use the processing message + response["progress_message"] = last_status.get("message", "Processing download...") + + # For other status types + else: + response["progress_message"] = last_status.get("message", f"Status: {status_type}") + + return jsonify(response) # If not found in new system, try the old PRG file system # Security check to prevent path traversal attacks. @@ -265,3 +357,164 @@ def cancel_task_endpoint(task_id): }), 400 except Exception as e: abort(500, f"An error occurred: {e}") + + +@prgs_bp.route('/stream/', methods=['GET']) +def stream_task_status(task_id): + """ + Stream task status updates as Server-Sent Events (SSE). + This endpoint opens a persistent connection and sends updates in real-time. + + Args: + task_id: The ID of the task to stream updates for + """ + def generate(): + try: + # Get initial task info to send as the opening message + task_info = get_task_info(task_id) + + if not task_info: + # Check if this is an old PRG file + if os.path.exists(os.path.join(PRGS_DIR, task_id)): + # Return error - SSE not supported for old PRG files + yield f"event: error\ndata: {json.dumps({'error': 'SSE streaming not supported for old PRG files'})}\n\n" + return + else: + # Task not found + yield f"event: error\ndata: {json.dumps({'error': 'Task not found'})}\n\n" + return + + # Get the original request and other basic info for the opening message + original_request = task_info.get("original_request", {}) + download_type = task_info.get("type", "") + name = task_info.get("name", "") + artist = task_info.get("artist", "") + + # Prepare the opening message with the required information + opening_data = { + "event": "start", + "task_id": task_id, + "type": download_type, + "name": name, + "artist": artist, + "url": original_request.get("url", ""), + "service": original_request.get("service", ""), + "timestamp": time.time(), + "status": "initializing", + "message": f"Starting {download_type} download: {name}" + (f" by {artist}" if artist else "") + } + + # Send the opening message + yield f"event: start\ndata: {json.dumps(opening_data)}\n\n" + + # Get existing status updates to catch up (most recent first) + all_updates = get_task_status(task_id) + # Sort updates by id + sorted_updates = sorted(all_updates, key=lambda x: x.get("id", 0)) + + # Send the most recent updates first (up to 10) + for i, update in enumerate(sorted_updates[-10:]): + # Add the task_id to each update message + update["task_id"] = task_id + yield f"event: update\ndata: {json.dumps(update)}\n\n" + + # Keep track of the last update ID we've sent + last_sent_id = 0 + if sorted_updates: + last_sent_id = sorted_updates[-1].get("id", 0) + + # Create a Redis connection for subscribing to updates + redis_pubsub = redis_client.pubsub() + redis_pubsub.subscribe(f"task_updates:{task_id}") + + # Hold the connection open and check for updates + last_heartbeat = time.time() + heartbeat_interval = 15 # Send heartbeat every 15 seconds + + while True: + # Check for new updates via Redis Pub/Sub + message = redis_pubsub.get_message(timeout=1.0) + + if message and message['type'] == 'message': + # Got a new message from Redis Pub/Sub + try: + data = json.loads(message['data'].decode('utf-8')) + status_id = data.get('status_id', 0) + + # Fetch the actual status data + if status_id > last_sent_id: + all_status = redis_client.lrange(f"task:{task_id}:status", 0, -1) + + for status_data in all_status: + try: + status = json.loads(status_data.decode('utf-8')) + if status.get("id") == status_id: + # Add the task_id to the update + status["task_id"] = task_id + + # Choose the appropriate event type based on status + status_type = status.get("status", "") + event_type = "update" + + if status_type == ProgressState.COMPLETE or status_type == ProgressState.DONE: + event_type = "complete" + elif status_type == ProgressState.TRACK_COMPLETE: + # Create a distinct event type for track completion to prevent UI issues + event_type = "track_complete" + elif status_type == ProgressState.ERROR: + event_type = "error" + elif status_type in [ProgressState.TRACK_PROGRESS, ProgressState.REAL_TIME]: + event_type = "progress" + + # Send the update + yield f"event: {event_type}\ndata: {json.dumps(status)}\n\n" + last_sent_id = status_id + break + except Exception as e: + logger.error(f"Error parsing status data: {e}") + except Exception as e: + logger.error(f"Error processing Redis Pub/Sub message: {e}") + + # Check if task is complete, error, or cancelled - if so, end the stream + last_status = get_last_task_status(task_id) + if last_status and last_status.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED, ProgressState.DONE]: + # Send final message + final_data = { + "event": "end", + "task_id": task_id, + "status": last_status.get("status"), + "message": last_status.get("message", "Download complete"), + "timestamp": time.time() + } + yield f"event: end\ndata: {json.dumps(final_data)}\n\n" + break + + # Send a heartbeat periodically to keep the connection alive + now = time.time() + if now - last_heartbeat >= heartbeat_interval: + yield f"event: heartbeat\ndata: {json.dumps({'timestamp': now})}\n\n" + last_heartbeat = now + + # Small sleep to prevent CPU spinning + time.sleep(0.1) + + except Exception as e: + logger.error(f"Error in SSE stream: {e}") + yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" + finally: + # Clean up: unsubscribe and close Redis Pub/Sub connection + if 'redis_pubsub' in locals(): + try: + redis_pubsub.unsubscribe() + redis_pubsub.close() + except Exception as e: + logger.error(f"Error closing Redis Pub/Sub: {e}") + + return Response( + stream_with_context(generate()), + mimetype='text/event-stream', + headers={ + 'Cache-Control': 'no-cache', + 'X-Accel-Buffering': 'no' # Disable Nginx buffering + } + ) diff --git a/routes/utils/album.py b/routes/utils/album.py index af4e4f2..bb8ba90 100755 --- a/routes/utils/album.py +++ b/routes/utils/album.py @@ -22,6 +22,9 @@ def download_album( progress_callback=None ): try: + # DEBUG: Print parameters + print(f"DEBUG: album.py received - service={service}, main={main}, fallback={fallback}") + # Load Spotify client credentials if available spotify_client_id = None spotify_client_secret = None @@ -30,9 +33,11 @@ def download_album( if service == 'spotify' and fallback: # If fallback is enabled, use the fallback account for Spotify search credentials search_creds_path = Path(f'./creds/spotify/{fallback}/search.json') + print(f"DEBUG: Using Spotify search credentials from fallback: {search_creds_path}") else: # Otherwise use the main account for Spotify search credentials search_creds_path = Path(f'./creds/spotify/{main}/search.json') + print(f"DEBUG: Using Spotify search credentials from main: {search_creds_path}") if search_creds_path.exists(): try: @@ -40,6 +45,7 @@ def download_album( search_creds = json.load(f) spotify_client_id = search_creds.get('client_id') spotify_client_secret = search_creds.get('client_secret') + print(f"DEBUG: Loaded Spotify client credentials successfully") except Exception as e: print(f"Error loading Spotify search credentials: {e}") @@ -56,6 +62,19 @@ def download_album( # Load Deezer credentials from 'main' under deezer directory deezer_creds_dir = os.path.join('./creds/deezer', main) deezer_creds_path = os.path.abspath(os.path.join(deezer_creds_dir, 'credentials.json')) + + # DEBUG: Print Deezer credential paths being used + print(f"DEBUG: Looking for Deezer credentials at:") + print(f"DEBUG: deezer_creds_dir = {deezer_creds_dir}") + print(f"DEBUG: deezer_creds_path = {deezer_creds_path}") + print(f"DEBUG: Directory exists = {os.path.exists(deezer_creds_dir)}") + print(f"DEBUG: Credentials file exists = {os.path.exists(deezer_creds_path)}") + + # List available directories to compare + print(f"DEBUG: Available Deezer credential directories:") + for dir_name in os.listdir('./creds/deezer'): + print(f"DEBUG: ./creds/deezer/{dir_name}") + with open(deezer_creds_path, 'r') as f: deezer_creds = json.load(f) # Initialize DeeLogin with Deezer credentials and Spotify client credentials @@ -65,6 +84,7 @@ def download_album( spotify_client_secret=spotify_client_secret, progress_callback=progress_callback ) + print(f"DEBUG: Starting album download using Deezer credentials (download_albumspo)") # Download using download_albumspo; pass real_time_dl accordingly and the custom formatting dl.download_albumspo( link_album=url, @@ -82,6 +102,7 @@ def download_album( retry_delay_increase=retry_delay_increase, max_retries=max_retries ) + print(f"DEBUG: Album download completed successfully using Deezer credentials") except Exception as e: deezer_error = e # Immediately report the Deezer error @@ -94,6 +115,9 @@ def download_album( spo_creds_dir = os.path.join('./creds/spotify', fallback) spo_creds_path = os.path.abspath(os.path.join(spo_creds_dir, 'credentials.json')) + print(f"DEBUG: Using Spotify fallback credentials from: {spo_creds_path}") + print(f"DEBUG: Fallback credentials exist: {os.path.exists(spo_creds_path)}") + # We've already loaded the Spotify client credentials above based on fallback spo = SpoLogin( @@ -102,6 +126,7 @@ def download_album( spotify_client_secret=spotify_client_secret, progress_callback=progress_callback ) + print(f"DEBUG: Starting album download using Spotify fallback credentials") spo.download_album( link_album=url, output_dir="./downloads", @@ -119,8 +144,10 @@ def download_album( retry_delay_increase=retry_delay_increase, max_retries=max_retries ) + print(f"DEBUG: Album download completed successfully using Spotify fallback") except Exception as e2: # If fallback also fails, raise an error indicating both attempts failed + print(f"ERROR: Spotify fallback also failed: {e2}") raise RuntimeError( f"Both main (Deezer) and fallback (Spotify) attempts failed. " f"Deezer error: {deezer_error}, Spotify error: {e2}" @@ -131,12 +158,16 @@ def download_album( quality = 'HIGH' creds_dir = os.path.join('./creds/spotify', main) credentials_path = os.path.abspath(os.path.join(creds_dir, 'credentials.json')) + print(f"DEBUG: Using Spotify main credentials from: {credentials_path}") + print(f"DEBUG: Credentials exist: {os.path.exists(credentials_path)}") + spo = SpoLogin( credentials_path=credentials_path, spotify_client_id=spotify_client_id, spotify_client_secret=spotify_client_secret, progress_callback=progress_callback ) + print(f"DEBUG: Starting album download using Spotify main credentials") spo.download_album( link_album=url, output_dir="./downloads", @@ -154,12 +185,16 @@ def download_album( retry_delay_increase=retry_delay_increase, max_retries=max_retries ) + print(f"DEBUG: Album download completed successfully using Spotify main") elif service == 'deezer': if quality is None: quality = 'FLAC' # Existing code remains the same, ignoring fallback creds_dir = os.path.join('./creds/deezer', main) creds_path = os.path.abspath(os.path.join(creds_dir, 'credentials.json')) + print(f"DEBUG: Using Deezer credentials from: {creds_path}") + print(f"DEBUG: Credentials exist: {os.path.exists(creds_path)}") + with open(creds_path, 'r') as f: creds = json.load(f) dl = DeeLogin( @@ -168,6 +203,7 @@ def download_album( spotify_client_secret=spotify_client_secret, progress_callback=progress_callback ) + print(f"DEBUG: Starting album download using Deezer credentials (download_albumdee)") dl.download_albumdee( link_album=url, output_dir="./downloads", @@ -183,8 +219,10 @@ def download_album( retry_delay_increase=retry_delay_increase, max_retries=max_retries ) + print(f"DEBUG: Album download completed successfully using Deezer direct") else: raise ValueError(f"Unsupported service: {service}") except Exception as e: + print(f"ERROR: Album download failed with exception: {e}") traceback.print_exc() raise # Re-raise the exception after logging diff --git a/routes/utils/celery_tasks.py b/routes/utils/celery_tasks.py index 31dcf7a..9200d35 100644 --- a/routes/utils/celery_tasks.py +++ b/routes/utils/celery_tasks.py @@ -33,7 +33,17 @@ class ProgressState: COMPLETE = "complete" ERROR = "error" RETRYING = "retrying" - CANCELLED = "cancel" + CANCELLED = "cancelled" + PROGRESS = "progress" + + # Additional states from deezspot library + INITIALIZING = "initializing" + DOWNLOADING = "downloading" + TRACK_PROGRESS = "track_progress" + TRACK_COMPLETE = "track_complete" + REAL_TIME = "real_time" + SKIPPED = "skipped" + DONE = "done" # Reuse the application's logging configuration for Celery workers @setup_logging.connect @@ -55,16 +65,37 @@ def worker_init_handler(**kwargs): logger.debug("Worker Redis connection: " + REDIS_URL) def store_task_status(task_id, status_data): - """Store task status information in Redis""" + """ + Store task status information in Redis with a sequential ID + + Args: + task_id: The task ID + status_data: Dictionary containing status information + """ # Add timestamp if not present if 'timestamp' not in status_data: status_data['timestamp'] = time.time() - # Convert to JSON and store in Redis + # Generate sequential ID for this status update (atomic operation) try: + # Get next ID for this task's status updates + status_id = redis_client.incr(f"task:{task_id}:status:next_id") + status_data['id'] = status_id + + # Convert to JSON and store in Redis redis_client.rpush(f"task:{task_id}:status", json.dumps(status_data)) + # Set expiry for the list to avoid filling up Redis with old data redis_client.expire(f"task:{task_id}:status", 60 * 60 * 24 * 7) # 7 days + redis_client.expire(f"task:{task_id}:status:next_id", 60 * 60 * 24 * 7) # 7 days + + # Publish an update event to a Redis channel for subscribers + # This will be used by the SSE endpoint to push updates in real-time + update_channel = f"task_updates:{task_id}" + redis_client.publish(update_channel, json.dumps({ + "task_id": task_id, + "status_id": status_id + })) except Exception as e: logger.error(f"Error storing task status: {e}") traceback.print_exc() @@ -79,14 +110,91 @@ def get_task_status(task_id): return [] def get_last_task_status(task_id): - """Get the most recent task status update from Redis""" + """ + Get the most recent task status update from Redis + + If the task is an album or playlist, prioritize progress updates + showing current track information over generic processing status. + """ try: - last_status = redis_client.lindex(f"task:{task_id}:status", -1) - if last_status: - return json.loads(last_status.decode('utf-8')) - return None + # Get all status updates for this task + all_statuses = redis_client.lrange(f"task:{task_id}:status", 0, -1) + if not all_statuses: + logger.debug(f"Task {task_id}: No status updates found") + return None + + logger.debug(f"Task {task_id}: Found {len(all_statuses)} status updates") + + # First decode and analyze all status updates + decoded_statuses = [] + has_progress_updates = False + + for status_json in all_statuses: + try: + status = json.loads(status_json.decode('utf-8')) + decoded_statuses.append(status) + + # Check if we have any progress updates with track information + if status.get("status") == "progress" and status.get("track"): + has_progress_updates = True + logger.debug(f"Task {task_id}: Found progress update with track: {status.get('track')}") + except Exception as e: + logger.error(f"Error decoding status update: {e}") + + if not has_progress_updates: + logger.debug(f"Task {task_id}: No progress updates with track info found") + + # Find the latest terminal status (complete, error, cancelled) + latest_status = decoded_statuses[-1] if decoded_statuses else None + if latest_status and latest_status.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED]: + logger.debug(f"Task {task_id}: Returning terminal status: {latest_status.get('status')}") + return latest_status + + # Find the most recent progress update with track information + # Start from the most recent and go backward + latest_progress = None + + for status in reversed(decoded_statuses): + status_type = status.get("status") + + # For album/playlist downloads, find progress updates with track information + if status_type == "progress" and status.get("track"): + latest_progress = status + logger.debug(f"Task {task_id}: Selected progress update for track: {status.get('track')}") + break + + # If we found a progress update, make sure it has all the necessary fields + if latest_progress: + # Parse current_track from "X/Y" format if needed + current_track_raw = latest_progress.get("current_track", "0") + + # Always reprocess the values to ensure consistency + if isinstance(current_track_raw, str) and "/" in current_track_raw: + try: + parts = current_track_raw.split("/") + current_track = int(parts[0]) + total_tracks = int(parts[1]) + + # Calculate and update progress information + overall_progress = min(int((current_track / total_tracks) * 100), 100) + latest_progress["parsed_current_track"] = current_track + latest_progress["parsed_total_tracks"] = total_tracks + latest_progress["overall_progress"] = overall_progress + + logger.debug(f"Task {task_id}: Parsed track progress {current_track}/{total_tracks} ({overall_progress}%)") + except (ValueError, IndexError) as e: + logger.error(f"Error parsing track numbers: {e}") + + # Return the enhanced progress update + return latest_progress + + # If no suitable progress updates found, return the most recent status + logger.debug(f"Task {task_id}: No suitable progress updates found, returning latest status") + return latest_status + except Exception as e: - logger.error(f"Error getting last task status: {e}") + logger.error(f"Error getting last task status for {task_id}: {e}") + traceback.print_exc() return None def store_task_info(task_id, task_info): @@ -316,6 +424,9 @@ class ProgressTrackingTask(Task): """ task_id = self.request.id + # Debug log incoming progress data + logger.debug(f"Task {task_id}: Got progress data: {json.dumps(progress_data)}") + # Add timestamp if not present if 'timestamp' not in progress_data: progress_data['timestamp'] = time.time() @@ -323,25 +434,269 @@ class ProgressTrackingTask(Task): # Map deezspot status to our progress state status = progress_data.get("status", "unknown") - # Store the progress update in Redis - store_task_status(task_id, progress_data) + # First, make a copy of the data to avoid modifying the original + stored_data = progress_data.copy() - # Log the progress update with appropriate level - message = progress_data.get("message", "Progress update") + # Process the data based on status type + if status == "initializing": + # Get content information when initializing + content_type = stored_data.get('type', '').upper() + album_name = stored_data.get('album', '') + name = stored_data.get('name', '') + artist = stored_data.get('artist', '') + total_tracks = stored_data.get('total_tracks', 0) + + # Store initialization details + if not name and album_name: + stored_data['name'] = album_name + + # Log initialization + if album_name: + logger.info(f"Task {task_id} initializing: {content_type} '{album_name}' with {total_tracks} tracks") + elif name: + logger.info(f"Task {task_id} initializing: {content_type} '{name}' with {total_tracks} tracks") + else: + logger.info(f"Task {task_id} initializing: {content_type} with {total_tracks} tracks") + + elif status == "downloading": + # Track starting to download + track_name = stored_data.get('song', 'Unknown') + artist = stored_data.get('artist', '') + album = stored_data.get('album', '') + + if artist and album: + logger.info(f"Task {task_id} downloading: '{track_name}' by {artist} from {album}") + elif artist: + logger.info(f"Task {task_id} downloading: '{track_name}' by {artist}") + else: + logger.info(f"Task {task_id} downloading: '{track_name}'") + + elif status == "progress": + # For album/playlist downloads, process track progress + track_name = stored_data.get("track", stored_data.get("song", "Unknown track")) + current_track_raw = stored_data.get("current_track", "0") + album = stored_data.get("album", "") + artist = stored_data.get("artist", "") + + # Process and store artist correctly + if isinstance(artist, list) and len(artist) > 0: + # Take the first artist if it's a list + artist_name = artist[0] + # Store the processed artist back in the data + stored_data["artist_name"] = artist_name + elif isinstance(artist, str): + stored_data["artist_name"] = artist + + # Parse current_track and total_tracks from the format "current/total" + if isinstance(current_track_raw, str) and "/" in current_track_raw: + try: + parts = current_track_raw.split("/") + current_track = int(parts[0]) + total_tracks = int(parts[1]) + + # Store the parsed values + stored_data["parsed_current_track"] = current_track + stored_data["parsed_total_tracks"] = total_tracks + + # Calculate overall percentage + overall_progress = min(int((current_track / total_tracks) * 100), 100) + stored_data["overall_progress"] = overall_progress + + logger.debug(f"Task {task_id}: Processed track progress {current_track}/{total_tracks} ({overall_progress}%) for '{track_name}'") + except (ValueError, IndexError) as e: + logger.error(f"Error parsing track numbers '{current_track_raw}': {e}") + elif isinstance(current_track_raw, int): + # Handle the case where it's already an integer + current_track = current_track_raw + total_tracks = stored_data.get("total_tracks", 0) + + if total_tracks > 0: + # Calculate overall percentage + overall_progress = min(int((current_track / total_tracks) * 100), 100) + stored_data["parsed_current_track"] = current_track + stored_data["parsed_total_tracks"] = total_tracks + stored_data["overall_progress"] = overall_progress + + # Log appropriate message based on available information + artist_name = stored_data.get("artist_name", artist) + if album and artist_name: + logger.info(f"Task {task_id} progress: [{stored_data.get('parsed_current_track', 0)}/{stored_data.get('parsed_total_tracks', 0)}] {stored_data.get('overall_progress', 0)}% - {track_name} by {artist_name} from {album}") + elif album: + logger.info(f"Task {task_id} progress: [{stored_data.get('parsed_current_track', 0)}/{stored_data.get('parsed_total_tracks', 0)}] {stored_data.get('overall_progress', 0)}% - {track_name} from {album}") + else: + logger.info(f"Task {task_id} progress: [{stored_data.get('parsed_current_track', 0)}/{stored_data.get('parsed_total_tracks', 0)}] {stored_data.get('overall_progress', 0)}% - {track_name}") + + elif status == "track_progress" or status == "real_time": + # Track real-time progress of a file download + title = stored_data.get('title', stored_data.get('song', 'Unknown')) + artist = stored_data.get('artist', 'Unknown') + + # Handle different percent formats + percent = stored_data.get('percent', stored_data.get('percentage', 0)) + if isinstance(percent, float) and percent <= 1.0: + percent = int(percent * 100) + + # Update bytes received information if available + if 'bytes_received' in stored_data: + # Calculate and store download rate + last_update = stored_data.get('last_update_time', stored_data['timestamp']) + bytes_received = stored_data['bytes_received'] + last_bytes = stored_data.get('last_bytes_received', 0) + time_diff = stored_data['timestamp'] - last_update + + if time_diff > 0 and bytes_received > last_bytes: + bytes_diff = bytes_received - last_bytes + download_rate = bytes_diff / time_diff + stored_data['download_rate'] = download_rate + stored_data['last_update_time'] = stored_data['timestamp'] + stored_data['last_bytes_received'] = bytes_received + + # Format download rate for display + if 'download_rate' in stored_data: + rate = stored_data['download_rate'] + if rate < 1024: + stored_data['download_rate_formatted'] = f"{rate:.2f} B/s" + elif rate < 1024 * 1024: + stored_data['download_rate_formatted'] = f"{rate/1024:.2f} KB/s" + else: + stored_data['download_rate_formatted'] = f"{rate/(1024*1024):.2f} MB/s" + + # Log real-time progress + logger.debug(f"Task {task_id} track progress: {title} by {artist}: {percent}%") - if status == "processing": - progress = progress_data.get("progress", 0) + elif status == "track_complete" or (status == "done" and stored_data.get('type') == 'track'): + # Track completed successfully + title = stored_data.get('title', stored_data.get('song', 'Unknown')) + artist = stored_data.get('artist', 'Unknown') + album = stored_data.get('album', 'Unknown') + quality = stored_data.get('quality', 'Unknown') + path = stored_data.get('path', '') + + # Log completion with file size if available + if 'file_size' in stored_data: + size = stored_data['file_size'] + if size < 1024: + stored_data['file_size_formatted'] = f"{size} B" + elif size < 1024 * 1024: + stored_data['file_size_formatted'] = f"{size/1024:.2f} KB" + elif size < 1024 * 1024 * 1024: + stored_data['file_size_formatted'] = f"{size/(1024*1024):.2f} MB" + else: + stored_data['file_size_formatted'] = f"{size/(1024*1024*1024):.2f} GB" + + logger.info(f"Task {task_id} track complete: {artist} - {title} ({quality}) {stored_data.get('file_size_formatted', '')}") + else: + logger.info(f"Task {task_id} track complete: {artist} - {title} ({quality})") + + if path: + logger.debug(f"Task {task_id} saved to: {path}") + + # Update completion progress + task_info = get_task_info(task_id) + total_tracks = task_info.get('total_tracks', stored_data.get('total_tracks', 0)) + completed_tracks = task_info.get('completed_tracks', 0) + 1 + + # Update task info with new completed track count + task_info['completed_tracks'] = completed_tracks + store_task_info(task_id, task_info) + + # Calculate completion percentage + if total_tracks > 0: + completion_percent = int((completed_tracks / total_tracks) * 100) + stored_data['completion_percent'] = completion_percent + logger.info(f"Task {task_id} progress: {completed_tracks}/{total_tracks} tracks ({completion_percent}%)") + + elif status == "skipped": + # Track was skipped (usually because it already exists) + title = stored_data.get('song', 'Unknown') + artist = stored_data.get('artist', 'Unknown') + reason = stored_data.get('reason', 'Unknown reason') + + logger.info(f"Task {task_id} skipped: {artist} - {title}") + logger.debug(f"Task {task_id} skip reason: {reason}") + + # Update task info with skipped track + task_info = get_task_info(task_id) + skipped_tracks = task_info.get('skipped_tracks', 0) + 1 + task_info['skipped_tracks'] = skipped_tracks + store_task_info(task_id, task_info) + + elif status == "retrying": + # Download failed and is being retried + song = stored_data.get('song', 'Unknown') + artist = stored_data.get('artist', 'Unknown') + retry_count = stored_data.get('retry_count', 0) + seconds_left = stored_data.get('seconds_left', 0) + error = stored_data.get('error', 'Unknown error') + + logger.warning(f"Task {task_id} retrying: {artist} - {song} (Attempt {retry_count}, waiting {seconds_left}s)") + logger.debug(f"Task {task_id} retry reason: {error}") + + # Update task info with retry count + task_info = get_task_info(task_id) + retry_count_total = task_info.get('retry_count', 0) + 1 + task_info['retry_count'] = retry_count_total + store_task_info(task_id, task_info) + + elif status == "error": + # Error occurred during download + message = stored_data.get('message', 'Unknown error') + + logger.error(f"Task {task_id} error: {message}") + + # Update task info with error count + task_info = get_task_info(task_id) + error_count = task_info.get('error_count', 0) + 1 + task_info['error_count'] = error_count + store_task_info(task_id, task_info) + + elif status == "done": + # Overall download operation completed + content_type = stored_data.get('type', '').upper() + name = stored_data.get('name', '') + album = stored_data.get('album', '') + artist = stored_data.get('artist', '') + total_tracks = stored_data.get('total_tracks', 0) + + # Get task info for summary + task_info = get_task_info(task_id) + completed_tracks = task_info.get('completed_tracks', 0) + skipped_tracks = task_info.get('skipped_tracks', 0) + error_count = task_info.get('error_count', 0) + + # Create completion message + if album and artist: + logger.info(f"Task {task_id} completed: {content_type} '{album}' by {artist}") + elif album: + logger.info(f"Task {task_id} completed: {content_type} '{album}'") + elif name: + logger.info(f"Task {task_id} completed: {content_type} '{name}'") + else: + logger.info(f"Task {task_id} completed") + + # Log summary + logger.info(f"Task {task_id} summary: {completed_tracks} completed, {skipped_tracks} skipped, {error_count} errors") + + # Update task status to complete + stored_data["status"] = ProgressState.COMPLETE + stored_data["message"] = f"Download complete: {completed_tracks} tracks downloaded" + + elif status == "processing": + # Processing status - log message with progress if available + progress = stored_data.get("progress", 0) + message = stored_data.get("message", "Processing") + if progress > 0: - logger.debug(f"Task {task_id} progress: {progress}% - {message}") + logger.debug(f"Task {task_id} processing: {progress}% - {message}") else: logger.info(f"Task {task_id} processing: {message}") - elif status == "error": - error_message = progress_data.get("error", message) - logger.error(f"Task {task_id} error: {error_message}") - elif status == "complete": - logger.info(f"Task {task_id} completed: {message}") + else: - logger.info(f"Task {task_id} {status}: {message}") + # Unknown status - just log it + logger.info(f"Task {task_id} {status}: {stored_data.get('message', 'No details')}") + + # Store the enhanced progress update in Redis + store_task_status(task_id, stored_data) # Celery signal handlers @task_prerun.connect diff --git a/routes/utils/playlist.py b/routes/utils/playlist.py index 96ce1ff..3878a7a 100755 --- a/routes/utils/playlist.py +++ b/routes/utils/playlist.py @@ -22,6 +22,9 @@ def download_playlist( progress_callback=None ): try: + # DEBUG: Print parameters + print(f"DEBUG: playlist.py received - service={service}, main={main}, fallback={fallback}") + # Load Spotify client credentials if available spotify_client_id = None spotify_client_secret = None @@ -30,9 +33,11 @@ def download_playlist( if service == 'spotify' and fallback: # If fallback is enabled, use the fallback account for Spotify search credentials search_creds_path = Path(f'./creds/spotify/{fallback}/search.json') + print(f"DEBUG: Using Spotify search credentials from fallback: {search_creds_path}") else: # Otherwise use the main account for Spotify search credentials search_creds_path = Path(f'./creds/spotify/{main}/search.json') + print(f"DEBUG: Using Spotify search credentials from main: {search_creds_path}") if search_creds_path.exists(): try: @@ -40,6 +45,7 @@ def download_playlist( search_creds = json.load(f) spotify_client_id = search_creds.get('client_id') spotify_client_secret = search_creds.get('client_secret') + print(f"DEBUG: Loaded Spotify client credentials successfully") except Exception as e: print(f"Error loading Spotify search credentials: {e}") @@ -56,6 +62,14 @@ def download_playlist( # Load Deezer credentials from 'main' under deezer directory deezer_creds_dir = os.path.join('./creds/deezer', main) deezer_creds_path = os.path.abspath(os.path.join(deezer_creds_dir, 'credentials.json')) + + # DEBUG: Print Deezer credential paths being used + print(f"DEBUG: Looking for Deezer credentials at:") + print(f"DEBUG: deezer_creds_dir = {deezer_creds_dir}") + print(f"DEBUG: deezer_creds_path = {deezer_creds_path}") + print(f"DEBUG: Directory exists = {os.path.exists(deezer_creds_dir)}") + print(f"DEBUG: Credentials file exists = {os.path.exists(deezer_creds_path)}") + with open(deezer_creds_path, 'r') as f: deezer_creds = json.load(f) # Initialize DeeLogin with Deezer credentials @@ -65,6 +79,7 @@ def download_playlist( spotify_client_secret=spotify_client_secret, progress_callback=progress_callback ) + print(f"DEBUG: Starting playlist download using Deezer credentials (download_playlistspo)") # Download using download_playlistspo; pass the custom formatting parameters. dl.download_playlistspo( link_playlist=url, @@ -82,6 +97,7 @@ def download_playlist( retry_delay_increase=retry_delay_increase, max_retries=max_retries ) + print(f"DEBUG: Playlist download completed successfully using Deezer credentials") except Exception as e: deezer_error = e # Immediately report the Deezer error @@ -94,6 +110,9 @@ def download_playlist( spo_creds_dir = os.path.join('./creds/spotify', fallback) spo_creds_path = os.path.abspath(os.path.join(spo_creds_dir, 'credentials.json')) + print(f"DEBUG: Using Spotify fallback credentials from: {spo_creds_path}") + print(f"DEBUG: Fallback credentials exist: {os.path.exists(spo_creds_path)}") + # We've already loaded the Spotify client credentials above based on fallback spo = SpoLogin( @@ -102,6 +121,7 @@ def download_playlist( spotify_client_secret=spotify_client_secret, progress_callback=progress_callback ) + print(f"DEBUG: Starting playlist download using Spotify fallback credentials") spo.download_playlist( link_playlist=url, output_dir="./downloads", @@ -119,8 +139,10 @@ def download_playlist( retry_delay_increase=retry_delay_increase, max_retries=max_retries ) + print(f"DEBUG: Playlist download completed successfully using Spotify fallback") except Exception as e2: # If fallback also fails, raise an error indicating both attempts failed + print(f"ERROR: Spotify fallback also failed: {e2}") raise RuntimeError( f"Both main (Deezer) and fallback (Spotify) attempts failed. " f"Deezer error: {deezer_error}, Spotify error: {e2}" @@ -131,12 +153,16 @@ def download_playlist( quality = 'HIGH' creds_dir = os.path.join('./creds/spotify', main) credentials_path = os.path.abspath(os.path.join(creds_dir, 'credentials.json')) + print(f"DEBUG: Using Spotify main credentials from: {credentials_path}") + print(f"DEBUG: Credentials exist: {os.path.exists(credentials_path)}") + spo = SpoLogin( credentials_path=credentials_path, spotify_client_id=spotify_client_id, spotify_client_secret=spotify_client_secret, progress_callback=progress_callback ) + print(f"DEBUG: Starting playlist download using Spotify main credentials") spo.download_playlist( link_playlist=url, output_dir="./downloads", @@ -154,12 +180,16 @@ def download_playlist( retry_delay_increase=retry_delay_increase, max_retries=max_retries ) + print(f"DEBUG: Playlist download completed successfully using Spotify main") elif service == 'deezer': if quality is None: quality = 'FLAC' # Existing code for Deezer, using main as Deezer account. creds_dir = os.path.join('./creds/deezer', main) creds_path = os.path.abspath(os.path.join(creds_dir, 'credentials.json')) + print(f"DEBUG: Using Deezer credentials from: {creds_path}") + print(f"DEBUG: Credentials exist: {os.path.exists(creds_path)}") + with open(creds_path, 'r') as f: creds = json.load(f) dl = DeeLogin( @@ -168,6 +198,7 @@ def download_playlist( spotify_client_secret=spotify_client_secret, progress_callback=progress_callback ) + print(f"DEBUG: Starting playlist download using Deezer direct") dl.download_playlistdee( link_playlist=url, output_dir="./downloads", @@ -183,8 +214,10 @@ def download_playlist( retry_delay_increase=retry_delay_increase, max_retries=max_retries ) + print(f"DEBUG: Playlist download completed successfully using Deezer direct") else: raise ValueError(f"Unsupported service: {service}") except Exception as e: + print(f"ERROR: Playlist download failed with exception: {e}") traceback.print_exc() raise # Re-raise the exception after logging diff --git a/static/js/queue.js b/static/js/queue.js index 8a58441..bf2fa70 100644 --- a/static/js/queue.js +++ b/static/js/queue.js @@ -23,6 +23,9 @@ class DownloadQueue { this.downloadQueue = {}; // keyed by unique queueId this.currentConfig = {}; // Cache for current config + + // EventSource connections for SSE tracking + this.sseConnections = {}; // keyed by prgFile/task_id // Load the saved visible count (or default to 10) const storedVisibleCount = localStorage.getItem("downloadQueueVisibleCount"); @@ -109,6 +112,10 @@ class DownloadQueue { const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); if (logElement) logElement.textContent = "Download cancelled"; entry.hasEnded = true; + + // Close SSE connection + this.closeSSEConnection(queueId); + if (entry.intervalId) { clearInterval(entry.intervalId); entry.intervalId = null; @@ -121,6 +128,11 @@ class DownloadQueue { } }); } + + // Close all SSE connections when the page is about to unload + window.addEventListener('beforeunload', () => { + this.closeAllSSEConnections(); + }); } /* Public API */ @@ -209,42 +221,25 @@ class DownloadQueue { async startEntryMonitoring(queueId) { const entry = this.downloadQueue[queueId]; if (!entry || entry.hasEnded) return; - if (entry.intervalId) return; + + // Don't restart monitoring if SSE connection already exists + if (this.sseConnections[queueId]) return; // Show a preparing message for new entries if (entry.isNew) { const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); if (logElement) { - logElement.textContent = "Reading status..."; + logElement.textContent = "Initializing download..."; } } - // Track status check failures - entry.statusCheckFailures = 0; - const MAX_STATUS_CHECK_FAILURES = 5; // Maximum number of consecutive failures before showing error - - entry.intervalId = setInterval(async () => { - const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); - if (entry.hasEnded) { - clearInterval(entry.intervalId); - return; - } - try { - // Show checking status message on first few attempts - if (entry.statusCheckFailures > 0 && entry.statusCheckFailures < MAX_STATUS_CHECK_FAILURES && logElement) { - logElement.textContent = `Checking download status (attempt ${entry.statusCheckFailures+1})...`; - } - - const response = await fetch(`/api/prgs/${entry.prgFile}`); - if (!response.ok) { - throw new Error(`Server returned ${response.status}`); - } - - // Reset failure counter on success - entry.statusCheckFailures = 0; - + // For backward compatibility, first try to get initial status from the REST API + try { + const response = await fetch(`/api/prgs/${entry.prgFile}`); + if (response.ok) { const data = await response.json(); - + + // Update entry type if available if (data.type) { entry.type = data.type; @@ -252,11 +247,11 @@ class DownloadQueue { const typeElement = entry.element.querySelector('.type'); if (typeElement) { typeElement.textContent = data.type.charAt(0).toUpperCase() + data.type.slice(1); - // Update type class without triggering animation typeElement.className = `type ${data.type}`; } } - + + // Update request URL if available if (!entry.requestUrl && data.original_request) { const params = new CustomURLSearchParams(); for (const key in data.original_request) { @@ -264,69 +259,41 @@ class DownloadQueue { } entry.requestUrl = `/api/${entry.type}/download?${params.toString()}`; } - - const progress = data.last_line; - - if (progress && typeof progress.status === 'undefined') { - if (entry.type === 'playlist') { - logElement.textContent = "Reading tracks list..."; + + // Process the initial status + if (data.last_line) { + entry.lastStatus = data.last_line; + entry.lastUpdated = Date.now(); + entry.status = data.last_line.status; + + // Update status message without recreating the element + const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); + if (logElement) { + const statusMessage = this.getStatusMessage(data.last_line); + logElement.textContent = statusMessage; } - this.updateQueueOrder(); - return; - } - if (!progress) { - if (entry.type === 'playlist') { - logElement.textContent = "Reading tracks list..."; - } else { - this.handleInactivity(entry, queueId, logElement); + + // Apply appropriate CSS classes based on status + this.applyStatusClasses(entry, data.last_line); + + // Save updated status to cache + this.queueCache[entry.prgFile] = data.last_line; + localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); + + // If the entry is already in a terminal state, don't set up SSE + if (['error', 'complete', 'cancel', 'cancelled', 'done'].includes(data.last_line.status)) { + entry.hasEnded = true; + this.handleTerminalState(entry, queueId, data.last_line); + return; } - this.updateQueueOrder(); - return; - } - if (JSON.stringify(entry.lastStatus) === JSON.stringify(progress)) { - this.handleInactivity(entry, queueId, logElement); - this.updateQueueOrder(); - return; - } - - // Update the entry and cache. - entry.lastStatus = progress; - entry.lastUpdated = Date.now(); - entry.status = progress.status; - - // Update status message without recreating the element - if (logElement) { - const statusMessage = this.getStatusMessage(progress); - logElement.textContent = statusMessage; - } - - // Apply appropriate CSS classes based on status - this.applyStatusClasses(entry, progress); - - // Save updated status to cache. - this.queueCache[entry.prgFile] = progress; - localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); - - if (['error', 'complete', 'cancel'].includes(progress.status)) { - this.handleTerminalState(entry, queueId, progress); - } - } catch (error) { - console.error('Status check failed:', error); - - // Increment failure counter - entry.statusCheckFailures = (entry.statusCheckFailures || 0) + 1; - - // Only show error after multiple consecutive failures - if (entry.statusCheckFailures >= MAX_STATUS_CHECK_FAILURES) { - this.handleTerminalState(entry, queueId, { - status: 'error', - message: 'Status check error: ' + error.message, - can_retry: !!entry.requestUrl - }); } } - this.updateQueueOrder(); - }, 2000); + } catch (error) { + console.error('Initial status check failed:', error); + } + + // Set up SSE connection for real-time updates + this.setupSSEConnection(queueId); } /* Helper Methods */ @@ -338,6 +305,8 @@ class DownloadQueue { * Creates a new queue entry. It checks localStorage for any cached info. */ createQueueEntry(item, type, prgFile, queueId, requestUrl) { + console.log(`Creating queue entry with initial type: ${type}`); + // Build the basic entry. const entry = { item, @@ -352,8 +321,11 @@ class DownloadQueue { uniqueId: queueId, retryCount: 0, autoRetryInterval: null, - isNew: true // Add flag to track if this is a new entry + isNew: true, // Add flag to track if this is a new entry + status: 'initializing', + lastMessage: `Initializing ${type} download...` }; + // If cached info exists for this PRG file, use it. if (this.queueCache[prgFile]) { entry.lastStatus = this.queueCache[prgFile]; @@ -417,6 +389,10 @@ class DownloadQueue { // Apply appropriate CSS classes based on cached status this.applyStatusClasses(entry, this.queueCache[prgFile]); } + + // Store it in our queue object + this.downloadQueue[queueId] = entry; + return entry; } @@ -468,6 +444,10 @@ class DownloadQueue { } else if (status.status === 'complete' || status.status === 'done') { entry.element.classList.add('download-success'); entry.hasEnded = true; + // Distinguish 'track_complete' from final 'complete' state + } else if (status.status === 'track_complete') { + // Don't mark as ended, just show it's in progress + entry.element.classList.add('queue-item--processing'); } else if (status.status === 'cancel' || status.status === 'interrupted') { entry.hasEnded = true; } @@ -483,6 +463,7 @@ class DownloadQueue { btn.style.display = 'none'; const { prg, type, queueid } = btn.dataset; try { + // First cancel the download const response = await fetch(`/api/${type}/download/cancel?prg_file=${prg}`); const data = await response.json(); if (data.status === "cancel") { @@ -491,9 +472,32 @@ class DownloadQueue { const entry = this.downloadQueue[queueid]; if (entry) { entry.hasEnded = true; - clearInterval(entry.intervalId); - entry.intervalId = null; + + // Close any active connections + this.closeSSEConnection(queueid); + + if (entry.intervalId) { + clearInterval(entry.intervalId); + entry.intervalId = null; + } + + // Mark as cancelled in the cache to prevent re-loading on page refresh + entry.status = "cancelled"; + this.queueCache[prg] = { status: "cancelled" }; + localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); + + // Immediately delete from server instead of just waiting for UI cleanup + try { + await fetch(`/api/prgs/delete/${prg}`, { + method: 'DELETE' + }); + console.log(`Deleted cancelled task from server: ${prg}`); + } catch (deleteError) { + console.error('Error deleting cancelled task:', deleteError); + } } + + // Still do UI cleanup after a short delay setTimeout(() => this.cleanupEntry(queueid), 5000); } } catch (error) { @@ -630,24 +634,45 @@ class DownloadQueue { return index >= 0 && index < this.visibleCount; } - cleanupEntry(queueId) { + async cleanupEntry(queueId) { const entry = this.downloadQueue[queueId]; if (entry) { + // Close any SSE connection + this.closeSSEConnection(queueId); + + // Clean up any intervals if (entry.intervalId) { clearInterval(entry.intervalId); } if (entry.autoRetryInterval) { clearInterval(entry.autoRetryInterval); } + + // Remove from the DOM entry.element.remove(); + + // Delete from in-memory queue delete this.downloadQueue[queueId]; - // Remove the cached info. + + // Remove the cached info if (this.queueCache[entry.prgFile]) { delete this.queueCache[entry.prgFile]; localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); } - fetch(`/api/prgs/delete/${entry.prgFile}`, { method: 'DELETE' }) - .catch(console.error); + + // Delete the entry from the server + try { + const response = await fetch(`/api/prgs/delete/${entry.prgFile}`, { method: 'DELETE' }); + if (response.ok) { + console.log(`Successfully deleted task ${entry.prgFile} from server`); + } else { + console.warn(`Failed to delete task ${entry.prgFile}: ${response.status} ${response.statusText}`); + } + } catch (error) { + console.error(`Error deleting task ${entry.prgFile}:`, error); + } + + // Update the queue display this.updateQueueOrder(); } } @@ -914,6 +939,9 @@ class DownloadQueue { } try { + // Close any existing SSE connection + this.closeSSEConnection(queueId); + // Use the stored original request URL to create a new download const retryResponse = await fetch(entry.requestUrl); if (!retryResponse.ok) { @@ -944,25 +972,14 @@ class DownloadQueue { entry.statusCheckFailures = 0; // Reset failure counter logEl.textContent = 'Retry initiated...'; - // Make sure any existing interval is cleared before starting a new one + // Make sure any existing interval is cleared if (entry.intervalId) { clearInterval(entry.intervalId); entry.intervalId = null; } - // Always start monitoring right away - don't wait for verification - this.startEntryMonitoring(queueId); - - // Verify the PRG file exists as a secondary check, but don't wait for it to start monitoring - try { - const verifyResponse = await fetch(`/api/prgs/${retryData.prg_file}`); - // Just log the verification result, monitoring is already started - if (!verifyResponse.ok) { - console.log(`PRG file verification failed for ${retryData.prg_file}, but monitoring already started`); - } - } catch (verifyError) { - console.log(`PRG file verification error for ${retryData.prg_file}, but monitoring already started:`, verifyError); - } + // Set up a new SSE connection for the retried download + this.setupSSEConnection(queueId); } else { logElement.textContent = 'Retry failed: invalid response from server'; } @@ -979,8 +996,8 @@ class DownloadQueue { for (const queueId in this.downloadQueue) { const entry = this.downloadQueue[queueId]; // Only start monitoring if the entry is not in a terminal state and is visible - if (!entry.hasEnded && this.isEntryVisible(queueId) && !entry.intervalId) { - this.startEntryMonitoring(queueId); + if (!entry.hasEnded && this.isEntryVisible(queueId) && !this.sseConnections[queueId]) { + this.setupSSEConnection(queueId); } } } @@ -1035,23 +1052,14 @@ class DownloadQueue { queueIds.push({queueId, prgFile}); }); - // Wait a short time before checking the status to give server time to create files + // Wait a short time before setting up SSE connections await new Promise(resolve => setTimeout(resolve, 1000)); - // Start monitoring each entry after confirming PRG files exist + // Set up SSE connections for each entry for (const {queueId, prgFile} of queueIds) { - try { - const statusResponse = await fetch(`/api/prgs/${prgFile}`); - if (statusResponse.ok) { - // Only start monitoring after confirming the PRG file exists - const entry = this.downloadQueue[queueId]; - if (entry) { - // Start monitoring regardless of visibility - this.startEntryMonitoring(queueId); - } - } - } catch (statusError) { - console.log(`Initial status check pending for ${prgFile}, will retry on next interval`); + const entry = this.downloadQueue[queueId]; + if (entry && !entry.hasEnded) { + this.setupSSEConnection(queueId); } } @@ -1060,22 +1068,13 @@ class DownloadQueue { // Handle single-file downloads (tracks, albums, playlists) const queueId = this.addDownload(item, type, data.prg_file, apiUrl, false); - // Wait a short time before checking the status to give server time to create the file + // Wait a short time before setting up SSE connection await new Promise(resolve => setTimeout(resolve, 1000)); - // Ensure the PRG file exists and has initial data by making a status check - try { - const statusResponse = await fetch(`/api/prgs/${data.prg_file}`); - if (statusResponse.ok) { - // Only start monitoring after confirming the PRG file exists - const entry = this.downloadQueue[queueId]; - if (entry) { - // Start monitoring regardless of visibility - this.startEntryMonitoring(queueId); - } - } - } catch (statusError) { - console.log('Initial status check pending, will retry on next interval'); + // Set up SSE connection + const entry = this.downloadQueue[queueId]; + if (entry && !entry.hasEnded) { + this.setupSSEConnection(queueId); } return queueId; @@ -1110,9 +1109,11 @@ class DownloadQueue { if (!prgResponse.ok) continue; const prgData = await prgResponse.json(); - // Skip prg files that are marked as cancelled or completed + // Skip prg files that are marked as cancelled, completed, or interrupted if (prgData.last_line && (prgData.last_line.status === 'cancel' || + prgData.last_line.status === 'cancelled' || + prgData.last_line.status === 'interrupted' || prgData.last_line.status === 'complete')) { // Delete old completed or cancelled PRG files try { @@ -1124,6 +1125,22 @@ class DownloadQueue { continue; } + // Check cached status - if we marked it cancelled locally, delete it and skip + const cachedStatus = this.queueCache[prgFile]; + if (cachedStatus && + (cachedStatus.status === 'cancelled' || + cachedStatus.status === 'cancel' || + cachedStatus.status === 'interrupted' || + cachedStatus.status === 'complete')) { + try { + await fetch(`/api/prgs/delete/${prgFile}`, { method: 'DELETE' }); + console.log(`Cleaned up cached cancelled PRG file: ${prgFile}`); + } catch (error) { + console.error(`Failed to delete cached cancelled PRG file ${prgFile}:`, error); + } + continue; + } + // Use the enhanced original request info from the first line const originalRequest = prgData.original_request || {}; @@ -1249,6 +1266,311 @@ class DownloadQueue { isExplicitFilterEnabled() { return !!this.currentConfig.explicitFilter; } + + /* Sets up a Server-Sent Events connection for real-time status updates */ + setupSSEConnection(queueId) { + const entry = this.downloadQueue[queueId]; + if (!entry || entry.hasEnded) return; + + // Close any existing connection + this.closeSSEConnection(queueId); + + // Create a new EventSource connection + try { + const sse = new EventSource(`/api/prgs/stream/${entry.prgFile}`); + + // Store the connection + this.sseConnections[queueId] = sse; + + // Set up event handlers + sse.addEventListener('start', (event) => { + const data = JSON.parse(event.data); + console.log('SSE start event:', data); + + const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); + if (logElement) { + logElement.textContent = `Starting ${data.type} download: ${data.name}${data.artist ? ` by ${data.artist}` : ''}`; + } + + // IMPORTANT: Save the download type from the start event + if (data.type) { + console.log(`Setting entry type to: ${data.type}`); + entry.type = data.type; + + // Update type display if element exists + const typeElement = entry.element.querySelector('.type'); + if (typeElement) { + typeElement.textContent = data.type.charAt(0).toUpperCase() + data.type.slice(1); + // Update type class without triggering animation + typeElement.className = `type ${data.type}`; + } + } + + // Store the initial status + entry.lastStatus = data; + entry.lastUpdated = Date.now(); + entry.status = data.status; + }); + + sse.addEventListener('update', (event) => { + const data = JSON.parse(event.data); + console.log('SSE update event:', data); + this.handleSSEUpdate(queueId, data); + }); + + sse.addEventListener('progress', (event) => { + const data = JSON.parse(event.data); + console.log('SSE progress event:', data); + this.handleSSEUpdate(queueId, data); + }); + + // Add specific handler for track_complete events + sse.addEventListener('track_complete', (event) => { + const data = JSON.parse(event.data); + console.log('SSE track_complete event:', data); + console.log(`Current entry type: ${entry.type}`); + + // Mark this status as a track completion + data.status = 'track_complete'; + + // Only update the log message without changing status colors + const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); + if (logElement) { + let message = `Completed track: ${data.title || data.track || 'Unknown'}`; + if (data.artist) message += ` by ${data.artist}`; + logElement.textContent = message; + } + + // For single track downloads, track_complete is a terminal state + if (entry.type === 'track') { + console.log('Single track download completed - terminating'); + // Mark the track as ended + entry.hasEnded = true; + + // Handle as a terminal state + setTimeout(() => { + this.closeSSEConnection(queueId); + this.cleanupEntry(queueId); + }, 5000); + } else { + console.log(`Album/playlist track completed - continuing download (type: ${entry.type})`); + // For albums/playlists, just update entry data without changing status + entry.lastStatus = data; + entry.lastUpdated = Date.now(); + + // Save to cache + this.queueCache[entry.prgFile] = data; + localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); + } + }); + + // Also handle 'done' events which can come for individual tracks + sse.addEventListener('done', (event) => { + const data = JSON.parse(event.data); + console.log('SSE done event (individual track):', data); + console.log(`Current entry type: ${entry.type}`); + + // Only update the log message without changing status colors for album tracks + const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); + if (logElement) { + let message = `Completed track: ${data.song || data.title || data.track || 'Unknown'}`; + if (data.artist) message += ` by ${data.artist}`; + logElement.textContent = message; + } + + // For single track downloads, done is a terminal state + if (entry.type === 'track') { + console.log('Single track download completed (done) - terminating'); + // Mark the track as ended + entry.hasEnded = true; + + // Handle as a terminal state + setTimeout(() => { + this.closeSSEConnection(queueId); + this.cleanupEntry(queueId); + }, 5000); + } else if (data.song) { + console.log(`Album/playlist individual track done - continuing download (type: ${entry.type})`); + // For albums/playlists, just update entry data without changing status + data._isIndividualTrack = true; // Mark it for special handling in update logic + entry.lastStatus = data; + entry.lastUpdated = Date.now(); + + // Save to cache + this.queueCache[entry.prgFile] = data; + localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); + } else { + // This is a real done event for the entire album/playlist + console.log(`Entire ${entry.type} completed - finalizing`); + this.handleSSEUpdate(queueId, data); + entry.hasEnded = true; + + setTimeout(() => { + this.closeSSEConnection(queueId); + this.cleanupEntry(queueId); + }, 5000); + } + }); + + sse.addEventListener('complete', (event) => { + const data = JSON.parse(event.data); + console.log('SSE complete event:', data); + console.log(`Current entry type: ${entry.type}`); + + // Skip terminal processing for track_complete status in albums/playlists + // Also skip for "done" status when it's for an individual track in an album/playlist + if ((data.status === 'track_complete' && entry.type !== 'track') || + (data.status === 'done' && data.song && entry.type !== 'track')) { + console.log(`Track ${data.status} in ${entry.type} download - continuing`); + // Don't process individual track completion events here + return; + } + + this.handleSSEUpdate(queueId, data); + + // Always mark as terminal state for 'complete' events (except individual track completions in albums) + entry.hasEnded = true; + + // Close the connection after a short delay + setTimeout(() => { + this.closeSSEConnection(queueId); + this.cleanupEntry(queueId); + }, 5000); + }); + + sse.addEventListener('error', (event) => { + const data = JSON.parse(event.data); + console.log('SSE error event:', data); + this.handleSSEUpdate(queueId, data); + + // Mark the download as ended with error + entry.hasEnded = true; + + // Close the connection, but don't automatically clean up the entry + // to allow for potential retry + this.closeSSEConnection(queueId); + }); + + sse.addEventListener('end', (event) => { + const data = JSON.parse(event.data); + console.log('SSE end event:', data); + + // Update with final status + this.handleSSEUpdate(queueId, data); + + // Mark the download as ended + entry.hasEnded = true; + + // Close the connection + this.closeSSEConnection(queueId); + + // Clean up the entry after a delay if it's a success + if (data.status === 'complete' || data.status === 'done') { + setTimeout(() => this.cleanupEntry(queueId), 5000); + } + }); + + // Handle connection error + sse.onerror = (error) => { + console.error('SSE connection error:', error); + + // If the connection is closed, try to reconnect after a delay + if (sse.readyState === EventSource.CLOSED) { + console.log('SSE connection closed, will try to reconnect'); + + // Only attempt to reconnect if the entry is still active + if (entry && !entry.hasEnded) { + setTimeout(() => { + this.setupSSEConnection(queueId); + }, 5000); + } + } + }; + + return sse; + } catch (error) { + console.error('Error setting up SSE connection:', error); + return null; + } + } + + /* Close an existing SSE connection */ + closeSSEConnection(queueId) { + if (this.sseConnections[queueId]) { + try { + this.sseConnections[queueId].close(); + } catch (error) { + console.error('Error closing SSE connection:', error); + } + delete this.sseConnections[queueId]; + } + } + + /* Handle SSE update events */ + handleSSEUpdate(queueId, data) { + const entry = this.downloadQueue[queueId]; + if (!entry) return; + + // Skip if the status hasn't changed + if (entry.lastStatus && + entry.lastStatus.id === data.id && + entry.lastStatus.status === data.status) { + return; + } + + console.log(`handleSSEUpdate for ${queueId} with type ${entry.type} and status ${data.status}`); + + // Track completion is special - don't change visible status ONLY for albums/playlists + // Check for both 'track_complete' and 'done' statuses for individual tracks in albums + const isTrackCompletion = data.status === 'track_complete' || + (data.status === 'done' && data.song && entry.type !== 'track'); + const isAlbumOrPlaylist = entry.type !== 'track'; // Anything that's not a track is treated as multi-track + const skipStatusChange = isTrackCompletion && isAlbumOrPlaylist; + + if (skipStatusChange) { + console.log(`Skipping status change for ${data.status} in ${entry.type} download - track: ${data.song || data.track || 'Unknown'}`); + } + + // Update the entry + entry.lastStatus = data; + entry.lastUpdated = Date.now(); + + // Only update visible status if not skipping status change + if (!skipStatusChange) { + entry.status = data.status; + } + + // Update status message in the UI + const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); + if (logElement) { + const statusMessage = this.getStatusMessage(data); + logElement.textContent = statusMessage; + } + + // Apply appropriate CSS classes based on status only if not skipping status change + if (!skipStatusChange) { + this.applyStatusClasses(entry, data); + } + + // Save updated status to cache + this.queueCache[entry.prgFile] = data; + localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); + + // Special handling for error status + if (data.status === 'error') { + this.handleTerminalState(entry, queueId, data); + } + + // Update the queue order + this.updateQueueOrder(); + } + + /* Close all active SSE connections */ + closeAllSSEConnections() { + for (const queueId in this.sseConnections) { + this.closeSSEConnection(queueId); + } + } } // Singleton instance