From fe5e7964fab1b81a2acd6ecb8539ad270dc5bc4f Mon Sep 17 00:00:00 2001
From: Xoconoch
Date: Fri, 29 Aug 2025 08:26:16 -0600
Subject: [PATCH] fix: minor optimizations, trying to fix #333

---
 app.py                         |  90 ++++++++++++++-----------
 routes/content/playlist.py     |  65 +++++++++++++-----
 routes/system/progress.py      |  55 ++++++---------
 routes/utils/celery_config.py  |   1 +
 routes/utils/celery_manager.py | 120 ++++++++++++++++++++++++++++++---
 routes/utils/watch/manager.py  |  74 +++++++++++++++++++-
 6 files changed, 304 insertions(+), 101 deletions(-)

diff --git a/app.py b/app.py
index 74e6f97..6563056 100755
--- a/app.py
+++ b/app.py
@@ -13,11 +13,12 @@ import redis
 import socket
 from urllib.parse import urlparse
 from dotenv import load_dotenv
+
 load_dotenv()

 # Parse log level from environment as early as possible, default to INFO for visibility
-log_level_str = os.getenv("LOG_LEVEL", "INFO").upper()
-log_level = getattr(logging, log_level_str, logging.INFO)
+log_level_str = os.getenv("LOG_LEVEL", "WARNING").upper()
+log_level = getattr(logging, log_level_str, logging.WARNING)

 # Set up a very basic logging config immediately, so early logs (including import/migration errors) are visible
 logging.basicConfig(
@@ -52,30 +53,25 @@ if _umask_value:


 # Import and initialize routes (this will start the watch manager)
-from routes.auth.credentials import router as credentials_router
-from routes.auth.auth import router as auth_router
-from routes.content.album import router as album_router
-from routes.content.artist import router as artist_router
-from routes.content.track import router as track_router
-from routes.content.playlist import router as playlist_router
-from routes.content.bulk_add import router as bulk_add_router
-from routes.core.search import router as search_router
-from routes.core.history import router as history_router
-from routes.system.progress import router as prgs_router
-from routes.system.config import router as config_router
+from routes.auth.credentials import router as credentials_router  # noqa: E402
+from routes.auth.auth import router as auth_router  # noqa: E402
+from routes.content.album import router as album_router  # noqa: E402
+from routes.content.artist import router as artist_router  # noqa: E402
+from routes.content.track import router as track_router  # noqa: E402
+from routes.content.playlist import router as playlist_router  # noqa: E402
+from routes.content.bulk_add import router as bulk_add_router  # noqa: E402
+from routes.core.search import router as search_router  # noqa: E402
+from routes.core.history import router as history_router  # noqa: E402
+from routes.system.progress import router as prgs_router  # noqa: E402
+from routes.system.config import router as config_router  # noqa: E402

 # Import Celery configuration and manager
-from routes.utils.celery_manager import celery_manager
-from routes.utils.celery_config import REDIS_URL
+from routes.utils.celery_config import REDIS_URL  # noqa: E402

 # Import authentication system
-from routes.auth import AUTH_ENABLED
-from routes.auth.middleware import AuthMiddleware

 # Import watch manager controls (start/stop) without triggering side effects
-from routes.utils.watch.manager import start_watch_manager, stop_watch_manager
-

 # Configure application-wide logging
@@ -136,9 +132,9 @@ def setup_logging():
         "routes.utils.celery_manager",
         "routes.utils.celery_tasks",
         "routes.utils.watch",
-        "uvicorn",  # General Uvicorn logger
-        "uvicorn.access",  # Uvicorn access logs
-        "uvicorn.error",  # Uvicorn error logs
+        "uvicorn",  # General Uvicorn logger
+        "uvicorn.access",  # Uvicorn access logs
+        "uvicorn.error",  # Uvicorn error logs
         "spotizerr",
     ]:
         logger = logging.getLogger(logger_name)
@@ -152,7 +148,6 @@ def setup_logging():

 def check_redis_connection():
     """Check if Redis is available and accessible"""
-    from routes.utils.celery_config import REDIS_URL

     if not REDIS_URL:
         logging.error("REDIS_URL is not configured. Please check your environment.")
@@ -199,7 +194,9 @@ async def lifespan(app: FastAPI):
     # Startup
     setup_logging()
     effective_level = logging.getLevelName(log_level)
