@@ -4,7 +4,7 @@ import traceback
 from deezspot.spotloader import SpoLogin
 from deezspot.deezloader import DeeLogin
 from pathlib import Path
-from routes.utils.credentials import get_credential, _get_global_spotify_api_creds
+from routes.utils.credentials import get_credential, _get_global_spotify_api_creds, get_spotify_blob_path
 from routes.utils.celery_config import get_config_params
 
 def download_album(
@@ -99,13 +99,12 @@ def download_album(
         if not global_spotify_client_id or not global_spotify_client_secret:
             raise ValueError("Global Spotify API credentials (client_id/secret) not configured for Spotify download.")
 
-        spotify_main_creds = get_credential('spotify', main) # For blob path
-        blob_file_path = spotify_main_creds.get('blob_file_path')
-        if not Path(blob_file_path).exists():
-            raise FileNotFoundError(f"Spotify credentials blob file not found at {blob_file_path} for account '{main}'")
+        blob_file_path = get_spotify_blob_path(main)
+        if not blob_file_path or not blob_file_path.exists():
+            raise FileNotFoundError(f"Spotify credentials blob file not found or path is invalid for account '{main}'. Path: {str(blob_file_path)}")
 
         spo = SpoLogin(
-            credentials_path=blob_file_path,
+            credentials_path=str(blob_file_path), # Ensure it's a string
             spotify_client_id=global_spotify_client_id,
             spotify_client_secret=global_spotify_client_secret,
             progress_callback=progress_callback
@@ -143,13 +142,12 @@ def download_album(
         if not global_spotify_client_id or not global_spotify_client_secret:
             raise ValueError("Global Spotify API credentials (client_id/secret) not configured for Spotify download.")
 
-        spotify_main_creds = get_credential('spotify', main) # For blob path
-        blob_file_path = spotify_main_creds.get('blob_file_path')
-        if not Path(blob_file_path).exists():
-            raise FileNotFoundError(f"Spotify credentials blob file not found at {blob_file_path} for account '{main}'")
+        blob_file_path = get_spotify_blob_path(main)
+        if not blob_file_path or not blob_file_path.exists():
+            raise FileNotFoundError(f"Spotify credentials blob file not found or path is invalid for account '{main}'. Path: {str(blob_file_path)}")
 
         spo = SpoLogin(
-            credentials_path=blob_file_path,
+            credentials_path=str(blob_file_path), # Ensure it's a string
            spotify_client_id=global_spotify_client_id,
             spotify_client_secret=global_spotify_client_secret,
             progress_callback=progress_callback
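
Both hunks above assume that get_spotify_blob_path returns a pathlib.Path (or None when the account is unknown), which is why the caller now checks `not blob_file_path or not blob_file_path.exists()` and casts with str() before handing the path to SpoLogin. A minimal sketch of that assumed helper; the directory layout and constant name below are illustrative only, not taken from this commit:

    from pathlib import Path

    CREDS_BASE_DIR = Path("data/creds/spotify")  # assumed location, not part of this diff

    def get_spotify_blob_path(account_name):
        """Return the expected pathlib.Path of the Spotify credentials blob for an account."""
        if not account_name:
            return None  # caller treats a falsy result as "path is invalid"
        return CREDS_BASE_DIR / account_name / "credentials.json"
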
@@ -100,6 +100,10 @@ task_queues = {
     'downloads': {
         'exchange': 'downloads',
         'routing_key': 'downloads',
+    },
+    'utility_tasks': {
+        'exchange': 'utility_tasks',
+        'routing_key': 'utility_tasks',
     }
 }
 
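
Declaring the queue is only half of the change; the rest of this commit binds the housekeeping tasks to it and starts a dedicated worker that consumes it. A condensed sketch of how the pieces connect (task and queue names are taken from the diff; the -A target reflects the new app_name default "download_tasks" in CeleryManager):

    # Task side: pin the task to the utility queue at declaration time.
    @celery_app.task(name="delayed_delete_task_data", queue="utility_tasks")
    def delayed_delete_task_data(task_id, reason):
        ...

    # Worker side: a separate process consumes only utility (and default) traffic, e.g.
    #   celery -A download_tasks worker -Q utility_tasks,default -c 3 --hostname=worker_utw@%h --pool=prefork
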
@@ -21,7 +21,7 @@ from .celery_tasks import (
     cleanup_stale_errors,
     delayed_delete_task_data
 )
-from .celery_config import get_config_params
+from .celery_config import get_config_params, MAX_CONCURRENT_DL
 # Import history manager
 from .history_manager import init_history_db
 # Import credentials manager for DB init
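
MAX_CONCURRENT_DL is used below as the fallback for the download worker's concurrency when 'maxConcurrentDownloads' is missing from the config; it is presumably a plain module-level default in celery_config, which this commit does not show. An assumed shape, with the value 3 chosen only because it matches the old _get_worker_count default:

    # routes/utils/celery_config.py (assumed, not part of this diff)
    MAX_CONCURRENT_DL = 3  # default download-worker concurrency
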
@@ -41,386 +41,228 @@ class CeleryManager:
     Manages Celery workers dynamically based on configuration changes.
     """
 
-    def __init__(self):
-        self.celery_process = None
-        self.current_worker_count = 0
-        self.monitoring_thread = None
-        self.error_cleanup_thread = None
-        self.running = False
-        self.log_queue = queue.Queue()
-        self.output_threads = []
+    def __init__(self, app_name="download_tasks"):
+        self.app_name = app_name
+        self.download_worker_process = None
+        self.utility_worker_process = None
+        self.download_log_thread_stdout = None
+        self.download_log_thread_stderr = None
+        self.utility_log_thread_stdout = None
+        self.utility_log_thread_stderr = None
+        self.stop_event = threading.Event()
+        self.config_monitor_thread = None
+        # self.concurrency now specifically refers to download worker concurrency
+        self.concurrency = get_config_params().get('maxConcurrentDownloads', MAX_CONCURRENT_DL)
+        logger.info(f"CeleryManager initialized. Download concurrency set to: {self.concurrency}")
 
-    def _cleanup_stale_tasks(self):
-        logger.info("Cleaning up potentially stale Celery tasks...")
+    def _get_worker_command(self, queues, concurrency, worker_name_suffix, log_level="INFO"):
+        # Use a unique worker name to avoid conflicts.
+        # %h is replaced by celery with the actual hostname.
+        hostname = f"worker_{worker_name_suffix}@%h"
+        command = [
+            "celery",
+            "-A", self.app_name,
+            "worker",
+            "--loglevel=" + log_level,
+            "-Q", queues,
+            "-c", str(concurrency),
+            "--hostname=" + hostname,
+            "--pool=prefork"
+        ]
+        # Optionally add --without-gossip, --without-mingle, --without-heartbeat
+        # if experiencing issues or to reduce network load, but defaults are usually fine.
+        # Example: command.extend(["--without-gossip", "--without-mingle"])
+        logger.debug(f"Generated Celery command: {' '.join(command)}")
+        return command
 
+    def _process_output_reader(self, stream, log_prefix, error=False):
+        logger.debug(f"Log reader thread started for {log_prefix}")
         try:
-            tasks = get_all_celery_tasks_info()
-            if not tasks:
-                logger.info("No tasks found in Redis to check for staleness.")
-                return
-            active_stale_states = [
-                ProgressState.PROCESSING,
-                ProgressState.INITIALIZING,
-                ProgressState.DOWNLOADING,
-                ProgressState.PROGRESS,
-                ProgressState.REAL_TIME,
-                ProgressState.RETRYING
-            ]
+            for line in iter(stream.readline, ''):
+                if line:
+                    log_method = logger.error if error else logger.info
+                    log_method(f"{log_prefix}: {line.strip()}")
+                elif self.stop_event.is_set(): # If empty line and stop is set, likely EOF
+                    break
+            # Loop may also exit if stream is closed by process termination
+        except ValueError: #ValueError: I/O operation on closed file
+            if not self.stop_event.is_set():
+                logger.error(f"Error reading Celery output from {log_prefix} (ValueError - stream closed unexpectedly?)", exc_info=False) # Don't print full trace for common close error
 
-            stale_tasks_count = 0
-            for task_summary in tasks:
-                task_id = task_summary.get("task_id")
-                if not task_id:
-                    continue
-
-                last_status_data = get_last_task_status(task_id)
-                if last_status_data:
-                    current_status_str = last_status_data.get("status")
-                    if current_status_str in active_stale_states:
-                        logger.warning(f"Task {task_id} ('{task_summary.get('name', 'Unknown')}') found in stale state '{current_status_str}'. Marking as error.")
-
-                        task_info_details = get_task_info(task_id)
-                        config = get_config_params()
-
-                        error_payload = {
-                            "status": ProgressState.ERROR,
-                            "message": "Task interrupted due to application restart.",
-                            "error": "Task interrupted due to application restart.",
-                            "timestamp": time.time(),
-                            "type": task_info_details.get("type", task_summary.get("type", "unknown")),
-                            "name": task_info_details.get("name", task_summary.get("name", "Unknown")),
-                            "artist": task_info_details.get("artist", task_summary.get("artist", "")),
-                            "can_retry": True,
-                            "retry_count": last_status_data.get("retry_count", 0),
-                            "max_retries": config.get('maxRetries', 3)
-                        }
-                        store_task_status(task_id, error_payload)
-                        stale_tasks_count += 1
-
-                        # Schedule deletion for this interrupted task
-                        logger.info(f"Task {task_id} was interrupted. Data scheduled for deletion in 30s.")
-                        delayed_delete_task_data.apply_async(
-                            args=[task_id, "Task interrupted by application restart and auto-cleaned."],
-                            countdown=30
-                        )
-
-            if stale_tasks_count > 0:
-                logger.info(f"Marked {stale_tasks_count} stale tasks as 'error'.")
             else:
-                logger.info("No stale tasks found that needed cleanup (active states).")
+                logger.info(f"{log_prefix} stream reader gracefully stopped due to closed stream after stop signal.")
 
-            # NEW: Check for tasks that are already terminal but might have missed their cleanup
-            logger.info("Checking for terminal tasks (COMPLETE, CANCELLED, terminal ERROR) that might have missed cleanup...")
-            cleaned_during_this_pass = 0
-            # `tasks` variable is from `get_all_celery_tasks_info()` called at the beginning of the method
-            for task_summary in tasks:
-                task_id = task_summary.get("task_id")
-                if not task_id:
-                    continue
-
-                last_status_data = get_last_task_status(task_id)
-                if last_status_data:
-                    current_status_str = last_status_data.get("status")
-                    task_info_details = get_task_info(task_id) # Get full info for download_type etc.
-
-                    cleanup_reason = ""
-                    schedule_cleanup = False
-
-                    if current_status_str == ProgressState.COMPLETE:
-                        # If a task is COMPLETE (any download_type) and still here, its original scheduled deletion was missed.
-                        logger.warning(f"Task {task_id} ('{task_summary.get('name', 'Unknown')}', type: {task_info_details.get('download_type')}) is COMPLETE and still in Redis. Re-scheduling cleanup.")
-                        cleanup_reason = f"Task ({task_info_details.get('download_type')}) was COMPLETE; re-scheduling auto-cleanup."
-                        schedule_cleanup = True
-                    elif current_status_str == ProgressState.CANCELLED:
-                        logger.warning(f"Task {task_id} ('{task_summary.get('name', 'Unknown')}') is CANCELLED and still in Redis. Re-scheduling cleanup.")
-                        cleanup_reason = "Task was CANCELLED; re-scheduling auto-cleanup."
-                        schedule_cleanup = True
-                    elif current_status_str == ProgressState.ERROR:
-                        can_retry_flag = last_status_data.get("can_retry", False)
-                        # is_submission_error_task and is_duplicate_error_task are flags on task_info, not typically on last_status
-                        is_submission_error = task_info_details.get("is_submission_error_task", False)
-                        is_duplicate_error = task_info_details.get("is_duplicate_error_task", False)
-                        # Check if it's an error state that should have been cleaned up
-                        if not can_retry_flag or is_submission_error or is_duplicate_error or last_status_data.get("status") == ProgressState.ERROR_RETRIED:
-                            # ERROR_RETRIED means the original task is done and should be cleaned.
-                            logger.warning(f"Task {task_id} ('{task_summary.get('name', 'Unknown')}') is in a terminal ERROR state ('{last_status_data.get('error')}') and still in Redis. Re-scheduling cleanup.")
-                            cleanup_reason = f"Task was in terminal ERROR state ('{last_status_data.get('error', 'Unknown error')}'); re-scheduling auto-cleanup."
-                            schedule_cleanup = True
-                    elif current_status_str == ProgressState.ERROR_RETRIED:
-                        # This state itself implies the task is terminal and its data can be cleaned.
-                        logger.warning(f"Task {task_id} ('{task_summary.get('name', 'Unknown')}') is ERROR_RETRIED and still in Redis. Re-scheduling cleanup.")
-                        cleanup_reason = "Task was ERROR_RETRIED; re-scheduling auto-cleanup."
-                        schedule_cleanup = True
-
-                    if schedule_cleanup:
-                        delayed_delete_task_data.apply_async(
-                            args=[task_id, cleanup_reason],
-                            countdown=30 # Schedule with 30s delay
-                        )
-                        cleaned_during_this_pass +=1
-
-            if cleaned_during_this_pass > 0:
-                logger.info(f"Re-scheduled cleanup for {cleaned_during_this_pass} terminal tasks that were still in Redis.")
-            else:
-                logger.info("No additional terminal tasks found in Redis needing cleanup re-scheduling.")
-
         except Exception as e:
-            logger.error(f"Error during stale task cleanup: {e}", exc_info=True)
+            logger.error(f"Unexpected error in log reader for {log_prefix}: {e}", exc_info=True)
+        finally:
+            if hasattr(stream, 'close') and not stream.closed:
+                stream.close()
+            logger.info(f"{log_prefix} stream reader thread finished.")
 
     def start(self):
-        """Start the Celery manager and initial workers"""
-        if self.running:
-            return
-
-        self.running = True
-
-        # Initialize history database
-        init_history_db()
-        # Initialize credentials database
-        init_credentials_db()
-
-        # Clean up stale tasks BEFORE starting/restarting workers
-        self._cleanup_stale_tasks()
-
-        # Start initial workers
-        self._update_workers()
-
-        # Start monitoring thread for config changes
-        self.monitoring_thread = threading.Thread(target=self._monitor_config, daemon=True)
-        self.monitoring_thread.start()
-
-        # Start periodic error cleanup thread
-        self.error_cleanup_thread = threading.Thread(target=self._run_periodic_error_cleanup, daemon=True)
-        self.error_cleanup_thread.start()
-
-        # Register shutdown handler
-        atexit.register(self.stop)
+        self.stop_event.clear() # Clear stop event before starting
+
+        # Start Download Worker
+        if self.download_worker_process and self.download_worker_process.poll() is None:
+            logger.info("Celery Download Worker is already running.")
+        else:
+            self.concurrency = get_config_params().get('maxConcurrentDownloads', self.concurrency)
+            download_cmd = self._get_worker_command(
+                queues="downloads",
+                concurrency=self.concurrency,
+                worker_name_suffix="dlw" # Download Worker
+            )
+            logger.info(f"Starting Celery Download Worker with command: {' '.join(download_cmd)}")
+            self.download_worker_process = subprocess.Popen(
+                download_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, universal_newlines=True
+            )
+            self.download_log_thread_stdout = threading.Thread(target=self._process_output_reader, args=(self.download_worker_process.stdout, "Celery[DW-STDOUT]"))
+            self.download_log_thread_stderr = threading.Thread(target=self._process_output_reader, args=(self.download_worker_process.stderr, "Celery[DW-STDERR]", True))
+            self.download_log_thread_stdout.start()
+            self.download_log_thread_stderr.start()
+            logger.info(f"Celery Download Worker (PID: {self.download_worker_process.pid}) started with concurrency {self.concurrency}.")
+
+        # Start Utility Worker
+        if self.utility_worker_process and self.utility_worker_process.poll() is None:
+            logger.info("Celery Utility Worker is already running.")
+        else:
+            utility_cmd = self._get_worker_command(
+                queues="utility_tasks,default", # Listen to utility and default
+                concurrency=3,
+                worker_name_suffix="utw" # Utility Worker
+            )
+            logger.info(f"Starting Celery Utility Worker with command: {' '.join(utility_cmd)}")
+            self.utility_worker_process = subprocess.Popen(
+                utility_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, universal_newlines=True
+            )
+            self.utility_log_thread_stdout = threading.Thread(target=self._process_output_reader, args=(self.utility_worker_process.stdout, "Celery[UW-STDOUT]"))
+            self.utility_log_thread_stderr = threading.Thread(target=self._process_output_reader, args=(self.utility_worker_process.stderr, "Celery[UW-STDERR]", True))
+            self.utility_log_thread_stdout.start()
+            self.utility_log_thread_stderr.start()
+            logger.info(f"Celery Utility Worker (PID: {self.utility_worker_process.pid}) started with concurrency 3.")
+
+        if self.config_monitor_thread is None or not self.config_monitor_thread.is_alive():
+            self.config_monitor_thread = threading.Thread(target=self._monitor_config_changes)
+            self.config_monitor_thread.daemon = True # Allow main program to exit even if this thread is running
+            self.config_monitor_thread.start()
+            logger.info("CeleryManager: Config monitor thread started.")
+        else:
+            logger.info("CeleryManager: Config monitor thread already running.")
+
+    def _monitor_config_changes(self):
+        logger.info("CeleryManager: Config monitor thread active, monitoring configuration changes...")
+        while not self.stop_event.is_set():
+            try:
+                time.sleep(10) # Check every 10 seconds
+                if self.stop_event.is_set(): break
+
+                current_config = get_config_params()
+                new_max_concurrent_downloads = current_config.get('maxConcurrentDownloads', self.concurrency)
+
+                if new_max_concurrent_downloads != self.concurrency:
+                    logger.info(f"CeleryManager: Detected change in maxConcurrentDownloads from {self.concurrency} to {new_max_concurrent_downloads}. Restarting download worker only.")
+
+                    # Stop only the download worker
+                    if self.download_worker_process and self.download_worker_process.poll() is None:
+                        logger.info(f"Stopping Celery Download Worker (PID: {self.download_worker_process.pid}) for config update...")
+                        self.download_worker_process.terminate()
+                        try:
+                            self.download_worker_process.wait(timeout=10)
+                            logger.info(f"Celery Download Worker (PID: {self.download_worker_process.pid}) terminated.")
+                        except subprocess.TimeoutExpired:
+                            logger.warning(f"Celery Download Worker (PID: {self.download_worker_process.pid}) did not terminate gracefully, killing.")
+                            self.download_worker_process.kill()
+                        self.download_worker_process = None
+
+                    # Wait for log threads of download worker to finish
+                    if self.download_log_thread_stdout and self.download_log_thread_stdout.is_alive():
+                        self.download_log_thread_stdout.join(timeout=5)
+                    if self.download_log_thread_stderr and self.download_log_thread_stderr.is_alive():
+                        self.download_log_thread_stderr.join(timeout=5)
+
+                    self.concurrency = new_max_concurrent_downloads
+
+                    # Restart only the download worker
+                    download_cmd = self._get_worker_command("downloads", self.concurrency, "dlw")
+                    logger.info(f"Restarting Celery Download Worker with command: {' '.join(download_cmd)}")
+                    self.download_worker_process = subprocess.Popen(
+                        download_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, universal_newlines=True
+                    )
+                    self.download_log_thread_stdout = threading.Thread(target=self._process_output_reader, args=(self.download_worker_process.stdout, "Celery[DW-STDOUT]"))
+                    self.download_log_thread_stderr = threading.Thread(target=self._process_output_reader, args=(self.download_worker_process.stderr, "Celery[DW-STDERR]", True))
+                    self.download_log_thread_stdout.start()
+                    self.download_log_thread_stderr.start()
+                    logger.info(f"Celery Download Worker (PID: {self.download_worker_process.pid}) restarted with new concurrency {self.concurrency}.")
+
+            except Exception as e:
+                logger.error(f"CeleryManager: Error in config monitor thread: {e}", exc_info=True)
+                # Avoid busy-looping on continuous errors
+                if not self.stop_event.is_set(): time.sleep(30)
+        logger.info("CeleryManager: Config monitor thread stopped.")
+
+    def _stop_worker_process(self, worker_process, worker_name):
+        if worker_process and worker_process.poll() is None:
+            logger.info(f"Terminating Celery {worker_name} Worker (PID: {worker_process.pid})...")
+            worker_process.terminate()
+            try:
+                worker_process.wait(timeout=10)
+                logger.info(f"Celery {worker_name} Worker (PID: {worker_process.pid}) terminated.")
+            except subprocess.TimeoutExpired:
+                logger.warning(f"Celery {worker_name} Worker (PID: {worker_process.pid}) did not terminate gracefully, killing.")
+                worker_process.kill()
+        return None # Set process to None after stopping
 
     def stop(self):
-        """Stop the Celery manager and all workers"""
-        self.running = False
-
-        # Stop all running threads
-        for thread in self.output_threads:
-            if thread.is_alive():
-                # We can't really stop the threads, but they'll exit on their own
-                # when the process is terminated since they're daemon threads
-                pass
-
-        if self.celery_process:
-            logger.info("Stopping Celery workers...")
-            try:
-                # Send SIGTERM to process group
-                os.killpg(os.getpgid(self.celery_process.pid), signal.SIGTERM)
-                self.celery_process.wait(timeout=5)
-            except (subprocess.TimeoutExpired, ProcessLookupError):
-                # Force kill if not terminated
-                try:
-                    os.killpg(os.getpgid(self.celery_process.pid), signal.SIGKILL)
-                except ProcessLookupError:
-                    pass
-
-            self.celery_process = None
-            self.current_worker_count = 0
-
-    def _get_worker_count(self):
-        """Get the configured worker count from config file"""
-        try:
-            if not Path(CONFIG_PATH).exists():
-                return 3 # Default
-
-            with open(CONFIG_PATH, 'r') as f:
-                config = json.load(f)
-
-            return int(config.get('maxConcurrentDownloads', 3))
-        except Exception as e:
-            logger.error(f"Error reading worker count from config: {e}")
-            return 3 # Default on error
-
-    def _update_workers(self):
-        """Update workers if needed based on configuration"""
-        new_worker_count = self._get_worker_count()
-
-        if new_worker_count == self.current_worker_count and self.celery_process and self.celery_process.poll() is None:
-            return # No change and process is running
-
-        logger.info(f"Updating Celery workers from {self.current_worker_count} to {new_worker_count}")
-
-        # Stop existing workers if running
-        if self.celery_process:
-            try:
-                logger.info("Stopping existing Celery workers...")
-                os.killpg(os.getpgid(self.celery_process.pid), signal.SIGTERM)
-                self.celery_process.wait(timeout=5)
-            except (subprocess.TimeoutExpired, ProcessLookupError):
-                try:
-                    logger.warning("Forcibly killing Celery workers with SIGKILL")
-                    os.killpg(os.getpgid(self.celery_process.pid), signal.SIGKILL)
-                except ProcessLookupError:
-                    pass
-
-            # Clear output threads list
-            self.output_threads = []
-
-            # Wait a moment to ensure processes are terminated
-            time.sleep(2)
-
-        # Additional cleanup - find and kill any stray Celery processes
-        try:
-            # This runs a shell command to find and kill all celery processes
-            subprocess.run(
-                "ps aux | grep 'celery -A routes.utils.celery_tasks.celery_app worker' | grep -v grep | awk '{print $2}' | xargs -r kill -9",
-                shell=True,
-                stderr=subprocess.PIPE
-            )
-            logger.info("Killed any stray Celery processes")
-
-            # Wait a moment to ensure processes are terminated
+        logger.info("CeleryManager: Stopping Celery workers...")
+        self.stop_event.set() # Signal all threads to stop
+
+        # Stop download worker
+        self.download_worker_process = self._stop_worker_process(self.download_worker_process, "Download")
+
+        # Stop utility worker
+        self.utility_worker_process = self._stop_worker_process(self.utility_worker_process, "Utility")
+
+        logger.info("Joining log threads...")
+        thread_timeout = 5 # seconds to wait for log threads
+
+        # Join download worker log threads
+        if self.download_log_thread_stdout and self.download_log_thread_stdout.is_alive():
+            self.download_log_thread_stdout.join(timeout=thread_timeout)
+        if self.download_log_thread_stderr and self.download_log_thread_stderr.is_alive():
+            self.download_log_thread_stderr.join(timeout=thread_timeout)
+
+        # Join utility worker log threads
+        if self.utility_log_thread_stdout and self.utility_log_thread_stdout.is_alive():
+            self.utility_log_thread_stdout.join(timeout=thread_timeout)
+        if self.utility_log_thread_stderr and self.utility_log_thread_stderr.is_alive():
+            self.utility_log_thread_stderr.join(timeout=thread_timeout)
+
+        if self.config_monitor_thread and self.config_monitor_thread.is_alive():
+            logger.info("Joining config_monitor_thread...")
+            self.config_monitor_thread.join(timeout=thread_timeout)
+
+        logger.info("CeleryManager: All workers and threads signaled to stop and joined.")
+
+    def restart(self):
+        logger.info("CeleryManager: Restarting all Celery workers...")
+        self.stop()
+        # Short delay before restarting
+        logger.info("Waiting a brief moment before restarting workers...")
+        time.sleep(2)
+        self.start()
+        logger.info("CeleryManager: All Celery workers restarted.")
+
+# Global instance for managing Celery workers
+celery_manager = CeleryManager()
+
+# Example of how to use the manager (typically called from your main app script)
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] [%(threadName)s] [%(name)s] - %(message)s')
+    logger.info("Starting Celery Manager example...")
+    celery_manager.start()
+    try:
+        while True:
             time.sleep(1)
-        except Exception as e:
-            logger.error(f"Error during stray process cleanup: {e}")
-
-        # Start new workers with updated concurrency
-        try:
-            # Set environment variables to configure Celery logging
-            env = os.environ.copy()
-            env['PYTHONUNBUFFERED'] = '1' # Ensure Python output is unbuffered
-
-            # Construct command with extra logging options
-            cmd = [
-                'celery',
-                '-A', CELERY_APP,
-                'worker',
-                '--loglevel=info',
-                f'--concurrency={new_worker_count}',
-                '-Q', 'downloads,default',
-                '--logfile=-', # Output logs to stdout
-                '--without-heartbeat', # Reduce log noise
-                '--without-gossip', # Reduce log noise
-                '--without-mingle', # Reduce log noise
-                # Add unique worker name to prevent conflicts
-                f'--hostname=worker@%h-{uuid.uuid4()}'
-            ]
-
-            logger.info(f"Starting new Celery workers with command: {' '.join(cmd)}")
-
-            self.celery_process = subprocess.Popen(
-                cmd,
-                stdout=subprocess.PIPE,
-                stderr=subprocess.PIPE,
-                env=env,
-                preexec_fn=os.setsid, # New process group for clean termination
-                universal_newlines=True,
-                bufsize=1 # Line buffered
-            )
-
-            self.current_worker_count = new_worker_count
-            logger.info(f"Started Celery workers with concurrency {new_worker_count}, PID: {self.celery_process.pid}")
-
-            # Verify the process started correctly
-            time.sleep(2)
-            if self.celery_process.poll() is not None:
-                # Process exited prematurely
-                stdout, stderr = "", ""
-                try:
-                    stdout, stderr = self.celery_process.communicate(timeout=1)
-                except subprocess.TimeoutExpired:
-                    pass
-
-                logger.error(f"Celery workers failed to start. Exit code: {self.celery_process.poll()}")
-                logger.error(f"Stdout: {stdout}")
-                logger.error(f"Stderr: {stderr}")
-                self.celery_process = None
-                raise RuntimeError("Celery workers failed to start")
-
-            # Start non-blocking output reader threads for both stdout and stderr
-            stdout_thread = threading.Thread(
-                target=self._process_output_reader,
-                args=(self.celery_process.stdout, "STDOUT"),
-                daemon=True
-            )
-            stdout_thread.start()
-            self.output_threads.append(stdout_thread)
-
-            stderr_thread = threading.Thread(
-                target=self._process_output_reader,
-                args=(self.celery_process.stderr, "STDERR"),
-                daemon=True
-            )
-            stderr_thread.start()
-            self.output_threads.append(stderr_thread)
-
-        except Exception as e:
-            logger.error(f"Error starting Celery workers: {e}")
-            # In case of failure, make sure we don't leave orphaned processes
-            if self.celery_process and self.celery_process.poll() is None:
-                try:
-                    os.killpg(os.getpgid(self.celery_process.pid), signal.SIGKILL)
-                except (ProcessLookupError, OSError):
-                    pass
-            self.celery_process = None
-
-    def _process_output_reader(self, pipe, stream_name):
-        """Read and log output from the process"""
-        try:
-            for line in iter(pipe.readline, ''):
-                if not line:
-                    break
-
-                line = line.strip()
-                if not line:
-                    continue
-
-                # Format the message to identify it's from Celery
-                if "ERROR" in line or "CRITICAL" in line:
-                    logger.error(f"Celery[{stream_name}]: {line}")
-                elif "WARNING" in line:
-                    logger.warning(f"Celery[{stream_name}]: {line}")
-                elif "DEBUG" in line:
-                    logger.debug(f"Celery[{stream_name}]: {line}")
-                else:
-                    logger.info(f"Celery[{stream_name}]: {line}")
-
-        except Exception as e:
-            logger.error(f"Error processing Celery output: {e}")
-        finally:
-            pipe.close()
-
-    def _monitor_config(self):
-        """Monitor configuration file for changes"""
-        logger.info("Starting config monitoring thread")
-        last_check_time = 0
-
-        while self.running:
-            try:
-                # Check for changes
-                if time.time() - last_check_time >= CONFIG_CHECK_INTERVAL:
-                    self._update_workers()
-                    last_check_time = time.time()
-
-                time.sleep(1)
-            except Exception as e:
-                logger.error(f"Error in config monitoring thread: {e}")
-                time.sleep(5) # Wait before retrying
-
-    def _run_periodic_error_cleanup(self):
-        """Periodically triggers the cleanup_stale_errors Celery task."""
-        cleanup_interval = 60 # Run cleanup task every 60 seconds
-        logger.info(f"Starting periodic error cleanup scheduler (runs every {cleanup_interval}s).")
-        while self.running:
-            try:
-                logger.info("Scheduling cleanup_stale_errors task...")
-                cleanup_stale_errors.delay() # Call the Celery task
-            except Exception as e:
-                logger.error(f"Error scheduling cleanup_stale_errors task: {e}", exc_info=True)
-
-            # Wait for the next interval
-            # Use a loop to check self.running more frequently to allow faster shutdown
-            for _ in range(cleanup_interval):
-                if not self.running:
-                    break
-                time.sleep(1)
-        logger.info("Periodic error cleanup scheduler stopped.")
-
-# Create single instance
-celery_manager = CeleryManager()
+    except KeyboardInterrupt:
+        logger.info("Keyboard interrupt received, stopping Celery Manager...")
+    finally:
+        celery_manager.stop()
+        logger.info("Celery Manager example finished.")
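
Note that the rewritten start() no longer calls atexit.register(self.stop) the way the old version did, so shutdown is now the caller's responsibility; the __main__ block above shows one way to handle it. A hedged sketch of embedding the manager in an application entrypoint (the import path is assumed):

    import atexit

    from routes.utils.celery_manager import celery_manager  # assumed import path

    def run_app():
        celery_manager.start()                 # spawns the download and utility workers
        atexit.register(celery_manager.stop)   # make sure both workers are terminated on exit
        # ... start the web application here ...
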
@@ -931,11 +931,15 @@ def task_prerun_handler(task_id=None, task=None, *args, **kwargs):
 def task_postrun_handler(task_id=None, task=None, retval=None, state=None, *args, **kwargs):
     """Signal handler when a task finishes"""
     try:
+        # Define download task names
+        download_task_names = ["download_track", "download_album", "download_playlist"]
+
         last_status_for_history = get_last_task_status(task_id)
         if last_status_for_history and last_status_for_history.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED, "ERROR_RETRIED", "ERROR_AUTO_CLEANED"]:
             if state == states.REVOKED and last_status_for_history.get("status") != ProgressState.CANCELLED:
                 logger.info(f"Task {task_id} was REVOKED (likely cancelled), logging to history.")
-                _log_task_to_history(task_id, 'CANCELLED', "Task was revoked/cancelled.")
+                if task and task.name in download_task_names: # Check if it's a download task
+                    _log_task_to_history(task_id, 'CANCELLED', "Task was revoked/cancelled.")
                 # return # Let status update proceed if necessary
 
         task_info = get_task_info(task_id)
@@ -952,7 +956,8 @@ def task_postrun_handler(task_id=None, task=None, retval=None, state=None, *args
                     "message": "Download completed successfully."
                 })
                 logger.info(f"Task {task_id} completed successfully: {task_info.get('name', 'Unknown')}")
-                _log_task_to_history(task_id, 'COMPLETED')
+                if task and task.name in download_task_names: # Check if it's a download task
+                    _log_task_to_history(task_id, 'COMPLETED')
 
                 if task_info.get("download_type") == "track": # Applies to single track downloads and tracks from playlists/albums
                     delayed_delete_task_data.apply_async(
@@ -999,12 +1004,15 @@ def task_postrun_handler(task_id=None, task=None, retval=None, state=None, *args
         logger.error(f"Error in task_postrun_handler: {e}", exc_info=True)
 
 @task_failure.connect
-def task_failure_handler(task_id=None, exception=None, traceback=None, *args, **kwargs):
+def task_failure_handler(task_id=None, exception=None, traceback=None, sender=None, *args, **kwargs):
     """Signal handler when a task fails"""
     try:
         # Skip if Retry exception
         if isinstance(exception, Retry):
             return
 
+        # Define download task names
+        download_task_names = ["download_track", "download_album", "download_playlist"]
+
         # Get task info and status
         task_info = get_task_info(task_id)
@@ -1038,7 +1046,8 @@ def task_failure_handler(task_id=None, exception=None, traceback=None, *args, **
         })
 
         logger.error(f"Task {task_id} failed: {str(exception)}")
-        _log_task_to_history(task_id, 'ERROR', str(exception))
+        if sender and sender.name in download_task_names: # Check if it's a download task
+            _log_task_to_history(task_id, 'ERROR', str(exception))
 
         if can_retry:
             logger.info(f"Task {task_id} can be retried ({retry_count}/{max_retries})")
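
The new sender parameter matters because Celery's task_failure signal, unlike task_postrun, does not pass the task as a `task` keyword; the failing task arrives as sender, so sender.name is the reliable way to restrict history logging to download tasks. A minimal sketch of the wiring as used above:

    from celery.signals import task_failure

    @task_failure.connect
    def task_failure_handler(task_id=None, exception=None, traceback=None, sender=None, *args, **kwargs):
        # sender is the failed Task instance, so its registered name identifies the task type
        if sender and sender.name in ("download_track", "download_album", "download_playlist"):
            _log_task_to_history(task_id, 'ERROR', str(exception))
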
@@ -1346,7 +1355,7 @@ def delete_task_data_and_log(task_id, reason="Task data deleted"):
         logger.error(f"Error deleting task data for {task_id}: {e}", exc_info=True)
         return False
 
-@celery_app.task(name="cleanup_stale_errors", queue="default") # Put on default queue, not downloads
+@celery_app.task(name="cleanup_stale_errors", queue="utility_tasks") # Put on utility_tasks queue
 def cleanup_stale_errors():
     """
     Periodically checks for tasks in ERROR state for more than 1 minute and cleans them up.
@@ -1385,7 +1394,7 @@ def cleanup_stale_errors():
         logger.error(f"Error during cleanup_stale_errors: {e}", exc_info=True)
         return {"status": "error", "error": str(e)}
 
-@celery_app.task(name="delayed_delete_task_data", queue="default") # Use default queue for utility tasks
+@celery_app.task(name="delayed_delete_task_data", queue="utility_tasks") # Use utility_tasks queue
 def delayed_delete_task_data(task_id, reason):
     """
     Celery task to delete task data after a delay.
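
Because both housekeeping tasks now declare queue="utility_tasks" in their decorator, callers do not need to pass any routing options; the existing scheduling calls elsewhere in this commit are routed to the utility worker unchanged, for example:

    delayed_delete_task_data.apply_async(
        args=[task_id, "Task interrupted by application restart and auto-cleaned."],
        countdown=30,  # delete the task's Redis data 30 seconds later, on the utility_tasks queue
    )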