From 3c55de631a8be4b73fe07fe31215f0f373cf1f0b Mon Sep 17 00:00:00 2001
From: Xoconoch
Date: Thu, 5 Jun 2025 10:31:43 -0600
Subject: [PATCH] 2.2.2

---
 routes/utils/album.py          |  20 +-
 routes/utils/celery_config.py  |   4 +
 routes/utils/celery_manager.py | 586 ++++++++++++---------------------
 routes/utils/celery_tasks.py   |  21 +-
 4 files changed, 242 insertions(+), 389 deletions(-)

diff --git a/routes/utils/album.py b/routes/utils/album.py
index d95f785..3406c7c 100755
--- a/routes/utils/album.py
+++ b/routes/utils/album.py
@@ -4,7 +4,7 @@ import traceback
 from deezspot.spotloader import SpoLogin
 from deezspot.deezloader import DeeLogin
 from pathlib import Path
-from routes.utils.credentials import get_credential, _get_global_spotify_api_creds
+from routes.utils.credentials import get_credential, _get_global_spotify_api_creds, get_spotify_blob_path
 from routes.utils.celery_config import get_config_params
 
 def download_album(
@@ -99,13 +99,12 @@ def download_album(
             if not global_spotify_client_id or not global_spotify_client_secret:
                 raise ValueError("Global Spotify API credentials (client_id/secret) not configured for Spotify download.")
 
-            spotify_main_creds = get_credential('spotify', main) # For blob path
-            blob_file_path = spotify_main_creds.get('blob_file_path')
-            if not Path(blob_file_path).exists():
-                raise FileNotFoundError(f"Spotify credentials blob file not found at {blob_file_path} for account '{main}'")
+            blob_file_path = get_spotify_blob_path(main)
+            if not blob_file_path or not blob_file_path.exists():
+                raise FileNotFoundError(f"Spotify credentials blob file not found or path is invalid for account '{main}'. Path: {str(blob_file_path)}")
 
             spo = SpoLogin(
-                credentials_path=blob_file_path,
+                credentials_path=str(blob_file_path), # Ensure it's a string
                 spotify_client_id=global_spotify_client_id,
                 spotify_client_secret=global_spotify_client_secret,
                 progress_callback=progress_callback
@@ -143,13 +142,12 @@ def download_album(
             if not global_spotify_client_id or not global_spotify_client_secret:
                 raise ValueError("Global Spotify API credentials (client_id/secret) not configured for Spotify download.")
 
-            spotify_main_creds = get_credential('spotify', main) # For blob path
-            blob_file_path = spotify_main_creds.get('blob_file_path')
-            if not Path(blob_file_path).exists():
-                raise FileNotFoundError(f"Spotify credentials blob file not found at {blob_file_path} for account '{main}'")
+            blob_file_path = get_spotify_blob_path(main)
+            if not blob_file_path or not blob_file_path.exists():
+                raise FileNotFoundError(f"Spotify credentials blob file not found or path is invalid for account '{main}'. Path: {str(blob_file_path)}")
 
             spo = SpoLogin(
-                credentials_path=blob_file_path,
+                credentials_path=str(blob_file_path), # Ensure it's a string
                 spotify_client_id=global_spotify_client_id,
                 spotify_client_secret=global_spotify_client_secret,
                 progress_callback=progress_callback
diff --git a/routes/utils/celery_config.py b/routes/utils/celery_config.py
index fce5711..f8e5704 100644
--- a/routes/utils/celery_config.py
+++ b/routes/utils/celery_config.py
@@ -100,6 +100,10 @@ task_queues = {
     'downloads': {
         'exchange': 'downloads',
         'routing_key': 'downloads',
+    },
+    'utility_tasks': {
+        'exchange': 'utility_tasks',
+        'routing_key': 'utility_tasks',
     }
 }
diff --git a/routes/utils/celery_manager.py b/routes/utils/celery_manager.py
index 3cde125..096808e 100644
--- a/routes/utils/celery_manager.py
+++ b/routes/utils/celery_manager.py
@@ -21,7 +21,7 @@ from .celery_tasks import (
     cleanup_stale_errors,
     delayed_delete_task_data
 )
-from .celery_config import get_config_params
+from .celery_config import get_config_params, MAX_CONCURRENT_DL
 # Import history manager
 from .history_manager import init_history_db
 # Import credentials manager for DB init
@@ -41,386 +41,228 @@ class CeleryManager:
     Manages Celery workers dynamically based on configuration changes.
     """
 
-    def __init__(self):
-        self.celery_process = None
-        self.current_worker_count = 0
-        self.monitoring_thread = None
-        self.error_cleanup_thread = None
-        self.running = False
-        self.log_queue = queue.Queue()
-        self.output_threads = []
+    def __init__(self, app_name="download_tasks"):
+        self.app_name = app_name
+        self.download_worker_process = None
+        self.utility_worker_process = None
+        self.download_log_thread_stdout = None
+        self.download_log_thread_stderr = None
+        self.utility_log_thread_stdout = None
+        self.utility_log_thread_stderr = None
+        self.stop_event = threading.Event()
+        self.config_monitor_thread = None
+        # self.concurrency now specifically refers to download worker concurrency
+        self.concurrency = get_config_params().get('maxConcurrentDownloads', MAX_CONCURRENT_DL)
+        logger.info(f"CeleryManager initialized. Download concurrency set to: {self.concurrency}")
 
-    def _cleanup_stale_tasks(self):
-        logger.info("Cleaning up potentially stale Celery tasks...")
+    def _get_worker_command(self, queues, concurrency, worker_name_suffix, log_level="INFO"):
+        # Use a unique worker name to avoid conflicts.
+        # %h is replaced by Celery with the actual hostname.
+        hostname = f"worker_{worker_name_suffix}@%h"
+        command = [
+            "celery",
+            "-A", self.app_name,
+            "worker",
+            "--loglevel=" + log_level,
+            "-Q", queues,
+            "-c", str(concurrency),
+            "--hostname=" + hostname,
+            "--pool=prefork"
+        ]
+        # Optionally add --without-gossip, --without-mingle, --without-heartbeat
+        # if experiencing issues or to reduce network load, but defaults are usually fine.
+        # Example: command.extend(["--without-gossip", "--without-mingle"])
+        logger.debug(f"Generated Celery command: {' '.join(command)}")
+        return command
+
+    def _process_output_reader(self, stream, log_prefix, error=False):
+        logger.debug(f"Log reader thread started for {log_prefix}")
         try:
-            tasks = get_all_celery_tasks_info()
-            if not tasks:
-                logger.info("No tasks found in Redis to check for staleness.")
-                return
-
-            active_stale_states = [
-                ProgressState.PROCESSING,
-                ProgressState.INITIALIZING,
-                ProgressState.DOWNLOADING,
-                ProgressState.PROGRESS,
-                ProgressState.REAL_TIME,
-                ProgressState.RETRYING
-            ]
-
-            stale_tasks_count = 0
-            for task_summary in tasks:
-                task_id = task_summary.get("task_id")
-                if not task_id:
-                    continue
-
-                last_status_data = get_last_task_status(task_id)
-                if last_status_data:
-                    current_status_str = last_status_data.get("status")
-                    if current_status_str in active_stale_states:
-                        logger.warning(f"Task {task_id} ('{task_summary.get('name', 'Unknown')}') found in stale state '{current_status_str}'. Marking as error.")
-
-                        task_info_details = get_task_info(task_id)
-                        config = get_config_params()
-
-                        error_payload = {
-                            "status": ProgressState.ERROR,
-                            "message": "Task interrupted due to application restart.",
-                            "error": "Task interrupted due to application restart.",
-                            "timestamp": time.time(),
-                            "type": task_info_details.get("type", task_summary.get("type", "unknown")),
-                            "name": task_info_details.get("name", task_summary.get("name", "Unknown")),
-                            "artist": task_info_details.get("artist", task_summary.get("artist", "")),
-                            "can_retry": True,
-                            "retry_count": last_status_data.get("retry_count", 0),
-                            "max_retries": config.get('maxRetries', 3)
-                        }
-                        store_task_status(task_id, error_payload)
-                        stale_tasks_count += 1
-
-                        # Schedule deletion for this interrupted task
-                        logger.info(f"Task {task_id} was interrupted. Data scheduled for deletion in 30s.")
-                        delayed_delete_task_data.apply_async(
-                            args=[task_id, "Task interrupted by application restart and auto-cleaned."],
-                            countdown=30
-                        )
-
-            if stale_tasks_count > 0:
-                logger.info(f"Marked {stale_tasks_count} stale tasks as 'error'.")
+            for line in iter(stream.readline, ''):
+                if line:
+                    log_method = logger.error if error else logger.info
+                    log_method(f"{log_prefix}: {line.strip()}")
+                elif self.stop_event.is_set(): # If empty line and stop is set, likely EOF
+                    break
+            # Loop may also exit if stream is closed by process termination
+        except ValueError: # readline() raises ValueError ("I/O operation on closed file") if the stream closes mid-read
+            if not self.stop_event.is_set():
+                logger.error(f"Error reading Celery output from {log_prefix} (ValueError - stream closed unexpectedly?)", exc_info=False) # Don't print full trace for common close error
             else:
-                logger.info("No stale tasks found that needed cleanup (active states).")
-
-            # NEW: Check for tasks that are already terminal but might have missed their cleanup
-            logger.info("Checking for terminal tasks (COMPLETE, CANCELLED, terminal ERROR) that might have missed cleanup...")
-            cleaned_during_this_pass = 0
-            # `tasks` variable is from `get_all_celery_tasks_info()` called at the beginning of the method
-            for task_summary in tasks:
-                task_id = task_summary.get("task_id")
-                if not task_id:
-                    continue
-
-                last_status_data = get_last_task_status(task_id)
-                if last_status_data:
-                    current_status_str = last_status_data.get("status")
-                    task_info_details = get_task_info(task_id) # Get full info for download_type etc.
- - cleanup_reason = "" - schedule_cleanup = False - - if current_status_str == ProgressState.COMPLETE: - # If a task is COMPLETE (any download_type) and still here, its original scheduled deletion was missed. - logger.warning(f"Task {task_id} ('{task_summary.get('name', 'Unknown')}', type: {task_info_details.get('download_type')}) is COMPLETE and still in Redis. Re-scheduling cleanup.") - cleanup_reason = f"Task ({task_info_details.get('download_type')}) was COMPLETE; re-scheduling auto-cleanup." - schedule_cleanup = True - elif current_status_str == ProgressState.CANCELLED: - logger.warning(f"Task {task_id} ('{task_summary.get('name', 'Unknown')}') is CANCELLED and still in Redis. Re-scheduling cleanup.") - cleanup_reason = "Task was CANCELLED; re-scheduling auto-cleanup." - schedule_cleanup = True - elif current_status_str == ProgressState.ERROR: - can_retry_flag = last_status_data.get("can_retry", False) - # is_submission_error_task and is_duplicate_error_task are flags on task_info, not typically on last_status - is_submission_error = task_info_details.get("is_submission_error_task", False) - is_duplicate_error = task_info_details.get("is_duplicate_error_task", False) - # Check if it's an error state that should have been cleaned up - if not can_retry_flag or is_submission_error or is_duplicate_error or last_status_data.get("status") == ProgressState.ERROR_RETRIED: - # ERROR_RETRIED means the original task is done and should be cleaned. - logger.warning(f"Task {task_id} ('{task_summary.get('name', 'Unknown')}') is in a terminal ERROR state ('{last_status_data.get('error')}') and still in Redis. Re-scheduling cleanup.") - cleanup_reason = f"Task was in terminal ERROR state ('{last_status_data.get('error', 'Unknown error')}'); re-scheduling auto-cleanup." - schedule_cleanup = True - elif current_status_str == ProgressState.ERROR_RETRIED: - # This state itself implies the task is terminal and its data can be cleaned. - logger.warning(f"Task {task_id} ('{task_summary.get('name', 'Unknown')}') is ERROR_RETRIED and still in Redis. Re-scheduling cleanup.") - cleanup_reason = "Task was ERROR_RETRIED; re-scheduling auto-cleanup." 
-                        schedule_cleanup = True
-
-                    if schedule_cleanup:
-                        delayed_delete_task_data.apply_async(
-                            args=[task_id, cleanup_reason],
-                            countdown=30 # Schedule with 30s delay
-                        )
-                        cleaned_during_this_pass +=1
-
-            if cleaned_during_this_pass > 0:
-                logger.info(f"Re-scheduled cleanup for {cleaned_during_this_pass} terminal tasks that were still in Redis.")
-            else:
-                logger.info("No additional terminal tasks found in Redis needing cleanup re-scheduling.")
-
+                logger.info(f"{log_prefix} stream reader gracefully stopped due to closed stream after stop signal.")
         except Exception as e:
-            logger.error(f"Error during stale task cleanup: {e}", exc_info=True)
+            logger.error(f"Unexpected error in log reader for {log_prefix}: {e}", exc_info=True)
+        finally:
+            if hasattr(stream, 'close') and not stream.closed:
+                stream.close()
+            logger.info(f"{log_prefix} stream reader thread finished.")
 
     def start(self):
-        """Start the Celery manager and initial workers"""
-        if self.running:
-            return
-
-        self.running = True
-
-        # Initialize history database
-        init_history_db()
-        # Initialize credentials database
-        init_credentials_db()
-
-        # Clean up stale tasks BEFORE starting/restarting workers
-        self._cleanup_stale_tasks()
-
-        # Start initial workers
-        self._update_workers()
-
-        # Start monitoring thread for config changes
-        self.monitoring_thread = threading.Thread(target=self._monitor_config, daemon=True)
-        self.monitoring_thread.start()
+        self.stop_event.clear() # Clear stop event before starting
 
-        # Start periodic error cleanup thread
-        self.error_cleanup_thread = threading.Thread(target=self._run_periodic_error_cleanup, daemon=True)
-        self.error_cleanup_thread.start()
+        # Start Download Worker
+        if self.download_worker_process and self.download_worker_process.poll() is None:
+            logger.info("Celery Download Worker is already running.")
+        else:
+            self.concurrency = get_config_params().get('maxConcurrentDownloads', self.concurrency)
+            download_cmd = self._get_worker_command(
+                queues="downloads",
+                concurrency=self.concurrency,
+                worker_name_suffix="dlw" # Download Worker
+            )
+            logger.info(f"Starting Celery Download Worker with command: {' '.join(download_cmd)}")
+            self.download_worker_process = subprocess.Popen(
+                download_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, universal_newlines=True
+            )
+            self.download_log_thread_stdout = threading.Thread(target=self._process_output_reader, args=(self.download_worker_process.stdout, "Celery[DW-STDOUT]"))
+            self.download_log_thread_stderr = threading.Thread(target=self._process_output_reader, args=(self.download_worker_process.stderr, "Celery[DW-STDERR]", True))
+            self.download_log_thread_stdout.start()
+            self.download_log_thread_stderr.start()
+            logger.info(f"Celery Download Worker (PID: {self.download_worker_process.pid}) started with concurrency {self.concurrency}.")
+
+        # Start Utility Worker
+        if self.utility_worker_process and self.utility_worker_process.poll() is None:
+            logger.info("Celery Utility Worker is already running.")
+        else:
+            utility_cmd = self._get_worker_command(
+                queues="utility_tasks,default", # Listen to utility and default
+                concurrency=3,
+                worker_name_suffix="utw" # Utility Worker
+            )
+            logger.info(f"Starting Celery Utility Worker with command: {' '.join(utility_cmd)}")
+            self.utility_worker_process = subprocess.Popen(
+                utility_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, universal_newlines=True
+            )
+            self.utility_log_thread_stdout = threading.Thread(target=self._process_output_reader, args=(self.utility_worker_process.stdout, "Celery[UW-STDOUT]"))
+            self.utility_log_thread_stderr = threading.Thread(target=self._process_output_reader, args=(self.utility_worker_process.stderr, "Celery[UW-STDERR]", True))
+            self.utility_log_thread_stdout.start()
+            self.utility_log_thread_stderr.start()
+            logger.info(f"Celery Utility Worker (PID: {self.utility_worker_process.pid}) started with concurrency 3.")
+
+        if self.config_monitor_thread is None or not self.config_monitor_thread.is_alive():
+            self.config_monitor_thread = threading.Thread(target=self._monitor_config_changes)
+            self.config_monitor_thread.daemon = True # Allow main program to exit even if this thread is running
+            self.config_monitor_thread.start()
+            logger.info("CeleryManager: Config monitor thread started.")
+        else:
+            logger.info("CeleryManager: Config monitor thread already running.")
+
+    def _monitor_config_changes(self):
+        logger.info("CeleryManager: Config monitor thread active, monitoring configuration changes...")
+        while not self.stop_event.is_set():
+            try:
+                time.sleep(10) # Check every 10 seconds
+                if self.stop_event.is_set(): break
+
+                current_config = get_config_params()
+                new_max_concurrent_downloads = current_config.get('maxConcurrentDownloads', self.concurrency)
+
+                if new_max_concurrent_downloads != self.concurrency:
+                    logger.info(f"CeleryManager: Detected change in maxConcurrentDownloads from {self.concurrency} to {new_max_concurrent_downloads}. Restarting download worker only.")
+
+                    # Stop only the download worker
+                    if self.download_worker_process and self.download_worker_process.poll() is None:
+                        logger.info(f"Stopping Celery Download Worker (PID: {self.download_worker_process.pid}) for config update...")
+                        self.download_worker_process.terminate()
+                        try:
+                            self.download_worker_process.wait(timeout=10)
+                            logger.info(f"Celery Download Worker (PID: {self.download_worker_process.pid}) terminated.")
+                        except subprocess.TimeoutExpired:
+                            logger.warning(f"Celery Download Worker (PID: {self.download_worker_process.pid}) did not terminate gracefully, killing.")
+                            self.download_worker_process.kill()
+                    self.download_worker_process = None
+
+                    # Wait for log threads of download worker to finish
+                    if self.download_log_thread_stdout and self.download_log_thread_stdout.is_alive():
+                        self.download_log_thread_stdout.join(timeout=5)
+                    if self.download_log_thread_stderr and self.download_log_thread_stderr.is_alive():
+                        self.download_log_thread_stderr.join(timeout=5)
+
+                    self.concurrency = new_max_concurrent_downloads
+
+                    # Restart only the download worker
+                    download_cmd = self._get_worker_command("downloads", self.concurrency, "dlw")
+                    logger.info(f"Restarting Celery Download Worker with command: {' '.join(download_cmd)}")
+                    self.download_worker_process = subprocess.Popen(
+                        download_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, universal_newlines=True
+                    )
+                    self.download_log_thread_stdout = threading.Thread(target=self._process_output_reader, args=(self.download_worker_process.stdout, "Celery[DW-STDOUT]"))
+                    self.download_log_thread_stderr = threading.Thread(target=self._process_output_reader, args=(self.download_worker_process.stderr, "Celery[DW-STDERR]", True))
+                    self.download_log_thread_stdout.start()
+                    self.download_log_thread_stderr.start()
+                    logger.info(f"Celery Download Worker (PID: {self.download_worker_process.pid}) restarted with new concurrency {self.concurrency}.")
+
+            except Exception as e:
+                logger.error(f"CeleryManager: Error in config monitor thread: {e}", exc_info=True)
+                # Avoid busy-looping on continuous errors
+                if not self.stop_event.is_set(): time.sleep(30)
+        logger.info("CeleryManager: Config monitor thread stopped.")
 
-        # Register shutdown handler
-        atexit.register(self.stop)
-
+    def _stop_worker_process(self, worker_process, worker_name):
+        if worker_process and worker_process.poll() is None:
+            logger.info(f"Terminating Celery {worker_name} Worker (PID: {worker_process.pid})...")
+            worker_process.terminate()
+            try:
+                worker_process.wait(timeout=10)
+                logger.info(f"Celery {worker_name} Worker (PID: {worker_process.pid}) terminated.")
+            except subprocess.TimeoutExpired:
+                logger.warning(f"Celery {worker_name} Worker (PID: {worker_process.pid}) did not terminate gracefully, killing.")
+                worker_process.kill()
+        return None # Set process to None after stopping
+
     def stop(self):
-        """Stop the Celery manager and all workers"""
-        self.running = False
+        logger.info("CeleryManager: Stopping Celery workers...")
+        self.stop_event.set() # Signal all threads to stop
+
+        # Stop download worker
+        self.download_worker_process = self._stop_worker_process(self.download_worker_process, "Download")
 
-        # Stop all running threads
-        for thread in self.output_threads:
-            if thread.is_alive():
-                # We can't really stop the threads, but they'll exit on their own
-                # when the process is terminated since they're daemon threads
-                pass
+        # Stop utility worker
+        self.utility_worker_process = self._stop_worker_process(self.utility_worker_process, "Utility")
+
+        logger.info("Joining log threads...")
+        thread_timeout = 5 # seconds to wait for log threads
+
+        # Join download worker log threads
+        if self.download_log_thread_stdout and self.download_log_thread_stdout.is_alive():
+            self.download_log_thread_stdout.join(timeout=thread_timeout)
+        if self.download_log_thread_stderr and self.download_log_thread_stderr.is_alive():
+            self.download_log_thread_stderr.join(timeout=thread_timeout)
+
+        # Join utility worker log threads
+        if self.utility_log_thread_stdout and self.utility_log_thread_stdout.is_alive():
+            self.utility_log_thread_stdout.join(timeout=thread_timeout)
+        if self.utility_log_thread_stderr and self.utility_log_thread_stderr.is_alive():
+            self.utility_log_thread_stderr.join(timeout=thread_timeout)
+
+        if self.config_monitor_thread and self.config_monitor_thread.is_alive():
+            logger.info("Joining config_monitor_thread...")
+            self.config_monitor_thread.join(timeout=thread_timeout)
 
-        if self.celery_process:
-            logger.info("Stopping Celery workers...")
-            try:
-                # Send SIGTERM to process group
-                os.killpg(os.getpgid(self.celery_process.pid), signal.SIGTERM)
-                self.celery_process.wait(timeout=5)
-            except (subprocess.TimeoutExpired, ProcessLookupError):
-                # Force kill if not terminated
-                try:
-                    os.killpg(os.getpgid(self.celery_process.pid), signal.SIGKILL)
-                except ProcessLookupError:
-                    pass
-
-            self.celery_process = None
-            self.current_worker_count = 0
-
-    def _get_worker_count(self):
-        """Get the configured worker count from config file"""
-        try:
-            if not Path(CONFIG_PATH).exists():
-                return 3 # Default
-
-            with open(CONFIG_PATH, 'r') as f:
-                config = json.load(f)
-
-            return int(config.get('maxConcurrentDownloads', 3))
-        except Exception as e:
-            logger.error(f"Error reading worker count from config: {e}")
-            return 3 # Default on error
-
-    def _update_workers(self):
-        """Update workers if needed based on configuration"""
-        new_worker_count = self._get_worker_count()
-
-        if new_worker_count == self.current_worker_count and self.celery_process and self.celery_process.poll() is None:
-            return # No change and process is running
-
-        logger.info(f"Updating Celery workers from {self.current_worker_count} to {new_worker_count}")
-
-        # Stop existing workers if running
-        if self.celery_process:
-            try:
-                logger.info("Stopping existing Celery workers...")
-                os.killpg(os.getpgid(self.celery_process.pid), signal.SIGTERM)
-                self.celery_process.wait(timeout=5)
-            except (subprocess.TimeoutExpired, ProcessLookupError):
-                try:
-                    logger.warning("Forcibly killing Celery workers with SIGKILL")
-                    os.killpg(os.getpgid(self.celery_process.pid), signal.SIGKILL)
-                except ProcessLookupError:
-                    pass
-
-        # Clear output threads list
-        self.output_threads = []
-
-        # Wait a moment to ensure processes are terminated
-        time.sleep(2)
-
-        # Additional cleanup - find and kill any stray Celery processes
-        try:
-            # This runs a shell command to find and kill all celery processes
-            subprocess.run(
-                "ps aux | grep 'celery -A routes.utils.celery_tasks.celery_app worker' | grep -v grep | awk '{print $2}' | xargs -r kill -9",
-                shell=True,
-                stderr=subprocess.PIPE
-            )
-            logger.info("Killed any stray Celery processes")
-
-            # Wait a moment to ensure processes are terminated
+        logger.info("CeleryManager: All workers and threads signaled to stop and joined.")
+
+    def restart(self):
+        logger.info("CeleryManager: Restarting all Celery workers...")
+        self.stop()
+        # Short delay before restarting
+        logger.info("Waiting a brief moment before restarting workers...")
+        time.sleep(2)
+        self.start()
+        logger.info("CeleryManager: All Celery workers restarted.")
+
+# Global instance for managing Celery workers
+celery_manager = CeleryManager()
+
+# Example of how to use the manager (typically called from your main app script)
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] [%(threadName)s] [%(name)s] - %(message)s')
+    logger.info("Starting Celery Manager example...")
+    celery_manager.start()
+    try:
+        while True:
             time.sleep(1)
-        except Exception as e:
-            logger.error(f"Error during stray process cleanup: {e}")
-
-        # Start new workers with updated concurrency
-        try:
-            # Set environment variables to configure Celery logging
-            env = os.environ.copy()
-            env['PYTHONUNBUFFERED'] = '1' # Ensure Python output is unbuffered
-
-            # Construct command with extra logging options
-            cmd = [
-                'celery',
-                '-A', CELERY_APP,
-                'worker',
-                '--loglevel=info',
-                f'--concurrency={new_worker_count}',
-                '-Q', 'downloads,default',
-                '--logfile=-', # Output logs to stdout
-                '--without-heartbeat', # Reduce log noise
-                '--without-gossip', # Reduce log noise
-                '--without-mingle', # Reduce log noise
-                # Add unique worker name to prevent conflicts
-                f'--hostname=worker@%h-{uuid.uuid4()}'
-            ]
-
-            logger.info(f"Starting new Celery workers with command: {' '.join(cmd)}")
-
-            self.celery_process = subprocess.Popen(
-                cmd,
-                stdout=subprocess.PIPE,
-                stderr=subprocess.PIPE,
-                env=env,
-                preexec_fn=os.setsid, # New process group for clean termination
-                universal_newlines=True,
-                bufsize=1 # Line buffered
-            )
-
-            self.current_worker_count = new_worker_count
-            logger.info(f"Started Celery workers with concurrency {new_worker_count}, PID: {self.celery_process.pid}")
-
-            # Verify the process started correctly
-            time.sleep(2)
-            if self.celery_process.poll() is not None:
-                # Process exited prematurely
-                stdout, stderr = "", ""
-                try:
-                    stdout, stderr = self.celery_process.communicate(timeout=1)
-                except subprocess.TimeoutExpired:
-                    pass
-
-                logger.error(f"Celery workers failed to start. Exit code: {self.celery_process.poll()}")
-                logger.error(f"Stdout: {stdout}")
-                logger.error(f"Stderr: {stderr}")
-                self.celery_process = None
-                raise RuntimeError("Celery workers failed to start")
-
-            # Start non-blocking output reader threads for both stdout and stderr
-            stdout_thread = threading.Thread(
-                target=self._process_output_reader,
-                args=(self.celery_process.stdout, "STDOUT"),
-                daemon=True
-            )
-            stdout_thread.start()
-            self.output_threads.append(stdout_thread)
-
-            stderr_thread = threading.Thread(
-                target=self._process_output_reader,
-                args=(self.celery_process.stderr, "STDERR"),
-                daemon=True
-            )
-            stderr_thread.start()
-            self.output_threads.append(stderr_thread)
-
-        except Exception as e:
-            logger.error(f"Error starting Celery workers: {e}")
-            # In case of failure, make sure we don't leave orphaned processes
-            if self.celery_process and self.celery_process.poll() is None:
-                try:
-                    os.killpg(os.getpgid(self.celery_process.pid), signal.SIGKILL)
-                except (ProcessLookupError, OSError):
-                    pass
-            self.celery_process = None
-
-    def _process_output_reader(self, pipe, stream_name):
-        """Read and log output from the process"""
-        try:
-            for line in iter(pipe.readline, ''):
-                if not line:
-                    break
-
-                line = line.strip()
-                if not line:
-                    continue
-
-                # Format the message to identify it's from Celery
-                if "ERROR" in line or "CRITICAL" in line:
-                    logger.error(f"Celery[{stream_name}]: {line}")
-                elif "WARNING" in line:
-                    logger.warning(f"Celery[{stream_name}]: {line}")
-                elif "DEBUG" in line:
-                    logger.debug(f"Celery[{stream_name}]: {line}")
-                else:
-                    logger.info(f"Celery[{stream_name}]: {line}")
-
-        except Exception as e:
-            logger.error(f"Error processing Celery output: {e}")
-        finally:
-            pipe.close()
-
-    def _monitor_config(self):
-        """Monitor configuration file for changes"""
-        logger.info("Starting config monitoring thread")
-        last_check_time = 0
-
-        while self.running:
-            try:
-                # Check for changes
-                if time.time() - last_check_time >= CONFIG_CHECK_INTERVAL:
-                    self._update_workers()
-                    last_check_time = time.time()
-
-                time.sleep(1)
-            except Exception as e:
-                logger.error(f"Error in config monitoring thread: {e}")
-                time.sleep(5) # Wait before retrying
-
-    def _run_periodic_error_cleanup(self):
-        """Periodically triggers the cleanup_stale_errors Celery task."""
-        cleanup_interval = 60 # Run cleanup task every 60 seconds
-        logger.info(f"Starting periodic error cleanup scheduler (runs every {cleanup_interval}s).")
-        while self.running:
-            try:
-                logger.info("Scheduling cleanup_stale_errors task...")
-                cleanup_stale_errors.delay() # Call the Celery task
-            except Exception as e:
-                logger.error(f"Error scheduling cleanup_stale_errors task: {e}", exc_info=True)
-
-            # Wait for the next interval
-            # Use a loop to check self.running more frequently to allow faster shutdown
-            for _ in range(cleanup_interval):
-                if not self.running:
-                    break
-                time.sleep(1)
-        logger.info("Periodic error cleanup scheduler stopped.")
-
-# Create single instance
-celery_manager = CeleryManager()
\ No newline at end of file
+    except KeyboardInterrupt:
+        logger.info("Keyboard interrupt received, stopping Celery Manager...")
+    finally:
+        celery_manager.stop()
+        logger.info("Celery Manager example finished.")
\ No newline at end of file
diff --git a/routes/utils/celery_tasks.py b/routes/utils/celery_tasks.py
index ac72774..fd45b6f 100644
--- a/routes/utils/celery_tasks.py
+++ b/routes/utils/celery_tasks.py
@@ -931,11 +931,15 @@ def task_prerun_handler(task_id=None, task=None, *args, **kwargs):
 def task_postrun_handler(task_id=None, task=None, retval=None, state=None, *args, **kwargs):
     """Signal handler when a task finishes"""
     try:
+        # Define download task names
+        download_task_names = ["download_track", "download_album", "download_playlist"]
+
         last_status_for_history = get_last_task_status(task_id)
         if last_status_for_history and last_status_for_history.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED, "ERROR_RETRIED", "ERROR_AUTO_CLEANED"]:
             if state == states.REVOKED and last_status_for_history.get("status") != ProgressState.CANCELLED:
                 logger.info(f"Task {task_id} was REVOKED (likely cancelled), logging to history.")
-                _log_task_to_history(task_id, 'CANCELLED', "Task was revoked/cancelled.")
+                if task and task.name in download_task_names: # Check if it's a download task
+                    _log_task_to_history(task_id, 'CANCELLED', "Task was revoked/cancelled.")
             # return # Let status update proceed if necessary
 
         task_info = get_task_info(task_id)
@@ -952,7 +956,8 @@ def task_postrun_handler(task_id=None, task=None, retval=None, state=None, *args
                 "message": "Download completed successfully."
             })
             logger.info(f"Task {task_id} completed successfully: {task_info.get('name', 'Unknown')}")
-            _log_task_to_history(task_id, 'COMPLETED')
+            if task and task.name in download_task_names: # Check if it's a download task
+                _log_task_to_history(task_id, 'COMPLETED')
 
             if task_info.get("download_type") == "track": # Applies to single track downloads and tracks from playlists/albums
                 delayed_delete_task_data.apply_async(
@@ -999,12 +1004,15 @@ def task_postrun_handler(task_id=None, task=None, retval=None, state=None, *args
         logger.error(f"Error in task_postrun_handler: {e}", exc_info=True)
 
 @task_failure.connect
-def task_failure_handler(task_id=None, exception=None, traceback=None, *args, **kwargs):
+def task_failure_handler(task_id=None, exception=None, traceback=None, sender=None, *args, **kwargs):
     """Signal handler when a task fails"""
     try:
         # Skip if Retry exception
        if isinstance(exception, Retry):
             return
+
+        # Define download task names
+        download_task_names = ["download_track", "download_album", "download_playlist"]
 
         # Get task info and status
         task_info = get_task_info(task_id)
@@ -1038,7 +1046,8 @@ def task_failure_handler(task_id=None, exception=None, traceback=None, *args, **
             })
 
             logger.error(f"Task {task_id} failed: {str(exception)}")
-            _log_task_to_history(task_id, 'ERROR', str(exception))
+            if sender and sender.name in download_task_names: # Check if it's a download task
+                _log_task_to_history(task_id, 'ERROR', str(exception))
 
             if can_retry:
                 logger.info(f"Task {task_id} can be retried ({retry_count}/{max_retries})")
@@ -1346,7 +1355,7 @@ def delete_task_data_and_log(task_id, reason="Task data deleted"):
         logger.error(f"Error deleting task data for {task_id}: {e}", exc_info=True)
         return False
 
-@celery_app.task(name="cleanup_stale_errors", queue="default") # Put on default queue, not downloads
+@celery_app.task(name="cleanup_stale_errors", queue="utility_tasks") # Put on utility_tasks queue
 def cleanup_stale_errors():
     """
     Periodically checks for tasks in ERROR state for more than 1 minute and cleans them up.
@@ -1385,7 +1394,7 @@ def cleanup_stale_errors(): logger.error(f"Error during cleanup_stale_errors: {e}", exc_info=True) return {"status": "error", "error": str(e)} -@celery_app.task(name="delayed_delete_task_data", queue="default") # Use default queue for utility tasks +@celery_app.task(name="delayed_delete_task_data", queue="utility_tasks") # Use utility_tasks queue def delayed_delete_task_data(task_id, reason): """ Celery task to delete task data after a delay.
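
Note on the queue split introduced above (illustrative, not part of the patch): with cleanup_stale_errors and delayed_delete_task_data declared with queue="utility_tasks" and the manager running a second worker on -Q utility_tasks,default, housekeeping no longer competes with downloads for the maxConcurrentDownloads slots. A minimal sketch of how a caller enqueues one of these utility tasks, assuming the broker and task definitions from routes/utils/celery_tasks.py; the task id and reason string are placeholder values:

    # Sketch: enqueue a delayed cleanup on the dedicated utility queue.
    from routes.utils.celery_tasks import delayed_delete_task_data

    delayed_delete_task_data.apply_async(
        args=["example-task-id", "Task interrupted by application restart and auto-cleaned."],  # placeholder args
        countdown=30,  # 30s grace period, matching the manager's usage
    )
    # The queue declared on the @celery_app.task decorator ("utility_tasks") applies
    # by default, so this runs on the utility worker rather than a download slot.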