-    logging.getLogger(__name__).info(f"Logging system fully initialized (lifespan startup). Effective log level: {effective_level}")
+    logging.getLogger(__name__).info(
+        f"Logging system fully initialized (lifespan startup). Effective log level: {effective_level}"
+    )

     # Run migrations before initializing services
     try:
@@ -226,8 +223,19 @@ async def lifespan(app: FastAPI):
     try:
         from routes.utils.celery_manager import celery_manager

-        celery_manager.start()
-        logging.info("Celery workers started successfully")
+        start_workers = os.getenv("START_EMBEDDED_WORKERS", "true").lower() in (
+            "1",
+            "true",
+            "yes",
+            "on",
+        )
+        if start_workers:
+            celery_manager.start()
+            logging.info("Celery workers started successfully")
+        else:
+            logging.info(
+                "START_EMBEDDED_WORKERS is false; skipping embedded Celery workers startup."
+            )
     except Exception as e:
         logging.error(f"Failed to start Celery workers: {e}")

@@ -257,8 +265,19 @@ async def lifespan(app: FastAPI):
     try:
         from routes.utils.celery_manager import celery_manager

-        celery_manager.stop()
-        logging.info("Celery workers stopped")
+        start_workers = os.getenv("START_EMBEDDED_WORKERS", "true").lower() in (
+            "1",
+            "true",
+            "yes",
+            "on",
+        )
+        if start_workers:
+            celery_manager.stop()
+            logging.info("Celery workers stopped")
+        else:
+            logging.info(
+                "START_EMBEDDED_WORKERS is false; no embedded Celery workers to stop."
+            )
     except Exception as e:
         logging.error(f"Error stopping Celery workers: {e}")

@@ -295,17 +314,6 @@ def create_app():
         logging.warning(f"Auth system initialization failed or unavailable: {e}")

     # Register routers with URL prefixes
-    from routes.auth.auth import router as auth_router
-    from routes.system.config import router as config_router
-    from routes.core.search import router as search_router
-    from routes.auth.credentials import router as credentials_router
-    from routes.content.album import router as album_router
-    from routes.content.track import router as track_router
-    from routes.content.playlist import router as playlist_router
-    from routes.content.bulk_add import router as bulk_add_router
-    from routes.content.artist import router as artist_router
-    from routes.system.progress import router as prgs_router
-    from routes.core.history import router as history_router

     app.include_router(auth_router, prefix="/api/auth", tags=["auth"])
@@ -449,4 +457,6 @@ if __name__ == "__main__":
     except ValueError:
         port = 7171

-    uvicorn.run(app, host=host, port=port, log_level=log_level_str.lower(), access_log=False)
+    uvicorn.run(
+        app, host=host, port=port, log_level=log_level_str.lower(), access_log=False
+    )
diff --git a/routes/content/playlist.py b/routes/content/playlist.py
index 27881eb..6b88f84 100755
--- a/routes/content/playlist.py
+++ b/routes/content/playlist.py
@@ -233,41 +233,70 @@ async def add_to_watchlist(
         }

     # Fetch playlist details from Spotify to populate our DB (metadata only)
