From e91668409215be2b7e56969e0e63025029e341e4 Mon Sep 17 00:00:00 2001 From: coolgitternotin Date: Mon, 24 Mar 2025 09:32:58 -0600 Subject: [PATCH] come on celery! --- routes/utils/celery_tasks.py | 1049 ++++++++++++++++------------------ 1 file changed, 498 insertions(+), 551 deletions(-) diff --git a/routes/utils/celery_tasks.py b/routes/utils/celery_tasks.py index 17cb915..59cda3f 100644 --- a/routes/utils/celery_tasks.py +++ b/routes/utils/celery_tasks.py @@ -76,7 +76,6 @@ def store_task_status(task_id, status_data): if 'timestamp' not in status_data: status_data['timestamp'] = time.time() - # Generate sequential ID for this status update (atomic operation) try: # Get next ID for this task's status updates status_id = redis_client.incr(f"task:{task_id}:status:next_id") @@ -110,91 +109,16 @@ def get_task_status(task_id): return [] def get_last_task_status(task_id): - """ - Get the most recent task status update from Redis - - If the task is an album or playlist, prioritize progress updates - showing current track information over generic processing status. - """ + """Get the most recent task status update from Redis""" try: - # Get all status updates for this task - all_statuses = redis_client.lrange(f"task:{task_id}:status", 0, -1) - if not all_statuses: - logger.debug(f"Task {task_id}: No status updates found") + # Get the last status update + status_list = redis_client.lrange(f"task:{task_id}:status", -1, -1) + if not status_list: return None - - logger.debug(f"Task {task_id}: Found {len(all_statuses)} status updates") - - # First decode and analyze all status updates - decoded_statuses = [] - has_progress_updates = False - - for status_json in all_statuses: - try: - status = json.loads(status_json.decode('utf-8')) - decoded_statuses.append(status) - - # Check if we have any progress updates with track information - if status.get("status") == "progress" and status.get("track"): - has_progress_updates = True - logger.debug(f"Task {task_id}: Found progress update with track: {status.get('track')}") - except Exception as e: - logger.error(f"Error decoding status update: {e}") - - if not has_progress_updates: - logger.debug(f"Task {task_id}: No progress updates with track info found") - - # Find the latest terminal status (complete, error, cancelled) - latest_status = decoded_statuses[-1] if decoded_statuses else None - if latest_status and latest_status.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED]: - logger.debug(f"Task {task_id}: Returning terminal status: {latest_status.get('status')}") - return latest_status - - # Find the most recent progress update with track information - # Start from the most recent and go backward - latest_progress = None - - for status in reversed(decoded_statuses): - status_type = status.get("status") - # For album/playlist downloads, find progress updates with track information - if status_type == "progress" and status.get("track"): - latest_progress = status - logger.debug(f"Task {task_id}: Selected progress update for track: {status.get('track')}") - break - - # If we found a progress update, make sure it has all the necessary fields - if latest_progress: - # Parse current_track from "X/Y" format if needed - current_track_raw = latest_progress.get("current_track", "0") - - # Always reprocess the values to ensure consistency - if isinstance(current_track_raw, str) and "/" in current_track_raw: - try: - parts = current_track_raw.split("/") - current_track = int(parts[0]) - total_tracks = int(parts[1]) - - # Calculate and update progress information - overall_progress = min(int((current_track / total_tracks) * 100), 100) - latest_progress["parsed_current_track"] = current_track - latest_progress["parsed_total_tracks"] = total_tracks - latest_progress["overall_progress"] = overall_progress - - logger.debug(f"Task {task_id}: Parsed track progress {current_track}/{total_tracks} ({overall_progress}%)") - except (ValueError, IndexError) as e: - logger.error(f"Error parsing track numbers: {e}") - - # Return the enhanced progress update - return latest_progress - - # If no suitable progress updates found, return the most recent status - logger.debug(f"Task {task_id}: No suitable progress updates found, returning latest status") - return latest_status - + return json.loads(status_list[0].decode('utf-8')) except Exception as e: - logger.error(f"Error getting last task status for {task_id}: {e}") - traceback.print_exc() + logger.error(f"Error getting last task status: {e}") return None def store_task_info(task_id, task_info): @@ -243,157 +167,130 @@ def retry_task(task_id): if not task_info: return {"status": "error", "message": f"Task {task_id} not found"} - logger.debug(f"Retry task {task_id} - Initial task_info: {json.dumps({k: v for k, v in task_info.items() if k != 'orig_request'})}") - - # Check if task has retry_count information + # Check if task has error status last_status = get_last_task_status(task_id) - if last_status and last_status.get("status") == "error": - # Get current retry count - retry_count = last_status.get("retry_count", 0) - - # Get retry configuration from config - config_params = get_config_params() - max_retries = config_params.get('maxRetries', 3) - initial_retry_delay = config_params.get('retryDelaySeconds', 5) - retry_delay_increase = config_params.get('retry_delay_increase', 5) - - # Check if we've exceeded max retries - if retry_count >= max_retries: - return { - "status": "error", - "message": f"Maximum retry attempts ({max_retries}) exceeded" - } - - # Calculate retry delay - retry_delay = initial_retry_delay + (retry_count * retry_delay_increase) - - # Create a new task_id for the retry - new_task_id = f"{task_id}_retry{retry_count + 1}" - - # Update task info for the retry - task_info["retry_count"] = retry_count + 1 - task_info["retry_of"] = task_id - - # Log current URL before potentially updating it - logger.debug(f"Retry task {task_id} - Current URL: {task_info.get('url', 'N/A')}") - logger.debug(f"Retry task {task_id} - Retry URL available: {'Yes' if 'retry_url' in task_info and task_info['retry_url'] else 'No'}") - - # Use retry_url if available, otherwise use the original url - # This is crucial for album tasks created from artist downloads - if "retry_url" in task_info and task_info["retry_url"]: - logger.info(f"Using retry_url for task {task_id}: {task_info['retry_url']}") - logger.debug(f"Retry task {task_id} - Replacing URL {task_info.get('url', 'N/A')} with retry_url {task_info['retry_url']}") - task_info["url"] = task_info["retry_url"] - else: - logger.debug(f"Retry task {task_id} - No retry_url found, keeping original URL: {task_info.get('url', 'N/A')}") - - # Get the service and fallback configuration from config - service = config_params.get("service") - fallback_enabled = config_params.get("fallback", False) - - # Update main, fallback, and quality parameters based on service and fallback setting - if service == 'spotify': - if fallback_enabled: - # If fallback is enabled with Spotify service: - # - main becomes the Deezer account - # - fallback becomes the Spotify account - task_info["main"] = config_params.get("deezer", "") - task_info["fallback"] = config_params.get("spotify", "") - task_info["quality"] = config_params.get("deezerQuality", "MP3_128") - task_info["fall_quality"] = config_params.get("spotifyQuality", "NORMAL") - else: - # If fallback is disabled with Spotify service: - # - main is the Spotify account - # - no fallback - task_info["main"] = config_params.get("spotify", "") - task_info["fallback"] = None - task_info["quality"] = config_params.get("spotifyQuality", "NORMAL") - task_info["fall_quality"] = None - elif service == 'deezer': - # For Deezer service: - # - main is the Deezer account - # - no fallback (even if enabled in config) + if not last_status or last_status.get("status") != ProgressState.ERROR: + return {"status": "error", "message": "Task is not in a failed state"} + + # Get current retry count + retry_count = last_status.get("retry_count", 0) + + # Get retry configuration from config + config_params = get_config_params() + max_retries = config_params.get('maxRetries', 3) + initial_retry_delay = config_params.get('retryDelaySeconds', 5) + retry_delay_increase = config_params.get('retry_delay_increase', 5) + + # Check if we've exceeded max retries + if retry_count >= max_retries: + return { + "status": "error", + "message": f"Maximum retry attempts ({max_retries}) exceeded" + } + + # Calculate retry delay + retry_delay = initial_retry_delay + (retry_count * retry_delay_increase) + + # Create a new task_id for the retry + new_task_id = f"{task_id}_retry{retry_count + 1}" + + # Update task info for the retry + task_info["retry_count"] = retry_count + 1 + task_info["retry_of"] = task_id + + # Use retry_url if available, otherwise use the original url + if "retry_url" in task_info and task_info["retry_url"]: + task_info["url"] = task_info["retry_url"] + + # Get service configuration + service = config_params.get("service") + fallback_enabled = config_params.get("fallback", False) + + # Update service settings + if service == 'spotify': + if fallback_enabled: task_info["main"] = config_params.get("deezer", "") - task_info["fallback"] = None + task_info["fallback"] = config_params.get("spotify", "") task_info["quality"] = config_params.get("deezerQuality", "MP3_128") - task_info["fall_quality"] = None + task_info["fall_quality"] = config_params.get("spotifyQuality", "NORMAL") else: - # Default to Spotify if unknown service task_info["main"] = config_params.get("spotify", "") task_info["fallback"] = None task_info["quality"] = config_params.get("spotifyQuality", "NORMAL") task_info["fall_quality"] = None - - # Ensure service comes from config for the retry - task_info["service"] = service - - # Update other config-derived parameters - task_info["real_time"] = task_info.get("real_time", config_params.get("realTime", False)) - task_info["custom_dir_format"] = task_info.get("custom_dir_format", config_params.get("customDirFormat", "%ar_album%/%album%")) - task_info["custom_track_format"] = task_info.get("custom_track_format", config_params.get("customTrackFormat", "%tracknum%. %music%")) - task_info["pad_tracks"] = task_info.get("pad_tracks", config_params.get("tracknum_padding", True)) - - # Log the final URL that will be used - logger.debug(f"Retry task {task_id} - Final URL for retry: {task_info.get('url', 'N/A')}") - - # Store the updated task info - store_task_info(new_task_id, task_info) - - # Create a queued status - store_task_status(new_task_id, { - "status": ProgressState.QUEUED, - "type": task_info.get("type", "unknown"), - "name": task_info.get("name", "Unknown"), - "artist": task_info.get("artist", ""), - "retry_count": retry_count + 1, - "max_retries": max_retries, - "retry_delay": retry_delay, - "timestamp": time.time() - }) - - # Launch the appropriate task based on download_type - download_type = task_info.get("download_type", "unknown") - task = None - - logger.info(f"Retrying task {task_id} as {new_task_id} (retry {retry_count + 1}/{max_retries})") - - if download_type == "track": - task = download_track.apply_async( - kwargs=task_info, - task_id=new_task_id, - queue='downloads' - ) - elif download_type == "album": - task = download_album.apply_async( - kwargs=task_info, - task_id=new_task_id, - queue='downloads' - ) - elif download_type == "playlist": - task = download_playlist.apply_async( - kwargs=task_info, - task_id=new_task_id, - queue='downloads' - ) - else: - logger.error(f"Unknown download type for retry: {download_type}") - return { - "status": "error", - "message": f"Unknown download type: {download_type}" - } - - return { - "status": "requeued", - "task_id": new_task_id, - "retry_count": retry_count + 1, - "max_retries": max_retries, - "retry_delay": retry_delay - } + elif service == 'deezer': + task_info["main"] = config_params.get("deezer", "") + task_info["fallback"] = None + task_info["quality"] = config_params.get("deezerQuality", "MP3_128") + task_info["fall_quality"] = None else: + task_info["main"] = config_params.get("spotify", "") + task_info["fallback"] = None + task_info["quality"] = config_params.get("spotifyQuality", "NORMAL") + task_info["fall_quality"] = None + + # Ensure service comes from config for the retry + task_info["service"] = service + + # Update other config-derived parameters + task_info["real_time"] = task_info.get("real_time", config_params.get("realTime", False)) + task_info["custom_dir_format"] = task_info.get("custom_dir_format", config_params.get("customDirFormat", "%ar_album%/%album%")) + task_info["custom_track_format"] = task_info.get("custom_track_format", config_params.get("customTrackFormat", "%tracknum%. %music%")) + task_info["pad_tracks"] = task_info.get("pad_tracks", config_params.get("tracknum_padding", True)) + + # Store the updated task info + store_task_info(new_task_id, task_info) + + # Create a queued status + store_task_status(new_task_id, { + "status": ProgressState.QUEUED, + "type": task_info.get("type", "unknown"), + "name": task_info.get("name", "Unknown"), + "artist": task_info.get("artist", ""), + "retry_count": retry_count + 1, + "max_retries": max_retries, + "retry_delay": retry_delay, + "timestamp": time.time() + }) + + # Launch the appropriate task based on download_type + download_type = task_info.get("download_type", "unknown") + task = None + + logger.info(f"Retrying task {task_id} as {new_task_id} (retry {retry_count + 1}/{max_retries})") + + if download_type == "track": + task = download_track.apply_async( + kwargs=task_info, + task_id=new_task_id, + queue='downloads' + ) + elif download_type == "album": + task = download_album.apply_async( + kwargs=task_info, + task_id=new_task_id, + queue='downloads' + ) + elif download_type == "playlist": + task = download_playlist.apply_async( + kwargs=task_info, + task_id=new_task_id, + queue='downloads' + ) + else: + logger.error(f"Unknown download type for retry: {download_type}") return { "status": "error", - "message": "Task is not in a failed state" + "message": f"Unknown download type: {download_type}" } + + return { + "status": "requeued", + "task_id": new_task_id, + "retry_count": retry_count + 1, + "max_retries": max_retries, + "retry_delay": retry_delay + } except Exception as e: logger.error(f"Error retrying task {task_id}: {e}") traceback.print_exc() @@ -435,293 +332,420 @@ class ProgressTrackingTask(Task): def progress_callback(self, progress_data): """ - Process progress data from deezspot library callbacks + Process progress data from deezspot library callbacks using the optimized approach + based on known status types and flow patterns. Args: - progress_data: Dictionary containing progress information + progress_data: Dictionary containing progress information from deezspot """ task_id = self.request.id - # Debug log incoming progress data - logger.debug(f"Task {task_id}: Got progress data: {json.dumps(progress_data)}") - # Add timestamp if not present if 'timestamp' not in progress_data: progress_data['timestamp'] = time.time() - # Map deezspot status to our progress state + # Get status type status = progress_data.get("status", "unknown") - # First, make a copy of the data to avoid modifying the original + # Create a work copy of the data to avoid modifying the original stored_data = progress_data.copy() - # Process the data based on status type + # Get task info for context + task_info = get_task_info(task_id) + + # Log raw progress data at debug level + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Task {task_id}: Raw progress data: {json.dumps(progress_data)}") + + # Process based on status type using a more streamlined approach if status == "initializing": - # Get content information when initializing - content_type = stored_data.get('type', '').upper() - album_name = stored_data.get('album', '') - name = stored_data.get('name', '') - artist = stored_data.get('artist', '') - total_tracks = stored_data.get('total_tracks', 0) - - # Store initialization details - if not name and album_name: - stored_data['name'] = album_name - - # Log initialization - if album_name: - logger.info(f"Task {task_id} initializing: {content_type} '{album_name}' with {total_tracks} tracks") - elif name: - logger.info(f"Task {task_id} initializing: {content_type} '{name}' with {total_tracks} tracks") - else: - logger.info(f"Task {task_id} initializing: {content_type} with {total_tracks} tracks") + # --- INITIALIZING: Start of a download operation --- + self._handle_initializing(task_id, stored_data, task_info) elif status == "downloading": - # Track starting to download - track_name = stored_data.get('song', 'Unknown') - artist = stored_data.get('artist', '') - album = stored_data.get('album', '') + # --- DOWNLOADING: Track download started --- + self._handle_downloading(task_id, stored_data, task_info) - if artist and album: - logger.info(f"Task {task_id} downloading: '{track_name}' by {artist} from {album}") - elif artist: - logger.info(f"Task {task_id} downloading: '{track_name}' by {artist}") - else: - logger.info(f"Task {task_id} downloading: '{track_name}'") - elif status == "progress": - # For album/playlist downloads, process track progress - track_name = stored_data.get("track", stored_data.get("song", "Unknown track")) - current_track_raw = stored_data.get("current_track", "0") - album = stored_data.get("album", "") - artist = stored_data.get("artist", "") + # --- PROGRESS: Album/playlist track progress --- + self._handle_progress(task_id, stored_data, task_info) - # Process and store artist correctly - if isinstance(artist, list) and len(artist) > 0: - # Take the first artist if it's a list - artist_name = artist[0] - # Store the processed artist back in the data - stored_data["artist_name"] = artist_name - elif isinstance(artist, str): - stored_data["artist_name"] = artist + elif status == "real_time" or status == "track_progress": + # --- REAL_TIME/TRACK_PROGRESS: Track download real-time progress --- + self._handle_real_time(task_id, stored_data) - # Parse current_track and total_tracks from the format "current/total" - if isinstance(current_track_raw, str) and "/" in current_track_raw: - try: - parts = current_track_raw.split("/") - current_track = int(parts[0]) - total_tracks = int(parts[1]) - - # Store the parsed values - stored_data["parsed_current_track"] = current_track - stored_data["parsed_total_tracks"] = total_tracks - - # Calculate overall percentage - overall_progress = min(int((current_track / total_tracks) * 100), 100) - stored_data["overall_progress"] = overall_progress - - logger.debug(f"Task {task_id}: Processed track progress {current_track}/{total_tracks} ({overall_progress}%) for '{track_name}'") - except (ValueError, IndexError) as e: - logger.error(f"Error parsing track numbers '{current_track_raw}': {e}") - elif isinstance(current_track_raw, int): - # Handle the case where it's already an integer - current_track = current_track_raw - total_tracks = stored_data.get("total_tracks", 0) + elif status == "skipped": + # --- SKIPPED: Track was skipped --- + self._handle_skipped(task_id, stored_data, task_info) + + elif status == "retrying": + # --- RETRYING: Download failed and being retried --- + self._handle_retrying(task_id, stored_data, task_info) + + elif status == "error": + # --- ERROR: Error occurred during download --- + self._handle_error(task_id, stored_data, task_info) + + elif status == "done": + # --- DONE: Download operation completed --- + self._handle_done(task_id, stored_data, task_info) + + else: + # --- UNKNOWN: Unrecognized status --- + logger.info(f"Task {task_id} {status}: {stored_data.get('message', 'No details')}") + + # Store the processed status update + store_task_status(task_id, stored_data) + + def _handle_initializing(self, task_id, data, task_info): + """Handle initializing status from deezspot""" + # Extract relevant fields + content_type = data.get('type', '').upper() + name = data.get('name', '') + album_name = data.get('album', '') + artist = data.get('artist', '') + total_tracks = data.get('total_tracks', 0) + + # Use album name as name if name is empty + if not name and album_name: + data['name'] = album_name + + # Log initialization with appropriate detail level + if album_name and artist: + logger.info(f"Task {task_id} initializing: {content_type} '{album_name}' by {artist} with {total_tracks} tracks") + elif album_name: + logger.info(f"Task {task_id} initializing: {content_type} '{album_name}' with {total_tracks} tracks") + elif name: + logger.info(f"Task {task_id} initializing: {content_type} '{name}' with {total_tracks} tracks") + else: + logger.info(f"Task {task_id} initializing: {content_type} with {total_tracks} tracks") + + # Update task info with total tracks count + if total_tracks > 0: + task_info['total_tracks'] = total_tracks + task_info['completed_tracks'] = task_info.get('completed_tracks', 0) + task_info['skipped_tracks'] = task_info.get('skipped_tracks', 0) + store_task_info(task_id, task_info) + + # Update status in data + data['status'] = ProgressState.INITIALIZING + + def _handle_downloading(self, task_id, data, task_info): + """Handle downloading status from deezspot""" + # Extract relevant fields + track_name = data.get('song', 'Unknown') + artist = data.get('artist', '') + album = data.get('album', '') + download_type = data.get('type', '') + + # Get parent task context + parent_type = task_info.get('type', '').lower() + + # If this is a track within an album/playlist, update progress + if parent_type in ['album', 'playlist'] and download_type == 'track': + total_tracks = task_info.get('total_tracks', 0) + current_track = task_info.get('current_track_num', 0) + 1 + + # Update task info + task_info['current_track_num'] = current_track + task_info['current_track'] = track_name + task_info['current_artist'] = artist + store_task_info(task_id, task_info) + + # Only calculate progress if we have total tracks + if total_tracks > 0: + overall_progress = min(int((current_track / total_tracks) * 100), 100) + data['overall_progress'] = overall_progress + data['parsed_current_track'] = current_track + data['parsed_total_tracks'] = total_tracks - if total_tracks > 0: - # Calculate overall percentage - overall_progress = min(int((current_track / total_tracks) * 100), 100) - stored_data["parsed_current_track"] = current_track - stored_data["parsed_total_tracks"] = total_tracks - stored_data["overall_progress"] = overall_progress + # Create a progress update for the album/playlist + progress_update = { + "status": ProgressState.DOWNLOADING, + "type": parent_type, + "track": track_name, + "current_track": f"{current_track}/{total_tracks}", + "album": album, + "artist": artist, + "timestamp": data['timestamp'], + "parent_task": True + } - # Log appropriate message based on available information - artist_name = stored_data.get("artist_name", artist) - if album and artist_name: - logger.info(f"Task {task_id} progress: [{stored_data.get('parsed_current_track', 0)}/{stored_data.get('parsed_total_tracks', 0)}] {stored_data.get('overall_progress', 0)}% - {track_name} by {artist_name} from {album}") - elif album: - logger.info(f"Task {task_id} progress: [{stored_data.get('parsed_current_track', 0)}/{stored_data.get('parsed_total_tracks', 0)}] {stored_data.get('overall_progress', 0)}% - {track_name} from {album}") - else: - logger.info(f"Task {task_id} progress: [{stored_data.get('parsed_current_track', 0)}/{stored_data.get('parsed_total_tracks', 0)}] {stored_data.get('overall_progress', 0)}% - {track_name}") + # Store separate progress update + store_task_status(task_id, progress_update) + + # Log with appropriate detail level + if artist and album: + logger.info(f"Task {task_id} downloading: '{track_name}' by {artist} from {album}") + elif artist: + logger.info(f"Task {task_id} downloading: '{track_name}' by {artist}") + else: + logger.info(f"Task {task_id} downloading: '{track_name}'") + + # Update status + data['status'] = ProgressState.DOWNLOADING + + def _handle_progress(self, task_id, data, task_info): + """Handle progress status from deezspot""" + # Extract track info + track_name = data.get("track", data.get("song", "Unknown track")) + current_track_raw = data.get("current_track", "0") + album = data.get("album", "") + artist = data.get("artist", "") + + # Process artist if it's a list + if isinstance(artist, list) and len(artist) > 0: + data["artist_name"] = artist[0] + elif isinstance(artist, str): + data["artist_name"] = artist + + # Parse track numbers from "current/total" format + if isinstance(current_track_raw, str) and "/" in current_track_raw: + try: + parts = current_track_raw.split("/") + current_track = int(parts[0]) + total_tracks = int(parts[1]) - elif status == "track_progress" or status == "real_time": - # Track real-time progress of a file download - title = stored_data.get('title', stored_data.get('song', 'Unknown')) - artist = stored_data.get('artist', 'Unknown') - - # Handle different percent formats - percent = stored_data.get('percent', stored_data.get('percentage', 0)) - if isinstance(percent, float) and percent <= 1.0: - percent = int(percent * 100) - - # Update bytes received information if available - if 'bytes_received' in stored_data: - # Calculate and store download rate - last_update = stored_data.get('last_update_time', stored_data['timestamp']) - bytes_received = stored_data['bytes_received'] - last_bytes = stored_data.get('last_bytes_received', 0) - time_diff = stored_data['timestamp'] - last_update + # Update with parsed values + data["parsed_current_track"] = current_track + data["parsed_total_tracks"] = total_tracks - if time_diff > 0 and bytes_received > last_bytes: - bytes_diff = bytes_received - last_bytes - download_rate = bytes_diff / time_diff - stored_data['download_rate'] = download_rate - stored_data['last_update_time'] = stored_data['timestamp'] - stored_data['last_bytes_received'] = bytes_received + # Calculate percentage + overall_progress = min(int((current_track / total_tracks) * 100), 100) + data["overall_progress"] = overall_progress + + # Update task info + task_info['current_track_num'] = current_track + task_info['total_tracks'] = total_tracks + task_info['current_track'] = track_name + store_task_info(task_id, task_info) + + # Log progress with appropriate detail + artist_name = data.get("artist_name", artist) + if album and artist_name: + logger.info(f"Task {task_id} progress: [{current_track}/{total_tracks}] {overall_progress}% - {track_name} by {artist_name} from {album}") + elif album: + logger.info(f"Task {task_id} progress: [{current_track}/{total_tracks}] {overall_progress}% - {track_name} from {album}") + else: + logger.info(f"Task {task_id} progress: [{current_track}/{total_tracks}] {overall_progress}% - {track_name}") + + except (ValueError, IndexError) as e: + logger.error(f"Error parsing track numbers '{current_track_raw}': {e}") + + # Ensure correct status + data['status'] = ProgressState.PROGRESS + + def _handle_real_time(self, task_id, data): + """Handle real-time progress status from deezspot""" + # Extract track info + title = data.get('title', data.get('song', 'Unknown')) + artist = data.get('artist', 'Unknown') + + # Handle percent formatting + percent = data.get('percent', data.get('percentage', 0)) + if isinstance(percent, float) and percent <= 1.0: + percent = int(percent * 100) + data['percent'] = percent + + # Calculate download rate if bytes_received is available + if 'bytes_received' in data: + last_update = data.get('last_update_time', data['timestamp']) + bytes_received = data['bytes_received'] + last_bytes = data.get('last_bytes_received', 0) + time_diff = data['timestamp'] - last_update + + if time_diff > 0 and bytes_received > last_bytes: + bytes_diff = bytes_received - last_bytes + download_rate = bytes_diff / time_diff + data['download_rate'] = download_rate + data['last_update_time'] = data['timestamp'] + data['last_bytes_received'] = bytes_received # Format download rate for display - if 'download_rate' in stored_data: - rate = stored_data['download_rate'] - if rate < 1024: - stored_data['download_rate_formatted'] = f"{rate:.2f} B/s" - elif rate < 1024 * 1024: - stored_data['download_rate_formatted'] = f"{rate/1024:.2f} KB/s" - else: - stored_data['download_rate_formatted'] = f"{rate/(1024*1024):.2f} MB/s" - - # Log real-time progress - logger.debug(f"Task {task_id} track progress: {title} by {artist}: {percent}%") - - elif status == "track_complete" or (status == "done" and stored_data.get('type') == 'track'): - # Track completed successfully - title = stored_data.get('title', stored_data.get('song', 'Unknown')) - artist = stored_data.get('artist', 'Unknown') - album = stored_data.get('album', 'Unknown') - quality = stored_data.get('quality', 'Unknown') - path = stored_data.get('path', '') - - # Log completion with file size if available - if 'file_size' in stored_data: - size = stored_data['file_size'] - if size < 1024: - stored_data['file_size_formatted'] = f"{size} B" - elif size < 1024 * 1024: - stored_data['file_size_formatted'] = f"{size/1024:.2f} KB" - elif size < 1024 * 1024 * 1024: - stored_data['file_size_formatted'] = f"{size/(1024*1024):.2f} MB" + if download_rate < 1024: + data['download_rate_formatted'] = f"{download_rate:.2f} B/s" + elif download_rate < 1024 * 1024: + data['download_rate_formatted'] = f"{download_rate/1024:.2f} KB/s" else: - stored_data['file_size_formatted'] = f"{size/(1024*1024*1024):.2f} GB" + data['download_rate_formatted'] = f"{download_rate/(1024*1024):.2f} MB/s" + + # Log at debug level + logger.debug(f"Task {task_id} track progress: {title} by {artist}: {percent}%") + + # Set appropriate status + data['status'] = ProgressState.REAL_TIME if data.get('status') == "real_time" else ProgressState.TRACK_PROGRESS + + def _handle_skipped(self, task_id, data, task_info): + """Handle skipped status from deezspot""" + # Extract track info + title = data.get('song', 'Unknown') + artist = data.get('artist', 'Unknown') + reason = data.get('reason', 'Unknown reason') + + # Log skip + logger.info(f"Task {task_id} skipped: {artist} - {title}") + logger.debug(f"Task {task_id} skip reason: {reason}") + + # Update task info + skipped_tracks = task_info.get('skipped_tracks', 0) + 1 + task_info['skipped_tracks'] = skipped_tracks + store_task_info(task_id, task_info) + + # Check if part of album/playlist + parent_type = task_info.get('type', '').lower() + if parent_type in ['album', 'playlist']: + total_tracks = task_info.get('total_tracks', 0) + processed_tracks = task_info.get('completed_tracks', 0) + skipped_tracks + + if total_tracks > 0: + overall_progress = min(int((processed_tracks / total_tracks) * 100), 100) - logger.info(f"Task {task_id} track complete: {artist} - {title} ({quality}) {stored_data.get('file_size_formatted', '')}") + # Create parent progress update + progress_update = { + "status": ProgressState.PROGRESS, + "type": parent_type, + "track": title, + "current_track": f"{processed_tracks}/{total_tracks}", + "album": data.get('album', ''), + "artist": artist, + "timestamp": data['timestamp'], + "parsed_current_track": processed_tracks, + "parsed_total_tracks": total_tracks, + "overall_progress": overall_progress, + "track_skipped": True, + "skip_reason": reason, + "parent_task": True + } + + # Store progress update + store_task_status(task_id, progress_update) + + # Set status + data['status'] = ProgressState.SKIPPED + + def _handle_retrying(self, task_id, data, task_info): + """Handle retrying status from deezspot""" + # Extract retry info + song = data.get('song', 'Unknown') + artist = data.get('artist', 'Unknown') + retry_count = data.get('retry_count', 0) + seconds_left = data.get('seconds_left', 0) + error = data.get('error', 'Unknown error') + + # Log retry + logger.warning(f"Task {task_id} retrying: {artist} - {song} (Attempt {retry_count}, waiting {seconds_left}s)") + logger.debug(f"Task {task_id} retry reason: {error}") + + # Update task info + retry_count_total = task_info.get('retry_count', 0) + 1 + task_info['retry_count'] = retry_count_total + store_task_info(task_id, task_info) + + # Set status + data['status'] = ProgressState.RETRYING + + def _handle_error(self, task_id, data, task_info): + """Handle error status from deezspot""" + # Extract error info + message = data.get('message', 'Unknown error') + + # Log error + logger.error(f"Task {task_id} error: {message}") + + # Update task info + error_count = task_info.get('error_count', 0) + 1 + task_info['error_count'] = error_count + store_task_info(task_id, task_info) + + # Set status + data['status'] = ProgressState.ERROR + + def _handle_done(self, task_id, data, task_info): + """Handle done status from deezspot""" + # Extract data + content_type = data.get('type', '').lower() + album = data.get('album', '') + artist = data.get('artist', '') + song = data.get('song', '') + + # Handle based on content type + if content_type == 'track': + # For track completions + if artist and song: + logger.info(f"Task {task_id} completed: Track '{song}' by {artist}") else: - logger.info(f"Task {task_id} track complete: {artist} - {title} ({quality})") - - if path: - logger.debug(f"Task {task_id} saved to: {path}") + logger.info(f"Task {task_id} completed: Track '{song}'") - # Update completion progress - task_info = get_task_info(task_id) - total_tracks = task_info.get('total_tracks', stored_data.get('total_tracks', 0)) - completed_tracks = task_info.get('completed_tracks', 0) + 1 + # Update status to track_complete + data['status'] = ProgressState.TRACK_COMPLETE - # Update task info with new completed track count + # Update task info + completed_tracks = task_info.get('completed_tracks', 0) + 1 task_info['completed_tracks'] = completed_tracks store_task_info(task_id, task_info) - # Calculate completion percentage - if total_tracks > 0: - completion_percent = int((completed_tracks / total_tracks) * 100) - stored_data['completion_percent'] = completion_percent - logger.info(f"Task {task_id} progress: {completed_tracks}/{total_tracks} tracks ({completion_percent}%)") - - elif status == "skipped": - # Track was skipped (usually because it already exists) - title = stored_data.get('song', 'Unknown') - artist = stored_data.get('artist', 'Unknown') - reason = stored_data.get('reason', 'Unknown reason') + # If part of album/playlist, update progress + parent_type = task_info.get('type', '').lower() + if parent_type in ['album', 'playlist']: + total_tracks = task_info.get('total_tracks', 0) + if total_tracks > 0: + completion_percent = int((completed_tracks / total_tracks) * 100) + + # Create progress update + progress_update = { + "status": ProgressState.PROGRESS, + "type": parent_type, + "track": song, + "current_track": f"{completed_tracks}/{total_tracks}", + "album": album, + "artist": artist, + "timestamp": data['timestamp'], + "parsed_current_track": completed_tracks, + "parsed_total_tracks": total_tracks, + "overall_progress": completion_percent, + "track_complete": True, + "parent_task": True + } + + # Store progress update + store_task_status(task_id, progress_update) - logger.info(f"Task {task_id} skipped: {artist} - {title}") - logger.debug(f"Task {task_id} skip reason: {reason}") - - # Update task info with skipped track - task_info = get_task_info(task_id) - skipped_tracks = task_info.get('skipped_tracks', 0) + 1 - task_info['skipped_tracks'] = skipped_tracks - store_task_info(task_id, task_info) - - elif status == "retrying": - # Download failed and is being retried - song = stored_data.get('song', 'Unknown') - artist = stored_data.get('artist', 'Unknown') - retry_count = stored_data.get('retry_count', 0) - seconds_left = stored_data.get('seconds_left', 0) - error = stored_data.get('error', 'Unknown error') - - logger.warning(f"Task {task_id} retrying: {artist} - {song} (Attempt {retry_count}, waiting {seconds_left}s)") - logger.debug(f"Task {task_id} retry reason: {error}") - - # Update task info with retry count - task_info = get_task_info(task_id) - retry_count_total = task_info.get('retry_count', 0) + 1 - task_info['retry_count'] = retry_count_total - store_task_info(task_id, task_info) - - elif status == "error": - # Error occurred during download - message = stored_data.get('message', 'Unknown error') - - logger.error(f"Task {task_id} error: {message}") - - # Update task info with error count - task_info = get_task_info(task_id) - error_count = task_info.get('error_count', 0) + 1 - task_info['error_count'] = error_count - store_task_info(task_id, task_info) - - elif status == "done": - # Overall download operation completed - content_type = stored_data.get('type', '').upper() - name = stored_data.get('name', '') - album = stored_data.get('album', '') - artist = stored_data.get('artist', '') - total_tracks = stored_data.get('total_tracks', 0) - - # Get task info for summary - task_info = get_task_info(task_id) + elif content_type in ['album', 'playlist']: + # Get completion counts completed_tracks = task_info.get('completed_tracks', 0) skipped_tracks = task_info.get('skipped_tracks', 0) error_count = task_info.get('error_count', 0) - # Create completion message + # Log completion if album and artist: - logger.info(f"Task {task_id} completed: {content_type} '{album}' by {artist}") + logger.info(f"Task {task_id} completed: {content_type.upper()} '{album}' by {artist}") elif album: - logger.info(f"Task {task_id} completed: {content_type} '{album}'") - elif name: - logger.info(f"Task {task_id} completed: {content_type} '{name}'") + logger.info(f"Task {task_id} completed: {content_type.upper()} '{album}'") else: - logger.info(f"Task {task_id} completed") - + name = data.get('name', '') + if name: + logger.info(f"Task {task_id} completed: {content_type.upper()} '{name}'") + else: + logger.info(f"Task {task_id} completed: {content_type.upper()}") + + # Add summary + data["status"] = ProgressState.COMPLETE + data["message"] = f"Download complete: {completed_tracks} tracks downloaded, {skipped_tracks} skipped" + # Log summary logger.info(f"Task {task_id} summary: {completed_tracks} completed, {skipped_tracks} skipped, {error_count} errors") - # Update task status to complete - stored_data["status"] = ProgressState.COMPLETE - stored_data["message"] = f"Download complete: {completed_tracks} tracks downloaded" - - elif status == "processing": - # Processing status - log message with progress if available - progress = stored_data.get("progress", 0) - message = stored_data.get("message", "Processing") - - if progress > 0: - logger.debug(f"Task {task_id} processing: {progress}% - {message}") - else: - logger.info(f"Task {task_id} processing: {message}") - else: - # Unknown status - just log it - logger.info(f"Task {task_id} {status}: {stored_data.get('message', 'No details')}") - - # Store the enhanced progress update in Redis - store_task_status(task_id, stored_data) + # Generic done for other types + logger.info(f"Task {task_id} completed: {content_type.upper()}") + data["status"] = ProgressState.COMPLETE + data["message"] = "Download complete" # Celery signal handlers @task_prerun.connect def task_prerun_handler(task_id=None, task=None, *args, **kwargs): """Signal handler when a task begins running""" try: - # Get task info from Redis task_info = get_task_info(task_id) # Update task status to processing @@ -746,7 +770,7 @@ def task_postrun_handler(task_id=None, task=None, retval=None, state=None, *args if last_status and last_status.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR]: return - # Get task info from Redis + # Get task info task_info = get_task_info(task_id) # Update task status based on Celery task state @@ -767,11 +791,11 @@ def task_postrun_handler(task_id=None, task=None, retval=None, state=None, *args def task_failure_handler(task_id=None, exception=None, traceback=None, *args, **kwargs): """Signal handler when a task fails""" try: - # Skip if Retry exception (will be handled by the retry mechanism) + # Skip if Retry exception if isinstance(exception, Retry): return - # Get task info and last status from Redis + # Get task info and status task_info = get_task_info(task_id) last_status = get_last_task_status(task_id) @@ -834,87 +858,42 @@ def download_track(self, **task_data): logger.info(f"Processing track download task: {task_data.get('name', 'Unknown')}") from routes.utils.track import download_track as download_track_func - # Get config parameters including service + # Get config parameters config_params = get_config_params() - - # Get the service from config service = config_params.get("service") - - # DEBUG: Log the config parameters - print(f"DEBUG: celery_tasks.py config_params:") - print(f"DEBUG: service = {service}") - print(f"DEBUG: spotify = {config_params.get('spotify', '')}") - print(f"DEBUG: deezer = {config_params.get('deezer', '')}") - print(f"DEBUG: fallback_enabled = {config_params.get('fallback', False)}") - - # Determine main, fallback, and quality parameters based on service and fallback setting fallback_enabled = config_params.get("fallback", False) + # Determine service parameters if service == 'spotify': if fallback_enabled: - # If fallback is enabled with Spotify service: - # - main becomes the Deezer account - # - fallback becomes the Spotify account main = config_params.get("deezer", "") fallback = config_params.get("spotify", "") quality = config_params.get("deezerQuality", "MP3_128") fall_quality = config_params.get("spotifyQuality", "NORMAL") - - # DEBUG: Log the values after fallback logic - print(f"DEBUG: Spotify with fallback enabled:") - print(f"DEBUG: main (Deezer account) = {main}") - print(f"DEBUG: fallback (Spotify account) = {fallback}") else: - # If fallback is disabled with Spotify service: - # - main is the Spotify account - # - no fallback main = config_params.get("spotify", "") fallback = None quality = config_params.get("spotifyQuality", "NORMAL") fall_quality = None - - # DEBUG: Log the values - print(f"DEBUG: Spotify without fallback:") - print(f"DEBUG: main (Spotify account) = {main}") elif service == 'deezer': - # For Deezer service: - # - main is the Deezer account - # - no fallback (even if enabled in config) main = config_params.get("deezer", "") fallback = None quality = config_params.get("deezerQuality", "MP3_128") fall_quality = None - - # DEBUG: Log the values - print(f"DEBUG: Deezer service:") - print(f"DEBUG: main (Deezer account) = {main}") else: - # Default to Spotify if unknown service main = config_params.get("spotify", "") fallback = None quality = config_params.get("spotifyQuality", "NORMAL") fall_quality = None - - # DEBUG: Log the values - print(f"DEBUG: Unknown service defaulting to Spotify:") - print(f"DEBUG: main (Spotify account) = {main}") - # Get remaining parameters from task_data or config + # Get remaining parameters url = task_data.get("url", "") real_time = task_data.get("real_time", config_params.get("realTime", False)) custom_dir_format = task_data.get("custom_dir_format", config_params.get("customDirFormat", "%ar_album%/%album%")) custom_track_format = task_data.get("custom_track_format", config_params.get("customTrackFormat", "%tracknum%. %music%")) pad_tracks = task_data.get("pad_tracks", config_params.get("tracknum_padding", True)) - # Log task parameters for debugging - print(f"DEBUG: Final parameters for download_track_func:") - print(f"DEBUG: service = {service}") - print(f"DEBUG: main = {main}") - print(f"DEBUG: fallback = {fallback}") - print(f"DEBUG: quality = {quality}") - print(f"DEBUG: fall_quality = {fall_quality}") - - # Execute the download function with progress callback + # Execute the download download_track_func( service=service, url=url, @@ -926,7 +905,7 @@ def download_track(self, **task_data): custom_dir_format=custom_dir_format, custom_track_format=custom_track_format, pad_tracks=pad_tracks, - progress_callback=self.progress_callback # Pass the callback from our ProgressTrackingTask + progress_callback=self.progress_callback ) return {"status": "success", "message": "Track download completed"} @@ -947,58 +926,42 @@ def download_album(self, **task_data): logger.info(f"Processing album download task: {task_data.get('name', 'Unknown')}") from routes.utils.album import download_album as download_album_func - # Get config parameters including service + # Get config parameters config_params = get_config_params() - - # Get the service from config service = config_params.get("service") - - # Determine main, fallback, and quality parameters based on service and fallback setting fallback_enabled = config_params.get("fallback", False) + # Determine service parameters if service == 'spotify': if fallback_enabled: - # If fallback is enabled with Spotify service: - # - main becomes the Deezer account - # - fallback becomes the Spotify account main = config_params.get("deezer", "") fallback = config_params.get("spotify", "") quality = config_params.get("deezerQuality", "MP3_128") fall_quality = config_params.get("spotifyQuality", "NORMAL") else: - # If fallback is disabled with Spotify service: - # - main is the Spotify account - # - no fallback main = config_params.get("spotify", "") fallback = None quality = config_params.get("spotifyQuality", "NORMAL") fall_quality = None elif service == 'deezer': - # For Deezer service: - # - main is the Deezer account - # - no fallback (even if enabled in config) main = config_params.get("deezer", "") fallback = None quality = config_params.get("deezerQuality", "MP3_128") fall_quality = None else: - # Default to Spotify if unknown service main = config_params.get("spotify", "") fallback = None quality = config_params.get("spotifyQuality", "NORMAL") fall_quality = None - # Get remaining parameters from task_data or config + # Get remaining parameters url = task_data.get("url", "") real_time = task_data.get("real_time", config_params.get("realTime", False)) custom_dir_format = task_data.get("custom_dir_format", config_params.get("customDirFormat", "%ar_album%/%album%")) custom_track_format = task_data.get("custom_track_format", config_params.get("customTrackFormat", "%tracknum%. %music%")) pad_tracks = task_data.get("pad_tracks", config_params.get("tracknum_padding", True)) - # Log task parameters for debugging - logger.debug(f"Album download parameters: service={service}, quality={quality}, real_time={real_time}") - - # Execute the download function with progress callback + # Execute the download download_album_func( service=service, url=url, @@ -1010,7 +973,7 @@ def download_album(self, **task_data): custom_dir_format=custom_dir_format, custom_track_format=custom_track_format, pad_tracks=pad_tracks, - progress_callback=self.progress_callback # Pass the callback from our ProgressTrackingTask + progress_callback=self.progress_callback ) return {"status": "success", "message": "Album download completed"} @@ -1031,58 +994,42 @@ def download_playlist(self, **task_data): logger.info(f"Processing playlist download task: {task_data.get('name', 'Unknown')}") from routes.utils.playlist import download_playlist as download_playlist_func - # Get config parameters including service + # Get config parameters config_params = get_config_params() - - # Get the service from config service = config_params.get("service") - - # Determine main, fallback, and quality parameters based on service and fallback setting fallback_enabled = config_params.get("fallback", False) + # Determine service parameters if service == 'spotify': if fallback_enabled: - # If fallback is enabled with Spotify service: - # - main becomes the Deezer account - # - fallback becomes the Spotify account main = config_params.get("deezer", "") fallback = config_params.get("spotify", "") quality = config_params.get("deezerQuality", "MP3_128") fall_quality = config_params.get("spotifyQuality", "NORMAL") else: - # If fallback is disabled with Spotify service: - # - main is the Spotify account - # - no fallback main = config_params.get("spotify", "") fallback = None quality = config_params.get("spotifyQuality", "NORMAL") fall_quality = None elif service == 'deezer': - # For Deezer service: - # - main is the Deezer account - # - no fallback (even if enabled in config) main = config_params.get("deezer", "") fallback = None quality = config_params.get("deezerQuality", "MP3_128") fall_quality = None else: - # Default to Spotify if unknown service main = config_params.get("spotify", "") fallback = None quality = config_params.get("spotifyQuality", "NORMAL") fall_quality = None - # Get remaining parameters from task_data or config + # Get remaining parameters url = task_data.get("url", "") real_time = task_data.get("real_time", config_params.get("realTime", False)) custom_dir_format = task_data.get("custom_dir_format", config_params.get("customDirFormat", "%ar_album%/%album%")) custom_track_format = task_data.get("custom_track_format", config_params.get("customTrackFormat", "%tracknum%. %music%")) pad_tracks = task_data.get("pad_tracks", config_params.get("tracknum_padding", True)) - # Log task parameters for debugging - logger.debug(f"Playlist download parameters: service={service}, quality={quality}, real_time={real_time}") - - # Execute the download function with progress callback + # Execute the download download_playlist_func( service=service, url=url, @@ -1094,7 +1041,7 @@ def download_playlist(self, **task_data): custom_dir_format=custom_dir_format, custom_track_format=custom_track_format, pad_tracks=pad_tracks, - progress_callback=self.progress_callback # Pass the callback from our ProgressTrackingTask + progress_callback=self.progress_callback ) return {"status": "success", "message": "Playlist download completed"}