From 68bf60ec7e33fccbd9133b7a0d6963c43b093f0a Mon Sep 17 00:00:00 2001 From: Xoconoch Date: Tue, 29 Jul 2025 20:35:51 -0600 Subject: [PATCH] Finally fix history (#187) --- app.py | 4 - routes/history.py | 927 ++++------- routes/utils/celery_tasks.py | 286 ++-- routes/utils/history_manager.py | 2248 +++++++++++---------------- spotizerr-ui/src/routes/history.tsx | 1159 ++++---------- 5 files changed, 1642 insertions(+), 2982 deletions(-) diff --git a/app.py b/app.py index a313565..6c8fa73 100755 --- a/app.py +++ b/app.py @@ -23,7 +23,6 @@ from urllib.parse import urlparse # Import Celery configuration and manager from routes.utils.celery_manager import celery_manager from routes.utils.celery_config import REDIS_URL -from routes.utils.history_manager import init_history_db # Configure application-wide logging @@ -150,9 +149,6 @@ def create_app(): # Set up CORS CORS(app) - # Initialize databases - init_history_db() - # Register blueprints app.register_blueprint(config_bp, url_prefix="/api") app.register_blueprint(search_bp, url_prefix="/api") diff --git a/routes/history.py b/routes/history.py index 7b8ee84..a59879c 100644 --- a/routes/history.py +++ b/routes/history.py @@ -1,649 +1,374 @@ -from flask import Blueprint, jsonify, request -from routes.utils.history_manager import ( - get_task_history, - get_child_tracks, - get_status_history, - get_track_mini_history, - add_track_status_update, - # Legacy compatibility - get_history_entries -) +from flask import Blueprint, Response, request, jsonify +import json +import traceback import logging +from routes.utils.history_manager import history_manager logger = logging.getLogger(__name__) -history_bp = Blueprint("history", __name__, url_prefix="/api/history") -""" -Enhanced History API Endpoints: - -Main History Endpoints: -- GET /api/history - Get paginated download history with filtering -- GET /api/history/task/ - Get detailed task information -- GET /api/history/summary - Get summary statistics - -Track Management Endpoints: -- GET /api/history/tracks/ - Get all tracks for a parent task - ?include_mini_histories=true - Include comprehensive mini-histories for each track -- GET /api/history/tracks//mini-histories - Get mini-histories for all tracks - -Individual Track Endpoints: -- GET /api/history/track///mini-history - Get comprehensive mini-history for a specific track -- GET /api/history/track///timeline - Get simplified timeline view -- POST /api/history/track///status - Update track status (admin/testing) - -Status & Legacy: -- GET /api/history/status/ - Get complete status history for a task -- GET /api/history/legacy - Legacy endpoint for backward compatibility - -Mini-History Features: -- Complete status progression timeline with timestamps -- Progress tracking and retry information -- File size, quality, and download path details -- Error information and duration statistics -- Human-readable timestamps and calculated metrics -""" +history_bp = Blueprint("history", __name__) -@history_bp.route("", methods=["GET"]) -def get_download_history(): - """API endpoint to retrieve download history with pagination, sorting, and filtering.""" +@history_bp.route("/", methods=["GET"]) +def get_history(): + """ + Retrieve download history with optional filtering and pagination. + + Query parameters: + - limit: Maximum number of records (default: 100, max: 500) + - offset: Number of records to skip (default: 0) + - download_type: Filter by type ('track', 'album', 'playlist') + - status: Filter by status ('completed', 'failed', 'skipped', 'in_progress') + """ try: - limit = request.args.get("limit", 25, type=int) - offset = request.args.get("offset", 0, type=int) - sort_by = request.args.get("sort_by", "timestamp_updated") - sort_order = request.args.get("sort_order", "DESC") - include_children = request.args.get("include_children", "false").lower() == "true" - - # Create filters dictionary for various filter options - filters = {} + # Parse query parameters + limit = min(int(request.args.get("limit", 100)), 500) # Cap at 500 + offset = max(int(request.args.get("offset", 0)), 0) + download_type = request.args.get("download_type") + status = request.args.get("status") - # Status filter - support both old and new field names - status_filter = request.args.get("status_final") - if status_filter: - filters["status_final"] = status_filter - - # Task type filter (renamed from download_type) - type_filter = request.args.get("task_type") or request.args.get("download_type") - if type_filter: - filters["task_type"] = type_filter - - # Parent task filter - parent_task_filter = request.args.get("parent_task_id") - if parent_task_filter: - filters["parent_task_id"] = parent_task_filter - - # Show/hide child tracks (tasks with parent_task_id) - hide_child_tracks = request.args.get("hide_child_tracks", "false").lower() == "true" - if hide_child_tracks: - filters["parent_task_id"] = None # Only show parent entries or standalone tracks - - # Show only child tracks - only_child_tracks = request.args.get("only_child_tracks", "false").lower() == "true" - if only_child_tracks and not parent_task_filter: - # This would require a NOT NULL filter, but we'll handle it differently - # by excluding tasks that don't have a parent_task_id - pass # We'll implement this in the query logic - - # Additional filters - current_status_filter = request.args.get("status_current") - if current_status_filter: - filters["status_current"] = current_status_filter - - tasks, total_count = get_task_history( - limit=limit, - offset=offset, - sort_by=sort_by, - sort_order=sort_order, - filters=filters, - include_children=include_children + # Validate download_type if provided + valid_types = ["track", "album", "playlist"] + if download_type and download_type not in valid_types: + return Response( + json.dumps({"error": f"Invalid download_type. Must be one of: {valid_types}"}), + status=400, + mimetype="application/json", + ) + + # Validate status if provided + valid_statuses = ["completed", "failed", "skipped", "in_progress"] + if status and status not in valid_statuses: + return Response( + json.dumps({"error": f"Invalid status. Must be one of: {valid_statuses}"}), + status=400, + mimetype="application/json", + ) + + # Get history from manager + history = history_manager.get_download_history( + limit=limit, + offset=offset, + download_type=download_type, + status=status ) - - # Transform data for backward compatibility and add computed fields - entries = [] - for task in tasks: - entry = { - # Core fields - "task_id": task["task_id"], - "task_type": task["task_type"], - "title": task["title"], - "status_current": task["status_current"], - "status_final": task["status_final"], - "timestamp_created": task["timestamp_created"], - "timestamp_updated": task["timestamp_updated"], - "timestamp_completed": task["timestamp_completed"], - "parent_task_id": task["parent_task_id"], - "position": task["position"], - - # Legacy compatibility fields - "download_type": task["task_type"], - "item_name": task["title"], - "timestamp_added": task["timestamp_created"], - - # Rich data fields (parsed JSON) - "artists": task.get("artists", []), - "ids": task.get("ids", {}), - "metadata": task.get("metadata", {}), - "config": task.get("config", {}), - "error_info": task.get("error_info", {}), - "progress": task.get("progress", {}), - "summary": task.get("summary", {}), - - # Child information - "children_table": task["children_table"], - "has_children": bool(task["children_table"]), - "child_tracks": task.get("child_tracks", []) if include_children else [] + + # Add pagination info + response_data = { + "downloads": history, + "pagination": { + "limit": limit, + "offset": offset, + "returned_count": len(history) } - - # Extract commonly used fields for easier access - if entry["artists"]: - entry["artist_names"] = [artist.get("name", "") for artist in entry["artists"]] - entry["item_artist"] = ", ".join(entry["artist_names"]) # Legacy compatibility - - if entry["config"]: - entry["service_used"] = entry["config"].get("service_used") - entry["quality_profile"] = entry["config"].get("quality_profile") - entry["convert_to"] = entry["config"].get("convert_to") - entry["bitrate"] = entry["config"].get("bitrate") - - if entry["error_info"]: - entry["error_message"] = entry["error_info"].get("message") # Legacy compatibility - - # Extract album info from metadata if available - if entry["metadata"] and "album" in entry["metadata"]: - entry["item_album"] = entry["metadata"]["album"].get("title") - - # Child track summary - if entry["child_tracks"]: - entry["child_track_count"] = len(entry["child_tracks"]) - entry["child_track_summary"] = { - "completed": len([t for t in entry["child_tracks"] if t.get("status_final") == "COMPLETED"]), - "error": len([t for t in entry["child_tracks"] if t.get("status_final") == "ERROR"]), - "skipped": len([t for t in entry["child_tracks"] if t.get("status_final") == "SKIPPED"]) - } - - entries.append(entry) - - return jsonify({ - "entries": entries, - "total_count": total_count, - "limit": limit, - "offset": offset, - "include_children": include_children - }) + } - except Exception as e: - logger.error(f"Error in /api/history endpoint: {e}", exc_info=True) - return jsonify({"error": "Failed to retrieve download history"}), 500 - - -@history_bp.route("/task/", methods=["GET"]) -def get_task_details(task_id): - """API endpoint to retrieve detailed information about a specific task.""" - try: - include_children = request.args.get("include_children", "true").lower() == "true" - include_status_history = request.args.get("include_status_history", "false").lower() == "true" + if download_type: + response_data["filters"] = {"download_type": download_type} + if status: + if "filters" not in response_data: + response_data["filters"] = {} + response_data["filters"]["status"] = status - # Get the task - tasks, _ = get_task_history( - limit=1, - offset=0, - filters={"task_id": task_id}, - include_children=include_children + return Response( + json.dumps(response_data), + status=200, + mimetype="application/json" ) - if not tasks: - return jsonify({"error": f"Task {task_id} not found"}), 404 - - task = tasks[0] - - # Add status history if requested - if include_status_history: - task["status_history"] = get_status_history(task_id) - - return jsonify({ - "task": task, - "include_children": include_children, - "include_status_history": include_status_history - }) - + except ValueError as e: + return Response( + json.dumps({"error": f"Invalid parameter value: {str(e)}"}), + status=400, + mimetype="application/json", + ) except Exception as e: - logger.error(f"Error in /api/history/task/{task_id} endpoint: {e}", exc_info=True) - return jsonify({"error": f"Failed to retrieve task {task_id}"}), 500 + logger.error(f"Error retrieving download history: {e}", exc_info=True) + return Response( + json.dumps({"error": "Failed to retrieve download history", "details": str(e)}), + status=500, + mimetype="application/json", + ) -@history_bp.route("/tracks/", methods=["GET"]) -def get_tracks_for_parent(parent_task_id): - """API endpoint to retrieve all track entries for a specific parent task.""" +@history_bp.route("/", methods=["GET"]) +def get_download_by_task_id(task_id): + """ + Retrieve specific download history by task ID. + + Args: + task_id: Celery task ID + """ try: - # First, verify the parent task exists and get its children table - parent_tasks, _ = get_task_history( - limit=1, - offset=0, - filters={"task_id": parent_task_id} + download = history_manager.get_download_by_task_id(task_id) + + if not download: + return Response( + json.dumps({"error": f"Download with task ID '{task_id}' not found"}), + status=404, + mimetype="application/json", + ) + + return Response( + json.dumps(download), + status=200, + mimetype="application/json" ) - if not parent_tasks: - return jsonify({"error": f"Parent task {parent_task_id} not found"}), 404 + except Exception as e: + logger.error(f"Error retrieving download for task {task_id}: {e}", exc_info=True) + return Response( + json.dumps({"error": "Failed to retrieve download", "details": str(e)}), + status=500, + mimetype="application/json", + ) + + +@history_bp.route("//children", methods=["GET"]) +def get_download_children(task_id): + """ + Retrieve children tracks for an album or playlist download. + + Args: + task_id: Celery task ID + """ + try: + # First get the main download to find the children table + download = history_manager.get_download_by_task_id(task_id) - parent_task = parent_tasks[0] - children_table = parent_task.get("children_table") + if not download: + return Response( + json.dumps({"error": f"Download with task ID '{task_id}' not found"}), + status=404, + mimetype="application/json", + ) + children_table = download.get("children_table") if not children_table: - return jsonify({ - "parent_task_id": parent_task_id, - "tracks": [], - "total_count": 0, - "message": "No child tracks found for this task" - }) + return Response( + json.dumps({"error": f"Download '{task_id}' has no children tracks"}), + status=404, + mimetype="application/json", + ) - # Get tracks from the child table - tracks = get_child_tracks(children_table) + # Get children tracks + children = history_manager.get_children_history(children_table) - # Check if mini-histories should be included - include_mini_histories = request.args.get("include_mini_histories", "false").lower() == "true" - - # Sort tracks if requested - sort_by = request.args.get("sort_by", "position") - sort_order = request.args.get("sort_order", "ASC") - - if sort_by == "position": - tracks.sort(key=lambda x: x.get("position", 0), reverse=(sort_order.upper() == "DESC")) - elif sort_by == "timestamp_completed": - tracks.sort(key=lambda x: x.get("timestamp_completed", 0) or 0, reverse=(sort_order.upper() == "DESC")) - - # Transform tracks for easier consumption - transformed_tracks = [] - for track in tracks: - track_info = { - "track_id": track["track_id"], - "parent_task_id": track["parent_task_id"], - "position": track["position"], - "status_current": track["status_current"], - "status_final": track["status_final"], - "timestamp_created": track["timestamp_created"], - "timestamp_completed": track["timestamp_completed"], - "error_info": track.get("error_info"), - "config": track.get("config"), - } - - # Parse track data - if track["track_data"]: - track_data = track["track_data"] - track_info.update({ - "title": track_data.get("title"), - "artists": track_data.get("artists", []), - "album": track_data.get("album", {}), - "duration_ms": track_data.get("duration_ms"), - "track_number": track_data.get("track_number"), - "disc_number": track_data.get("disc_number"), - "explicit": track_data.get("explicit"), - "ids": track_data.get("ids", {}) - }) - - # Extract artist names for easier display - if track_info["artists"]: - track_info["artist_names"] = [artist.get("name", "") for artist in track_info["artists"]] - - # Include mini-history if requested - if include_mini_histories: - mini_history = get_track_mini_history(track["track_id"], children_table) - if mini_history: - track_info["mini_history"] = mini_history - # Add quick access to timeline and key metrics - track_info["timeline"] = mini_history.get("timeline", []) - track_info["retry_count"] = mini_history.get("retry_count", 0) - track_info["time_elapsed"] = mini_history.get("time_elapsed") - track_info["quality_achieved"] = mini_history.get("quality_achieved") - track_info["file_size"] = mini_history.get("file_size") - track_info["download_path"] = mini_history.get("download_path") - - transformed_tracks.append(track_info) - - return jsonify({ - "parent_task_id": parent_task_id, - "parent_task_info": { - "title": parent_task["title"], - "task_type": parent_task["task_type"], - "status_final": parent_task["status_final"] - }, - "tracks": transformed_tracks, - "total_count": len(transformed_tracks), - "include_mini_histories": include_mini_histories - }) - - except Exception as e: - logger.error(f"Error in /api/history/tracks/{parent_task_id} endpoint: {e}", exc_info=True) - return jsonify({"error": f"Failed to retrieve tracks for parent task {parent_task_id}"}), 500 - - -@history_bp.route("/status/", methods=["GET"]) -def get_task_status_history(task_id): - """API endpoint to retrieve the complete status history for a task.""" - try: - status_history = get_status_history(task_id) - - if not status_history: - return jsonify({ - "task_id": task_id, - "status_history": [], - "message": "No status history found for this task" - }) - - return jsonify({ + response_data = { "task_id": task_id, - "status_history": status_history, - "total_updates": len(status_history) - }) - - except Exception as e: - logger.error(f"Error in /api/history/status/{task_id} endpoint: {e}", exc_info=True) - return jsonify({"error": f"Failed to retrieve status history for task {task_id}"}), 500 - - -@history_bp.route("/summary", methods=["GET"]) -def get_history_summary(): - """API endpoint to retrieve summary statistics about download history.""" - try: - # Get overall statistics - all_tasks, total_tasks = get_task_history(limit=10000, offset=0) # Get a large number to count - - # Calculate statistics - stats = { - "total_tasks": total_tasks, - "by_type": {}, - "by_status": {}, - "recent_activity": { - "last_24h": 0, - "last_7d": 0, - "last_30d": 0 - } + "download_type": download.get("download_type"), + "title": download.get("title"), + "children_table": children_table, + "tracks": children, + "track_count": len(children) } - import time - current_time = time.time() - day_seconds = 24 * 60 * 60 - - for task in all_tasks: - # Count by type - task_type = task.get("task_type", "unknown") - stats["by_type"][task_type] = stats["by_type"].get(task_type, 0) + 1 - - # Count by status - status = task.get("status_final", "unknown") - stats["by_status"][status] = stats["by_status"].get(status, 0) + 1 - - # Count recent activity - if task.get("timestamp_created"): - time_diff = current_time - task["timestamp_created"] - if time_diff <= day_seconds: - stats["recent_activity"]["last_24h"] += 1 - if time_diff <= 7 * day_seconds: - stats["recent_activity"]["last_7d"] += 1 - if time_diff <= 30 * day_seconds: - stats["recent_activity"]["last_30d"] += 1 - - return jsonify(stats) + return Response( + json.dumps(response_data), + status=200, + mimetype="application/json" + ) except Exception as e: - logger.error(f"Error in /api/history/summary endpoint: {e}", exc_info=True) - return jsonify({"error": "Failed to retrieve history summary"}), 500 - - -@history_bp.route("/track///mini-history", methods=["GET"]) -def get_track_mini_history_api(parent_task_id, track_id): - """API endpoint to retrieve comprehensive mini-history for a specific track.""" - try: - # First, verify the parent task exists and get its children table - parent_tasks, _ = get_task_history( - limit=1, - offset=0, - filters={"task_id": parent_task_id} + logger.error(f"Error retrieving children for task {task_id}: {e}", exc_info=True) + return Response( + json.dumps({"error": "Failed to retrieve download children", "details": str(e)}), + status=500, + mimetype="application/json", ) + + +@history_bp.route("/stats", methods=["GET"]) +def get_download_stats(): + """ + Get download statistics and summary information. + """ + try: + stats = history_manager.get_download_stats() - if not parent_tasks: - return jsonify({"error": f"Parent task {parent_task_id} not found"}), 404 - - parent_task = parent_tasks[0] - children_table = parent_task.get("children_table") - - if not children_table: - return jsonify({"error": f"No child tracks found for parent task {parent_task_id}"}), 404 - - # Get the track mini-history - mini_history = get_track_mini_history(track_id, children_table) - - if not mini_history: - return jsonify({"error": f"Track {track_id} not found in parent task {parent_task_id}"}), 404 - - return jsonify({ - "parent_task_id": parent_task_id, - "parent_task_info": { - "title": parent_task["title"], - "task_type": parent_task["task_type"] - }, - "track_mini_history": mini_history - }) + return Response( + json.dumps(stats), + status=200, + mimetype="application/json" + ) except Exception as e: - logger.error(f"Error in /api/history/track/{parent_task_id}/{track_id}/mini-history endpoint: {e}", exc_info=True) - return jsonify({"error": f"Failed to retrieve mini-history for track {track_id}"}), 500 + logger.error(f"Error retrieving download stats: {e}", exc_info=True) + return Response( + json.dumps({"error": "Failed to retrieve download statistics", "details": str(e)}), + status=500, + mimetype="application/json", + ) -@history_bp.route("/tracks//mini-histories", methods=["GET"]) -def get_all_track_mini_histories(parent_task_id): - """API endpoint to retrieve mini-histories for all tracks in a parent task.""" +@history_bp.route("/search", methods=["GET"]) +def search_history(): + """ + Search download history by title or artist. + + Query parameters: + - q: Search query (required) + - limit: Maximum number of results (default: 50, max: 200) + """ try: - # Verify the parent task exists and get its children table - parent_tasks, _ = get_task_history( - limit=1, - offset=0, - filters={"task_id": parent_task_id} - ) + query = request.args.get("q") + if not query: + return Response( + json.dumps({"error": "Missing required parameter: q (search query)"}), + status=400, + mimetype="application/json", + ) - if not parent_tasks: - return jsonify({"error": f"Parent task {parent_task_id} not found"}), 404 + limit = min(int(request.args.get("limit", 50)), 200) # Cap at 200 - parent_task = parent_tasks[0] - children_table = parent_task.get("children_table") + # Search history + results = history_manager.search_history(query, limit) - if not children_table: - return jsonify({ - "parent_task_id": parent_task_id, - "track_mini_histories": [], - "total_count": 0, - "message": "No child tracks found for this task" - }) - - # Get all child tracks - tracks = get_child_tracks(children_table) - - # Get mini-history for each track - track_mini_histories = [] - for track in tracks: - mini_history = get_track_mini_history(track["track_id"], children_table) - if mini_history: - track_mini_histories.append(mini_history) - - # Sort by position or track number - track_mini_histories.sort(key=lambda x: ( - x.get("disc_number", 1), - x.get("track_number", 0), - x.get("position", 0) - )) - - return jsonify({ - "parent_task_id": parent_task_id, - "parent_task_info": { - "title": parent_task["title"], - "task_type": parent_task["task_type"], - "status_final": parent_task["status_final"] - }, - "track_mini_histories": track_mini_histories, - "total_count": len(track_mini_histories) - }) - - except Exception as e: - logger.error(f"Error in /api/history/tracks/{parent_task_id}/mini-histories endpoint: {e}", exc_info=True) - return jsonify({"error": f"Failed to retrieve mini-histories for parent task {parent_task_id}"}), 500 - - -@history_bp.route("/track///status", methods=["POST"]) -def update_track_status(parent_task_id, track_id): - """API endpoint to update the status of a specific track (for testing/admin purposes).""" - try: - # Verify the parent task exists and get its children table - parent_tasks, _ = get_task_history( - limit=1, - offset=0, - filters={"task_id": parent_task_id} - ) - - if not parent_tasks: - return jsonify({"error": f"Parent task {parent_task_id} not found"}), 404 - - parent_task = parent_tasks[0] - children_table = parent_task.get("children_table") - - if not children_table: - return jsonify({"error": f"No child tracks found for parent task {parent_task_id}"}), 404 - - # Parse request data - data = request.get_json() - if not data: - return jsonify({"error": "Request body must contain JSON data"}), 400 - - status_type = data.get("status_type") - if not status_type: - return jsonify({"error": "status_type is required"}), 400 - - status_data = data.get("status_data", {}) - progress_info = data.get("progress_info") - error_info = data.get("error_info") - - # Update the track status - add_track_status_update( - track_id=track_id, - table_name=children_table, - status_type=status_type, - status_data=status_data, - progress_info=progress_info, - error_info=error_info - ) - - # Get updated mini-history - updated_mini_history = get_track_mini_history(track_id, children_table) - - return jsonify({ - "message": f"Track {track_id} status updated to {status_type}", - "parent_task_id": parent_task_id, - "track_id": track_id, - "updated_mini_history": updated_mini_history - }) - - except Exception as e: - logger.error(f"Error in /api/history/track/{parent_task_id}/{track_id}/status endpoint: {e}", exc_info=True) - return jsonify({"error": f"Failed to update status for track {track_id}"}), 500 - - -@history_bp.route("/track///timeline", methods=["GET"]) -def get_track_timeline(parent_task_id, track_id): - """API endpoint to get a simplified timeline view of a track's status progression.""" - try: - # Verify the parent task exists and get its children table - parent_tasks, _ = get_task_history( - limit=1, - offset=0, - filters={"task_id": parent_task_id} - ) - - if not parent_tasks: - return jsonify({"error": f"Parent task {parent_task_id} not found"}), 404 - - parent_task = parent_tasks[0] - children_table = parent_task.get("children_table") - - if not children_table: - return jsonify({"error": f"No child tracks found for parent task {parent_task_id}"}), 404 - - # Get the track mini-history - mini_history = get_track_mini_history(track_id, children_table) - - if not mini_history: - return jsonify({"error": f"Track {track_id} not found in parent task {parent_task_id}"}), 404 - - # Extract timeline and add summary statistics - timeline = mini_history.get("timeline", []) - - # Calculate timeline statistics - timeline_stats = { - "total_status_changes": len(timeline), - "duration_seconds": mini_history.get("time_elapsed"), - "calculated_duration": mini_history.get("calculated_duration"), - "retry_count": mini_history.get("retry_count", 0), - "final_status": mini_history.get("status_final"), - "quality_achieved": mini_history.get("quality_achieved"), - "file_size": mini_history.get("file_size"), - "download_path": mini_history.get("download_path") + response_data = { + "query": query, + "results": results, + "result_count": len(results), + "limit": limit } - return jsonify({ - "parent_task_id": parent_task_id, - "track_id": track_id, - "track_info": { - "title": mini_history.get("title"), - "disc_number": mini_history.get("disc_number"), - "track_number": mini_history.get("track_number"), - "position": mini_history.get("position"), - "duration_ms": mini_history.get("duration_ms") - }, - "timeline": timeline, - "timeline_stats": timeline_stats - }) + return Response( + json.dumps(response_data), + status=200, + mimetype="application/json" + ) + except ValueError as e: + return Response( + json.dumps({"error": f"Invalid parameter value: {str(e)}"}), + status=400, + mimetype="application/json", + ) except Exception as e: - logger.error(f"Error in /api/history/track/{parent_task_id}/{track_id}/timeline endpoint: {e}", exc_info=True) - return jsonify({"error": f"Failed to retrieve timeline for track {track_id}"}), 500 - - -# Legacy endpoint for backward compatibility -@history_bp.route("/legacy", methods=["GET"]) -def get_download_history_legacy(): - """Legacy API endpoint using the old history system (for backward compatibility).""" - try: - limit = request.args.get("limit", 25, type=int) - offset = request.args.get("offset", 0, type=int) - sort_by = request.args.get("sort_by", "timestamp_completed") - sort_order = request.args.get("sort_order", "DESC") - - filters = {} - - # Status filter - status_filter = request.args.get("status_final") - if status_filter: - filters["status_final"] = status_filter - - # Download type filter - type_filter = request.args.get("download_type") - if type_filter: - filters["download_type"] = type_filter - - # Parent task filter - parent_task_filter = request.args.get("parent_task_id") - if parent_task_filter: - filters["parent_task_id"] = parent_task_filter - - entries, total_count = get_history_entries( - limit, offset, sort_by, sort_order, filters + logger.error(f"Error searching download history: {e}", exc_info=True) + return Response( + json.dumps({"error": "Failed to search download history", "details": str(e)}), + status=500, + mimetype="application/json", ) - return jsonify({ - "entries": entries, - "total_count": total_count, - "limit": limit, - "offset": offset, - "note": "This is the legacy endpoint. Consider migrating to /api/history" - }) + +@history_bp.route("/recent", methods=["GET"]) +def get_recent_downloads(): + """ + Get most recent downloads. + + Query parameters: + - limit: Maximum number of results (default: 20, max: 100) + """ + try: + limit = min(int(request.args.get("limit", 20)), 100) # Cap at 100 + + recent = history_manager.get_recent_downloads(limit) + + response_data = { + "downloads": recent, + "count": len(recent), + "limit": limit + } + + return Response( + json.dumps(response_data), + status=200, + mimetype="application/json" + ) + + except ValueError as e: + return Response( + json.dumps({"error": f"Invalid parameter value: {str(e)}"}), + status=400, + mimetype="application/json", + ) + except Exception as e: + logger.error(f"Error retrieving recent downloads: {e}", exc_info=True) + return Response( + json.dumps({"error": "Failed to retrieve recent downloads", "details": str(e)}), + status=500, + mimetype="application/json", + ) + + +@history_bp.route("/failed", methods=["GET"]) +def get_failed_downloads(): + """ + Get failed downloads. + + Query parameters: + - limit: Maximum number of results (default: 50, max: 200) + """ + try: + limit = min(int(request.args.get("limit", 50)), 200) # Cap at 200 + + failed = history_manager.get_failed_downloads(limit) + + response_data = { + "downloads": failed, + "count": len(failed), + "limit": limit + } + + return Response( + json.dumps(response_data), + status=200, + mimetype="application/json" + ) + + except ValueError as e: + return Response( + json.dumps({"error": f"Invalid parameter value: {str(e)}"}), + status=400, + mimetype="application/json", + ) + except Exception as e: + logger.error(f"Error retrieving failed downloads: {e}", exc_info=True) + return Response( + json.dumps({"error": "Failed to retrieve failed downloads", "details": str(e)}), + status=500, + mimetype="application/json", + ) + + +@history_bp.route("/cleanup", methods=["POST"]) +def cleanup_old_history(): + """ + Clean up old download history. + + JSON body: + - days_old: Number of days old to keep (default: 30) + """ + try: + data = request.get_json() or {} + days_old = data.get("days_old", 30) + + if not isinstance(days_old, int) or days_old <= 0: + return Response( + json.dumps({"error": "days_old must be a positive integer"}), + status=400, + mimetype="application/json", + ) + + deleted_count = history_manager.clear_old_history(days_old) + + response_data = { + "message": f"Successfully cleaned up old download history", + "deleted_records": deleted_count, + "days_old": days_old + } + + return Response( + json.dumps(response_data), + status=200, + mimetype="application/json" + ) except Exception as e: - logger.error(f"Error in /api/history/legacy endpoint: {e}", exc_info=True) - return jsonify({"error": "Failed to retrieve download history"}), 500 + logger.error(f"Error cleaning up old history: {e}", exc_info=True) + return Response( + json.dumps({"error": "Failed to cleanup old history", "details": str(e)}), + status=500, + mimetype="application/json", + ) \ No newline at end of file diff --git a/routes/utils/celery_tasks.py b/routes/utils/celery_tasks.py index 91b4abb..74abbca 100644 --- a/routes/utils/celery_tasks.py +++ b/routes/utils/celery_tasks.py @@ -28,8 +28,8 @@ from routes.utils.watch.db import ( add_or_update_album_for_artist, ) -# Import history manager function -from .history_manager import add_entry_to_history, add_tracks_from_summary +# Import for download history management +from routes.utils.history_manager import history_manager # Create Redis connection for storing task data that's not part of the Celery result backend import redis @@ -217,131 +217,6 @@ def get_all_tasks(): return [] - -# --- History Logging Helper --- -def _log_task_to_history(task_id, final_status_str, error_msg=None): - """Helper function to gather task data and log it to the history database.""" - try: - task_info = get_task_info(task_id) - last_status_obj = get_last_task_status(task_id) - - if not task_info: - logger.warning( - f"History: No task_info found for task_id {task_id}. Cannot log to history." - ) - return - - # Determine service_used and quality_profile - main_service_name = str( - task_info.get("main", "Unknown") - ).capitalize() # e.g. Spotify, Deezer from their respective .env values - fallback_service_name = str(task_info.get("fallback", "")).capitalize() - - service_used_str = main_service_name - if ( - task_info.get("fallback") and fallback_service_name - ): # Check if fallback was configured - # Try to infer actual service used if possible, otherwise show configured. - # This part is a placeholder for more accurate determination if deezspot gives explicit feedback. - # For now, we assume 'main' was used unless an error hints otherwise. - # A more robust solution would involve deezspot callback providing this. - service_used_str = ( - f"{main_service_name} (Fallback: {fallback_service_name})" - ) - # If error message indicates fallback, we could try to parse it. - # e.g. if error_msg and "fallback" in error_msg.lower(): service_used_str = f"{fallback_service_name} (Used Fallback)" - - # Determine quality profile (primarily from the 'quality' field) - # 'quality' usually holds the primary service's quality (e.g., spotifyQuality, deezerQuality) - quality_profile_str = str(task_info.get("quality", "N/A")) - - # Get convertTo and bitrate - convert_to_str = str( - task_info.get("convertTo", "") - ) # Empty string if None or not present - bitrate_str = str( - task_info.get("bitrate", "") - ) # Empty string if None or not present - - # Extract Spotify ID from item URL if possible - spotify_id = None - item_url = task_info.get("url", "") - if item_url: - try: - spotify_id = item_url.split("/")[-1] - # Further validation if it looks like a Spotify ID (e.g., 22 chars, alphanumeric) - if not (spotify_id and len(spotify_id) == 22 and spotify_id.isalnum()): - spotify_id = None # Reset if not a valid-looking ID - except Exception: - spotify_id = None # Ignore errors in parsing - - # Check for the new summary object in the last status - summary_obj = last_status_obj.get("summary") if last_status_obj else None - - history_entry = { - "task_id": task_id, - "download_type": task_info.get("download_type"), - "item_name": task_info.get("name"), - "item_artist": task_info.get("artist"), - "item_album": task_info.get( - "album", - task_info.get("name") - if task_info.get("download_type") == "album" - else None, - ), - "item_url": item_url, - "spotify_id": spotify_id, - "status_final": final_status_str, - "error_message": error_msg - if error_msg - else (last_status_obj.get("error") if last_status_obj else None), - "timestamp_added": task_info.get("created_at", time.time()), - "timestamp_completed": last_status_obj.get("timestamp", time.time()) - if last_status_obj - else time.time(), - "original_request_json": json.dumps(task_info.get("original_request", {})), - "last_status_obj_json": json.dumps( - last_status_obj if last_status_obj else {} - ), - "service_used": service_used_str, - "quality_profile": quality_profile_str, - "convert_to": convert_to_str - if convert_to_str - else None, # Store None if empty string - "bitrate": bitrate_str - if bitrate_str - else None, # Store None if empty string - "summary_json": json.dumps(summary_obj) if summary_obj else None, - "total_successful": summary_obj.get("total_successful") - if summary_obj - else None, - "total_skipped": summary_obj.get("total_skipped") if summary_obj else None, - "total_failed": summary_obj.get("total_failed") if summary_obj else None, - } - - # Add the main history entry for the task - add_entry_to_history(history_entry) - - # Process track-level entries from summary if this is a multi-track download - if summary_obj and task_info.get("download_type") in ["album", "playlist"]: - tracks_processed = add_tracks_from_summary( - summary_data=summary_obj, - parent_task_id=task_id, - parent_history_data=history_entry - ) - logger.info( - f"Track-level history: Processed {tracks_processed['successful']} successful, " - f"{tracks_processed['skipped']} skipped, and {tracks_processed['failed']} failed tracks for task {task_id}" - ) - - except Exception as e: - logger.error( - f"History: Error preparing or logging history for task {task_id}: {e}", - exc_info=True, - ) -# --- End History Logging Helper --- - - def cancel_task(task_id): """Cancel a task by its ID""" try: @@ -358,9 +233,6 @@ def cancel_task(task_id): # Try to revoke the Celery task if it hasn't started yet celery_app.control.revoke(task_id, terminate=True, signal="SIGTERM") - # Log cancellation to history - _log_task_to_history(task_id, "CANCELLED", "Task cancelled by user") - # Schedule deletion of task data after 30 seconds delayed_delete_task_data.apply_async( args=[task_id, "Task cancelled by user and auto-cleaned."], countdown=30 @@ -592,8 +464,12 @@ class ProgressTrackingTask(Task): if "timestamp" not in progress_data: progress_data["timestamp"] = time.time() - status = progress_data.get("status", "unknown") + # Extract status from status_info (deezspot callback format) + status_info = progress_data.get("status_info", {}) + status = status_info.get("status", progress_data.get("status", "unknown")) task_info = get_task_info(task_id) + + logger.debug(f"Task {task_id}: Extracted status: '{status}' from callback") if logger.isEnabledFor(logging.DEBUG): logger.debug( @@ -609,12 +485,18 @@ class ProgressTrackingTask(Task): elif status in ["real_time", "track_progress"]: self._handle_real_time(task_id, progress_data) elif status == "skipped": + # Re-fetch task_info to ensure we have the latest children_table info + task_info = get_task_info(task_id) self._handle_skipped(task_id, progress_data, task_info) elif status == "retrying": self._handle_retrying(task_id, progress_data, task_info) elif status == "error": + # Re-fetch task_info to ensure we have the latest children_table info + task_info = get_task_info(task_id) self._handle_error(task_id, progress_data, task_info) elif status == "done": + # Re-fetch task_info to ensure we have the latest children_table info + task_info = get_task_info(task_id) self._handle_done(task_id, progress_data, task_info) else: logger.info( @@ -627,9 +509,46 @@ class ProgressTrackingTask(Task): def _handle_initializing(self, task_id, data, task_info): """Handle initializing status from deezspot""" logger.info(f"Task {task_id} initializing...") + # Initializing object is now very basic, mainly for acknowledging the start. # More detailed info comes with 'progress' or 'downloading' states. data["status"] = ProgressState.INITIALIZING + + # Store initial history entry for download start + try: + # Check for album/playlist FIRST since their callbacks contain both parent and track info + if "album" in data: + # Album download - create children table and store name in task info + logger.info(f"Task {task_id}: Creating album children table") + children_table = history_manager.store_album_history(data, task_id, "in_progress") + if children_table: + task_info["children_table"] = children_table + store_task_info(task_id, task_info) + logger.info(f"Task {task_id}: Created and stored children table '{children_table}' in task info") + else: + logger.error(f"Task {task_id}: Failed to create album children table") + elif "playlist" in data: + # Playlist download - create children table and store name in task info + logger.info(f"Task {task_id}: Creating playlist children table") + children_table = history_manager.store_playlist_history(data, task_id, "in_progress") + if children_table: + task_info["children_table"] = children_table + store_task_info(task_id, task_info) + logger.info(f"Task {task_id}: Created and stored children table '{children_table}' in task info") + else: + logger.error(f"Task {task_id}: Failed to create playlist children table") + elif "track" in data: + # Individual track download - check if it's part of an album/playlist + children_table = task_info.get("children_table") + if children_table: + # Track is part of album/playlist - don't store in main table during initialization + logger.info(f"Task {task_id}: Skipping track initialization storage (part of album/playlist, children table: {children_table})") + else: + # Individual track download - store in main table + logger.info(f"Task {task_id}: Storing individual track history (initializing)") + history_manager.store_track_history(data, task_id, "in_progress") + except Exception as e: + logger.error(f"Failed to store initial history for task {task_id}: {e}", exc_info=True) def _handle_downloading(self, task_id, data, task_info): """Handle downloading status from deezspot""" @@ -725,7 +644,7 @@ class ProgressTrackingTask(Task): ) # Log at debug level - logger.debug(f"Task {task_id} track progress: {title} by {artist}: {percent}%") + logger.debug(f"Task {task_id} track progress: {track_name} by {artist}: {percent}%") # Set appropriate status # data["status"] = ( @@ -736,7 +655,25 @@ class ProgressTrackingTask(Task): def _handle_skipped(self, task_id, data, task_info): """Handle skipped status from deezspot""" - # Extract track info + + # Store skipped history for deezspot callback format + try: + if "track" in data: + # Individual track skipped - check if we should use children table + children_table = task_info.get("children_table") + logger.debug(f"Task {task_id}: Skipped track, children_table = '{children_table}'") + if children_table: + # Part of album/playlist - store progressively in children table + logger.info(f"Task {task_id}: Storing skipped track in children table '{children_table}' (progressive)") + history_manager.store_track_history(data, task_id, "skipped", children_table) + else: + # Individual track download - store in main table + logger.info(f"Task {task_id}: Storing skipped track in main table (individual download)") + history_manager.store_track_history(data, task_id, "skipped") + except Exception as e: + logger.error(f"Failed to store skipped history for task {task_id}: {e}") + + # Extract track info (legacy format support) title = data.get("song", "Unknown") artist = data.get("artist", "Unknown") reason = data.get("reason", "Unknown reason") @@ -809,7 +746,34 @@ class ProgressTrackingTask(Task): def _handle_error(self, task_id, data, task_info): """Handle error status from deezspot""" - # Extract error info + + # Store error history for deezspot callback format + try: + # Check for album/playlist FIRST since their callbacks contain both parent and track info + if "album" in data: + # Album failed - store in main table + logger.info(f"Task {task_id}: Storing album history (failed)") + history_manager.store_album_history(data, task_id, "failed") + elif "playlist" in data: + # Playlist failed - store in main table + logger.info(f"Task {task_id}: Storing playlist history (failed)") + history_manager.store_playlist_history(data, task_id, "failed") + elif "track" in data: + # Individual track failed - check if we should use children table + children_table = task_info.get("children_table") + logger.debug(f"Task {task_id}: Failed track, children_table = '{children_table}'") + if children_table: + # Part of album/playlist - store progressively in children table + logger.info(f"Task {task_id}: Storing failed track in children table '{children_table}' (progressive)") + history_manager.store_track_history(data, task_id, "failed", children_table) + else: + # Individual track download - store in main table + logger.info(f"Task {task_id}: Storing failed track in main table (individual download)") + history_manager.store_track_history(data, task_id, "failed") + except Exception as e: + logger.error(f"Failed to store error history for task {task_id}: {e}") + + # Extract error info (legacy format support) message = data.get("message", "Unknown error") # Log error @@ -826,7 +790,34 @@ class ProgressTrackingTask(Task): def _handle_done(self, task_id, data, task_info): """Handle done status from deezspot""" - # Extract data + + # Store completion history for deezspot callback format + try: + # Check for album/playlist FIRST since their callbacks contain both parent and track info + if "album" in data: + # Album completion with summary - store in main table + logger.info(f"Task {task_id}: Storing album history (completed)") + history_manager.store_album_history(data, task_id, "completed") + elif "playlist" in data: + # Playlist completion with summary - store in main table + logger.info(f"Task {task_id}: Storing playlist history (completed)") + history_manager.store_playlist_history(data, task_id, "completed") + elif "track" in data: + # Individual track completion - check if we should use children table + children_table = task_info.get("children_table") + logger.debug(f"Task {task_id}: Completed track, children_table = '{children_table}'") + if children_table: + # Part of album/playlist - store progressively in children table + logger.info(f"Task {task_id}: Storing completed track in children table '{children_table}' (progressive)") + history_manager.store_track_history(data, task_id, "completed", children_table) + else: + # Individual track download - store in main table + logger.info(f"Task {task_id}: Storing completed track in main table (individual download)") + history_manager.store_track_history(data, task_id, "completed") + except Exception as e: + logger.error(f"Failed to store completion history for task {task_id}: {e}", exc_info=True) + + # Extract data (legacy format support) content_type = data.get("type", "").lower() album = data.get("album", "") artist = data.get("artist", "") @@ -1025,9 +1016,6 @@ def task_postrun_handler( ): """Signal handler when a task finishes""" try: - # Define download task names - download_task_names = ["download_track", "download_album", "download_playlist"] - last_status_for_history = get_last_task_status(task_id) if last_status_for_history and last_status_for_history.get("status") in [ ProgressState.COMPLETE, @@ -1041,14 +1029,8 @@ def task_postrun_handler( and last_status_for_history.get("status") != ProgressState.CANCELLED ): logger.info( - f"Task {task_id} was REVOKED (likely cancelled), logging to history." + f"Task {task_id} was REVOKED (likely cancelled)." ) - if ( - task and task.name in download_task_names - ): # Check if it's a download task - _log_task_to_history( - task_id, "CANCELLED", "Task was revoked/cancelled." - ) # return # Let status update proceed if necessary task_info = get_task_info(task_id) @@ -1065,10 +1047,6 @@ def task_postrun_handler( logger.info( f"Task {task_id} completed successfully: {task_info.get('name', 'Unknown')}" ) - if ( - task and task.name in download_task_names - ): # Check if it's a download task - _log_task_to_history(task_id, "COMPLETED") if ( task_info.get("download_type") == "track" @@ -1189,10 +1167,6 @@ def task_failure_handler( ) logger.error(f"Task {task_id} failed: {str(exception)}") - if ( - sender and sender.name in download_task_names - ): # Check if it's a download task - _log_task_to_history(task_id, "ERROR", str(exception)) if can_retry: logger.info(f"Task {task_id} can be retried ({retry_count}/{max_retries})") @@ -1552,12 +1526,6 @@ def delete_task_data_and_log(task_id, reason="Task data deleted"): "timestamp": time.time(), }, ) - # History logging for COMPLETION, CANCELLATION, or definitive ERROR should have occurred when those states were first reached. - # If this cleanup is for a task that *wasn't* in such a state (e.g. stale, still processing), log it now. - if final_redis_status == ProgressState.ERROR_AUTO_CLEANED: - _log_task_to_history( - task_id, "ERROR", error_message_for_status - ) # Or a more specific status if desired # Delete Redis keys associated with the task redis_client.delete(f"task:{task_id}:info") diff --git a/routes/utils/history_manager.py b/routes/utils/history_manager.py index a4d0895..82208f6 100644 --- a/routes/utils/history_manager.py +++ b/routes/utils/history_manager.py @@ -1,1414 +1,938 @@ import sqlite3 import json +import uuid import time import logging -import uuid from pathlib import Path from typing import Dict, List, Optional, Any, Union from datetime import datetime +from contextlib import contextmanager logger = logging.getLogger(__name__) -HISTORY_DIR = Path("./data/history") -HISTORY_DB_FILE = HISTORY_DIR / "download_history.db" - -# Main tasks table schema -MAIN_TASKS_SCHEMA = { - "task_id": "TEXT PRIMARY KEY", - "task_type": "TEXT NOT NULL", # 'track', 'album', 'playlist' - "title": "TEXT", - "artists": "TEXT", # JSON array of artist objects - "ids": "TEXT", # JSON object with spotify, deezer, isrc, upc - "status_current": "TEXT", # Current status: initializing, retrying, real-time, skipped, error, done - "status_final": "TEXT", # Final result: COMPLETED, ERROR, CANCELLED, SKIPPED - "timestamp_created": "REAL", - "timestamp_updated": "REAL", - "timestamp_completed": "REAL", - "children_table": "TEXT", # Table name for nested items (album_uuid, playlist_uuid) - "metadata": "TEXT", # JSON - Complete object data (albumObject, playlistObject, trackObject) - "config": "TEXT", # JSON - Download config (quality, convert_to, bitrate, service) - "error_info": "TEXT", # JSON - Error details - "progress": "TEXT", # JSON - Progress info (current/total, time_elapsed, etc.) - "summary": "TEXT", # JSON - Final summary for albums/playlists - "parent_task_id": "TEXT", # Reference to parent task for individual tracks - "position": "INTEGER", # Position in parent (for playlist tracks) - "original_request": "TEXT" # JSON - Original request data -} - -# Status history table for tracking all status changes -STATUS_HISTORY_SCHEMA = { - "status_id": "INTEGER PRIMARY KEY AUTOINCREMENT", - "task_id": "TEXT NOT NULL", - "status_type": "TEXT NOT NULL", # initializing, retrying, real-time, skipped, error, done - "status_data": "TEXT", # JSON - Complete status object - "timestamp": "REAL NOT NULL" -} - -# Schema for individual track tables within albums/playlists -CHILD_TRACK_SCHEMA = { - "track_id": "TEXT PRIMARY KEY", - "parent_task_id": "TEXT NOT NULL", - "position": "INTEGER", - "disc_number": "INTEGER", - "track_number": "INTEGER", - "title": "TEXT", - "duration_ms": "INTEGER", - "explicit": "BOOLEAN", - "track_data": "TEXT", # JSON - Complete trackObject (trackAlbumObject/trackPlaylistObject) - "artists_data": "TEXT", # JSON - Array of artist objects - "album_data": "TEXT", # JSON - Album context data (for playlist tracks) - "ids_data": "TEXT", # JSON - IDs object (spotify, deezer, isrc, etc.) - "status_current": "TEXT", # Current status: initializing, retrying, real-time, skipped, error, done - "status_final": "TEXT", # Final result: COMPLETED, ERROR, CANCELLED, SKIPPED - "status_history": "TEXT", # JSON - Array of all status updates for this track - "timestamp_created": "REAL", - "timestamp_started": "REAL", # When download actually started - "timestamp_completed": "REAL", - "time_elapsed": "REAL", # Total processing time in seconds - "retry_count": "INTEGER", # Number of retries attempted - "error_info": "TEXT", # JSON - Error details and reason - "progress_info": "TEXT", # JSON - Progress data during download - "config": "TEXT", # JSON - Download config inherited from parent - "download_path": "TEXT", # Final download path/filename - "file_size": "INTEGER", # File size in bytes - "quality_achieved": "TEXT" # Actual quality/bitrate achieved -} - - -def init_history_db(): - """Initialize the improved history database with new schema.""" - conn = None - try: - HISTORY_DIR.mkdir(parents=True, exist_ok=True) - conn = sqlite3.connect(HISTORY_DB_FILE) - cursor = conn.cursor() - - # Create main tasks table - _create_table_from_schema(cursor, "download_tasks", MAIN_TASKS_SCHEMA) - - # Create status history table - _create_table_from_schema(cursor, "status_history", STATUS_HISTORY_SCHEMA) - - # Check if we need to migrate from old schema - cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='download_history'") - old_table_exists = cursor.fetchone() is not None - - if old_table_exists: - logger.info("Old schema detected. Starting migration...") - _migrate_from_old_schema(conn) - - conn.commit() - logger.info(f"History database initialized successfully at {HISTORY_DB_FILE}") - - except sqlite3.Error as e: - logger.error(f"Error initializing history database: {e}", exc_info=True) - finally: - if conn: - conn.close() - - -def _create_table_from_schema(cursor, table_name: str, schema: Dict[str, str]): - """Create a table from a schema dictionary.""" - columns = [] +class HistoryManager: + """ + Manages download history storage using SQLite database. + Stores hierarchical download data from deezspot callback objects. + """ - for col_name, col_def in schema.items(): - columns.append(f"{col_name} {col_def}") - - create_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(columns)})" - - cursor.execute(create_sql) - logger.info(f"Created/verified table: {table_name}") - - -def _migrate_from_old_schema(conn): - """Migrate data from the old download_history table to the new schema.""" - cursor = conn.cursor() - - try: - # Get all data from old table - cursor.execute("SELECT * FROM download_history") - old_records = cursor.fetchall() - - # Get column names - cursor.execute("PRAGMA table_info(download_history)") - old_columns = [col[1] for col in cursor.fetchall()] - - logger.info(f"Migrating {len(old_records)} records from old schema...") - - # Create backup of old table - backup_table = f"download_history_backup_{int(time.time())}" - cursor.execute(f"CREATE TABLE {backup_table} AS SELECT * FROM download_history") - - migrated_count = 0 - for record in old_records: - old_data = dict(zip(old_columns, record)) - - # Convert old record to new format - new_task = _convert_old_record_to_new(old_data) - if new_task: - add_task_to_history(new_task) - migrated_count += 1 - - logger.info(f"Successfully migrated {migrated_count} records. Old table backed up as {backup_table}") - - except Exception as e: - logger.error(f"Error during migration: {e}", exc_info=True) - - -def _convert_old_record_to_new(old_data: Dict) -> Optional[Dict]: - """Convert an old history record to the new format.""" - try: - # Create basic task structure - task_data = { - "task_id": old_data.get("task_id"), - "task_type": old_data.get("download_type", "track"), - "title": old_data.get("item_name", ""), - "timestamp_created": old_data.get("timestamp_added"), - "timestamp_completed": old_data.get("timestamp_completed"), - "status_final": old_data.get("status_final"), - "parent_task_id": old_data.get("parent_task_id"), - "original_request": old_data.get("original_request_json") - } - - # Build artists array - if old_data.get("item_artist"): - task_data["artists"] = json.dumps([{"name": old_data["item_artist"]}]) - - # Build IDs object - ids = {} - if old_data.get("spotify_id"): - ids["spotify"] = old_data["spotify_id"] - if ids: - task_data["ids"] = json.dumps(ids) - - # Build config object - config = {} - if old_data.get("service_used"): - config["service_used"] = old_data["service_used"] - if old_data.get("quality_profile"): - config["quality_profile"] = old_data["quality_profile"] - if old_data.get("convert_to"): - config["convert_to"] = old_data["convert_to"] - if old_data.get("bitrate"): - config["bitrate"] = old_data["bitrate"] - if config: - task_data["config"] = json.dumps(config) - - # Handle error information - if old_data.get("error_message"): - task_data["error_info"] = json.dumps({"message": old_data["error_message"]}) - - # Build basic metadata object - metadata = { - "type": task_data["task_type"], - "title": task_data["title"], - "url": old_data.get("item_url") - } - - if old_data.get("item_album"): - metadata["album"] = {"title": old_data["item_album"]} - - task_data["metadata"] = json.dumps(metadata) - - return task_data - - except Exception as e: - logger.warning(f"Failed to convert old record {old_data.get('task_id')}: {e}") - return None - - -def create_child_table(parent_task_id: str, task_type: str) -> str: - """Create a child table for album or playlist tracks using UUID-based naming.""" - # Generate a shorter UUID for the table name to avoid database identifier length limits - import uuid as uuid_mod - table_uuid = uuid_mod.uuid4().hex[:12] # Use first 12 characters of UUID - table_name = f"{task_type}_{table_uuid}" - - conn = None - try: - conn = sqlite3.connect(HISTORY_DB_FILE) - cursor = conn.cursor() - - # Create the child table - _create_table_from_schema(cursor, table_name, CHILD_TRACK_SCHEMA) - - # Create an index on parent_task_id for faster queries - cursor.execute(f"CREATE INDEX IF NOT EXISTS idx_{table_name}_parent ON {table_name}(parent_task_id)") - - # Create an index on position for proper ordering - cursor.execute(f"CREATE INDEX IF NOT EXISTS idx_{table_name}_position ON {table_name}(position)") - - conn.commit() - - logger.info(f"Created child table: {table_name} for parent task: {parent_task_id}") - return table_name - - except sqlite3.Error as e: - logger.error(f"Error creating child table {table_name}: {e}") - return "" - finally: - if conn: - conn.close() - - -def add_task_to_history(task_data: Dict): - """Add or update a main task in the history.""" - conn = None - try: - conn = sqlite3.connect(HISTORY_DB_FILE) - cursor = conn.cursor() - - # Ensure required fields are present - required_fields = ["task_id", "task_type"] - for field in required_fields: - if field not in task_data: - raise ValueError(f"Missing required field: {field}") - - # Set default timestamps - current_time = time.time() - task_data.setdefault("timestamp_created", current_time) - task_data.setdefault("timestamp_updated", current_time) - - # Convert all values to appropriate types - processed_data = {} - for col_name in MAIN_TASKS_SCHEMA.keys(): - if col_name in task_data: - value = task_data[col_name] - # Convert objects to JSON strings - if isinstance(value, (dict, list)): - processed_data[col_name] = json.dumps(value) - else: - processed_data[col_name] = value - else: - processed_data[col_name] = None - - # Create INSERT OR REPLACE query - columns = list(processed_data.keys()) - placeholders = ["?" for _ in columns] - values = [processed_data[col] for col in columns] - - query = f""" - INSERT OR REPLACE INTO download_tasks ({', '.join(columns)}) - VALUES ({', '.join(placeholders)}) + def __init__(self, db_path: str = "data/history/download_history.db"): """ + Initialize the history manager with database path. - cursor.execute(query, values) - conn.commit() - - logger.info(f"Added/updated task: {task_data['task_id']} ({task_data['task_type']})") - - except Exception as e: - logger.error(f"Error adding task to history: {e}", exc_info=True) - finally: - if conn: - conn.close() - - -def add_status_update(task_id: str, status_type: str, status_data: Dict): - """Add a status update to the status history.""" - conn = None - try: - conn = sqlite3.connect(HISTORY_DB_FILE) - cursor = conn.cursor() - - cursor.execute(""" - INSERT INTO status_history (task_id, status_type, status_data, timestamp) - VALUES (?, ?, ?, ?) - """, (task_id, status_type, json.dumps(status_data), time.time())) - - # Also update the current status in main table - cursor.execute(""" - UPDATE download_tasks - SET status_current = ?, timestamp_updated = ? - WHERE task_id = ? - """, (status_type, time.time(), task_id)) - - conn.commit() - logger.debug(f"Added status update for {task_id}: {status_type}") - - except Exception as e: - logger.error(f"Error adding status update: {e}", exc_info=True) - finally: - if conn: - conn.close() - - -def add_child_track(parent_task_id: str, track_data: Dict): - """Add a track to a child table (album or playlist) with comprehensive data extraction.""" - conn = None - try: - conn = sqlite3.connect(HISTORY_DB_FILE) - cursor = conn.cursor() - - # Find the parent task to get the children table name - cursor.execute("SELECT children_table FROM download_tasks WHERE task_id = ?", (parent_task_id,)) - result = cursor.fetchone() - - if not result or not result[0]: - logger.error(f"No children table found for parent task: {parent_task_id}") - return - - table_name = result[0] - - # Generate track ID if not provided - track_id = track_data.get("track_id", f"{parent_task_id}_track_{uuid.uuid4().hex[:8]}") - - # Extract track object data if provided - track_obj = track_data.get("track_data", {}) - if isinstance(track_obj, str): - try: - track_obj = json.loads(track_obj) - except json.JSONDecodeError: - track_obj = {} - - # Prepare comprehensive track record - track_record = { - "track_id": track_id, - "parent_task_id": parent_task_id, - "position": track_data.get("position") or track_obj.get("position", 0), - "disc_number": track_obj.get("disc_number", 1), - "track_number": track_obj.get("track_number", 0), - "title": track_obj.get("title", "Unknown Track"), - "duration_ms": track_obj.get("duration_ms", 0), - "explicit": track_obj.get("explicit", False), - "track_data": json.dumps(track_obj) if track_obj else None, - "artists_data": json.dumps(track_obj.get("artists", [])), - "album_data": json.dumps(track_obj.get("album", {})) if track_obj.get("album") else None, - "ids_data": json.dumps(track_obj.get("ids", {})), - "status_current": track_data.get("status_current", "initializing"), - "status_final": track_data.get("status_final"), - "status_history": json.dumps(track_data.get("status_history", [])), - "timestamp_created": track_data.get("timestamp_created", time.time()), - "timestamp_started": track_data.get("timestamp_started"), - "timestamp_completed": track_data.get("timestamp_completed"), - "time_elapsed": track_data.get("time_elapsed"), - "retry_count": track_data.get("retry_count", 0), - "error_info": json.dumps(track_data.get("error_info", {})) if track_data.get("error_info") else None, - "progress_info": json.dumps(track_data.get("progress_info", {})) if track_data.get("progress_info") else None, - "config": json.dumps(track_data.get("config", {})) if track_data.get("config") else None, - "download_path": track_data.get("download_path"), - "file_size": track_data.get("file_size"), - "quality_achieved": track_data.get("quality_achieved") - } - - # Filter out None values to avoid issues - track_record = {k: v for k, v in track_record.items() if v is not None} - - # Insert into child table - columns = list(track_record.keys()) - placeholders = ["?" for _ in columns] - values = [track_record[col] for col in columns] - - query = f""" - INSERT OR REPLACE INTO {table_name} ({', '.join(columns)}) - VALUES ({', '.join(placeholders)}) + Args: + db_path: Path to SQLite database file """ - - cursor.execute(query, values) - conn.commit() - - logger.info(f"Added track to {table_name}: {track_id} - {track_record.get('title', 'Unknown')}") - - return track_id - - except Exception as e: - logger.error(f"Error adding child track: {e}", exc_info=True) - return None - finally: - if conn: - conn.close() - - -def get_task_history( - limit: int = 25, - offset: int = 0, - sort_by: str = "timestamp_updated", - sort_order: str = "DESC", - filters: Optional[Dict] = None, - include_children: bool = False -) -> tuple[List[Dict], int]: - """Get task history with enhanced filtering and optional child data.""" - conn = None - try: - conn = sqlite3.connect(HISTORY_DB_FILE) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - - # Build query - base_query = "FROM download_tasks" - where_clauses = [] - params = [] - - if filters: - for column, value in filters.items(): - if column in MAIN_TASKS_SCHEMA: - if value is None: - where_clauses.append(f"{column} IS NULL") - else: - where_clauses.append(f"{column} = ?") - params.append(value) - - if where_clauses: - base_query += " WHERE " + " AND ".join(where_clauses) - - # Get total count - cursor.execute(f"SELECT COUNT(*) {base_query}", params) - total_count = cursor.fetchone()[0] - - # Validate sort parameters - if sort_by not in MAIN_TASKS_SCHEMA: - sort_by = "timestamp_updated" - if sort_order.upper() not in ["ASC", "DESC"]: - sort_order = "DESC" - - # Get paginated results - query = f"SELECT * {base_query} ORDER BY {sort_by} {sort_order} LIMIT ? OFFSET ?" - cursor.execute(query, params + [limit, offset]) - - tasks = [] - for row in cursor.fetchall(): - task = dict(row) - - # Parse JSON fields - json_fields = ["artists", "ids", "metadata", "config", "error_info", "progress", "summary"] - for field in json_fields: - if task[field]: - try: - task[field] = json.loads(task[field]) - except json.JSONDecodeError: - pass - - # Include child tracks if requested - if include_children and task["children_table"]: - task["child_tracks"] = get_child_tracks(task["children_table"]) - - tasks.append(task) - - return tasks, total_count - - except Exception as e: - logger.error(f"Error getting task history: {e}", exc_info=True) - return [], 0 - finally: - if conn: - conn.close() - - -def add_track_status_update(track_id: str, table_name: str, status_type: str, status_data: Dict, - progress_info: Dict = None, error_info: Dict = None): - """Add a status update to a track's mini-history.""" - conn = None - try: - conn = sqlite3.connect(HISTORY_DB_FILE) - cursor = conn.cursor() - - # Get current status history - cursor.execute(f"SELECT status_history, retry_count FROM {table_name} WHERE track_id = ?", (track_id,)) - result = cursor.fetchone() - - if not result: - logger.warning(f"Track {track_id} not found in table {table_name}") - return - - current_history = [] - retry_count = result[1] or 0 - - if result[0]: - try: - current_history = json.loads(result[0]) - except json.JSONDecodeError: - current_history = [] - - # Add new status update - status_update = { - "timestamp": time.time(), - "status_type": status_type, - "status_data": status_data - } - - if progress_info: - status_update["progress_info"] = progress_info - if error_info: - status_update["error_info"] = error_info - - current_history.append(status_update) - - # Update fields based on status - update_fields = { - "status_current": status_type, - "status_history": json.dumps(current_history), - "timestamp_updated": time.time() - } - - # Handle specific status transitions - if status_type == "real-time": - if not result or not cursor.execute(f"SELECT timestamp_started FROM {table_name} WHERE track_id = ?", (track_id,)).fetchone()[0]: - update_fields["timestamp_started"] = time.time() - if progress_info: - update_fields["progress_info"] = json.dumps(progress_info) - - elif status_type == "retrying": - update_fields["retry_count"] = retry_count + 1 - if error_info: - update_fields["error_info"] = json.dumps(error_info) - - elif status_type in ["done", "error", "skipped"]: - update_fields["timestamp_completed"] = time.time() - update_fields["status_final"] = { - "done": "COMPLETED", - "error": "ERROR", - "skipped": "SKIPPED" - }[status_type] - - if error_info: - update_fields["error_info"] = json.dumps(error_info) - - # Calculate time elapsed if we have start time - cursor.execute(f"SELECT timestamp_started FROM {table_name} WHERE track_id = ?", (track_id,)) - start_result = cursor.fetchone() - if start_result and start_result[0]: - update_fields["time_elapsed"] = time.time() - start_result[0] - - # Update the track record - set_clauses = [f"{key} = ?" for key in update_fields.keys()] - values = list(update_fields.values()) + [track_id] - - query = f"UPDATE {table_name} SET {', '.join(set_clauses)} WHERE track_id = ?" - cursor.execute(query, values) - conn.commit() - - logger.debug(f"Updated track {track_id} status to {status_type}") - - except Exception as e: - logger.error(f"Error updating track status: {e}", exc_info=True) - finally: - if conn: - conn.close() - - -def get_child_tracks(table_name: str) -> List[Dict]: - """Get all tracks from a child table with parsed JSON fields.""" - conn = None - try: - conn = sqlite3.connect(HISTORY_DB_FILE) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - - cursor.execute(f"SELECT * FROM {table_name} ORDER BY disc_number, track_number, position") - tracks = [] - - for row in cursor.fetchall(): - track = dict(row) - - # Parse JSON fields - json_fields = ["track_data", "artists_data", "album_data", "ids_data", - "status_history", "error_info", "progress_info", "config"] - - for field in json_fields: - if track.get(field): - try: - track[field] = json.loads(track[field]) - except json.JSONDecodeError: - pass - - tracks.append(track) - - return tracks - - except Exception as e: - logger.error(f"Error getting child tracks from {table_name}: {e}") - return [] - finally: - if conn: - conn.close() - - -def get_status_history(task_id: str) -> List[Dict]: - """Get complete status history for a task.""" - conn = None - try: - conn = sqlite3.connect(HISTORY_DB_FILE) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - - cursor.execute(""" - SELECT * FROM status_history - WHERE task_id = ? - ORDER BY timestamp ASC - """, (task_id,)) - - history = [] - for row in cursor.fetchall(): - entry = dict(row) - if entry["status_data"]: - try: - entry["status_data"] = json.loads(entry["status_data"]) - except json.JSONDecodeError: - pass - history.append(entry) - - return history - - except Exception as e: - logger.error(f"Error getting status history for {task_id}: {e}") - return [] - finally: - if conn: - conn.close() - - -def process_callback_object(callback_obj: Dict, task_id: str = None): - """Process a callback object and update history accordingly.""" - try: - if not task_id: - task_id = str(uuid.uuid4()) - - # Determine callback type and extract data - if "track" in callback_obj: - _process_track_callback(callback_obj, task_id) - elif "album" in callback_obj: - _process_album_callback(callback_obj, task_id) - elif "playlist" in callback_obj: - _process_playlist_callback(callback_obj, task_id) - else: - logger.warning(f"Unknown callback object type for task {task_id}") + self.db_path = Path(db_path) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self._ensure_database_exists() - except Exception as e: - logger.error(f"Error processing callback object: {e}", exc_info=True) - - -def _process_track_callback(callback_obj: Dict, task_id: str): - """Process a trackCallbackObject with comprehensive status tracking.""" - track_data = callback_obj.get("track", {}) - status_info = callback_obj.get("status_info", {}) - parent_info = callback_obj.get("parent") + def _ensure_database_exists(self): + """Create database and main table if they don't exist.""" + with self._get_connection() as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS download_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + download_type TEXT NOT NULL, -- 'track', 'album', 'playlist' + title TEXT NOT NULL, + artists TEXT, -- JSON array of artist names + timestamp REAL NOT NULL, + status TEXT NOT NULL, -- 'completed', 'failed', 'skipped', 'in_progress' + service TEXT, -- 'spotify', 'deezer' + quality_format TEXT, -- 'mp3', 'flac', etc. + quality_bitrate TEXT, -- '320', '1411', etc. + total_tracks INTEGER, -- For albums/playlists + successful_tracks INTEGER, -- For albums/playlists + failed_tracks INTEGER, -- For albums/playlists + skipped_tracks INTEGER, -- For albums/playlists + children_table TEXT, -- Table name for nested tracks + task_id TEXT, + external_ids TEXT, -- JSON object with service IDs + metadata TEXT, -- JSON object with additional data + release_date TEXT, -- JSON object with date info + genres TEXT, -- JSON array of genres + images TEXT, -- JSON array of image objects + owner TEXT, -- For playlists - JSON object + album_type TEXT, -- 'album', 'ep', 'single', etc. + duration_total_ms INTEGER, -- Total duration for albums/playlists + explicit BOOLEAN, -- For individual tracks + UNIQUE(task_id, download_type, external_ids) + ) + """) + + # Create index for faster queries + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_download_history_timestamp + ON download_history(timestamp DESC) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_download_history_task_id + ON download_history(task_id) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_download_history_type_status + ON download_history(download_type, status) + """) - # Check if this is a child track (part of album/playlist) - if parent_info and parent_info.get("task_id"): - parent_task_id = parent_info["task_id"] - - # Find parent task's children table + @contextmanager + def _get_connection(self): + """Get database connection with proper error handling.""" conn = None try: - conn = sqlite3.connect(HISTORY_DB_FILE) - cursor = conn.cursor() - cursor.execute("SELECT children_table FROM download_tasks WHERE task_id = ?", (parent_task_id,)) - result = cursor.fetchone() - - if result and result[0]: - table_name = result[0] - - # Extract progress and error info - progress_info = None - error_info = None - - if status_info.get("status") == "real-time": - progress_info = { - "time_elapsed": status_info.get("time_elapsed", 0), - "progress": status_info.get("progress", 0) - } - elif status_info.get("status") == "retrying": - error_info = { - "retry_count": status_info.get("retry_count", 0), - "seconds_left": status_info.get("seconds_left", 0), - "error": status_info.get("error", "") - } - elif status_info.get("status") == "error": - error_info = { - "message": status_info.get("error", "Unknown error") - } - elif status_info.get("status") == "skipped": - error_info = { - "reason": status_info.get("reason", "Unknown reason") - } - - # Update track status in child table - add_track_status_update( - track_id=task_id, - table_name=table_name, - status_type=status_info.get("status", "initializing"), - status_data=status_info, - progress_info=progress_info, - error_info=error_info - ) - + conn = sqlite3.connect(str(self.db_path)) + conn.row_factory = sqlite3.Row # Enable dict-like row access + yield conn + conn.commit() except Exception as e: - logger.error(f"Error processing child track callback: {e}", exc_info=True) + if conn: + conn.rollback() + logger.error(f"Database error: {e}") + raise finally: if conn: conn.close() - else: - # Handle standalone track - task_entry = { - "task_id": task_id, - "task_type": "track", - "title": track_data.get("title", ""), - "artists": [{"name": artist.get("name", "")} for artist in track_data.get("artists", [])], - "ids": track_data.get("ids", {}), - "metadata": track_data, - "status_current": status_info.get("status", "initializing"), - "position": callback_obj.get("current_track") - } + + def _create_children_table(self, table_name: str): + """ + Create a children table for storing individual tracks of an album/playlist. - # Set final status based on status_info - if status_info.get("status") == "done": - task_entry["status_final"] = "COMPLETED" - task_entry["timestamp_completed"] = time.time() - elif status_info.get("status") == "error": - task_entry["status_final"] = "ERROR" - task_entry["error_info"] = {"message": status_info.get("error", "")} - elif status_info.get("status") == "skipped": - task_entry["status_final"] = "SKIPPED" + Args: + table_name: Name of the children table (e.g., 'album_abc123') + """ + with self._get_connection() as conn: + conn.execute(f""" + CREATE TABLE IF NOT EXISTS {table_name} ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + title TEXT NOT NULL, + artists TEXT, -- JSON array of artist names + album_title TEXT, + duration_ms INTEGER, + track_number INTEGER, + disc_number INTEGER, + explicit BOOLEAN, + status TEXT NOT NULL, -- 'completed', 'failed', 'skipped' + external_ids TEXT, -- JSON object with service IDs + genres TEXT, -- JSON array of genres + isrc TEXT, + timestamp REAL NOT NULL, + position INTEGER, -- For playlist tracks + metadata TEXT -- JSON object with additional track data + ) + """) + + def _extract_artists(self, obj: Dict) -> List[str]: + """Extract artist names from various object types.""" + artists = obj.get("artists", []) + if not artists: + return [] - add_task_to_history(task_entry) - add_status_update(task_id, status_info.get("status", "initializing"), status_info) - - -def _process_album_callback(callback_obj: Dict, task_id: str): - """Process an albumCallbackObject with comprehensive track management.""" - album_data = callback_obj.get("album", {}) - status_info = callback_obj.get("status_info", {}) - - # Create children table for tracks - children_table = create_child_table(task_id, "album") - - # Create main task entry - task_entry = { - "task_id": task_id, - "task_type": "album", - "title": album_data.get("title", ""), - "artists": [{"name": artist.get("name", "")} for artist in album_data.get("artists", [])], - "ids": album_data.get("ids", {}), - "metadata": album_data, - "children_table": children_table, - "status_current": status_info.get("status", "initializing") - } - - # Initialize tracks in child table when album processing starts - if status_info.get("status") == "initializing" and album_data.get("tracks"): - for i, track in enumerate(album_data["tracks"]): - track_data = { - "track_data": track, - "position": i + 1, - "status_current": "initializing", - "timestamp_created": time.time() - } - add_child_track(task_id, track_data) - - # Handle completion with summary - if status_info.get("status") == "done" and status_info.get("summary"): - task_entry["status_final"] = "COMPLETED" - task_entry["timestamp_completed"] = time.time() - task_entry["summary"] = status_info["summary"] + artist_names = [] + for artist in artists: + if isinstance(artist, dict): + name = artist.get("name", "") + if name: + artist_names.append(name) + elif isinstance(artist, str): + artist_names.append(artist) - # Update individual tracks in child table based on summary - summary = status_info["summary"] - - # Process successful tracks - for track in summary.get("successful_tracks", []): - if isinstance(track, dict): - # Find matching track in child table and update status - conn = None - try: - conn = sqlite3.connect(HISTORY_DB_FILE) - cursor = conn.cursor() - - # Try to match by title and artist - track_title = track.get("title", "") - cursor.execute( - f"SELECT track_id FROM {children_table} WHERE title = ? AND parent_task_id = ?", - (track_title, task_id) - ) - result = cursor.fetchone() - - if result: - add_track_status_update( - track_id=result[0], - table_name=children_table, - status_type="done", - status_data={"status": "done"}, - progress_info={"progress": 100} - ) - except Exception as e: - logger.error(f"Error updating successful track: {e}") - finally: - if conn: - conn.close() - - # Process skipped tracks - for track in summary.get("skipped_tracks", []): - if isinstance(track, dict): - # Similar matching and update logic - conn = None - try: - conn = sqlite3.connect(HISTORY_DB_FILE) - cursor = conn.cursor() - - track_title = track.get("title", "") - cursor.execute( - f"SELECT track_id FROM {children_table} WHERE title = ? AND parent_task_id = ?", - (track_title, task_id) - ) - result = cursor.fetchone() - - if result: - add_track_status_update( - track_id=result[0], - table_name=children_table, - status_type="skipped", - status_data={"status": "skipped"}, - error_info={"reason": "Skipped during processing"} - ) - except Exception as e: - logger.error(f"Error updating skipped track: {e}") - finally: - if conn: - conn.close() - - # Process failed tracks - for failed_track in summary.get("failed_tracks", []): - track = failed_track.get("track", {}) if isinstance(failed_track, dict) else failed_track - reason = failed_track.get("reason", "Unknown error") if isinstance(failed_track, dict) else "Download failed" - - if isinstance(track, dict): - conn = None - try: - conn = sqlite3.connect(HISTORY_DB_FILE) - cursor = conn.cursor() - - track_title = track.get("title", "") - cursor.execute( - f"SELECT track_id FROM {children_table} WHERE title = ? AND parent_task_id = ?", - (track_title, task_id) - ) - result = cursor.fetchone() - - if result: - add_track_status_update( - track_id=result[0], - table_name=children_table, - status_type="error", - status_data={"status": "error"}, - error_info={"message": reason} - ) - except Exception as e: - logger.error(f"Error updating failed track: {e}") - finally: - if conn: - conn.close() + return artist_names - add_task_to_history(task_entry) - add_status_update(task_id, status_info.get("status", "initializing"), status_info) - - -def _process_playlist_callback(callback_obj: Dict, task_id: str): - """Process a playlistCallbackObject with comprehensive track management.""" - playlist_data = callback_obj.get("playlist", {}) - status_info = callback_obj.get("status_info", {}) + def _extract_external_ids(self, obj: Dict) -> Dict: + """Extract external service IDs from object.""" + return obj.get("ids", {}) - # Create children table for tracks - children_table = create_child_table(task_id, "playlist") + def _extract_images(self, obj: Dict) -> List[Dict]: + """Extract image information from object.""" + return obj.get("images", []) - # Create main task entry - task_entry = { - "task_id": task_id, - "task_type": "playlist", - "title": playlist_data.get("title", ""), - "metadata": playlist_data, - "children_table": children_table, - "status_current": status_info.get("status", "initializing") - } + def _extract_release_date(self, obj: Dict) -> Dict: + """Extract release date information from object.""" + return obj.get("release_date", {}) - # Add playlist owner info to metadata if available - if playlist_data.get("owner"): - task_entry["metadata"]["owner_info"] = playlist_data["owner"] + def _calculate_total_duration(self, tracks: List[Dict]) -> int: + """Calculate total duration from tracks list.""" + total = 0 + for track in tracks: + duration = track.get("duration_ms", 0) + if duration: + total += duration + return total - # Initialize tracks in child table when playlist processing starts - if status_info.get("status") == "initializing" and playlist_data.get("tracks"): - for track in playlist_data["tracks"]: - track_data = { - "track_data": track, - "position": track.get("position", 0), - "status_current": "initializing", - "timestamp_created": time.time() - } - add_child_track(task_id, track_data) - - # Handle completion with summary - if status_info.get("status") == "done" and status_info.get("summary"): - task_entry["status_final"] = "COMPLETED" - task_entry["timestamp_completed"] = time.time() - task_entry["summary"] = status_info["summary"] - - # Update individual tracks in child table based on summary - summary = status_info["summary"] - - # Process successful tracks - for track in summary.get("successful_tracks", []): - if isinstance(track, dict): - # Find matching track in child table and update status - conn = None - try: - conn = sqlite3.connect(HISTORY_DB_FILE) - cursor = conn.cursor() - - # Try to match by title and position - track_title = track.get("title", "") - track_position = track.get("position", 0) - cursor.execute( - f"SELECT track_id FROM {children_table} WHERE title = ? AND position = ? AND parent_task_id = ?", - (track_title, track_position, task_id) - ) - result = cursor.fetchone() - - if result: - add_track_status_update( - track_id=result[0], - table_name=children_table, - status_type="done", - status_data={"status": "done"}, - progress_info={"progress": 100} - ) - except Exception as e: - logger.error(f"Error updating successful playlist track: {e}") - finally: - if conn: - conn.close() - - # Process skipped tracks - for track in summary.get("skipped_tracks", []): - if isinstance(track, dict): - conn = None - try: - conn = sqlite3.connect(HISTORY_DB_FILE) - cursor = conn.cursor() - - track_title = track.get("title", "") - track_position = track.get("position", 0) - cursor.execute( - f"SELECT track_id FROM {children_table} WHERE title = ? AND position = ? AND parent_task_id = ?", - (track_title, track_position, task_id) - ) - result = cursor.fetchone() - - if result: - add_track_status_update( - track_id=result[0], - table_name=children_table, - status_type="skipped", - status_data={"status": "skipped"}, - error_info={"reason": "Skipped during processing"} - ) - except Exception as e: - logger.error(f"Error updating skipped playlist track: {e}") - finally: - if conn: - conn.close() - - # Process failed tracks - for failed_track in summary.get("failed_tracks", []): - track = failed_track.get("track", {}) if isinstance(failed_track, dict) else failed_track - reason = failed_track.get("reason", "Unknown error") if isinstance(failed_track, dict) else "Download failed" - - if isinstance(track, dict): - conn = None - try: - conn = sqlite3.connect(HISTORY_DB_FILE) - cursor = conn.cursor() - - track_title = track.get("title", "") - track_position = track.get("position", 0) - cursor.execute( - f"SELECT track_id FROM {children_table} WHERE title = ? AND position = ? AND parent_task_id = ?", - (track_title, track_position, task_id) - ) - result = cursor.fetchone() - - if result: - add_track_status_update( - track_id=result[0], - table_name=children_table, - status_type="error", - status_data={"status": "error"}, - error_info={"message": reason} - ) - except Exception as e: - logger.error(f"Error updating failed playlist track: {e}") - finally: - if conn: - conn.close() - - add_task_to_history(task_entry) - add_status_update(task_id, status_info.get("status", "initializing"), status_info) - - -# Legacy compatibility functions -def add_entry_to_history(history_data: dict): - """Legacy compatibility function - converts old format to new.""" - logger.warning("Using legacy add_entry_to_history - consider migrating to add_task_to_history") - - converted = _convert_old_record_to_new(history_data) - if converted: - add_task_to_history(converted) - - -def add_tracks_from_summary(summary_data, parent_task_id, parent_history_data=None): - """Legacy compatibility function - processes a summary object from a completed task and adds individual track entries. - - Args: - summary_data (dict): The summary data containing track lists - parent_task_id (str): The ID of the parent task - parent_history_data (dict, optional): The history data of the parent task - - Returns: - dict: Summary of processed tracks - """ - logger.warning("Using legacy add_tracks_from_summary - consider migrating to add_child_track and process_callback_object") - - processed = { - "successful": 0, - "skipped": 0, - "failed": 0 - } - - if not summary_data: - logger.warning(f"No summary data provided for task {parent_task_id}") - return processed - - # Check if parent task has a children table, if not create one - conn = None - try: - conn = sqlite3.connect(HISTORY_DB_FILE) - cursor = conn.cursor() - - cursor.execute("SELECT children_table, task_type FROM download_tasks WHERE task_id = ?", (parent_task_id,)) - result = cursor.fetchone() - - children_table = None - if result: - children_table = result[0] - task_type = result[1] or "album" - - # Create children table if it doesn't exist - if not children_table: - children_table = create_child_table(parent_task_id, task_type) - cursor.execute("UPDATE download_tasks SET children_table = ? WHERE task_id = ?", - (children_table, parent_task_id)) - conn.commit() + def _get_primary_service(self, external_ids: Dict) -> str: + """Determine primary service from external IDs.""" + if "spotify" in external_ids: + return "spotify" + elif "deezer" in external_ids: + return "deezer" else: - # Parent task doesn't exist, create a basic one - logger.warning(f"Parent task {parent_task_id} not found, creating basic entry...") - task_data = { - "task_id": parent_task_id, - "task_type": "album", - "title": "Unknown Album", - "status_final": "COMPLETED", - "children_table": create_child_table(parent_task_id, "album") + return "unknown" + + def create_children_table_for_album(self, callback_data: Dict, task_id: str) -> str: + """ + Create children table for album download at the start and return table name. + + Args: + callback_data: Album callback object from deezspot + task_id: Celery task ID + + Returns: + Children table name + """ + # Generate children table name + album_uuid = str(uuid.uuid4()).replace("-", "")[:10] + children_table = f"album_{album_uuid}" + + # Create the children table + self._create_children_table(children_table) + + logger.info(f"Created album children table {children_table} for task {task_id}") + return children_table + + def create_children_table_for_playlist(self, callback_data: Dict, task_id: str) -> str: + """ + Create children table for playlist download at the start and return table name. + + Args: + callback_data: Playlist callback object from deezspot + task_id: Celery task ID + + Returns: + Children table name + """ + # Generate children table name + playlist_uuid = str(uuid.uuid4()).replace("-", "")[:10] + children_table = f"playlist_{playlist_uuid}" + + # Create the children table + self._create_children_table(children_table) + + logger.info(f"Created playlist children table {children_table} for task {task_id}") + return children_table + + def store_track_history(self, callback_data: Dict, task_id: str, status: str = "completed", table: str = "download_history"): + """ + Store individual track download history. + + Args: + callback_data: Track callback object from deezspot + task_id: Celery task ID + status: Download status ('completed', 'failed', 'skipped') + table: Target table name (defaults to 'download_history', can be a children table name) + """ + try: + track = callback_data.get("track", {}) + status_info = callback_data.get("status_info", {}) + + if not track: + logger.warning(f"No track data in callback for task {task_id}") + return + + artists = self._extract_artists(track) + external_ids = self._extract_external_ids(track) + + album = track.get("album", {}) + album_title = album.get("title", "") + + # Prepare metadata + metadata = { + "callback_type": "track", + "parent": callback_data.get("parent"), + "current_track": callback_data.get("current_track"), + "total_tracks": callback_data.get("total_tracks"), + "album": album, + "status_info": status_info } - add_task_to_history(task_data) - children_table = task_data["children_table"] - - except Exception as e: - logger.error(f"Error setting up children table for {parent_task_id}: {e}") - finally: - if conn: - conn.close() - - # Process successful tracks - for track_entry in summary_data.get("successful_tracks", []): - try: - # Parse "track_name - artist_name" format or handle trackObject - if isinstance(track_entry, dict): - # Handle trackObject - track_data = { - "track_data": track_entry, - "status_final": "COMPLETED", - "timestamp_completed": time.time() - } - else: - # Handle string format "track_name - artist_name" - parts = track_entry.split(" - ", 1) - if len(parts) == 2: - track_name, artist_name = parts - track_data = { - "track_data": { - "title": track_name, - "artists": [{"name": artist_name}] - }, - "status_final": "COMPLETED", - "timestamp_completed": time.time() - } - else: - logger.warning(f"Could not parse track entry: {track_entry}") - continue - add_child_track(parent_task_id, track_data) - processed["successful"] += 1 + with self._get_connection() as conn: + if table == "download_history": + # Store in main download_history table + logger.info(f"Storing track '{track.get('title', 'Unknown')}' in MAIN table for task {task_id}") + conn.execute(""" + INSERT OR REPLACE INTO download_history ( + download_type, title, artists, timestamp, status, service, + quality_format, quality_bitrate, task_id, external_ids, + metadata, release_date, genres, explicit, album_type, + duration_total_ms + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + "track", + track.get("title", "Unknown"), + json.dumps(artists), + callback_data.get("timestamp", time.time()), + status, + self._get_primary_service(external_ids), + status_info.get("convert_to"), + status_info.get("bitrate"), + task_id, + json.dumps(external_ids), + json.dumps(metadata), + json.dumps(self._extract_release_date(album)), + json.dumps(track.get("genres", [])), + track.get("explicit", False), + album.get("album_type"), + track.get("duration_ms", 0) + )) + else: + # Store in children table (for album/playlist tracks) + logger.info(f"Storing track '{track.get('title', 'Unknown')}' in CHILDREN table '{table}' for task {task_id}") + # Extract ISRC + isrc = external_ids.get("isrc", "") + + # Prepare children table metadata + children_metadata = { + "album": album, + "type": track.get("type", ""), + "callback_type": "track", + "parent": callback_data.get("parent"), + "current_track": callback_data.get("current_track"), + "total_tracks": callback_data.get("total_tracks"), + "status_info": status_info + } + + conn.execute(f""" + INSERT INTO {table} ( + title, artists, album_title, duration_ms, track_number, + disc_number, explicit, status, external_ids, genres, + isrc, timestamp, position, metadata + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + track.get("title", "Unknown"), + json.dumps(artists), + album_title, + track.get("duration_ms", 0), + track.get("track_number", 0), + track.get("disc_number", 1), + track.get("explicit", False), + status, + json.dumps(external_ids), + json.dumps(track.get("genres", [])), + isrc, + callback_data.get("timestamp", time.time()), + track.get("position", 0), # For playlist tracks + json.dumps(children_metadata) + )) + + logger.info(f"Successfully stored track '{track.get('title')}' in table '{table}' (task: {task_id})") except Exception as e: - logger.error(f"Error processing successful track {track_entry}: {e}", exc_info=True) + logger.error(f"Failed to store track history for task {task_id}: {e}") - # Process skipped tracks - for track_entry in summary_data.get("skipped_tracks", []): - try: - if isinstance(track_entry, dict): - # Handle trackObject - track_data = { - "track_data": track_entry, - "status_final": "SKIPPED", - "timestamp_completed": time.time() - } - else: - # Handle string format - parts = track_entry.split(" - ", 1) - if len(parts) == 2: - track_name, artist_name = parts - track_data = { - "track_data": { - "title": track_name, - "artists": [{"name": artist_name}] - }, - "status_final": "SKIPPED", - "timestamp_completed": time.time() - } - else: - logger.warning(f"Could not parse skipped track entry: {track_entry}") - continue + def store_album_history(self, callback_data: Dict, task_id: str, status: str = "completed"): + """ + Store album download history with children table for individual tracks. + + Args: + callback_data: Album callback object from deezspot + task_id: Celery task ID + status: Download status ('completed', 'failed', 'in_progress') - add_child_track(parent_task_id, track_data) - processed["skipped"] += 1 + Returns: + Children table name when status is 'in_progress', None otherwise + """ + try: + album = callback_data.get("album", {}) + status_info = callback_data.get("status_info", {}) + + if not album: + logger.warning(f"No album data in callback for task {task_id}") + return None + + if status == "in_progress": + # Phase 1: Create children table at start, don't store album entry yet + children_table = self.create_children_table_for_album(callback_data, task_id) + logger.info(f"Album download started for task {task_id}, children table: {children_table}") + return children_table + + # Phase 2: Store album entry in main table (for completed/failed status) + artists = self._extract_artists(album) + external_ids = self._extract_external_ids(album) + + # For completed/failed, we need to find the existing children table + # This should be stored in task info by the celery task + from routes.utils.celery_tasks import get_task_info + task_info = get_task_info(task_id) + children_table = task_info.get("children_table") + + if not children_table: + # Fallback: generate new children table name (shouldn't happen in normal flow) + album_uuid = str(uuid.uuid4()).replace("-", "")[:10] + children_table = f"album_{album_uuid}" + logger.warning(f"No children table found for album task {task_id}, generating new: {children_table}") + + # Extract summary data if available (from 'done' status) + summary = status_info.get("summary", {}) + successful_tracks = summary.get("total_successful", 0) + failed_tracks = summary.get("total_failed", 0) + skipped_tracks = summary.get("total_skipped", 0) + total_tracks = album.get("total_tracks", 0) + + # Calculate total duration + tracks = album.get("tracks", []) + total_duration = self._calculate_total_duration(tracks) + + # Prepare metadata + metadata = { + "callback_type": "album", + "status_info": status_info, + "copyrights": album.get("copyrights", []), + "tracks": tracks # Store track list in metadata + } + + with self._get_connection() as conn: + # Store main album entry + conn.execute(""" + INSERT OR REPLACE INTO download_history ( + download_type, title, artists, timestamp, status, service, + quality_format, quality_bitrate, total_tracks, successful_tracks, + failed_tracks, skipped_tracks, children_table, task_id, + external_ids, metadata, release_date, genres, images, + album_type, duration_total_ms + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + "album", + album.get("title", "Unknown"), + json.dumps(artists), + callback_data.get("timestamp", time.time()), + status, + self._get_primary_service(external_ids), + status_info.get("convert_to"), + status_info.get("bitrate"), + total_tracks, + successful_tracks, + failed_tracks, + skipped_tracks, + children_table, + task_id, + json.dumps(external_ids), + json.dumps(metadata), + json.dumps(self._extract_release_date(album)), + json.dumps(album.get("genres", [])), + json.dumps(self._extract_images(album)), + album.get("album_type"), + total_duration + )) + + # Children table is populated progressively during track processing, not from summary + + logger.info(f"Stored album history for '{album.get('title')}' (task: {task_id}, children: {children_table})") + return None except Exception as e: - logger.error(f"Error processing skipped track {track_entry}: {e}", exc_info=True) + logger.error(f"Failed to store album history for task {task_id}: {e}") + return None - # Process failed tracks - for track_entry in summary_data.get("failed_tracks", []): - try: - if isinstance(track_entry, dict): - # Handle failedTrackObject or trackObject - if "track" in track_entry: - # failedTrackObject format - track_obj = track_entry["track"] - error_reason = track_entry.get("reason", "Unknown error") - track_data = { - "track_data": track_obj, - "status_final": "ERROR", - "error_info": {"message": error_reason}, - "timestamp_completed": time.time() - } - else: - # Plain trackObject - track_data = { - "track_data": track_entry, - "status_final": "ERROR", - "timestamp_completed": time.time() - } - else: - # Handle string format - parts = track_entry.split(" - ", 1) - if len(parts) == 2: - track_name, artist_name = parts - track_data = { - "track_data": { - "title": track_name, - "artists": [{"name": artist_name}] - }, - "status_final": "ERROR", - "timestamp_completed": time.time() - } - else: - logger.warning(f"Could not parse failed track entry: {track_entry}") - continue + def store_playlist_history(self, callback_data: Dict, task_id: str, status: str = "completed"): + """ + Store playlist download history with children table for individual tracks. + + Args: + callback_data: Playlist callback object from deezspot + task_id: Celery task ID + status: Download status ('completed', 'failed', 'in_progress') - add_child_track(parent_task_id, track_data) - processed["failed"] += 1 + Returns: + Children table name when status is 'in_progress', None otherwise + """ + try: + playlist = callback_data.get("playlist", {}) + status_info = callback_data.get("status_info", {}) + + if not playlist: + logger.warning(f"No playlist data in callback for task {task_id}") + return None + + if status == "in_progress": + # Phase 1: Create children table at start, don't store playlist entry yet + children_table = self.create_children_table_for_playlist(callback_data, task_id) + logger.info(f"Playlist download started for task {task_id}, children table: {children_table}") + return children_table + + # Phase 2: Store playlist entry in main table (for completed/failed status) + external_ids = self._extract_external_ids(playlist) + + # For completed/failed, we need to find the existing children table + # This should be stored in task info by the celery task + from routes.utils.celery_tasks import get_task_info + task_info = get_task_info(task_id) + children_table = task_info.get("children_table") + + if not children_table: + # Fallback: generate new children table name (shouldn't happen in normal flow) + playlist_uuid = str(uuid.uuid4()).replace("-", "")[:10] + children_table = f"playlist_{playlist_uuid}" + logger.warning(f"No children table found for playlist task {task_id}, generating new: {children_table}") + + # Extract summary data if available + summary = status_info.get("summary", {}) + successful_tracks = summary.get("total_successful", 0) + failed_tracks = summary.get("total_failed", 0) + skipped_tracks = summary.get("total_skipped", 0) + + tracks = playlist.get("tracks", []) + total_tracks = len(tracks) + total_duration = self._calculate_total_duration(tracks) + + # Extract owner information + owner = playlist.get("owner", {}) + + # Prepare metadata + metadata = { + "callback_type": "playlist", + "status_info": status_info, + "description": playlist.get("description", ""), + "tracks": tracks # Store track list in metadata + } + + with self._get_connection() as conn: + # Store main playlist entry + conn.execute(""" + INSERT OR REPLACE INTO download_history ( + download_type, title, artists, timestamp, status, service, + quality_format, quality_bitrate, total_tracks, successful_tracks, + failed_tracks, skipped_tracks, children_table, task_id, + external_ids, metadata, genres, images, owner, + duration_total_ms + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + "playlist", + playlist.get("title", "Unknown"), + json.dumps([owner.get("name", "Unknown")]), # Use owner as "artist" + callback_data.get("timestamp", time.time()), + status, + self._get_primary_service(external_ids), + status_info.get("convert_to"), + status_info.get("bitrate"), + total_tracks, + successful_tracks, + failed_tracks, + skipped_tracks, + children_table, + task_id, + json.dumps(external_ids), + json.dumps(metadata), + json.dumps([]), # Playlists don't have genres typically + json.dumps(self._extract_images(playlist)), + json.dumps(owner), + total_duration + )) + + # Children table is populated progressively during track processing, not from summary + + logger.info(f"Stored playlist history for '{playlist.get('title')}' (task: {task_id}, children: {children_table})") + return None except Exception as e: - logger.error(f"Error processing failed track {track_entry}: {e}", exc_info=True) + logger.error(f"Failed to store playlist history for task {task_id}: {e}") + return None - logger.info( - f"Added {processed['successful']} successful, {processed['skipped']} skipped, " - f"and {processed['failed']} failed track entries for task {parent_task_id}" - ) + def _populate_album_children_table(self, table_name: str, summary: Dict, album_title: str): + """Populate children table with individual track records from album summary.""" + try: + all_tracks = [] + + # Add successful tracks + for track in summary.get("successful_tracks", []): + track_data = self._prepare_child_track_data(track, album_title, "completed") + all_tracks.append(track_data) + + # Add failed tracks + for failed_item in summary.get("failed_tracks", []): + track = failed_item.get("track", {}) + track_data = self._prepare_child_track_data(track, album_title, "failed") + track_data["metadata"]["failure_reason"] = failed_item.get("reason", "Unknown error") + all_tracks.append(track_data) + + # Add skipped tracks + for track in summary.get("skipped_tracks", []): + track_data = self._prepare_child_track_data(track, album_title, "skipped") + all_tracks.append(track_data) + + # Insert all tracks + with self._get_connection() as conn: + for track_data in all_tracks: + conn.execute(f""" + INSERT INTO {table_name} ( + title, artists, album_title, duration_ms, track_number, + disc_number, explicit, status, external_ids, genres, + isrc, timestamp, position, metadata + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, track_data["values"]) + + logger.info(f"Populated {len(all_tracks)} tracks in children table {table_name}") + + except Exception as e: + logger.error(f"Failed to populate album children table {table_name}: {e}") - return processed - - -def get_history_entries(limit=25, offset=0, sort_by="timestamp_completed", sort_order="DESC", filters=None): - """Legacy compatibility function.""" - logger.warning("Using legacy get_history_entries - consider migrating to get_task_history") + def _populate_playlist_children_table(self, table_name: str, summary: Dict): + """Populate children table with individual track records from playlist summary.""" + try: + all_tracks = [] + + # Add successful tracks + for track in summary.get("successful_tracks", []): + track_data = self._prepare_child_track_data(track, "", "completed") + all_tracks.append(track_data) + + # Add failed tracks + for failed_item in summary.get("failed_tracks", []): + track = failed_item.get("track", {}) + track_data = self._prepare_child_track_data(track, "", "failed") + track_data["metadata"]["failure_reason"] = failed_item.get("reason", "Unknown error") + all_tracks.append(track_data) + + # Add skipped tracks + for track in summary.get("skipped_tracks", []): + track_data = self._prepare_child_track_data(track, "", "skipped") + all_tracks.append(track_data) + + # Insert all tracks + with self._get_connection() as conn: + for track_data in all_tracks: + conn.execute(f""" + INSERT INTO {table_name} ( + title, artists, album_title, duration_ms, track_number, + disc_number, explicit, status, external_ids, genres, + isrc, timestamp, position, metadata + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, track_data["values"]) + + logger.info(f"Populated {len(all_tracks)} tracks in children table {table_name}") + + except Exception as e: + logger.error(f"Failed to populate playlist children table {table_name}: {e}") - # Map old sort_by to new fields - sort_mapping = { - "timestamp_completed": "timestamp_completed", - "timestamp_added": "timestamp_created", - "item_name": "title" - } - - new_sort_by = sort_mapping.get(sort_by, "timestamp_updated") - return get_task_history(limit, offset, new_sort_by, sort_order, filters) - - -def get_track_mini_history(track_id: str, table_name: str) -> Dict: - """Get comprehensive mini-history for a specific track.""" - conn = None - try: - conn = sqlite3.connect(HISTORY_DB_FILE) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() + def _prepare_child_track_data(self, track: Dict, default_album: str, status: str) -> Dict: + """Prepare track data for insertion into children table.""" + artists = self._extract_artists(track) + external_ids = self._extract_external_ids(track) - cursor.execute(f"SELECT * FROM {table_name} WHERE track_id = ?", (track_id,)) - result = cursor.fetchone() + # Get album info + album = track.get("album", {}) + album_title = album.get("title", default_album) - if not result: - return {} + # Extract ISRC + isrc = external_ids.get("isrc", "") - track_info = dict(result) - - # Parse JSON fields - json_fields = ["track_data", "artists_data", "album_data", "ids_data", - "status_history", "error_info", "progress_info", "config"] - - for field in json_fields: - if track_info.get(field): - try: - track_info[field] = json.loads(track_info[field]) - except json.JSONDecodeError: - pass - - # Calculate duration statistics - if track_info.get("timestamp_started") and track_info.get("timestamp_completed"): - track_info["calculated_duration"] = track_info["timestamp_completed"] - track_info["timestamp_started"] - - # Add progress timeline - if track_info.get("status_history"): - track_info["timeline"] = [] - for entry in track_info["status_history"]: - timeline_entry = { - "timestamp": entry.get("timestamp"), - "status": entry.get("status_type"), - "readable_time": datetime.fromtimestamp(entry.get("timestamp", 0)).isoformat() if entry.get("timestamp") else None - } - if entry.get("progress_info"): - timeline_entry["progress"] = entry["progress_info"] - if entry.get("error_info"): - timeline_entry["error"] = entry["error_info"] - track_info["timeline"].append(timeline_entry) - - return track_info - - except Exception as e: - logger.error(f"Error getting track mini-history: {e}") - return {} - finally: - if conn: - conn.close() - - -if __name__ == "__main__": - # Test the enhanced system - logging.basicConfig(level=logging.INFO) - init_history_db() - - # Test track task - track_task = { - "task_id": "test_track_001", - "task_type": "track", - "title": "Test Song", - "artists": [{"name": "Test Artist"}], - "ids": {"spotify": "track123"}, - "status_final": "COMPLETED", - "metadata": { - "type": "track", - "title": "Test Song", - "duration_ms": 240000, - "artists": [{"name": "Test Artist"}] - }, - "config": {"quality_profile": "NORMAL", "service_used": "Spotify"} - } - - add_task_to_history(track_task) - - # Test album task with comprehensive track management - album_task = { - "task_id": "test_album_001", - "task_type": "album", - "title": "Test Album", - "artists": [{"name": "Test Artist"}], - "ids": {"spotify": "album123"}, - "children_table": create_child_table("test_album_001", "album") - } - - add_task_to_history(album_task) - - # Add tracks with comprehensive data to the album - for i in range(3): - track_data = { - "track_data": { - "title": f"Track {i+1}", - "track_number": i+1, - "disc_number": 1, - "duration_ms": 180000 + (i * 20000), - "explicit": False, - "artists": [{"name": "Test Artist", "ids": {"spotify": f"artist{i}"}}], - "ids": {"spotify": f"track{i}", "isrc": f"TEST{i:03d}"} - }, - "position": i+1, - "status_current": "initializing", - "status_history": [ - { - "timestamp": time.time() - 300, - "status_type": "initializing", - "status_data": {"status": "initializing"} - }, - { - "timestamp": time.time() - 200, - "status_type": "real-time", - "status_data": {"status": "real-time", "progress": 50}, - "progress_info": {"progress": 50, "time_elapsed": 100} - }, - { - "timestamp": time.time() - 100, - "status_type": "done", - "status_data": {"status": "done"}, - "progress_info": {"progress": 100} - } - ], - "timestamp_started": time.time() - 300, - "timestamp_completed": time.time() - 100, - "status_final": "COMPLETED", - "time_elapsed": 200, - "quality_achieved": "FLAC 1411kbps", - "file_size": 45000000 + (i * 5000000), - "download_path": f"/downloads/Test Album/Track {i+1}.flac" + # Prepare metadata + metadata = { + "album": album, + "type": track.get("type", "") } - track_id = add_child_track("test_album_001", track_data) - print(f"Added track with comprehensive data: {track_id}") + + values = ( + track.get("title", "Unknown"), + json.dumps(artists), + album_title, + track.get("duration_ms", 0), + track.get("track_number", 0), + track.get("disc_number", 1), + track.get("explicit", False), + status, + json.dumps(external_ids), + json.dumps(track.get("genres", [])), + isrc, + time.time(), + track.get("position", 0), # For playlist tracks + json.dumps(metadata) + ) + + return {"values": values, "metadata": metadata} - # Test retrieval - tasks, total = get_task_history(limit=10, include_children=True) - print(f"\nFound {total} tasks:") - for task in tasks: - print(f"- {task['title']} ({task['task_type']}) - {task.get('status_final', 'N/A')}") - if task.get('child_tracks'): - print(f" {len(task['child_tracks'])} child tracks:") - for child in task['child_tracks'][:2]: # Show first 2 tracks - print(f" • {child.get('title', 'Unknown')} - {child.get('status_final', 'N/A')}") - if child.get("status_history"): - print(f" Status changes: {len(child['status_history'])}") - if child.get("quality_achieved"): - print(f" Quality: {child['quality_achieved']}") + def update_download_status(self, task_id: str, status: str): + """Update download status for existing history entry.""" + try: + with self._get_connection() as conn: + conn.execute(""" + UPDATE download_history + SET status = ? + WHERE task_id = ? + """, (status, task_id)) + + logger.info(f"Updated download status to '{status}' for task {task_id}") + + except Exception as e: + logger.error(f"Failed to update download status for task {task_id}: {e}") - # Test track mini-history - if tasks: - for task in tasks: - if task.get('child_tracks'): - first_track = task['child_tracks'][0] - mini_history = get_track_mini_history(first_track['track_id'], task['children_table']) - if mini_history.get('timeline'): - print(f"\nMini-history for '{mini_history.get('title', 'Unknown')}':") - for event in mini_history['timeline']: - print(f" {event['readable_time']}: {event['status']}") - if event.get('progress'): - print(f" Progress: {event['progress']}") - break + def get_download_history(self, limit: int = 100, offset: int = 0, + download_type: Optional[str] = None, + status: Optional[str] = None) -> List[Dict]: + """ + Retrieve download history with optional filtering. + + Args: + limit: Maximum number of records to return + offset: Number of records to skip + download_type: Filter by download type ('track', 'album', 'playlist') + status: Filter by status ('completed', 'failed', 'skipped', 'in_progress') + + Returns: + List of download history records + """ + try: + query = "SELECT * FROM download_history" + params = [] + conditions = [] + + if download_type: + conditions.append("download_type = ?") + params.append(download_type) + + if status: + conditions.append("status = ?") + params.append(status) + + if conditions: + query += " WHERE " + " AND ".join(conditions) + + query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?" + params.extend([limit, offset]) + + with self._get_connection() as conn: + cursor = conn.execute(query, params) + rows = cursor.fetchall() + + # Convert to list of dicts + result = [] + for row in rows: + record = dict(row) + # Parse JSON fields + for field in ['artists', 'external_ids', 'metadata', 'release_date', + 'genres', 'images', 'owner']: + if record.get(field): + try: + record[field] = json.loads(record[field]) + except (json.JSONDecodeError, TypeError): + pass + result.append(record) + + return result + + except Exception as e: + logger.error(f"Failed to retrieve download history: {e}") + return [] + + def get_children_history(self, children_table: str) -> List[Dict]: + """ + Retrieve track history from a children table. + + Args: + children_table: Name of the children table + + Returns: + List of track records + """ + try: + with self._get_connection() as conn: + cursor = conn.execute(f""" + SELECT * FROM {children_table} + ORDER BY track_number, position + """) + rows = cursor.fetchall() + + # Convert to list of dicts + result = [] + for row in rows: + record = dict(row) + # Parse JSON fields + for field in ['artists', 'external_ids', 'genres', 'metadata']: + if record.get(field): + try: + record[field] = json.loads(record[field]) + except (json.JSONDecodeError, TypeError): + pass + result.append(record) + + return result + + except Exception as e: + logger.error(f"Failed to retrieve children history from {children_table}: {e}") + return [] + + def get_download_stats(self) -> Dict: + """Get download statistics.""" + try: + with self._get_connection() as conn: + # Total downloads by type + cursor = conn.execute(""" + SELECT download_type, status, COUNT(*) as count + FROM download_history + GROUP BY download_type, status + """) + type_stats = {} + for row in cursor.fetchall(): + download_type = row['download_type'] + status = row['status'] + count = row['count'] + + if download_type not in type_stats: + type_stats[download_type] = {} + type_stats[download_type][status] = count + + # Total tracks downloaded (including from albums/playlists) + cursor = conn.execute(""" + SELECT SUM( + CASE + WHEN download_type = 'track' AND status = 'completed' THEN 1 + ELSE COALESCE(successful_tracks, 0) + END + ) as total_successful_tracks + FROM download_history + """) + total_tracks = cursor.fetchone()['total_successful_tracks'] or 0 + + # Recent downloads (last 7 days) + week_ago = time.time() - (7 * 24 * 60 * 60) + cursor = conn.execute(""" + SELECT COUNT(*) as count + FROM download_history + WHERE timestamp > ? + """, (week_ago,)) + recent_downloads = cursor.fetchone()['count'] + + return { + "by_type_and_status": type_stats, + "total_successful_tracks": total_tracks, + "recent_downloads_7d": recent_downloads + } + + except Exception as e: + logger.error(f"Failed to get download stats: {e}") + return {} + + def search_history(self, query: str, limit: int = 50) -> List[Dict]: + """ + Search download history by title or artist. + + Args: + query: Search query for title or artist + limit: Maximum number of results + + Returns: + List of matching download records + """ + try: + search_pattern = f"%{query}%" + + with self._get_connection() as conn: + cursor = conn.execute(""" + SELECT * FROM download_history + WHERE title LIKE ? OR artists LIKE ? + ORDER BY timestamp DESC + LIMIT ? + """, (search_pattern, search_pattern, limit)) + + rows = cursor.fetchall() + + # Convert to list of dicts + result = [] + for row in rows: + record = dict(row) + # Parse JSON fields + for field in ['artists', 'external_ids', 'metadata', 'release_date', + 'genres', 'images', 'owner']: + if record.get(field): + try: + record[field] = json.loads(record[field]) + except (json.JSONDecodeError, TypeError): + pass + result.append(record) + + return result + + except Exception as e: + logger.error(f"Failed to search download history: {e}") + return [] + + def get_download_by_task_id(self, task_id: str) -> Optional[Dict]: + """ + Get download history entry by task ID. + + Args: + task_id: Celery task ID + + Returns: + Download record or None if not found + """ + try: + with self._get_connection() as conn: + cursor = conn.execute(""" + SELECT * FROM download_history + WHERE task_id = ? + LIMIT 1 + """, (task_id,)) + + row = cursor.fetchone() + if not row: + return None + + record = dict(row) + # Parse JSON fields + for field in ['artists', 'external_ids', 'metadata', 'release_date', + 'genres', 'images', 'owner']: + if record.get(field): + try: + record[field] = json.loads(record[field]) + except (json.JSONDecodeError, TypeError): + pass + + return record + + except Exception as e: + logger.error(f"Failed to get download by task ID {task_id}: {e}") + return None + + def get_recent_downloads(self, limit: int = 20) -> List[Dict]: + """Get most recent downloads.""" + return self.get_download_history(limit=limit, offset=0) + + def get_failed_downloads(self, limit: int = 50) -> List[Dict]: + """Get failed downloads.""" + return self.get_download_history(limit=limit, status="failed") + + def clear_old_history(self, days_old: int = 30) -> int: + """ + Clear download history older than specified days. + + Args: + days_old: Number of days old to keep (default 30) + + Returns: + Number of records deleted + """ + try: + cutoff_time = time.time() - (days_old * 24 * 60 * 60) + + with self._get_connection() as conn: + # Get list of children tables to delete + cursor = conn.execute(""" + SELECT children_table FROM download_history + WHERE timestamp < ? AND children_table IS NOT NULL + """, (cutoff_time,)) + + children_tables = [row['children_table'] for row in cursor.fetchall()] + + # Delete main history records + cursor = conn.execute(""" + DELETE FROM download_history + WHERE timestamp < ? + """, (cutoff_time,)) + + deleted_count = cursor.rowcount + + # Drop children tables + for table_name in children_tables: + try: + conn.execute(f"DROP TABLE IF EXISTS {table_name}") + except Exception as e: + logger.warning(f"Failed to drop children table {table_name}: {e}") + + logger.info(f"Cleared {deleted_count} old history records and {len(children_tables)} children tables") + return deleted_count + + except Exception as e: + logger.error(f"Failed to clear old history: {e}") + return 0 + + +# Global history manager instance +history_manager = HistoryManager() \ No newline at end of file diff --git a/spotizerr-ui/src/routes/history.tsx b/spotizerr-ui/src/routes/history.tsx index 2507847..76a322c 100644 --- a/spotizerr-ui/src/routes/history.tsx +++ b/spotizerr-ui/src/routes/history.tsx @@ -11,184 +11,71 @@ import { } from "@tanstack/react-table"; // --- Type Definitions --- -type TimelineEntry = { - status_type: string; - timestamp: number; - human_readable: string; - status_data: any; -}; - -type TrackMiniHistory = { - track_id: string; - parent_task_id: string; - position: number; - disc_number?: number; - track_number?: number; - title: string; - duration_ms?: number; - explicit?: boolean; - artists_data?: Array<{ name: string; [key: string]: any }>; - album_data?: any; - ids_data?: { spotify?: string; deezer?: string; isrc?: string; upc?: string }; - status_current: string; - status_final: string; - timestamp_created: number; - timestamp_completed?: number; - timestamp_started?: number; - time_elapsed?: number; - calculated_duration?: string; - retry_count: number; - progress_info?: any; - download_path?: string; - file_size?: number; - quality_achieved?: string; - error_info?: { message?: string; [key: string]: any }; - config?: any; - status_history: Array; - timeline: TimelineEntry[]; -}; - type HistoryEntry = { - task_id: string; - task_type: "track" | "album" | "playlist" | "artist"; + id: number; + download_type: "track" | "album" | "playlist"; title: string; - status_current?: string; - status_final?: "COMPLETED" | "ERROR" | "CANCELLED" | "SKIPPED"; - timestamp_created?: number; - timestamp_updated?: number; - timestamp_completed?: number; - parent_task_id?: string; - position?: number; - - // Rich data fields - artists?: Array<{ name: string; [key: string]: any }>; - ids?: { spotify?: string; deezer?: string; isrc?: string; upc?: string }; - metadata?: any; - config?: { - service_used?: string; - quality_profile?: string; - convert_to?: string; - bitrate?: string; - [key: string]: any; - }; - error_info?: { message?: string; [key: string]: any }; - progress?: any; - summary?: { - total_successful?: number; - total_skipped?: number; - total_failed?: number; - [key: string]: any; - }; - - // Child information + artists: string[]; + timestamp: number; + status: "completed" | "failed" | "skipped" | "in_progress"; + service: string; + quality_format?: string; + quality_bitrate?: string; + total_tracks?: number; + successful_tracks?: number; + failed_tracks?: number; + skipped_tracks?: number; children_table?: string; - has_children?: boolean; - child_tracks?: Array; - child_track_count?: number; - child_track_summary?: { - completed: number; - error: number; - skipped: number; - }; - - // Mini-history fields (when included) - mini_history?: TrackMiniHistory; - timeline?: TimelineEntry[]; - retry_count?: number; - time_elapsed?: number; - quality_achieved?: string; - file_size?: number; - download_path?: string; - - // Computed/Legacy compatibility fields - artist_names?: string[]; - item_name?: string; - item_artist?: string; - item_album?: string; - item_url?: string; - download_type?: string; - service_used?: string; - quality_profile?: string; - convert_to?: string; - bitrate?: string; - error_message?: string; - timestamp_added?: number; - track_status?: string; - total_successful?: number; - total_skipped?: number; - total_failed?: number; + task_id: string; + external_ids: Record; + metadata: Record; + release_date?: Record; + genres: string[]; + images: Array>; + owner?: Record; + album_type?: string; + duration_total_ms?: number; + explicit?: boolean; }; -type TaskDetails = { - task: HistoryEntry & { - status_history?: Array<{ - status_id: number; - status_type: string; - status_data: any; - timestamp: number; - }>; - }; - include_children: boolean; - include_status_history: boolean; +type ChildTrack = { + id: number; + title: string; + artists: string[]; + album_title?: string; + duration_ms?: number; + track_number?: number; + disc_number?: number; + explicit?: boolean; + status: "completed" | "failed" | "skipped"; + external_ids: Record; + genres: string[]; + isrc?: string; + timestamp: number; + position?: number; + metadata: Record; +}; + +type ChildrenResponse = { + task_id: string; + download_type: string; + title: string; + children_table: string; + tracks: ChildTrack[]; + track_count: number; }; const STATUS_CLASS: Record = { - COMPLETED: "text-success", - ERROR: "text-error", - CANCELLED: "text-content-muted dark:text-content-muted-dark", - SKIPPED: "text-warning", -}; - -const QUALITY_MAP: Record> = { - spotify: { - NORMAL: "OGG 96k", - HIGH: "OGG 160k", - VERY_HIGH: "OGG 320k", - }, - deezer: { - MP3_128: "MP3 128k", - MP3_320: "MP3 320k", - FLAC: "FLAC (Hi-Res)", - }, -}; - -const getDownloadSource = (entry: HistoryEntry): "Spotify" | "Deezer" | "Unknown" => { - // Check metadata first - if (entry.metadata?.url) { - const url = entry.metadata.url.toLowerCase(); - if (url.includes("spotify.com")) return "Spotify"; - if (url.includes("deezer.com")) return "Deezer"; - } - - // Check legacy fields - const url = entry.item_url?.toLowerCase() || ""; - const service = entry.service_used?.toLowerCase() || entry.config?.service_used?.toLowerCase() || ""; - if (url.includes("spotify.com") || service.includes("spotify")) return "Spotify"; - if (url.includes("deezer.com") || service.includes("deezer")) return "Deezer"; - - // Check IDs - if (entry.ids?.spotify) return "Spotify"; - if (entry.ids?.deezer) return "Deezer"; - - return "Unknown"; + completed: "text-success", + failed: "text-error", + in_progress: "text-warning", + skipped: "text-content-muted dark:text-content-muted-dark", }; const formatQuality = (entry: HistoryEntry): string => { - const sourceName = getDownloadSource(entry).toLowerCase(); - const profile = entry.quality_profile || entry.config?.quality_profile || "N/A"; - const sourceQuality = sourceName !== "unknown" ? QUALITY_MAP[sourceName]?.[profile] || profile : profile; - let qualityDisplay = sourceQuality; - - const convertTo = entry.convert_to || entry.config?.convert_to; - const bitrate = entry.bitrate || entry.config?.bitrate; - - if (convertTo && convertTo !== "None") { - qualityDisplay += ` → ${convertTo.toUpperCase()}`; - if (bitrate && bitrate !== "None") { - qualityDisplay += ` ${bitrate}`; - } - } - return qualityDisplay; + const format = entry.quality_format || "Unknown"; + const bitrate = entry.quality_bitrate || ""; + return bitrate ? `${format} ${bitrate}` : format; }; const formatFileSize = (bytes?: number): string => { @@ -198,11 +85,12 @@ const formatFileSize = (bytes?: number): string => { return `${(bytes / Math.pow(1024, i)).toFixed(1)} ${sizes[i]}`; }; -const formatDuration = (seconds?: number): string => { - if (!seconds) return "N/A"; +const formatDuration = (ms?: number): string => { + if (!ms) return "N/A"; + const seconds = Math.floor(ms / 1000); const hours = Math.floor(seconds / 3600); const minutes = Math.floor((seconds % 3600) / 60); - const secs = Math.floor(seconds % 60); + const secs = seconds % 60; if (hours > 0) { return `${hours}:${minutes.toString().padStart(2, '0')}:${secs.toString().padStart(2, '0')}`; @@ -211,19 +99,17 @@ const formatDuration = (seconds?: number): string => { }; // --- Column Definitions --- -const columnHelper = createColumnHelper(); +const columnHelper = createColumnHelper(); export const History = () => { - const [data, setData] = useState([]); + const [data, setData] = useState<(HistoryEntry | ChildTrack)[]>([]); const [totalEntries, setTotalEntries] = useState(0); const [isLoading, setIsLoading] = useState(true); - const [selectedTask, setSelectedTask] = useState(null); - const [selectedTrackMiniHistory, setSelectedTrackMiniHistory] = useState(null); - const [showMiniHistories, setShowMiniHistories] = useState(false); - const [isMiniHistoryLoading, setIsMiniHistoryLoading] = useState(false); + const [selectedEntry, setSelectedEntry] = useState(null); + const [viewingChildren, setViewingChildren] = useState(null); // State for TanStack Table - const [sorting, setSorting] = useState([{ id: "timestamp_updated", desc: true }]); + const [sorting, setSorting] = useState([{ id: "timestamp", desc: true }]); const [{ pageIndex, pageSize }, setPagination] = useState({ pageIndex: 0, pageSize: 25, @@ -232,163 +118,41 @@ export const History = () => { // State for filters const [statusFilter, setStatusFilter] = useState(""); const [typeFilter, setTypeFilter] = useState(""); - const [currentStatusFilter, setCurrentStatusFilter] = useState(""); - const [hideChildTracks, setHideChildTracks] = useState(true); - const [includeChildren, setIncludeChildren] = useState(false); - const [parentTaskId, setParentTaskId] = useState(null); - const [parentTask, setParentTask] = useState(null); const pagination = useMemo(() => ({ pageIndex, pageSize }), [pageIndex, pageSize]); - const viewTracksForParent = useCallback( + const viewChildren = useCallback( async (parentEntry: HistoryEntry) => { + if (!parentEntry.children_table) { + toast.error("This download has no child tracks."); + return; + } + try { - const response = await apiClient.get<{ - parent_task_id: string; - parent_task_info: { - title: string; - task_type: string; - status_final: string; - }; - tracks: Array; - total_count: number; - }>(`/history/tracks/${parentEntry.task_id}`); - - // Transform tracks to match our HistoryEntry structure - const transformedTracks = response.data.tracks.map(track => ({ - task_id: track.track_id, - task_type: "track" as const, - title: track.title || "Unknown Track", - status_final: track.status_final, - timestamp_completed: track.timestamp_completed, - parent_task_id: track.parent_task_id, - position: track.position, - artists: track.artists || [], - artist_names: track.artist_names || [], - item_name: track.title || "Unknown Track", - item_artist: track.artist_names?.join(", ") || "", - download_type: "track", - config: track.config, - error_info: track.error_info, - // Mini-history fields if available - mini_history: track.mini_history, - timeline: track.timeline, - retry_count: track.retry_count, - time_elapsed: track.time_elapsed, - quality_achieved: track.quality_achieved, - file_size: track.file_size, - download_path: track.download_path, - // Legacy compatibility - service_used: track.config?.service_used, - quality_profile: track.config?.quality_profile, - convert_to: track.config?.convert_to, - bitrate: track.config?.bitrate, - error_message: track.error_info?.message, - })); - + setIsLoading(true); + const response = await apiClient.get(`/history/${parentEntry.task_id}/children`); + setViewingChildren(response.data); + setData(response.data.tracks); + setTotalEntries(response.data.track_count); setPagination({ pageIndex: 0, pageSize }); - setParentTaskId(parentEntry.task_id); - setParentTask({ - ...parentEntry, - item_name: parentEntry.title || parentEntry.item_name, - item_artist: parentEntry.artist_names?.join(", ") || parentEntry.item_artist, - }); - setData(transformedTracks); - setTotalEntries(response.data.total_count); - setStatusFilter(""); - setTypeFilter(""); - setCurrentStatusFilter(""); } catch (error) { - toast.error("Failed to load tracks for this task."); - console.error("Error loading tracks:", error); + toast.error("Failed to load child tracks."); + console.error("Error loading children:", error); + } finally { + setIsLoading(false); } }, [pageSize], ); - const viewTaskDetails = useCallback( + const viewEntryDetails = useCallback( async (taskId: string) => { try { - const response = await apiClient.get( - `/history/task/${taskId}?include_children=true&include_status_history=true` - ); - setSelectedTask(response.data); + const response = await apiClient.get(`/history/${taskId}`); + setSelectedEntry(response.data); } catch (error) { - toast.error("Failed to load task details."); - console.error("Error loading task details:", error); - } - }, - [], - ); - - const viewTrackMiniHistory = useCallback( - async (parentTaskId: string, trackId: string) => { - setIsMiniHistoryLoading(true); - try { - const response = await apiClient.get<{ - parent_task_id: string; - parent_task_info: any; - track_mini_history: TrackMiniHistory; - }>(`/history/track/${parentTaskId}/${trackId}/mini-history`); - setSelectedTrackMiniHistory(response.data.track_mini_history); - } catch (error) { - toast.error("Failed to load track mini-history."); - console.error("Error loading track mini-history:", error); - } finally { - setIsMiniHistoryLoading(false); - } - }, - [], - ); - - const loadTracksWithMiniHistories = useCallback( - async (parentTaskId: string) => { - try { - const response = await apiClient.get<{ - parent_task_id: string; - parent_task_info: any; - tracks: Array; - total_count: number; - include_mini_histories: boolean; - }>(`/history/tracks/${parentTaskId}?include_mini_histories=true`); - - const transformedTracks = response.data.tracks.map(track => ({ - task_id: track.track_id, - task_type: "track" as const, - title: track.title || "Unknown Track", - status_final: track.status_final, - timestamp_completed: track.timestamp_completed, - parent_task_id: track.parent_task_id, - position: track.position, - artists: track.artists || [], - artist_names: track.artist_names || [], - item_name: track.title || "Unknown Track", - item_artist: track.artist_names?.join(", ") || "", - download_type: "track", - config: track.config, - error_info: track.error_info, - // Mini-history fields if available - mini_history: track.mini_history, - timeline: track.timeline, - retry_count: track.retry_count, - time_elapsed: track.time_elapsed, - quality_achieved: track.quality_achieved, - file_size: track.file_size, - download_path: track.download_path, - // Legacy compatibility - service_used: track.config?.service_used, - quality_profile: track.config?.quality_profile, - convert_to: track.config?.convert_to, - bitrate: track.config?.bitrate, - error_message: track.error_info?.message, - })); - - setData(transformedTracks); - setTotalEntries(response.data.total_count); - setShowMiniHistories(true); - } catch (error) { - toast.error("Failed to load tracks with mini-histories."); - console.error("Error loading tracks with mini-histories:", error); + toast.error("Failed to load entry details."); + console.error("Error loading entry details:", error); } }, [], @@ -397,205 +161,153 @@ export const History = () => { const columns = useMemo( () => [ columnHelper.accessor("title", { - header: "Name", + header: "Title", cell: (info) => { const entry = info.row.original; - const displayName = entry.title || entry.item_name || "Unknown"; - return entry.parent_task_id ? ( - └─ {displayName} + const isChild = "album_title" in entry; + return isChild ? ( + └─ {entry.title} ) : (
- {displayName} - {entry.has_children && ( + {entry.title} + {(entry as HistoryEntry).children_table && ( - {entry.child_track_count || "N/A"} tracks + {(entry as HistoryEntry).total_tracks || "?"} tracks )}
); }, }), - columnHelper.accessor("artist_names", { - header: "Artist", + columnHelper.accessor("artists", { + header: "Artists", cell: (info) => { - const entry = info.row.original; - return entry.artist_names?.join(", ") || entry.item_artist || "Unknown Artist"; - }, - }), - columnHelper.accessor("task_type", { - header: "Type", - cell: (info) => { - const type = info.getValue() || info.row.original.download_type || "unknown"; - return {type}; - }, - }), - columnHelper.accessor("config", { - id: "quality", - header: "Quality", - cell: (info) => formatQuality(info.row.original), - }), - columnHelper.accessor("status_final", { - header: "Status", - cell: (info) => { - const entry = info.row.original; - const status = entry.status_final || entry.track_status; - const statusKey = (status || "").toUpperCase(); - const statusClass = STATUS_CLASS[statusKey] || "text-gray-500"; - - return ( -
- {status || "Unknown"} - {entry.status_current && entry.status_current !== status && ( - - ({entry.status_current}) - - )} -
- ); + const artists = info.getValue(); + return Array.isArray(artists) ? artists.join(", ") : artists || "Unknown Artist"; }, }), columnHelper.display({ - id: "source", - header: parentTaskId ? "Download Source" : "Search Source", - cell: (info) => getDownloadSource(info.row.original), - }), - ...(showMiniHistories && parentTaskId ? [ - columnHelper.accessor("retry_count", { - header: "Retries", - cell: (info) => info.getValue() || 0, - }), - columnHelper.accessor("time_elapsed", { - header: "Duration", - cell: (info) => formatDuration(info.getValue()), - }), - columnHelper.accessor("file_size", { - header: "File Size", - cell: (info) => formatFileSize(info.getValue()), - }), - columnHelper.accessor("quality_achieved", { - header: "Quality", - cell: (info) => info.getValue() || "N/A", - }), - ] : []), - columnHelper.accessor("timestamp_completed", { - header: "Date Completed", + id: "type", + header: "Type", cell: (info) => { - const timestamp = info.getValue() || info.row.original.timestamp_updated; + const entry = info.row.original; + const type = "download_type" in entry ? entry.download_type : "track"; + return {type}; + }, + }), + columnHelper.display({ + id: "quality", + header: "Quality", + cell: (info) => { + const entry = info.row.original; + if ("download_type" in entry) { + return formatQuality(entry); + } + return "N/A"; + }, + }), + columnHelper.accessor("status", { + header: "Status", + cell: (info) => { + const status = info.getValue(); + const statusClass = STATUS_CLASS[status] || "text-gray-500"; + return {status}; + }, + }), + columnHelper.display({ + id: "service", + header: "Service", + cell: (info) => { + const entry = info.row.original; + const service = "service" in entry ? entry.service : "Unknown"; + return {service}; + }, + }), + columnHelper.accessor("timestamp", { + header: "Date", + cell: (info) => { + const timestamp = info.getValue(); return timestamp ? new Date(timestamp * 1000).toLocaleString() : "N/A"; }, }), - ...(!parentTaskId + ...(!viewingChildren ? [ columnHelper.display({ id: "actions", header: "Actions", cell: ({ row }) => { - const entry = row.original; - if (!entry.parent_task_id && (entry.task_type === "album" || entry.task_type === "playlist" || entry.download_type === "album" || entry.download_type === "playlist")) { - const hasChildren = entry.has_children || - (entry.total_successful ?? 0) > 0 || - (entry.total_skipped ?? 0) > 0 || - (entry.total_failed ?? 0) > 0; - - return ( -
- {hasChildren && ( - <> - - - - - {entry.child_track_summary?.completed || entry.total_successful || 0} - /{" "} - - {entry.child_track_summary?.skipped || entry.total_skipped || 0} - /{" "} - - {entry.child_track_summary?.error || entry.total_failed || 0} - - - - )} -
- ); - } - - // For tracks in parent task view with mini-histories - if (parentTaskId && entry.task_type === "track") { - return ( -
- - {showMiniHistories && ( - - )} -
- ); - } + const entry = row.original as HistoryEntry; + const hasChildren = entry.children_table && + (entry.download_type === "album" || entry.download_type === "playlist"); return ( - +
+ + {hasChildren && ( + <> + + + + {entry.successful_tracks || 0} + /{" "} + + {entry.skipped_tracks || 0} + /{" "} + + {entry.failed_tracks || 0} + + + + )} +
); }, }), ] : []), ], - [viewTracksForParent, viewTaskDetails, loadTracksWithMiniHistories, viewTrackMiniHistory, parentTaskId, showMiniHistories], + [viewChildren, viewEntryDetails, viewingChildren], ); useEffect(() => { const fetchHistory = async () => { - if (parentTaskId) return; // Skip if we're viewing parent tracks (handled separately) + if (viewingChildren) return; // Skip if viewing children setIsLoading(true); - setData([]); try { const params = new URLSearchParams({ limit: `${pageSize}`, offset: `${pageIndex * pageSize}`, - sort_by: sorting[0]?.id ?? "timestamp_updated", - sort_order: sorting[0]?.desc ? "DESC" : "ASC", - include_children: includeChildren.toString(), }); - if (statusFilter) params.append("status_final", statusFilter); - if (typeFilter) params.append("task_type", typeFilter); - if (currentStatusFilter) params.append("status_current", currentStatusFilter); - if (hideChildTracks) params.append("hide_child_tracks", "true"); + if (statusFilter) params.append("status", statusFilter); + if (typeFilter) params.append("download_type", typeFilter); const response = await apiClient.get<{ - entries: HistoryEntry[]; - total_count: number; - include_children: boolean; + downloads: HistoryEntry[]; + pagination: { + limit: number; + offset: number; + returned_count: number; + }; }>(`/history?${params.toString()}`); - setData(response.data.entries); - setTotalEntries(response.data.total_count); + setData(response.data.downloads); + // Since we don't get total count, estimate based on returned count + const estimatedTotal = response.data.pagination.returned_count < pageSize + ? pageIndex * pageSize + response.data.pagination.returned_count + : (pageIndex + 1) * pageSize + 1; + setTotalEntries(estimatedTotal); } catch (error) { toast.error("Failed to load history."); console.error("Error loading history:", error); @@ -603,8 +315,9 @@ export const History = () => { setIsLoading(false); } }; + fetchHistory(); - }, [pageIndex, pageSize, sorting, statusFilter, typeFilter, currentStatusFilter, hideChildTracks, includeChildren, parentTaskId]); + }, [pageIndex, pageSize, statusFilter, typeFilter, viewingChildren]); const table = useReactTable({ data, @@ -622,40 +335,31 @@ export const History = () => { const clearFilters = () => { setStatusFilter(""); setTypeFilter(""); - setCurrentStatusFilter(""); - setHideChildTracks(true); - setIncludeChildren(false); }; - const viewParentTask = () => { + const goBackToHistory = () => { + setViewingChildren(null); setPagination({ pageIndex: 0, pageSize }); - setParentTaskId(null); - setParentTask(null); - setShowMiniHistories(false); clearFilters(); }; - const closeTaskDetails = () => { - setSelectedTask(null); - }; - - const closeMiniHistory = () => { - setSelectedTrackMiniHistory(null); + const closeDetails = () => { + setSelectedEntry(null); }; return (
- {/* Task Details Modal */} - {selectedTask && ( + {/* Entry Details Modal */} + {selectedEntry && (

- Task Details + Download Details

- {selectedTask.task.config && ( + {selectedEntry.external_ids && Object.keys(selectedEntry.external_ids).length > 0 && (
-

Configuration

+

External IDs

-
{JSON.stringify(selectedTask.task.config, null, 2)}
+
{JSON.stringify(selectedEntry.external_ids, null, 2)}
)} - {selectedTask.task.error_info && ( + {selectedEntry.metadata && Object.keys(selectedEntry.metadata).length > 0 && (
-

Error Information

-
-
{JSON.stringify(selectedTask.task.error_info, null, 2)}
+

Metadata

+
+
{JSON.stringify(selectedEntry.metadata, null, 2)}
)} - {selectedTask.task.child_tracks && selectedTask.task.child_tracks.length > 0 && ( + {selectedEntry.genres && selectedEntry.genres.length > 0 && (
-

Child Tracks ({selectedTask.task.child_tracks.length})

-
-
- {selectedTask.task.child_tracks.map((track, index) => ( -
-
{track.track_data?.title || "Unknown Track"}
-
- Status: {track.status_final} | Position: {track.position} -
-
- ))} -
-
-
- )} - - {selectedTask.task.status_history && selectedTask.task.status_history.length > 0 && ( -
-

Status History

-
-
- {selectedTask.task.status_history.map((status) => ( -
-
- {status.status_type} - - {new Date(status.timestamp * 1000).toLocaleString()} - -
- {status.status_data && ( -
-                                {JSON.stringify(status.status_data, null, 2)}
-                              
- )} -
- ))} -
-
-
- )} -
-
-
-
- )} - - {/* Track Mini-History Modal */} - {selectedTrackMiniHistory && ( -
-
-
-
-

- Track Mini-History: {selectedTrackMiniHistory.title} -

- -
- - {isMiniHistoryLoading ? ( -
-
Loading mini-history...
-
- ) : ( -
- {/* Track Summary */} -
-
-

Status

-

- {selectedTrackMiniHistory.status_final} -

-

- Current: {selectedTrackMiniHistory.status_current} -

-
-
-

Duration

-

- {formatDuration(selectedTrackMiniHistory.time_elapsed)} -

-

- {selectedTrackMiniHistory.calculated_duration} -

-
-
-

File Info

-

- {formatFileSize(selectedTrackMiniHistory.file_size)} -

-

- {selectedTrackMiniHistory.quality_achieved || "N/A"} -

-
-
-

Attempts

-

- {selectedTrackMiniHistory.retry_count + 1} -

-

- {selectedTrackMiniHistory.retry_count > 0 ? `${selectedTrackMiniHistory.retry_count} retries` : "No retries"} -

-
-
- - {/* Track Details */} -
-
-

Track Info

-
-

Position: {selectedTrackMiniHistory.disc_number}-{selectedTrackMiniHistory.track_number} (#{selectedTrackMiniHistory.position})

-

Duration: {selectedTrackMiniHistory.duration_ms ? `${Math.floor(selectedTrackMiniHistory.duration_ms / 60000)}:${Math.floor((selectedTrackMiniHistory.duration_ms % 60000) / 1000).toString().padStart(2, '0')}` : "N/A"}

-

Artists: {selectedTrackMiniHistory.artists_data?.map(a => a.name).join(", ") || "N/A"}

-

Explicit: {selectedTrackMiniHistory.explicit ? "Yes" : "No"}

-
-
-
-

Download Info

-
-

Started: {selectedTrackMiniHistory.timestamp_started ? new Date(selectedTrackMiniHistory.timestamp_started * 1000).toLocaleString() : "N/A"}

-

Completed: {selectedTrackMiniHistory.timestamp_completed ? new Date(selectedTrackMiniHistory.timestamp_completed * 1000).toLocaleString() : "N/A"}

-

Path: {selectedTrackMiniHistory.download_path || "N/A"}

-
-
-
- - {/* Timeline */} -
-

- Status Timeline ({selectedTrackMiniHistory.timeline.length} events) -

-
- {selectedTrackMiniHistory.timeline.map((event, index) => ( -
-
- - {event.status_type} - -
- {event.human_readable} - {new Date(event.timestamp * 1000).toLocaleString()} -
-
- {event.status_data && Object.keys(event.status_data).length > 0 && ( -
-
-                                {JSON.stringify(event.status_data, null, 2)}
-                              
-
- )} -
+

Genres

+
+ {selectedEntry.genres.map((genre, index) => ( + + {genre} + ))}
- - {/* Error Information */} - {selectedTrackMiniHistory.error_info && ( -
-

Error Information

-
-
{JSON.stringify(selectedTrackMiniHistory.error_info, null, 2)}
-
-
- )} -
- )} + )} +
)} - {parentTaskId && parentTask ? ( + {viewingChildren ? (
-
-
-
-

{parentTask.item_name || parentTask.title}

-

{parentTask.item_artist || parentTask.artist_names?.join(", ")}

+
+
+

{viewingChildren.title}

- {parentTask.task_type || parentTask.download_type} + {viewingChildren.download_type}
-
- {parentTask.status_final || "Unknown"} -
-

- Quality: - {formatQuality(parentTask)} -

- Completed: - {parentTask.timestamp_completed ? new Date(parentTask.timestamp_completed * 1000).toLocaleString() : "N/A"} + Total Tracks: + {viewingChildren.track_count}

-
-

- Tracks {showMiniHistories ? "(with Mini-Histories)" : ""} -

-
- {!showMiniHistories ? ( - - ) : ( - - )} -
-
+

+ Tracks +

) : (

Download History

)} - {/* Filter Controls - Responsive */} - {!parentTaskId && ( + {/* Filter Controls */} + {!viewingChildren && (
- {/* Mobile: Stacked filters */} -
+
- - - + Clear Filters +
)} @@ -1045,13 +540,13 @@ export const History = () => { ) : ( table.getRowModel().rows.map((row) => { - const isParent = - !row.original.parent_task_id && - (row.original.task_type === "album" || row.original.task_type === "playlist" || row.original.download_type === "album" || row.original.download_type === "playlist"); - const isChild = !!row.original.parent_task_id; + const entry = row.original; + const isChild = "album_title" in entry; + const isParent = !isChild && "children_table" in entry && entry.children_table; let rowClass = "hover:bg-surface-muted dark:hover:bg-surface-muted-dark"; + if (isParent) { - rowClass += " bg-surface-accent dark:bg-surface-accent-dark font-semibold hover:bg-surface-muted dark:hover:bg-surface-muted-dark"; + rowClass += " bg-surface-accent dark:bg-surface-accent-dark font-semibold"; } else if (isChild) { rowClass += " border-t border-dashed border-content-muted dark:border-content-muted-dark border-opacity-20"; } @@ -1084,11 +579,10 @@ export const History = () => { ) : ( table.getRowModel().rows.map((row) => { const entry = row.original; - const isParent = !entry.parent_task_id && (entry.task_type === "album" || entry.task_type === "playlist" || entry.download_type === "album" || entry.download_type === "playlist"); - const isChild = !!entry.parent_task_id; - const status = entry.status_final || entry.track_status; - const statusKey = (status || "").toUpperCase(); - const statusClass = STATUS_CLASS[statusKey] || "text-gray-500"; + const isChild = "album_title" in entry; + const isParent = !isChild && "children_table" in entry && entry.children_table; + const status = entry.status; + const statusClass = STATUS_CLASS[status] || "text-gray-500"; let cardClass = "bg-surface dark:bg-surface-secondary-dark rounded-lg border border-border dark:border-border-dark p-4"; if (isParent) { @@ -1103,13 +597,13 @@ export const History = () => {

- {isChild ? `└─ ${entry.title || entry.item_name}` : entry.title || entry.item_name} + {isChild ? `└─ ${entry.title}` : entry.title}

- {entry.artist_names?.join(", ") || entry.item_artist} + {Array.isArray(entry.artists) ? entry.artists.join(", ") : entry.artists}

- + {status}
@@ -1119,118 +613,71 @@ export const History = () => {
Type: - {entry.task_type || entry.download_type} + {"download_type" in entry ? entry.download_type : "track"}
- Source: - - {getDownloadSource(entry)} + Service: + + {"service" in entry ? entry.service : "Unknown"}
Quality: - {entry.quality_achieved || formatQuality(entry)} + {"download_type" in entry ? formatQuality(entry) : "N/A"}
- {showMiniHistories && parentTaskId && ( - <> -
- Retries: - - {entry.retry_count || 0} - -
-
- Duration: - - {formatDuration(entry.time_elapsed)} - -
-
- File Size: - - {formatFileSize(entry.file_size)} - -
- - )}
- Completed: + Date: - {entry.timestamp_completed ? new Date(entry.timestamp_completed * 1000).toLocaleString() : "N/A"} + {new Date(entry.timestamp * 1000).toLocaleString()}
- {/* Actions for parent entries */} - {!parentTaskId && isParent && ( - entry.has_children || entry.total_successful || entry.total_skipped || entry.total_failed - ) ? ( + {/* Actions */} + {!viewingChildren && !isChild && (
-
- - {entry.child_track_summary?.completed || entry.total_successful || 0} ✓ - - - {entry.child_track_summary?.skipped || entry.total_skipped || 0} ⊘ - - - {entry.child_track_summary?.error || entry.total_failed || 0} ✗ - -
-
- - -
-
- ) : !parentTaskId ? ( -
- -
- ) : parentTaskId ? ( -
- - {showMiniHistories && ( - + {isParent && ( +
+ + {(entry as HistoryEntry).successful_tracks || 0} ✓ + + + {(entry as HistoryEntry).skipped_tracks || 0} ⊘ + + + {(entry as HistoryEntry).failed_tracks || 0} ✗ + +
)} +
+ + {isParent && ( + + )} +
- ) : null} + )}
); }) )}
- {/* Pagination Controls - Responsive */} + {/* Pagination Controls */}
- {/* Mobile: Stacked layout */}