From 13680ddd2623063922d16ccf596d0cf0fc9e99a6 Mon Sep 17 00:00:00 2001 From: Phlogi Date: Sat, 23 Aug 2025 19:37:42 +0200 Subject: [PATCH] fix: global logging level --- app.py | 34 ++++++++++++++++++++++------ routes/__init__.py | 4 ---- routes/system/progress.py | 30 ++++++++++-------------- routes/utils/celery_manager.py | 15 ++++++------ routes/utils/celery_queue_manager.py | 2 +- 5 files changed, 48 insertions(+), 37 deletions(-) diff --git a/app.py b/app.py index c24c822..0036763 100755 --- a/app.py +++ b/app.py @@ -13,6 +13,16 @@ import redis import socket from urllib.parse import urlparse +# Define a mapping from string log levels to logging constants +LOG_LEVELS = { + "CRITICAL": logging.CRITICAL, + "ERROR": logging.ERROR, + "WARNING": logging.WARNING, + "INFO": logging.INFO, + "DEBUG": logging.DEBUG, + "NOTSET": logging.NOTSET, +} + # Run DB migrations as early as possible, before importing any routers that may touch DBs try: from routes.migrations import run_migrations_if_needed @@ -27,13 +37,18 @@ except Exception as e: ) sys.exit(1) +# Get log level from environment variable, default to INFO +log_level_str = os.getenv("LOG_LEVEL", "WARNING").upper() +log_level = LOG_LEVELS.get(log_level_str, logging.INFO) + # Import route routers (to be created) from routes.auth.credentials import router as credentials_router from routes.auth.auth import router as auth_router -from routes.content.artist import router as artist_router from routes.content.album import router as album_router +from routes.content.artist import router as artist_router from routes.content.track import router as track_router from routes.content.playlist import router as playlist_router +from routes.content.bulk_add import router as bulk_add_router from routes.core.search import router as search_router from routes.core.history import router as history_router from routes.system.progress import router as prgs_router @@ -66,7 +81,7 @@ def setup_logging(): # Configure root logger root_logger = logging.getLogger() - root_logger.setLevel(logging.DEBUG) + root_logger.setLevel(log_level) # Clear any existing handlers from the root logger if root_logger.hasHandlers(): @@ -83,12 +98,12 @@ def setup_logging(): main_log, maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8" ) file_handler.setFormatter(log_format) - file_handler.setLevel(logging.INFO) + file_handler.setLevel(log_level) # Console handler for stderr console_handler = logging.StreamHandler(sys.stderr) console_handler.setFormatter(log_format) - console_handler.setLevel(logging.INFO) + console_handler.setLevel(log_level) # Add handlers to root logger root_logger.addHandler(file_handler) @@ -101,10 +116,15 @@ def setup_logging(): "routes.utils.celery_manager", "routes.utils.celery_tasks", "routes.utils.watch", + "uvicorn", # General Uvicorn logger + "uvicorn.access", # Uvicorn access logs + "uvicorn.error", # Uvicorn error logs ]: logger = logging.getLogger(logger_name) - logger.setLevel(logging.INFO) - logger.propagate = True # Propagate to root logger + logger.setLevel(log_level) + # For uvicorn.access, we explicitly set propagate to False to prevent duplicate logging + # if access_log=False is used in uvicorn.run, and to ensure our middleware handles it. + logger.propagate = False if logger_name == "uvicorn.access" else True logging.info("Logging system initialized") @@ -363,4 +383,4 @@ if __name__ == "__main__": except ValueError: port = 7171 - uvicorn.run(app, host=host, port=port, log_level="info", access_log=True) + uvicorn.run(app, host=host, port=port, log_level=log_level_str.lower(), access_log=False) diff --git a/routes/__init__.py b/routes/__init__.py index 2fa27c9..eea436a 100755 --- a/routes/__init__.py +++ b/routes/__init__.py @@ -1,7 +1,3 @@ import logging -# Configure basic logging for the application if not already configured -# This remains safe to execute on import -logging.basicConfig(level=logging.INFO, format="%(message)s") - logger = logging.getLogger(__name__) diff --git a/routes/system/progress.py b/routes/system/progress.py index c6cbc58..d8c242c 100755 --- a/routes/system/progress.py +++ b/routes/system/progress.py @@ -31,12 +31,12 @@ class SSEBroadcaster: async def add_client(self, queue: asyncio.Queue): """Add a new SSE client""" self.clients.add(queue) - logger.info(f"SSE: Client connected (total: {len(self.clients)})") + logger.debug(f"SSE: Client connected (total: {len(self.clients)})") async def remove_client(self, queue: asyncio.Queue): """Remove an SSE client""" self.clients.discard(queue) - logger.info(f"SSE: Client disconnected (total: {len(self.clients)})") + logger.debug(f"SSE: Client disconnected (total: {len(self.clients)})") async def broadcast_event(self, event_data: dict): """Broadcast an event to all connected clients""" @@ -69,7 +69,7 @@ class SSEBroadcaster: for client in disconnected: self.clients.discard(client) - logger.info(f"SSE Broadcaster: Successfully sent to {sent_count} clients, removed {len(disconnected)} disconnected clients") + logger.debug(f"SSE Broadcaster: Successfully sent to {sent_count} clients, removed {len(disconnected)} disconnected clients") # Global broadcaster instance sse_broadcaster = SSEBroadcaster() @@ -139,7 +139,7 @@ def start_sse_redis_subscriber(): # Start Redis subscriber in background thread thread = threading.Thread(target=redis_subscriber_thread, daemon=True) thread.start() - logger.info("SSE Redis Subscriber: Background thread started") + logger.debug("SSE Redis Subscriber: Background thread started") async def transform_callback_to_task_format(task_id: str, event_data: dict) -> Optional[dict]: """Transform callback event data into the task format expected by frontend""" @@ -200,13 +200,7 @@ async def trigger_sse_update(task_id: str, reason: str = "task_update"): last_status = get_last_task_status(task_id) # Create a dummy request for the _build_task_response function - from fastapi import Request - class DummyRequest: - def __init__(self): - self.base_url = "http://localhost:7171" - - dummy_request = DummyRequest() - task_response = _build_task_response(task_info, last_status, task_id, current_time, dummy_request) + task_response = _build_task_response(task_info, last_status, task_id, current_time, request=None) # Create minimal event data - global counts will be added at broadcast time event_data = { @@ -431,7 +425,7 @@ def _build_error_callback_object(last_status): return callback_object -def _build_task_response(task_info, last_status, task_id, current_time, request: Request): +def _build_task_response(task_info, last_status, task_id, current_time, request: Optional[Request] = None): """ Helper function to build a standardized task response object. """ @@ -444,7 +438,7 @@ def _build_task_response(task_info, last_status, task_id, current_time, request: try: item_id = item_url.split("/")[-1] if item_id: - base_url = str(request.base_url).rstrip("/") + base_url = str(request.base_url).rstrip("/") if request else "http://localhost:7171" dynamic_original_url = ( f"{base_url}/api/{download_type}/download/{item_id}" ) @@ -496,7 +490,7 @@ def _build_task_response(task_info, last_status, task_id, current_time, request: return task_response -async def get_paginated_tasks(page=1, limit=20, active_only=False, request: Request = None): +async def get_paginated_tasks(page=1, limit=20, active_only=False, request: Optional[Request] = None): """ Get paginated list of tasks. """ @@ -938,9 +932,9 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get try: # Register this client with the broadcaster - logger.info(f"SSE Stream: New client connecting...") + logger.debug(f"SSE Stream: New client connecting...") await sse_broadcaster.add_client(client_queue) - logger.info(f"SSE Stream: Client registered successfully, total clients: {len(sse_broadcaster.clients)}") + logger.debug(f"SSE Stream: Client registered successfully, total clients: {len(sse_broadcaster.clients)}") # Send initial data immediately upon connection initial_data = await generate_task_update_event(time.time(), active_only, request) @@ -973,7 +967,7 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get } event_json = json.dumps(callback_event) yield f"data: {event_json}\n\n" - logger.info(f"SSE Stream: Sent replay callback for task {task_id}") + logger.debug(f"SSE Stream: Sent replay callback for task {task_id}") # Send periodic heartbeats and listen for real-time events last_heartbeat = time.time() @@ -1039,7 +1033,7 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get await asyncio.sleep(1) except asyncio.CancelledError: - logger.info("SSE client disconnected") + logger.debug("SSE client disconnected") return except Exception as e: logger.error(f"SSE connection error: {e}", exc_info=True) diff --git a/routes/utils/celery_manager.py b/routes/utils/celery_manager.py index faebe95..9ce27b4 100644 --- a/routes/utils/celery_manager.py +++ b/routes/utils/celery_manager.py @@ -2,6 +2,7 @@ import subprocess import logging import time import threading +import os # Import Celery task utilities from .celery_config import get_config_params, MAX_CONCURRENT_DL @@ -40,8 +41,10 @@ class CeleryManager: ) def _get_worker_command( - self, queues, concurrency, worker_name_suffix, log_level="INFO" + self, queues, concurrency, worker_name_suffix, log_level_env=None ): + # Use LOG_LEVEL from environment if provided, otherwise default to INFO + log_level = log_level_env if log_level_env else os.getenv("LOG_LEVEL", "WARNING").upper() # Use a unique worker name to avoid conflicts. # %h is replaced by celery with the actual hostname. hostname = f"worker_{worker_name_suffix}@%h" @@ -117,6 +120,7 @@ class CeleryManager: queues="downloads", concurrency=self.concurrency, worker_name_suffix="dlw", # Download Worker + log_level_env=os.getenv("LOG_LEVEL", "WARNING").upper(), ) logger.info( f"Starting Celery Download Worker with command: {' '.join(download_cmd)}" @@ -151,7 +155,7 @@ class CeleryManager: queues="utility_tasks,default", # Listen to utility and default concurrency=5, # Increased concurrency for SSE updates and utility tasks worker_name_suffix="utw", # Utility Worker - log_level="ERROR" # Reduce log verbosity for utility worker (only errors) + log_level_env=os.getenv("LOG_LEVEL", "WARNING").upper(), ) logger.info( f"Starting Celery Utility Worker with command: {' '.join(utility_cmd)}" @@ -250,7 +254,7 @@ class CeleryManager: # Restart only the download worker download_cmd = self._get_worker_command( - "downloads", self.concurrency, "dlw" + "downloads", self.concurrency, "dlw", log_level_env=os.getenv("LOG_LEVEL", "WARNING").upper() ) logger.info( f"Restarting Celery Download Worker with command: {' '.join(download_cmd)}" @@ -366,10 +370,7 @@ celery_manager = CeleryManager() # Example of how to use the manager (typically called from your main app script) if __name__ == "__main__": - logging.basicConfig( - level=logging.INFO, - format="%(message)s", - ) + # Removed logging.basicConfig as it's handled by the main app's setup_logging logger.info("Starting Celery Manager example...") celery_manager.start() try: diff --git a/routes/utils/celery_queue_manager.py b/routes/utils/celery_queue_manager.py index 10b47f1..4d0a378 100644 --- a/routes/utils/celery_queue_manager.py +++ b/routes/utils/celery_queue_manager.py @@ -246,7 +246,7 @@ class CeleryDownloadQueueManager: """Initialize the Celery-based download queue manager""" self.max_concurrent = MAX_CONCURRENT_DL self.paused = False - print( + logger.info( f"Celery Download Queue Manager initialized with max_concurrent={self.max_concurrent}" )