This may be the cleanest this shit of a project has ever been

Xoconoch
2025-08-02 18:59:00 -06:00
parent 5abc62d8be
commit 999da3048a
7 changed files with 621 additions and 400 deletions

View File

@@ -121,6 +121,16 @@ task_default_queue = "downloads"
task_default_exchange = "downloads"
task_default_routing_key = "downloads"
# Task routing - ensure SSE and utility tasks go to utility_tasks queue
task_routes = {
'routes.utils.celery_tasks.trigger_sse_update_task': {'queue': 'utility_tasks'},
'routes.utils.celery_tasks.cleanup_stale_errors': {'queue': 'utility_tasks'},
'routes.utils.celery_tasks.delayed_delete_task_data': {'queue': 'utility_tasks'},
'routes.utils.celery_tasks.download_track': {'queue': 'downloads'},
'routes.utils.celery_tasks.download_album': {'queue': 'downloads'},
'routes.utils.celery_tasks.download_playlist': {'queue': 'downloads'},
}
# Celery task settings
task_serializer = "json"
accept_content = ["json"]
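The task_routes mapping above binds each task name to a queue at enqueue time; a caller can also override the queue per call with apply_async, which is what trigger_sse_event() in the last file of this commit does to force its updates onto utility_tasks. A minimal, self-contained sketch of both mechanisms (the sketch.dummy_task name and the broker URL are placeholders, not part of this project):

from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379/0")  # placeholder broker URL
app.conf.task_routes = {
    "sketch.dummy_task": {"queue": "utility_tasks"},  # static routing, as in the config above
}

@app.task(name="sketch.dummy_task")
def dummy_task(x):
    return x

if __name__ == "__main__":
    dummy_task.delay(1)  # routed to utility_tasks via task_routes
    dummy_task.apply_async(args=[2], queue="utility_tasks", priority=9)  # explicit per-call override

The priority passed to apply_async is only a hint; how strictly it is honored depends on the broker's support for message priorities.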
@@ -141,6 +151,19 @@ task_annotations = {
"routes.utils.celery_tasks.download_playlist": {
"rate_limit": f"{MAX_CONCURRENT_DL}/m",
},
"routes.utils.celery_tasks.trigger_sse_update_task": {
"rate_limit": "500/m", # Allow high rate for real-time SSE updates
"default_retry_delay": 1, # Quick retry for SSE updates
"max_retries": 1, # Limited retries for best-effort delivery
"ignore_result": True, # Don't store results for SSE tasks
"track_started": False, # Don't track when SSE tasks start
},
"routes.utils.celery_tasks.cleanup_stale_errors": {
"rate_limit": "10/m", # Moderate rate for cleanup tasks
},
"routes.utils.celery_tasks.delayed_delete_task_data": {
"rate_limit": "100/m", # Moderate rate for cleanup
},
}
# Configure retry settings
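task_annotations attaches these options without touching the task definitions themselves. One caveat worth keeping in mind: Celery enforces rate_limit per worker instance, not globally, so "500/m" bounds each worker separately. A minimal sketch of the same pattern with a placeholder task:

from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379/0")  # placeholder broker URL
app.conf.task_annotations = {
    "sketch.ping": {
        "rate_limit": "500/m",   # at most 500 executions per minute, per worker
        "ignore_result": True,   # do not write results to the result backend
        "track_started": False,  # no STARTED state transitions
    },
}

@app.task(name="sketch.ping")
def ping():
    return "pong"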

View File

@@ -149,8 +149,9 @@ class CeleryManager:
else:
utility_cmd = self._get_worker_command(
queues="utility_tasks,default", # Listen to utility and default
concurrency=3,
concurrency=5, # Increased concurrency for SSE updates and utility tasks
worker_name_suffix="utw", # Utility Worker
log_level="ERROR" # Reduce log verbosity for utility worker (only errors)
)
logger.info(
f"Starting Celery Utility Worker with command: {' '.join(utility_cmd)}"
@@ -174,7 +175,7 @@ class CeleryManager:
self.utility_log_thread_stdout.start()
self.utility_log_thread_stderr.start()
logger.info(
f"Celery Utility Worker (PID: {self.utility_worker_process.pid}) started with concurrency 3."
f"Celery Utility Worker (PID: {self.utility_worker_process.pid}) started with concurrency 5."
)
if (
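_get_worker_command itself is outside this hunk, but its keyword arguments map onto standard Celery CLI flags. A hedged sketch of the command list it might produce for the utility worker (the app module path and the exact hostname format are assumptions, not taken from this diff):

utility_cmd = [
    "celery",
    "-A", "routes.utils.celery_tasks",  # Celery app module (assumed from the other files in this commit)
    "worker",
    "-Q", "utility_tasks,default",      # queues this worker consumes
    "-c", "5",                          # concurrency, raised from 3 to 5 here
    "-l", "ERROR",                      # log level, reduced to errors only
    "-n", "utw@%h",                     # worker name suffix ("utw" = Utility Worker)
]
# Equivalent shell form:
# celery -A routes.utils.celery_tasks worker -Q utility_tasks,default -c 5 -l ERROR -n utw@%h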

View File

@@ -2,6 +2,7 @@ import time
import json
import logging
import traceback
import asyncio
from celery import Celery, Task, states
from celery.signals import (
task_prerun,
@@ -49,6 +50,26 @@ celery_app.config_from_object("routes.utils.celery_config")
redis_client = redis.Redis.from_url(REDIS_URL)
def trigger_sse_event(task_id: str, reason: str = "status_change"):
"""Trigger an SSE event using a dedicated Celery worker task"""
try:
# Submit SSE update task to utility worker queue
# This is non-blocking and more reliable than threads
trigger_sse_update_task.apply_async(
args=[task_id, reason],
queue="utility_tasks",
priority=9 # High priority for real-time updates
)
# Only log at debug level to reduce verbosity
logger.debug(f"SSE: Submitted SSE update task for {task_id} (reason: {reason})")
except Exception as e:
logger.error(f"Error submitting SSE update task for task {task_id}: {e}", exc_info=True)
class ProgressState:
"""Enum-like class for progress states"""
@@ -131,6 +152,10 @@ def store_task_status(task_id, status_data):
redis_client.publish(
update_channel, json.dumps({"task_id": task_id, "status_id": status_id})
)
# Trigger immediate SSE event for real-time frontend updates
trigger_sse_event(task_id, "status_update")
except Exception as e:
logger.error(f"Error storing task status: {e}")
traceback.print_exc()
@@ -611,7 +636,6 @@ class ProgressTrackingTask(Task):
logger.debug(f"Task {task_id}: Real-time progress for '{track_name}': {percentage}%")
data["status"] = ProgressState.TRACK_PROGRESS
data["song"] = track_name
artist = data.get("artist", "Unknown")
@@ -648,13 +672,6 @@ class ProgressTrackingTask(Task):
# Log at debug level
logger.debug(f"Task {task_id} track progress: {track_name} by {artist}: {percent}%")
# Set appropriate status
# data["status"] = (
# ProgressState.REAL_TIME
# if data.get("status") == "real_time"
# else ProgressState.TRACK_PROGRESS
# )
def _handle_skipped(self, task_id, data, task_info):
"""Handle skipped status from deezspot"""
@@ -991,6 +1008,10 @@ class ProgressTrackingTask(Task):
def task_prerun_handler(task_id=None, task=None, *args, **kwargs):
"""Signal handler when a task begins running"""
try:
# Skip status tracking and logging for SSE tasks (best-effort updates, not recorded)
if task and hasattr(task, 'name') and task.name in ['trigger_sse_update_task']:
return
task_info = get_task_info(task_id)
# Update task status to processing
@@ -1018,6 +1039,10 @@ def task_postrun_handler(
):
"""Signal handler when a task finishes"""
try:
# Skip status tracking and logging for SSE tasks (best-effort updates, not recorded)
if task and hasattr(task, 'name') and task.name in ['trigger_sse_update_task']:
return
last_status_for_history = get_last_task_status(task_id)
if last_status_for_history and last_status_for_history.get("status") in [
ProgressState.COMPLETE,
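Both handlers above return early for the SSE task so that best-effort summary updates never create status records or history entries of their own. The decorators that attach these handlers are outside the hunks shown here; with the celery.signals imports from the top of this file, the wiring typically looks like the sketch below (handler bodies are placeholders, only the connect pattern and the early return are the point):

from celery.signals import task_prerun, task_postrun

SSE_TASK_NAMES = {"trigger_sse_update_task"}  # tasks excluded from status tracking

@task_prerun.connect
def task_prerun_handler(task_id=None, task=None, *args, **kwargs):
    if task and getattr(task, "name", None) in SSE_TASK_NAMES:
        return  # best-effort SSE update: no "processing" status is stored
    ...  # store processing status for real download tasks

@task_postrun.connect
def task_postrun_handler(task_id=None, task=None, state=None, *args, **kwargs):
    if task and getattr(task, "name", None) in SSE_TASK_NAMES:
        return  # no terminal status or history entry either
    ...  # record terminal status / history for download tasks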
@@ -1607,3 +1632,38 @@ def delayed_delete_task_data(task_id, reason):
"""
logger.info(f"Executing delayed deletion for task {task_id}. Reason: {reason}")
delete_task_data_and_log(task_id, reason)
@celery_app.task(
name="trigger_sse_update_task",
queue="utility_tasks",
bind=True
)
def trigger_sse_update_task(self, task_id: str, reason: str = "status_update"):
"""
Dedicated Celery task for triggering SSE task summary updates.
Uses Redis pub/sub to communicate with the main FastAPI process.
"""
try:
# Send task summary update via Redis pub/sub
logger.debug(f"SSE Task: Processing summary update for task {task_id} (reason: {reason})")
event_data = {
"task_id": task_id,
"reason": reason,
"timestamp": time.time(),
"change_type": "task_summary",
"event_type": "summary_update"
}
# Use Redis pub/sub for cross-process communication
redis_client.publish("sse_events", json.dumps(event_data))
logger.debug(f"SSE Task: Published summary update for task {task_id}")
except Exception as e:
# Only log errors, not success cases
logger.error(f"SSE Task: Failed to publish summary update for task {task_id}: {e}", exc_info=True)
# Don't raise exception to avoid task retry - SSE updates are best-effort
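The docstring says the published message is picked up by the main FastAPI process; that consumer is not part of this diff. A minimal sketch of what the receiving side could look like, assuming the synchronous redis-py client and a hypothetical broadcast_to_sse_clients() helper that forwards events to connected SSE clients:

import json
import threading

import redis

redis_client = redis.Redis.from_url("redis://localhost:6379/0")  # placeholder URL

def broadcast_to_sse_clients(event: dict) -> None:
    """Hypothetical hook: push the event onto every connected SSE client's queue."""
    ...

def listen_for_sse_events() -> None:
    pubsub = redis_client.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe("sse_events")  # the channel trigger_sse_update_task publishes to
    for message in pubsub.listen():
        if message["type"] != "message":
            continue
        event = json.loads(message["data"])  # {"task_id": ..., "reason": ..., "timestamp": ...}
        broadcast_to_sse_clients(event)

# Typically started once at application startup, e.g. in a background thread:
threading.Thread(target=listen_for_sse_events, daemon=True).start()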