fix: celery fails to start
@@ -19,29 +19,26 @@ from .celery_tasks import (
store_task_status,
get_all_tasks as get_all_celery_tasks_info,
cleanup_stale_errors,
delayed_delete_task_data
delayed_delete_task_data,
)
from .celery_config import get_config_params, MAX_CONCURRENT_DL
# Import history manager
from .history_manager import init_history_db
# Import credentials manager for DB init
from .credentials import init_credentials_db

# Configure logging
logger = logging.getLogger(__name__)

# Configuration
CONFIG_PATH = './data/config/main.json'
CELERY_APP = 'routes.utils.celery_tasks.celery_app'
CONFIG_PATH = "./data/config/main.json"
CELERY_APP = "routes.utils.celery_tasks.celery_app"
CELERY_PROCESS = None
CONFIG_CHECK_INTERVAL = 30 # seconds


class CeleryManager:
"""
Manages Celery workers dynamically based on configuration changes.
"""

def __init__(self, app_name="download_tasks"):

def __init__(self, app_name="routes.utils.celery_tasks"):
self.app_name = app_name
self.download_worker_process = None
self.utility_worker_process = None
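The substantive change in this hunk appears to be the default app_name (the rest is formatting): Celery's -A/--app argument must name an importable module (or module:attribute), so a worker launched with the old default "download_tasks" would fail at import time and exit before accepting tasks, which matches the "celery fails to start" symptom. A rough sketch of the difference (exact error text varies by Celery version):

celery -A download_tasks worker             # old default: module not importable, worker exits during startup
celery -A routes.utils.celery_tasks worker  # new default: resolves to the module that defines the Celery app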
@@ -52,22 +49,31 @@ class CeleryManager:
self.stop_event = threading.Event()
self.config_monitor_thread = None
# self.concurrency now specifically refers to download worker concurrency
self.concurrency = get_config_params().get('maxConcurrentDownloads', MAX_CONCURRENT_DL)
logger.info(f"CeleryManager initialized. Download concurrency set to: {self.concurrency}")

def _get_worker_command(self, queues, concurrency, worker_name_suffix, log_level="INFO"):
self.concurrency = get_config_params().get(
"maxConcurrentDownloads", MAX_CONCURRENT_DL
)
logger.info(
f"CeleryManager initialized. Download concurrency set to: {self.concurrency}"
)

def _get_worker_command(
self, queues, concurrency, worker_name_suffix, log_level="INFO"
):
# Use a unique worker name to avoid conflicts.
# %h is replaced by celery with the actual hostname.
hostname = f"worker_{worker_name_suffix}@%h"
command = [
"celery",
"-A", self.app_name,
"-A",
self.app_name,
"worker",
"--loglevel=" + log_level,
"-Q", queues,
"-c", str(concurrency),
"-Q",
queues,
"-c",
str(concurrency),
"--hostname=" + hostname,
"--pool=prefork"
"--pool=prefork",
]
# Optionally add --without-gossip, --without-mingle, --without-heartbeat
# if experiencing issues or to reduce network load, but defaults are usually fine.
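For reference, a sketch of the list this helper builds for the download worker; the concrete values below are illustrative (the concurrency is whatever maxConcurrentDownloads resolves to at runtime):

# Illustrative result of _get_worker_command("downloads", 3, "dlw") with the new default app name.
cmd = [
    "celery",
    "-A", "routes.utils.celery_tasks",
    "worker",
    "--loglevel=INFO",
    "-Q", "downloads",
    "-c", "3",
    "--hostname=worker_dlw@%h",
    "--pool=prefork",
]
# ' '.join(cmd) gives:
# celery -A routes.utils.celery_tasks worker --loglevel=INFO -Q downloads -c 3 --hostname=worker_dlw@%h --pool=prefork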
@@ -78,155 +84,265 @@ class CeleryManager:
def _process_output_reader(self, stream, log_prefix, error=False):
logger.debug(f"Log reader thread started for {log_prefix}")
try:
for line in iter(stream.readline, ''):
for line in iter(stream.readline, ""):
if line:
log_method = logger.error if error else logger.info
log_method(f"{log_prefix}: {line.strip()}")
elif self.stop_event.is_set(): # If empty line and stop is set, likely EOF
elif (
self.stop_event.is_set()
): # If empty line and stop is set, likely EOF
break
# Loop may also exit if stream is closed by process termination
except ValueError: #ValueError: I/O operation on closed file
except ValueError: # ValueError: I/O operation on closed file
if not self.stop_event.is_set():
logger.error(f"Error reading Celery output from {log_prefix} (ValueError - stream closed unexpectedly?)", exc_info=False) # Don't print full trace for common close error
logger.error(
f"Error reading Celery output from {log_prefix} (ValueError - stream closed unexpectedly?)",
exc_info=False,
) # Don't print full trace for common close error
else:
logger.info(f"{log_prefix} stream reader gracefully stopped due to closed stream after stop signal.")
logger.info(
f"{log_prefix} stream reader gracefully stopped due to closed stream after stop signal."
)
except Exception as e:
logger.error(f"Unexpected error in log reader for {log_prefix}: {e}", exc_info=True)
logger.error(
f"Unexpected error in log reader for {log_prefix}: {e}", exc_info=True
)
finally:
if hasattr(stream, 'close') and not stream.closed:
if hasattr(stream, "close") and not stream.closed:
stream.close()
logger.info(f"{log_prefix} stream reader thread finished.")

def start(self):
self.stop_event.clear() # Clear stop event before starting
self.stop_event.clear() # Clear stop event before starting

# Start Download Worker
if self.download_worker_process and self.download_worker_process.poll() is None:
logger.info("Celery Download Worker is already running.")
else:
self.concurrency = get_config_params().get('maxConcurrentDownloads', self.concurrency)
self.concurrency = get_config_params().get(
"maxConcurrentDownloads", self.concurrency
)
download_cmd = self._get_worker_command(
queues="downloads",
concurrency=self.concurrency,
worker_name_suffix="dlw" # Download Worker
worker_name_suffix="dlw", # Download Worker
)
logger.info(
f"Starting Celery Download Worker with command: {' '.join(download_cmd)}"
)
logger.info(f"Starting Celery Download Worker with command: {' '.join(download_cmd)}")
self.download_worker_process = subprocess.Popen(
download_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, universal_newlines=True
download_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
universal_newlines=True,
)
self.download_log_thread_stdout = threading.Thread(
target=self._process_output_reader,
args=(self.download_worker_process.stdout, "Celery[DW-STDOUT]"),
)
self.download_log_thread_stderr = threading.Thread(
target=self._process_output_reader,
args=(self.download_worker_process.stderr, "Celery[DW-STDERR]", True),
)
self.download_log_thread_stdout = threading.Thread(target=self._process_output_reader, args=(self.download_worker_process.stdout, "Celery[DW-STDOUT]"))
self.download_log_thread_stderr = threading.Thread(target=self._process_output_reader, args=(self.download_worker_process.stderr, "Celery[DW-STDERR]", True))
self.download_log_thread_stdout.start()
self.download_log_thread_stderr.start()
logger.info(f"Celery Download Worker (PID: {self.download_worker_process.pid}) started with concurrency {self.concurrency}.")
logger.info(
f"Celery Download Worker (PID: {self.download_worker_process.pid}) started with concurrency {self.concurrency}."
)

# Start Utility Worker
if self.utility_worker_process and self.utility_worker_process.poll() is None:
logger.info("Celery Utility Worker is already running.")
else:
utility_cmd = self._get_worker_command(
queues="utility_tasks,default", # Listen to utility and default
queues="utility_tasks,default", # Listen to utility and default
concurrency=3,
worker_name_suffix="utw" # Utility Worker
worker_name_suffix="utw", # Utility Worker
)
logger.info(
f"Starting Celery Utility Worker with command: {' '.join(utility_cmd)}"
)
logger.info(f"Starting Celery Utility Worker with command: {' '.join(utility_cmd)}")
self.utility_worker_process = subprocess.Popen(
utility_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, universal_newlines=True
utility_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
universal_newlines=True,
)
self.utility_log_thread_stdout = threading.Thread(
target=self._process_output_reader,
args=(self.utility_worker_process.stdout, "Celery[UW-STDOUT]"),
)
self.utility_log_thread_stderr = threading.Thread(
target=self._process_output_reader,
args=(self.utility_worker_process.stderr, "Celery[UW-STDERR]", True),
)
self.utility_log_thread_stdout = threading.Thread(target=self._process_output_reader, args=(self.utility_worker_process.stdout, "Celery[UW-STDOUT]"))
self.utility_log_thread_stderr = threading.Thread(target=self._process_output_reader, args=(self.utility_worker_process.stderr, "Celery[UW-STDERR]", True))
self.utility_log_thread_stdout.start()
self.utility_log_thread_stderr.start()
logger.info(f"Celery Utility Worker (PID: {self.utility_worker_process.pid}) started with concurrency 3.")
logger.info(
f"Celery Utility Worker (PID: {self.utility_worker_process.pid}) started with concurrency 3."
)

if self.config_monitor_thread is None or not self.config_monitor_thread.is_alive():
self.config_monitor_thread = threading.Thread(target=self._monitor_config_changes)
self.config_monitor_thread.daemon = True # Allow main program to exit even if this thread is running
if (
self.config_monitor_thread is None
or not self.config_monitor_thread.is_alive()
):
self.config_monitor_thread = threading.Thread(
target=self._monitor_config_changes
)
self.config_monitor_thread.daemon = (
True # Allow main program to exit even if this thread is running
)
self.config_monitor_thread.start()
logger.info("CeleryManager: Config monitor thread started.")
else:
logger.info("CeleryManager: Config monitor thread already running.")

def _monitor_config_changes(self):
logger.info("CeleryManager: Config monitor thread active, monitoring configuration changes...")
logger.info(
"CeleryManager: Config monitor thread active, monitoring configuration changes..."
)
while not self.stop_event.is_set():
try:
time.sleep(10) # Check every 10 seconds
if self.stop_event.is_set(): break
if self.stop_event.is_set():
break

current_config = get_config_params()
new_max_concurrent_downloads = current_config.get('maxConcurrentDownloads', self.concurrency)
new_max_concurrent_downloads = current_config.get(
"maxConcurrentDownloads", self.concurrency
)

if new_max_concurrent_downloads != self.concurrency:
logger.info(f"CeleryManager: Detected change in maxConcurrentDownloads from {self.concurrency} to {new_max_concurrent_downloads}. Restarting download worker only.")

logger.info(
f"CeleryManager: Detected change in maxConcurrentDownloads from {self.concurrency} to {new_max_concurrent_downloads}. Restarting download worker only."
)

# Stop only the download worker
if self.download_worker_process and self.download_worker_process.poll() is None:
logger.info(f"Stopping Celery Download Worker (PID: {self.download_worker_process.pid}) for config update...")
if (
self.download_worker_process
and self.download_worker_process.poll() is None
):
logger.info(
f"Stopping Celery Download Worker (PID: {self.download_worker_process.pid}) for config update..."
)
self.download_worker_process.terminate()
try:
self.download_worker_process.wait(timeout=10)
logger.info(f"Celery Download Worker (PID: {self.download_worker_process.pid}) terminated.")
logger.info(
f"Celery Download Worker (PID: {self.download_worker_process.pid}) terminated."
)
except subprocess.TimeoutExpired:
logger.warning(f"Celery Download Worker (PID: {self.download_worker_process.pid}) did not terminate gracefully, killing.")
logger.warning(
f"Celery Download Worker (PID: {self.download_worker_process.pid}) did not terminate gracefully, killing."
)
self.download_worker_process.kill()
self.download_worker_process = None


# Wait for log threads of download worker to finish
if self.download_log_thread_stdout and self.download_log_thread_stdout.is_alive():
if (
self.download_log_thread_stdout
and self.download_log_thread_stdout.is_alive()
):
self.download_log_thread_stdout.join(timeout=5)
if self.download_log_thread_stderr and self.download_log_thread_stderr.is_alive():
if (
self.download_log_thread_stderr
and self.download_log_thread_stderr.is_alive()
):
self.download_log_thread_stderr.join(timeout=5)

self.concurrency = new_max_concurrent_downloads


# Restart only the download worker
download_cmd = self._get_worker_command("downloads", self.concurrency, "dlw")
logger.info(f"Restarting Celery Download Worker with command: {' '.join(download_cmd)}")
self.download_worker_process = subprocess.Popen(
download_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, universal_newlines=True
download_cmd = self._get_worker_command(
"downloads", self.concurrency, "dlw"
)
logger.info(
f"Restarting Celery Download Worker with command: {' '.join(download_cmd)}"
)
self.download_worker_process = subprocess.Popen(
download_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
universal_newlines=True,
)
self.download_log_thread_stdout = threading.Thread(
target=self._process_output_reader,
args=(self.download_worker_process.stdout, "Celery[DW-STDOUT]"),
)
self.download_log_thread_stderr = threading.Thread(
target=self._process_output_reader,
args=(
self.download_worker_process.stderr,
"Celery[DW-STDERR]",
True,
),
)
self.download_log_thread_stdout = threading.Thread(target=self._process_output_reader, args=(self.download_worker_process.stdout, "Celery[DW-STDOUT]"))
self.download_log_thread_stderr = threading.Thread(target=self._process_output_reader, args=(self.download_worker_process.stderr, "Celery[DW-STDERR]", True))
self.download_log_thread_stdout.start()
self.download_log_thread_stderr.start()
logger.info(f"Celery Download Worker (PID: {self.download_worker_process.pid}) restarted with new concurrency {self.concurrency}.")
logger.info(
f"Celery Download Worker (PID: {self.download_worker_process.pid}) restarted with new concurrency {self.concurrency}."
)

except Exception as e:
logger.error(f"CeleryManager: Error in config monitor thread: {e}", exc_info=True)
logger.error(
f"CeleryManager: Error in config monitor thread: {e}", exc_info=True
)
# Avoid busy-looping on continuous errors
if not self.stop_event.is_set(): time.sleep(30)
if not self.stop_event.is_set():
time.sleep(30)
logger.info("CeleryManager: Config monitor thread stopped.")


def _stop_worker_process(self, worker_process, worker_name):
if worker_process and worker_process.poll() is None:
logger.info(f"Terminating Celery {worker_name} Worker (PID: {worker_process.pid})...")
logger.info(
f"Terminating Celery {worker_name} Worker (PID: {worker_process.pid})..."
)
worker_process.terminate()
try:
worker_process.wait(timeout=10)
logger.info(f"Celery {worker_name} Worker (PID: {worker_process.pid}) terminated.")
logger.info(
f"Celery {worker_name} Worker (PID: {worker_process.pid}) terminated."
)
except subprocess.TimeoutExpired:
logger.warning(f"Celery {worker_name} Worker (PID: {worker_process.pid}) did not terminate gracefully, killing.")
logger.warning(
f"Celery {worker_name} Worker (PID: {worker_process.pid}) did not terminate gracefully, killing."
)
worker_process.kill()
return None # Set process to None after stopping
return None # Set process to None after stopping

def stop(self):
logger.info("CeleryManager: Stopping Celery workers...")
self.stop_event.set() # Signal all threads to stop
self.stop_event.set() # Signal all threads to stop

# Stop download worker
self.download_worker_process = self._stop_worker_process(self.download_worker_process, "Download")

self.download_worker_process = self._stop_worker_process(
self.download_worker_process, "Download"
)

# Stop utility worker
self.utility_worker_process = self._stop_worker_process(self.utility_worker_process, "Utility")
self.utility_worker_process = self._stop_worker_process(
self.utility_worker_process, "Utility"
)

logger.info("Joining log threads...")
thread_timeout = 5 # seconds to wait for log threads
thread_timeout = 5 # seconds to wait for log threads

# Join download worker log threads
if self.download_log_thread_stdout and self.download_log_thread_stdout.is_alive():
if (
self.download_log_thread_stdout
and self.download_log_thread_stdout.is_alive()
):
self.download_log_thread_stdout.join(timeout=thread_timeout)
if self.download_log_thread_stderr and self.download_log_thread_stderr.is_alive():
if (
self.download_log_thread_stderr
and self.download_log_thread_stderr.is_alive()
):
self.download_log_thread_stderr.join(timeout=thread_timeout)

# Join utility worker log threads
@@ -238,24 +354,30 @@ class CeleryManager:
if self.config_monitor_thread and self.config_monitor_thread.is_alive():
logger.info("Joining config_monitor_thread...")
self.config_monitor_thread.join(timeout=thread_timeout)

logger.info("CeleryManager: All workers and threads signaled to stop and joined.")

logger.info(
"CeleryManager: All workers and threads signaled to stop and joined."
)

def restart(self):
logger.info("CeleryManager: Restarting all Celery workers...")
self.stop()
# Short delay before restarting
logger.info("Waiting a brief moment before restarting workers...")
time.sleep(2)
time.sleep(2)
self.start()
logger.info("CeleryManager: All Celery workers restarted.")


# Global instance for managing Celery workers
celery_manager = CeleryManager()
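Because celery_manager is created here as a module-level singleton, a host application can simply import it and pair start() with stop() at shutdown. A minimal sketch, assuming this file is importable as routes.utils.celery_manager (the actual filename is not shown in this diff); the __main__ block below demonstrates the same idea as a standalone script:

# Hypothetical wiring from the host application's startup code.
import atexit
from routes.utils.celery_manager import celery_manager  # assumed module path

celery_manager.start()                # spawns both workers plus the config monitor thread
atexit.register(celery_manager.stop)  # terminate workers and join log/monitor threads on exit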

# Example of how to use the manager (typically called from your main app script)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] [%(threadName)s] [%(name)s] - %(message)s')
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] [%(threadName)s] [%(name)s] - %(message)s",
)
logger.info("Starting Celery Manager example...")
celery_manager.start()
try:
@@ -265,4 +387,4 @@ if __name__ == '__main__':
logger.info("Keyboard interrupt received, stopping Celery Manager...")
finally:
celery_manager.stop()
logger.info("Celery Manager example finished.")
logger.info("Celery Manager example finished.")