fix: minor optimizations, trying to fix #333

Author: Xoconoch
Date: 2025-08-29 08:26:16 -06:00
Parent: f800251de1
Commit: fe5e7964fa
6 changed files with 304 additions and 101 deletions

app.py (90 changed lines)
View File

@@ -13,11 +13,12 @@ import redis
import socket
from urllib.parse import urlparse
from dotenv import load_dotenv
load_dotenv()
# Parse log level from environment as early as possible, default to INFO for visibility
-log_level_str = os.getenv("LOG_LEVEL", "INFO").upper()
-log_level = getattr(logging, log_level_str, logging.INFO)
+log_level_str = os.getenv("LOG_LEVEL", "WARNING").upper()
+log_level = getattr(logging, log_level_str, logging.WARNING)
# Set up a very basic logging config immediately, so early logs (including import/migration errors) are visible
logging.basicConfig(
@@ -52,30 +53,25 @@ if _umask_value:
# Import and initialize routes (this will start the watch manager)
-from routes.auth.credentials import router as credentials_router
-from routes.auth.auth import router as auth_router
-from routes.content.album import router as album_router
-from routes.content.artist import router as artist_router
-from routes.content.track import router as track_router
-from routes.content.playlist import router as playlist_router
-from routes.content.bulk_add import router as bulk_add_router
-from routes.core.search import router as search_router
-from routes.core.history import router as history_router
-from routes.system.progress import router as prgs_router
-from routes.system.config import router as config_router
+from routes.auth.credentials import router as credentials_router # noqa: E402
+from routes.auth.auth import router as auth_router # noqa: E402
+from routes.content.album import router as album_router # noqa: E402
+from routes.content.artist import router as artist_router # noqa: E402
+from routes.content.track import router as track_router # noqa: E402
+from routes.content.playlist import router as playlist_router # noqa: E402
+from routes.content.bulk_add import router as bulk_add_router # noqa: E402
+from routes.core.search import router as search_router # noqa: E402
+from routes.core.history import router as history_router # noqa: E402
+from routes.system.progress import router as prgs_router # noqa: E402
+from routes.system.config import router as config_router # noqa: E402
# Import Celery configuration and manager
-from routes.utils.celery_manager import celery_manager
-from routes.utils.celery_config import REDIS_URL
+from routes.utils.celery_config import REDIS_URL # noqa: E402
# Import authentication system
from routes.auth import AUTH_ENABLED
from routes.auth.middleware import AuthMiddleware
# Import watch manager controls (start/stop) without triggering side effects
from routes.utils.watch.manager import start_watch_manager, stop_watch_manager
# Configure application-wide logging
@@ -136,9 +132,9 @@ def setup_logging():
"routes.utils.celery_manager",
"routes.utils.celery_tasks",
"routes.utils.watch",
"uvicorn", # General Uvicorn logger
"uvicorn.access", # Uvicorn access logs
"uvicorn.error", # Uvicorn error logs
"uvicorn", # General Uvicorn logger
"uvicorn.access", # Uvicorn access logs
"uvicorn.error", # Uvicorn error logs
"spotizerr",
]:
logger = logging.getLogger(logger_name)
@@ -152,7 +148,6 @@ def setup_logging():
def check_redis_connection():
"""Check if Redis is available and accessible"""
from routes.utils.celery_config import REDIS_URL
if not REDIS_URL:
logging.error("REDIS_URL is not configured. Please check your environment.")
@@ -199,7 +194,9 @@ async def lifespan(app: FastAPI):
# Startup
setup_logging()
effective_level = logging.getLevelName(log_level)
-logging.getLogger(__name__).info(f"Logging system fully initialized (lifespan startup). Effective log level: {effective_level}")
+logging.getLogger(__name__).info(
+    f"Logging system fully initialized (lifespan startup). Effective log level: {effective_level}"
+)
# Run migrations before initializing services
try:
@@ -226,8 +223,19 @@ async def lifespan(app: FastAPI):
try:
from routes.utils.celery_manager import celery_manager
-celery_manager.start()
-logging.info("Celery workers started successfully")
+start_workers = os.getenv("START_EMBEDDED_WORKERS", "true").lower() in (
+    "1",
+    "true",
+    "yes",
+    "on",
+)
+if start_workers:
+    celery_manager.start()
+    logging.info("Celery workers started successfully")
+else:
+    logging.info(
+        "START_EMBEDDED_WORKERS is false; skipping embedded Celery workers startup."
+    )
except Exception as e:
logging.error(f"Failed to start Celery workers: {e}")
@@ -257,8 +265,19 @@ async def lifespan(app: FastAPI):
try:
from routes.utils.celery_manager import celery_manager
-celery_manager.stop()
-logging.info("Celery workers stopped")
+start_workers = os.getenv("START_EMBEDDED_WORKERS", "true").lower() in (
+    "1",
+    "true",
+    "yes",
+    "on",
+)
+if start_workers:
+    celery_manager.stop()
+    logging.info("Celery workers stopped")
+else:
+    logging.info(
+        "START_EMBEDDED_WORKERS is false; no embedded Celery workers to stop."
+    )
except Exception as e:
logging.error(f"Error stopping Celery workers: {e}")
@@ -295,17 +314,6 @@ def create_app():
logging.warning(f"Auth system initialization failed or unavailable: {e}")
# Register routers with URL prefixes
-from routes.auth.auth import router as auth_router
-from routes.system.config import router as config_router
-from routes.core.search import router as search_router
-from routes.auth.credentials import router as credentials_router
-from routes.content.album import router as album_router
-from routes.content.track import router as track_router
-from routes.content.playlist import router as playlist_router
-from routes.content.bulk_add import router as bulk_add_router
-from routes.content.artist import router as artist_router
-from routes.system.progress import router as prgs_router
-from routes.core.history import router as history_router
app.include_router(auth_router, prefix="/api/auth", tags=["auth"])
@@ -449,4 +457,6 @@ if __name__ == "__main__":
except ValueError:
port = 7171
-uvicorn.run(app, host=host, port=port, log_level=log_level_str.lower(), access_log=False)
+uvicorn.run(
+    app, host=host, port=port, log_level=log_level_str.lower(), access_log=False
+)
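
A note on the new switch: START_EMBEDDED_WORKERS lets a deployment opt out of the embedded Celery workers (for example when workers run in their own container) while defaulting to the old behaviour. Below is a minimal, hedged sketch of the same truthy-string parsing; the helper name env_flag and the standalone script are illustrative only and not part of app.py.

    import os

    def env_flag(name: str, default: str = "true") -> bool:
        # Same truthy set the commit checks before starting/stopping the workers.
        return os.getenv(name, default).lower() in ("1", "true", "yes", "on")

    if __name__ == "__main__":
        # e.g. START_EMBEDDED_WORKERS=false python check_flag.py
        if env_flag("START_EMBEDDED_WORKERS"):
            print("would start embedded Celery workers")
        else:
            print("embedded workers disabled; run Celery separately")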

View File

@@ -233,41 +233,70 @@ async def add_to_watchlist(
}
# Fetch playlist details from Spotify to populate our DB (metadata only)
-cfg = get_config_params() or {}
-active_account = cfg.get("spotify")
-if not active_account:
-    raise HTTPException(
-        status_code=500,
-        detail={"error": "Active Spotify account not set in configuration."},
+# Use shared helper and add a safe fallback for missing 'id'
+try:
+    from routes.utils.get_info import get_playlist_metadata
+    playlist_data = get_playlist_metadata(playlist_spotify_id) or {}
+except Exception as e:
+    logger.error(
+        f"Failed to fetch playlist metadata for {playlist_spotify_id}: {e}",
+        exc_info=True,
    )
-blob_path = get_spotify_blob_path(active_account)
-if not blob_path.exists():
    raise HTTPException(
        status_code=500,
        detail={
-            "error": f"Spotify credentials blob not found for account '{active_account}'"
+            "error": f"Failed to fetch metadata for playlist {playlist_spotify_id}: {str(e)}"
        },
    )
-client = get_client()
-try:
-    playlist_data = get_playlist(
-        client, playlist_spotify_id, expand_items=False
+# Some Librespot responses may omit 'id' even when the payload is valid.
+# Fall back to the path parameter to avoid false negatives.
+if playlist_data and "id" not in playlist_data:
+    logger.warning(
+        f"Playlist metadata for {playlist_spotify_id} missing 'id'. Injecting from path param. Keys: {list(playlist_data.keys())}"
    )
-finally:
-    pass
+    try:
+        playlist_data["id"] = playlist_spotify_id
+    except Exception:
+        pass
-if not playlist_data or "id" not in playlist_data:
+# Validate minimal fields needed downstream and normalize shape to be resilient to client changes
+if not playlist_data or not playlist_data.get("name"):
    logger.error(
-        f"Could not fetch details for playlist {playlist_spotify_id} from Spotify."
+        f"Insufficient playlist metadata for {playlist_spotify_id}. Keys present: {list(playlist_data.keys()) if isinstance(playlist_data, dict) else type(playlist_data)}"
    )
    raise HTTPException(
        status_code=404,
        detail={
-            "error": f"Could not fetch details for playlist {playlist_spotify_id} from Spotify."
+            "error": f"Could not fetch sufficient details for playlist {playlist_spotify_id} from Spotify."
        },
    )
# Ensure 'owner' is a dict with at least id/display_name to satisfy DB layer
owner = playlist_data.get("owner")
if not isinstance(owner, dict):
owner = {}
if "id" not in owner or not owner.get("id"):
owner["id"] = "unknown_owner"
if "display_name" not in owner or not owner.get("display_name"):
owner["display_name"] = owner.get("id", "Unknown Owner")
playlist_data["owner"] = owner
# Ensure 'tracks' is a dict with a numeric 'total'
tracks = playlist_data.get("tracks")
if not isinstance(tracks, dict):
tracks = {}
total = tracks.get("total")
if not isinstance(total, int):
items = tracks.get("items")
if isinstance(items, list):
total = len(items)
else:
total = 0
tracks["total"] = total
playlist_data["tracks"] = tracks
add_playlist_db(playlist_data) # This also creates the tracks table
logger.info(
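
The endpoint now tolerates partial metadata from the shared get_playlist_metadata helper: it back-fills 'id' from the path parameter, requires only 'name', and guarantees an 'owner' dict plus a numeric 'tracks.total' before calling add_playlist_db. A condensed, hedged sketch of that normalization as a standalone function; normalize_playlist_metadata is an illustrative name and the logic is slightly simplified relative to the diff.

    from typing import Any, Dict

    def normalize_playlist_metadata(data: Dict[str, Any], playlist_id: str) -> Dict[str, Any]:
        data = dict(data or {})
        data.setdefault("id", playlist_id)  # fall back to the path parameter
        owner = data.get("owner") if isinstance(data.get("owner"), dict) else {}
        owner["id"] = owner.get("id") or "unknown_owner"
        owner["display_name"] = owner.get("display_name") or owner["id"]
        data["owner"] = owner
        tracks = data.get("tracks") if isinstance(data.get("tracks"), dict) else {}
        total = tracks.get("total")
        if not isinstance(total, int):
            items = tracks.get("items")
            total = len(items) if isinstance(items, list) else 0
        tracks["total"] = total
        data["tracks"] = tracks
        return data

    print(normalize_playlist_metadata({"name": "Mix"}, "37i9dQZF1DXcBWIGoYBM5M"))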

View File

@@ -105,6 +105,10 @@ def start_sse_redis_subscriber():
pubsub.subscribe("sse_events")
logger.info("SSE Redis Subscriber: Started listening for events")
# Create a single event loop for this thread and reuse it
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
for message in pubsub.listen():
if message["type"] == "message":
try:
@@ -121,47 +125,32 @@ def start_sse_redis_subscriber():
# Transform callback data into standardized update format expected by frontend
standardized = standardize_incoming_event(event_data)
if standardized:
-    loop = asyncio.new_event_loop()
-    asyncio.set_event_loop(loop)
-    try:
-        loop.run_until_complete(
-            sse_broadcaster.broadcast_event(standardized)
-        )
-        logger.debug(
-            f"SSE Redis Subscriber: Broadcasted standardized progress update to {len(sse_broadcaster.clients)} clients"
-        )
-    finally:
-        loop.close()
-elif event_type == "summary_update":
-    # Task summary update - use standardized trigger
-    loop = asyncio.new_event_loop()
-    asyncio.set_event_loop(loop)
-    try:
-        loop.run_until_complete(
-            trigger_sse_update(
-                task_id, event_data.get("reason", "update")
-            )
-        )
-        logger.debug(
-            f"SSE Redis Subscriber: Processed summary update for {task_id}"
-        )
-    finally:
-        loop.close()
+    loop.run_until_complete(
+        sse_broadcaster.broadcast_event(standardized)
+    )
+    logger.debug(
+        f"SSE Redis Subscriber: Broadcasted standardized progress update to {len(sse_broadcaster.clients)} clients"
+    )
+elif event_type == "summary_update":
+    # Task summary update - use standardized trigger
+    loop.run_until_complete(
+        trigger_sse_update(
+            task_id, event_data.get("reason", "update")
+        )
+    )
+    logger.debug(
+        f"SSE Redis Subscriber: Processed summary update for {task_id}"
+    )
else:
    # Unknown event type - attempt to standardize and broadcast
    standardized = standardize_incoming_event(event_data)
    if standardized:
-        loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
-        try:
-            loop.run_until_complete(
-                sse_broadcaster.broadcast_event(standardized)
-            )
-            logger.debug(
-                f"SSE Redis Subscriber: Broadcasted standardized {event_type} to {len(sse_broadcaster.clients)} clients"
-            )
-        finally:
-            loop.close()
+        loop.run_until_complete(
+            sse_broadcaster.broadcast_event(standardized)
+        )
+        logger.debug(
+            f"SSE Redis Subscriber: Broadcasted standardized {event_type} to {len(sse_broadcaster.clients)} clients"
+        )
except Exception as e:
logger.error(
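
The subscriber thread previously built and closed a fresh asyncio event loop for every Redis message; it now creates one loop when the thread starts and reuses it for each broadcast. A minimal, hedged sketch of that pattern with stand-in names: broadcast and the message list are illustrative, the real code drives sse_broadcaster.broadcast_event and trigger_sse_update, and the final loop.close() is added here only for completeness.

    import asyncio
    import threading

    async def broadcast(msg: str) -> None:
        # Stand-in for sse_broadcaster.broadcast_event(...)
        print("broadcast:", msg)

    def subscriber_thread(messages):
        # One loop per thread, created once and reused for every message,
        # instead of asyncio.new_event_loop()/loop.close() per event.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            for msg in messages:
                loop.run_until_complete(broadcast(msg))
        finally:
            loop.close()

    t = threading.Thread(target=subscriber_thread, args=(["a", "b", "c"],))
    t.start()
    t.join()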

View File

@@ -40,6 +40,7 @@ DEFAULT_MAIN_CONFIG = {
"tracknumPadding": True,
"saveCover": True,
"maxConcurrentDownloads": 3,
"utilityConcurrency": 1,
"maxRetries": 3,
"retryDelaySeconds": 5,
"retryDelayIncrease": 5,

View File

@@ -6,10 +6,11 @@ import os
import sys
from dotenv import load_dotenv
load_dotenv()
# Import Celery task utilities
-from .celery_config import get_config_params, MAX_CONCURRENT_DL
+from .celery_config import get_config_params, MAX_CONCURRENT_DL # noqa: E402
# Configure logging
logger = logging.getLogger(__name__)
@@ -40,15 +41,22 @@ class CeleryManager:
self.concurrency = get_config_params().get(
"maxConcurrentDownloads", MAX_CONCURRENT_DL
)
self.utility_concurrency = max(
1, int(get_config_params().get("utilityConcurrency", 1))
)
logger.info(
f"CeleryManager initialized. Download concurrency set to: {self.concurrency}"
f"CeleryManager initialized. Download concurrency set to: {self.concurrency} | Utility concurrency: {self.utility_concurrency}"
)
def _get_worker_command(
self, queues, concurrency, worker_name_suffix, log_level_env=None
):
# Use LOG_LEVEL from environment if provided, otherwise default to INFO
-log_level = log_level_env if log_level_env else os.getenv("LOG_LEVEL", "WARNING").upper()
+log_level = (
+    log_level_env
+    if log_level_env
+    else os.getenv("LOG_LEVEL", "WARNING").upper()
+)
# Use a unique worker name to avoid conflicts.
# %h is replaced by celery with the actual hostname.
hostname = f"worker_{worker_name_suffix}@%h"
@@ -167,12 +175,19 @@ class CeleryManager:
if self.utility_worker_process and self.utility_worker_process.poll() is None:
logger.info("Celery Utility Worker is already running.")
else:
self.utility_concurrency = max(
1,
int(
get_config_params().get(
"utilityConcurrency", self.utility_concurrency
)
),
)
utility_cmd = self._get_worker_command(
queues="utility_tasks,default", # Listen to utility and default
-concurrency=5, # Increased concurrency for SSE updates and utility tasks
+concurrency=self.utility_concurrency,
worker_name_suffix="utw", # Utility Worker
log_level_env=os.getenv("LOG_LEVEL", "WARNING").upper(),
)
logger.info(
f"Starting Celery Utility Worker with command: {' '.join(utility_cmd)}"
@@ -197,7 +212,7 @@ class CeleryManager:
self.utility_log_thread_stdout.start()
self.utility_log_thread_stderr.start()
logger.info(
f"Celery Utility Worker (PID: {self.utility_worker_process.pid}) started with concurrency 5."
f"Celery Utility Worker (PID: {self.utility_worker_process.pid}) started with concurrency {self.utility_concurrency}."
)
if (
@@ -221,7 +236,9 @@ class CeleryManager:
)
while not self.stop_event.is_set():
try:
-time.sleep(10) # Check every 10 seconds
+# Wait using stop_event to be responsive to shutdown and respect interval
+if self.stop_event.wait(CONFIG_CHECK_INTERVAL):
+    break
if self.stop_event.is_set():
break
@@ -229,6 +246,14 @@ class CeleryManager:
new_max_concurrent_downloads = current_config.get(
"maxConcurrentDownloads", self.concurrency
)
new_utility_concurrency = max(
1,
int(
current_config.get(
"utilityConcurrency", self.utility_concurrency
)
),
)
if new_max_concurrent_downloads != self.concurrency:
logger.info(
@@ -272,7 +297,10 @@ class CeleryManager:
# Restart only the download worker
download_cmd = self._get_worker_command(
"downloads", self.concurrency, "dlw", log_level_env=os.getenv("LOG_LEVEL", "WARNING").upper()
"downloads",
self.concurrency,
"dlw",
log_level_env=os.getenv("LOG_LEVEL", "WARNING").upper(),
)
logger.info(
f"Restarting Celery Download Worker with command: {' '.join(download_cmd)}"
@@ -303,6 +331,82 @@ class CeleryManager:
f"Celery Download Worker (PID: {self.download_worker_process.pid}) restarted with new concurrency {self.concurrency}."
)
# Handle utility worker concurrency changes
if new_utility_concurrency != self.utility_concurrency:
logger.info(
f"CeleryManager: Detected change in utilityConcurrency from {self.utility_concurrency} to {new_utility_concurrency}. Restarting utility worker only."
)
if (
self.utility_worker_process
and self.utility_worker_process.poll() is None
):
logger.info(
f"Stopping Celery Utility Worker (PID: {self.utility_worker_process.pid}) for config update..."
)
self.utility_worker_process.terminate()
try:
self.utility_worker_process.wait(timeout=10)
logger.info(
f"Celery Utility Worker (PID: {self.utility_worker_process.pid}) terminated."
)
except subprocess.TimeoutExpired:
logger.warning(
f"Celery Utility Worker (PID: {self.utility_worker_process.pid}) did not terminate gracefully, killing."
)
self.utility_worker_process.kill()
self.utility_worker_process = None
# Wait for log threads of utility worker to finish
if (
self.utility_log_thread_stdout
and self.utility_log_thread_stdout.is_alive()
):
self.utility_log_thread_stdout.join(timeout=5)
if (
self.utility_log_thread_stderr
and self.utility_log_thread_stderr.is_alive()
):
self.utility_log_thread_stderr.join(timeout=5)
self.utility_concurrency = new_utility_concurrency
# Restart only the utility worker
utility_cmd = self._get_worker_command(
"utility_tasks,default",
self.utility_concurrency,
"utw",
log_level_env=os.getenv("LOG_LEVEL", "WARNING").upper(),
)
logger.info(
f"Restarting Celery Utility Worker with command: {' '.join(utility_cmd)}"
)
self.utility_worker_process = subprocess.Popen(
utility_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
universal_newlines=True,
)
self.utility_log_thread_stdout = threading.Thread(
target=self._process_output_reader,
args=(self.utility_worker_process.stdout, "Celery[UW-STDOUT]"),
)
self.utility_log_thread_stderr = threading.Thread(
target=self._process_output_reader,
args=(
self.utility_worker_process.stderr,
"Celery[UW-STDERR]",
True,
),
)
self.utility_log_thread_stdout.start()
self.utility_log_thread_stderr.start()
logger.info(
f"Celery Utility Worker (PID: {self.utility_worker_process.pid}) restarted with new concurrency {self.utility_concurrency}."
)
except Exception as e:
logger.error(
f"CeleryManager: Error in config monitor thread: {e}", exc_info=True

View File

@@ -167,6 +167,46 @@ def get_watch_config():
watch_cfg["maxItemsPerRun"] = clamped_value
migrated = True
# Enforce sane ranges and types for poll/delay intervals to prevent tight loops
def _safe_int(value, default):
try:
return int(value)
except Exception:
return default
# Clamp poll interval to at least 1 second
poll_val = _safe_int(
watch_cfg.get(
"watchPollIntervalSeconds",
DEFAULT_WATCH_CONFIG["watchPollIntervalSeconds"],
),
DEFAULT_WATCH_CONFIG["watchPollIntervalSeconds"],
)
if poll_val < 1:
watch_cfg["watchPollIntervalSeconds"] = 1
migrated = True
# Clamp per-item delays to at least 1 second
delay_pl = _safe_int(
watch_cfg.get(
"delayBetweenPlaylistsSeconds",
DEFAULT_WATCH_CONFIG["delayBetweenPlaylistsSeconds"],
),
DEFAULT_WATCH_CONFIG["delayBetweenPlaylistsSeconds"],
)
if delay_pl < 1:
watch_cfg["delayBetweenPlaylistsSeconds"] = 1
migrated = True
delay_ar = _safe_int(
watch_cfg.get(
"delayBetweenArtistsSeconds",
DEFAULT_WATCH_CONFIG["delayBetweenArtistsSeconds"],
),
DEFAULT_WATCH_CONFIG["delayBetweenArtistsSeconds"],
)
if delay_ar < 1:
watch_cfg["delayBetweenArtistsSeconds"] = 1
migrated = True
if migrated or legacy_file_found:
# Persist migration back to main.json
main_cfg["watch"] = watch_cfg
@@ -670,7 +710,9 @@ def check_watched_playlists(specific_playlist_id: str = None):
# Only sleep between items when running a batch (no specific ID)
if not specific_playlist_id:
time.sleep(max(1, config.get("delayBetweenPlaylistsSeconds", 2)))
time.sleep(
max(1, _safe_to_int(config.get("delayBetweenPlaylistsSeconds"), 2))
)
logger.info("Playlist Watch Manager: Finished checking all watched playlists.")
@@ -817,7 +859,9 @@ def check_watched_artists(specific_artist_id: str = None):
# Only sleep between items when running a batch (no specific ID)
if not specific_artist_id:
time.sleep(max(1, config.get("delayBetweenArtistsSeconds", 5)))
time.sleep(
max(1, _safe_to_int(config.get("delayBetweenArtistsSeconds"), 5))
)
logger.info("Artist Watch Manager: Finished checking all watched artists.")
@@ -832,6 +876,14 @@ def playlist_watch_scheduler():
interval = current_config.get("watchPollIntervalSeconds", 3600)
watch_enabled = current_config.get("enabled", False) # Get enabled status
# Ensure interval is a positive integer to avoid tight loops
try:
interval = int(interval)
except Exception:
interval = 3600
if interval < 1:
interval = 1
if not watch_enabled:
logger.info(
"Watch Scheduler: Watch feature is disabled in config. Skipping checks."
@@ -907,6 +959,13 @@ def run_playlist_check_over_intervals(playlist_spotify_id: str) -> None:
# Determine if we are done: no active processing snapshot and no pending sync
cfg = get_watch_config()
interval = cfg.get("watchPollIntervalSeconds", 3600)
# Ensure interval is a positive integer
try:
interval = int(interval)
except Exception:
interval = 3600
if interval < 1:
interval = 1
# Use local helper that leverages Librespot client
metadata = _fetch_playlist_metadata(playlist_spotify_id)
if not metadata:
@@ -1169,6 +1228,17 @@ def update_playlist_m3u_file(playlist_spotify_id: str):
# Helper to build a Librespot client from active account
# Add a small internal helper for safe int conversion
_def_safe_int_added = True
def _safe_to_int(value, default):
try:
return int(value)
except Exception:
return default
def _build_librespot_client():
try:
# Reuse shared client managed in routes.utils.get_info
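
get_watch_config now coerces watchPollIntervalSeconds and the per-item delays to integers and clamps them to at least 1 second, and the schedulers re-validate the interval before sleeping, so a malformed config value can no longer produce a tight polling loop. A hedged sketch of that coercion and clamping; clamp_interval is an illustrative name combining _safe_int/_safe_to_int with the floor of 1.

    def clamp_interval(value, default, minimum=1):
        # Coerce to int, fall back to the default on failure, then enforce the
        # floor, as done for watchPollIntervalSeconds and the delayBetween*Seconds settings.
        try:
            value = int(value)
        except (TypeError, ValueError):
            value = default
        return max(minimum, value)

    print(clamp_interval("3600", 3600))  # 3600
    print(clamp_interval(None, 3600))    # 3600
    print(clamp_interval(0, 3600))       # 1
    print(clamp_interval("abc", 60))     # 60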