From 6538cde022ca95dee670319835fd0a7ef37902ff Mon Sep 17 00:00:00 2001 From: Xoconoch Date: Tue, 19 Aug 2025 09:36:22 -0600 Subject: [PATCH] fix: #274 --- app.py | 468 +++++++++++++++++++++++------------------ routes/core/history.py | 275 ++++++++++++------------ 2 files changed, 393 insertions(+), 350 deletions(-) diff --git a/app.py b/app.py index 13c2a88..b520e4a 100755 --- a/app.py +++ b/app.py @@ -8,7 +8,6 @@ import logging.handlers import time from pathlib import Path import os -import atexit import sys import redis import socket @@ -16,11 +15,16 @@ from urllib.parse import urlparse # Run DB migrations as early as possible, before importing any routers that may touch DBs try: - from routes.migrations import run_migrations_if_needed - run_migrations_if_needed() - logging.getLogger(__name__).info("Database migrations executed (if needed) early in startup.") + from routes.migrations import run_migrations_if_needed + + run_migrations_if_needed() + logging.getLogger(__name__).info( + "Database migrations executed (if needed) early in startup." + ) except Exception as e: - logging.getLogger(__name__).error(f"Database migration step failed early in startup: {e}", exc_info=True) + logging.getLogger(__name__).error( + f"Database migration step failed early in startup: {e}", exc_info=True + ) # Import route routers (to be created) from routes.auth.credentials import router as credentials_router @@ -44,251 +48,299 @@ from routes.auth import AUTH_ENABLED from routes.auth.middleware import AuthMiddleware # Import and initialize routes (this will start the watch manager) -import routes # Configure application-wide logging def setup_logging(): - """Configure application-wide logging with rotation""" - # Create logs directory if it doesn't exist - logs_dir = Path("logs") - logs_dir.mkdir(exist_ok=True) + """Configure application-wide logging with rotation""" + # Create logs directory if it doesn't exist + logs_dir = Path("logs") + logs_dir.mkdir(exist_ok=True) - # Set up log file paths - main_log = logs_dir / "spotizerr.log" + # Set up log file paths + main_log = logs_dir / "spotizerr.log" - # Configure root logger - root_logger = logging.getLogger() - root_logger.setLevel(logging.DEBUG) + # Configure root logger + root_logger = logging.getLogger() + root_logger.setLevel(logging.DEBUG) - # Clear any existing handlers from the root logger - if root_logger.hasHandlers(): - root_logger.handlers.clear() + # Clear any existing handlers from the root logger + if root_logger.hasHandlers(): + root_logger.handlers.clear() - # Log formatting - log_format = logging.Formatter( - "%(asctime)s [%(levelname)s] %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) + # Log formatting + log_format = logging.Formatter( + "%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) - # File handler with rotation (10 MB max, keep 5 backups) - file_handler = logging.handlers.RotatingFileHandler( - main_log, maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8" - ) - file_handler.setFormatter(log_format) - file_handler.setLevel(logging.INFO) + # File handler with rotation (10 MB max, keep 5 backups) + file_handler = logging.handlers.RotatingFileHandler( + main_log, maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8" + ) + file_handler.setFormatter(log_format) + file_handler.setLevel(logging.INFO) - # Console handler for stderr - console_handler = logging.StreamHandler(sys.stderr) - console_handler.setFormatter(log_format) - console_handler.setLevel(logging.INFO) + # Console handler for stderr + console_handler = logging.StreamHandler(sys.stderr) + console_handler.setFormatter(log_format) + console_handler.setLevel(logging.INFO) - # Add handlers to root logger - root_logger.addHandler(file_handler) - root_logger.addHandler(console_handler) + # Add handlers to root logger + root_logger.addHandler(file_handler) + root_logger.addHandler(console_handler) - # Set up specific loggers - for logger_name in [ - "routes", - "routes.utils", - "routes.utils.celery_manager", - "routes.utils.celery_tasks", - "routes.utils.watch", - ]: - logger = logging.getLogger(logger_name) - logger.setLevel(logging.INFO) - logger.propagate = True # Propagate to root logger + # Set up specific loggers + for logger_name in [ + "routes", + "routes.utils", + "routes.utils.celery_manager", + "routes.utils.celery_tasks", + "routes.utils.watch", + ]: + logger = logging.getLogger(logger_name) + logger.setLevel(logging.INFO) + logger.propagate = True # Propagate to root logger - logging.info("Logging system initialized") + logging.info("Logging system initialized") def check_redis_connection(): - """Check if Redis is available and accessible""" - if not REDIS_URL: - logging.error("REDIS_URL is not configured. Please check your environment.") - return False + """Check if Redis is available and accessible""" + if not REDIS_URL: + logging.error("REDIS_URL is not configured. Please check your environment.") + return False - try: - # Parse Redis URL - parsed_url = urlparse(REDIS_URL) - host = parsed_url.hostname or "localhost" - port = parsed_url.port or 6379 + try: + # Parse Redis URL + parsed_url = urlparse(REDIS_URL) + host = parsed_url.hostname or "localhost" + port = parsed_url.port or 6379 - logging.info(f"Testing Redis connection to {host}:{port}...") + logging.info(f"Testing Redis connection to {host}:{port}...") - # Test socket connection first - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(5) - result = sock.connect_ex((host, port)) - sock.close() + # Test socket connection first + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(5) + result = sock.connect_ex((host, port)) + sock.close() - if result != 0: - logging.error(f"Cannot connect to Redis at {host}:{port}") - return False + if result != 0: + logging.error(f"Cannot connect to Redis at {host}:{port}") + return False - # Test Redis client connection - r = redis.from_url(REDIS_URL, socket_connect_timeout=5, socket_timeout=5) - r.ping() - logging.info("Redis connection successful") - return True + # Test Redis client connection + r = redis.from_url(REDIS_URL, socket_connect_timeout=5, socket_timeout=5) + r.ping() + logging.info("Redis connection successful") + return True - except redis.ConnectionError as e: - logging.error(f"Redis connection error: {e}") - return False - except redis.TimeoutError as e: - logging.error(f"Redis timeout error: {e}") - return False - except Exception as e: - logging.error(f"Unexpected error checking Redis connection: {e}") - return False + except redis.ConnectionError as e: + logging.error(f"Redis connection error: {e}") + return False + except redis.TimeoutError as e: + logging.error(f"Redis timeout error: {e}") + return False + except Exception as e: + logging.error(f"Unexpected error checking Redis connection: {e}") + return False @asynccontextmanager async def lifespan(app: FastAPI): - """Handle application startup and shutdown""" - # Startup - setup_logging() - - # Check Redis connection - if not check_redis_connection(): - logging.error("Failed to connect to Redis. Please ensure Redis is running and accessible.") - # Don't exit, but warn - some functionality may not work - - # Start Celery workers - try: - celery_manager.start() - logging.info("Celery workers started successfully") - except Exception as e: - logging.error(f"Failed to start Celery workers: {e}") - - yield - - # Shutdown - try: - celery_manager.stop() - logging.info("Celery workers stopped") - except Exception as e: - logging.error(f"Error stopping Celery workers: {e}") + """Handle application startup and shutdown""" + # Startup + setup_logging() + + # Check Redis connection + if not check_redis_connection(): + logging.error( + "Failed to connect to Redis. Please ensure Redis is running and accessible." + ) + # Don't exit, but warn - some functionality may not work + + # Start Celery workers + try: + celery_manager.start() + logging.info("Celery workers started successfully") + except Exception as e: + logging.error(f"Failed to start Celery workers: {e}") + + yield + + # Shutdown + try: + celery_manager.stop() + logging.info("Celery workers stopped") + except Exception as e: + logging.error(f"Error stopping Celery workers: {e}") def create_app(): - app = FastAPI( - title="Spotizerr API", - description="Music download service API", - version="3.0.0", - lifespan=lifespan, - redirect_slashes=True # Enable automatic trailing slash redirects - ) + app = FastAPI( + title="Spotizerr API", + description="Music download service API", + version="3.0.0", + lifespan=lifespan, + redirect_slashes=True, # Enable automatic trailing slash redirects + ) - # Set up CORS - app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - ) + # Set up CORS + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) - # Add authentication middleware (only if auth is enabled) - if AUTH_ENABLED: - app.add_middleware(AuthMiddleware) - logging.info("Authentication system enabled") - else: - logging.info("Authentication system disabled") + # Add authentication middleware (only if auth is enabled) + if AUTH_ENABLED: + app.add_middleware(AuthMiddleware) + logging.info("Authentication system enabled") + else: + logging.info("Authentication system disabled") - # Register routers with URL prefixes - app.include_router(auth_router, prefix="/api/auth", tags=["auth"]) - - # Include SSO router if available - try: - from routes.auth.sso import router as sso_router - app.include_router(sso_router, prefix="/api/auth", tags=["sso"]) - logging.info("SSO functionality enabled") - except ImportError as e: - logging.warning(f"SSO functionality not available: {e}") - app.include_router(config_router, prefix="/api/config", tags=["config"]) - app.include_router(search_router, prefix="/api/search", tags=["search"]) - app.include_router(credentials_router, prefix="/api/credentials", tags=["credentials"]) - app.include_router(album_router, prefix="/api/album", tags=["album"]) - app.include_router(track_router, prefix="/api/track", tags=["track"]) - app.include_router(playlist_router, prefix="/api/playlist", tags=["playlist"]) - app.include_router(artist_router, prefix="/api/artist", tags=["artist"]) - app.include_router(prgs_router, prefix="/api/prgs", tags=["progress"]) - app.include_router(history_router, prefix="/api/history", tags=["history"]) + # Register routers with URL prefixes + app.include_router(auth_router, prefix="/api/auth", tags=["auth"]) - # Add request logging middleware - @app.middleware("http") - async def log_requests(request: Request, call_next): - start_time = time.time() - - # Log request - logger = logging.getLogger("uvicorn.access") - logger.debug(f"Request: {request.method} {request.url.path}") - - try: - response = await call_next(request) - - # Log response - duration = round((time.time() - start_time) * 1000, 2) - logger.debug(f"Response: {response.status_code} | Duration: {duration}ms") - - return response - except Exception as e: - # Log errors - logger.error(f"Server error: {str(e)}", exc_info=True) - raise HTTPException(status_code=500, detail="Internal Server Error") + # Include SSO router if available + try: + from routes.auth.sso import router as sso_router - # Mount static files for React app - if os.path.exists("spotizerr-ui/dist"): - app.mount("/static", StaticFiles(directory="spotizerr-ui/dist"), name="static") - - # Serve React App - catch-all route for SPA (but not for API routes) - @app.get("/{full_path:path}") - async def serve_react_app(full_path: str): - """Serve React app with fallback to index.html for SPA routing""" - static_dir = "spotizerr-ui/dist" - - # Don't serve React app for API routes (more specific check) - if full_path.startswith("api") or full_path.startswith("api/"): - raise HTTPException(status_code=404, detail="API endpoint not found") - - # If it's a file that exists, serve it - if full_path and os.path.exists(os.path.join(static_dir, full_path)): - return FileResponse(os.path.join(static_dir, full_path)) - else: - # Fallback to index.html for SPA routing - return FileResponse(os.path.join(static_dir, "index.html")) - else: - logging.warning("React app build directory not found at spotizerr-ui/dist") + app.include_router(sso_router, prefix="/api/auth", tags=["sso"]) + logging.info("SSO functionality enabled") + except ImportError as e: + logging.warning(f"SSO functionality not available: {e}") + app.include_router(config_router, prefix="/api/config", tags=["config"]) + app.include_router(search_router, prefix="/api/search", tags=["search"]) + app.include_router( + credentials_router, prefix="/api/credentials", tags=["credentials"] + ) + app.include_router(album_router, prefix="/api/album", tags=["album"]) + app.include_router(track_router, prefix="/api/track", tags=["track"]) + app.include_router(playlist_router, prefix="/api/playlist", tags=["playlist"]) + app.include_router(artist_router, prefix="/api/artist", tags=["artist"]) + app.include_router(prgs_router, prefix="/api/prgs", tags=["progress"]) + app.include_router(history_router, prefix="/api/history", tags=["history"]) - return app + # Add request logging middleware + @app.middleware("http") + async def log_requests(request: Request, call_next): + start_time = time.time() + + # Log request + logger = logging.getLogger("uvicorn.access") + logger.debug(f"Request: {request.method} {request.url.path}") + + try: + response = await call_next(request) + + # Log response + duration = round((time.time() - start_time) * 1000, 2) + logger.debug(f"Response: {response.status_code} | Duration: {duration}ms") + + return response + except Exception as e: + # Log errors + logger.error(f"Server error: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail="Internal Server Error") + + # Mount static files for React app + if os.path.exists("spotizerr-ui/dist"): + app.mount("/static", StaticFiles(directory="spotizerr-ui/dist"), name="static") + + # Serve React App - catch-all route for SPA (but not for API routes) + @app.get("/{full_path:path}") + async def serve_react_app(full_path: str): + """Serve React app with fallback to index.html for SPA routing. Prevent directory traversal.""" + static_dir = "spotizerr-ui/dist" + static_dir_path = Path(static_dir).resolve() + index_path = static_dir_path / "index.html" + allowed_exts = { + ".html", + ".js", + ".css", + ".map", + ".png", + ".jpg", + ".jpeg", + ".svg", + ".webp", + ".ico", + ".json", + ".txt", + ".woff", + ".woff2", + ".ttf", + ".eot", + ".mp3", + ".ogg", + ".mp4", + ".webm", + } + + # Don't serve React app for API routes (more specific check) + if full_path.startswith("api") or full_path.startswith("api/"): + raise HTTPException(status_code=404, detail="API endpoint not found") + + # Reject null bytes early + if "\x00" in full_path: + return FileResponse(str(index_path)) + + # Sanitize path: normalize backslashes and strip URL schemes + sanitized = full_path.replace("\\", "/").lstrip("/") + if sanitized.startswith("http://") or sanitized.startswith("https://"): + return FileResponse(str(index_path)) + + # Resolve requested path safely and ensure it stays within static_dir + try: + requested_path = (static_dir_path / sanitized).resolve() + except Exception: + requested_path = index_path + + # If traversal attempted or non-file within static dir, fall back to index.html for SPA routing + if not str(requested_path).startswith(str(static_dir_path)): + return FileResponse(str(index_path)) + + # Disallow hidden files (starting with dot) and enforce safe extensions + if requested_path.is_file(): + name = requested_path.name + if name.startswith("."): + return FileResponse(str(index_path)) + suffix = requested_path.suffix.lower() + if suffix in allowed_exts: + return FileResponse(str(requested_path)) + # Not an allowed asset; fall back to SPA index + return FileResponse(str(index_path)) + else: + # Fallback to index.html for SPA routing + return FileResponse(str(index_path)) + else: + logging.warning("React app build directory not found at spotizerr-ui/dist") + + return app def start_celery_workers(): - """Start Celery workers with dynamic configuration""" - # This function is now handled by the lifespan context manager - # and the celery_manager.start() call - pass + """Start Celery workers with dynamic configuration""" + # This function is now handled by the lifespan context manager + # and the celery_manager.start() call + pass if __name__ == "__main__": - import uvicorn + import uvicorn - app = create_app() + app = create_app() - # Use HOST environment variable if present, otherwise fall back to IPv4 wildcard - host = os.getenv("HOST", "0.0.0.0") + # Use HOST environment variable if present, otherwise fall back to IPv4 wildcard + host = os.getenv("HOST", "0.0.0.0") - # Allow overriding port via PORT env var, with default 7171 - try: - port = int(os.getenv("PORT", "7171")) - except ValueError: - port = 7171 + # Allow overriding port via PORT env var, with default 7171 + try: + port = int(os.getenv("PORT", "7171")) + except ValueError: + port = 7171 - uvicorn.run( - app, - host=host, - port=port, - log_level="info", - access_log=True - ) + uvicorn.run(app, host=host, port=port, log_level="info", access_log=True) diff --git a/routes/core/history.py b/routes/core/history.py index e892ac0..d1f1d93 100644 --- a/routes/core/history.py +++ b/routes/core/history.py @@ -1,9 +1,8 @@ -from fastapi import APIRouter, HTTPException, Request, Depends +from fastapi import APIRouter, Request, Depends from fastapi.responses import JSONResponse -import json -import traceback import logging from routes.utils.history_manager import history_manager +from typing import Any, Dict # Import authentication dependencies from routes.auth.middleware import require_auth_from_state, User @@ -15,10 +14,12 @@ router = APIRouter() @router.get("/") @router.get("") -async def get_history(request: Request, current_user: User = Depends(require_auth_from_state)): +async def get_history( + request: Request, current_user: User = Depends(require_auth_from_state) +): """ Retrieve download history with optional filtering and pagination. - + Query parameters: - limit: Maximum number of records (default: 100, max: 500) - offset: Number of records to skip (default: 0) @@ -31,143 +32,144 @@ async def get_history(request: Request, current_user: User = Depends(require_aut offset = max(int(request.query_params.get("offset", 0)), 0) download_type = request.query_params.get("download_type") status = request.query_params.get("status") - + # Validate download_type if provided valid_types = ["track", "album", "playlist"] if download_type and download_type not in valid_types: return JSONResponse( - content={"error": f"Invalid download_type. Must be one of: {valid_types}"}, - status_code=400 + content={ + "error": f"Invalid download_type. Must be one of: {valid_types}" + }, + status_code=400, ) - + # Validate status if provided valid_statuses = ["completed", "failed", "skipped", "in_progress"] if status and status not in valid_statuses: return JSONResponse( content={"error": f"Invalid status. Must be one of: {valid_statuses}"}, - status_code=400 + status_code=400, ) - + # Get history from manager history = history_manager.get_download_history( - limit=limit, - offset=offset, - download_type=download_type, - status=status + limit=limit, offset=offset, download_type=download_type, status=status ) - + # Add pagination info - response_data = { + response_data: Dict[str, Any] = { "downloads": history, "pagination": { "limit": limit, "offset": offset, - "returned_count": len(history) - } + "returned_count": len(history), + }, } - + + filters: Dict[str, Any] = {} if download_type: - response_data["filters"] = {"download_type": download_type} + filters["download_type"] = download_type if status: - if "filters" not in response_data: - response_data["filters"] = {} - response_data["filters"]["status"] = status - - return JSONResponse( - content=response_data, - status_code=200 - ) - + filters["status"] = status + if filters: + response_data["filters"] = filters + + return JSONResponse(content=response_data, status_code=200) + except ValueError as e: return JSONResponse( - content={"error": f"Invalid parameter value: {str(e)}"}, - status_code=400 + content={"error": f"Invalid parameter value: {str(e)}"}, status_code=400 ) except Exception as e: logger.error(f"Error retrieving download history: {e}", exc_info=True) return JSONResponse( content={"error": "Failed to retrieve download history", "details": str(e)}, - status_code=500 + status_code=500, ) @router.get("/{task_id}") -async def get_download_by_task_id(task_id: str, current_user: User = Depends(require_auth_from_state)): +async def get_download_by_task_id( + task_id: str, current_user: User = Depends(require_auth_from_state) +): """ Retrieve specific download history by task ID. - + Args: - task_id: Celery task ID + task_id: Celery task ID """ try: download = history_manager.get_download_by_task_id(task_id) - + if not download: return JSONResponse( content={"error": f"Download with task ID '{task_id}' not found"}, - status_code=404 + status_code=404, ) - - return JSONResponse( - content=download, - status_code=200 - ) - + + return JSONResponse(content=download, status_code=200) + except Exception as e: - logger.error(f"Error retrieving download for task {task_id}: {e}", exc_info=True) + logger.error( + f"Error retrieving download for task {task_id}: {e}", exc_info=True + ) return JSONResponse( content={"error": "Failed to retrieve download", "details": str(e)}, - status_code=500 + status_code=500, ) @router.get("/{task_id}/children") -async def get_download_children(task_id: str, current_user: User = Depends(require_auth_from_state)): +async def get_download_children( + task_id: str, current_user: User = Depends(require_auth_from_state) +): """ Retrieve children tracks for an album or playlist download. - + Args: - task_id: Celery task ID + task_id: Celery task ID """ try: # First get the main download to find the children table download = history_manager.get_download_by_task_id(task_id) - + if not download: return JSONResponse( content={"error": f"Download with task ID '{task_id}' not found"}, - status_code=404 + status_code=404, ) - + children_table = download.get("children_table") if not children_table: return JSONResponse( content={"error": f"Download '{task_id}' has no children tracks"}, - status_code=404 + status_code=404, ) - + # Get children tracks children = history_manager.get_children_history(children_table) - + response_data = { "task_id": task_id, "download_type": download.get("download_type"), "title": download.get("title"), "children_table": children_table, "tracks": children, - "track_count": len(children) + "track_count": len(children), } - - return JSONResponse( - content=response_data, - status_code=200 - ) - + + return JSONResponse(content=response_data, status_code=200) + except Exception as e: - logger.error(f"Error retrieving children for task {task_id}: {e}", exc_info=True) + logger.error( + f"Error retrieving children for task {task_id}: {e}", exc_info=True + ) return JSONResponse( - content={"error": "Failed to retrieve download children", "details": str(e)}, - status_code=500 + content={ + "error": "Failed to retrieve download children", + "details": str(e), + }, + status_code=500, ) @@ -178,25 +180,27 @@ async def get_download_stats(current_user: User = Depends(require_auth_from_stat """ try: stats = history_manager.get_download_stats() - - return JSONResponse( - content=stats, - status_code=200 - ) - + + return JSONResponse(content=stats, status_code=200) + except Exception as e: logger.error(f"Error retrieving download stats: {e}", exc_info=True) return JSONResponse( - content={"error": "Failed to retrieve download statistics", "details": str(e)}, - status_code=500 + content={ + "error": "Failed to retrieve download statistics", + "details": str(e), + }, + status_code=500, ) @router.get("/search") -async def search_history(request: Request, current_user: User = Depends(require_auth_from_state)): +async def search_history( + request: Request, current_user: User = Depends(require_auth_from_state) +): """ Search download history by title or artist. - + Query parameters: - q: Search query (required) - limit: Maximum number of results (default: 50, max: 200) @@ -206,147 +210,134 @@ async def search_history(request: Request, current_user: User = Depends(require_ if not query: return JSONResponse( content={"error": "Missing required parameter: q (search query)"}, - status_code=400 + status_code=400, ) - + limit = min(int(request.query_params.get("limit", 50)), 200) # Cap at 200 - + # Search history results = history_manager.search_history(query, limit) - + response_data = { "query": query, "results": results, "result_count": len(results), - "limit": limit + "limit": limit, } - - return JSONResponse( - content=response_data, - status_code=200 - ) - + + return JSONResponse(content=response_data, status_code=200) + except ValueError as e: return JSONResponse( - content={"error": f"Invalid parameter value: {str(e)}"}, - status_code=400 + content={"error": f"Invalid parameter value: {str(e)}"}, status_code=400 ) except Exception as e: logger.error(f"Error searching download history: {e}", exc_info=True) return JSONResponse( content={"error": "Failed to search download history", "details": str(e)}, - status_code=500 + status_code=500, ) @router.get("/recent") -async def get_recent_downloads(request: Request, current_user: User = Depends(require_auth_from_state)): +async def get_recent_downloads( + request: Request, current_user: User = Depends(require_auth_from_state) +): """ Get most recent downloads. - + Query parameters: - limit: Maximum number of results (default: 20, max: 100) """ try: limit = min(int(request.query_params.get("limit", 20)), 100) # Cap at 100 - + recent = history_manager.get_recent_downloads(limit) - - response_data = { - "downloads": recent, - "count": len(recent), - "limit": limit - } - - return JSONResponse( - content=response_data, - status_code=200 - ) - + + response_data = {"downloads": recent, "count": len(recent), "limit": limit} + + return JSONResponse(content=response_data, status_code=200) + except ValueError as e: return JSONResponse( - content={"error": f"Invalid parameter value: {str(e)}"}, - status_code=400 + content={"error": f"Invalid parameter value: {str(e)}"}, status_code=400 ) except Exception as e: logger.error(f"Error retrieving recent downloads: {e}", exc_info=True) return JSONResponse( content={"error": "Failed to retrieve recent downloads", "details": str(e)}, - status_code=500 + status_code=500, ) @router.get("/failed") -async def get_failed_downloads(request: Request, current_user: User = Depends(require_auth_from_state)): +async def get_failed_downloads( + request: Request, current_user: User = Depends(require_auth_from_state) +): """ Get failed downloads. - + Query parameters: - limit: Maximum number of results (default: 50, max: 200) """ try: limit = min(int(request.query_params.get("limit", 50)), 200) # Cap at 200 - + failed = history_manager.get_failed_downloads(limit) - - response_data = { - "downloads": failed, - "count": len(failed), - "limit": limit - } - - return JSONResponse( - content=response_data, - status_code=200 - ) - + + response_data = {"downloads": failed, "count": len(failed), "limit": limit} + + return JSONResponse(content=response_data, status_code=200) + except ValueError as e: return JSONResponse( - content={"error": f"Invalid parameter value: {str(e)}"}, - status_code=400 + content={"error": f"Invalid parameter value: {str(e)}"}, status_code=400 ) except Exception as e: logger.error(f"Error retrieving failed downloads: {e}", exc_info=True) return JSONResponse( content={"error": "Failed to retrieve failed downloads", "details": str(e)}, - status_code=500 + status_code=500, ) @router.post("/cleanup") -async def cleanup_old_history(request: Request, current_user: User = Depends(require_auth_from_state)): +async def cleanup_old_history( + request: Request, current_user: User = Depends(require_auth_from_state) +): """ Clean up old download history. - + JSON body: - days_old: Number of days old to keep (default: 30) """ try: - data = await request.json() if request.headers.get("content-type") == "application/json" else {} + data = ( + await request.json() + if request.headers.get("content-type") == "application/json" + else {} + ) days_old = data.get("days_old", 30) - + if not isinstance(days_old, int) or days_old <= 0: return JSONResponse( content={"error": "days_old must be a positive integer"}, - status_code=400 + status_code=400, ) - + deleted_count = history_manager.clear_old_history(days_old) - + response_data = { - "message": f"Successfully cleaned up old download history", + "message": "Successfully cleaned up old download history", "deleted_records": deleted_count, - "days_old": days_old + "days_old": days_old, } - - return JSONResponse( - content=response_data, - status_code=200 - ) - + + return JSONResponse(content=response_data, status_code=200) + except Exception as e: logger.error(f"Error cleaning up old history: {e}", exc_info=True) return JSONResponse( content={"error": "Failed to cleanup old history", "details": str(e)}, - status_code=500 - ) \ No newline at end of file + status_code=500, + )