-    cfg = get_config_params() or {}
-    active_account = cfg.get("spotify")
-    if not active_account:
-        raise HTTPException(
-            status_code=500,
-            detail={"error": "Active Spotify account not set in configuration."},
+    # Use shared helper and add a safe fallback for missing 'id'
+    try:
+        from routes.utils.get_info import get_playlist_metadata
+
+        playlist_data = get_playlist_metadata(playlist_spotify_id) or {}
+    except Exception as e:
+        logger.error(
+            f"Failed to fetch playlist metadata for {playlist_spotify_id}: {e}",
+            exc_info=True,
         )
-    blob_path = get_spotify_blob_path(active_account)
-    if not blob_path.exists():
         raise HTTPException(
             status_code=500,
             detail={
-                "error": f"Spotify credentials blob not found for account '{active_account}'"
+                "error": f"Failed to fetch metadata for playlist {playlist_spotify_id}: {str(e)}"
             },
         )
-    client = get_client()
-    try:
-        playlist_data = get_playlist(
-            client, playlist_spotify_id, expand_items=False
+    # Some Librespot responses may omit 'id' even when the payload is valid.
+    # Fall back to the path parameter to avoid false negatives.
+    if playlist_data and "id" not in playlist_data:
+        logger.warning(
+            f"Playlist metadata for {playlist_spotify_id} missing 'id'. Injecting from path param. Keys: {list(playlist_data.keys())}"
         )
-    finally:
-        pass
+        try:
+            playlist_data["id"] = playlist_spotify_id
+        except Exception:
+            pass

-    if not playlist_data or "id" not in playlist_data:
+    # Validate minimal fields needed downstream and normalize shape to be resilient to client changes
+    if not playlist_data or not playlist_data.get("name"):
         logger.error(
-            f"Could not fetch details for playlist {playlist_spotify_id} from Spotify."
+            f"Insufficient playlist metadata for {playlist_spotify_id}. Keys present: {list(playlist_data.keys()) if isinstance(playlist_data, dict) else type(playlist_data)}"
         )
         raise HTTPException(
             status_code=404,
             detail={
-                "error": f"Could not fetch details for playlist {playlist_spotify_id} from Spotify."
+                "error": f"Could not fetch sufficient details for playlist {playlist_spotify_id} from Spotify."
             },
         )

+    # Ensure 'owner' is a dict with at least id/display_name to satisfy DB layer
+    owner = playlist_data.get("owner")
+    if not isinstance(owner, dict):
+        owner = {}
+    if "id" not in owner or not owner.get("id"):
+        owner["id"] = "unknown_owner"
+    if "display_name" not in owner or not owner.get("display_name"):
+        owner["display_name"] = owner.get("id", "Unknown Owner")
+    playlist_data["owner"] = owner
+
+    # Ensure 'tracks' is a dict with a numeric 'total'
+    tracks = playlist_data.get("tracks")
+    if not isinstance(tracks, dict):
+        tracks = {}
+    total = tracks.get("total")
+    if not isinstance(total, int):
+        items = tracks.get("items")
+        if isinstance(items, list):
+            total = len(items)
+        else:
+            total = 0
+    tracks["total"] = total
+    playlist_data["tracks"] = tracks
+
     add_playlist_db(playlist_data)  # This also creates the tracks table
     logger.info(
diff --git a/routes/system/progress.py b/routes/system/progress.py
index f5d5d78..113dcc6 100755
--- a/routes/system/progress.py
+++ b/routes/system/progress.py
@@ -105,6 +105,10 @@ def start_sse_redis_subscriber():
             pubsub.subscribe("sse_events")
             logger.info("SSE Redis Subscriber: Started listening for events")

+            # Create a single event loop for this thread and reuse it
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+
             for message in pubsub.listen():
                 if message["type"] == "message":
                     try:
@@ -121,47 +125,32 @@ def start_sse_redis_subscriber():
                             # Transform callback data into standardized update format expected by frontend
                             standardized = standardize_incoming_event(event_data)
                             if standardized:
-                                loop = asyncio.new_event_loop()
-                                asyncio.set_event_loop(loop)
-                                try:
-                                    loop.run_until_complete(
-                                        sse_broadcaster.broadcast_event(standardized)
-                                    )
-                                    logger.debug(
-                                        f"SSE Redis Subscriber: Broadcasted standardized progress update to {len(sse_broadcaster.clients)} clients"
-                                    )
-                                finally:
-                                    loop.close()
-                        elif event_type == "summary_update":
-                            # Task summary update - use standardized trigger
-                            loop = asyncio.new_event_loop()
-                            asyncio.set_event_loop(loop)
-                            try:
                                 loop.run_until_complete(
-                                    trigger_sse_update(
-                                        task_id, event_data.get("reason", "update")
-                                    )
+                                    sse_broadcaster.broadcast_event(standardized)
                                 )
                                 logger.debug(
-                                    f"SSE Redis Subscriber: Processed summary update for {task_id}"
+                                    f"SSE Redis Subscriber: Broadcasted standardized progress update to {len(sse_broadcaster.clients)} clients"
                                 )
-                            finally:
-                                loop.close()
+                        elif event_type == "summary_update":
+                            # Task summary update - use standardized trigger
+                            loop.run_until_complete(
+                                trigger_sse_update(
+                                    task_id, event_data.get("reason", "update")
+                                )
+                            )
+                            logger.debug(
+                                f"SSE Redis Subscriber: Processed summary update for {task_id}"
+                            )
                         else:
                             # Unknown event type - attempt to standardize and broadcast
                             standardized = standardize_incoming_event(event_data)
                             if standardized:
-                                loop = asyncio.new_event_loop()
-                                asyncio.set_event_loop(loop)
-                                try:
-                                    loop.run_until_complete(
-                                        sse_broadcaster.broadcast_event(standardized)
-                                    )
-                                    logger.debug(
-                                        f"SSE Redis Subscriber: Broadcasted standardized {event_type} to {len(sse_broadcaster.clients)} clients"
-                                    )
-                                finally:
-                                    loop.close()
+                                loop.run_until_complete(
+                                    sse_broadcaster.broadcast_event(standardized)
+                                )
+                                logger.debug(
+                                    f"SSE Redis Subscriber: Broadcasted standardized {event_type} to {len(sse_broadcaster.clients)} clients"
+                                )

             except Exception as e:
                 logger.error(
diff --git a/routes/utils/celery_config.py b/routes/utils/celery_config.py
index 83814fd..b93dce3 100644
--- a/routes/utils/celery_config.py
+++ b/routes/utils/celery_config.py
@@ -40,6 +40,7 @@ DEFAULT_MAIN_CONFIG = {
     "tracknumPadding": True,
     "saveCover": True,
     "maxConcurrentDownloads": 3,
+    "utilityConcurrency": 1,
     "maxRetries": 3,
     "retryDelaySeconds": 5,
     "retryDelayIncrease": 5,
diff --git a/routes/utils/celery_manager.py b/routes/utils/celery_manager.py
index e75519b..d4489d1 100644
--- a/routes/utils/celery_manager.py
+++ b/routes/utils/celery_manager.py
@@ -6,10 +6,11 @@ import os
 import sys

 from dotenv import load_dotenv
+
 load_dotenv()

 # Import Celery task utilities
-from .celery_config import get_config_params, MAX_CONCURRENT_DL
+from .celery_config import get_config_params, MAX_CONCURRENT_DL  # noqa: E402

 # Configure logging
 logger = logging.getLogger(__name__)
@@ -40,15 +41,22 @@ class CeleryManager:
         self.concurrency = get_config_params().get(
             "maxConcurrentDownloads", MAX_CONCURRENT_DL
         )
+        self.utility_concurrency = max(
+            1, int(get_config_params().get("utilityConcurrency", 1))
+        )
         logger.info(
-            f"CeleryManager initialized. Download concurrency set to: {self.concurrency}"
+            f"CeleryManager initialized. Download concurrency set to: {self.concurrency} | Utility concurrency: {self.utility_concurrency}"
         )

     def _get_worker_command(
         self, queues, concurrency, worker_name_suffix, log_level_env=None
     ):
         # Use LOG_LEVEL from environment if provided, otherwise default to INFO
-        log_level = log_level_env if log_level_env else os.getenv("LOG_LEVEL", "WARNING").upper()
+        log_level = (
+            log_level_env
+            if log_level_env
+            else os.getenv("LOG_LEVEL", "WARNING").upper()
+        )
         # Use a unique worker name to avoid conflicts.
         # %h is replaced by celery with the actual hostname.
         hostname = f"worker_{worker_name_suffix}@%h"
@@ -167,12 +175,19 @@ class CeleryManager:
         if self.utility_worker_process and self.utility_worker_process.poll() is None:
             logger.info("Celery Utility Worker is already running.")
         else:
+            self.utility_concurrency = max(
+                1,
+                int(
+                    get_config_params().get(
+                        "utilityConcurrency", self.utility_concurrency
+                    )
+                ),
+            )
             utility_cmd = self._get_worker_command(
                 queues="utility_tasks,default",  # Listen to utility and default
-                concurrency=5,  # Increased concurrency for SSE updates and utility tasks
+                concurrency=self.utility_concurrency,
                 worker_name_suffix="utw",  # Utility Worker
                 log_level_env=os.getenv("LOG_LEVEL", "WARNING").upper(),
-
             )
             logger.info(
                 f"Starting Celery Utility Worker with command: {' '.join(utility_cmd)}"
@@ -197,7 +212,7 @@ class CeleryManager:
             self.utility_log_thread_stdout.start()
             self.utility_log_thread_stderr.start()
             logger.info(
-                f"Celery Utility Worker (PID: {self.utility_worker_process.pid}) started with concurrency 5."
+                f"Celery Utility Worker (PID: {self.utility_worker_process.pid}) started with concurrency {self.utility_concurrency}."
             )

         if (
@@ -221,7 +236,9 @@
         )
         while not self.stop_event.is_set():
             try:
-                time.sleep(10)  # Check every 10 seconds
+                # Wait using stop_event to be responsive to shutdown and respect interval
+                if self.stop_event.wait(CONFIG_CHECK_INTERVAL):
+                    break

                 if self.stop_event.is_set():
                     break
@@ -229,6 +246,14 @@
                 new_max_concurrent_downloads = current_config.get(
                     "maxConcurrentDownloads", self.concurrency
                 )
+                new_utility_concurrency = max(
+                    1,
+                    int(
+                        current_config.get(
+                            "utilityConcurrency", self.utility_concurrency
+                        )
+                    ),
+                )

                 if new_max_concurrent_downloads != self.concurrency:
                     logger.info(
@@ -272,7 +297,10 @@

                     # Restart only the download worker
                     download_cmd = self._get_worker_command(
-                        "downloads", self.concurrency, "dlw", log_level_env=os.getenv("LOG_LEVEL", "WARNING").upper()
+                        "downloads",
+                        self.concurrency,
+                        "dlw",
+                        log_level_env=os.getenv("LOG_LEVEL", "WARNING").upper(),
                     )
                     logger.info(
                         f"Restarting Celery Download Worker with command: {' '.join(download_cmd)}"
@@ -303,6 +331,82 @@
                         f"Celery Download Worker (PID: {self.download_worker_process.pid}) restarted with new concurrency {self.concurrency}."
                     )

+                # Handle utility worker concurrency changes
+                if new_utility_concurrency != self.utility_concurrency:
+                    logger.info(
+                        f"CeleryManager: Detected change in utilityConcurrency from {self.utility_concurrency} to {new_utility_concurrency}. Restarting utility worker only."
+                    )
+
+                    if (
+                        self.utility_worker_process
+                        and self.utility_worker_process.poll() is None
+                    ):
+                        logger.info(
+                            f"Stopping Celery Utility Worker (PID: {self.utility_worker_process.pid}) for config update..."
+                        )
+                        self.utility_worker_process.terminate()
+                        try:
+                            self.utility_worker_process.wait(timeout=10)
+                            logger.info(
+                                f"Celery Utility Worker (PID: {self.utility_worker_process.pid}) terminated."
+                            )
+                        except subprocess.TimeoutExpired:
+                            logger.warning(
+                                f"Celery Utility Worker (PID: {self.utility_worker_process.pid}) did not terminate gracefully, killing."
+                            )
+                            self.utility_worker_process.kill()
+                    self.utility_worker_process = None
+
+                    # Wait for log threads of utility worker to finish
+                    if (
+                        self.utility_log_thread_stdout
+                        and self.utility_log_thread_stdout.is_alive()
+                    ):
+                        self.utility_log_thread_stdout.join(timeout=5)
+                    if (
+                        self.utility_log_thread_stderr
+                        and self.utility_log_thread_stderr.is_alive()
+                    ):
+                        self.utility_log_thread_stderr.join(timeout=5)
+
+                    self.utility_concurrency = new_utility_concurrency
+
+                    # Restart only the utility worker
+                    utility_cmd = self._get_worker_command(
+                        "utility_tasks,default",
+                        self.utility_concurrency,
+                        "utw",
+                        log_level_env=os.getenv("LOG_LEVEL", "WARNING").upper(),
+                    )
+                    logger.info(
+                        f"Restarting Celery Utility Worker with command: {' '.join(utility_cmd)}"
+                    )
+                    self.utility_worker_process = subprocess.Popen(
+                        utility_cmd,
+                        stdout=subprocess.PIPE,
+                        stderr=subprocess.PIPE,
+                        text=True,
+                        bufsize=1,
+                        universal_newlines=True,
+                    )
+                    self.utility_log_thread_stdout = threading.Thread(
+                        target=self._process_output_reader,
+                        args=(self.utility_worker_process.stdout, "Celery[UW-STDOUT]"),
+                    )
+                    self.utility_log_thread_stderr = threading.Thread(
+                        target=self._process_output_reader,
+                        args=(
+                            self.utility_worker_process.stderr,
+                            "Celery[UW-STDERR]",
+                            True,
+                        ),
+                    )
+                    self.utility_log_thread_stdout.start()
+                    self.utility_log_thread_stderr.start()
+                    logger.info(
+                        f"Celery Utility Worker (PID: {self.utility_worker_process.pid}) restarted with new concurrency {self.utility_concurrency}."
+                    )
+
             except Exception as e:
                 logger.error(
                     f"CeleryManager: Error in config monitor thread: {e}", exc_info=True
diff --git a/routes/utils/watch/manager.py b/routes/utils/watch/manager.py
index 32014c5..a7c56f4 100644
--- a/routes/utils/watch/manager.py
+++ b/routes/utils/watch/manager.py
@@ -167,6 +167,46 @@ def get_watch_config():
                 watch_cfg["maxItemsPerRun"] = clamped_value
                 migrated = True

+        # Enforce sane ranges and types for poll/delay intervals to prevent tight loops
+        def _safe_int(value, default):
+            try:
+                return int(value)
+            except Exception:
+                return default
+
+        # Clamp poll interval to at least 1 second
+        poll_val = _safe_int(
+            watch_cfg.get(
+                "watchPollIntervalSeconds",
+                DEFAULT_WATCH_CONFIG["watchPollIntervalSeconds"],
+            ),
+            DEFAULT_WATCH_CONFIG["watchPollIntervalSeconds"],
+        )
+        if poll_val < 1:
+            watch_cfg["watchPollIntervalSeconds"] = 1
+            migrated = True
+        # Clamp per-item delays to at least 1 second
+        delay_pl = _safe_int(
+            watch_cfg.get(
+                "delayBetweenPlaylistsSeconds",
+                DEFAULT_WATCH_CONFIG["delayBetweenPlaylistsSeconds"],
+            ),
+            DEFAULT_WATCH_CONFIG["delayBetweenPlaylistsSeconds"],
+        )
+        if delay_pl < 1:
+            watch_cfg["delayBetweenPlaylistsSeconds"] = 1
+            migrated = True
+        delay_ar = _safe_int(
+            watch_cfg.get(
+                "delayBetweenArtistsSeconds",
+                DEFAULT_WATCH_CONFIG["delayBetweenArtistsSeconds"],
+            ),
+            DEFAULT_WATCH_CONFIG["delayBetweenArtistsSeconds"],
+        )
+        if delay_ar < 1:
+            watch_cfg["delayBetweenArtistsSeconds"] = 1
+            migrated = True
+
         if migrated or legacy_file_found:
             # Persist migration back to main.json
             main_cfg["watch"] = watch_cfg
@@ -670,7 +710,9 @@ def check_watched_playlists(specific_playlist_id: str = None):

             # Only sleep between items when running a batch (no specific ID)
             if not specific_playlist_id:
-                time.sleep(max(1, config.get("delayBetweenPlaylistsSeconds", 2)))
+                time.sleep(
+                    max(1, _safe_to_int(config.get("delayBetweenPlaylistsSeconds"), 2))
+                )

     logger.info("Playlist Watch Manager: Finished checking all watched playlists.")

@@ -817,7 +859,9 @@ def check_watched_artists(specific_artist_id: str = None):

             # Only sleep between items when running a batch (no specific ID)
             if not specific_artist_id:
-                time.sleep(max(1, config.get("delayBetweenArtistsSeconds", 5)))
+                time.sleep(
+                    max(1, _safe_to_int(config.get("delayBetweenArtistsSeconds"), 5))
+                )

     logger.info("Artist Watch Manager: Finished checking all watched artists.")

@@ -832,6 +876,14 @@ def playlist_watch_scheduler():
             interval = current_config.get("watchPollIntervalSeconds", 3600)
             watch_enabled = current_config.get("enabled", False)  # Get enabled status

+            # Ensure interval is a positive integer to avoid tight loops
+            try:
+                interval = int(interval)
+            except Exception:
+                interval = 3600
+            if interval < 1:
+                interval = 1
+
             if not watch_enabled:
                 logger.info(
                     "Watch Scheduler: Watch feature is disabled in config. Skipping checks."
@@ -907,6 +959,13 @@ def run_playlist_check_over_intervals(playlist_spotify_id: str) -> None:
         # Determine if we are done: no active processing snapshot and no pending sync
         cfg = get_watch_config()
         interval = cfg.get("watchPollIntervalSeconds", 3600)
+        # Ensure interval is a positive integer
+        try:
+            interval = int(interval)
+        except Exception:
+            interval = 3600
+        if interval < 1:
+            interval = 1
         # Use local helper that leverages Librespot client
         metadata = _fetch_playlist_metadata(playlist_spotify_id)
         if not metadata:
@@ -1169,6 +1228,17 @@ def update_playlist_m3u_file(playlist_spotify_id: str):


 # Helper to build a Librespot client from active account
+# Add a small internal helper for safe int conversion
+_def_safe_int_added = True
+
+
+def _safe_to_int(value, default):
+    try:
+        return int(value)
+    except Exception:
+        return default
+
+
 def _build_librespot_client():
     try:
         # Reuse shared client managed in routes.utils.get_info