Solved issue #114

This commit is contained in:
cool.gitter.not.me.again.duh
2025-05-26 14:44:30 -06:00
parent 5d15532d42
commit b6ff994047
2 changed files with 272 additions and 146 deletions

View File

@@ -11,6 +11,16 @@ import queue
import sys
import uuid
# Import Celery task utilities
from .celery_tasks import (
ProgressState,
get_task_info,
get_last_task_status,
store_task_status,
get_all_tasks as get_all_celery_tasks_info
)
from .celery_config import get_config_params
# Configure logging
logger = logging.getLogger(__name__)
@@ -33,6 +43,61 @@ class CeleryManager:
self.log_queue = queue.Queue()
self.output_threads = []
def _cleanup_stale_tasks(self):
logger.info("Cleaning up potentially stale Celery tasks...")
try:
tasks = get_all_celery_tasks_info()
if not tasks:
logger.info("No tasks found in Redis to check for staleness.")
return
active_stale_states = [
ProgressState.PROCESSING,
ProgressState.INITIALIZING,
ProgressState.DOWNLOADING,
ProgressState.PROGRESS,
ProgressState.REAL_TIME,
ProgressState.RETRYING
]
stale_tasks_count = 0
for task_summary in tasks:
task_id = task_summary.get("task_id")
if not task_id:
continue
last_status_data = get_last_task_status(task_id)
if last_status_data:
current_status_str = last_status_data.get("status")
if current_status_str in active_stale_states:
logger.warning(f"Task {task_id} ('{task_summary.get('name', 'Unknown')}') found in stale state '{current_status_str}'. Marking as error.")
task_info_details = get_task_info(task_id)
config = get_config_params()
error_payload = {
"status": ProgressState.ERROR,
"message": "Task interrupted due to application restart.",
"error": "Task interrupted due to application restart.",
"timestamp": time.time(),
"type": task_info_details.get("type", task_summary.get("type", "unknown")),
"name": task_info_details.get("name", task_summary.get("name", "Unknown")),
"artist": task_info_details.get("artist", task_summary.get("artist", "")),
"can_retry": True,
"retry_count": last_status_data.get("retry_count", 0),
"max_retries": config.get('maxRetries', 3)
}
store_task_status(task_id, error_payload)
stale_tasks_count += 1
if stale_tasks_count > 0:
logger.info(f"Marked {stale_tasks_count} stale tasks as 'error'.")
else:
logger.info("No stale tasks found that needed cleanup.")
except Exception as e:
logger.error(f"Error during stale task cleanup: {e}", exc_info=True)
def start(self):
"""Start the Celery manager and initial workers"""
if self.running:
@@ -40,6 +105,9 @@ class CeleryManager:
self.running = True
# Clean up stale tasks BEFORE starting/restarting workers
self._cleanup_stale_tasks()
# Start initial workers
self._update_workers()