diff --git a/requirements.txt b/requirements.txt
index 139216e..38d25b1 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,7 @@
 fastapi==0.116.1
 uvicorn[standard]==0.35.0
 celery==5.5.3
-deezspot-spotizerr==2.2.4
+deezspot-spotizerr==2.2.6
 httpx==0.28.1
 bcrypt==4.2.1
 PyJWT==2.10.1
diff --git a/routes/utils/album.py b/routes/utils/album.py
index 50f3986..077eaea 100755
--- a/routes/utils/album.py
+++ b/routes/utils/album.py
@@ -27,13 +27,17 @@ def download_album(
     progress_callback=None,
     convert_to=None,
     bitrate=None,
+    artist_separator="; ",
+    recursive_quality=True,
     _is_celery_task_execution=False,  # Added to skip duplicate check from Celery task
 ):
     if not _is_celery_task_execution:
-        existing_task = get_existing_task_id(url)  # Check for duplicates only if not called by Celery task
+        existing_task = get_existing_task_id(
+            url
+        )  # Check for duplicates only if not called by Celery task
         if existing_task:
             raise DuplicateDownloadError(
-                f"Download for this URL is already in progress.",
+                "Download for this URL is already in progress.",
                 existing_task=existing_task,
             )
     try:
@@ -96,7 +100,7 @@ def download_album(
                     link_album=url,  # Spotify URL
                     output_dir="./downloads",
                     quality_download=quality,  # Deezer quality
-                    recursive_quality=True,
+                    recursive_quality=recursive_quality,
                     recursive_download=False,
                     not_interface=False,
                     make_zip=False,
@@ -109,6 +113,7 @@ def download_album(
                     max_retries=max_retries,
                     convert_to=convert_to,
                     bitrate=bitrate,
+                    artist_separator=artist_separator,
                 )
                 print(
                     f"DEBUG: album.py - Album download via Deezer (account: {fallback}) successful for Spotify URL."
@@ -151,7 +156,7 @@ def download_album(
                     link_album=url,  # Spotify URL
                     output_dir="./downloads",
                     quality_download=fall_quality,  # Spotify quality
-                    recursive_quality=True,
+                    recursive_quality=recursive_quality,
                     recursive_download=False,
                     not_interface=False,
                     make_zip=False,
@@ -165,6 +170,7 @@ def download_album(
                     max_retries=max_retries,
                     convert_to=convert_to,
                     bitrate=bitrate,
+                    artist_separator=artist_separator,
                 )
                 print(
                     f"DEBUG: album.py - Spotify direct download (account: {main} for blob) successful."
@@ -205,7 +211,7 @@ def download_album(
                 link_album=url,
                 output_dir="./downloads",
                 quality_download=quality,
-                recursive_quality=True,
+                recursive_quality=recursive_quality,
                 recursive_download=False,
                 not_interface=False,
                 make_zip=False,
@@ -219,6 +225,7 @@ def download_album(
                 max_retries=max_retries,
                 convert_to=convert_to,
                 bitrate=bitrate,
+                artist_separator=artist_separator,
             )
             print(
                 f"DEBUG: album.py - Direct Spotify download (account: {main} for blob) successful."
@@ -246,7 +253,7 @@ def download_album(
                 link_album=url,
                 output_dir="./downloads",
                 quality_download=quality,
-                recursive_quality=True,
+                recursive_quality=recursive_quality,
                 recursive_download=False,
                 make_zip=False,
                 custom_dir_format=custom_dir_format,
@@ -258,6 +265,7 @@ def download_album(
                 max_retries=max_retries,
                 convert_to=convert_to,
                 bitrate=bitrate,
+                artist_separator=artist_separator,
             )
             print(
                 f"DEBUG: album.py - Direct Deezer download (account: {main}) successful."
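Note on the two new keyword arguments: artist_separator and recursive_quality are threaded from download_album into every deezspot call in this file, and (per the later hunks) through the Celery config and task layers as well. The signature defaults recursive_quality to True, while the config layer falls back to False, so the effective default depends on how the function is reached. A minimal sketch of a direct call, assuming every other keyword argument keeps its default; the URL is a placeholder:

from routes.utils.album import download_album

download_album(
    url="https://open.spotify.com/album/...",  # placeholder URL
    convert_to=None,
    bitrate=None,
    artist_separator="; ",    # presumably joins multiple artist names when tagging/writing files
    recursive_quality=False,  # previously hard-coded to True; now caller-controlled
)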
diff --git a/routes/utils/celery_config.py b/routes/utils/celery_config.py index c751c4e..4f08117 100644 --- a/routes/utils/celery_config.py +++ b/routes/utils/celery_config.py @@ -44,6 +44,7 @@ DEFAULT_MAIN_CONFIG = { "retry_delay_increase": 5, "convertTo": None, "bitrate": None, + "artist_separator": "; ", } @@ -123,12 +124,12 @@ task_default_routing_key = "downloads" # Task routing - ensure SSE and utility tasks go to utility_tasks queue task_routes = { - 'routes.utils.celery_tasks.trigger_sse_update_task': {'queue': 'utility_tasks'}, - 'routes.utils.celery_tasks.cleanup_stale_errors': {'queue': 'utility_tasks'}, - 'routes.utils.celery_tasks.delayed_delete_task_data': {'queue': 'utility_tasks'}, - 'routes.utils.celery_tasks.download_track': {'queue': 'downloads'}, - 'routes.utils.celery_tasks.download_album': {'queue': 'downloads'}, - 'routes.utils.celery_tasks.download_playlist': {'queue': 'downloads'}, + "routes.utils.celery_tasks.trigger_sse_update_task": {"queue": "utility_tasks"}, + "routes.utils.celery_tasks.cleanup_stale_errors": {"queue": "utility_tasks"}, + "routes.utils.celery_tasks.delayed_delete_task_data": {"queue": "utility_tasks"}, + "routes.utils.celery_tasks.download_track": {"queue": "downloads"}, + "routes.utils.celery_tasks.download_album": {"queue": "downloads"}, + "routes.utils.celery_tasks.download_playlist": {"queue": "downloads"}, } # Celery task settings @@ -193,8 +194,8 @@ worker_disable_rate_limits = False # Celery Beat schedule beat_schedule = { - 'cleanup-old-tasks': { - 'task': 'routes.utils.celery_tasks.cleanup_old_tasks', - 'schedule': 3600.0, # Run every hour + "cleanup-old-tasks": { + "task": "routes.utils.celery_tasks.cleanup_old_tasks", + "schedule": 3600.0, # Run every hour }, } diff --git a/routes/utils/celery_queue_manager.py b/routes/utils/celery_queue_manager.py index b23b60a..5b12b35 100644 --- a/routes/utils/celery_queue_manager.py +++ b/routes/utils/celery_queue_manager.py @@ -60,6 +60,8 @@ def get_config_params(): "retry_delay_increase": config.get("retry_delay_increase", 5), "convertTo": config.get("convertTo", None), "bitrate": config.get("bitrate", None), + "artist_separator": config.get("artist_separator", "; "), + "recursive_quality": config.get("recursive_quality", False), } except Exception as e: logger.error(f"Error reading config for parameters: {e}") @@ -80,6 +82,8 @@ def get_config_params(): "retry_delay_increase": 5, "convertTo": None, # Default for conversion "bitrate": None, # Default for bitrate + "artist_separator": "; ", + "recursive_quality": False, } @@ -95,7 +99,9 @@ def get_existing_task_id(url, download_type=None): Returns: str | None: The task ID of the existing active task, or None if no active duplicate is found. """ - logger.debug(f"GET_EXISTING_TASK_ID: Checking for URL='{url}', type='{download_type}'") + logger.debug( + f"GET_EXISTING_TASK_ID: Checking for URL='{url}', type='{download_type}'" + ) if not url: logger.debug("GET_EXISTING_TASK_ID: No URL provided, returning None.") return None @@ -119,64 +125,95 @@ def get_existing_task_id(url, download_type=None): } logger.debug(f"GET_EXISTING_TASK_ID: Terminal states defined as: {TERMINAL_STATES}") - all_existing_tasks_summary = get_all_tasks() # This function already filters by default based on its own TERMINAL_STATES - logger.debug(f"GET_EXISTING_TASK_ID: Found {len(all_existing_tasks_summary)} tasks from get_all_tasks(). 
Iterating...") + all_existing_tasks_summary = ( + get_all_tasks() + ) # This function already filters by default based on its own TERMINAL_STATES + logger.debug( + f"GET_EXISTING_TASK_ID: Found {len(all_existing_tasks_summary)} tasks from get_all_tasks(). Iterating..." + ) for task_summary in all_existing_tasks_summary: existing_task_id = task_summary.get("task_id") if not existing_task_id: logger.debug("GET_EXISTING_TASK_ID: Skipping summary with no task_id.") continue - - logger.debug(f"GET_EXISTING_TASK_ID: Processing existing task_id='{existing_task_id}' from summary.") + + logger.debug( + f"GET_EXISTING_TASK_ID: Processing existing task_id='{existing_task_id}' from summary." + ) # First, check the status of the task directly from its latest status record. # get_all_tasks() might have its own view of terminal, but we re-check here for absolute certainty. existing_last_status_obj = get_last_task_status(existing_task_id) if not existing_last_status_obj: - logger.debug(f"GET_EXISTING_TASK_ID: No last status object for task_id='{existing_task_id}'. Skipping.") + logger.debug( + f"GET_EXISTING_TASK_ID: No last status object for task_id='{existing_task_id}'. Skipping." + ) continue - + # Extract status from standard structure (status_info.status) or fallback to top-level status existing_status = None - if "status_info" in existing_last_status_obj and existing_last_status_obj["status_info"]: + if ( + "status_info" in existing_last_status_obj + and existing_last_status_obj["status_info"] + ): existing_status = existing_last_status_obj["status_info"].get("status") if not existing_status: existing_status = existing_last_status_obj.get("status") - - logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}', last_status_obj='{existing_last_status_obj}', extracted status='{existing_status}'.") + + logger.debug( + f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}', last_status_obj='{existing_last_status_obj}', extracted status='{existing_status}'." + ) # If the task is in a terminal state, ignore it and move to the next one. if existing_status in TERMINAL_STATES: - logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}' has terminal status='{existing_status}'. Skipping.") + logger.debug( + f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}' has terminal status='{existing_status}'. Skipping." + ) continue - - logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}' has ACTIVE status='{existing_status}'. Proceeding to check URL/type.") + + logger.debug( + f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}' has ACTIVE status='{existing_status}'. Proceeding to check URL/type." + ) # If the task is active, then check if its URL and type match. existing_task_info = get_task_info(existing_task_id) if not existing_task_info: - logger.debug(f"GET_EXISTING_TASK_ID: No task info for active task_id='{existing_task_id}'. Skipping.") + logger.debug( + f"GET_EXISTING_TASK_ID: No task info for active task_id='{existing_task_id}'. Skipping." + ) continue existing_url = existing_task_info.get("url") - logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}', info_url='{existing_url}'. Comparing with target_url='{url}'.") + logger.debug( + f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}', info_url='{existing_url}'. Comparing with target_url='{url}'." + ) if existing_url != url: - logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}' URL mismatch. Skipping.") + logger.debug( + f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}' URL mismatch. Skipping." 
+ ) continue if download_type: existing_type = existing_task_info.get("download_type") - logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}', info_type='{existing_type}'. Comparing with target_type='{download_type}'.") + logger.debug( + f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}', info_type='{existing_type}'. Comparing with target_type='{download_type}'." + ) if existing_type != download_type: - logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}' type mismatch. Skipping.") + logger.debug( + f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}' type mismatch. Skipping." + ) continue # Found an active task that matches the criteria. - logger.info(f"GET_EXISTING_TASK_ID: Found ACTIVE duplicate: task_id='{existing_task_id}' for URL='{url}', type='{download_type}'. Returning this ID.") + logger.info( + f"GET_EXISTING_TASK_ID: Found ACTIVE duplicate: task_id='{existing_task_id}' for URL='{url}', type='{download_type}'. Returning this ID." + ) return existing_task_id - logger.debug(f"GET_EXISTING_TASK_ID: No active duplicate found for URL='{url}', type='{download_type}'. Returning None.") + logger.debug( + f"GET_EXISTING_TASK_ID: No active duplicate found for URL='{url}', type='{download_type}'. Returning None." + ) return None @@ -255,11 +292,16 @@ class CeleryDownloadQueueManager: existing_url = existing_task_info.get("url") existing_type = existing_task_info.get("download_type") - + # Extract status from standard structure (status_info.status) or fallback to top-level status existing_status = None - if "status_info" in existing_last_status_obj and existing_last_status_obj["status_info"]: - existing_status = existing_last_status_obj["status_info"].get("status") + if ( + "status_info" in existing_last_status_obj + and existing_last_status_obj["status_info"] + ): + existing_status = existing_last_status_obj["status_info"].get( + "status" + ) if not existing_status: existing_status = existing_last_status_obj.get("status") @@ -350,6 +392,13 @@ class CeleryDownloadQueueManager: "bitrate": original_request.get( "bitrate", config_params.get("bitrate") ), + "artist_separator": original_request.get( + "artist_separator", config_params.get("artist_separator", "; ") + ), + "recursive_quality": self._parse_bool_param( + original_request.get("recursive_quality"), + config_params.get("recursive_quality", False), + ), "retry_count": 0, "original_request": original_request, "created_at": time.time(), diff --git a/routes/utils/celery_tasks.py b/routes/utils/celery_tasks.py index 7c1cec4..741466a 100644 --- a/routes/utils/celery_tasks.py +++ b/routes/utils/celery_tasks.py @@ -2,7 +2,6 @@ import time import json import logging import traceback -import asyncio from celery import Celery, Task, states from celery.signals import ( task_prerun, @@ -35,6 +34,7 @@ from routes.utils.history_manager import history_manager # Create Redis connection for storing task data that's not part of the Celery result backend import redis + # --- Helpers to build partial summaries from task logs --- def _read_task_log_json_lines(task_id: str) -> list: log_file_path = Path("./logs/tasks") / f"{task_id}.log" @@ -69,7 +69,10 @@ def _extract_parent_initial_tracks(log_lines: list, parent_type: str) -> dict: if album and isinstance(album, dict) and album.get("tracks"): for t in album.get("tracks", []): ids = (t or {}).get("ids", {}) or {} - key = ids.get("spotify") or f"{(t or {}).get('track_number', 0)}:{(t or {}).get('title', '')}" + key = ( + ids.get("spotify") + or f"{(t or {}).get('track_number', 
0)}:{(t or {}).get('title', '')}" + ) track_map[key] = t break elif parent_type == "playlist": @@ -79,13 +82,18 @@ def _extract_parent_initial_tracks(log_lines: list, parent_type: str) -> dict: for t in playlist.get("tracks", []): ids = (t or {}).get("ids", {}) or {} # TrackPlaylistObject uses position - key = ids.get("spotify") or f"{(t or {}).get('position', 0)}:{(t or {}).get('title', '')}" + key = ( + ids.get("spotify") + or f"{(t or {}).get('position', 0)}:{(t or {}).get('title', '')}" + ) track_map[key] = t break return track_map -def _extract_completed_and_skipped_from_logs(log_lines: list) -> tuple[set, set, dict, dict]: +def _extract_completed_and_skipped_from_logs( + log_lines: list, +) -> tuple[set, set, dict, dict]: """ Returns (completed_keys, skipped_keys, completed_objects_by_key, skipped_objects_by_key) Keys prefer ids.spotify, falling back to index+title scheme consistent with initial map. @@ -102,7 +110,9 @@ def _extract_completed_and_skipped_from_logs(log_lines: list) -> tuple[set, set, status = status_info.get("status") ids = (track or {}).get("ids", {}) or {} # Fallback keys try track_number:title and position:title - fallback_key = f"{(track or {}).get('track_number', 0)}:{(track or {}).get('title', '')}" + fallback_key = ( + f"{(track or {}).get('track_number', 0)}:{(track or {}).get('title', '')}" + ) key = ids.get("spotify") or fallback_key if status == "done": completed_keys.add(key) @@ -128,11 +138,13 @@ def _to_track_object_from_initial(initial_track: dict, parent_type: str) -> dict artists_conv = [] for a in artists_src: if isinstance(a, dict): - artists_conv.append({ - "type": "artistTrack", - "name": a.get("name", ""), - "ids": a.get("ids", {}) or {}, - }) + artists_conv.append( + { + "type": "artistTrack", + "name": a.get("name", ""), + "ids": a.get("ids", {}) or {}, + } + ) # Convert album to AlbumTrackObject-like shape album_src = initial_track.get("album", {}) or {} @@ -177,16 +189,23 @@ def build_partial_summary_from_task_log(task_id: str, parent_type: str) -> dict: """ log_lines = _read_task_log_json_lines(task_id) initial_tracks_map = _extract_parent_initial_tracks(log_lines, parent_type) - completed_keys, skipped_keys, completed_objs, skipped_objs = _extract_completed_and_skipped_from_logs(log_lines) + completed_keys, skipped_keys, completed_objs, skipped_objs = ( + _extract_completed_and_skipped_from_logs(log_lines) + ) # Determine failed as initial - completed - skipped initial_keys = set(initial_tracks_map.keys()) failed_keys = initial_keys.difference(completed_keys.union(skipped_keys)) - successful_tracks = [completed_objs[k] for k in completed_keys if k in completed_objs] + successful_tracks = [ + completed_objs[k] for k in completed_keys if k in completed_objs + ] skipped_tracks = [skipped_objs[k] for k in skipped_keys if k in skipped_objs] failed_tracks = [ - {"track": _to_track_object_from_initial(initial_tracks_map[k], parent_type), "reason": "cancelled"} + { + "track": _to_track_object_from_initial(initial_tracks_map[k], parent_type), + "reason": "cancelled", + } for k in failed_keys if k in initial_tracks_map ] @@ -224,16 +243,15 @@ def trigger_sse_event(task_id: str, reason: str = "status_change"): trigger_sse_update_task.apply_async( args=[task_id, reason], queue="utility_tasks", - priority=9 # High priority for real-time updates + priority=9, # High priority for real-time updates ) # Only log at debug level to reduce verbosity logger.debug(f"SSE: Submitted SSE update task for {task_id} (reason: {reason})") - + except Exception as e: - 
logger.error(f"Error submitting SSE update task for task {task_id}: {e}", exc_info=True) - - - + logger.error( + f"Error submitting SSE update task for task {task_id}: {e}", exc_info=True + ) class ProgressState: @@ -318,10 +336,10 @@ def store_task_status(task_id, status_data): redis_client.publish( update_channel, json.dumps({"task_id": task_id, "status_id": status_id}) ) - + # Trigger immediate SSE event for real-time frontend updates trigger_sse_event(task_id, "status_update") - + except Exception as e: logger.error(f"Error storing task status: {e}") traceback.print_exc() @@ -421,7 +439,7 @@ def cancel_task(task_id): "status": ProgressState.CANCELLED, "error": "Task cancelled by user", "timestamp": time.time(), - } + }, }, ) @@ -616,6 +634,7 @@ def retry_task(task_id): logger.error(f"Error retrying task {task_id}: {e}", exc_info=True) return {"status": "error", "error": str(e)} + class ProgressTrackingTask(Task): """Base task class that tracks progress through callbacks""" @@ -633,7 +652,7 @@ class ProgressTrackingTask(Task): task_id = self.request.id # Ensure ./logs/tasks directory exists - logs_tasks_dir = Path("./logs/tasks") + logs_tasks_dir = Path("./logs/tasks") try: logs_tasks_dir.mkdir(parents=True, exist_ok=True) except Exception as e: @@ -650,7 +669,7 @@ class ProgressTrackingTask(Task): log_entry = progress_data.copy() if "timestamp" not in log_entry: log_entry["timestamp"] = time.time() - print(json.dumps(log_entry), file=log_file) + print(json.dumps(log_entry), file=log_file) except Exception as e: logger.error( f"Task {task_id}: Could not write to task log file {log_file_path}: {e}" @@ -663,7 +682,7 @@ class ProgressTrackingTask(Task): status_info = progress_data.get("status_info", {}) status = status_info.get("status", progress_data.get("status", "unknown")) task_info = get_task_info(task_id) - + logger.debug(f"Task {task_id}: Extracted status: '{status}' from callback") if logger.isEnabledFor(logging.DEBUG): @@ -704,59 +723,80 @@ class ProgressTrackingTask(Task): def _handle_initializing(self, task_id, data, task_info): """Handle initializing status from deezspot""" logger.info(f"Task {task_id} initializing...") - + # Initializing object is now very basic, mainly for acknowledging the start. # More detailed info comes with 'progress' or 'downloading' states. 
data["status"] = ProgressState.INITIALIZING - + # Store initial history entry for download start try: # Check for album/playlist FIRST since their callbacks contain both parent and track info if "album" in data: # Album download - create children table and store name in task info logger.info(f"Task {task_id}: Creating album children table") - children_table = history_manager.store_album_history(data, task_id, "in_progress") + children_table = history_manager.store_album_history( + data, task_id, "in_progress" + ) if children_table: task_info["children_table"] = children_table store_task_info(task_id, task_info) - logger.info(f"Task {task_id}: Created and stored children table '{children_table}' in task info") + logger.info( + f"Task {task_id}: Created and stored children table '{children_table}' in task info" + ) else: - logger.error(f"Task {task_id}: Failed to create album children table") + logger.error( + f"Task {task_id}: Failed to create album children table" + ) elif "playlist" in data: # Playlist download - create children table and store name in task info logger.info(f"Task {task_id}: Creating playlist children table") - children_table = history_manager.store_playlist_history(data, task_id, "in_progress") + children_table = history_manager.store_playlist_history( + data, task_id, "in_progress" + ) if children_table: task_info["children_table"] = children_table store_task_info(task_id, task_info) - logger.info(f"Task {task_id}: Created and stored children table '{children_table}' in task info") + logger.info( + f"Task {task_id}: Created and stored children table '{children_table}' in task info" + ) else: - logger.error(f"Task {task_id}: Failed to create playlist children table") + logger.error( + f"Task {task_id}: Failed to create playlist children table" + ) elif "track" in data: # Individual track download - check if it's part of an album/playlist children_table = task_info.get("children_table") if children_table: # Track is part of album/playlist - don't store in main table during initialization - logger.info(f"Task {task_id}: Skipping track initialization storage (part of album/playlist, children table: {children_table})") + logger.info( + f"Task {task_id}: Skipping track initialization storage (part of album/playlist, children table: {children_table})" + ) else: # Individual track download - store in main table - logger.info(f"Task {task_id}: Storing individual track history (initializing)") + logger.info( + f"Task {task_id}: Storing individual track history (initializing)" + ) history_manager.store_track_history(data, task_id, "in_progress") except Exception as e: - logger.error(f"Failed to store initial history for task {task_id}: {e}", exc_info=True) + logger.error( + f"Failed to store initial history for task {task_id}: {e}", + exc_info=True, + ) def _handle_downloading(self, task_id, data, task_info): """Handle downloading status from deezspot""" track_obj = data.get("track", {}) track_name = track_obj.get("title", "Unknown") - + artists = track_obj.get("artists", []) artist_name = artists[0].get("name", "") if artists else "" - + album_obj = track_obj.get("album", {}) album_name = album_obj.get("title", "") - logger.info(f"Task {task_id}: Starting download for track '{track_name}' by {artist_name}") + logger.info( + f"Task {task_id}: Starting download for track '{track_name}' by {artist_name}" + ) data["status"] = ProgressState.DOWNLOADING data["song"] = track_name @@ -767,14 +807,14 @@ class ProgressTrackingTask(Task): """Handle progress status for albums/playlists from 
deezspot""" item = data.get("playlist") or data.get("album", {}) track = data.get("track", {}) - + item_name = item.get("title", "Unknown Item") total_tracks = item.get("total_tracks", 0) - + track_name = track.get("title", "Unknown Track") artists = track.get("artists", []) artist_name = artists[0].get("name", "") if artists else "" - + # The 'progress' field in the callback is the track number being processed current_track_num = data.get("progress", 0) @@ -783,13 +823,17 @@ class ProgressTrackingTask(Task): task_info["completed_tracks"] = current_track_num - 1 task_info["current_track_num"] = current_track_num store_task_info(task_id, task_info) - - overall_progress = min(int(((current_track_num -1) / total_tracks) * 100), 100) + + overall_progress = min( + int(((current_track_num - 1) / total_tracks) * 100), 100 + ) data["overall_progress"] = overall_progress data["parsed_current_track"] = current_track_num data["parsed_total_tracks"] = total_tracks - logger.info(f"Task {task_id}: Progress on '{item_name}': Processing track {current_track_num}/{total_tracks} - '{track_name}'") + logger.info( + f"Task {task_id}: Progress on '{item_name}': Processing track {current_track_num}/{total_tracks} - '{track_name}'" + ) data["status"] = ProgressState.PROGRESS data["song"] = track_name @@ -801,9 +845,11 @@ class ProgressTrackingTask(Task): track_obj = data.get("track", {}) track_name = track_obj.get("title", "Unknown Track") percentage = data.get("percentage", 0) - - logger.debug(f"Task {task_id}: Real-time progress for '{track_name}': {percentage}%") - + + logger.debug( + f"Task {task_id}: Real-time progress for '{track_name}': {percentage}%" + ) + data["song"] = track_name artist = data.get("artist", "Unknown") @@ -838,28 +884,38 @@ class ProgressTrackingTask(Task): ) # Log at debug level - logger.debug(f"Task {task_id} track progress: {track_name} by {artist}: {percent}%") + logger.debug( + f"Task {task_id} track progress: {track_name} by {artist}: {percent}%" + ) def _handle_skipped(self, task_id, data, task_info): """Handle skipped status from deezspot""" - + # Store skipped history for deezspot callback format try: if "track" in data: # Individual track skipped - check if we should use children table children_table = task_info.get("children_table") - logger.debug(f"Task {task_id}: Skipped track, children_table = '{children_table}'") + logger.debug( + f"Task {task_id}: Skipped track, children_table = '{children_table}'" + ) if children_table: # Part of album/playlist - store progressively in children table - logger.info(f"Task {task_id}: Storing skipped track in children table '{children_table}' (progressive)") - history_manager.store_track_history(data, task_id, "skipped", children_table) + logger.info( + f"Task {task_id}: Storing skipped track in children table '{children_table}' (progressive)" + ) + history_manager.store_track_history( + data, task_id, "skipped", children_table + ) else: # Individual track download - store in main table - logger.info(f"Task {task_id}: Storing skipped track in main table (individual download)") + logger.info( + f"Task {task_id}: Storing skipped track in main table (individual download)" + ) history_manager.store_track_history(data, task_id, "skipped") except Exception as e: logger.error(f"Failed to store skipped history for task {task_id}: {e}") - + # Extract track info (legacy format support) title = data.get("song", "Unknown") artist = data.get("artist", "Unknown") @@ -933,7 +989,7 @@ class ProgressTrackingTask(Task): def _handle_error(self, task_id, 
data, task_info): """Handle error status from deezspot""" - + # Store error history for deezspot callback format try: # Check for album/playlist FIRST since their callbacks contain both parent and track info @@ -948,18 +1004,26 @@ class ProgressTrackingTask(Task): elif "track" in data: # Individual track failed - check if we should use children table children_table = task_info.get("children_table") - logger.debug(f"Task {task_id}: Failed track, children_table = '{children_table}'") + logger.debug( + f"Task {task_id}: Failed track, children_table = '{children_table}'" + ) if children_table: # Part of album/playlist - store progressively in children table - logger.info(f"Task {task_id}: Storing failed track in children table '{children_table}' (progressive)") - history_manager.store_track_history(data, task_id, "failed", children_table) + logger.info( + f"Task {task_id}: Storing failed track in children table '{children_table}' (progressive)" + ) + history_manager.store_track_history( + data, task_id, "failed", children_table + ) else: # Individual track download - store in main table - logger.info(f"Task {task_id}: Storing failed track in main table (individual download)") + logger.info( + f"Task {task_id}: Storing failed track in main table (individual download)" + ) history_manager.store_track_history(data, task_id, "failed") except Exception as e: logger.error(f"Failed to store error history for task {task_id}: {e}") - + # Extract error info (legacy format support) message = data.get("message", "Unknown error") @@ -977,7 +1041,7 @@ class ProgressTrackingTask(Task): def _handle_done(self, task_id, data, task_info): """Handle done status from deezspot""" - + # Store completion history for deezspot callback format try: # Check for album/playlist FIRST since their callbacks contain both parent and track info @@ -992,18 +1056,29 @@ class ProgressTrackingTask(Task): elif "track" in data: # Individual track completion - check if we should use children table children_table = task_info.get("children_table") - logger.debug(f"Task {task_id}: Completed track, children_table = '{children_table}'") + logger.debug( + f"Task {task_id}: Completed track, children_table = '{children_table}'" + ) if children_table: # Part of album/playlist - store progressively in children table - logger.info(f"Task {task_id}: Storing completed track in children table '{children_table}' (progressive)") - history_manager.store_track_history(data, task_id, "completed", children_table) + logger.info( + f"Task {task_id}: Storing completed track in children table '{children_table}' (progressive)" + ) + history_manager.store_track_history( + data, task_id, "completed", children_table + ) else: # Individual track download - store in main table - logger.info(f"Task {task_id}: Storing completed track in main table (individual download)") + logger.info( + f"Task {task_id}: Storing completed track in main table (individual download)" + ) history_manager.store_track_history(data, task_id, "completed") except Exception as e: - logger.error(f"Failed to store completion history for task {task_id}: {e}", exc_info=True) - + logger.error( + f"Failed to store completion history for task {task_id}: {e}", + exc_info=True, + ) + # Extract data (legacy format support) content_type = data.get("type", "").lower() album = data.get("album", "") @@ -1177,9 +1252,9 @@ def task_prerun_handler(task_id=None, task=None, *args, **kwargs): """Signal handler when a task begins running""" try: # Skip verbose logging for SSE tasks - if task and hasattr(task, 
'name') and task.name in ['trigger_sse_update_task']: + if task and hasattr(task, "name") and task.name in ["trigger_sse_update_task"]: return - + task_info = get_task_info(task_id) # Update task status to processing @@ -1208,9 +1283,9 @@ def task_postrun_handler( """Signal handler when a task finishes""" try: # Skip verbose logging for SSE tasks - if task and hasattr(task, 'name') and task.name in ['trigger_sse_update_task']: + if task and hasattr(task, "name") and task.name in ["trigger_sse_update_task"]: return - + last_status_for_history = get_last_task_status(task_id) if last_status_for_history and last_status_for_history.get("status") in [ ProgressState.COMPLETE, @@ -1223,9 +1298,7 @@ def task_postrun_handler( state == states.REVOKED and last_status_for_history.get("status") != ProgressState.CANCELLED ): - logger.info( - f"Task {task_id} was REVOKED (likely cancelled)." - ) + logger.info(f"Task {task_id} was REVOKED (likely cancelled).") # return # Let status update proceed if necessary task_info = get_task_info(task_id) @@ -1235,8 +1308,13 @@ def task_postrun_handler( # If task was cancelled/revoked, finalize parent history with partial summary try: - if state == states.REVOKED or current_redis_status == ProgressState.CANCELLED: - parent_type = (task_info.get("download_type") or task_info.get("type") or "").lower() + if ( + state == states.REVOKED + or current_redis_status == ProgressState.CANCELLED + ): + parent_type = ( + task_info.get("download_type") or task_info.get("type") or "" + ).lower() if parent_type in ["album", "playlist"]: # Build detailed summary from the task log summary = build_partial_summary_from_task_log(task_id, parent_type) @@ -1247,14 +1325,24 @@ def task_postrun_handler( # Try to enrich parent payload with initial callback object (to capture artists, ids, images) try: log_lines = _read_task_log_json_lines(task_id) - initial_parent = _extract_initial_parent_object(log_lines, parent_type) + initial_parent = _extract_initial_parent_object( + log_lines, parent_type + ) except Exception: initial_parent = None if parent_type == "album": album_payload = {"title": title, "total_tracks": total_tracks} if isinstance(initial_parent, dict): - for k in ["artists", "ids", "images", "release_date", "genres", "album_type", "tracks"]: + for k in [ + "artists", + "ids", + "images", + "release_date", + "genres", + "album_type", + "tracks", + ]: if k in initial_parent: album_payload[k] = initial_parent.get(k) # Ensure a main history entry exists even on cancellation @@ -1266,7 +1354,13 @@ def task_postrun_handler( else: playlist_payload = {"title": title} if isinstance(initial_parent, dict): - for k in ["owner", "ids", "images", "tracks", "description"]: + for k in [ + "owner", + "ids", + "images", + "tracks", + "description", + ]: if k in initial_parent: playlist_payload[k] = initial_parent.get(k) history_manager.store_playlist_history( @@ -1314,14 +1408,17 @@ def task_postrun_handler( try: # Use task_id as primary source for metadata extraction add_single_track_to_playlist_db( - playlist_spotify_id=playlist_id, + playlist_spotify_id=playlist_id, track_item_for_db=track_item_for_db, # Keep as fallback - task_id=task_id # Primary source for metadata + task_id=task_id, # Primary source for metadata ) - + # Update the playlist's m3u file after successful track addition try: - from routes.utils.watch.manager import update_playlist_m3u_file + from routes.utils.watch.manager import ( + update_playlist_m3u_file, + ) + logger.info( f"Updating m3u file for playlist {playlist_id} after 
successful track download." ) @@ -1331,7 +1428,7 @@ def task_postrun_handler( f"Failed to update m3u file for playlist {playlist_id} after successful track download task {task_id}: {m3u_update_err}", exc_info=True, ) - + except Exception as db_add_err: logger.error( f"Failed to add track to DB for playlist {playlist_id} after successful download task {task_id}: {db_add_err}", @@ -1390,9 +1487,6 @@ def task_failure_handler( if isinstance(exception, Retry): return - # Define download task names - download_task_names = ["download_track", "download_album", "download_playlist"] - # Get task info and status task_info = get_task_info(task_id) last_status = get_last_task_status(task_id) @@ -1523,6 +1617,12 @@ def download_track(self, **task_data): save_cover = task_data.get("save_cover", config_params.get("save_cover", True)) convert_to = task_data.get("convertTo", config_params.get("convertTo")) bitrate = task_data.get("bitrate", config_params.get("bitrate")) + recursive_quality = task_data.get( + "recursive_quality", config_params.get("recursive_quality", False) + ) + artist_separator = task_data.get( + "artist_separator", config_params.get("artist_separator", "; ") + ) # Execute the download - service is now determined from URL download_track_func( @@ -1539,6 +1639,8 @@ def download_track(self, **task_data): progress_callback=self.progress_callback, convert_to=convert_to, bitrate=bitrate, + recursive_quality=recursive_quality, + artist_separator=artist_separator, _is_celery_task_execution=True, # Skip duplicate check inside Celery task (consistency) ) @@ -1610,6 +1712,12 @@ def download_album(self, **task_data): save_cover = task_data.get("save_cover", config_params.get("save_cover", True)) convert_to = task_data.get("convertTo", config_params.get("convertTo")) bitrate = task_data.get("bitrate", config_params.get("bitrate")) + recursive_quality = task_data.get( + "recursive_quality", config_params.get("recursive_quality", False) + ) + artist_separator = task_data.get( + "artist_separator", config_params.get("artist_separator", "; ") + ) # Execute the download - service is now determined from URL download_album_func( @@ -1626,6 +1734,8 @@ def download_album(self, **task_data): progress_callback=self.progress_callback, convert_to=convert_to, bitrate=bitrate, + recursive_quality=recursive_quality, + artist_separator=artist_separator, _is_celery_task_execution=True, # Skip duplicate check inside Celery task ) @@ -1697,6 +1807,12 @@ def download_playlist(self, **task_data): save_cover = task_data.get("save_cover", config_params.get("save_cover", True)) convert_to = task_data.get("convertTo", config_params.get("convertTo")) bitrate = task_data.get("bitrate", config_params.get("bitrate")) + recursive_quality = task_data.get( + "recursive_quality", config_params.get("recursive_quality", False) + ) + artist_separator = task_data.get( + "artist_separator", config_params.get("artist_separator", "; ") + ) # Get retry parameters initial_retry_delay = task_data.get( @@ -1725,6 +1841,8 @@ def download_playlist(self, **task_data): progress_callback=self.progress_callback, convert_to=convert_to, bitrate=bitrate, + recursive_quality=recursive_quality, + artist_separator=artist_separator, _is_celery_task_execution=True, # Skip duplicate check inside Celery task ) @@ -1868,11 +1986,7 @@ def delayed_delete_task_data(task_id, reason): delete_task_data_and_log(task_id, reason) -@celery_app.task( - name="trigger_sse_update_task", - queue="utility_tasks", - bind=True -) +@celery_app.task(name="trigger_sse_update_task", 
queue="utility_tasks", bind=True) def trigger_sse_update_task(self, task_id: str, reason: str = "status_update"): """ Dedicated Celery task for triggering SSE task summary updates. @@ -1880,35 +1994,41 @@ def trigger_sse_update_task(self, task_id: str, reason: str = "status_update"): """ try: # Send task summary update via Redis pub/sub - logger.debug(f"SSE Task: Processing summary update for task {task_id} (reason: {reason})") - + logger.debug( + f"SSE Task: Processing summary update for task {task_id} (reason: {reason})" + ) + event_data = { "task_id": task_id, "reason": reason, "timestamp": time.time(), "change_type": "task_summary", - "event_type": "summary_update" + "event_type": "summary_update", } - + # Use Redis pub/sub for cross-process communication redis_client.publish("sse_events", json.dumps(event_data)) logger.debug(f"SSE Task: Published summary update for task {task_id}") - + except Exception as e: # Only log errors, not success cases - logger.error(f"SSE Task: Failed to publish summary update for task {task_id}: {e}", exc_info=True) + logger.error( + f"SSE Task: Failed to publish summary update for task {task_id}: {e}", + exc_info=True, + ) # Don't raise exception to avoid task retry - SSE updates are best-effort def _extract_initial_parent_object(log_lines: list, parent_type: str) -> dict | None: """Return the first album/playlist object from the log's initializing callback, if present.""" - key = "album" if parent_type == "album" else ("playlist" if parent_type == "playlist" else None) + key = ( + "album" + if parent_type == "album" + else ("playlist" if parent_type == "playlist" else None) + ) if not key: return None for obj in log_lines: if key in obj and isinstance(obj[key], dict): return obj[key] return None - - - diff --git a/routes/utils/get_info.py b/routes/utils/get_info.py index 00aeb7f..c027fc9 100644 --- a/routes/utils/get_info.py +++ b/routes/utils/get_info.py @@ -1,12 +1,9 @@ import spotipy from spotipy.oauth2 import SpotifyClientCredentials -from routes.utils.celery_queue_manager import get_config_params -from routes.utils.credentials import get_credential, _get_global_spotify_api_creds +from routes.utils.credentials import _get_global_spotify_api_creds import logging import time -from typing import Dict, List, Optional, Any -import json -from pathlib import Path +from typing import Dict, Optional, Any # Import Deezer API and logging from deezspot.deezloader.dee_api import API as DeezerAPI @@ -19,19 +16,21 @@ _spotify_client = None _last_client_init = 0 _client_init_interval = 3600 # Reinitialize client every hour + def _get_spotify_client(): """ Get or create a Spotify client with global credentials. Implements client reuse and periodic reinitialization. 
""" global _spotify_client, _last_client_init - + current_time = time.time() - - # Reinitialize client if it's been more than an hour or if client doesn't exist - if (_spotify_client is None or - current_time - _last_client_init > _client_init_interval): - + + # Reinitialize client if it's been more than an hour or if client doesn't exist + if ( + _spotify_client is None + or current_time - _last_client_init > _client_init_interval + ): client_id, client_secret = _get_global_spotify_api_creds() if not client_id or not client_secret: @@ -42,177 +41,198 @@ def _get_spotify_client(): # Create new client _spotify_client = spotipy.Spotify( client_credentials_manager=SpotifyClientCredentials( - client_id=client_id, - client_secret=client_secret + client_id=client_id, client_secret=client_secret ) ) _last_client_init = current_time logger.info("Spotify client initialized/reinitialized") - + return _spotify_client + def _rate_limit_handler(func): """ Decorator to handle rate limiting with exponential backoff. """ + def wrapper(*args, **kwargs): max_retries = 3 base_delay = 1 - + for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: if "429" in str(e) or "rate limit" in str(e).lower(): if attempt < max_retries - 1: - delay = base_delay * (2 ** attempt) + delay = base_delay * (2**attempt) logger.warning(f"Rate limited, retrying in {delay} seconds...") time.sleep(delay) continue raise e return func(*args, **kwargs) + return wrapper + @_rate_limit_handler def get_playlist_metadata(playlist_id: str) -> Dict[str, Any]: """ Get playlist metadata only (no tracks) to avoid rate limiting. - + Args: playlist_id: The Spotify playlist ID - + Returns: Dictionary with playlist metadata (name, description, owner, etc.) """ client = _get_spotify_client() - + try: # Get basic playlist info without tracks - playlist = client.playlist(playlist_id, fields="id,name,description,owner,images,snapshot_id,public,followers,tracks.total") - + playlist = client.playlist( + playlist_id, + fields="id,name,description,owner,images,snapshot_id,public,followers,tracks.total", + ) + # Add a flag to indicate this is metadata only - playlist['_metadata_only'] = True - playlist['_tracks_loaded'] = False - - logger.debug(f"Retrieved playlist metadata for {playlist_id}: {playlist.get('name', 'Unknown')}") + playlist["_metadata_only"] = True + playlist["_tracks_loaded"] = False + + logger.debug( + f"Retrieved playlist metadata for {playlist_id}: {playlist.get('name', 'Unknown')}" + ) return playlist - + except Exception as e: logger.error(f"Error fetching playlist metadata for {playlist_id}: {e}") raise + @_rate_limit_handler -def get_playlist_tracks(playlist_id: str, limit: int = 100, offset: int = 0) -> Dict[str, Any]: +def get_playlist_tracks( + playlist_id: str, limit: int = 100, offset: int = 0 +) -> Dict[str, Any]: """ Get playlist tracks with pagination support to handle large playlists efficiently. 
- + Args: playlist_id: The Spotify playlist ID limit: Number of tracks to fetch per request (max 100) offset: Starting position for pagination - + Returns: Dictionary with tracks data """ client = _get_spotify_client() - + try: # Get tracks with specified limit and offset tracks_data = client.playlist_tracks( - playlist_id, + playlist_id, limit=min(limit, 100), # Spotify API max is 100 offset=offset, - fields="items(track(id,name,artists,album,external_urls,preview_url,duration_ms,explicit,popularity)),total,limit,offset" + fields="items(track(id,name,artists,album,external_urls,preview_url,duration_ms,explicit,popularity)),total,limit,offset", + ) + + logger.debug( + f"Retrieved {len(tracks_data.get('items', []))} tracks for playlist {playlist_id} (offset: {offset})" ) - - logger.debug(f"Retrieved {len(tracks_data.get('items', []))} tracks for playlist {playlist_id} (offset: {offset})") return tracks_data - + except Exception as e: logger.error(f"Error fetching playlist tracks for {playlist_id}: {e}") raise + @_rate_limit_handler def get_playlist_full(playlist_id: str, batch_size: int = 100) -> Dict[str, Any]: """ Get complete playlist data with all tracks, using batched requests to avoid rate limiting. - + Args: playlist_id: The Spotify playlist ID batch_size: Number of tracks to fetch per batch (max 100) - + Returns: Complete playlist data with all tracks """ - client = _get_spotify_client() - try: # First get metadata playlist = get_playlist_metadata(playlist_id) - + # Get total track count - total_tracks = playlist.get('tracks', {}).get('total', 0) - + total_tracks = playlist.get("tracks", {}).get("total", 0) + if total_tracks == 0: - playlist['tracks'] = {'items': [], 'total': 0} + playlist["tracks"] = {"items": [], "total": 0} return playlist - + # Fetch all tracks in batches all_tracks = [] offset = 0 - + while offset < total_tracks: batch = get_playlist_tracks(playlist_id, limit=batch_size, offset=offset) - batch_items = batch.get('items', []) + batch_items = batch.get("items", []) all_tracks.extend(batch_items) - + offset += len(batch_items) - + # Add small delay between batches to be respectful to API if offset < total_tracks: time.sleep(0.1) - + # Update playlist with complete tracks data - playlist['tracks'] = { - 'items': all_tracks, - 'total': total_tracks, - 'limit': batch_size, - 'offset': 0 + playlist["tracks"] = { + "items": all_tracks, + "total": total_tracks, + "limit": batch_size, + "offset": 0, } - playlist['_metadata_only'] = False - playlist['_tracks_loaded'] = True - - logger.info(f"Retrieved complete playlist {playlist_id} with {total_tracks} tracks") + playlist["_metadata_only"] = False + playlist["_tracks_loaded"] = True + + logger.info( + f"Retrieved complete playlist {playlist_id} with {total_tracks} tracks" + ) return playlist - + except Exception as e: logger.error(f"Error fetching complete playlist {playlist_id}: {e}") raise + def check_playlist_updated(playlist_id: str, last_snapshot_id: str) -> bool: """ Check if playlist has been updated by comparing snapshot_id. This is much more efficient than fetching all tracks. 
- + Args: playlist_id: The Spotify playlist ID last_snapshot_id: The last known snapshot_id - + Returns: True if playlist has been updated, False otherwise """ try: metadata = get_playlist_metadata(playlist_id) - current_snapshot_id = metadata.get('snapshot_id') - + current_snapshot_id = metadata.get("snapshot_id") + return current_snapshot_id != last_snapshot_id - + except Exception as e: logger.error(f"Error checking playlist update status for {playlist_id}: {e}") raise + @_rate_limit_handler -def get_spotify_info(spotify_id: str, spotify_type: str, limit: Optional[int] = None, offset: Optional[int] = None) -> Dict[str, Any]: +def get_spotify_info( + spotify_id: str, + spotify_type: str, + limit: Optional[int] = None, + offset: Optional[int] = None, +) -> Dict[str, Any]: """ Get info from Spotify API using Spotipy directly. Optimized to prevent rate limiting by using appropriate endpoints. @@ -227,37 +247,35 @@ def get_spotify_info(spotify_id: str, spotify_type: str, limit: Optional[int] = Dictionary with the entity information """ client = _get_spotify_client() - + try: if spotify_type == "track": return client.track(spotify_id) - + elif spotify_type == "album": return client.album(spotify_id) - + elif spotify_type == "playlist": # Use optimized playlist fetching return get_playlist_full(spotify_id) - + elif spotify_type == "playlist_metadata": # Get only metadata for playlists return get_playlist_metadata(spotify_id) - + elif spotify_type == "artist": return client.artist(spotify_id) - + elif spotify_type == "artist_discography": # Get artist's albums with pagination albums = client.artist_albums( - spotify_id, - limit=limit or 20, - offset=offset or 0 + spotify_id, limit=limit or 20, offset=offset or 0 ) return albums - + elif spotify_type == "episode": return client.episode(spotify_id) - + else: raise ValueError(f"Unsupported Spotify type: {spotify_type}") @@ -265,17 +283,19 @@ def get_spotify_info(spotify_id: str, spotify_type: str, limit: Optional[int] = logger.error(f"Error fetching {spotify_type} {spotify_id}: {e}") raise + # Cache for playlist metadata to reduce API calls -_playlist_metadata_cache = {} +_playlist_metadata_cache: Dict[str, tuple[Dict[str, Any], float]] = {} _cache_ttl = 300 # 5 minutes cache + def get_cached_playlist_metadata(playlist_id: str) -> Optional[Dict[str, Any]]: """ Get playlist metadata from cache if available and not expired. - + Args: playlist_id: The Spotify playlist ID - + Returns: Cached metadata or None if not available/expired """ @@ -283,44 +303,48 @@ def get_cached_playlist_metadata(playlist_id: str) -> Optional[Dict[str, Any]]: cached_data, timestamp = _playlist_metadata_cache[playlist_id] if time.time() - timestamp < _cache_ttl: return cached_data - + return None + def cache_playlist_metadata(playlist_id: str, metadata: Dict[str, Any]): """ Cache playlist metadata with timestamp. - + Args: playlist_id: The Spotify playlist ID metadata: The metadata to cache """ _playlist_metadata_cache[playlist_id] = (metadata, time.time()) -def get_playlist_info_optimized(playlist_id: str, include_tracks: bool = False) -> Dict[str, Any]: + +def get_playlist_info_optimized( + playlist_id: str, include_tracks: bool = False +) -> Dict[str, Any]: """ Optimized playlist info function that uses caching and selective loading. 
- + Args: playlist_id: The Spotify playlist ID include_tracks: Whether to include track data (default: False to save API calls) - + Returns: Playlist data with or without tracks """ # Check cache first cached_metadata = get_cached_playlist_metadata(playlist_id) - + if cached_metadata and not include_tracks: logger.debug(f"Returning cached metadata for playlist {playlist_id}") return cached_metadata - + if include_tracks: # Get complete playlist data playlist_data = get_playlist_full(playlist_id) # Cache the metadata portion - metadata_only = {k: v for k, v in playlist_data.items() if k != 'tracks'} - metadata_only['_metadata_only'] = True - metadata_only['_tracks_loaded'] = False + metadata_only = {k: v for k, v in playlist_data.items() if k != "tracks"} + metadata_only["_metadata_only"] = True + metadata_only["_tracks_loaded"] = False cache_playlist_metadata(playlist_id, metadata_only) return playlist_data else: @@ -329,6 +353,7 @@ def get_playlist_info_optimized(playlist_id: str, include_tracks: bool = False) cache_playlist_metadata(playlist_id, metadata) return metadata + # Keep the existing Deezer functions unchanged def get_deezer_info(deezer_id, deezer_type, limit=None): """ diff --git a/routes/utils/history_manager.py b/routes/utils/history_manager.py index db57b27..f724084 100644 --- a/routes/utils/history_manager.py +++ b/routes/utils/history_manager.py @@ -4,29 +4,29 @@ import uuid import time import logging from pathlib import Path -from typing import Dict, List, Optional, Any, Union -from datetime import datetime +from typing import Dict, List, Optional, Union from contextlib import contextmanager logger = logging.getLogger(__name__) + class HistoryManager: """ Manages download history storage using SQLite database. Stores hierarchical download data from deezspot callback objects. """ - + def __init__(self, db_path: str = "data/history/download_history.db"): """ Initialize the history manager with database path. 
- + Args: db_path: Path to SQLite database file """ self.db_path = Path(db_path) self.db_path.parent.mkdir(parents=True, exist_ok=True) self._ensure_database_exists() - + def _ensure_database_exists(self): """Create database and main table if they don't exist and migrate schema safely.""" expected_download_history_columns: Dict[str, str] = { @@ -53,7 +53,7 @@ class HistoryManager: "owner": "TEXT", "album_type": "TEXT", "duration_total_ms": "INTEGER", - "explicit": "BOOLEAN" + "explicit": "BOOLEAN", } with self._get_connection() as conn: @@ -68,7 +68,12 @@ class HistoryManager: """) # 2) Ensure/upgrade schema columns idempotently - self._ensure_table_schema(cursor, "download_history", expected_download_history_columns, "download history") + self._ensure_table_schema( + cursor, + "download_history", + expected_download_history_columns, + "download history", + ) # 3) Migrate legacy columns to new ones (best-effort, non-fatal) try: @@ -78,39 +83,59 @@ class HistoryManager: # Legacy timestamp columns → timestamp if "timestamp" not in cols: # Add column first - cursor.execute("ALTER TABLE download_history ADD COLUMN timestamp REAL") + cursor.execute( + "ALTER TABLE download_history ADD COLUMN timestamp REAL" + ) # Backfill from legacy columns if present - legacy_time_cols = [c for c in ["time", "created_at", "date"] if c in cols] + legacy_time_cols = [ + c for c in ["time", "created_at", "date"] if c in cols + ] if legacy_time_cols: # Pick the first legacy column to backfill legacy_col = legacy_time_cols[0] try: - cursor.execute(f"UPDATE download_history SET timestamp = CASE WHEN {legacy_col} IS NOT NULL THEN {legacy_col} ELSE strftime('%s','now') END") + cursor.execute( + f"UPDATE download_history SET timestamp = CASE WHEN {legacy_col} IS NOT NULL THEN {legacy_col} ELSE strftime('%s','now') END" + ) except sqlite3.Error: # Fallback: just set to now - cursor.execute("UPDATE download_history SET timestamp = strftime('%s','now')") + cursor.execute( + "UPDATE download_history SET timestamp = strftime('%s','now')" + ) else: # Default all to now if nothing to migrate - cursor.execute("UPDATE download_history SET timestamp = strftime('%s','now')") - + cursor.execute( + "UPDATE download_history SET timestamp = strftime('%s','now')" + ) + # quality → quality_format, bitrate → quality_bitrate # Handle common legacy pairs non-fataly cursor.execute("PRAGMA table_info(download_history)") cols = {row[1] for row in cursor.fetchall()} if "quality_format" not in cols and "quality" in cols: - cursor.execute("ALTER TABLE download_history ADD COLUMN quality_format TEXT") + cursor.execute( + "ALTER TABLE download_history ADD COLUMN quality_format TEXT" + ) try: - cursor.execute("UPDATE download_history SET quality_format = quality WHERE quality_format IS NULL") + cursor.execute( + "UPDATE download_history SET quality_format = quality WHERE quality_format IS NULL" + ) except sqlite3.Error: pass if "quality_bitrate" not in cols and "bitrate" in cols: - cursor.execute("ALTER TABLE download_history ADD COLUMN quality_bitrate TEXT") + cursor.execute( + "ALTER TABLE download_history ADD COLUMN quality_bitrate TEXT" + ) try: - cursor.execute("UPDATE download_history SET quality_bitrate = bitrate WHERE quality_bitrate IS NULL") + cursor.execute( + "UPDATE download_history SET quality_bitrate = bitrate WHERE quality_bitrate IS NULL" + ) except sqlite3.Error: pass except Exception as e: - logger.warning(f"Non-fatal: failed legacy column migration for download_history: {e}") + logger.warning( + f"Non-fatal: failed legacy 
column migration for download_history: {e}" + ) # 4) Create indexes only if columns exist (avoid startup failures) try: @@ -141,14 +166,16 @@ class HistoryManager: """ ) except Exception as e: - logger.warning(f"Non-fatal: failed to create indexes for download_history: {e}") + logger.warning( + f"Non-fatal: failed to create indexes for download_history: {e}" + ) # 5) Best-effort upgrade of existing children tables (album_*, playlist_*) try: self._migrate_existing_children_tables(cursor) except Exception as e: logger.warning(f"Non-fatal: failed to migrate children tables: {e}") - + @contextmanager def _get_connection(self): """Get database connection with proper error handling.""" @@ -166,8 +193,14 @@ class HistoryManager: finally: if conn: conn.close() - - def _ensure_table_schema(self, cursor: sqlite3.Cursor, table_name: str, expected_columns: Dict[str, str], table_description: str) -> None: + + def _ensure_table_schema( + self, + cursor: sqlite3.Cursor, + table_name: str, + expected_columns: Dict[str, str], + table_description: str, + ) -> None: """Ensure all expected columns exist in the given table, adding any missing columns.""" try: cursor.execute(f"PRAGMA table_info({table_name})") @@ -177,20 +210,32 @@ class HistoryManager: for col_name, col_type in expected_columns.items(): if col_name not in existing_names: # Avoid adding PRIMARY KEY on existing tables; strip it for ALTER - col_type_for_add = col_type.replace("PRIMARY KEY", "").replace("AUTOINCREMENT", "").strip() + col_type_for_add = ( + col_type.replace("PRIMARY KEY", "") + .replace("AUTOINCREMENT", "") + .strip() + ) try: - cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {col_name} {col_type_for_add}") - logger.info(f"Added missing column '{col_name} {col_type_for_add}' to {table_description} table '{table_name}'.") + cursor.execute( + f"ALTER TABLE {table_name} ADD COLUMN {col_name} {col_type_for_add}" + ) + logger.info( + f"Added missing column '{col_name} {col_type_for_add}' to {table_description} table '{table_name}'." + ) except sqlite3.Error as e: - logger.warning(f"Could not add column '{col_name}' to {table_description} table '{table_name}': {e}") + logger.warning( + f"Could not add column '{col_name}' to {table_description} table '{table_name}': {e}" + ) except sqlite3.Error as e: - logger.error(f"Error ensuring schema for {table_description} table '{table_name}': {e}") + logger.error( + f"Error ensuring schema for {table_description} table '{table_name}': {e}" + ) def _create_children_table(self, table_name: str): """ Create a children table for storing individual tracks of an album/playlist. Ensures schema upgrades for existing tables. 
- + Args: table_name: Name of the children table (e.g., 'album_abc123') """ @@ -232,26 +277,32 @@ class HistoryManager: "position": "INTEGER", "metadata": "TEXT", } - self._ensure_table_schema(cursor, table_name, expected_children_columns, "children history") + self._ensure_table_schema( + cursor, table_name, expected_children_columns, "children history" + ) def _migrate_existing_children_tables(self, cursor: sqlite3.Cursor) -> None: """Find album_* and playlist_* children tables and ensure they have the expected schema.""" - cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND (name LIKE 'album_%' OR name LIKE 'playlist_%')") + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND (name LIKE 'album_%' OR name LIKE 'playlist_%')" + ) tables = [row[0] for row in cursor.fetchall() if row[0] != "download_history"] for t in tables: try: # Ensure existence + schema upgrades - cursor.execute(f"CREATE TABLE IF NOT EXISTS {t} (id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL)") + cursor.execute( + f"CREATE TABLE IF NOT EXISTS {t} (id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL)" + ) self._create_children_table(t) except Exception as e: logger.warning(f"Non-fatal: failed to migrate children table {t}: {e}") - + def _extract_artists(self, obj: Dict) -> List[str]: """Extract artist names from various object types.""" artists = obj.get("artists", []) if not artists: return [] - + artist_names = [] for artist in artists: if isinstance(artist, dict): @@ -260,21 +311,21 @@ class HistoryManager: artist_names.append(name) elif isinstance(artist, str): artist_names.append(artist) - + return artist_names - + def _extract_external_ids(self, obj: Dict) -> Dict: """Extract external service IDs from object.""" return obj.get("ids", {}) - + def _extract_images(self, obj: Dict) -> List[Dict]: """Extract image information from object.""" return obj.get("images", []) - + def _extract_release_date(self, obj: Dict) -> Dict: """Extract release date information from object.""" return obj.get("release_date", {}) - + def _calculate_total_duration(self, tracks: List[Dict]) -> int: """Calculate total duration from tracks list.""" total = 0 @@ -283,7 +334,7 @@ class HistoryManager: if duration: total += duration return total - + def _get_primary_service(self, external_ids: Dict) -> str: """Determine primary service from external IDs.""" if "spotify" in external_ids: @@ -292,53 +343,63 @@ class HistoryManager: return "deezer" else: return "unknown" - + def create_children_table_for_album(self, callback_data: Dict, task_id: str) -> str: """ Create children table for album download at the start and return table name. - + Args: - callback_data: Album callback object from deezspot + callback_data: Album callback object from deezspot task_id: Celery task ID - + Returns: Children table name """ # Generate children table name album_uuid = str(uuid.uuid4()).replace("-", "")[:10] children_table = f"album_{album_uuid}" - + # Create the children table self._create_children_table(children_table) - + logger.info(f"Created album children table {children_table} for task {task_id}") return children_table - - def create_children_table_for_playlist(self, callback_data: Dict, task_id: str) -> str: + + def create_children_table_for_playlist( + self, callback_data: Dict, task_id: str + ) -> str: """ Create children table for playlist download at the start and return table name. 
- + Args: callback_data: Playlist callback object from deezspot task_id: Celery task ID - + Returns: Children table name """ # Generate children table name playlist_uuid = str(uuid.uuid4()).replace("-", "")[:10] children_table = f"playlist_{playlist_uuid}" - + # Create the children table self._create_children_table(children_table) - - logger.info(f"Created playlist children table {children_table} for task {task_id}") + + logger.info( + f"Created playlist children table {children_table} for task {task_id}" + ) return children_table - - def store_track_history(self, callback_data: Dict, task_id: str, status: str = "completed", table: str = "download_history"): + + def store_track_history( + self, + callback_data: Dict, + task_id: str, + status: str = "completed", + table: str = "download_history", + ): """ Store individual track download history. - + Args: callback_data: Track callback object from deezspot task_id: Celery task ID @@ -348,17 +409,17 @@ class HistoryManager: try: track = callback_data.get("track", {}) status_info = callback_data.get("status_info", {}) - + if not track: logger.warning(f"No track data in callback for task {task_id}") return - + artists = self._extract_artists(track) external_ids = self._extract_external_ids(track) - + album = track.get("album", {}) album_title = album.get("title", "") - + # Prepare metadata metadata = { "callback_type": "track", @@ -366,46 +427,53 @@ class HistoryManager: "current_track": callback_data.get("current_track"), "total_tracks": callback_data.get("total_tracks"), "album": album, - "status_info": status_info + "status_info": status_info, } - + with self._get_connection() as conn: if table == "download_history": # Store in main download_history table - logger.info(f"Storing track '{track.get('title', 'Unknown')}' in MAIN table for task {task_id}") - conn.execute(""" + logger.info( + f"Storing track '{track.get('title', 'Unknown')}' in MAIN table for task {task_id}" + ) + conn.execute( + """ INSERT OR REPLACE INTO download_history ( download_type, title, artists, timestamp, status, service, quality_format, quality_bitrate, task_id, external_ids, metadata, release_date, genres, explicit, album_type, duration_total_ms ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
- """, ( - "track", - track.get("title", "Unknown"), - json.dumps(artists), - callback_data.get("timestamp", time.time()), - status, - self._get_primary_service(external_ids), - status_info.get("convert_to"), - status_info.get("bitrate"), - task_id, - json.dumps(external_ids), - json.dumps(metadata), - json.dumps(self._extract_release_date(album)), - json.dumps(track.get("genres", [])), - track.get("explicit", False), - album.get("album_type"), - track.get("duration_ms", 0) - )) + """, + ( + "track", + track.get("title", "Unknown"), + json.dumps(artists), + callback_data.get("timestamp", time.time()), + status, + self._get_primary_service(external_ids), + status_info.get("convert_to"), + status_info.get("bitrate"), + task_id, + json.dumps(external_ids), + json.dumps(metadata), + json.dumps(self._extract_release_date(album)), + json.dumps(track.get("genres", [])), + track.get("explicit", False), + album.get("album_type"), + track.get("duration_ms", 0), + ), + ) else: # Ensure target children table exists before write self._create_children_table(table) # Store in children table (for album/playlist tracks) - logger.info(f"Storing track '{track.get('title', 'Unknown')}' in CHILDREN table '{table}' for task {task_id}") + logger.info( + f"Storing track '{track.get('title', 'Unknown')}' in CHILDREN table '{table}' for task {task_id}" + ) # Extract ISRC isrc = external_ids.get("isrc", "") - + # Prepare children table metadata children_metadata = { "album": album, @@ -414,85 +482,101 @@ class HistoryManager: "parent": callback_data.get("parent"), "current_track": callback_data.get("current_track"), "total_tracks": callback_data.get("total_tracks"), - "status_info": status_info + "status_info": status_info, } - - conn.execute(f""" + + conn.execute( + f""" INSERT INTO {table} ( title, artists, album_title, duration_ms, track_number, disc_number, explicit, status, external_ids, genres, isrc, timestamp, position, metadata ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, ( - track.get("title", "Unknown"), - json.dumps(artists), - album_title, - track.get("duration_ms", 0), - track.get("track_number", 0), - track.get("disc_number", 1), - track.get("explicit", False), - status, - json.dumps(external_ids), - json.dumps(track.get("genres", [])), - isrc, - callback_data.get("timestamp", time.time()), - track.get("position", 0), # For playlist tracks - json.dumps(children_metadata) - )) - - logger.info(f"Successfully stored track '{track.get('title')}' in table '{table}' (task: {task_id})") - + """, + ( + track.get("title", "Unknown"), + json.dumps(artists), + album_title, + track.get("duration_ms", 0), + track.get("track_number", 0), + track.get("disc_number", 1), + track.get("explicit", False), + status, + json.dumps(external_ids), + json.dumps(track.get("genres", [])), + isrc, + callback_data.get("timestamp", time.time()), + track.get("position", 0), # For playlist tracks + json.dumps(children_metadata), + ), + ) + + logger.info( + f"Successfully stored track '{track.get('title')}' in table '{table}' (task: {task_id})" + ) + except Exception as e: logger.error(f"Failed to store track history for task {task_id}: {e}") - - def store_album_history(self, callback_data: Dict, task_id: str, status: str = "completed"): + + def store_album_history( + self, callback_data: Dict, task_id: str, status: str = "completed" + ): """ Store album download history with children table for individual tracks. 
- + Args: - callback_data: Album callback object from deezspot + callback_data: Album callback object from deezspot task_id: Celery task ID status: Download status ('completed', 'failed', 'in_progress') - + Returns: Children table name when status is 'in_progress', None otherwise """ try: album = callback_data.get("album", {}) status_info = callback_data.get("status_info", {}) - + if not album: logger.warning(f"No album data in callback for task {task_id}") return None - + if status == "in_progress": # Phase 1: Create children table at start, don't store album entry yet - children_table = self.create_children_table_for_album(callback_data, task_id) - logger.info(f"Album download started for task {task_id}, children table: {children_table}") + children_table = self.create_children_table_for_album( + callback_data, task_id + ) + logger.info( + f"Album download started for task {task_id}, children table: {children_table}" + ) return children_table - + # Phase 2: Store album entry in main table (for completed/failed status) artists = self._extract_artists(album) external_ids = self._extract_external_ids(album) - + # For completed/failed, we need to find the existing children table # This should be stored in task info by the celery task from routes.utils.celery_tasks import get_task_info + task_info = get_task_info(task_id) children_table = task_info.get("children_table") - + if not children_table: # Fallback: generate new children table name (shouldn't happen in normal flow) album_uuid = str(uuid.uuid4()).replace("-", "")[:10] children_table = f"album_{album_uuid}" - logger.warning(f"No children table found for album task {task_id}, generating new: {children_table}") - + logger.warning( + f"No children table found for album task {task_id}, generating new: {children_table}" + ) + # Extract summary data if available (from 'done' status) summary = status_info.get("summary", {}) successful_tracks = summary.get("total_successful", 0) - failed_tracks = summary.get("total_failed", 0) + failed_tracks = summary.get("total_failed", 0) skipped_tracks = summary.get("total_skipped", 0) - total_tracks = summary.get("total_successful", 0) + summary.get("total_skipped", 0) + summary.get("total_failed", 0) or album.get("total_tracks", 0) + total_tracks = summary.get("total_successful", 0) + summary.get( + "total_skipped", 0 + ) + summary.get("total_failed", 0) or album.get("total_tracks", 0) # Enrich album metadata if missing try: @@ -503,12 +587,16 @@ class HistoryManager: # Calculate total duration tracks = album.get("tracks", []) total_duration = self._calculate_total_duration(tracks) - + # Derive accurate status if we have counters status_to_store = status try: if total_tracks: - if successful_tracks >= total_tracks and failed_tracks == 0 and skipped_tracks == 0: + if ( + successful_tracks >= total_tracks + and failed_tracks == 0 + and skipped_tracks == 0 + ): status_to_store = "completed" elif successful_tracks > 0: status_to_store = "partial" @@ -524,18 +612,19 @@ class HistoryManager: except Exception: # Keep provided status pass - + # Prepare metadata metadata = { "callback_type": "album", "status_info": status_info, "copyrights": album.get("copyrights", []), - "tracks": tracks # Store track list in metadata + "tracks": tracks, # Store track list in metadata } - + with self._get_connection() as conn: # Store main album entry - conn.execute(""" + conn.execute( + """ INSERT OR REPLACE INTO download_history ( download_type, title, artists, timestamp, status, service, quality_format, quality_bitrate, 
total_tracks, successful_tracks, @@ -543,85 +632,102 @@ class HistoryManager: external_ids, metadata, release_date, genres, images, album_type, duration_total_ms ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, ( - "album", - album.get("title", "Unknown"), - json.dumps(artists), - callback_data.get("timestamp", time.time()), - status_to_store, - self._get_primary_service(external_ids), - status_info.get("convert_to"), - status_info.get("bitrate"), - total_tracks, - successful_tracks, - failed_tracks, - skipped_tracks, - children_table, - task_id, - json.dumps(external_ids), - json.dumps(metadata), - json.dumps(self._extract_release_date(album)), - json.dumps(album.get("genres", [])), - json.dumps(self._extract_images(album)), - album.get("album_type"), - total_duration - )) - + """, + ( + "album", + album.get("title", "Unknown"), + json.dumps(artists), + callback_data.get("timestamp", time.time()), + status_to_store, + self._get_primary_service(external_ids), + status_info.get("convert_to"), + status_info.get("bitrate"), + total_tracks, + successful_tracks, + failed_tracks, + skipped_tracks, + children_table, + task_id, + json.dumps(external_ids), + json.dumps(metadata), + json.dumps(self._extract_release_date(album)), + json.dumps(album.get("genres", [])), + json.dumps(self._extract_images(album)), + album.get("album_type"), + total_duration, + ), + ) + # If we have a summary (e.g., on cancellation), populate children from it including failed ones try: if summary: - self._populate_album_children_table(children_table, summary, album.get("title", "")) + self._populate_album_children_table( + children_table, summary, album.get("title", "") + ) except Exception as e: - logger.warning(f"Failed to populate children from summary for album {children_table}: {e}") - - logger.info(f"Stored album history for '{album.get('title')}' (task: {task_id}, children: {children_table}, status: {status_to_store})") + logger.warning( + f"Failed to populate children from summary for album {children_table}: {e}" + ) + + logger.info( + f"Stored album history for '{album.get('title')}' (task: {task_id}, children: {children_table}, status: {status_to_store})" + ) return None - + except Exception as e: logger.error(f"Failed to store album history for task {task_id}: {e}") return None - - def store_playlist_history(self, callback_data: Dict, task_id: str, status: str = "completed"): + + def store_playlist_history( + self, callback_data: Dict, task_id: str, status: str = "completed" + ): """ Store playlist download history with children table for individual tracks. 
- + Args: callback_data: Playlist callback object from deezspot - task_id: Celery task ID + task_id: Celery task ID status: Download status ('completed', 'failed', 'in_progress') - + Returns: Children table name when status is 'in_progress', None otherwise """ try: playlist = callback_data.get("playlist", {}) status_info = callback_data.get("status_info", {}) - + if not playlist: logger.warning(f"No playlist data in callback for task {task_id}") return None - + if status == "in_progress": # Phase 1: Create children table at start, don't store playlist entry yet - children_table = self.create_children_table_for_playlist(callback_data, task_id) - logger.info(f"Playlist download started for task {task_id}, children table: {children_table}") + children_table = self.create_children_table_for_playlist( + callback_data, task_id + ) + logger.info( + f"Playlist download started for task {task_id}, children table: {children_table}" + ) return children_table - + # Phase 2: Store playlist entry in main table (for completed/failed status) external_ids = self._extract_external_ids(playlist) - + # For completed/failed, we need to find the existing children table # This should be stored in task info by the celery task from routes.utils.celery_tasks import get_task_info + task_info = get_task_info(task_id) children_table = task_info.get("children_table") - + if not children_table: # Fallback: generate new children table name (shouldn't happen in normal flow) playlist_uuid = str(uuid.uuid4()).replace("-", "")[:10] children_table = f"playlist_{playlist_uuid}" - logger.warning(f"No children table found for playlist task {task_id}, generating new: {children_table}") - + logger.warning( + f"No children table found for playlist task {task_id}, generating new: {children_table}" + ) + # Extract summary data if available summary = status_info.get("summary", {}) successful_tracks = summary.get("total_successful", 0) @@ -631,9 +737,26 @@ class HistoryManager: # Improve metadata for playlist main row using summary first success/skip/failed track try: if not playlist.get("images"): - for arr_key in ("successful_tracks", "skipped_tracks", "failed_tracks"): + for arr_key in ( + "successful_tracks", + "skipped_tracks", + "failed_tracks", + ): arr = summary.get(arr_key, []) or [] - candidate = (arr[0].get("album") if arr_key == "failed_tracks" and isinstance(arr[0], dict) else (arr[0].get("album") if arr and isinstance(arr[0], dict) else {})) if arr else {} + candidate = ( + ( + arr[0].get("album") + if arr_key == "failed_tracks" + and isinstance(arr[0], dict) + else ( + arr[0].get("album") + if arr and isinstance(arr[0], dict) + else {} + ) + ) + if arr + else {} + ) if candidate and candidate.get("images"): playlist.setdefault("images", candidate.get("images", [])) break @@ -641,14 +764,22 @@ class HistoryManager: pass tracks = playlist.get("tracks", []) - total_tracks = (summary.get("total_successful", 0) + summary.get("total_skipped", 0) + summary.get("total_failed", 0)) or len(tracks) + total_tracks = ( + summary.get("total_successful", 0) + + summary.get("total_skipped", 0) + + summary.get("total_failed", 0) + ) or len(tracks) total_duration = self._calculate_total_duration(tracks) - + # Derive accurate status status_to_store = status try: if total_tracks: - if successful_tracks >= total_tracks and failed_tracks == 0 and skipped_tracks == 0: + if ( + successful_tracks >= total_tracks + and failed_tracks == 0 + and skipped_tracks == 0 + ): status_to_store = "completed" elif successful_tracks > 0: status_to_store = 
"partial" @@ -661,21 +792,22 @@ class HistoryManager: status_to_store = "partial" except Exception: pass - + # Extract owner information owner = playlist.get("owner", {}) - - # Prepare metadata + + # Prepare metadata metadata = { "callback_type": "playlist", "status_info": status_info, "description": playlist.get("description", ""), - "tracks": tracks # Store track list in metadata + "tracks": tracks, # Store track list in metadata } - + with self._get_connection() as conn: # Store main playlist entry - conn.execute(""" + conn.execute( + """ INSERT OR REPLACE INTO download_history ( download_type, title, artists, timestamp, status, service, quality_format, quality_bitrate, total_tracks, successful_tracks, @@ -683,128 +815,153 @@ class HistoryManager: external_ids, metadata, genres, images, owner, duration_total_ms ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, ( - "playlist", - playlist.get("title", "Unknown"), - json.dumps([owner.get("name", "Unknown")]), # Use owner as "artist" - callback_data.get("timestamp", time.time()), - status_to_store, - self._get_primary_service(external_ids), - status_info.get("convert_to"), - status_info.get("bitrate"), - total_tracks, - successful_tracks, - failed_tracks, - skipped_tracks, - children_table, - task_id, - json.dumps(external_ids), - json.dumps(metadata), - json.dumps([]), # Playlists don't have genres typically - json.dumps(self._extract_images(playlist)), - json.dumps(owner), - total_duration - )) - + """, + ( + "playlist", + playlist.get("title", "Unknown"), + json.dumps( + [owner.get("name", "Unknown")] + ), # Use owner as "artist" + callback_data.get("timestamp", time.time()), + status_to_store, + self._get_primary_service(external_ids), + status_info.get("convert_to"), + status_info.get("bitrate"), + total_tracks, + successful_tracks, + failed_tracks, + skipped_tracks, + children_table, + task_id, + json.dumps(external_ids), + json.dumps(metadata), + json.dumps([]), # Playlists don't have genres typically + json.dumps(self._extract_images(playlist)), + json.dumps(owner), + total_duration, + ), + ) + # If we have a summary (e.g., on cancellation), populate children from it including failed ones try: if summary: self._populate_playlist_children_table(children_table, summary) except Exception as e: - logger.warning(f"Failed to populate children from summary for playlist {children_table}: {e}") + logger.warning( + f"Failed to populate children from summary for playlist {children_table}: {e}" + ) - logger.info(f"Stored playlist history for '{playlist.get('title')}' (task: {task_id}, children: {children_table}, status: {status_to_store})") + logger.info( + f"Stored playlist history for '{playlist.get('title')}' (task: {task_id}, children: {children_table}, status: {status_to_store})" + ) return None - + except Exception as e: logger.error(f"Failed to store playlist history for task {task_id}: {e}") return None - - def _populate_album_children_table(self, table_name: str, summary: Dict, album_title: str): + + def _populate_album_children_table( + self, table_name: str, summary: Dict, album_title: str + ): """Populate children table with individual track records from album summary.""" try: # Ensure table exists before population self._create_children_table(table_name) all_rows = [] - + # Add successful tracks for track in summary.get("successful_tracks", []): - track_data = self._prepare_child_track_data(track, album_title, "completed") + track_data = self._prepare_child_track_data( + track, album_title, "completed" + 
) all_rows.append(self._map_values_to_row(track_data["values"])) - - # Add failed tracks + + # Add failed tracks for failed_item in summary.get("failed_tracks", []): track = failed_item.get("track", {}) - track_data = self._prepare_child_track_data(track, album_title, "failed") - track_data["metadata"]["failure_reason"] = failed_item.get("reason", "Unknown error") + track_data = self._prepare_child_track_data( + track, album_title, "failed" + ) + track_data["metadata"]["failure_reason"] = failed_item.get( + "reason", "Unknown error" + ) all_rows.append(self._map_values_to_row(track_data["values"])) - + # Add skipped tracks for track in summary.get("skipped_tracks", []): - track_data = self._prepare_child_track_data(track, album_title, "skipped") + track_data = self._prepare_child_track_data( + track, album_title, "skipped" + ) all_rows.append(self._map_values_to_row(track_data["values"])) - + # Upsert all rows with self._get_connection() as conn: for row in all_rows: self._upsert_child_row(conn, table_name, row) - - logger.info(f"Populated {len(all_rows)} tracks in children table {table_name}") - + + logger.info( + f"Populated {len(all_rows)} tracks in children table {table_name}" + ) + except Exception as e: logger.error(f"Failed to populate album children table {table_name}: {e}") - + def _populate_playlist_children_table(self, table_name: str, summary: Dict): """Populate children table with individual track records from playlist summary.""" try: # Ensure table exists before population self._create_children_table(table_name) all_rows = [] - + # Add successful tracks for track in summary.get("successful_tracks", []): track_data = self._prepare_child_track_data(track, "", "completed") all_rows.append(self._map_values_to_row(track_data["values"])) - + # Add failed tracks for failed_item in summary.get("failed_tracks", []): track = failed_item.get("track", {}) track_data = self._prepare_child_track_data(track, "", "failed") - track_data["metadata"]["failure_reason"] = failed_item.get("reason", "Unknown error") + track_data["metadata"]["failure_reason"] = failed_item.get( + "reason", "Unknown error" + ) all_rows.append(self._map_values_to_row(track_data["values"])) - - # Add skipped tracks + + # Add skipped tracks for track in summary.get("skipped_tracks", []): track_data = self._prepare_child_track_data(track, "", "skipped") all_rows.append(self._map_values_to_row(track_data["values"])) - + with self._get_connection() as conn: for row in all_rows: self._upsert_child_row(conn, table_name, row) - - logger.info(f"Populated {len(all_rows)} tracks in children table {table_name}") - + + logger.info( + f"Populated {len(all_rows)} tracks in children table {table_name}" + ) + except Exception as e: - logger.error(f"Failed to populate playlist children table {table_name}: {e}") - - def _prepare_child_track_data(self, track: Dict, default_album: str, status: str) -> Dict: + logger.error( + f"Failed to populate playlist children table {table_name}: {e}" + ) + + def _prepare_child_track_data( + self, track: Dict, default_album: str, status: str + ) -> Dict: """Prepare track data for insertion into children table.""" artists = self._extract_artists(track) external_ids = self._extract_external_ids(track) - + # Get album info album = track.get("album", {}) album_title = album.get("title", default_album) - + # Extract ISRC isrc = external_ids.get("isrc", "") - + # Prepare metadata - metadata = { - "album": album, - "type": track.get("type", "") - } - + metadata = {"album": album, "type": track.get("type", 
"")} + values = ( track.get("title", "Unknown"), json.dumps(artists), @@ -819,91 +976,105 @@ class HistoryManager: isrc, time.time(), track.get("position", 0), # For playlist tracks - json.dumps(metadata) + json.dumps(metadata), ) - + return {"values": values, "metadata": metadata} - + def update_download_status(self, task_id: str, status: str): """Update download status for existing history entry.""" try: with self._get_connection() as conn: - conn.execute(""" - UPDATE download_history - SET status = ? + conn.execute( + """ + UPDATE download_history + SET status = ? WHERE task_id = ? - """, (status, task_id)) - + """, + (status, task_id), + ) + logger.info(f"Updated download status to '{status}' for task {task_id}") - + except Exception as e: logger.error(f"Failed to update download status for task {task_id}: {e}") - - def get_download_history(self, limit: int = 100, offset: int = 0, - download_type: Optional[str] = None, - status: Optional[str] = None) -> List[Dict]: + + def get_download_history( + self, + limit: int = 100, + offset: int = 0, + download_type: Optional[str] = None, + status: Optional[str] = None, + ) -> List[Dict]: """ Retrieve download history with optional filtering. - + Args: limit: Maximum number of records to return offset: Number of records to skip download_type: Filter by download type ('track', 'album', 'playlist') status: Filter by status ('completed', 'failed', 'skipped', 'in_progress') - + Returns: List of download history records """ try: query = "SELECT * FROM download_history" - params = [] + params: List[Union[str, int]] = [] conditions = [] - + if download_type: conditions.append("download_type = ?") params.append(download_type) - + if status: conditions.append("status = ?") params.append(status) - + if conditions: query += " WHERE " + " AND ".join(conditions) - + query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) - + with self._get_connection() as conn: cursor = conn.execute(query, params) rows = cursor.fetchall() - + # Convert to list of dicts result = [] for row in rows: record = dict(row) # Parse JSON fields - for field in ['artists', 'external_ids', 'metadata', 'release_date', - 'genres', 'images', 'owner']: + for field in [ + "artists", + "external_ids", + "metadata", + "release_date", + "genres", + "images", + "owner", + ]: if record.get(field): try: record[field] = json.loads(record[field]) except (json.JSONDecodeError, TypeError): pass result.append(record) - + return result - + except Exception as e: logger.error(f"Failed to retrieve download history: {e}") return [] - + def get_children_history(self, children_table: str) -> List[Dict]: """ Retrieve track history from a children table. 
- + Args: children_table: Name of the children table - + Returns: List of track records """ @@ -912,30 +1083,32 @@ class HistoryManager: self._create_children_table(children_table) with self._get_connection() as conn: cursor = conn.execute(f""" - SELECT * FROM {children_table} + SELECT * FROM {children_table} ORDER BY track_number, position """) rows = cursor.fetchall() - + # Convert to list of dicts result = [] for row in rows: record = dict(row) # Parse JSON fields - for field in ['artists', 'external_ids', 'genres', 'metadata']: + for field in ["artists", "external_ids", "genres", "metadata"]: if record.get(field): try: record[field] = json.loads(record[field]) except (json.JSONDecodeError, TypeError): pass result.append(record) - + return result - + except Exception as e: - logger.error(f"Failed to retrieve children history from {children_table}: {e}") + logger.error( + f"Failed to retrieve children history from {children_table}: {e}" + ) return [] - + def get_download_stats(self) -> Dict: """Get download statistics.""" try: @@ -946,217 +1119,270 @@ class HistoryManager: FROM download_history GROUP BY download_type, status """) - type_stats = {} + type_stats: Dict[str, Dict[str, int]] = {} for row in cursor.fetchall(): - download_type = row['download_type'] - status = row['status'] - count = row['count'] - + download_type = row["download_type"] + status = row["status"] + count = row["count"] + if download_type not in type_stats: type_stats[download_type] = {} type_stats[download_type][status] = count - + # Total tracks downloaded (including from albums/playlists) cursor = conn.execute(""" SELECT SUM( - CASE + CASE WHEN download_type = 'track' AND status = 'completed' THEN 1 ELSE COALESCE(successful_tracks, 0) END ) as total_successful_tracks FROM download_history """) - total_tracks = cursor.fetchone()['total_successful_tracks'] or 0 - + total_tracks = cursor.fetchone()["total_successful_tracks"] or 0 + # Recent downloads (last 7 days) week_ago = time.time() - (7 * 24 * 60 * 60) - cursor = conn.execute(""" + cursor = conn.execute( + """ SELECT COUNT(*) as count FROM download_history WHERE timestamp > ? - """, (week_ago,)) - recent_downloads = cursor.fetchone()['count'] - + """, + (week_ago,), + ) + recent_downloads = cursor.fetchone()["count"] + return { "by_type_and_status": type_stats, "total_successful_tracks": total_tracks, - "recent_downloads_7d": recent_downloads + "recent_downloads_7d": recent_downloads, } - + except Exception as e: logger.error(f"Failed to get download stats: {e}") return {} - + def search_history(self, query: str, limit: int = 50) -> List[Dict]: """ Search download history by title or artist. - + Args: query: Search query for title or artist limit: Maximum number of results - + Returns: List of matching download records """ try: search_pattern = f"%{query}%" - + with self._get_connection() as conn: - cursor = conn.execute(""" + cursor = conn.execute( + """ SELECT * FROM download_history WHERE title LIKE ? OR artists LIKE ? ORDER BY timestamp DESC LIMIT ? 
- """, (search_pattern, search_pattern, limit)) - + """, + (search_pattern, search_pattern, limit), + ) + rows = cursor.fetchall() - + # Convert to list of dicts result = [] for row in rows: record = dict(row) # Parse JSON fields - for field in ['artists', 'external_ids', 'metadata', 'release_date', - 'genres', 'images', 'owner']: + for field in [ + "artists", + "external_ids", + "metadata", + "release_date", + "genres", + "images", + "owner", + ]: if record.get(field): try: record[field] = json.loads(record[field]) except (json.JSONDecodeError, TypeError): pass result.append(record) - + return result - + except Exception as e: logger.error(f"Failed to search download history: {e}") return [] - + def get_download_by_task_id(self, task_id: str) -> Optional[Dict]: """ Get download history entry by task ID. - + Args: task_id: Celery task ID - + Returns: Download record or None if not found """ try: with self._get_connection() as conn: - cursor = conn.execute(""" + cursor = conn.execute( + """ SELECT * FROM download_history WHERE task_id = ? LIMIT 1 - """, (task_id,)) - + """, + (task_id,), + ) + row = cursor.fetchone() if not row: return None - + record = dict(row) # Parse JSON fields - for field in ['artists', 'external_ids', 'metadata', 'release_date', - 'genres', 'images', 'owner']: + for field in [ + "artists", + "external_ids", + "metadata", + "release_date", + "genres", + "images", + "owner", + ]: if record.get(field): try: record[field] = json.loads(record[field]) except (json.JSONDecodeError, TypeError): pass - + return record - + except Exception as e: logger.error(f"Failed to get download by task ID {task_id}: {e}") return None - + def get_recent_downloads(self, limit: int = 20) -> List[Dict]: """Get most recent downloads.""" return self.get_download_history(limit=limit, offset=0) - + def get_failed_downloads(self, limit: int = 50) -> List[Dict]: """Get failed downloads.""" return self.get_download_history(limit=limit, status="failed") - + def clear_old_history(self, days_old: int = 30) -> int: """ Clear download history older than specified days. - + Args: days_old: Number of days old to keep (default 30) - + Returns: Number of records deleted """ try: cutoff_time = time.time() - (days_old * 24 * 60 * 60) - + with self._get_connection() as conn: # Get list of children tables to delete - cursor = conn.execute(""" + cursor = conn.execute( + """ SELECT children_table FROM download_history WHERE timestamp < ? AND children_table IS NOT NULL - """, (cutoff_time,)) - - children_tables = [row['children_table'] for row in cursor.fetchall()] - + """, + (cutoff_time,), + ) + + children_tables = [row["children_table"] for row in cursor.fetchall()] + # Delete main history records - cursor = conn.execute(""" + cursor = conn.execute( + """ DELETE FROM download_history WHERE timestamp < ? 
- """, (cutoff_time,)) - + """, + (cutoff_time,), + ) + deleted_count = cursor.rowcount - + # Drop children tables for table_name in children_tables: try: conn.execute(f"DROP TABLE IF EXISTS {table_name}") except Exception as e: - logger.warning(f"Failed to drop children table {table_name}: {e}") - - logger.info(f"Cleared {deleted_count} old history records and {len(children_tables)} children tables") + logger.warning( + f"Failed to drop children table {table_name}: {e}" + ) + + logger.info( + f"Cleared {deleted_count} old history records and {len(children_tables)} children tables" + ) return deleted_count - + except Exception as e: logger.error(f"Failed to clear old history: {e}") return 0 # --- New helpers for failed children insertion and metadata enrichment --- - def _populate_failed_children_for_album(self, table_name: str, summary: Dict, album_title: str) -> None: + def _populate_failed_children_for_album( + self, table_name: str, summary: Dict, album_title: str + ) -> None: try: self._create_children_table(table_name) with self._get_connection() as conn: for failed_item in summary.get("failed_tracks", []): track = failed_item.get("track", {}) - track_data = self._prepare_child_track_data(track, album_title, "failed") - track_data["metadata"]["failure_reason"] = failed_item.get("reason", "cancelled") - conn.execute(f""" + track_data = self._prepare_child_track_data( + track, album_title, "failed" + ) + track_data["metadata"]["failure_reason"] = failed_item.get( + "reason", "cancelled" + ) + conn.execute( + f""" INSERT INTO {table_name} ( title, artists, album_title, duration_ms, track_number, disc_number, explicit, status, external_ids, genres, isrc, timestamp, position, metadata ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, track_data["values"]) + """, + track_data["values"], + ) except Exception as e: - logger.error(f"Failed to insert failed children for album into {table_name}: {e}") + logger.error( + f"Failed to insert failed children for album into {table_name}: {e}" + ) - def _populate_failed_children_for_playlist(self, table_name: str, summary: Dict) -> None: + def _populate_failed_children_for_playlist( + self, table_name: str, summary: Dict + ) -> None: try: self._create_children_table(table_name) with self._get_connection() as conn: for failed_item in summary.get("failed_tracks", []): track = failed_item.get("track", {}) track_data = self._prepare_child_track_data(track, "", "failed") - track_data["metadata"]["failure_reason"] = failed_item.get("reason", "cancelled") - conn.execute(f""" + track_data["metadata"]["failure_reason"] = failed_item.get( + "reason", "cancelled" + ) + conn.execute( + f""" INSERT INTO {table_name} ( title, artists, album_title, duration_ms, track_number, disc_number, explicit, status, external_ids, genres, isrc, timestamp, position, metadata ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
- """, track_data["values"]) + """, + track_data["values"], + ) except Exception as e: - logger.error(f"Failed to insert failed children for playlist into {table_name}: {e}") + logger.error( + f"Failed to insert failed children for playlist into {table_name}: {e}" + ) def _enrich_album_metadata_from_summary(self, album: Dict, summary: Dict) -> Dict: if album.get("images") and album.get("release_date") and album.get("genres"): @@ -1166,14 +1392,20 @@ class HistoryManager: for key in ("successful_tracks", "skipped_tracks", "failed_tracks"): arr = summary.get(key, []) or [] if arr: - src_track = arr[0] if key != "failed_tracks" else (arr[0].get("track") if isinstance(arr[0], dict) else None) + src_track = ( + arr[0] + if key != "failed_tracks" + else (arr[0].get("track") if isinstance(arr[0], dict) else None) + ) break if isinstance(src_track, dict): album_obj = src_track.get("album", {}) or {} album.setdefault("images", album_obj.get("images", [])) album.setdefault("release_date", album_obj.get("release_date", {})) album.setdefault("genres", album_obj.get("genres", [])) - album.setdefault("album_type", album_obj.get("album_type", album.get("album_type"))) + album.setdefault( + "album_type", album_obj.get("album_type", album.get("album_type")) + ) return album # --- Upsert helpers to avoid duplicate children rows and keep most complete --- @@ -1235,18 +1467,31 @@ class HistoryManager: # Consider JSON strings: prefer longer/ non-empty if (old_val in (None, "", 0)) and new_val not in (None, ""): merged[key] = new_val - elif isinstance(new_val, str) and isinstance(old_val, str) and len(new_val) > len(old_val): + elif ( + isinstance(new_val, str) + and isinstance(old_val, str) + and len(new_val) > len(old_val) + ): merged[key] = new_val # Status: keep highest priority - if self._status_priority(new.get("status")) > self._status_priority(existing.get("status")): + if self._status_priority(new.get("status")) > self._status_priority( + existing.get("status") + ): merged["status"] = new.get("status") # Timestamp: keep earliest for creation but allow update to latest timestamp for last update - merged["timestamp"] = max(existing.get("timestamp") or 0, new.get("timestamp") or 0) + merged["timestamp"] = max( + existing.get("timestamp") or 0, new.get("timestamp") or 0 + ) return merged - def _find_existing_child_row(self, conn: sqlite3.Connection, table_name: str, new_row: Dict) -> Optional[Dict]: + def _find_existing_child_row( + self, conn: sqlite3.Connection, table_name: str, new_row: Dict + ) -> Optional[Dict]: try: - cursor = conn.execute(f"SELECT * FROM {table_name} WHERE title = ?", (new_row.get("title", ""),)) + cursor = conn.execute( + f"SELECT * FROM {table_name} WHERE title = ?", + (new_row.get("title", ""),), + ) candidates = [dict(r) for r in cursor.fetchall()] if not candidates: return None @@ -1272,7 +1517,9 @@ class HistoryManager: except Exception: return None - def _upsert_child_row(self, conn: sqlite3.Connection, table_name: str, row: Dict) -> None: + def _upsert_child_row( + self, conn: sqlite3.Connection, table_name: str, row: Dict + ) -> None: existing = self._find_existing_child_row(conn, table_name, row) if existing: merged = self._merge_child_rows(existing, row) @@ -1330,4 +1577,4 @@ class HistoryManager: # Global history manager instance -history_manager = HistoryManager() \ No newline at end of file +history_manager = HistoryManager() diff --git a/routes/utils/playlist.py b/routes/utils/playlist.py index 78c3f40..e268467 100755 --- a/routes/utils/playlist.py +++ 
b/routes/utils/playlist.py @@ -24,13 +24,17 @@ def download_playlist( progress_callback=None, convert_to=None, bitrate=None, + artist_separator="; ", + recursive_quality=True, _is_celery_task_execution=False, # Added to skip duplicate check from Celery task ): if not _is_celery_task_execution: - existing_task = get_existing_task_id(url) # Check for duplicates only if not called by Celery task + existing_task = get_existing_task_id( + url + ) # Check for duplicates only if not called by Celery task if existing_task: raise DuplicateDownloadError( - f"Download for this URL is already in progress.", + "Download for this URL is already in progress.", existing_task=existing_task, ) try: @@ -93,7 +97,7 @@ def download_playlist( link_playlist=url, # Spotify URL output_dir="./downloads", quality_download=quality, # Deezer quality - recursive_quality=True, + recursive_quality=recursive_quality, recursive_download=False, not_interface=False, make_zip=False, @@ -106,6 +110,7 @@ def download_playlist( max_retries=max_retries, convert_to=convert_to, bitrate=bitrate, + artist_separator=artist_separator, ) print( f"DEBUG: playlist.py - Playlist download via Deezer (account: {fallback}) successful for Spotify URL." @@ -153,7 +158,7 @@ def download_playlist( link_playlist=url, # Spotify URL output_dir="./downloads", quality_download=fall_quality, # Spotify quality - recursive_quality=True, + recursive_quality=recursive_quality, recursive_download=False, not_interface=False, make_zip=False, @@ -167,6 +172,7 @@ def download_playlist( max_retries=max_retries, convert_to=convert_to, bitrate=bitrate, + artist_separator=artist_separator, ) print( f"DEBUG: playlist.py - Spotify direct download (account: {main} for blob) successful." @@ -213,7 +219,7 @@ def download_playlist( link_playlist=url, output_dir="./downloads", quality_download=quality, - recursive_quality=True, + recursive_quality=recursive_quality, recursive_download=False, not_interface=False, make_zip=False, @@ -227,6 +233,7 @@ def download_playlist( max_retries=max_retries, convert_to=convert_to, bitrate=bitrate, + artist_separator=artist_separator, ) print( f"DEBUG: playlist.py - Direct Spotify download (account: {main} for blob) successful." @@ -254,7 +261,7 @@ def download_playlist( link_playlist=url, output_dir="./downloads", quality_download=quality, - recursive_quality=False, # Usually False for playlists to get individual track qualities + recursive_quality=recursive_quality, # Usually False for playlists to get individual track qualities recursive_download=False, make_zip=False, custom_dir_format=custom_dir_format, @@ -266,6 +273,7 @@ def download_playlist( max_retries=max_retries, convert_to=convert_to, bitrate=bitrate, + artist_separator=artist_separator, ) print( f"DEBUG: playlist.py - Direct Deezer download (account: {main}) successful." 
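Note on the new keyword threaded through the playlist hunks above: artist_separator defaults to "; " and is passed straight through to deezspot, which presumably uses it to join multiple artist names when tags and path/track formats are rendered (the exact fields affected are up to the library). A minimal illustration of the joining behaviour, with invented artist names:

    # Illustration only: what different artist_separator values produce for a
    # multi-artist credit. The join itself is plain str.join; names are made up.
    artists = ["Artist A", "Artist B", "Artist C"]

    new_default = "; "   # default introduced by this patch
    legacy = ", "        # comma style used where strings are still built by hand

    print(new_default.join(artists))  # Artist A; Artist B; Artist C
    print(legacy.join(artists))       # Artist A, Artist B, Artist C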
diff --git a/routes/utils/track.py b/routes/utils/track.py index 6344816..25a60aa 100755 --- a/routes/utils/track.py +++ b/routes/utils/track.py @@ -25,6 +25,8 @@ def download_track( progress_callback=None, convert_to=None, bitrate=None, + artist_separator="; ", + recursive_quality=False, _is_celery_task_execution=False, # Added for consistency, not currently used for duplicate check ): try: @@ -91,7 +93,7 @@ def download_track( link_track=url, # Spotify URL output_dir="./downloads", quality_download=quality, # Deezer quality - recursive_quality=False, + recursive_quality=recursive_quality, recursive_download=False, not_interface=False, custom_dir_format=custom_dir_format, @@ -102,6 +104,7 @@ def download_track( max_retries=max_retries, convert_to=convert_to, bitrate=bitrate, + artist_separator=artist_separator, ) print( f"DEBUG: track.py - Track download via Deezer (account: {fallback}) successful for Spotify URL." @@ -147,7 +150,7 @@ def download_track( link_track=url, # Spotify URL output_dir="./downloads", quality_download=fall_quality, # Spotify quality - recursive_quality=False, + recursive_quality=recursive_quality, recursive_download=False, not_interface=False, real_time_dl=real_time, @@ -160,6 +163,7 @@ def download_track( max_retries=max_retries, convert_to=convert_to, bitrate=bitrate, + artist_separator=artist_separator, ) print( f"DEBUG: track.py - Spotify direct download (account: {main} for blob) successful." @@ -202,7 +206,7 @@ def download_track( link_track=url, output_dir="./downloads", quality_download=quality, - recursive_quality=False, + recursive_quality=recursive_quality, recursive_download=False, not_interface=False, real_time_dl=real_time, @@ -215,6 +219,7 @@ def download_track( max_retries=max_retries, convert_to=convert_to, bitrate=bitrate, + artist_separator=artist_separator, ) print( f"DEBUG: track.py - Direct Spotify download (account: {main} for blob) successful." @@ -242,7 +247,7 @@ def download_track( link_track=url, output_dir="./downloads", quality_download=quality, - recursive_quality=False, + recursive_quality=recursive_quality, recursive_download=False, custom_dir_format=custom_dir_format, custom_track_format=custom_track_format, @@ -253,6 +258,7 @@ def download_track( max_retries=max_retries, convert_to=convert_to, bitrate=bitrate, + artist_separator=artist_separator, ) print( f"DEBUG: track.py - Direct Deezer download (account: {main}) successful." 
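The same pass-through options (recursive_quality, convert_to, bitrate, artist_separator, plus the fixed recursive_download=False) are now repeated in every fallback branch of track.py, album.py and playlist.py. A purely illustrative sketch, not part of this patch, of collecting them once per request and unpacking them into whichever downloader call a branch ends up making; downloader.download below is a stand-in name, not a real deezspot method:

    # Sketch only: bundle the options that are identical across branches so each
    # call site just unpacks them. Keyword names match the ones this patch
    # threads through the download_* helpers.
    def build_common_opts(recursive_quality, convert_to, bitrate, artist_separator):
        return {
            "recursive_quality": recursive_quality,
            "recursive_download": False,
            "convert_to": convert_to,
            "bitrate": bitrate,
            "artist_separator": artist_separator,
        }

    # Hypothetical call site inside one of the branches:
    # opts = build_common_opts(recursive_quality, convert_to, bitrate, artist_separator)
    # downloader.download(link_track=url, output_dir="./downloads",
    #                     quality_download=quality, **opts)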
diff --git a/routes/utils/watch/db.py b/routes/utils/watch/db.py index 0f7bac0..6600b47 100644 --- a/routes/utils/watch/db.py +++ b/routes/utils/watch/db.py @@ -166,11 +166,11 @@ def init_playlists_db(): "watched playlists", ): conn.commit() - + # Update all existing playlist track tables with new schema _update_all_playlist_track_tables(cursor) conn.commit() - + logger.info( f"Playlists database initialized/updated successfully at {PLAYLISTS_DB_PATH}" ) @@ -183,9 +183,11 @@ def _update_all_playlist_track_tables(cursor: sqlite3.Cursor): """Updates all existing playlist track tables to ensure they have the latest schema.""" try: # Get all table names that start with 'playlist_' - cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'playlist_%'") + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'playlist_%'" + ) playlist_tables = cursor.fetchall() - + for table_row in playlist_tables: table_name = table_row[0] if _ensure_table_schema( @@ -194,8 +196,10 @@ def _update_all_playlist_track_tables(cursor: sqlite3.Cursor): EXPECTED_PLAYLIST_TRACKS_COLUMNS, f"playlist tracks ({table_name})", ): - logger.info(f"Updated schema for existing playlist track table: {table_name}") - + logger.info( + f"Updated schema for existing playlist track table: {table_name}" + ) + except sqlite3.Error as e: logger.error(f"Error updating playlist track tables schema: {e}", exc_info=True) @@ -205,7 +209,7 @@ def update_all_existing_tables_schema(): try: with _get_playlists_db_connection() as conn: cursor = conn.cursor() - + # Update main watched_playlists table if _ensure_table_schema( cursor, @@ -214,13 +218,15 @@ def update_all_existing_tables_schema(): "watched playlists", ): logger.info("Updated schema for watched_playlists table") - + # Update all playlist track tables _update_all_playlist_track_tables(cursor) - + conn.commit() - logger.info("Successfully updated all existing tables schema in playlists database") - + logger.info( + "Successfully updated all existing tables schema in playlists database" + ) + except sqlite3.Error as e: logger.error(f"Error updating existing tables schema: {e}", exc_info=True) raise @@ -232,15 +238,17 @@ def ensure_playlist_table_schema(playlist_spotify_id: str): try: with _get_playlists_db_connection() as conn: cursor = conn.cursor() - + # Check if table exists cursor.execute( f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';" ) if cursor.fetchone() is None: - logger.warning(f"Table {table_name} does not exist. Cannot update schema.") + logger.warning( + f"Table {table_name} does not exist. Cannot update schema." 
+ ) return False - + # Update schema if _ensure_table_schema( cursor, @@ -252,11 +260,16 @@ def ensure_playlist_table_schema(playlist_spotify_id: str): logger.info(f"Updated schema for playlist track table: {table_name}") return True else: - logger.info(f"Schema already up-to-date for playlist track table: {table_name}") + logger.info( + f"Schema already up-to-date for playlist track table: {table_name}" + ) return True - + except sqlite3.Error as e: - logger.error(f"Error updating schema for playlist {playlist_spotify_id}: {e}", exc_info=True) + logger.error( + f"Error updating schema for playlist {playlist_spotify_id}: {e}", + exc_info=True, + ) return False @@ -306,10 +319,10 @@ def add_playlist_to_watch(playlist_data: dict): """Adds a playlist to the watched_playlists table and creates its tracks table in playlists.db.""" try: _create_playlist_tracks_table(playlist_data["id"]) - + # Construct Spotify URL manually since external_urls might not be present in metadata spotify_url = f"https://open.spotify.com/playlist/{playlist_data['id']}" - + with _get_playlists_db_connection() as conn: # Use playlists connection cursor = conn.cursor() cursor.execute( @@ -455,10 +468,12 @@ def get_playlist_track_ids_from_db(playlist_spotify_id: str): return track_ids -def get_playlist_tracks_with_snapshot_from_db(playlist_spotify_id: str): +def get_playlist_tracks_with_snapshot_from_db( + playlist_spotify_id: str, +) -> dict[str, dict[str, str]]: """Retrieves all tracks with their snapshot_ids from a specific playlist's tracks table in playlists.db.""" table_name = f"playlist_{playlist_spotify_id.replace('-', '_')}" - tracks_data = {} + tracks_data: dict[str, dict[str, str]] = {} try: with _get_playlists_db_connection() as conn: # Use playlists connection cursor = conn.cursor() @@ -470,7 +485,7 @@ def get_playlist_tracks_with_snapshot_from_db(playlist_spotify_id: str): f"Track table {table_name} does not exist in {PLAYLISTS_DB_PATH}. Cannot fetch track data." 
) return tracks_data - + # Ensure the table has the latest schema before querying _ensure_table_schema( cursor, @@ -478,7 +493,7 @@ def get_playlist_tracks_with_snapshot_from_db(playlist_spotify_id: str): EXPECTED_PLAYLIST_TRACKS_COLUMNS, f"playlist tracks ({playlist_spotify_id})", ) - + cursor.execute( f"SELECT spotify_track_id, snapshot_id, title FROM {table_name} WHERE is_present_in_spotify = 1" ) @@ -486,7 +501,7 @@ def get_playlist_tracks_with_snapshot_from_db(playlist_spotify_id: str): for row in rows: tracks_data[row["spotify_track_id"]] = { "snapshot_id": row["snapshot_id"], - "title": row["title"] + "title": row["title"], } return tracks_data except sqlite3.Error as e: @@ -508,7 +523,7 @@ def get_playlist_total_tracks_from_db(playlist_spotify_id: str) -> int: ) if cursor.fetchone() is None: return 0 - + # Ensure the table has the latest schema before querying _ensure_table_schema( cursor, @@ -516,7 +531,7 @@ def get_playlist_total_tracks_from_db(playlist_spotify_id: str) -> int: EXPECTED_PLAYLIST_TRACKS_COLUMNS, f"playlist tracks ({playlist_spotify_id})", ) - + cursor.execute( f"SELECT COUNT(*) as count FROM {table_name} WHERE is_present_in_spotify = 1" ) @@ -530,12 +545,14 @@ def get_playlist_total_tracks_from_db(playlist_spotify_id: str) -> int: return 0 -def add_tracks_to_playlist_db(playlist_spotify_id: str, tracks_data: list, snapshot_id: str = None): +def add_tracks_to_playlist_db( + playlist_spotify_id: str, tracks_data: list, snapshot_id: str = None +): """ Updates existing tracks in the playlist's DB table to mark them as currently present in Spotify and updates their last_seen timestamp and snapshot_id. Also refreshes metadata. Does NOT insert new tracks. New tracks are only added upon successful download. - + Args: playlist_spotify_id: The Spotify playlist ID tracks_data: List of track items from Spotify API @@ -574,8 +591,10 @@ def add_tracks_to_playlist_db(playlist_spotify_id: str, tracks_data: list, snaps track_number = track.get("track_number") # Log the raw track_number value for debugging if track_number is None or track_number == 0: - logger.debug(f"Track '{track.get('name', 'Unknown')}' has track_number: {track_number} (raw API value)") - + logger.debug( + f"Track '{track.get('name', 'Unknown')}' has track_number: {track_number} (raw API value)" + ) + # Prepare tuple for UPDATE statement. # Order: title, artist_names, album_name, album_artist_names, track_number, # album_spotify_id, duration_ms, added_at_playlist, @@ -790,11 +809,16 @@ def remove_specific_tracks_from_playlist_table( return 0 -def add_single_track_to_playlist_db(playlist_spotify_id: str, track_item_for_db: dict, snapshot_id: str = None, task_id: str = None): +def add_single_track_to_playlist_db( + playlist_spotify_id: str, + track_item_for_db: dict, + snapshot_id: str = None, + task_id: str = None, +): """ Adds or updates a single track in the specified playlist's tracks table in playlists.db. Uses deezspot callback data as the source of metadata. - + Args: playlist_spotify_id: The Spotify playlist ID track_item_for_db: Track item data (used only for spotify_track_id and added_at) @@ -802,68 +826,87 @@ def add_single_track_to_playlist_db(playlist_spotify_id: str, track_item_for_db: task_id: Task ID to extract metadata from callback data """ if not task_id: - logger.error(f"No task_id provided for playlist {playlist_spotify_id}. Task ID is required to extract metadata from deezspot callback.") + logger.error( + f"No task_id provided for playlist {playlist_spotify_id}. 
Task ID is required to extract metadata from deezspot callback." + ) return - + if not track_item_for_db or not track_item_for_db.get("track", {}).get("id"): - logger.error(f"No track_item_for_db or spotify track ID provided for playlist {playlist_spotify_id}") + logger.error( + f"No track_item_for_db or spotify track ID provided for playlist {playlist_spotify_id}" + ) return table_name = f"playlist_{playlist_spotify_id.replace('-', '_')}" - + # Extract metadata ONLY from deezspot callback data try: # Import here to avoid circular imports from routes.utils.celery_tasks import get_last_task_status - + last_status = get_last_task_status(task_id) if not last_status or "raw_callback" not in last_status: - logger.error(f"No raw_callback found in task status for task {task_id}. Cannot extract metadata.") + logger.error( + f"No raw_callback found in task status for task {task_id}. Cannot extract metadata." + ) return - + callback_data = last_status["raw_callback"] - + # Extract metadata from deezspot callback using correct structure from callbacks.ts track_obj = callback_data.get("track", {}) if not track_obj: logger.error(f"No track object found in callback data for task {task_id}") return - + track_name = track_obj.get("title", "N/A") track_number = track_obj.get("track_number", 1) # Default to 1 if missing duration_ms = track_obj.get("duration_ms", 0) - + # Extract artist names from artists array artists = track_obj.get("artists", []) - artist_names = ", ".join([artist.get("name", "") for artist in artists if artist.get("name")]) + artist_names = ", ".join( + [artist.get("name", "") for artist in artists if artist.get("name")] + ) if not artist_names: artist_names = "N/A" - + # Extract album information album_obj = track_obj.get("album", {}) album_name = album_obj.get("title", "N/A") - + # Extract album artist names from album artists array album_artists = album_obj.get("artists", []) - album_artist_names = ", ".join([artist.get("name", "") for artist in album_artists if artist.get("name")]) + album_artist_names = ", ".join( + [artist.get("name", "") for artist in album_artists if artist.get("name")] + ) if not album_artist_names: album_artist_names = "N/A" - - logger.debug(f"Extracted metadata from deezspot callback for '{track_name}': track_number={track_number}") - + + logger.debug( + f"Extracted metadata from deezspot callback for '{track_name}': track_number={track_number}" + ) + except Exception as e: - logger.error(f"Error extracting metadata from task {task_id} callback: {e}", exc_info=True) + logger.error( + f"Error extracting metadata from task {task_id} callback: {e}", + exc_info=True, + ) return current_time = int(time.time()) - + # Get spotify_track_id and added_at from original track_item_for_db track_id = track_item_for_db["track"]["id"] added_at = track_item_for_db.get("added_at") - album_id = track_item_for_db.get("track", {}).get("album", {}).get("id") # Only album ID from original data - - logger.info(f"Adding track '{track_name}' (ID: {track_id}) to playlist {playlist_spotify_id} with track_number: {track_number} (from deezspot callback)") - + album_id = ( + track_item_for_db.get("track", {}).get("album", {}).get("id") + ) # Only album ID from original data + + logger.info( + f"Adding track '{track_name}' (ID: {track_id}) to playlist {playlist_spotify_id} with track_number: {track_number} (from deezspot callback)" + ) + track_data_tuple = ( track_id, track_name, diff --git a/routes/utils/watch/manager.py b/routes/utils/watch/manager.py index 894a076..eb59643 100644 --- 
a/routes/utils/watch/manager.py +++ b/routes/utils/watch/manager.py @@ -28,7 +28,6 @@ from routes.utils.get_info import ( get_spotify_info, get_playlist_metadata, get_playlist_tracks, - check_playlist_updated, ) # To fetch playlist, track, artist, and album details from routes.utils.celery_queue_manager import download_queue_manager @@ -38,12 +37,12 @@ STOP_EVENT = threading.Event() # Format mapping for audio file conversions AUDIO_FORMAT_EXTENSIONS = { - 'mp3': '.mp3', - 'flac': '.flac', - 'm4a': '.m4a', - 'aac': '.m4a', - 'ogg': '.ogg', - 'wav': '.wav', + "mp3": ".mp3", + "flac": ".flac", + "m4a": ".m4a", + "aac": ".m4a", + "ogg": ".ogg", + "wav": ".wav", } DEFAULT_WATCH_CONFIG = { @@ -106,11 +105,11 @@ def has_playlist_changed(playlist_spotify_id: str, current_snapshot_id: str) -> """ Check if a playlist has changed by comparing snapshot_id. This is much more efficient than fetching all tracks. - + Args: playlist_spotify_id: The Spotify playlist ID current_snapshot_id: The current snapshot_id from API - + Returns: True if playlist has changed, False otherwise """ @@ -119,29 +118,33 @@ def has_playlist_changed(playlist_spotify_id: str, current_snapshot_id: str) -> if not db_playlist: # Playlist not in database, consider it as "changed" to trigger initial processing return True - + last_snapshot_id = db_playlist.get("snapshot_id") if not last_snapshot_id: # No previous snapshot_id, consider it as "changed" to trigger initial processing return True - + return current_snapshot_id != last_snapshot_id - + except Exception as e: - logger.error(f"Error checking playlist change status for {playlist_spotify_id}: {e}") + logger.error( + f"Error checking playlist change status for {playlist_spotify_id}: {e}" + ) # On error, assume playlist has changed to be safe return True -def needs_track_sync(playlist_spotify_id: str, current_snapshot_id: str, api_total_tracks: int) -> tuple[bool, list[str]]: +def needs_track_sync( + playlist_spotify_id: str, current_snapshot_id: str, api_total_tracks: int +) -> tuple[bool, list[str]]: """ Check if tracks need to be synchronized by comparing snapshot_ids and total counts. 
- + Args: playlist_spotify_id: The Spotify playlist ID current_snapshot_id: The current snapshot_id from API api_total_tracks: The total number of tracks reported by API - + Returns: Tuple of (needs_sync, tracks_to_find) where: - needs_sync: True if tracks need to be synchronized @@ -151,7 +154,7 @@ def needs_track_sync(playlist_spotify_id: str, current_snapshot_id: str, api_tot # Get tracks from database with their snapshot_ids db_tracks = get_playlist_tracks_with_snapshot_from_db(playlist_spotify_id) db_total_tracks = get_playlist_total_tracks_from_db(playlist_spotify_id) - + # Check if total count matches if db_total_tracks != api_total_tracks: logger.info( @@ -160,39 +163,41 @@ def needs_track_sync(playlist_spotify_id: str, current_snapshot_id: str, api_tot # Always do full sync when counts don't match to ensure we don't miss any tracks # This handles cases like: # - Empty database (DB=0, API=1345) - # - Missing tracks (DB=1000, API=1345) + # - Missing tracks (DB=1000, API=1345) # - Removed tracks (DB=1345, API=1000) return True, [] # Empty list indicates full sync needed - + # Check if any tracks have different snapshot_id tracks_to_find = [] for track_id, track_data in db_tracks.items(): if track_data.get("snapshot_id") != current_snapshot_id: tracks_to_find.append(track_id) - + if tracks_to_find: logger.info( f"Found {len(tracks_to_find)} tracks with outdated snapshot_id for playlist {playlist_spotify_id}" ) return True, tracks_to_find - + return False, [] - + except Exception as e: logger.error(f"Error checking track sync status for {playlist_spotify_id}: {e}") # On error, assume sync is needed to be safe return True, [] -def find_tracks_in_playlist(playlist_spotify_id: str, tracks_to_find: list[str], current_snapshot_id: str) -> tuple[list, list]: +def find_tracks_in_playlist( + playlist_spotify_id: str, tracks_to_find: list[str], current_snapshot_id: str +) -> tuple[list, list]: """ Progressively fetch playlist tracks until all specified tracks are found or playlist is exhausted. 
- + Args: playlist_spotify_id: The Spotify playlist ID tracks_to_find: List of track IDs to find current_snapshot_id: The current snapshot_id - + Returns: Tuple of (found_tracks, not_found_tracks) where: - found_tracks: List of track items that were found @@ -202,24 +207,28 @@ def find_tracks_in_playlist(playlist_spotify_id: str, tracks_to_find: list[str], not_found_tracks = tracks_to_find.copy() offset = 0 limit = 100 - + logger.info( f"Searching for {len(tracks_to_find)} tracks in playlist {playlist_spotify_id} starting from offset {offset}" ) - + while not_found_tracks and offset < 10000: # Safety limit try: - tracks_batch = get_playlist_tracks(playlist_spotify_id, limit=limit, offset=offset) - + tracks_batch = get_playlist_tracks( + playlist_spotify_id, limit=limit, offset=offset + ) + if not tracks_batch or "items" not in tracks_batch: - logger.warning(f"No tracks returned for playlist {playlist_spotify_id} at offset {offset}") + logger.warning( + f"No tracks returned for playlist {playlist_spotify_id} at offset {offset}" + ) break - + batch_items = tracks_batch.get("items", []) if not batch_items: logger.info(f"No more tracks found at offset {offset}") break - + # Check each track in this batch for track_item in batch_items: track = track_item.get("track") @@ -229,22 +238,24 @@ def find_tracks_in_playlist(playlist_spotify_id: str, tracks_to_find: list[str], found_tracks.append(track_item) not_found_tracks.remove(track_id) logger.debug(f"Found track {track_id} at offset {offset}") - + offset += len(batch_items) - + # Add small delay between batches time.sleep(0.1) - + except Exception as e: - logger.error(f"Error fetching tracks batch for playlist {playlist_spotify_id} at offset {offset}: {e}") + logger.error( + f"Error fetching tracks batch for playlist {playlist_spotify_id} at offset {offset}: {e}" + ) break - + logger.info( f"Track search complete for playlist {playlist_spotify_id}: " f"Found {len(found_tracks)}/{len(tracks_to_find)} tracks, " f"Not found: {len(not_found_tracks)}" ) - + return found_tracks, not_found_tracks @@ -283,7 +294,7 @@ def check_watched_playlists(specific_playlist_id: str = None): try: # Ensure the playlist's track table has the latest schema before processing ensure_playlist_table_schema(playlist_spotify_id) - + # First, get playlist metadata to check if it has changed current_playlist_metadata = get_playlist_metadata(playlist_spotify_id) if not current_playlist_metadata: @@ -293,17 +304,23 @@ def check_watched_playlists(specific_playlist_id: str = None): continue api_snapshot_id = current_playlist_metadata.get("snapshot_id") - api_total_tracks = current_playlist_metadata.get("tracks", {}).get("total", 0) - + api_total_tracks = current_playlist_metadata.get("tracks", {}).get( + "total", 0 + ) + # Enhanced snapshot_id checking with track-level tracking if use_snapshot_checking: # First check if playlist snapshot_id has changed - playlist_changed = has_playlist_changed(playlist_spotify_id, api_snapshot_id) - + playlist_changed = has_playlist_changed( + playlist_spotify_id, api_snapshot_id + ) + if not playlist_changed: # Even if playlist snapshot_id hasn't changed, check if individual tracks need sync - needs_sync, tracks_to_find = needs_track_sync(playlist_spotify_id, api_snapshot_id, api_total_tracks) - + needs_sync, tracks_to_find = needs_track_sync( + playlist_spotify_id, api_snapshot_id, api_total_tracks + ) + if not needs_sync: logger.info( f"Playlist Watch Manager: Playlist '{playlist_name}' ({playlist_spotify_id}) has not changed since last check 
(snapshot_id: {api_snapshot_id}). Skipping detailed check." @@ -321,19 +338,25 @@ def check_watched_playlists(specific_playlist_id: str = None): f"Playlist Watch Manager: Playlist '{playlist_name}' snapshot_id unchanged, but {len(tracks_to_find)} tracks need sync. Proceeding with targeted check." ) # Use targeted track search instead of full fetch - found_tracks, not_found_tracks = find_tracks_in_playlist(playlist_spotify_id, tracks_to_find, api_snapshot_id) - + found_tracks, not_found_tracks = find_tracks_in_playlist( + playlist_spotify_id, tracks_to_find, api_snapshot_id + ) + # Update found tracks with new snapshot_id if found_tracks: - add_tracks_to_playlist_db(playlist_spotify_id, found_tracks, api_snapshot_id) - + add_tracks_to_playlist_db( + playlist_spotify_id, found_tracks, api_snapshot_id + ) + # Mark not found tracks as removed if not_found_tracks: logger.info( f"Playlist Watch Manager: {len(not_found_tracks)} tracks not found in playlist '{playlist_name}'. Marking as removed." ) - mark_tracks_as_not_present_in_spotify(playlist_spotify_id, not_found_tracks) - + mark_tracks_as_not_present_in_spotify( + playlist_spotify_id, not_found_tracks + ) + # Update the playlist's m3u file after tracks are removed try: logger.info( @@ -347,7 +370,9 @@ def check_watched_playlists(specific_playlist_id: str = None): ) # Update playlist snapshot and continue to next playlist - update_playlist_snapshot(playlist_spotify_id, api_snapshot_id, api_total_tracks) + update_playlist_snapshot( + playlist_spotify_id, api_snapshot_id, api_total_tracks + ) logger.info( f"Playlist Watch Manager: Finished targeted sync for playlist '{playlist_name}'. Snapshot ID updated to {api_snapshot_id}." ) @@ -369,18 +394,18 @@ def check_watched_playlists(specific_playlist_id: str = None): logger.info( f"Playlist Watch Manager: Fetching all tracks for playlist '{playlist_name}' ({playlist_spotify_id}) with {api_total_tracks} total tracks." ) - + all_api_track_items = [] offset = 0 limit = 100 # Use maximum batch size for efficiency - + while offset < api_total_tracks: try: # Use the optimized get_playlist_tracks function tracks_batch = get_playlist_tracks( playlist_spotify_id, limit=limit, offset=offset ) - + if not tracks_batch or "items" not in tracks_batch: logger.warning( f"Playlist Watch Manager: No tracks returned for playlist {playlist_spotify_id} at offset {offset}" @@ -390,14 +415,14 @@ def check_watched_playlists(specific_playlist_id: str = None): batch_items = tracks_batch.get("items", []) if not batch_items: break - + all_api_track_items.extend(batch_items) offset += len(batch_items) - + # Add small delay between batches to be respectful to API if offset < api_total_tracks: time.sleep(0.1) - + except Exception as e: logger.error( f"Playlist Watch Manager: Error fetching tracks batch for playlist {playlist_spotify_id} at offset {offset}: {e}" @@ -482,7 +507,9 @@ def check_watched_playlists(specific_playlist_id: str = None): logger.info( f"Playlist Watch Manager: Refreshing {len(all_api_track_items)} tracks from API in local DB for playlist '{playlist_name}'." 
) - add_tracks_to_playlist_db(playlist_spotify_id, all_api_track_items, api_snapshot_id) + add_tracks_to_playlist_db( + playlist_spotify_id, all_api_track_items, api_snapshot_id + ) removed_db_ids = db_track_ids - current_api_track_ids if removed_db_ids: @@ -504,7 +531,7 @@ def check_watched_playlists(specific_playlist_id: str = None): logger.error( f"Failed to update m3u file for playlist '{playlist_name}' after playlist changes: {m3u_update_err}", exc_info=True, - ) + ) update_playlist_snapshot( playlist_spotify_id, api_snapshot_id, api_total_tracks @@ -564,11 +591,11 @@ def check_watched_artists(specific_artist_id: str = None): all_artist_albums_from_api: List[Dict[str, Any]] = [] offset = 0 limit = 50 # Spotify API limit for artist albums - + logger.info( f"Artist Watch Manager: Fetching albums for artist '{artist_name}' ({artist_spotify_id})" ) - + while True: logger.debug( f"Artist Watch Manager: Fetching albums for {artist_spotify_id}. Limit: {limit}, Offset: {offset}" @@ -810,13 +837,18 @@ def start_watch_manager(): # Renamed from start_playlist_watch_manager init_playlists_db() # For playlists init_artists_db() # For artists - + # Update all existing tables to ensure they have the latest schema try: update_all_existing_tables_schema() - logger.info("Watch Manager: Successfully updated all existing tables schema") + logger.info( + "Watch Manager: Successfully updated all existing tables schema" + ) except Exception as e: - logger.error(f"Watch Manager: Error updating existing tables schema: {e}", exc_info=True) + logger.error( + f"Watch Manager: Error updating existing tables schema: {e}", + exc_info=True, + ) _watch_scheduler_thread = threading.Thread( target=playlist_watch_scheduler, daemon=True @@ -847,22 +879,26 @@ def stop_watch_manager(): # Renamed from stop_playlist_watch_manager def get_playlist_tracks_for_m3u(playlist_spotify_id: str) -> List[Dict[str, Any]]: """ Get all tracks for a playlist from the database with complete metadata needed for m3u generation. - + Args: playlist_spotify_id: The Spotify playlist ID - + Returns: List of track dictionaries with metadata """ table_name = f"playlist_{playlist_spotify_id.replace('-', '_')}" - tracks = [] - + tracks: List[Dict[str, Any]] = [] + try: - from routes.utils.watch.db import _get_playlists_db_connection, _ensure_table_schema, EXPECTED_PLAYLIST_TRACKS_COLUMNS - + from routes.utils.watch.db import ( + _get_playlists_db_connection, + _ensure_table_schema, + EXPECTED_PLAYLIST_TRACKS_COLUMNS, + ) + with _get_playlists_db_connection() as conn: cursor = conn.cursor() - + # Check if table exists cursor.execute( f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';" @@ -872,7 +908,7 @@ def get_playlist_tracks_for_m3u(playlist_spotify_id: str) -> List[Dict[str, Any] f"Track table {table_name} does not exist. Cannot generate m3u file." 
) return tracks - + # Ensure the table has the latest schema before querying _ensure_table_schema( cursor, @@ -880,30 +916,33 @@ def get_playlist_tracks_for_m3u(playlist_spotify_id: str) -> List[Dict[str, Any] EXPECTED_PLAYLIST_TRACKS_COLUMNS, f"playlist tracks ({playlist_spotify_id})", ) - + # Get all tracks that are present in Spotify cursor.execute(f""" - SELECT spotify_track_id, title, artist_names, album_name, + SELECT spotify_track_id, title, artist_names, album_name, album_artist_names, track_number, duration_ms - FROM {table_name} + FROM {table_name} WHERE is_present_in_spotify = 1 ORDER BY track_number, title """) - + rows = cursor.fetchall() for row in rows: - tracks.append({ - "spotify_track_id": row["spotify_track_id"], - "title": row["title"] or "Unknown Track", - "artist_names": row["artist_names"] or "Unknown Artist", - "album_name": row["album_name"] or "Unknown Album", - "album_artist_names": row["album_artist_names"] or "Unknown Artist", - "track_number": row["track_number"] or 0, - "duration_ms": row["duration_ms"] or 0, - }) - + tracks.append( + { + "spotify_track_id": row["spotify_track_id"], + "title": row["title"] or "Unknown Track", + "artist_names": row["artist_names"] or "Unknown Artist", + "album_name": row["album_name"] or "Unknown Album", + "album_artist_names": row["album_artist_names"] + or "Unknown Artist", + "track_number": row["track_number"] or 0, + "duration_ms": row["duration_ms"] or 0, + } + ) + return tracks - + except Exception as e: logger.error( f"Error retrieving tracks for m3u generation for playlist {playlist_spotify_id}: {e}", @@ -912,17 +951,22 @@ def get_playlist_tracks_for_m3u(playlist_spotify_id: str) -> List[Dict[str, Any] return tracks -def generate_track_file_path(track: Dict[str, Any], custom_dir_format: str, custom_track_format: str, convert_to: str = None) -> str: +def generate_track_file_path( + track: Dict[str, Any], + custom_dir_format: str, + custom_track_format: str, + convert_to: str = None, +) -> str: """ Generate the file path for a track based on custom format strings. This mimics the path generation logic used by the deezspot library. - + Args: track: Track metadata dictionary custom_dir_format: Directory format string (e.g., "%ar_album%/%album%") custom_track_format: Track format string (e.g., "%tracknum%. 
%music% - %artist%") convert_to: Target conversion format (e.g., "mp3", "flac", "m4a") - + Returns: Generated file path relative to output directory """ @@ -934,23 +978,25 @@ def generate_track_file_path(track: Dict[str, Any], custom_dir_format: str, cust title = track.get("title", "Unknown Track") track_number = track.get("track_number", 0) duration_ms = track.get("duration_ms", 0) - + # Use album artist for directory structure, main artist for track name main_artist = artist_names.split(", ")[0] if artist_names else "Unknown Artist" - album_artist = album_artist_names.split(", ")[0] if album_artist_names else main_artist - + album_artist = ( + album_artist_names.split(", ")[0] if album_artist_names else main_artist + ) + # Clean names for filesystem def clean_name(name): # Remove or replace characters that are problematic in filenames - name = re.sub(r'[<>:"/\\|?*]', '_', str(name)) - name = re.sub(r'[\x00-\x1f]', '', name) # Remove control characters + name = re.sub(r'[<>:"/\\|?*]', "_", str(name)) + name = re.sub(r"[\x00-\x1f]", "", name) # Remove control characters return name.strip() - + clean_album_artist = clean_name(album_artist) clean_album = clean_name(album_name) clean_main_artist = clean_name(main_artist) clean_title = clean_name(title) - + # Prepare placeholder replacements replacements = { # Common placeholders @@ -960,58 +1006,66 @@ def generate_track_file_path(track: Dict[str, Any], custom_dir_format: str, cust "%ar_album%": clean_album_artist, "%tracknum%": f"{track_number:02d}" if track_number > 0 else "00", "%year%": "", # Not available in current DB schema - # Additional placeholders (not available in current DB schema, using defaults) "%discnum%": "01", # Default to disc 1 "%date%": "", # Not available "%genre%": "", # Not available "%isrc%": "", # Not available "%explicit%": "", # Not available - "%duration%": str(duration_ms // 1000) if duration_ms > 0 else "0", # Convert ms to seconds + "%duration%": str(duration_ms // 1000) + if duration_ms > 0 + else "0", # Convert ms to seconds } - + # Apply replacements to directory format dir_path = custom_dir_format for placeholder, value in replacements.items(): dir_path = dir_path.replace(placeholder, value) - + # Apply replacements to track format track_filename = custom_track_format for placeholder, value in replacements.items(): track_filename = track_filename.replace(placeholder, value) - + # Combine and clean up path full_path = os.path.join(dir_path, track_filename) full_path = os.path.normpath(full_path) - + # Determine file extension based on convert_to setting or default to mp3 - if not any(full_path.lower().endswith(ext) for ext in ['.mp3', '.flac', '.m4a', '.ogg', '.wav']): + if not any( + full_path.lower().endswith(ext) + for ext in [".mp3", ".flac", ".m4a", ".ogg", ".wav"] + ): if convert_to: - extension = AUDIO_FORMAT_EXTENSIONS.get(convert_to.lower(), '.mp3') + extension = AUDIO_FORMAT_EXTENSIONS.get(convert_to.lower(), ".mp3") full_path += extension else: - full_path += '.mp3' # Default fallback - + full_path += ".mp3" # Default fallback + return full_path - + except Exception as e: - logger.error(f"Error generating file path for track {track.get('title', 'Unknown')}: {e}") + logger.error( + f"Error generating file path for track {track.get('title', 'Unknown')}: {e}" + ) # Return a fallback path with appropriate extension - safe_title = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', str(track.get('title', 'Unknown Track'))) - + safe_title = re.sub( + r'[<>:"/\\|?*\x00-\x1f]', "_", str(track.get("title", "Unknown 
Track")) + ) + # Determine extension for fallback if convert_to: - extension = AUDIO_FORMAT_EXTENSIONS.get(convert_to.lower(), '.mp3') + extension = AUDIO_FORMAT_EXTENSIONS.get(convert_to.lower(), ".mp3") else: - extension = '.mp3' - + extension = ".mp3" + return f"Unknown Artist/Unknown Album/{safe_title}{extension}" def update_playlist_m3u_file(playlist_spotify_id: str): """ Generate/update the m3u file for a watched playlist based on tracks in the database. - + Args: playlist_spotify_id: The Spotify playlist ID """ @@ -1019,62 +1073,79 @@ def update_playlist_m3u_file(playlist_spotify_id: str): # Get playlist metadata playlist_info = get_watched_playlist(playlist_spotify_id) if not playlist_info: - logger.warning(f"Playlist {playlist_spotify_id} not found in watched playlists. Cannot update m3u file.") + logger.warning( + f"Playlist {playlist_spotify_id} not found in watched playlists. Cannot update m3u file." + ) return - + playlist_name = playlist_info.get("name", "Unknown Playlist") - + # Get configuration settings from routes.utils.celery_config import get_config_params + config = get_config_params() - + custom_dir_format = config.get("customDirFormat", "%ar_album%/%album%") custom_track_format = config.get("customTrackFormat", "%tracknum%. %music%") convert_to = config.get("convertTo") # Get conversion format setting - output_dir = "./downloads" # This matches the output_dir used in download functions - + output_dir = ( + "./downloads" # This matches the output_dir used in download functions + ) + # Get all tracks for the playlist tracks = get_playlist_tracks_for_m3u(playlist_spotify_id) - + if not tracks: - logger.info(f"No tracks found for playlist '{playlist_name}'. M3U file will be empty or removed.") - + logger.info( + f"No tracks found for playlist '{playlist_name}'. M3U file will be empty or removed." 
+ ) + # Clean playlist name for filename - safe_playlist_name = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', playlist_name).strip() - + safe_playlist_name = re.sub( + r'[<>:"/\\|?*\x00-\x1f]', "_", playlist_name + ).strip() + # Create m3u file path playlists_dir = Path(output_dir) / "playlists" playlists_dir.mkdir(parents=True, exist_ok=True) m3u_file_path = playlists_dir / f"{safe_playlist_name}.m3u" - + # Generate m3u content m3u_lines = ["#EXTM3U"] - + for track in tracks: # Generate file path for this track - track_file_path = generate_track_file_path(track, custom_dir_format, custom_track_format, convert_to) - + track_file_path = generate_track_file_path( + track, custom_dir_format, custom_track_format, convert_to + ) + # Create relative path from m3u file location to track file # M3U file is in ./downloads/playlists/ # Track files are in ./downloads/{custom_dir_format}/ relative_path = os.path.join("..", track_file_path) - relative_path = relative_path.replace("\\", "/") # Use forward slashes for m3u compatibility - + relative_path = relative_path.replace( + "\\", "/" + ) # Use forward slashes for m3u compatibility + # Add EXTINF line with track duration and title - duration_seconds = (track.get("duration_ms", 0) // 1000) if track.get("duration_ms") else -1 + duration_seconds = ( + (track.get("duration_ms", 0) // 1000) + if track.get("duration_ms") + else -1 + ) artist_and_title = f"{track.get('artist_names', 'Unknown Artist')} - {track.get('title', 'Unknown Track')}" - + m3u_lines.append(f"#EXTINF:{duration_seconds},{artist_and_title}") m3u_lines.append(relative_path) - + # Write m3u file - with open(m3u_file_path, 'w', encoding='utf-8') as f: - f.write('\n'.join(m3u_lines)) - + with open(m3u_file_path, "w", encoding="utf-8") as f: + f.write("\n".join(m3u_lines)) + logger.info( f"Updated m3u file for playlist '{playlist_name}' at {m3u_file_path} with {len(tracks)} tracks{f' (format: {convert_to})' if convert_to else ''}." 
) - + except Exception as e: logger.error( f"Error updating m3u file for playlist {playlist_spotify_id}: {e}", diff --git a/spotizerr-ui/package.json b/spotizerr-ui/package.json index 38a2f5d..2d55918 100644 --- a/spotizerr-ui/package.json +++ b/spotizerr-ui/package.json @@ -1,7 +1,7 @@ { "name": "spotizerr-ui", "private": true, - "version": "3.0.5", + "version": "3.0.6", "type": "module", "scripts": { "dev": "vite", diff --git a/spotizerr-ui/src/components/config/DownloadsTab.tsx b/spotizerr-ui/src/components/config/DownloadsTab.tsx index 732cf5c..c21e68f 100644 --- a/spotizerr-ui/src/components/config/DownloadsTab.tsx +++ b/spotizerr-ui/src/components/config/DownloadsTab.tsx @@ -21,6 +21,7 @@ interface DownloadSettings { hlsThreads: number; deezerQuality: "MP3_128" | "MP3_320" | "FLAC"; spotifyQuality: "NORMAL" | "HIGH" | "VERY_HIGH"; + recursiveQuality?: boolean; // frontend field (mapped to recursive_quality on save) } interface WatchConfig { @@ -49,8 +50,14 @@ const CONVERSION_FORMATS: Record = { }; // --- API Functions --- -const saveDownloadConfig = async (data: Partial<DownloadSettings>) => { - const { data: response } = await authApiClient.client.post("/config", data); +const saveDownloadConfig = async (data: Partial<DownloadSettings> & { recursive_quality?: boolean }) => { + // Map camelCase to snake_case for backend compatibility + const payload: any = { ...data }; + if (typeof data.recursiveQuality !== "undefined") { + payload.recursive_quality = data.recursiveQuality; + delete payload.recursiveQuality; + } + const { data: response } = await authApiClient.client.post("/config", payload); return response; }; @@ -189,6 +196,10 @@ export function DownloadsTab({ config, isLoading }: DownloadsTabProps) { +
+ + +
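For illustration only, a minimal sketch of a checkbox bound to the new recursiveQuality field; the component, prop, and label names below are assumptions, not the actual DownloadsTab markup:

// Sketch: standalone toggle for the recursiveQuality flag (names are illustrative).
export function RecursiveQualityToggle({ value, onChange }: { value: boolean; onChange: (next: boolean) => void }) {
  return (
    <div className="flex items-center gap-2">
      <label htmlFor="recursiveQuality">Recursive quality fallback</label>
      <input
        id="recursiveQuality"
        type="checkbox"
        checked={value}
        onChange={(e) => onChange(e.target.checked)}
      />
    </div>
  );
}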
{/* Watch validation info */} {watchConfig?.enabled && ( diff --git a/spotizerr-ui/src/components/config/FormattingTab.tsx b/spotizerr-ui/src/components/config/FormattingTab.tsx index 6c5b798..02bb4ca 100644 --- a/spotizerr-ui/src/components/config/FormattingTab.tsx +++ b/spotizerr-ui/src/components/config/FormattingTab.tsx @@ -14,6 +14,7 @@ interface FormattingSettings { album: string; playlist: string; compilation: string; + artistSeparator: string; } interface FormattingTabProps { @@ -23,7 +24,12 @@ // --- API Functions --- const saveFormattingConfig = async (data: Partial<FormattingSettings>) => { - const { data: response } = await authApiClient.client.post("/config", data); + const payload: any = { ...data }; + if (typeof data.artistSeparator !== "undefined") { + payload.artist_separator = data.artistSeparator; + delete payload.artistSeparator; + } + const { data: response } = await authApiClient.client.post("/config", payload); return response; }; @@ -160,6 +166,17 @@ export function FormattingTab({ config, isLoading }: FormattingTabProps) { className="h-6 w-6 rounded" /> +
+ + +
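Similarly for the formatting tab, a minimal sketch of a text input bound to the new artistSeparator setting; names here are again assumptions rather than the actual FormattingTab markup:

// Sketch: standalone input for the artistSeparator setting (names are illustrative).
export function ArtistSeparatorField({ value, onChange }: { value: string; onChange: (next: string) => void }) {
  return (
    <div className="flex flex-col gap-1">
      <label htmlFor="artistSeparator">Artist separator</label>
      <input
        id="artistSeparator"
        type="text"
        value={value}
        placeholder="; "
        onChange={(e) => onChange(e.target.value)}
      />
    </div>
  );
}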
diff --git a/spotizerr-ui/src/contexts/SettingsProvider.tsx b/spotizerr-ui/src/contexts/SettingsProvider.tsx index f1ebdfb..d5958c4 100644 --- a/spotizerr-ui/src/contexts/SettingsProvider.tsx +++ b/spotizerr-ui/src/contexts/SettingsProvider.tsx @@ -57,6 +57,7 @@ export type FlatAppSettings = { album: string; playlist: string; compilation: string; + artistSeparator: string; }; const defaultSettings: FlatAppSettings = { @@ -89,6 +90,7 @@ const defaultSettings: FlatAppSettings = { album: "{artist_name}/{album_name}", playlist: "Playlists/{playlist_name}", compilation: "Compilations/{album_name}", + artistSeparator: "; ", watch: { enabled: false, }, diff --git a/spotizerr-ui/src/contexts/settings-context.ts b/spotizerr-ui/src/contexts/settings-context.ts index dab378d..319e593 100644 --- a/spotizerr-ui/src/contexts/settings-context.ts +++ b/spotizerr-ui/src/contexts/settings-context.ts @@ -31,6 +31,7 @@ export interface AppSettings { album: string; playlist: string; compilation: string; + artistSeparator: string; watch: { enabled: boolean; // Add other watch properties from the old type if they still exist in the API response diff --git a/spotizerr-ui/src/types/settings.ts b/spotizerr-ui/src/types/settings.ts index 7ef39c1..cc9031f 100644 --- a/spotizerr-ui/src/types/settings.ts +++ b/spotizerr-ui/src/types/settings.ts @@ -29,6 +29,7 @@ export interface AppSettings { album: string; playlist: string; compilation: string; + artistSeparator: string; watch: { enabled: boolean; // Add other watch properties from the old type if they still exist in the API response
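Both save functions apply the same camelCase-to-snake_case renaming before posting to /config; a small shared helper in this spirit (hypothetical, not part of the codebase) could centralize the mapping:

// Sketch: rename known camelCase config keys to their snake_case backend names.
const BACKEND_KEYS: Record<string, string> = {
  recursiveQuality: "recursive_quality",
  artistSeparator: "artist_separator",
};

export function toBackendConfig(data: Record<string, unknown>): Record<string, unknown> {
  const payload: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(data)) {
    if (value === undefined) continue;          // drop unset fields
    payload[BACKEND_KEYS[key] ?? key] = value;  // rename when a mapping exists
  }
  return payload;
}

// Hypothetical usage:
// await authApiClient.client.post("/config", toBackendConfig({ recursiveQuality: true, artistSeparator: "; " }));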