Commit 6538cde022 by Xoconoch on 2025-08-19 09:36:22 -06:00 (parent 015ae024a6)
2 changed files with 393 additions and 350 deletions

app.py (468 lines changed)

@@ -8,7 +8,6 @@ import logging.handlers
import time
from pathlib import Path
import os
-import atexit
import sys
import redis
import socket
@@ -16,11 +15,16 @@ from urllib.parse import urlparse
# Run DB migrations as early as possible, before importing any routers that may touch DBs
try:
    from routes.migrations import run_migrations_if_needed

    run_migrations_if_needed()
-    logging.getLogger(__name__).info("Database migrations executed (if needed) early in startup.")
+    logging.getLogger(__name__).info(
+        "Database migrations executed (if needed) early in startup."
+    )
except Exception as e:
-    logging.getLogger(__name__).error(f"Database migration step failed early in startup: {e}", exc_info=True)
+    logging.getLogger(__name__).error(
+        f"Database migration step failed early in startup: {e}", exc_info=True
+    )

# Import route routers (to be created)
from routes.auth.credentials import router as credentials_router
@@ -44,251 +48,299 @@ from routes.auth import AUTH_ENABLED
from routes.auth.middleware import AuthMiddleware
# Import and initialize routes (this will start the watch manager)
import routes
# Configure application-wide logging
def setup_logging():
"""Configure application-wide logging with rotation"""
# Create logs directory if it doesn't exist
logs_dir = Path("logs")
logs_dir.mkdir(exist_ok=True)
"""Configure application-wide logging with rotation"""
# Create logs directory if it doesn't exist
logs_dir = Path("logs")
logs_dir.mkdir(exist_ok=True)
# Set up log file paths
main_log = logs_dir / "spotizerr.log"
# Set up log file paths
main_log = logs_dir / "spotizerr.log"
# Configure root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
# Configure root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
# Clear any existing handlers from the root logger
if root_logger.hasHandlers():
root_logger.handlers.clear()
# Clear any existing handlers from the root logger
if root_logger.hasHandlers():
root_logger.handlers.clear()
# Log formatting
log_format = logging.Formatter(
"%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# Log formatting
log_format = logging.Formatter(
"%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# File handler with rotation (10 MB max, keep 5 backups)
file_handler = logging.handlers.RotatingFileHandler(
main_log, maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8"
)
file_handler.setFormatter(log_format)
file_handler.setLevel(logging.INFO)
# File handler with rotation (10 MB max, keep 5 backups)
file_handler = logging.handlers.RotatingFileHandler(
main_log, maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8"
)
file_handler.setFormatter(log_format)
file_handler.setLevel(logging.INFO)
# Console handler for stderr
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setFormatter(log_format)
console_handler.setLevel(logging.INFO)
# Console handler for stderr
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setFormatter(log_format)
console_handler.setLevel(logging.INFO)
# Add handlers to root logger
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
# Add handlers to root logger
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
# Set up specific loggers
for logger_name in [
"routes",
"routes.utils",
"routes.utils.celery_manager",
"routes.utils.celery_tasks",
"routes.utils.watch",
]:
logger = logging.getLogger(logger_name)
logger.setLevel(logging.INFO)
logger.propagate = True # Propagate to root logger
# Set up specific loggers
for logger_name in [
"routes",
"routes.utils",
"routes.utils.celery_manager",
"routes.utils.celery_tasks",
"routes.utils.watch",
]:
logger = logging.getLogger(logger_name)
logger.setLevel(logging.INFO)
logger.propagate = True # Propagate to root logger
logging.info("Logging system initialized")
logging.info("Logging system initialized")
def check_redis_connection():
"""Check if Redis is available and accessible"""
if not REDIS_URL:
logging.error("REDIS_URL is not configured. Please check your environment.")
return False
"""Check if Redis is available and accessible"""
if not REDIS_URL:
logging.error("REDIS_URL is not configured. Please check your environment.")
return False
try:
# Parse Redis URL
parsed_url = urlparse(REDIS_URL)
host = parsed_url.hostname or "localhost"
port = parsed_url.port or 6379
try:
# Parse Redis URL
parsed_url = urlparse(REDIS_URL)
host = parsed_url.hostname or "localhost"
port = parsed_url.port or 6379
logging.info(f"Testing Redis connection to {host}:{port}...")
logging.info(f"Testing Redis connection to {host}:{port}...")
# Test socket connection first
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
result = sock.connect_ex((host, port))
sock.close()
# Test socket connection first
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
result = sock.connect_ex((host, port))
sock.close()
if result != 0:
logging.error(f"Cannot connect to Redis at {host}:{port}")
return False
if result != 0:
logging.error(f"Cannot connect to Redis at {host}:{port}")
return False
# Test Redis client connection
r = redis.from_url(REDIS_URL, socket_connect_timeout=5, socket_timeout=5)
r.ping()
logging.info("Redis connection successful")
return True
# Test Redis client connection
r = redis.from_url(REDIS_URL, socket_connect_timeout=5, socket_timeout=5)
r.ping()
logging.info("Redis connection successful")
return True
except redis.ConnectionError as e:
logging.error(f"Redis connection error: {e}")
return False
except redis.TimeoutError as e:
logging.error(f"Redis timeout error: {e}")
return False
except Exception as e:
logging.error(f"Unexpected error checking Redis connection: {e}")
return False
except redis.ConnectionError as e:
logging.error(f"Redis connection error: {e}")
return False
except redis.TimeoutError as e:
logging.error(f"Redis timeout error: {e}")
return False
except Exception as e:
logging.error(f"Unexpected error checking Redis connection: {e}")
return False
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Handle application startup and shutdown"""
# Startup
setup_logging()
# Check Redis connection
if not check_redis_connection():
logging.error("Failed to connect to Redis. Please ensure Redis is running and accessible.")
# Don't exit, but warn - some functionality may not work
# Start Celery workers
try:
celery_manager.start()
logging.info("Celery workers started successfully")
except Exception as e:
logging.error(f"Failed to start Celery workers: {e}")
yield
# Shutdown
try:
celery_manager.stop()
logging.info("Celery workers stopped")
except Exception as e:
logging.error(f"Error stopping Celery workers: {e}")
"""Handle application startup and shutdown"""
# Startup
setup_logging()
# Check Redis connection
if not check_redis_connection():
logging.error(
"Failed to connect to Redis. Please ensure Redis is running and accessible."
)
# Don't exit, but warn - some functionality may not work
# Start Celery workers
try:
celery_manager.start()
logging.info("Celery workers started successfully")
except Exception as e:
logging.error(f"Failed to start Celery workers: {e}")
yield
# Shutdown
try:
celery_manager.stop()
logging.info("Celery workers stopped")
except Exception as e:
logging.error(f"Error stopping Celery workers: {e}")
def create_app():
    app = FastAPI(
        title="Spotizerr API",
        description="Music download service API",
        version="3.0.0",
        lifespan=lifespan,
-        redirect_slashes=True # Enable automatic trailing slash redirects
+        redirect_slashes=True, # Enable automatic trailing slash redirects
    )

    # Set up CORS
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Add authentication middleware (only if auth is enabled)
    if AUTH_ENABLED:
        app.add_middleware(AuthMiddleware)
        logging.info("Authentication system enabled")
    else:
        logging.info("Authentication system disabled")

    # Register routers with URL prefixes
    app.include_router(auth_router, prefix="/api/auth", tags=["auth"])

    # Include SSO router if available
    try:
        from routes.auth.sso import router as sso_router

        app.include_router(sso_router, prefix="/api/auth", tags=["sso"])
        logging.info("SSO functionality enabled")
    except ImportError as e:
        logging.warning(f"SSO functionality not available: {e}")

    app.include_router(config_router, prefix="/api/config", tags=["config"])
    app.include_router(search_router, prefix="/api/search", tags=["search"])
-    app.include_router(credentials_router, prefix="/api/credentials", tags=["credentials"])
+    app.include_router(
+        credentials_router, prefix="/api/credentials", tags=["credentials"]
+    )
    app.include_router(album_router, prefix="/api/album", tags=["album"])
    app.include_router(track_router, prefix="/api/track", tags=["track"])
    app.include_router(playlist_router, prefix="/api/playlist", tags=["playlist"])
    app.include_router(artist_router, prefix="/api/artist", tags=["artist"])
    app.include_router(prgs_router, prefix="/api/prgs", tags=["progress"])
    app.include_router(history_router, prefix="/api/history", tags=["history"])

    # Add request logging middleware
    @app.middleware("http")
    async def log_requests(request: Request, call_next):
        start_time = time.time()

        # Log request
        logger = logging.getLogger("uvicorn.access")
        logger.debug(f"Request: {request.method} {request.url.path}")

        try:
            response = await call_next(request)

            # Log response
            duration = round((time.time() - start_time) * 1000, 2)
            logger.debug(f"Response: {response.status_code} | Duration: {duration}ms")

            return response
        except Exception as e:
            # Log errors
            logger.error(f"Server error: {str(e)}", exc_info=True)
            raise HTTPException(status_code=500, detail="Internal Server Error")

    # Mount static files for React app
    if os.path.exists("spotizerr-ui/dist"):
        app.mount("/static", StaticFiles(directory="spotizerr-ui/dist"), name="static")

        # Serve React App - catch-all route for SPA (but not for API routes)
        @app.get("/{full_path:path}")
        async def serve_react_app(full_path: str):
-            """Serve React app with fallback to index.html for SPA routing"""
-            static_dir = "spotizerr-ui/dist"
-            # Don't serve React app for API routes (more specific check)
-            if full_path.startswith("api") or full_path.startswith("api/"):
-                raise HTTPException(status_code=404, detail="API endpoint not found")
-            # If it's a file that exists, serve it
-            if full_path and os.path.exists(os.path.join(static_dir, full_path)):
-                return FileResponse(os.path.join(static_dir, full_path))
-            else:
-                # Fallback to index.html for SPA routing
-                return FileResponse(os.path.join(static_dir, "index.html"))
+            """Serve React app with fallback to index.html for SPA routing. Prevent directory traversal."""
+            static_dir = "spotizerr-ui/dist"
+            static_dir_path = Path(static_dir).resolve()
+            index_path = static_dir_path / "index.html"
+            allowed_exts = {
+                ".html",
+                ".js",
+                ".css",
+                ".map",
+                ".png",
+                ".jpg",
+                ".jpeg",
+                ".svg",
+                ".webp",
+                ".ico",
+                ".json",
+                ".txt",
+                ".woff",
+                ".woff2",
+                ".ttf",
+                ".eot",
+                ".mp3",
+                ".ogg",
+                ".mp4",
+                ".webm",
+            }
+            # Don't serve React app for API routes (more specific check)
+            if full_path.startswith("api") or full_path.startswith("api/"):
+                raise HTTPException(status_code=404, detail="API endpoint not found")
+            # Reject null bytes early
+            if "\x00" in full_path:
+                return FileResponse(str(index_path))
+            # Sanitize path: normalize backslashes and strip URL schemes
+            sanitized = full_path.replace("\\", "/").lstrip("/")
+            if sanitized.startswith("http://") or sanitized.startswith("https://"):
+                return FileResponse(str(index_path))
+            # Resolve requested path safely and ensure it stays within static_dir
+            try:
+                requested_path = (static_dir_path / sanitized).resolve()
+            except Exception:
+                requested_path = index_path
+            # If traversal attempted or non-file within static dir, fall back to index.html for SPA routing
+            if not str(requested_path).startswith(str(static_dir_path)):
+                return FileResponse(str(index_path))
+            # Disallow hidden files (starting with dot) and enforce safe extensions
+            if requested_path.is_file():
+                name = requested_path.name
+                if name.startswith("."):
+                    return FileResponse(str(index_path))
+                suffix = requested_path.suffix.lower()
+                if suffix in allowed_exts:
+                    return FileResponse(str(requested_path))
+                # Not an allowed asset; fall back to SPA index
+                return FileResponse(str(index_path))
+            else:
+                # Fallback to index.html for SPA routing
+                return FileResponse(str(index_path))
    else:
        logging.warning("React app build directory not found at spotizerr-ui/dist")

    return app
def start_celery_workers():
"""Start Celery workers with dynamic configuration"""
# This function is now handled by the lifespan context manager
# and the celery_manager.start() call
pass
"""Start Celery workers with dynamic configuration"""
# This function is now handled by the lifespan context manager
# and the celery_manager.start() call
pass
if __name__ == "__main__":
    import uvicorn

    app = create_app()

    # Use HOST environment variable if present, otherwise fall back to IPv4 wildcard
    host = os.getenv("HOST", "0.0.0.0")

    # Allow overriding port via PORT env var, with default 7171
    try:
        port = int(os.getenv("PORT", "7171"))
    except ValueError:
        port = 7171

-    uvicorn.run(
-        app,
-        host=host,
-        port=port,
-        log_level="info",
-        access_log=True
-    )
+    uvicorn.run(app, host=host, port=port, log_level="info", access_log=True)
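
The main behavioral change in app.py is the hardened serve_react_app catch-all. Below is a minimal sketch, outside FastAPI, of the containment check that route relies on; the helper name stays_inside_static and the sample paths are made up, and spotizerr-ui/dist does not need to exist for resolve() to work:

from pathlib import Path

static_dir_path = Path("spotizerr-ui/dist").resolve()

def stays_inside_static(full_path: str) -> bool:
    # Mirror of the route's sanitization: normalize separators, strip leading slashes,
    # resolve against the static root, then require the result to stay under that root.
    sanitized = full_path.replace("\\", "/").lstrip("/")
    requested = (static_dir_path / sanitized).resolve()
    return str(requested).startswith(str(static_dir_path))

print(stays_inside_static("assets/index-abc123.js"))  # True: would be served if the file exists
print(stays_inside_static("../../etc/passwd"))        # False: the route falls back to index.html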

[second changed file: the download history API router]

@@ -1,9 +1,8 @@
-from fastapi import APIRouter, HTTPException, Request, Depends
+from fastapi import APIRouter, Request, Depends
from fastapi.responses import JSONResponse
-import json
-import traceback
import logging
from routes.utils.history_manager import history_manager
+from typing import Any, Dict

# Import authentication dependencies
from routes.auth.middleware import require_auth_from_state, User
@@ -15,10 +14,12 @@ router = APIRouter()
@router.get("/")
@router.get("")
-async def get_history(request: Request, current_user: User = Depends(require_auth_from_state)):
+async def get_history(
+    request: Request, current_user: User = Depends(require_auth_from_state)
+):
    """
    Retrieve download history with optional filtering and pagination.

    Query parameters:
    - limit: Maximum number of records (default: 100, max: 500)
    - offset: Number of records to skip (default: 0)
@@ -31,143 +32,144 @@ async def get_history(request: Request, current_user: User = Depends(require_aut
        offset = max(int(request.query_params.get("offset", 0)), 0)
        download_type = request.query_params.get("download_type")
        status = request.query_params.get("status")

        # Validate download_type if provided
        valid_types = ["track", "album", "playlist"]
        if download_type and download_type not in valid_types:
            return JSONResponse(
-                content={"error": f"Invalid download_type. Must be one of: {valid_types}"},
-                status_code=400
+                content={
+                    "error": f"Invalid download_type. Must be one of: {valid_types}"
+                },
+                status_code=400,
            )

        # Validate status if provided
        valid_statuses = ["completed", "failed", "skipped", "in_progress"]
        if status and status not in valid_statuses:
            return JSONResponse(
                content={"error": f"Invalid status. Must be one of: {valid_statuses}"},
-                status_code=400
+                status_code=400,
            )

        # Get history from manager
        history = history_manager.get_download_history(
-            limit=limit,
-            offset=offset,
-            download_type=download_type,
-            status=status
+            limit=limit, offset=offset, download_type=download_type, status=status
        )

        # Add pagination info
-        response_data = {
+        response_data: Dict[str, Any] = {
            "downloads": history,
            "pagination": {
                "limit": limit,
                "offset": offset,
-                "returned_count": len(history)
-            }
+                "returned_count": len(history),
+            },
        }

+        filters: Dict[str, Any] = {}
        if download_type:
-            response_data["filters"] = {"download_type": download_type}
+            filters["download_type"] = download_type
        if status:
-            if "filters" not in response_data:
-                response_data["filters"] = {}
-            response_data["filters"]["status"] = status
+            filters["status"] = status
+        if filters:
+            response_data["filters"] = filters

-        return JSONResponse(
-            content=response_data,
-            status_code=200
-        )
+        return JSONResponse(content=response_data, status_code=200)
    except ValueError as e:
        return JSONResponse(
-            content={"error": f"Invalid parameter value: {str(e)}"},
-            status_code=400
+            content={"error": f"Invalid parameter value: {str(e)}"}, status_code=400
        )
    except Exception as e:
        logger.error(f"Error retrieving download history: {e}", exc_info=True)
        return JSONResponse(
            content={"error": "Failed to retrieve download history", "details": str(e)},
-            status_code=500
+            status_code=500,
        )
@router.get("/{task_id}")
-async def get_download_by_task_id(task_id: str, current_user: User = Depends(require_auth_from_state)):
+async def get_download_by_task_id(
+    task_id: str, current_user: User = Depends(require_auth_from_state)
+):
    """
    Retrieve specific download history by task ID.

    Args:
        task_id: Celery task ID
    """
    try:
        download = history_manager.get_download_by_task_id(task_id)

        if not download:
            return JSONResponse(
                content={"error": f"Download with task ID '{task_id}' not found"},
-                status_code=404
+                status_code=404,
            )

-        return JSONResponse(
-            content=download,
-            status_code=200
-        )
+        return JSONResponse(content=download, status_code=200)
    except Exception as e:
-        logger.error(f"Error retrieving download for task {task_id}: {e}", exc_info=True)
+        logger.error(
+            f"Error retrieving download for task {task_id}: {e}", exc_info=True
+        )
        return JSONResponse(
            content={"error": "Failed to retrieve download", "details": str(e)},
-            status_code=500
+            status_code=500,
        )
@router.get("/{task_id}/children")
-async def get_download_children(task_id: str, current_user: User = Depends(require_auth_from_state)):
+async def get_download_children(
+    task_id: str, current_user: User = Depends(require_auth_from_state)
+):
    """
    Retrieve children tracks for an album or playlist download.

    Args:
        task_id: Celery task ID
    """
    try:
        # First get the main download to find the children table
        download = history_manager.get_download_by_task_id(task_id)

        if not download:
            return JSONResponse(
                content={"error": f"Download with task ID '{task_id}' not found"},
-                status_code=404
+                status_code=404,
            )

        children_table = download.get("children_table")
        if not children_table:
            return JSONResponse(
                content={"error": f"Download '{task_id}' has no children tracks"},
-                status_code=404
+                status_code=404,
            )

        # Get children tracks
        children = history_manager.get_children_history(children_table)

        response_data = {
            "task_id": task_id,
            "download_type": download.get("download_type"),
            "title": download.get("title"),
            "children_table": children_table,
            "tracks": children,
-            "track_count": len(children)
+            "track_count": len(children),
        }

-        return JSONResponse(
-            content=response_data,
-            status_code=200
-        )
+        return JSONResponse(content=response_data, status_code=200)
    except Exception as e:
-        logger.error(f"Error retrieving children for task {task_id}: {e}", exc_info=True)
+        logger.error(
+            f"Error retrieving children for task {task_id}: {e}", exc_info=True
+        )
        return JSONResponse(
-            content={"error": "Failed to retrieve download children", "details": str(e)},
-            status_code=500
+            content={
+                "error": "Failed to retrieve download children",
+                "details": str(e),
+            },
+            status_code=500,
        )
@@ -178,25 +180,27 @@ async def get_download_stats(current_user: User = Depends(require_auth_from_stat
"""
try:
stats = history_manager.get_download_stats()
return JSONResponse(
content=stats,
status_code=200
)
return JSONResponse(content=stats, status_code=200)
except Exception as e:
logger.error(f"Error retrieving download stats: {e}", exc_info=True)
return JSONResponse(
content={"error": "Failed to retrieve download statistics", "details": str(e)},
status_code=500
content={
"error": "Failed to retrieve download statistics",
"details": str(e),
},
status_code=500,
)
@router.get("/search")
-async def search_history(request: Request, current_user: User = Depends(require_auth_from_state)):
+async def search_history(
+    request: Request, current_user: User = Depends(require_auth_from_state)
+):
    """
    Search download history by title or artist.

    Query parameters:
    - q: Search query (required)
    - limit: Maximum number of results (default: 50, max: 200)
@@ -206,147 +210,134 @@ async def search_history(request: Request, current_user: User = Depends(require_
        if not query:
            return JSONResponse(
                content={"error": "Missing required parameter: q (search query)"},
-                status_code=400
+                status_code=400,
            )

        limit = min(int(request.query_params.get("limit", 50)), 200) # Cap at 200

        # Search history
        results = history_manager.search_history(query, limit)

        response_data = {
            "query": query,
            "results": results,
            "result_count": len(results),
-            "limit": limit
+            "limit": limit,
        }

-        return JSONResponse(
-            content=response_data,
-            status_code=200
-        )
+        return JSONResponse(content=response_data, status_code=200)
    except ValueError as e:
        return JSONResponse(
-            content={"error": f"Invalid parameter value: {str(e)}"},
-            status_code=400
+            content={"error": f"Invalid parameter value: {str(e)}"}, status_code=400
        )
    except Exception as e:
        logger.error(f"Error searching download history: {e}", exc_info=True)
        return JSONResponse(
            content={"error": "Failed to search download history", "details": str(e)},
-            status_code=500
+            status_code=500,
        )
@router.get("/recent")
-async def get_recent_downloads(request: Request, current_user: User = Depends(require_auth_from_state)):
+async def get_recent_downloads(
+    request: Request, current_user: User = Depends(require_auth_from_state)
+):
    """
    Get most recent downloads.

    Query parameters:
    - limit: Maximum number of results (default: 20, max: 100)
    """
    try:
        limit = min(int(request.query_params.get("limit", 20)), 100) # Cap at 100

        recent = history_manager.get_recent_downloads(limit)

-        response_data = {
-            "downloads": recent,
-            "count": len(recent),
-            "limit": limit
-        }
-        return JSONResponse(
-            content=response_data,
-            status_code=200
-        )
+        response_data = {"downloads": recent, "count": len(recent), "limit": limit}
+        return JSONResponse(content=response_data, status_code=200)
    except ValueError as e:
        return JSONResponse(
-            content={"error": f"Invalid parameter value: {str(e)}"},
-            status_code=400
+            content={"error": f"Invalid parameter value: {str(e)}"}, status_code=400
        )
    except Exception as e:
        logger.error(f"Error retrieving recent downloads: {e}", exc_info=True)
        return JSONResponse(
            content={"error": "Failed to retrieve recent downloads", "details": str(e)},
-            status_code=500
+            status_code=500,
        )
@router.get("/failed")
-async def get_failed_downloads(request: Request, current_user: User = Depends(require_auth_from_state)):
+async def get_failed_downloads(
+    request: Request, current_user: User = Depends(require_auth_from_state)
+):
    """
    Get failed downloads.

    Query parameters:
    - limit: Maximum number of results (default: 50, max: 200)
    """
    try:
        limit = min(int(request.query_params.get("limit", 50)), 200) # Cap at 200

        failed = history_manager.get_failed_downloads(limit)

-        response_data = {
-            "downloads": failed,
-            "count": len(failed),
-            "limit": limit
-        }
-        return JSONResponse(
-            content=response_data,
-            status_code=200
-        )
+        response_data = {"downloads": failed, "count": len(failed), "limit": limit}
+        return JSONResponse(content=response_data, status_code=200)
    except ValueError as e:
        return JSONResponse(
-            content={"error": f"Invalid parameter value: {str(e)}"},
-            status_code=400
+            content={"error": f"Invalid parameter value: {str(e)}"}, status_code=400
        )
    except Exception as e:
        logger.error(f"Error retrieving failed downloads: {e}", exc_info=True)
        return JSONResponse(
            content={"error": "Failed to retrieve failed downloads", "details": str(e)},
-            status_code=500
+            status_code=500,
        )
@router.post("/cleanup")
-async def cleanup_old_history(request: Request, current_user: User = Depends(require_auth_from_state)):
+async def cleanup_old_history(
+    request: Request, current_user: User = Depends(require_auth_from_state)
+):
    """
    Clean up old download history.

    JSON body:
    - days_old: Number of days old to keep (default: 30)
    """
    try:
-        data = await request.json() if request.headers.get("content-type") == "application/json" else {}
+        data = (
+            await request.json()
+            if request.headers.get("content-type") == "application/json"
+            else {}
+        )
        days_old = data.get("days_old", 30)

        if not isinstance(days_old, int) or days_old <= 0:
            return JSONResponse(
                content={"error": "days_old must be a positive integer"},
-                status_code=400
+                status_code=400,
            )

        deleted_count = history_manager.clear_old_history(days_old)

        response_data = {
-            "message": f"Successfully cleaned up old download history",
+            "message": "Successfully cleaned up old download history",
            "deleted_records": deleted_count,
-            "days_old": days_old
+            "days_old": days_old,
        }

-        return JSONResponse(
-            content=response_data,
-            status_code=200
-        )
+        return JSONResponse(content=response_data, status_code=200)
    except Exception as e:
        logger.error(f"Error cleaning up old history: {e}", exc_info=True)
        return JSONResponse(
            content={"error": "Failed to cleanup old history", "details": str(e)},
-            status_code=500
+            status_code=500,
        )
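
For reference, a hedged usage sketch of the history endpoints this router exposes (mounted under /api/history in app.py). The base URL and task ID below are assumptions, the requests dependency is not part of this commit, and an Authorization credential would also be needed when the auth system is enabled:

import requests

BASE = "http://localhost:7171/api/history"  # assumes the default PORT of 7171

# Paginated, filtered history; limit is capped at 500 server-side
resp = requests.get(BASE + "/", params={"limit": 50, "offset": 0, "status": "failed"})
print(resp.json()["pagination"])

# A single download and its child tracks (task_id is hypothetical)
task_id = "some-celery-task-id"
print(requests.get(f"{BASE}/{task_id}").status_code)
print(requests.get(f"{BASE}/{task_id}/children").status_code)

# Remove records older than 90 days (json= sets the application/json content type the route checks)
print(requests.post(BASE + "/cleanup", json={"days_old": 90}).json())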