From 5b261e45f320d83c5d3df187f402f472c0a559f7 Mon Sep 17 00:00:00 2001 From: Xoconoch Date: Wed, 20 Aug 2025 09:36:42 -0500 Subject: [PATCH] fix: #271 --- requirements.txt | 2 +- routes/utils/history_manager.py | 2614 ++++++++++++++++--------------- 2 files changed, 1322 insertions(+), 1294 deletions(-) diff --git a/requirements.txt b/requirements.txt index 9e7efac..1d9e200 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ fastapi==0.116.1 uvicorn[standard]==0.35.0 celery==5.5.3 -deezspot-spotizerr==2.7.1 +deezspot-spotizerr==2.7.2 httpx==0.28.1 bcrypt==4.2.1 PyJWT==2.10.1 diff --git a/routes/utils/history_manager.py b/routes/utils/history_manager.py index 33a7da1..26b9913 100644 --- a/routes/utils/history_manager.py +++ b/routes/utils/history_manager.py @@ -11,55 +11,55 @@ logger = logging.getLogger(__name__) class HistoryManager: - """ - Manages download history storage using SQLite database. - Stores hierarchical download data from deezspot callback objects. - """ + """ + Manages download history storage using SQLite database. + Stores hierarchical download data from deezspot callback objects. + """ - def __init__(self, db_path: str = "data/history/download_history.db"): - """ - Initialize the history manager with database path. + def __init__(self, db_path: str = "data/history/download_history.db"): + """ + Initialize the history manager with database path. - Args: - db_path: Path to SQLite database file - """ - self.db_path = Path(db_path) - self.db_path.parent.mkdir(parents=True, exist_ok=True) - self._ensure_database_exists() + Args: + db_path: Path to SQLite database file + """ + self.db_path = Path(db_path) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self._ensure_database_exists() - def _ensure_database_exists(self): - """Create database and main table if they don't exist and migrate schema safely.""" - expected_download_history_columns: Dict[str, str] = { - "id": "INTEGER PRIMARY KEY AUTOINCREMENT", - "download_type": "TEXT NOT NULL", - "title": "TEXT NOT NULL", - "artists": "TEXT", - "timestamp": "REAL NOT NULL", - "status": "TEXT NOT NULL", - "service": "TEXT", - "quality_format": "TEXT", - "quality_bitrate": "TEXT", - "total_tracks": "INTEGER", - "successful_tracks": "INTEGER", - "failed_tracks": "INTEGER", - "skipped_tracks": "INTEGER", - "children_table": "TEXT", - "task_id": "TEXT", - "external_ids": "TEXT", - "metadata": "TEXT", - "release_date": "TEXT", - "genres": "TEXT", - "images": "TEXT", - "owner": "TEXT", - "album_type": "TEXT", - "duration_total_ms": "INTEGER", - "explicit": "BOOLEAN", - } + def _ensure_database_exists(self): + """Create database and main table if they don't exist and migrate schema safely.""" + expected_download_history_columns: Dict[str, str] = { + "id": "INTEGER PRIMARY KEY AUTOINCREMENT", + "download_type": "TEXT NOT NULL", + "title": "TEXT NOT NULL", + "artists": "TEXT", + "timestamp": "REAL NOT NULL", + "status": "TEXT NOT NULL", + "service": "TEXT", + "quality_format": "TEXT", + "quality_bitrate": "TEXT", + "total_tracks": "INTEGER", + "successful_tracks": "INTEGER", + "failed_tracks": "INTEGER", + "skipped_tracks": "INTEGER", + "children_table": "TEXT", + "task_id": "TEXT", + "external_ids": "TEXT", + "metadata": "TEXT", + "release_date": "TEXT", + "genres": "TEXT", + "images": "TEXT", + "owner": "TEXT", + "album_type": "TEXT", + "duration_total_ms": "INTEGER", + "explicit": "BOOLEAN", + } - with self._get_connection() as conn: - cursor = conn.cursor() - # 1) Create table if missing with minimal schema - cursor.execute(""" + with self._get_connection() as conn: + cursor = conn.cursor() + # 1) Create table if missing with minimal schema + cursor.execute(""" CREATE TABLE IF NOT EXISTS download_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, download_type TEXT NOT NULL, @@ -67,215 +67,215 @@ class HistoryManager: ) """) - # 2) Ensure/upgrade schema columns idempotently - self._ensure_table_schema( - cursor, - "download_history", - expected_download_history_columns, - "download history", - ) + # 2) Ensure/upgrade schema columns idempotently + self._ensure_table_schema( + cursor, + "download_history", + expected_download_history_columns, + "download history", + ) - # 2.5) Backfill defaults for critical columns to avoid NULLs post-migration - try: - cursor.execute("PRAGMA table_info(download_history)") - cols = {row[1] for row in cursor.fetchall()} - if "title" in cols: - cursor.execute( - """ + # 2.5) Backfill defaults for critical columns to avoid NULLs post-migration + try: + cursor.execute("PRAGMA table_info(download_history)") + cols = {row[1] for row in cursor.fetchall()} + if "title" in cols: + cursor.execute( + """ UPDATE download_history SET title = COALESCE(NULLIF(title, ''), 'Unknown') WHERE title IS NULL OR title = '' """ - ) - if "status" in cols: - cursor.execute( - """ + ) + if "status" in cols: + cursor.execute( + """ UPDATE download_history SET status = COALESCE(NULLIF(status, ''), 'unknown') WHERE status IS NULL OR status = '' """ - ) - if "download_type" in cols: - cursor.execute( - """ + ) + if "download_type" in cols: + cursor.execute( + """ UPDATE download_history SET download_type = COALESCE(NULLIF(download_type, ''), 'unknown') WHERE download_type IS NULL OR download_type = '' """ - ) - except Exception as e: - logger.warning( - f"Non-fatal: failed to backfill defaults for download_history: {e}" - ) + ) + except Exception as e: + logger.warning( + f"Non-fatal: failed to backfill defaults for download_history: {e}" + ) - # 3) Migrate legacy columns to new ones (best-effort, non-fatal) - try: - cursor.execute("PRAGMA table_info(download_history)") - cols = {row[1] for row in cursor.fetchall()} + # 3) Migrate legacy columns to new ones (best-effort, non-fatal) + try: + cursor.execute("PRAGMA table_info(download_history)") + cols = {row[1] for row in cursor.fetchall()} - # Legacy timestamp columns → timestamp - if "timestamp" not in cols: - # Add column first - cursor.execute( - "ALTER TABLE download_history ADD COLUMN timestamp REAL" - ) - # Backfill from legacy columns if present - legacy_time_cols = [ - c for c in ["time", "created_at", "date"] if c in cols - ] - if legacy_time_cols: - # Pick the first legacy column to backfill - legacy_col = legacy_time_cols[0] - try: - cursor.execute( - f"UPDATE download_history SET timestamp = CASE WHEN {legacy_col} IS NOT NULL THEN {legacy_col} ELSE strftime('%s','now') END" - ) - except sqlite3.Error: - # Fallback: just set to now - cursor.execute( - "UPDATE download_history SET timestamp = strftime('%s','now')" - ) - else: - # Default all to now if nothing to migrate - cursor.execute( - "UPDATE download_history SET timestamp = strftime('%s','now')" - ) + # Legacy timestamp columns → timestamp + if "timestamp" not in cols: + # Add column first + cursor.execute( + "ALTER TABLE download_history ADD COLUMN timestamp REAL" + ) + # Backfill from legacy columns if present + legacy_time_cols = [ + c for c in ["time", "created_at", "date"] if c in cols + ] + if legacy_time_cols: + # Pick the first legacy column to backfill + legacy_col = legacy_time_cols[0] + try: + cursor.execute( + f"UPDATE download_history SET timestamp = CASE WHEN {legacy_col} IS NOT NULL THEN {legacy_col} ELSE strftime('%s','now') END" + ) + except sqlite3.Error: + # Fallback: just set to now + cursor.execute( + "UPDATE download_history SET timestamp = strftime('%s','now')" + ) + else: + # Default all to now if nothing to migrate + cursor.execute( + "UPDATE download_history SET timestamp = strftime('%s','now')" + ) - # quality → quality_format, bitrate → quality_bitrate - # Handle common legacy pairs non-fataly - cursor.execute("PRAGMA table_info(download_history)") - cols = {row[1] for row in cursor.fetchall()} - if "quality_format" not in cols and "quality" in cols: - cursor.execute( - "ALTER TABLE download_history ADD COLUMN quality_format TEXT" - ) - try: - cursor.execute( - "UPDATE download_history SET quality_format = quality WHERE quality_format IS NULL" - ) - except sqlite3.Error: - pass - if "quality_bitrate" not in cols and "bitrate" in cols: - cursor.execute( - "ALTER TABLE download_history ADD COLUMN quality_bitrate TEXT" - ) - try: - cursor.execute( - "UPDATE download_history SET quality_bitrate = bitrate WHERE quality_bitrate IS NULL" - ) - except sqlite3.Error: - pass - except Exception as e: - logger.warning( - f"Non-fatal: failed legacy column migration for download_history: {e}" - ) + # quality → quality_format, bitrate → quality_bitrate + # Handle common legacy pairs non-fataly + cursor.execute("PRAGMA table_info(download_history)") + cols = {row[1] for row in cursor.fetchall()} + if "quality_format" not in cols and "quality" in cols: + cursor.execute( + "ALTER TABLE download_history ADD COLUMN quality_format TEXT" + ) + try: + cursor.execute( + "UPDATE download_history SET quality_format = quality WHERE quality_format IS NULL" + ) + except sqlite3.Error: + pass + if "quality_bitrate" not in cols and "bitrate" in cols: + cursor.execute( + "ALTER TABLE download_history ADD COLUMN quality_bitrate TEXT" + ) + try: + cursor.execute( + "UPDATE download_history SET quality_bitrate = bitrate WHERE quality_bitrate IS NULL" + ) + except sqlite3.Error: + pass + except Exception as e: + logger.warning( + f"Non-fatal: failed legacy column migration for download_history: {e}" + ) - # 4) Create indexes only if columns exist (avoid startup failures) - try: - cursor.execute("PRAGMA table_info(download_history)") - cols = {row[1] for row in cursor.fetchall()} + # 4) Create indexes only if columns exist (avoid startup failures) + try: + cursor.execute("PRAGMA table_info(download_history)") + cols = {row[1] for row in cursor.fetchall()} - if "timestamp" in cols: - cursor.execute(""" + if "timestamp" in cols: + cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_download_history_timestamp ON download_history(timestamp) """) - if {"download_type", "status"}.issubset(cols): - cursor.execute(""" + if {"download_type", "status"}.issubset(cols): + cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_download_history_type_status ON download_history(download_type, status) """) - if "task_id" in cols: - cursor.execute(""" + if "task_id" in cols: + cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_download_history_task_id ON download_history(task_id) """) - # Preserve uniqueness from previous schema using a unique index (safer than table constraint for migrations) - if {"task_id", "download_type", "external_ids"}.issubset(cols): - cursor.execute( - """ + # Preserve uniqueness from previous schema using a unique index (safer than table constraint for migrations) + if {"task_id", "download_type", "external_ids"}.issubset(cols): + cursor.execute( + """ CREATE UNIQUE INDEX IF NOT EXISTS uq_download_history_task_type_ids ON download_history(task_id, download_type, external_ids) """ - ) - except Exception as e: - logger.warning( - f"Non-fatal: failed to create indexes for download_history: {e}" - ) + ) + except Exception as e: + logger.warning( + f"Non-fatal: failed to create indexes for download_history: {e}" + ) - # 5) Best-effort upgrade of existing children tables (album_*, playlist_*) - try: - self._migrate_existing_children_tables(cursor) - except Exception as e: - logger.warning(f"Non-fatal: failed to migrate children tables: {e}") + # 5) Best-effort upgrade of existing children tables (album_*, playlist_*) + try: + self._migrate_existing_children_tables(cursor) + except Exception as e: + logger.warning(f"Non-fatal: failed to migrate children tables: {e}") - @contextmanager - def _get_connection(self): - """Get database connection with proper error handling.""" - conn = None - try: - conn = sqlite3.connect(str(self.db_path)) - conn.row_factory = sqlite3.Row # Enable dict-like row access - yield conn - conn.commit() - except Exception as e: - if conn: - conn.rollback() - logger.error(f"Database error: {e}") - raise - finally: - if conn: - conn.close() + @contextmanager + def _get_connection(self): + """Get database connection with proper error handling.""" + conn = None + try: + conn = sqlite3.connect(str(self.db_path)) + conn.row_factory = sqlite3.Row # Enable dict-like row access + yield conn + conn.commit() + except Exception as e: + if conn: + conn.rollback() + logger.error(f"Database error: {e}") + raise + finally: + if conn: + conn.close() - def _ensure_table_schema( - self, - cursor: sqlite3.Cursor, - table_name: str, - expected_columns: Dict[str, str], - table_description: str, - ) -> None: - """Ensure all expected columns exist in the given table, adding any missing columns.""" - try: - cursor.execute(f"PRAGMA table_info({table_name})") - existing_info = cursor.fetchall() - existing_names = {row[1] for row in existing_info} + def _ensure_table_schema( + self, + cursor: sqlite3.Cursor, + table_name: str, + expected_columns: Dict[str, str], + table_description: str, + ) -> None: + """Ensure all expected columns exist in the given table, adding any missing columns.""" + try: + cursor.execute(f"PRAGMA table_info({table_name})") + existing_info = cursor.fetchall() + existing_names = {row[1] for row in existing_info} - for col_name, col_type in expected_columns.items(): - if col_name not in existing_names: - # Avoid adding PRIMARY KEY or NOT NULL on existing tables; strip them for ALTER - col_type_for_add = ( - col_type.replace("PRIMARY KEY", "") - .replace("AUTOINCREMENT", "") - .replace("NOT NULL", "") - .strip() - ) - try: - cursor.execute( - f"ALTER TABLE {table_name} ADD COLUMN {col_name} {col_type_for_add}" - ) - logger.info( - f"Added missing column '{col_name} {col_type_for_add}' to {table_description} table '{table_name}'." - ) - except sqlite3.Error as e: - logger.warning( - f"Could not add column '{col_name}' to {table_description} table '{table_name}': {e}" - ) - except sqlite3.Error as e: - logger.error( - f"Error ensuring schema for {table_description} table '{table_name}': {e}" - ) + for col_name, col_type in expected_columns.items(): + if col_name not in existing_names: + # Avoid adding PRIMARY KEY or NOT NULL on existing tables; strip them for ALTER + col_type_for_add = ( + col_type.replace("PRIMARY KEY", "") + .replace("AUTOINCREMENT", "") + .replace("NOT NULL", "") + .strip() + ) + try: + cursor.execute( + f"ALTER TABLE {table_name} ADD COLUMN {col_name} {col_type_for_add}" + ) + logger.info( + f"Added missing column '{col_name} {col_type_for_add}' to {table_description} table '{table_name}'." + ) + except sqlite3.Error as e: + logger.warning( + f"Could not add column '{col_name}' to {table_description} table '{table_name}': {e}" + ) + except sqlite3.Error as e: + logger.error( + f"Error ensuring schema for {table_description} table '{table_name}': {e}" + ) - def _create_children_table(self, table_name: str): - """ - Create a children table for storing individual tracks of an album/playlist. - Ensures schema upgrades for existing tables. + def _create_children_table(self, table_name: str): + """ + Create a children table for storing individual tracks of an album/playlist. + Ensures schema upgrades for existing tables. - Args: - table_name: Name of the children table (e.g., 'album_abc123') - """ - with self._get_connection() as conn: - cursor = conn.cursor() - cursor.execute(f""" + Args: + table_name: Name of the children table (e.g., 'album_abc123') + """ + with self._get_connection() as conn: + cursor = conn.cursor() + cursor.execute(f""" CREATE TABLE IF NOT EXISTS {table_name} ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, @@ -294,184 +294,190 @@ class HistoryManager: metadata TEXT ) """) - expected_children_columns = { - "id": "INTEGER PRIMARY KEY AUTOINCREMENT", - "title": "TEXT NOT NULL", - "artists": "TEXT", - "album_title": "TEXT", - "duration_ms": "INTEGER", - "track_number": "INTEGER", - "disc_number": "INTEGER", - "explicit": "BOOLEAN", - "status": "TEXT NOT NULL", - "external_ids": "TEXT", - "genres": "TEXT", - "isrc": "TEXT", - "timestamp": "REAL NOT NULL", - "position": "INTEGER", - "metadata": "TEXT", - } - self._ensure_table_schema( - cursor, table_name, expected_children_columns, "children history" - ) + expected_children_columns = { + "id": "INTEGER PRIMARY KEY AUTOINCREMENT", + "title": "TEXT NOT NULL", + "artists": "TEXT", + "album_title": "TEXT", + "duration_ms": "INTEGER", + "track_number": "INTEGER", + "disc_number": "INTEGER", + "explicit": "BOOLEAN", + "status": "TEXT NOT NULL", + "external_ids": "TEXT", + "genres": "TEXT", + "isrc": "TEXT", + "timestamp": "REAL NOT NULL", + "position": "INTEGER", + "metadata": "TEXT", + } + self._ensure_table_schema( + cursor, table_name, expected_children_columns, "children history" + ) - def _migrate_existing_children_tables(self, cursor: sqlite3.Cursor) -> None: - """Find album_* and playlist_* children tables and ensure they have the expected schema.""" - cursor.execute( - "SELECT name FROM sqlite_master WHERE type='table' AND (name LIKE 'album_%' OR name LIKE 'playlist_%')" - ) - tables = [row[0] for row in cursor.fetchall() if row[0] != "download_history"] - for t in tables: - try: - # Ensure existence + schema upgrades - cursor.execute( - f"CREATE TABLE IF NOT EXISTS {t} (id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL)" - ) - self._create_children_table(t) - except Exception as e: - logger.warning(f"Non-fatal: failed to migrate children table {t}: {e}") + def _migrate_existing_children_tables(self, cursor: sqlite3.Cursor) -> None: + """Find album_* and playlist_* children tables and ensure they have the expected schema.""" + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND (name LIKE 'album_%' OR name LIKE 'playlist_%')" + ) + tables = [row[0] for row in cursor.fetchall() if row[0] != "download_history"] + for t in tables: + try: + # Ensure existence + schema upgrades + cursor.execute( + f"CREATE TABLE IF NOT EXISTS {t} (id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL)" + ) + self._create_children_table(t) + except Exception as e: + logger.warning(f"Non-fatal: failed to migrate children table {t}: {e}") - def _extract_artists(self, obj: Dict) -> List[str]: - """Extract artist names from various object types.""" - artists = obj.get("artists", []) - if not artists: - return [] + def _extract_artists(self, obj: Dict) -> List[str]: + """Extract artist names from various object types.""" + artists = obj.get("artists", []) + if not artists: + return [] - artist_names = [] - for artist in artists: - if isinstance(artist, dict): - name = artist.get("name", "") - if name: - artist_names.append(name) - elif isinstance(artist, str): - artist_names.append(artist) + artist_names = [] + for artist in artists: + if isinstance(artist, dict): + name = artist.get("name", "") + if name: + artist_names.append(name) + elif isinstance(artist, str): + artist_names.append(artist) - return artist_names + return artist_names - def _extract_external_ids(self, obj: Dict) -> Dict: - """Extract external service IDs from object.""" - return obj.get("ids", {}) + def _extract_external_ids(self, obj: Dict) -> Dict: + """Extract external service IDs from object.""" + return obj.get("ids", {}) - def _extract_images(self, obj: Dict) -> List[Dict]: - """Extract image information from object.""" - return obj.get("images", []) + def _extract_images(self, obj: Dict) -> List[Dict]: + """Extract image information from object.""" + return obj.get("images", []) - def _extract_release_date(self, obj: Dict) -> Dict: - """Extract release date information from object.""" - return obj.get("release_date", {}) + def _extract_release_date(self, obj: Dict) -> Dict: + """Extract release date information from object.""" + return obj.get("release_date", {}) - def _calculate_total_duration(self, tracks: List[Dict]) -> int: - """Calculate total duration from tracks list.""" - total = 0 - for track in tracks: - duration = track.get("duration_ms", 0) - if duration: - total += duration - return total + def _calculate_total_duration(self, tracks: List[Dict]) -> int: + """Calculate total duration from tracks list.""" + total = 0 + for track in tracks: + duration = track.get("duration_ms", 0) + if duration: + total += duration + return total - def _get_primary_service(self, external_ids: Dict) -> str: - """Determine primary service from external IDs.""" - if "spotify" in external_ids: - return "spotify" - elif "deezer" in external_ids: - return "deezer" - else: - return "unknown" + def _get_primary_service(self, external_ids: Dict) -> str: + """Determine primary service from external IDs.""" + if "spotify" in external_ids: + return "spotify" + elif "deezer" in external_ids: + return "deezer" + else: + return "unknown" - def create_children_table_for_album(self, callback_data: Dict, task_id: str) -> str: - """ - Create children table for album download at the start and return table name. + def create_children_table_for_album(self, callback_data: Dict, task_id: str) -> str: + """ + Create children table for album download at the start and return table name. - Args: - callback_data: Album callback object from deezspot - task_id: Celery task ID + Args: + callback_data: Album callback object from deezspot + task_id: Celery task ID - Returns: - Children table name - """ - # Generate children table name - album_uuid = str(uuid.uuid4()).replace("-", "")[:10] - children_table = f"album_{album_uuid}" + Returns: + Children table name + """ + # Generate children table name + album_uuid = str(uuid.uuid4()).replace("-", "")[:10] + children_table = f"album_{album_uuid}" - # Create the children table - self._create_children_table(children_table) + # Create the children table + self._create_children_table(children_table) - logger.info(f"Created album children table {children_table} for task {task_id}") - return children_table + logger.info(f"Created album children table {children_table} for task {task_id}") + return children_table - def create_children_table_for_playlist( - self, callback_data: Dict, task_id: str - ) -> str: - """ - Create children table for playlist download at the start and return table name. + def create_children_table_for_playlist( + self, callback_data: Dict, task_id: str + ) -> str: + """ + Create children table for playlist download at the start and return table name. - Args: - callback_data: Playlist callback object from deezspot - task_id: Celery task ID + Args: + callback_data: Playlist callback object from deezspot + task_id: Celery task ID - Returns: - Children table name - """ - # Generate children table name - playlist_uuid = str(uuid.uuid4()).replace("-", "")[:10] - children_table = f"playlist_{playlist_uuid}" + Returns: + Children table name + """ + # Generate children table name + playlist_uuid = str(uuid.uuid4()).replace("-", "")[:10] + children_table = f"playlist_{playlist_uuid}" - # Create the children table - self._create_children_table(children_table) + # Create the children table + self._create_children_table(children_table) - logger.info( - f"Created playlist children table {children_table} for task {task_id}" - ) - return children_table + logger.info( + f"Created playlist children table {children_table} for task {task_id}" + ) + return children_table - def store_track_history( - self, - callback_data: Dict, - task_id: str, - status: str = "completed", - table: str = "download_history", - ): - """ - Store individual track download history. + def store_track_history( + self, + callback_data: Dict, + task_id: str, + status: str = "completed", + table: str = "download_history", + ): + """ + Store individual track download history. - Args: - callback_data: Track callback object from deezspot - task_id: Celery task ID - status: Download status ('completed', 'failed', 'skipped') - table: Target table name (defaults to 'download_history', can be a children table name) - """ - try: - track = callback_data.get("track", {}) - status_info = callback_data.get("status_info", {}) + Args: + callback_data: Track callback object from deezspot + task_id: Celery task ID + status: Download status ('completed', 'failed', 'skipped') + table: Target table name (defaults to 'download_history', can be a children table name) + """ + try: + track = callback_data.get("track", {}) + status_info = callback_data.get("status_info", {}) - if not track: - logger.warning(f"No track data in callback for task {task_id}") - return + if not track: + logger.warning(f"No track data in callback for task {task_id}") + return - artists = self._extract_artists(track) - external_ids = self._extract_external_ids(track) + artists = self._extract_artists(track) + external_ids = self._extract_external_ids(track) - album = track.get("album", {}) - album_title = album.get("title", "") + # Prefer service/quality/bitrate from summary when available + summary = status_info.get("summary") or {} + service = summary.get("service") or self._get_primary_service(external_ids) + quality_format = summary.get("quality") or status_info.get("convert_to") + quality_bitrate = summary.get("bitrate") or status_info.get("bitrate") - # Prepare metadata - metadata = { - "callback_type": "track", - "parent": callback_data.get("parent"), - "current_track": callback_data.get("current_track"), - "total_tracks": callback_data.get("total_tracks"), - "album": album, - "status_info": status_info, - } + album = track.get("album", {}) + album_title = album.get("title", "") - with self._get_connection() as conn: - if table == "download_history": - # Store in main download_history table - logger.info( - f"Storing track '{track.get('title', 'Unknown')}' in MAIN table for task {task_id}" - ) - conn.execute( - """ + # Prepare metadata + metadata = { + "callback_type": "track", + "parent": callback_data.get("parent"), + "current_track": callback_data.get("current_track"), + "total_tracks": callback_data.get("total_tracks"), + "album": album, + "status_info": status_info, + } + + with self._get_connection() as conn: + if table == "download_history": + # Store in main download_history table + logger.info( + f"Storing track '{track.get('title', 'Unknown')}' in MAIN table for task {task_id}" + ) + conn.execute( + """ INSERT OR REPLACE INTO download_history ( download_type, title, artists, timestamp, status, service, quality_format, quality_bitrate, task_id, external_ids, @@ -479,186 +485,198 @@ class HistoryManager: duration_total_ms ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, - ( - "track", - track.get("title", "Unknown"), - json.dumps(artists), - callback_data.get("timestamp", time.time()), - status, - self._get_primary_service(external_ids), - status_info.get("convert_to"), - status_info.get("bitrate"), - task_id, - json.dumps(external_ids), - json.dumps(metadata), - json.dumps(self._extract_release_date(album)), - json.dumps(track.get("genres", [])), - track.get("explicit", False), - album.get("album_type"), - track.get("duration_ms", 0), - ), - ) - else: - # Ensure target children table exists before write - self._create_children_table(table) - # Store in children table (for album/playlist tracks) - logger.info( - f"Storing track '{track.get('title', 'Unknown')}' in CHILDREN table '{table}' for task {task_id}" - ) - # Extract ISRC - isrc = external_ids.get("isrc", "") + ( + "track", + track.get("title", "Unknown"), + json.dumps(artists), + callback_data.get("timestamp", time.time()), + status, + service, + quality_format, + quality_bitrate, + task_id, + json.dumps(external_ids), + json.dumps(metadata), + json.dumps(self._extract_release_date(album)), + json.dumps(track.get("genres", [])), + track.get("explicit", False), + album.get("album_type"), + track.get("duration_ms", 0), + ), + ) + else: + # Ensure target children table exists before write + self._create_children_table(table) + # Store in children table (for album/playlist tracks) + logger.info( + f"Storing track '{track.get('title', 'Unknown')}' in CHILDREN table '{table}' for task {task_id}" + ) + # Extract ISRC + isrc = external_ids.get("isrc", "") - # Prepare children table metadata - children_metadata = { - "album": album, - "type": track.get("type", ""), - "callback_type": "track", - "parent": callback_data.get("parent"), - "current_track": callback_data.get("current_track"), - "total_tracks": callback_data.get("total_tracks"), - "status_info": status_info, - } + # Prepare children table metadata + children_metadata = { + "album": album, + "type": track.get("type", ""), + "callback_type": "track", + "parent": callback_data.get("parent"), + "current_track": callback_data.get("current_track"), + "total_tracks": callback_data.get("total_tracks"), + "status_info": status_info, + } - conn.execute( - f""" + conn.execute( + f""" INSERT INTO {table} ( title, artists, album_title, duration_ms, track_number, disc_number, explicit, status, external_ids, genres, isrc, timestamp, position, metadata ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, - ( - track.get("title", "Unknown"), - json.dumps(artists), - album_title, - track.get("duration_ms", 0), - track.get("track_number", 0), - track.get("disc_number", 1), - track.get("explicit", False), - status, - json.dumps(external_ids), - json.dumps(track.get("genres", [])), - isrc, - callback_data.get("timestamp", time.time()), - track.get("position", 0), # For playlist tracks - json.dumps(children_metadata), - ), - ) + ( + track.get("title", "Unknown"), + json.dumps(artists), + album_title, + track.get("duration_ms", 0), + track.get("track_number", 0), + track.get("disc_number", 1), + track.get("explicit", False), + status, + json.dumps(external_ids), + json.dumps(track.get("genres", [])), + isrc, + callback_data.get("timestamp", time.time()), + track.get("position", 0), # For playlist tracks + json.dumps(children_metadata), + ), + ) - logger.info( - f"Successfully stored track '{track.get('title')}' in table '{table}' (task: {task_id})" - ) + logger.info( + f"Successfully stored track '{track.get('title')}' in table '{table}' (task: {task_id})" + ) - except Exception as e: - logger.error(f"Failed to store track history for task {task_id}: {e}") + except Exception as e: + logger.error(f"Failed to store track history for task {task_id}: {e}") - def store_album_history( - self, callback_data: Dict, task_id: str, status: str = "completed" - ): - """ - Store album download history with children table for individual tracks. + def store_album_history( + self, callback_data: Dict, task_id: str, status: str = "completed" + ): + """ + Store album download history with children table for individual tracks. - Args: - callback_data: Album callback object from deezspot - task_id: Celery task ID - status: Download status ('completed', 'failed', 'in_progress') + Args: + callback_data: Album callback object from deezspot + task_id: Celery task ID + status: Download status ('completed', 'failed', 'in_progress') - Returns: - Children table name when status is 'in_progress', None otherwise - """ - try: - album = callback_data.get("album", {}) - status_info = callback_data.get("status_info", {}) + Returns: + Children table name when status is 'in_progress', None otherwise + """ + try: + album = callback_data.get("album", {}) + status_info = callback_data.get("status_info", {}) - if not album: - logger.warning(f"No album data in callback for task {task_id}") - return None + if not album: + logger.warning(f"No album data in callback for task {task_id}") + return None - if status == "in_progress": - # Phase 1: Create children table at start, don't store album entry yet - children_table = self.create_children_table_for_album( - callback_data, task_id - ) - logger.info( - f"Album download started for task {task_id}, children table: {children_table}" - ) - return children_table + if status == "in_progress": + # Phase 1: Create children table at start, don't store album entry yet + children_table = self.create_children_table_for_album( + callback_data, task_id + ) + logger.info( + f"Album download started for task {task_id}, children table: {children_table}" + ) + return children_table - # Phase 2: Store album entry in main table (for completed/failed status) - artists = self._extract_artists(album) - external_ids = self._extract_external_ids(album) + # Phase 2: Store album entry in main table (for completed/failed status) + artists = self._extract_artists(album) + external_ids = self._extract_external_ids(album) - # For completed/failed, we need to find the existing children table - # This should be stored in task info by the celery task - from routes.utils.celery_tasks import get_task_info + # For completed/failed, we need to find the existing children table + # This should be stored in task info by the celery task + from routes.utils.celery_tasks import get_task_info - task_info = get_task_info(task_id) - children_table = task_info.get("children_table") + task_info = get_task_info(task_id) + children_table = task_info.get("children_table") - if not children_table: - # Fallback: generate new children table name (shouldn't happen in normal flow) - album_uuid = str(uuid.uuid4()).replace("-", "")[:10] - children_table = f"album_{album_uuid}" - logger.warning( - f"No children table found for album task {task_id}, generating new: {children_table}" - ) + if not children_table: + # Fallback: generate new children table name (shouldn't happen in normal flow) + album_uuid = str(uuid.uuid4()).replace("-", "")[:10] + children_table = f"album_{album_uuid}" + logger.warning( + f"No children table found for album task {task_id}, generating new: {children_table}" + ) - # Extract summary data if available (from 'done' status) - summary = status_info.get("summary", {}) - successful_tracks = summary.get("total_successful", 0) - failed_tracks = summary.get("total_failed", 0) - skipped_tracks = summary.get("total_skipped", 0) - total_tracks = summary.get("total_successful", 0) + summary.get( - "total_skipped", 0 - ) + summary.get("total_failed", 0) or album.get("total_tracks", 0) + # Extract summary data if available (from 'done' status) + summary = status_info.get("summary", {}) + successful_tracks = summary.get("total_successful", 0) + failed_tracks = summary.get("total_failed", 0) + skipped_tracks = summary.get("total_skipped", 0) + total_tracks = summary.get("total_successful", 0) + summary.get( + "total_skipped", 0 + ) + summary.get("total_failed", 0) or album.get("total_tracks", 0) - # Enrich album metadata if missing - try: - album = self._enrich_album_metadata_from_summary(album, summary) - except Exception: - pass + # Enrich album metadata if missing + try: + album = self._enrich_album_metadata_from_summary(album, summary) + except Exception: + pass - # Calculate total duration - tracks = album.get("tracks", []) - total_duration = self._calculate_total_duration(tracks) + # Calculate total duration + tracks = album.get("tracks", []) + total_duration = self._calculate_total_duration(tracks) - # Derive accurate status if we have counters - status_to_store = status - try: - if total_tracks: - if ( - successful_tracks >= total_tracks - and failed_tracks == 0 - and skipped_tracks == 0 - ): - status_to_store = "completed" - elif successful_tracks > 0: - status_to_store = "partial" - else: - # None succeeded but there are failures/skips or unknown issues - status_to_store = "failed" - else: - # Fallback: if any failure recorded, mark failed/partial conservatively - if failed_tracks > 0 and successful_tracks == 0: - status_to_store = "failed" - elif failed_tracks > 0 and successful_tracks > 0: - status_to_store = "partial" - except Exception: - # Keep provided status - pass + # Derive accurate status if we have counters + status_to_store = status + try: + if total_tracks: + if ( + successful_tracks >= total_tracks + and failed_tracks == 0 + and skipped_tracks == 0 + ): + status_to_store = "completed" + elif successful_tracks > 0: + status_to_store = "partial" + else: + # None succeeded but there are failures/skips or unknown issues + status_to_store = "failed" + else: + # Fallback: if any failure recorded, mark failed/partial conservatively + if failed_tracks > 0 and successful_tracks == 0: + status_to_store = "failed" + elif failed_tracks > 0 and successful_tracks > 0: + status_to_store = "partial" + except Exception: + # Keep provided status + pass - # Prepare metadata - metadata = { - "callback_type": "album", - "status_info": status_info, - "copyrights": album.get("copyrights", []), - "tracks": tracks, # Store track list in metadata - } + # Prepare metadata + metadata = { + "callback_type": "album", + "status_info": status_info, + "copyrights": album.get("copyrights", []), + "tracks": tracks, # Store track list in metadata + } - with self._get_connection() as conn: - # Store main album entry - conn.execute( - """ + with self._get_connection() as conn: + # Store main album entry + + # Prefer service/quality/bitrate from summary when available + service_to_store = summary.get("service") or self._get_primary_service( + external_ids + ) + quality_format_to_store = summary.get("quality") or status_info.get( + "convert_to" + ) + quality_bitrate_to_store = summary.get("bitrate") or status_info.get( + "bitrate" + ) + + conn.execute( + """ INSERT OR REPLACE INTO download_history ( download_type, title, artists, timestamp, status, service, quality_format, quality_bitrate, total_tracks, successful_tracks, @@ -667,181 +685,193 @@ class HistoryManager: album_type, duration_total_ms ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, - ( - "album", - album.get("title", "Unknown"), - json.dumps(artists), - callback_data.get("timestamp", time.time()), - status_to_store, - self._get_primary_service(external_ids), - status_info.get("convert_to"), - status_info.get("bitrate"), - total_tracks, - successful_tracks, - failed_tracks, - skipped_tracks, - children_table, - task_id, - json.dumps(external_ids), - json.dumps(metadata), - json.dumps(self._extract_release_date(album)), - json.dumps(album.get("genres", [])), - json.dumps(self._extract_images(album)), - album.get("album_type"), - total_duration, - ), - ) + ( + "album", + album.get("title", "Unknown"), + json.dumps(artists), + callback_data.get("timestamp", time.time()), + status_to_store, + service_to_store, + quality_format_to_store, + quality_bitrate_to_store, + total_tracks, + successful_tracks, + failed_tracks, + skipped_tracks, + children_table, + task_id, + json.dumps(external_ids), + json.dumps(metadata), + json.dumps(self._extract_release_date(album)), + json.dumps(album.get("genres", [])), + json.dumps(self._extract_images(album)), + album.get("album_type"), + total_duration, + ), + ) - # If we have a summary (e.g., on cancellation), populate children from it including failed ones - try: - if summary: - self._populate_album_children_table( - children_table, summary, album.get("title", "") - ) - except Exception as e: - logger.warning( - f"Failed to populate children from summary for album {children_table}: {e}" - ) + # If we have a summary (e.g., on cancellation), populate children from it including failed ones + try: + if summary: + self._populate_album_children_table( + children_table, summary, album.get("title", "") + ) + except Exception as e: + logger.warning( + f"Failed to populate children from summary for album {children_table}: {e}" + ) - logger.info( - f"Stored album history for '{album.get('title')}' (task: {task_id}, children: {children_table}, status: {status_to_store})" - ) - return None + logger.info( + f"Stored album history for '{album.get('title')}' (task: {task_id}, children: {children_table}, status: {status_to_store})" + ) + return None - except Exception as e: - logger.error(f"Failed to store album history for task {task_id}: {e}") - return None + except Exception as e: + logger.error(f"Failed to store album history for task {task_id}: {e}") + return None - def store_playlist_history( - self, callback_data: Dict, task_id: str, status: str = "completed" - ): - """ - Store playlist download history with children table for individual tracks. + def store_playlist_history( + self, callback_data: Dict, task_id: str, status: str = "completed" + ): + """ + Store playlist download history with children table for individual tracks. - Args: - callback_data: Playlist callback object from deezspot - task_id: Celery task ID - status: Download status ('completed', 'failed', 'in_progress') + Args: + callback_data: Playlist callback object from deezspot + task_id: Celery task ID + status: Download status ('completed', 'failed', 'in_progress') - Returns: - Children table name when status is 'in_progress', None otherwise - """ - try: - playlist = callback_data.get("playlist", {}) - status_info = callback_data.get("status_info", {}) + Returns: + Children table name when status is 'in_progress', None otherwise + """ + try: + playlist = callback_data.get("playlist", {}) + status_info = callback_data.get("status_info", {}) - if not playlist: - logger.warning(f"No playlist data in callback for task {task_id}") - return None + if not playlist: + logger.warning(f"No playlist data in callback for task {task_id}") + return None - if status == "in_progress": - # Phase 1: Create children table at start, don't store playlist entry yet - children_table = self.create_children_table_for_playlist( - callback_data, task_id - ) - logger.info( - f"Playlist download started for task {task_id}, children table: {children_table}" - ) - return children_table + if status == "in_progress": + # Phase 1: Create children table at start, don't store playlist entry yet + children_table = self.create_children_table_for_playlist( + callback_data, task_id + ) + logger.info( + f"Playlist download started for task {task_id}, children table: {children_table}" + ) + return children_table - # Phase 2: Store playlist entry in main table (for completed/failed status) - external_ids = self._extract_external_ids(playlist) + # Phase 2: Store playlist entry in main table (for completed/failed status) + external_ids = self._extract_external_ids(playlist) - # For completed/failed, we need to find the existing children table - # This should be stored in task info by the celery task - from routes.utils.celery_tasks import get_task_info + # For completed/failed, we need to find the existing children table + # This should be stored in task info by the celery task + from routes.utils.celery_tasks import get_task_info - task_info = get_task_info(task_id) - children_table = task_info.get("children_table") + task_info = get_task_info(task_id) + children_table = task_info.get("children_table") - if not children_table: - # Fallback: generate new children table name (shouldn't happen in normal flow) - playlist_uuid = str(uuid.uuid4()).replace("-", "")[:10] - children_table = f"playlist_{playlist_uuid}" - logger.warning( - f"No children table found for playlist task {task_id}, generating new: {children_table}" - ) + if not children_table: + # Fallback: generate new children table name (shouldn't happen in normal flow) + playlist_uuid = str(uuid.uuid4()).replace("-", "")[:10] + children_table = f"playlist_{playlist_uuid}" + logger.warning( + f"No children table found for playlist task {task_id}, generating new: {children_table}" + ) - # Extract summary data if available - summary = status_info.get("summary", {}) - successful_tracks = summary.get("total_successful", 0) - failed_tracks = summary.get("total_failed", 0) - skipped_tracks = summary.get("total_skipped", 0) + # Extract summary data if available + summary = status_info.get("summary", {}) + successful_tracks = summary.get("total_successful", 0) + failed_tracks = summary.get("total_failed", 0) + skipped_tracks = summary.get("total_skipped", 0) - # Improve metadata for playlist main row using summary first success/skip/failed track - try: - if not playlist.get("images"): - for arr_key in ( - "successful_tracks", - "skipped_tracks", - "failed_tracks", - ): - arr = summary.get(arr_key, []) or [] - candidate = ( - ( - arr[0].get("album") - if arr_key == "failed_tracks" - and isinstance(arr[0], dict) - else ( - arr[0].get("album") - if arr and isinstance(arr[0], dict) - else {} - ) - ) - if arr - else {} - ) - if candidate and candidate.get("images"): - playlist.setdefault("images", candidate.get("images", [])) - break - except Exception: - pass + # Improve metadata for playlist main row using summary first success/skip/failed track + try: + if not playlist.get("images"): + for arr_key in ( + "successful_tracks", + "skipped_tracks", + "failed_tracks", + ): + arr = summary.get(arr_key, []) or [] + candidate = ( + ( + arr[0].get("album") + if arr_key == "failed_tracks" + and isinstance(arr[0], dict) + else ( + arr[0].get("album") + if arr and isinstance(arr[0], dict) + else {} + ) + ) + if arr + else {} + ) + if candidate and candidate.get("images"): + playlist.setdefault("images", candidate.get("images", [])) + break + except Exception: + pass - tracks = playlist.get("tracks", []) - total_tracks = ( - summary.get("total_successful", 0) - + summary.get("total_skipped", 0) - + summary.get("total_failed", 0) - ) or len(tracks) - total_duration = self._calculate_total_duration(tracks) + tracks = playlist.get("tracks", []) + total_tracks = ( + summary.get("total_successful", 0) + + summary.get("total_skipped", 0) + + summary.get("total_failed", 0) + ) or len(tracks) + total_duration = self._calculate_total_duration(tracks) - # Derive accurate status - status_to_store = status - try: - if total_tracks: - if ( - successful_tracks >= total_tracks - and failed_tracks == 0 - and skipped_tracks == 0 - ): - status_to_store = "completed" - elif successful_tracks > 0: - status_to_store = "partial" - else: - status_to_store = "failed" - else: - if failed_tracks > 0 and successful_tracks == 0: - status_to_store = "failed" - elif failed_tracks > 0 and successful_tracks > 0: - status_to_store = "partial" - except Exception: - pass + # Derive accurate status + status_to_store = status + try: + if total_tracks: + if ( + successful_tracks >= total_tracks + and failed_tracks == 0 + and skipped_tracks == 0 + ): + status_to_store = "completed" + elif successful_tracks > 0: + status_to_store = "partial" + else: + status_to_store = "failed" + else: + if failed_tracks > 0 and successful_tracks == 0: + status_to_store = "failed" + elif failed_tracks > 0 and successful_tracks > 0: + status_to_store = "partial" + except Exception: + pass - # Extract owner information - owner = playlist.get("owner", {}) + # Extract owner information + owner = playlist.get("owner", {}) - # Prepare metadata - metadata = { - "callback_type": "playlist", - "status_info": status_info, - "description": playlist.get("description", ""), - "tracks": tracks, # Store track list in metadata - } + # Prepare metadata + metadata = { + "callback_type": "playlist", + "status_info": status_info, + "description": playlist.get("description", ""), + "tracks": tracks, # Store track list in metadata + } - with self._get_connection() as conn: - # Store main playlist entry - conn.execute( - """ + with self._get_connection() as conn: + # Store main playlist entry + + # Prefer service/quality/bitrate from summary when available + service_to_store = summary.get("service") or self._get_primary_service( + external_ids + ) + quality_format_to_store = summary.get("quality") or status_info.get( + "convert_to" + ) + quality_bitrate_to_store = summary.get("bitrate") or status_info.get( + "bitrate" + ) + + conn.execute( + """ INSERT OR REPLACE INTO download_history ( download_type, title, artists, timestamp, status, service, quality_format, quality_bitrate, total_tracks, successful_tracks, @@ -850,321 +880,319 @@ class HistoryManager: duration_total_ms ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, - ( - "playlist", - playlist.get("title", "Unknown"), - json.dumps( - [owner.get("name", "Unknown")] - ), # Use owner as "artist" - callback_data.get("timestamp", time.time()), - status_to_store, - self._get_primary_service(external_ids), - status_info.get("convert_to"), - status_info.get("bitrate"), - total_tracks, - successful_tracks, - failed_tracks, - skipped_tracks, - children_table, - task_id, - json.dumps(external_ids), - json.dumps(metadata), - json.dumps([]), # Playlists don't have genres typically - json.dumps(self._extract_images(playlist)), - json.dumps(owner), - total_duration, - ), - ) + ( + "playlist", + playlist.get("title", "Unknown"), + json.dumps([owner.get("name", "Unknown")]), + callback_data.get("timestamp", time.time()), + status_to_store, + service_to_store, + quality_format_to_store, + quality_bitrate_to_store, + total_tracks, + successful_tracks, + failed_tracks, + skipped_tracks, + children_table, + task_id, + json.dumps(external_ids), + json.dumps(metadata), + json.dumps([]), + json.dumps(self._extract_images(playlist)), + json.dumps(owner), + total_duration, + ), + ) - # If we have a summary (e.g., on cancellation), populate children from it including failed ones - try: - if summary: - self._populate_playlist_children_table(children_table, summary) - except Exception as e: - logger.warning( - f"Failed to populate children from summary for playlist {children_table}: {e}" - ) + # If we have a summary (e.g., on cancellation), populate children from it including failed ones + try: + if summary: + self._populate_playlist_children_table(children_table, summary) + except Exception as e: + logger.warning( + f"Failed to populate children from summary for playlist {children_table}: {e}" + ) - logger.info( - f"Stored playlist history for '{playlist.get('title')}' (task: {task_id}, children: {children_table}, status: {status_to_store})" - ) - return None + logger.info( + f"Stored playlist history for '{playlist.get('title')}' (task: {task_id}, children: {children_table}, status: {status_to_store})" + ) + return None - except Exception as e: - logger.error(f"Failed to store playlist history for task {task_id}: {e}") - return None + except Exception as e: + logger.error(f"Failed to store playlist history for task {task_id}: {e}") + return None - def _populate_album_children_table( - self, table_name: str, summary: Dict, album_title: str - ): - """Populate children table with individual track records from album summary.""" - try: - # Ensure table exists before population - self._create_children_table(table_name) - all_rows = [] + def _populate_album_children_table( + self, table_name: str, summary: Dict, album_title: str + ): + """Populate children table with individual track records from album summary.""" + try: + # Ensure table exists before population + self._create_children_table(table_name) + all_rows = [] - # Add successful tracks - for track in summary.get("successful_tracks", []): - track_data = self._prepare_child_track_data( - track, album_title, "completed" - ) - all_rows.append(self._map_values_to_row(track_data["values"])) + # Add successful tracks + for track in summary.get("successful_tracks", []): + track_data = self._prepare_child_track_data( + track, album_title, "completed" + ) + all_rows.append(self._map_values_to_row(track_data["values"])) - # Add failed tracks - for failed_item in summary.get("failed_tracks", []): - track = failed_item.get("track", {}) - track_data = self._prepare_child_track_data( - track, album_title, "failed" - ) - track_data["metadata"]["failure_reason"] = failed_item.get( - "reason", "Unknown error" - ) - all_rows.append(self._map_values_to_row(track_data["values"])) + # Add failed tracks + for failed_item in summary.get("failed_tracks", []): + track = failed_item.get("track", {}) + track_data = self._prepare_child_track_data( + track, album_title, "failed" + ) + track_data["metadata"]["failure_reason"] = failed_item.get( + "reason", "Unknown error" + ) + all_rows.append(self._map_values_to_row(track_data["values"])) - # Add skipped tracks - for track in summary.get("skipped_tracks", []): - track_data = self._prepare_child_track_data( - track, album_title, "skipped" - ) - all_rows.append(self._map_values_to_row(track_data["values"])) + # Add skipped tracks + for track in summary.get("skipped_tracks", []): + track_data = self._prepare_child_track_data( + track, album_title, "skipped" + ) + all_rows.append(self._map_values_to_row(track_data["values"])) - # Upsert all rows - with self._get_connection() as conn: - for row in all_rows: - self._upsert_child_row(conn, table_name, row) + # Upsert all rows + with self._get_connection() as conn: + for row in all_rows: + self._upsert_child_row(conn, table_name, row) - logger.info( - f"Populated {len(all_rows)} tracks in children table {table_name}" - ) + logger.info( + f"Populated {len(all_rows)} tracks in children table {table_name}" + ) - except Exception as e: - logger.error(f"Failed to populate album children table {table_name}: {e}") + except Exception as e: + logger.error(f"Failed to populate album children table {table_name}: {e}") - def _populate_playlist_children_table(self, table_name: str, summary: Dict): - """Populate children table with individual track records from playlist summary.""" - try: - # Ensure table exists before population - self._create_children_table(table_name) - all_rows = [] + def _populate_playlist_children_table(self, table_name: str, summary: Dict): + """Populate children table with individual track records from playlist summary.""" + try: + # Ensure table exists before population + self._create_children_table(table_name) + all_rows = [] - # Add successful tracks - for track in summary.get("successful_tracks", []): - track_data = self._prepare_child_track_data(track, "", "completed") - all_rows.append(self._map_values_to_row(track_data["values"])) + # Add successful tracks + for track in summary.get("successful_tracks", []): + track_data = self._prepare_child_track_data(track, "", "completed") + all_rows.append(self._map_values_to_row(track_data["values"])) - # Add failed tracks - for failed_item in summary.get("failed_tracks", []): - track = failed_item.get("track", {}) - track_data = self._prepare_child_track_data(track, "", "failed") - track_data["metadata"]["failure_reason"] = failed_item.get( - "reason", "Unknown error" - ) - all_rows.append(self._map_values_to_row(track_data["values"])) + # Add failed tracks + for failed_item in summary.get("failed_tracks", []): + track = failed_item.get("track", {}) + track_data = self._prepare_child_track_data(track, "", "failed") + track_data["metadata"]["failure_reason"] = failed_item.get( + "reason", "Unknown error" + ) + all_rows.append(self._map_values_to_row(track_data["values"])) - # Add skipped tracks - for track in summary.get("skipped_tracks", []): - track_data = self._prepare_child_track_data(track, "", "skipped") - all_rows.append(self._map_values_to_row(track_data["values"])) + # Add skipped tracks + for track in summary.get("skipped_tracks", []): + track_data = self._prepare_child_track_data(track, "", "skipped") + all_rows.append(self._map_values_to_row(track_data["values"])) - with self._get_connection() as conn: - for row in all_rows: - self._upsert_child_row(conn, table_name, row) + with self._get_connection() as conn: + for row in all_rows: + self._upsert_child_row(conn, table_name, row) - logger.info( - f"Populated {len(all_rows)} tracks in children table {table_name}" - ) + logger.info( + f"Populated {len(all_rows)} tracks in children table {table_name}" + ) - except Exception as e: - logger.error( - f"Failed to populate playlist children table {table_name}: {e}" - ) + except Exception as e: + logger.error( + f"Failed to populate playlist children table {table_name}: {e}" + ) - def _prepare_child_track_data( - self, track: Dict, default_album: str, status: str - ) -> Dict: - """Prepare track data for insertion into children table.""" - artists = self._extract_artists(track) - external_ids = self._extract_external_ids(track) + def _prepare_child_track_data( + self, track: Dict, default_album: str, status: str + ) -> Dict: + """Prepare track data for insertion into children table.""" + artists = self._extract_artists(track) + external_ids = self._extract_external_ids(track) - # Get album info - album = track.get("album", {}) - album_title = album.get("title", default_album) + # Get album info + album = track.get("album", {}) + album_title = album.get("title", default_album) - # Extract ISRC - isrc = external_ids.get("isrc", "") + # Extract ISRC + isrc = external_ids.get("isrc", "") - # Prepare metadata - metadata = {"album": album, "type": track.get("type", "")} + # Prepare metadata + metadata = {"album": album, "type": track.get("type", "")} - values = ( - track.get("title", "Unknown"), - json.dumps(artists), - album_title, - track.get("duration_ms", 0), - track.get("track_number", 0), - track.get("disc_number", 1), - track.get("explicit", False), - status, - json.dumps(external_ids), - json.dumps(track.get("genres", [])), - isrc, - time.time(), - track.get("position", 0), # For playlist tracks - json.dumps(metadata), - ) + values = ( + track.get("title", "Unknown"), + json.dumps(artists), + album_title, + track.get("duration_ms", 0), + track.get("track_number", 0), + track.get("disc_number", 1), + track.get("explicit", False), + status, + json.dumps(external_ids), + json.dumps(track.get("genres", [])), + isrc, + time.time(), + track.get("position", 0), # For playlist tracks + json.dumps(metadata), + ) - return {"values": values, "metadata": metadata} + return {"values": values, "metadata": metadata} - def update_download_status(self, task_id: str, status: str): - """Update download status for existing history entry.""" - try: - with self._get_connection() as conn: - conn.execute( - """ + def update_download_status(self, task_id: str, status: str): + """Update download status for existing history entry.""" + try: + with self._get_connection() as conn: + conn.execute( + """ UPDATE download_history SET status = ? WHERE task_id = ? """, - (status, task_id), - ) + (status, task_id), + ) - logger.info(f"Updated download status to '{status}' for task {task_id}") + logger.info(f"Updated download status to '{status}' for task {task_id}") - except Exception as e: - logger.error(f"Failed to update download status for task {task_id}: {e}") + except Exception as e: + logger.error(f"Failed to update download status for task {task_id}: {e}") - def get_download_history( - self, - limit: int = 100, - offset: int = 0, - download_type: Optional[str] = None, - status: Optional[str] = None, - ) -> List[Dict]: - """ - Retrieve download history with optional filtering. + def get_download_history( + self, + limit: int = 100, + offset: int = 0, + download_type: Optional[str] = None, + status: Optional[str] = None, + ) -> List[Dict]: + """ + Retrieve download history with optional filtering. - Args: - limit: Maximum number of records to return - offset: Number of records to skip - download_type: Filter by download type ('track', 'album', 'playlist') - status: Filter by status ('completed', 'failed', 'skipped', 'in_progress') + Args: + limit: Maximum number of records to return + offset: Number of records to skip + download_type: Filter by download type ('track', 'album', 'playlist') + status: Filter by status ('completed', 'failed', 'skipped', 'in_progress') - Returns: - List of download history records - """ - try: - query = "SELECT * FROM download_history" - params: List[Union[str, int]] = [] - conditions = [] + Returns: + List of download history records + """ + try: + query = "SELECT * FROM download_history" + params: List[Union[str, int]] = [] + conditions = [] - if download_type: - conditions.append("download_type = ?") - params.append(download_type) + if download_type: + conditions.append("download_type = ?") + params.append(download_type) - if status: - conditions.append("status = ?") - params.append(status) + if status: + conditions.append("status = ?") + params.append(status) - if conditions: - query += " WHERE " + " AND ".join(conditions) + if conditions: + query += " WHERE " + " AND ".join(conditions) - query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?" - params.extend([limit, offset]) + query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?" + params.extend([limit, offset]) - with self._get_connection() as conn: - cursor = conn.execute(query, params) - rows = cursor.fetchall() + with self._get_connection() as conn: + cursor = conn.execute(query, params) + rows = cursor.fetchall() - # Convert to list of dicts - result = [] - for row in rows: - record = dict(row) - # Parse JSON fields - for field in [ - "artists", - "external_ids", - "metadata", - "release_date", - "genres", - "images", - "owner", - ]: - if record.get(field): - try: - record[field] = json.loads(record[field]) - except (json.JSONDecodeError, TypeError): - pass - result.append(record) + # Convert to list of dicts + result = [] + for row in rows: + record = dict(row) + # Parse JSON fields + for field in [ + "artists", + "external_ids", + "metadata", + "release_date", + "genres", + "images", + "owner", + ]: + if record.get(field): + try: + record[field] = json.loads(record[field]) + except (json.JSONDecodeError, TypeError): + pass + result.append(record) - return result + return result - except Exception as e: - logger.error(f"Failed to retrieve download history: {e}") - return [] + except Exception as e: + logger.error(f"Failed to retrieve download history: {e}") + return [] - def get_children_history(self, children_table: str) -> List[Dict]: - """ - Retrieve track history from a children table. + def get_children_history(self, children_table: str) -> List[Dict]: + """ + Retrieve track history from a children table. - Args: - children_table: Name of the children table + Args: + children_table: Name of the children table - Returns: - List of track records - """ - try: - # Ensure table exists before reading - self._create_children_table(children_table) - with self._get_connection() as conn: - cursor = conn.execute(f""" + Returns: + List of track records + """ + try: + # Ensure table exists before reading + self._create_children_table(children_table) + with self._get_connection() as conn: + cursor = conn.execute(f""" SELECT * FROM {children_table} ORDER BY track_number, position """) - rows = cursor.fetchall() + rows = cursor.fetchall() - # Convert to list of dicts - result = [] - for row in rows: - record = dict(row) - # Parse JSON fields - for field in ["artists", "external_ids", "genres", "metadata"]: - if record.get(field): - try: - record[field] = json.loads(record[field]) - except (json.JSONDecodeError, TypeError): - pass - result.append(record) + # Convert to list of dicts + result = [] + for row in rows: + record = dict(row) + # Parse JSON fields + for field in ["artists", "external_ids", "genres", "metadata"]: + if record.get(field): + try: + record[field] = json.loads(record[field]) + except (json.JSONDecodeError, TypeError): + pass + result.append(record) - return result + return result - except Exception as e: - logger.error( - f"Failed to retrieve children history from {children_table}: {e}" - ) - return [] + except Exception as e: + logger.error( + f"Failed to retrieve children history from {children_table}: {e}" + ) + return [] - def get_download_stats(self) -> Dict: - """Get download statistics.""" - try: - with self._get_connection() as conn: - # Total downloads by type - cursor = conn.execute(""" + def get_download_stats(self) -> Dict: + """Get download statistics.""" + try: + with self._get_connection() as conn: + # Total downloads by type + cursor = conn.execute(""" SELECT download_type, status, COUNT(*) as count FROM download_history GROUP BY download_type, status """) - type_stats: Dict[str, Dict[str, int]] = {} - for row in cursor.fetchall(): - download_type = row["download_type"] - status = row["status"] - count = row["count"] + type_stats: Dict[str, Dict[str, int]] = {} + for row in cursor.fetchall(): + download_type = row["download_type"] + status = row["status"] + count = row["count"] - if download_type not in type_stats: - type_stats[download_type] = {} - type_stats[download_type][status] = count + if download_type not in type_stats: + type_stats[download_type] = {} + type_stats[download_type][status] = count - # Total tracks downloaded (including from albums/playlists) - cursor = conn.execute(""" + # Total tracks downloaded (including from albums/playlists) + cursor = conn.execute(""" SELECT SUM( CASE WHEN download_type = 'track' AND status = 'completed' THEN 1 @@ -1173,441 +1201,441 @@ class HistoryManager: ) as total_successful_tracks FROM download_history """) - total_tracks = cursor.fetchone()["total_successful_tracks"] or 0 + total_tracks = cursor.fetchone()["total_successful_tracks"] or 0 - # Recent downloads (last 7 days) - week_ago = time.time() - (7 * 24 * 60 * 60) - cursor = conn.execute( - """ + # Recent downloads (last 7 days) + week_ago = time.time() - (7 * 24 * 60 * 60) + cursor = conn.execute( + """ SELECT COUNT(*) as count FROM download_history WHERE timestamp > ? """, - (week_ago,), - ) - recent_downloads = cursor.fetchone()["count"] + (week_ago,), + ) + recent_downloads = cursor.fetchone()["count"] - return { - "by_type_and_status": type_stats, - "total_successful_tracks": total_tracks, - "recent_downloads_7d": recent_downloads, - } + return { + "by_type_and_status": type_stats, + "total_successful_tracks": total_tracks, + "recent_downloads_7d": recent_downloads, + } - except Exception as e: - logger.error(f"Failed to get download stats: {e}") - return {} + except Exception as e: + logger.error(f"Failed to get download stats: {e}") + return {} - def search_history(self, query: str, limit: int = 50) -> List[Dict]: - """ - Search download history by title or artist. + def search_history(self, query: str, limit: int = 50) -> List[Dict]: + """ + Search download history by title or artist. - Args: - query: Search query for title or artist - limit: Maximum number of results + Args: + query: Search query for title or artist + limit: Maximum number of results - Returns: - List of matching download records - """ - try: - search_pattern = f"%{query}%" + Returns: + List of matching download records + """ + try: + search_pattern = f"%{query}%" - with self._get_connection() as conn: - cursor = conn.execute( - """ + with self._get_connection() as conn: + cursor = conn.execute( + """ SELECT * FROM download_history WHERE title LIKE ? OR artists LIKE ? ORDER BY timestamp DESC LIMIT ? """, - (search_pattern, search_pattern, limit), - ) + (search_pattern, search_pattern, limit), + ) - rows = cursor.fetchall() + rows = cursor.fetchall() - # Convert to list of dicts - result = [] - for row in rows: - record = dict(row) - # Parse JSON fields - for field in [ - "artists", - "external_ids", - "metadata", - "release_date", - "genres", - "images", - "owner", - ]: - if record.get(field): - try: - record[field] = json.loads(record[field]) - except (json.JSONDecodeError, TypeError): - pass - result.append(record) + # Convert to list of dicts + result = [] + for row in rows: + record = dict(row) + # Parse JSON fields + for field in [ + "artists", + "external_ids", + "metadata", + "release_date", + "genres", + "images", + "owner", + ]: + if record.get(field): + try: + record[field] = json.loads(record[field]) + except (json.JSONDecodeError, TypeError): + pass + result.append(record) - return result + return result - except Exception as e: - logger.error(f"Failed to search download history: {e}") - return [] + except Exception as e: + logger.error(f"Failed to search download history: {e}") + return [] - def get_download_by_task_id(self, task_id: str) -> Optional[Dict]: - """ - Get download history entry by task ID. + def get_download_by_task_id(self, task_id: str) -> Optional[Dict]: + """ + Get download history entry by task ID. - Args: - task_id: Celery task ID + Args: + task_id: Celery task ID - Returns: - Download record or None if not found - """ - try: - with self._get_connection() as conn: - cursor = conn.execute( - """ + Returns: + Download record or None if not found + """ + try: + with self._get_connection() as conn: + cursor = conn.execute( + """ SELECT * FROM download_history WHERE task_id = ? LIMIT 1 """, - (task_id,), - ) + (task_id,), + ) - row = cursor.fetchone() - if not row: - return None + row = cursor.fetchone() + if not row: + return None - record = dict(row) - # Parse JSON fields - for field in [ - "artists", - "external_ids", - "metadata", - "release_date", - "genres", - "images", - "owner", - ]: - if record.get(field): - try: - record[field] = json.loads(record[field]) - except (json.JSONDecodeError, TypeError): - pass + record = dict(row) + # Parse JSON fields + for field in [ + "artists", + "external_ids", + "metadata", + "release_date", + "genres", + "images", + "owner", + ]: + if record.get(field): + try: + record[field] = json.loads(record[field]) + except (json.JSONDecodeError, TypeError): + pass - return record + return record - except Exception as e: - logger.error(f"Failed to get download by task ID {task_id}: {e}") - return None + except Exception as e: + logger.error(f"Failed to get download by task ID {task_id}: {e}") + return None - def get_recent_downloads(self, limit: int = 20) -> List[Dict]: - """Get most recent downloads.""" - return self.get_download_history(limit=limit, offset=0) + def get_recent_downloads(self, limit: int = 20) -> List[Dict]: + """Get most recent downloads.""" + return self.get_download_history(limit=limit, offset=0) - def get_failed_downloads(self, limit: int = 50) -> List[Dict]: - """Get failed downloads.""" - return self.get_download_history(limit=limit, status="failed") + def get_failed_downloads(self, limit: int = 50) -> List[Dict]: + """Get failed downloads.""" + return self.get_download_history(limit=limit, status="failed") - def clear_old_history(self, days_old: int = 30) -> int: - """ - Clear download history older than specified days. + def clear_old_history(self, days_old: int = 30) -> int: + """ + Clear download history older than specified days. - Args: - days_old: Number of days old to keep (default 30) + Args: + days_old: Number of days old to keep (default 30) - Returns: - Number of records deleted - """ - try: - cutoff_time = time.time() - (days_old * 24 * 60 * 60) + Returns: + Number of records deleted + """ + try: + cutoff_time = time.time() - (days_old * 24 * 60 * 60) - with self._get_connection() as conn: - # Get list of children tables to delete - cursor = conn.execute( - """ + with self._get_connection() as conn: + # Get list of children tables to delete + cursor = conn.execute( + """ SELECT children_table FROM download_history WHERE timestamp < ? AND children_table IS NOT NULL """, - (cutoff_time,), - ) + (cutoff_time,), + ) - children_tables = [row["children_table"] for row in cursor.fetchall()] + children_tables = [row["children_table"] for row in cursor.fetchall()] - # Delete main history records - cursor = conn.execute( - """ + # Delete main history records + cursor = conn.execute( + """ DELETE FROM download_history WHERE timestamp < ? """, - (cutoff_time,), - ) + (cutoff_time,), + ) - deleted_count = cursor.rowcount + deleted_count = cursor.rowcount - # Drop children tables - for table_name in children_tables: - try: - conn.execute(f"DROP TABLE IF EXISTS {table_name}") - except Exception as e: - logger.warning( - f"Failed to drop children table {table_name}: {e}" - ) + # Drop children tables + for table_name in children_tables: + try: + conn.execute(f"DROP TABLE IF EXISTS {table_name}") + except Exception as e: + logger.warning( + f"Failed to drop children table {table_name}: {e}" + ) - logger.info( - f"Cleared {deleted_count} old history records and {len(children_tables)} children tables" - ) - return deleted_count + logger.info( + f"Cleared {deleted_count} old history records and {len(children_tables)} children tables" + ) + return deleted_count - except Exception as e: - logger.error(f"Failed to clear old history: {e}") - return 0 + except Exception as e: + logger.error(f"Failed to clear old history: {e}") + return 0 - # --- New helpers for failed children insertion and metadata enrichment --- - def _populate_failed_children_for_album( - self, table_name: str, summary: Dict, album_title: str - ) -> None: - try: - self._create_children_table(table_name) - with self._get_connection() as conn: - for failed_item in summary.get("failed_tracks", []): - track = failed_item.get("track", {}) - track_data = self._prepare_child_track_data( - track, album_title, "failed" - ) - track_data["metadata"]["failure_reason"] = failed_item.get( - "reason", "cancelled" - ) - conn.execute( - f""" + # --- New helpers for failed children insertion and metadata enrichment --- + def _populate_failed_children_for_album( + self, table_name: str, summary: Dict, album_title: str + ) -> None: + try: + self._create_children_table(table_name) + with self._get_connection() as conn: + for failed_item in summary.get("failed_tracks", []): + track = failed_item.get("track", {}) + track_data = self._prepare_child_track_data( + track, album_title, "failed" + ) + track_data["metadata"]["failure_reason"] = failed_item.get( + "reason", "cancelled" + ) + conn.execute( + f""" INSERT INTO {table_name} ( title, artists, album_title, duration_ms, track_number, disc_number, explicit, status, external_ids, genres, isrc, timestamp, position, metadata ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, - track_data["values"], - ) - except Exception as e: - logger.error( - f"Failed to insert failed children for album into {table_name}: {e}" - ) + track_data["values"], + ) + except Exception as e: + logger.error( + f"Failed to insert failed children for album into {table_name}: {e}" + ) - def _populate_failed_children_for_playlist( - self, table_name: str, summary: Dict - ) -> None: - try: - self._create_children_table(table_name) - with self._get_connection() as conn: - for failed_item in summary.get("failed_tracks", []): - track = failed_item.get("track", {}) - track_data = self._prepare_child_track_data(track, "", "failed") - track_data["metadata"]["failure_reason"] = failed_item.get( - "reason", "cancelled" - ) - conn.execute( - f""" + def _populate_failed_children_for_playlist( + self, table_name: str, summary: Dict + ) -> None: + try: + self._create_children_table(table_name) + with self._get_connection() as conn: + for failed_item in summary.get("failed_tracks", []): + track = failed_item.get("track", {}) + track_data = self._prepare_child_track_data(track, "", "failed") + track_data["metadata"]["failure_reason"] = failed_item.get( + "reason", "cancelled" + ) + conn.execute( + f""" INSERT INTO {table_name} ( title, artists, album_title, duration_ms, track_number, disc_number, explicit, status, external_ids, genres, isrc, timestamp, position, metadata ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, - track_data["values"], - ) - except Exception as e: - logger.error( - f"Failed to insert failed children for playlist into {table_name}: {e}" - ) + track_data["values"], + ) + except Exception as e: + logger.error( + f"Failed to insert failed children for playlist into {table_name}: {e}" + ) - def _enrich_album_metadata_from_summary(self, album: Dict, summary: Dict) -> Dict: - if album.get("images") and album.get("release_date") and album.get("genres"): - return album - # Prefer successful track album data, then skipped, then failed - src_track = None - for key in ("successful_tracks", "skipped_tracks", "failed_tracks"): - arr = summary.get(key, []) or [] - if arr: - src_track = ( - arr[0] - if key != "failed_tracks" - else (arr[0].get("track") if isinstance(arr[0], dict) else None) - ) - break - if isinstance(src_track, dict): - album_obj = src_track.get("album", {}) or {} - album.setdefault("images", album_obj.get("images", [])) - album.setdefault("release_date", album_obj.get("release_date", {})) - album.setdefault("genres", album_obj.get("genres", [])) - album.setdefault( - "album_type", album_obj.get("album_type", album.get("album_type")) - ) - return album + def _enrich_album_metadata_from_summary(self, album: Dict, summary: Dict) -> Dict: + if album.get("images") and album.get("release_date") and album.get("genres"): + return album + # Prefer successful track album data, then skipped, then failed + src_track = None + for key in ("successful_tracks", "skipped_tracks", "failed_tracks"): + arr = summary.get(key, []) or [] + if arr: + src_track = ( + arr[0] + if key != "failed_tracks" + else (arr[0].get("track") if isinstance(arr[0], dict) else None) + ) + break + if isinstance(src_track, dict): + album_obj = src_track.get("album", {}) or {} + album.setdefault("images", album_obj.get("images", [])) + album.setdefault("release_date", album_obj.get("release_date", {})) + album.setdefault("genres", album_obj.get("genres", [])) + album.setdefault( + "album_type", album_obj.get("album_type", album.get("album_type")) + ) + return album - # --- Upsert helpers to avoid duplicate children rows and keep most complete --- - def _map_values_to_row(self, values: tuple) -> Dict: - ( - title, - artists_json, - album_title, - duration_ms, - track_number, - disc_number, - explicit, - status, - external_ids_json, - genres_json, - isrc, - timestamp, - position, - metadata_json, - ) = values - return { - "title": title, - "artists": artists_json, - "album_title": album_title, - "duration_ms": duration_ms, - "track_number": track_number, - "disc_number": disc_number, - "explicit": explicit, - "status": status, - "external_ids": external_ids_json, - "genres": genres_json, - "isrc": isrc, - "timestamp": timestamp, - "position": position, - "metadata": metadata_json, - } + # --- Upsert helpers to avoid duplicate children rows and keep most complete --- + def _map_values_to_row(self, values: tuple) -> Dict: + ( + title, + artists_json, + album_title, + duration_ms, + track_number, + disc_number, + explicit, + status, + external_ids_json, + genres_json, + isrc, + timestamp, + position, + metadata_json, + ) = values + return { + "title": title, + "artists": artists_json, + "album_title": album_title, + "duration_ms": duration_ms, + "track_number": track_number, + "disc_number": disc_number, + "explicit": explicit, + "status": status, + "external_ids": external_ids_json, + "genres": genres_json, + "isrc": isrc, + "timestamp": timestamp, + "position": position, + "metadata": metadata_json, + } - def _status_priority(self, status: str) -> int: - order = {"completed": 3, "skipped": 2, "failed": 1} - return order.get((status or "").lower(), 0) + def _status_priority(self, status: str) -> int: + order = {"completed": 3, "skipped": 2, "failed": 1} + return order.get((status or "").lower(), 0) - def _merge_child_rows(self, existing: Dict, new: Dict) -> Dict: - merged = existing.copy() - # Prefer non-empty/non-null values; for status use priority - for key in [ - "artists", - "album_title", - "duration_ms", - "track_number", - "disc_number", - "explicit", - "external_ids", - "genres", - "isrc", - "metadata", - ]: - old_val = merged.get(key) - new_val = new.get(key) - # Consider JSON strings: prefer longer/ non-empty - if (old_val in (None, "", 0)) and new_val not in (None, ""): - merged[key] = new_val - elif ( - isinstance(new_val, str) - and isinstance(old_val, str) - and len(new_val) > len(old_val) - ): - merged[key] = new_val - # Status: keep highest priority - if self._status_priority(new.get("status")) > self._status_priority( - existing.get("status") - ): - merged["status"] = new.get("status") - # Timestamp: keep earliest for creation but allow update to latest timestamp for last update - merged["timestamp"] = max( - existing.get("timestamp") or 0, new.get("timestamp") or 0 - ) - return merged + def _merge_child_rows(self, existing: Dict, new: Dict) -> Dict: + merged = existing.copy() + # Prefer non-empty/non-null values; for status use priority + for key in [ + "artists", + "album_title", + "duration_ms", + "track_number", + "disc_number", + "explicit", + "external_ids", + "genres", + "isrc", + "metadata", + ]: + old_val = merged.get(key) + new_val = new.get(key) + # Consider JSON strings: prefer longer/ non-empty + if (old_val in (None, "", 0)) and new_val not in (None, ""): + merged[key] = new_val + elif ( + isinstance(new_val, str) + and isinstance(old_val, str) + and len(new_val) > len(old_val) + ): + merged[key] = new_val + # Status: keep highest priority + if self._status_priority(new.get("status")) > self._status_priority( + existing.get("status") + ): + merged["status"] = new.get("status") + # Timestamp: keep earliest for creation but allow update to latest timestamp for last update + merged["timestamp"] = max( + existing.get("timestamp") or 0, new.get("timestamp") or 0 + ) + return merged - def _find_existing_child_row( - self, conn: sqlite3.Connection, table_name: str, new_row: Dict - ) -> Optional[Dict]: - try: - cursor = conn.execute( - f"SELECT * FROM {table_name} WHERE title = ?", - (new_row.get("title", ""),), - ) - candidates = [dict(r) for r in cursor.fetchall()] - if not candidates: - return None - # Try match by ISRC - isrc = new_row.get("isrc") - if isrc: - for r in candidates: - if (r.get("isrc") or "") == isrc: - return r - # Try match by position (playlist) then track_number (album) - pos = new_row.get("position") - if pos is not None: - for r in candidates: - if r.get("position") == pos: - return r - tn = new_row.get("track_number") - if tn: - for r in candidates: - if r.get("track_number") == tn: - return r - # Fallback: first candidate with same title - return candidates[0] - except Exception: - return None + def _find_existing_child_row( + self, conn: sqlite3.Connection, table_name: str, new_row: Dict + ) -> Optional[Dict]: + try: + cursor = conn.execute( + f"SELECT * FROM {table_name} WHERE title = ?", + (new_row.get("title", ""),), + ) + candidates = [dict(r) for r in cursor.fetchall()] + if not candidates: + return None + # Try match by ISRC + isrc = new_row.get("isrc") + if isrc: + for r in candidates: + if (r.get("isrc") or "") == isrc: + return r + # Try match by position (playlist) then track_number (album) + pos = new_row.get("position") + if pos is not None: + for r in candidates: + if r.get("position") == pos: + return r + tn = new_row.get("track_number") + if tn: + for r in candidates: + if r.get("track_number") == tn: + return r + # Fallback: first candidate with same title + return candidates[0] + except Exception: + return None - def _upsert_child_row( - self, conn: sqlite3.Connection, table_name: str, row: Dict - ) -> None: - existing = self._find_existing_child_row(conn, table_name, row) - if existing: - merged = self._merge_child_rows(existing, row) - conn.execute( - f""" + def _upsert_child_row( + self, conn: sqlite3.Connection, table_name: str, row: Dict + ) -> None: + existing = self._find_existing_child_row(conn, table_name, row) + if existing: + merged = self._merge_child_rows(existing, row) + conn.execute( + f""" UPDATE {table_name} SET artists = ?, album_title = ?, duration_ms = ?, track_number = ?, disc_number = ?, explicit = ?, status = ?, external_ids = ?, genres = ?, isrc = ?, timestamp = ?, position = ?, metadata = ? WHERE id = ? """, - ( - merged.get("artists"), - merged.get("album_title"), - merged.get("duration_ms"), - merged.get("track_number"), - merged.get("disc_number"), - merged.get("explicit"), - merged.get("status"), - merged.get("external_ids"), - merged.get("genres"), - merged.get("isrc"), - merged.get("timestamp"), - merged.get("position"), - merged.get("metadata"), - existing.get("id"), - ), - ) - else: - conn.execute( - f""" + ( + merged.get("artists"), + merged.get("album_title"), + merged.get("duration_ms"), + merged.get("track_number"), + merged.get("disc_number"), + merged.get("explicit"), + merged.get("status"), + merged.get("external_ids"), + merged.get("genres"), + merged.get("isrc"), + merged.get("timestamp"), + merged.get("position"), + merged.get("metadata"), + existing.get("id"), + ), + ) + else: + conn.execute( + f""" INSERT INTO {table_name} ( title, artists, album_title, duration_ms, track_number, disc_number, explicit, status, external_ids, genres, isrc, timestamp, position, metadata ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, - ( - row.get("title"), - row.get("artists"), - row.get("album_title"), - row.get("duration_ms"), - row.get("track_number"), - row.get("disc_number"), - row.get("explicit"), - row.get("status"), - row.get("external_ids"), - row.get("genres"), - row.get("isrc"), - row.get("timestamp"), - row.get("position"), - row.get("metadata"), - ), - ) + ( + row.get("title"), + row.get("artists"), + row.get("album_title"), + row.get("duration_ms"), + row.get("track_number"), + row.get("disc_number"), + row.get("explicit"), + row.get("status"), + row.get("external_ids"), + row.get("genres"), + row.get("isrc"), + row.get("timestamp"), + row.get("position"), + row.get("metadata"), + ), + ) # Global history manager instance