unlimited logging!!!!

.gitignore (vendored, 3 changes)
@@ -31,4 +31,5 @@ config/state/queue_state.json
 output.log
 queue_state.json
 search_demo.py
 celery_worker.log
+logs/spotizerr.log

app.py (120 changes)
@@ -9,38 +9,72 @@ from routes.prgs import prgs_bp
 from routes.config import config_bp
 from routes.artist import artist_bp
 import logging
+import logging.handlers
 import time
 from pathlib import Path
 import os
-import argparse
+import atexit
+import sys
 
-# Import Celery configuration
-try:
-    from routes.utils.celery_tasks import celery_app
-    has_celery = True
-except ImportError:
-    has_celery = False
+# Import Celery configuration and manager
+from routes.utils.celery_tasks import celery_app
+from routes.utils.celery_manager import celery_manager
+
+# Configure application-wide logging
+def setup_logging():
+    """Configure application-wide logging with rotation"""
+    # Create logs directory if it doesn't exist
+    logs_dir = Path('logs')
+    logs_dir.mkdir(exist_ok=True)
+
+    # Set up log file paths
+    main_log = logs_dir / 'spotizerr.log'
+
+    # Configure root logger
+    root_logger = logging.getLogger()
+    root_logger.setLevel(logging.INFO)
+
+    # Log formatting
+    log_format = logging.Formatter(
+        '%(asctime)s [%(processName)s:%(threadName)s] [%(name)s] [%(levelname)s] - %(message)s',
+        datefmt='%Y-%m-%d %H:%M:%S'
+    )
+
+    # File handler with rotation (10 MB max, keep 5 backups)
+    file_handler = logging.handlers.RotatingFileHandler(
+        main_log, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8'
+    )
+    file_handler.setFormatter(log_format)
+    file_handler.setLevel(logging.INFO)
+
+    # Console handler for stderr
+    console_handler = logging.StreamHandler(sys.stderr)
+    console_handler.setFormatter(log_format)
+    console_handler.setLevel(logging.INFO)
+
+    # Add handlers to root logger
+    root_logger.addHandler(file_handler)
+    root_logger.addHandler(console_handler)
+
+    # Set up specific loggers
+    for logger_name in ['werkzeug', 'celery', 'routes', 'flask', 'waitress']:
+        module_logger = logging.getLogger(logger_name)
+        module_logger.setLevel(logging.INFO)
+        # Handlers are inherited from root logger
+
+    # Enable propagation for all loggers
+    logging.getLogger('celery').propagate = True
+
+    # Notify successful setup
+    root_logger.info("Logging system initialized")
+
+    # Return the main file handler for permissions adjustment
+    return file_handler
 
 def create_app():
     app = Flask(__name__)
 
-    # Configure basic logging
-    log_file = 'flask_server.log'
-    logging.basicConfig(
-        level=logging.INFO,
-        format='%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s',
-        handlers=[
-            logging.FileHandler(log_file),
-            logging.StreamHandler()
-        ]
-    )
-
-    os.chmod(log_file, 0o666)
-
-    # Get Flask's logger
-    logger = logging.getLogger('werkzeug')
-    logger.setLevel(logging.INFO)
-
+    # Set up CORS
     CORS(app)
 
     # Register blueprints
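
Note: with maxBytes=10*1024*1024 and backupCount=5, the handler above caps log disk usage at roughly 60 MB (the active file plus five rotated backups). A minimal sketch of the rollover behavior, not part of the commit, with an illustrative logger name:

    import logging
    import logging.handlers
    from pathlib import Path

    Path('logs').mkdir(exist_ok=True)
    handler = logging.handlers.RotatingFileHandler(
        'logs/spotizerr.log', maxBytes=10 * 1024 * 1024, backupCount=5, encoding='utf-8'
    )
    demo = logging.getLogger('rotation_demo')  # hypothetical logger name
    demo.addHandler(handler)
    demo.setLevel(logging.INFO)

    # When spotizerr.log exceeds 10 MB it becomes spotizerr.log.1,
    # existing backups shift to .2 through .5, and the oldest is dropped.
    demo.info('example entry')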
@@ -53,7 +87,6 @@ def create_app():
     app.register_blueprint(artist_bp, url_prefix='/api/artist')
     app.register_blueprint(prgs_bp, url_prefix='/api/prgs')
 
-
     # Serve frontend
     @app.route('/')
     def serve_index():
@@ -98,27 +131,48 @@ def create_app():
     @app.before_request
     def log_request():
         request.start_time = time.time()
-        logger.info(f"Request: {request.method} {request.path}")
+        app.logger.debug(f"Request: {request.method} {request.path}")
 
     @app.after_request
     def log_response(response):
-        duration = round((time.time() - request.start_time) * 1000, 2)
-        logger.info(f"Response: {response.status} | Duration: {duration}ms")
+        if hasattr(request, 'start_time'):
+            duration = round((time.time() - request.start_time) * 1000, 2)
+            app.logger.debug(f"Response: {response.status} | Duration: {duration}ms")
         return response
 
     # Error logging
     @app.errorhandler(Exception)
     def handle_exception(e):
-        logger.error(f"Server error: {str(e)}", exc_info=True)
+        app.logger.error(f"Server error: {str(e)}", exc_info=True)
         return "Internal Server Error", 500
 
     return app
 
-if __name__ == '__main__':
-    # Configure waitress logger
-    logger = logging.getLogger('waitress')
-    logger.setLevel(logging.INFO)
+def start_celery_workers():
+    """Start Celery workers with dynamic configuration"""
+    logging.info("Starting Celery workers with dynamic configuration")
+    celery_manager.start()
 
+    # Register shutdown handler
+    atexit.register(celery_manager.stop)
+
+if __name__ == '__main__':
+    # Configure application logging
+    log_handler = setup_logging()
+
+    # Set file permissions for log files if needed
+    try:
+        os.chmod(log_handler.baseFilename, 0o666)
+    except:
+        logging.warning("Could not set permissions on log file")
+
+    # Log application startup
+    logging.info("=== Spotizerr Application Starting ===")
+
+    # Start Celery workers
+    start_celery_workers()
+
+    # Create and start Flask app
     app = create_app()
     logging.info("Starting Flask server on port 7171")
     from waitress import serve

@@ -6,40 +6,10 @@ if [ -n "${UMASK}" ]; then
     umask "${UMASK}"
 fi
 
-# Function to start the application
-start_application() {
-    # Start Flask app in the background
-    echo "Starting Flask application..."
-    python app.py &
-
-    # Wait a moment for Flask to initialize
-    sleep 2
-
-    # Start Celery worker
-    echo "Starting Celery worker..."
-    celery -A routes.utils.celery_tasks.celery_app worker --loglevel=info --concurrency=${MAX_CONCURRENT_DL:-3} -Q downloads &
-
-    # Keep the script running
-    wait
-}
-
-# Check if custom command was provided
-if [ $# -gt 0 ]; then
-    # Custom command provided, use it instead of default app startup
-    RUN_COMMAND="$@"
-else
-    # No custom command, use our default application startup
-    RUN_COMMAND="start_application"
-fi
-
 # Check if both PUID and PGID are not set
 if [ -z "${PUID}" ] && [ -z "${PGID}" ]; then
     # Run as root directly
-    if [ $# -gt 0 ]; then
-        exec "$@"
-    else
-        start_application
-    fi
+    exec "$@"
 else
     # Verify both PUID and PGID are set
     if [ -z "${PUID}" ] || [ -z "${PGID}" ]; then
@@ -49,11 +19,7 @@ else
 
     # Check for root user request
    if [ "${PUID}" -eq 0 ] && [ "${PGID}" -eq 0 ]; then
-        if [ $# -gt 0 ]; then
-            exec "$@"
-        else
-            start_application
-        fi
+        exec "$@"
    else
         # Check if the group with the specified GID already exists
         if getent group "${PGID}" >/dev/null; then
@@ -79,10 +45,6 @@ else
         chown -R "${USER_NAME}:${GROUP_NAME}" /app || true
 
         # Run as specified user
-        if [ $# -gt 0 ]; then
-            exec gosu "${USER_NAME}" "$@"
-        else
-            exec gosu "${USER_NAME}" bash -c "$(declare -f start_application); start_application"
-        fi
+        exec gosu "${USER_NAME}" "$@"
    fi
 fi

@@ -2,10 +2,25 @@ from flask import Blueprint, jsonify, request
 import json
 from pathlib import Path
 import logging
+import threading
+import time
 
 config_bp = Blueprint('config_bp', __name__)
 CONFIG_PATH = Path('./config/main.json')
 
+# Flag for config change notifications
+config_changed = False
+last_config = {}
+
+# Define parameters that should trigger notification when changed
+NOTIFY_PARAMETERS = [
+    'maxConcurrentDownloads',
+    'service',
+    'fallback',
+    'spotifyQuality',
+    'deezerQuality'
+]
+
 def get_config():
     try:
         if not CONFIG_PATH.exists():
@@ -19,6 +34,34 @@ def get_config():
         logging.error(f"Error reading config: {str(e)}")
         return None
 
+def save_config(config_data):
+    """Save config and track changes to important parameters"""
+    global config_changed, last_config
+
+    try:
+        # Load current config for comparison
+        current_config = get_config() or {}
+
+        # Check if any notify parameters changed
+        for param in NOTIFY_PARAMETERS:
+            if param in config_data:
+                if param not in current_config or config_data[param] != current_config.get(param):
+                    config_changed = True
+                    logging.info(f"Config parameter '{param}' changed from '{current_config.get(param)}' to '{config_data[param]}'")
+
+        # Save last known config
+        last_config = config_data.copy()
+
+        # Write the config file
+        CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
+        with open(CONFIG_PATH, 'w') as f:
+            json.dump(config_data, f, indent=2)
+
+        return True
+    except Exception as e:
+        logging.error(f"Error saving config: {str(e)}")
+        return False
+
 @config_bp.route('/config', methods=['GET'])
 def handle_config():
     config = get_config()
@@ -58,13 +101,32 @@ def update_config():
         if not isinstance(new_config, dict):
             return jsonify({"error": "Invalid config format"}), 400
 
-        CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
-        with open(CONFIG_PATH, 'w') as f:
-            json.dump(new_config, f, indent=2)
+        if not save_config(new_config):
+            return jsonify({"error": "Failed to save config"}), 500
 
         return jsonify({"message": "Config updated successfully"})
     except json.JSONDecodeError:
         return jsonify({"error": "Invalid JSON data"}), 400
     except Exception as e:
         logging.error(f"Error updating config: {str(e)}")
         return jsonify({"error": "Failed to update config"}), 500
+
+@config_bp.route('/config/check', methods=['GET'])
+def check_config_changes():
+    """
+    Check if config has changed since last check
+    Returns: Status of config changes
+    """
+    global config_changed
+
+    # Get current state
+    has_changed = config_changed
+
+    # Reset flag after checking
+    if has_changed:
+        config_changed = False
+
+    return jsonify({
+        "changed": has_changed,
+        "last_config": last_config
+    })
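
Note: the new /config/check endpoint is read-and-clear, so the changed flag resets as soon as one caller reads it; it suits a single poller rather than many concurrent ones. A minimal polling sketch, assuming the blueprint is mounted under /api/config and the server listens on port 7171 as set up in app.py (both deployment details are assumptions, not confirmed by this hunk):

    import time
    import requests

    # Assumed mount point; adjust to the actual blueprint prefix.
    CHECK_URL = 'http://localhost:7171/api/config/config/check'

    while True:
        data = requests.get(CHECK_URL, timeout=5).json()
        if data.get('changed'):
            print('config changed:', data.get('last_config'))
        time.sleep(30)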

@@ -1,53 +1,75 @@
 import os
 import json
+import logging
+from pathlib import Path
 
-# Load configuration from ./config/main.json and get the max_concurrent_dl value.
+# Configure logging
+logger = logging.getLogger(__name__)
+
+# Redis configuration
+REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
+REDIS_PORT = os.getenv('REDIS_PORT', '6379')
+REDIS_DB = os.getenv('REDIS_DB', '0')
+REDIS_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
+REDIS_BACKEND = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
+
+# Config path
 CONFIG_PATH = './config/main.json'
 
-try:
-    with open(CONFIG_PATH, 'r') as f:
-        config_data = json.load(f)
-    MAX_CONCURRENT_DL = config_data.get("maxConcurrentDownloads", 3)
-    MAX_RETRIES = config_data.get("maxRetries", 3)
-    RETRY_DELAY = config_data.get("retryDelaySeconds", 5)
-    RETRY_DELAY_INCREASE = config_data.get("retry_delay_increase", 5)
-except Exception as e:
-    print(f"Error loading configuration: {e}")
-    # Fallback to default values if there's an error reading the config.
-    MAX_CONCURRENT_DL = 3
-    MAX_RETRIES = 3
-    RETRY_DELAY = 5
-    RETRY_DELAY_INCREASE = 5
-
 def get_config_params():
     """
-    Get common download parameters from the config file.
-    This centralizes parameter retrieval and reduces redundancy in API calls.
+    Get configuration parameters from the config file.
 
     Returns:
-        dict: A dictionary containing common parameters from config
+        dict: A dictionary containing configuration parameters
     """
     try:
+        if not Path(CONFIG_PATH).exists():
+            return {
+                'service': 'spotify',
+                'spotify': '',
+                'deezer': '',
+                'fallback': False,
+                'spotifyQuality': 'NORMAL',
+                'deezerQuality': 'MP3_128',
+                'realTime': False,
+                'customDirFormat': '%ar_album%/%album%',
+                'customTrackFormat': '%tracknum%. %music%',
+                'tracknum_padding': True,
+                'maxConcurrentDownloads': 3,
+                'maxRetries': 3,
+                'retryDelaySeconds': 5,
+                'retry_delay_increase': 5
+            }
+
         with open(CONFIG_PATH, 'r') as f:
             config = json.load(f)
 
-        return {
-            'service': config.get('service', 'spotify'),
-            'spotify': config.get('spotify', ''),
-            'deezer': config.get('deezer', ''),
-            'fallback': config.get('fallback', False),
-            'spotifyQuality': config.get('spotifyQuality', 'NORMAL'),
-            'deezerQuality': config.get('deezerQuality', 'MP3_128'),
-            'realTime': config.get('realTime', False),
-            'customDirFormat': config.get('customDirFormat', '%ar_album%/%album%'),
-            'customTrackFormat': config.get('customTrackFormat', '%tracknum%. %music%'),
-            'tracknum_padding': config.get('tracknum_padding', True),
-            'maxRetries': config.get('maxRetries', 3),
-            'retryDelaySeconds': config.get('retryDelaySeconds', 5),
-            'retry_delay_increase': config.get('retry_delay_increase', 5)
+        # Set defaults for missing values
+        defaults = {
+            'service': 'spotify',
+            'spotify': '',
+            'deezer': '',
+            'fallback': False,
+            'spotifyQuality': 'NORMAL',
+            'deezerQuality': 'MP3_128',
+            'realTime': False,
+            'customDirFormat': '%ar_album%/%album%',
+            'customTrackFormat': '%tracknum%. %music%',
+            'tracknum_padding': True,
+            'maxConcurrentDownloads': 3,
+            'maxRetries': 3,
+            'retryDelaySeconds': 5,
+            'retry_delay_increase': 5
         }
+
+        for key, value in defaults.items():
+            if key not in config:
+                config[key] = value
+
+        return config
     except Exception as e:
-        print(f"Error reading config for parameters: {e}")
+        logger.error(f"Error reading config: {e}")
         # Return defaults if config read fails
         return {
             'service': 'spotify',
@@ -60,14 +82,18 @@ def get_config_params():
             'customDirFormat': '%ar_album%/%album%',
             'customTrackFormat': '%tracknum%. %music%',
             'tracknum_padding': True,
+            'maxConcurrentDownloads': 3,
             'maxRetries': 3,
             'retryDelaySeconds': 5,
             'retry_delay_increase': 5
         }
 
-# Celery configuration
-REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
-REDIS_BACKEND = os.environ.get('REDIS_BACKEND', 'redis://localhost:6379/0')
+# Load configuration values we need for Celery
+config = get_config_params()
+MAX_CONCURRENT_DL = config.get('maxConcurrentDownloads', 3)
+MAX_RETRIES = config.get('maxRetries', 3)
+RETRY_DELAY = config.get('retryDelaySeconds', 5)
+RETRY_DELAY_INCREASE = config.get('retry_delay_increase', 5)
 
 # Define task queues
 task_queues = {
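
Note: the key-by-key defaults loop in get_config_params is equivalent to merging the loaded config over the defaults, with loaded values winning. A tiny illustration (values are examples only, not from this repo):

    defaults = {'maxConcurrentDownloads': 3, 'maxRetries': 3}
    loaded = {'maxRetries': 10}

    merged = {**defaults, **loaded}  # loaded values take precedence
    assert merged == {'maxConcurrentDownloads': 3, 'maxRetries': 10}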

routes/utils/celery_manager.py (new file, 214 lines)
@@ -0,0 +1,214 @@
+import os
+import json
+import signal
+import subprocess
+import logging
+import time
+import atexit
+from pathlib import Path
+import threading
+import queue
+import sys
+
+# Configure logging
+logger = logging.getLogger(__name__)
+
+# Configuration
+CONFIG_PATH = './config/main.json'
+CELERY_APP = 'routes.utils.celery_tasks.celery_app'
+CELERY_PROCESS = None
+CONFIG_CHECK_INTERVAL = 30  # seconds
+
+class CeleryManager:
+    """
+    Manages Celery workers dynamically based on configuration changes.
+    """
+
+    def __init__(self):
+        self.celery_process = None
+        self.current_worker_count = 0
+        self.monitoring_thread = None
+        self.running = False
+        self.log_queue = queue.Queue()
+        self.output_threads = []
+
+    def start(self):
+        """Start the Celery manager and initial workers"""
+        if self.running:
+            return
+
+        self.running = True
+
+        # Start initial workers
+        self._update_workers()
+
+        # Start monitoring thread for config changes
+        self.monitoring_thread = threading.Thread(target=self._monitor_config, daemon=True)
+        self.monitoring_thread.start()
+
+        # Register shutdown handler
+        atexit.register(self.stop)
+
+    def stop(self):
+        """Stop the Celery manager and all workers"""
+        self.running = False
+
+        # Stop all running threads
+        for thread in self.output_threads:
+            if thread.is_alive():
+                # We can't really stop the threads, but they'll exit on their own
+                # when the process is terminated since they're daemon threads
+                pass
+
+        if self.celery_process:
+            logger.info("Stopping Celery workers...")
+            try:
+                # Send SIGTERM to process group
+                os.killpg(os.getpgid(self.celery_process.pid), signal.SIGTERM)
+                self.celery_process.wait(timeout=5)
+            except (subprocess.TimeoutExpired, ProcessLookupError):
+                # Force kill if not terminated
+                try:
+                    os.killpg(os.getpgid(self.celery_process.pid), signal.SIGKILL)
+                except ProcessLookupError:
+                    pass
+
+            self.celery_process = None
+            self.current_worker_count = 0
+
+    def _get_worker_count(self):
+        """Get the configured worker count from config file"""
+        try:
+            if not Path(CONFIG_PATH).exists():
+                return 3  # Default
+
+            with open(CONFIG_PATH, 'r') as f:
+                config = json.load(f)
+
+            return int(config.get('maxConcurrentDownloads', 3))
+        except Exception as e:
+            logger.error(f"Error reading worker count from config: {e}")
+            return 3  # Default on error
+
+    def _update_workers(self):
+        """Update workers if needed based on configuration"""
+        new_worker_count = self._get_worker_count()
+
+        if new_worker_count == self.current_worker_count and self.celery_process and self.celery_process.poll() is None:
+            return  # No change and process is running
+
+        logger.info(f"Updating Celery workers from {self.current_worker_count} to {new_worker_count}")
+
+        # Stop existing workers if running
+        if self.celery_process:
+            try:
+                os.killpg(os.getpgid(self.celery_process.pid), signal.SIGTERM)
+                self.celery_process.wait(timeout=5)
+            except (subprocess.TimeoutExpired, ProcessLookupError):
+                try:
+                    os.killpg(os.getpgid(self.celery_process.pid), signal.SIGKILL)
+                except ProcessLookupError:
+                    pass
+
+        # Clear output threads list
+        self.output_threads = []
+
+        # Start new workers with updated concurrency
+        try:
+            # Set environment variables to configure Celery logging
+            env = os.environ.copy()
+            env['PYTHONUNBUFFERED'] = '1'  # Ensure Python output is unbuffered
+
+            # Construct command with extra logging options
+            cmd = [
+                'celery',
+                '-A', CELERY_APP,
+                'worker',
+                '--loglevel=info',
+                f'--concurrency={new_worker_count}',
+                '-Q', 'downloads',
+                # Add timestamp to Celery logs
+                '--logfile=-',  # Output logs to stdout
+                '--without-heartbeat',  # Reduce log noise
+                '--without-gossip',  # Reduce log noise
+                '--without-mingle'  # Reduce log noise
+            ]
+
+            self.celery_process = subprocess.Popen(
+                cmd,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                env=env,
+                preexec_fn=os.setsid,  # New process group for clean termination
+                universal_newlines=True,
+                bufsize=1  # Line buffered
+            )
+
+            self.current_worker_count = new_worker_count
+            logger.info(f"Started Celery workers with concurrency {new_worker_count}")
+
+            # Start non-blocking output reader threads for both stdout and stderr
+            stdout_thread = threading.Thread(
+                target=self._process_output_reader,
+                args=(self.celery_process.stdout, "STDOUT"),
+                daemon=True
+            )
+            stdout_thread.start()
+            self.output_threads.append(stdout_thread)
+
+            stderr_thread = threading.Thread(
+                target=self._process_output_reader,
+                args=(self.celery_process.stderr, "STDERR"),
+                daemon=True
+            )
+            stderr_thread.start()
+            self.output_threads.append(stderr_thread)
+
+        except Exception as e:
+            logger.error(f"Error starting Celery workers: {e}")
+
+    def _process_output_reader(self, pipe, stream_name):
+        """Read and log output from the process"""
+        try:
+            for line in iter(pipe.readline, ''):
+                if not line:
+                    break
+
+                line = line.strip()
+                if not line:
+                    continue
+
+                # Format the message to identify it's from Celery
+                if "ERROR" in line or "CRITICAL" in line:
+                    logger.error(f"Celery[{stream_name}]: {line}")
+                elif "WARNING" in line:
+                    logger.warning(f"Celery[{stream_name}]: {line}")
+                elif "DEBUG" in line:
+                    logger.debug(f"Celery[{stream_name}]: {line}")
+                else:
+                    logger.info(f"Celery[{stream_name}]: {line}")
+
+        except Exception as e:
+            logger.error(f"Error processing Celery output: {e}")
+        finally:
+            pipe.close()
+
+    def _monitor_config(self):
+        """Monitor configuration file for changes"""
+        logger.info("Starting config monitoring thread")
+        last_check_time = 0
+
+        while self.running:
+            try:
+                # Check for changes
+                if time.time() - last_check_time >= CONFIG_CHECK_INTERVAL:
+                    self._update_workers()
+                    last_check_time = time.time()
+
+                time.sleep(1)
+            except Exception as e:
+                logger.error(f"Error in config monitoring thread: {e}")
+                time.sleep(5)  # Wait before retrying
+
+# Create single instance
+celery_manager = CeleryManager()
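
Note: the manager's clean shutdown hinges on preexec_fn=os.setsid, which puts the Celery worker and any children it forks into their own process group, so os.killpg can terminate the whole tree at once. A stripped-down sketch of that pattern (POSIX only, with sleep as a stand-in for the celery command):

    import os
    import signal
    import subprocess

    # Child starts in a new session, becoming its own process-group leader.
    proc = subprocess.Popen(['sleep', '60'], preexec_fn=os.setsid)

    # Signal the whole group; escalate to SIGKILL if SIGTERM is ignored.
    os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        os.killpg(os.getpgid(proc.pid), signal.SIGKILL)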

@@ -5,15 +5,15 @@ import logging
 import traceback
 from datetime import datetime
 from celery import Celery, Task, states
-from celery.signals import task_prerun, task_postrun, task_failure, worker_ready
+from celery.signals import task_prerun, task_postrun, task_failure, worker_ready, worker_init, setup_logging
 from celery.exceptions import Retry
 
-# Setup Redis and Celery
-from routes.utils.celery_config import REDIS_URL, REDIS_BACKEND, get_config_params
-
 # Configure logging
 logger = logging.getLogger(__name__)
 
+# Setup Redis and Celery
+from routes.utils.celery_config import REDIS_URL, REDIS_BACKEND, get_config_params
+
 # Initialize Celery app
 celery_app = Celery('download_tasks',
                     broker=REDIS_URL,
@@ -35,6 +35,25 @@ class ProgressState:
     RETRYING = "retrying"
     CANCELLED = "cancel"
 
+# Reuse the application's logging configuration for Celery workers
+@setup_logging.connect
+def setup_celery_logging(**kwargs):
+    """
+    This handler ensures Celery uses our application logging settings
+    instead of its own. Prevents duplicate log configurations.
+    """
+    # Using the root logger's handlers and level preserves our config
+    return logging.getLogger()
+
+# The initialization of a worker will log the worker configuration
+@worker_init.connect
+def worker_init_handler(**kwargs):
+    """Log when a worker initializes with its configuration details"""
+    config = get_config_params()
+    logger.info(f"Celery worker initialized with concurrency {config.get('maxConcurrentDownloads', 3)}")
+    logger.info(f"Worker config: spotifyQuality={config.get('spotifyQuality')}, deezerQuality={config.get('deezerQuality')}")
+    logger.debug("Worker Redis connection: " + REDIS_URL)
+
 def store_task_status(task_id, status_data):
     """Store task status information in Redis"""
     # Add timestamp if not present
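
Note: connecting any receiver to Celery's setup_logging signal is enough to stop Celery from installing its own logging configuration, which is what lets the handlers configured in app.py's setup_logging() reach worker output. A minimal standalone sketch, assuming a local Redis broker:

    import logging
    from celery import Celery
    from celery.signals import setup_logging

    app = Celery('sketch', broker='redis://localhost:6379/0')  # assumed broker URL

    @setup_logging.connect
    def keep_app_logging(**kwargs):
        # A connected receiver means Celery skips its own logging setup;
        # workers inherit whatever the root logger already has.
        return logging.getLogger()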
@@ -102,6 +121,7 @@ def cancel_task(task_id):
         # Try to revoke the Celery task if it hasn't started yet
         celery_app.control.revoke(task_id, terminate=True, signal='SIGTERM')
 
+        logger.info(f"Task {task_id} cancelled by user")
         return {"status": "cancelled", "task_id": task_id}
     except Exception as e:
         logger.error(f"Error cancelling task {task_id}: {e}")
@@ -209,6 +229,8 @@ def retry_task(task_id):
         download_type = task_info.get("download_type", "unknown")
         task = None
 
+        logger.info(f"Retrying task {task_id} as {new_task_id} (retry {retry_count + 1}/{max_retries})")
+
         if download_type == "track":
             task = download_track.apply_async(
                 kwargs=task_info,
@@ -228,6 +250,7 @@ def retry_task(task_id):
                 queue='downloads'
             )
         else:
+            logger.error(f"Unknown download type for retry: {download_type}")
             return {
                 "status": "error",
                 "message": f"Unknown download type: {download_type}"
@@ -303,8 +326,22 @@ class ProgressTrackingTask(Task):
         # Store the progress update in Redis
         store_task_status(task_id, progress_data)
 
-        # Log the progress update
-        logger.info(f"Task {task_id} progress: {progress_data}")
+        # Log the progress update with appropriate level
+        message = progress_data.get("message", "Progress update")
+
+        if status == "processing":
+            progress = progress_data.get("progress", 0)
+            if progress > 0:
+                logger.debug(f"Task {task_id} progress: {progress}% - {message}")
+            else:
+                logger.info(f"Task {task_id} processing: {message}")
+        elif status == "error":
+            error_message = progress_data.get("error", message)
+            logger.error(f"Task {task_id} error: {error_message}")
+        elif status == "complete":
+            logger.info(f"Task {task_id} completed: {message}")
+        else:
+            logger.info(f"Task {task_id} {status}: {message}")
 
 # Celery signal handlers
 @task_prerun.connect
@@ -378,21 +415,24 @@ def task_failure_handler(task_id=None, exception=None, traceback=None, *args, **
         can_retry = retry_count < max_retries
 
         # Update task status to error
+        error_message = str(exception)
         store_task_status(task_id, {
             "status": ProgressState.ERROR,
             "timestamp": time.time(),
             "type": task_info.get("type", "unknown"),
             "name": task_info.get("name", "Unknown"),
             "artist": task_info.get("artist", ""),
-            "error": str(exception),
+            "error": error_message,
             "traceback": str(traceback),
             "can_retry": can_retry,
             "retry_count": retry_count,
             "max_retries": max_retries,
-            "message": f"Error: {str(exception)}"
+            "message": f"Error: {error_message}"
         })
 
-        logger.error(f"Task {task_id} failed: {str(exception)}")
+        logger.error(f"Task {task_id} failed: {error_message}")
+        if can_retry:
+            logger.info(f"Task {task_id} can be retried ({retry_count}/{max_retries})")
     except Exception as e:
         logger.error(f"Error in task_failure_handler: {e}")
 
@@ -469,6 +509,9 @@ def download_track(self, **task_data):
         custom_track_format = task_data.get("custom_track_format", config_params.get("customTrackFormat", "%tracknum%. %music%"))
         pad_tracks = task_data.get("pad_tracks", config_params.get("tracknum_padding", True))
 
+        # Log task parameters for debugging
+        logger.debug(f"Track download parameters: service={service}, quality={quality}, real_time={real_time}")
+
         # Execute the download function with progress callback
         download_track_func(
             service=service,
@@ -550,6 +593,9 @@ def download_album(self, **task_data):
         custom_track_format = task_data.get("custom_track_format", config_params.get("customTrackFormat", "%tracknum%. %music%"))
         pad_tracks = task_data.get("pad_tracks", config_params.get("tracknum_padding", True))
 
+        # Log task parameters for debugging
+        logger.debug(f"Album download parameters: service={service}, quality={quality}, real_time={real_time}")
+
         # Execute the download function with progress callback
         download_album_func(
             service=service,
@@ -631,6 +677,9 @@ def download_playlist(self, **task_data):
         custom_track_format = task_data.get("custom_track_format", config_params.get("customTrackFormat", "%tracknum%. %music%"))
         pad_tracks = task_data.get("pad_tracks", config_params.get("tracknum_padding", True))
 
+        # Log task parameters for debugging
+        logger.debug(f"Playlist download parameters: service={service}, quality={quality}, real_time={real_time}")
+
         # Execute the download function with progress callback
         download_playlist_func(
             service=service,