2035 lines
81 KiB
Python
2035 lines
81 KiB
Python
import time
|
|
import json
|
|
import logging
|
|
import traceback
|
|
from celery import Celery, Task, states
|
|
from celery.signals import (
|
|
task_prerun,
|
|
task_postrun,
|
|
task_failure,
|
|
worker_ready,
|
|
worker_init,
|
|
setup_logging,
|
|
)
|
|
from celery.exceptions import Retry
|
|
from pathlib import Path # Added for path operations
|
|
|
|
|
|
# Setup Redis and Celery
|
|
from routes.utils.celery_config import (
|
|
REDIS_URL,
|
|
REDIS_BACKEND,
|
|
get_config_params,
|
|
)
|
|
|
|
# Import for playlist watch DB update
|
|
from routes.utils.watch.db import (
|
|
add_single_track_to_playlist_db,
|
|
add_or_update_album_for_artist,
|
|
)
|
|
|
|
# Import for download history management
|
|
from routes.utils.history_manager import history_manager
|
|
|
|
# Create Redis connection for storing task data that's not part of the Celery result backend
|
|
import redis
|
|
|
|
|
|
# --- Helpers to build partial summaries from task logs ---
|
|
def _read_task_log_json_lines(task_id: str) -> list:
|
|
log_file_path = Path("./logs/tasks") / f"{task_id}.log"
|
|
if not log_file_path.exists():
|
|
return []
|
|
lines = []
|
|
try:
|
|
with open(log_file_path, "r") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
lines.append(json.loads(line))
|
|
except Exception:
|
|
continue
|
|
except Exception:
|
|
return []
|
|
return lines
|
|
|
|
|
|
def _extract_parent_initial_tracks(log_lines: list, parent_type: str) -> dict:
|
|
"""
|
|
Returns a mapping from a stable track key to the track object from the initial parent callback.
|
|
For albums: key by ids.spotify or f"{track_number}:{title}" as fallback.
|
|
For playlists: key by ids.spotify or f"{position}:{title}" as fallback.
|
|
"""
|
|
track_map: dict[str, dict] = {}
|
|
if parent_type == "album":
|
|
for obj in log_lines:
|
|
album = obj.get("album")
|
|
if album and isinstance(album, dict) and album.get("tracks"):
|
|
for t in album.get("tracks", []):
|
|
ids = (t or {}).get("ids", {}) or {}
|
|
key = (
|
|
ids.get("spotify")
|
|
or f"{(t or {}).get('track_number', 0)}:{(t or {}).get('title', '')}"
|
|
)
|
|
track_map[key] = t
|
|
break
|
|
elif parent_type == "playlist":
|
|
for obj in log_lines:
|
|
playlist = obj.get("playlist")
|
|
if playlist and isinstance(playlist, dict) and playlist.get("tracks"):
|
|
for t in playlist.get("tracks", []):
|
|
ids = (t or {}).get("ids", {}) or {}
|
|
# TrackPlaylistObject uses position
|
|
key = (
|
|
ids.get("spotify")
|
|
or f"{(t or {}).get('position', 0)}:{(t or {}).get('title', '')}"
|
|
)
|
|
track_map[key] = t
|
|
break
|
|
return track_map
|
|
|
|
|
|
def _extract_completed_and_skipped_from_logs(
|
|
log_lines: list,
|
|
) -> tuple[set, set, dict, dict]:
|
|
"""
|
|
Returns (completed_keys, skipped_keys, completed_objects_by_key, skipped_objects_by_key)
|
|
Keys prefer ids.spotify, falling back to index+title scheme consistent with initial map.
|
|
"""
|
|
completed_keys: set[str] = set()
|
|
skipped_keys: set[str] = set()
|
|
completed_objs: dict[str, dict] = {}
|
|
skipped_objs: dict[str, dict] = {}
|
|
for obj in log_lines:
|
|
track = obj.get("track")
|
|
if not track:
|
|
continue
|
|
status_info = obj.get("status_info", {}) or {}
|
|
status = status_info.get("status")
|
|
ids = (track or {}).get("ids", {}) or {}
|
|
# Fallback keys try track_number:title and position:title
|
|
fallback_key = (
|
|
f"{(track or {}).get('track_number', 0)}:{(track or {}).get('title', '')}"
|
|
)
|
|
key = ids.get("spotify") or fallback_key
|
|
if status == "done":
|
|
completed_keys.add(key)
|
|
completed_objs[key] = track
|
|
elif status == "skipped":
|
|
skipped_keys.add(key)
|
|
skipped_objs[key] = track
|
|
return completed_keys, skipped_keys, completed_objs, skipped_objs
|
|
|
|
|
|
def _to_track_object_from_initial(initial_track: dict, parent_type: str) -> dict:
|
|
"""Convert initial album/playlist track entry to a TrackObject-like dict."""
|
|
# Common fields
|
|
title = initial_track.get("title", "")
|
|
disc_number = initial_track.get("disc_number", 1)
|
|
track_number = initial_track.get("track_number", 0)
|
|
duration_ms = initial_track.get("duration_ms", 0)
|
|
explicit = initial_track.get("explicit", False)
|
|
ids = initial_track.get("ids", {}) or {}
|
|
|
|
# Convert artists to ArtistTrackObject[] shape
|
|
artists_src = initial_track.get("artists", []) or []
|
|
artists_conv = []
|
|
for a in artists_src:
|
|
if isinstance(a, dict):
|
|
artists_conv.append(
|
|
{
|
|
"type": "artistTrack",
|
|
"name": a.get("name", ""),
|
|
"ids": a.get("ids", {}) or {},
|
|
}
|
|
)
|
|
|
|
# Convert album to AlbumTrackObject-like shape
|
|
album_src = initial_track.get("album", {}) or {}
|
|
album_conv = {
|
|
"type": "albumTrack",
|
|
"album_type": album_src.get("album_type", ""),
|
|
"title": album_src.get("title", ""),
|
|
"release_date": album_src.get("release_date", {}) or {},
|
|
"total_tracks": album_src.get("total_tracks", 0),
|
|
"genres": album_src.get("genres", []) or [],
|
|
"images": album_src.get("images", []) or [],
|
|
"ids": album_src.get("ids", {}) or {},
|
|
"artists": [
|
|
{
|
|
"type": "artistAlbumTrack",
|
|
"name": aa.get("name", ""),
|
|
"ids": aa.get("ids", {}) or {},
|
|
}
|
|
for aa in (album_src.get("artists", []) or [])
|
|
if isinstance(aa, dict)
|
|
],
|
|
}
|
|
|
|
return {
|
|
"type": "track",
|
|
"title": title,
|
|
"disc_number": disc_number,
|
|
"track_number": track_number,
|
|
"duration_ms": duration_ms,
|
|
"explicit": explicit,
|
|
"genres": [],
|
|
"album": album_conv,
|
|
"artists": artists_conv,
|
|
"ids": ids,
|
|
}
|
|
|
|
|
|
def build_partial_summary_from_task_log(task_id: str, parent_type: str) -> dict:
|
|
"""
|
|
Build a SummaryObject-like dict using the task's log lines.
|
|
Includes arrays successful_tracks, skipped_tracks, failed_tracks (with reason), and totals.
|
|
"""
|
|
log_lines = _read_task_log_json_lines(task_id)
|
|
initial_tracks_map = _extract_parent_initial_tracks(log_lines, parent_type)
|
|
completed_keys, skipped_keys, completed_objs, skipped_objs = (
|
|
_extract_completed_and_skipped_from_logs(log_lines)
|
|
)
|
|
|
|
# Determine failed as initial - completed - skipped
|
|
initial_keys = set(initial_tracks_map.keys())
|
|
failed_keys = initial_keys.difference(completed_keys.union(skipped_keys))
|
|
|
|
successful_tracks = [
|
|
completed_objs[k] for k in completed_keys if k in completed_objs
|
|
]
|
|
skipped_tracks = [skipped_objs[k] for k in skipped_keys if k in skipped_objs]
|
|
failed_tracks = [
|
|
{
|
|
"track": _to_track_object_from_initial(initial_tracks_map[k], parent_type),
|
|
"reason": "cancelled",
|
|
}
|
|
for k in failed_keys
|
|
if k in initial_tracks_map
|
|
]
|
|
|
|
return {
|
|
"successful_tracks": successful_tracks,
|
|
"skipped_tracks": skipped_tracks,
|
|
"failed_tracks": failed_tracks,
|
|
"total_successful": len(successful_tracks),
|
|
"total_skipped": len(skipped_tracks),
|
|
"total_failed": len(failed_tracks),
|
|
}
|
|
|
|
|
|
# Configure logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Initialize Celery app
|
|
celery_app = Celery(
|
|
"routes.utils.celery_tasks", broker=REDIS_URL, backend=REDIS_BACKEND
|
|
)
|
|
|
|
# Load Celery config
|
|
celery_app.config_from_object("routes.utils.celery_config")
|
|
|
|
|
|
redis_client = redis.Redis.from_url(REDIS_URL)
|
|
|
|
|
|
def trigger_sse_event(task_id: str, reason: str = "status_change"):
|
|
"""Trigger an SSE event using a dedicated Celery worker task"""
|
|
try:
|
|
# Submit SSE update task to utility worker queue
|
|
# This is non-blocking and more reliable than threads
|
|
trigger_sse_update_task.apply_async(
|
|
args=[task_id, reason],
|
|
queue="utility_tasks",
|
|
priority=9, # High priority for real-time updates
|
|
)
|
|
# Only log at debug level to reduce verbosity
|
|
logger.debug(f"SSE: Submitted SSE update task for {task_id} (reason: {reason})")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error submitting SSE update task for task {task_id}: {e}", exc_info=True
|
|
)
|
|
|
|
|
|
class ProgressState:
|
|
"""Enum-like class for progress states"""
|
|
|
|
QUEUED = "queued"
|
|
PROCESSING = "processing"
|
|
COMPLETE = "complete"
|
|
ERROR = "error"
|
|
RETRYING = "retrying"
|
|
CANCELLED = "cancelled"
|
|
PROGRESS = "progress"
|
|
|
|
# Additional states from deezspot library
|
|
INITIALIZING = "initializing"
|
|
DOWNLOADING = "downloading"
|
|
TRACK_PROGRESS = "track_progress"
|
|
TRACK_COMPLETE = "track_complete"
|
|
REAL_TIME = "real_time"
|
|
SKIPPED = "skipped"
|
|
DONE = "done"
|
|
ERROR_RETRIED = "ERROR_RETRIED" # Status for an error task that has been retried
|
|
ERROR_AUTO_CLEANED = (
|
|
"ERROR_AUTO_CLEANED" # Status for an error task that was auto-cleaned
|
|
)
|
|
|
|
|
|
# Reuse the application's logging configuration for Celery workers
|
|
@setup_logging.connect
|
|
def setup_celery_logging(**kwargs):
|
|
"""
|
|
This handler ensures Celery uses our application logging settings
|
|
instead of its own. Prevents duplicate log configurations.
|
|
"""
|
|
# Using the root logger's handlers and level preserves our config
|
|
return logging.getLogger()
|
|
|
|
|
|
# The initialization of a worker will log the worker configuration
|
|
@worker_init.connect
|
|
def worker_init_handler(**kwargs):
|
|
"""Log when a worker initializes with its configuration details"""
|
|
config = get_config_params()
|
|
logger.info(
|
|
f"Celery worker initialized with concurrency {config.get('maxConcurrentDownloads', 3)}"
|
|
)
|
|
logger.info(
|
|
f"Worker config: spotifyQuality={config.get('spotifyQuality')}, deezerQuality={config.get('deezerQuality')}"
|
|
)
|
|
logger.debug("Worker Redis connection: " + REDIS_URL)
|
|
|
|
|
|
def store_task_status(task_id, status_data):
|
|
"""
|
|
Store task status information in Redis with a sequential ID
|
|
|
|
Args:
|
|
task_id: The task ID
|
|
status_data: Dictionary containing status information
|
|
"""
|
|
# Add timestamp if not present
|
|
if "timestamp" not in status_data:
|
|
status_data["timestamp"] = time.time()
|
|
|
|
try:
|
|
# Get next ID for this task's status updates
|
|
status_id = redis_client.incr(f"task:{task_id}:status:next_id")
|
|
status_data["id"] = status_id
|
|
|
|
# Convert to JSON and store in Redis
|
|
redis_client.rpush(f"task:{task_id}:status", json.dumps(status_data))
|
|
|
|
# Set expiry for the list to avoid filling up Redis with old data
|
|
redis_client.expire(f"task:{task_id}:status", 60 * 60 * 24 * 7) # 7 days
|
|
redis_client.expire(
|
|
f"task:{task_id}:status:next_id", 60 * 60 * 24 * 7
|
|
) # 7 days
|
|
|
|
# Publish an update event to a Redis channel for subscribers
|
|
# This will be used by the SSE endpoint to push updates in real-time
|
|
update_channel = f"task_updates:{task_id}"
|
|
redis_client.publish(
|
|
update_channel, json.dumps({"task_id": task_id, "status_id": status_id})
|
|
)
|
|
|
|
# Trigger immediate SSE event for real-time frontend updates
|
|
trigger_sse_event(task_id, "status_update")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error storing task status: {e}")
|
|
traceback.print_exc()
|
|
|
|
|
|
def get_task_status(task_id):
|
|
"""Get all task status updates from Redis"""
|
|
try:
|
|
status_list = redis_client.lrange(f"task:{task_id}:status", 0, -1)
|
|
return [json.loads(s.decode("utf-8")) for s in status_list]
|
|
except Exception as e:
|
|
logger.error(f"Error getting task status: {e}")
|
|
return []
|
|
|
|
|
|
def get_last_task_status(task_id):
|
|
"""Get the most recent task status update from Redis"""
|
|
try:
|
|
# Get the last status update
|
|
status_list = redis_client.lrange(f"task:{task_id}:status", -1, -1)
|
|
if not status_list:
|
|
return None
|
|
|
|
return json.loads(status_list[0].decode("utf-8"))
|
|
except Exception as e:
|
|
logger.error(f"Error getting last task status: {e}")
|
|
return None
|
|
|
|
|
|
def store_task_info(task_id, task_info):
|
|
"""Store task information in Redis"""
|
|
try:
|
|
redis_client.set(f"task:{task_id}:info", json.dumps(task_info))
|
|
redis_client.expire(f"task:{task_id}:info", 60 * 60 * 24 * 7) # 7 days
|
|
except Exception as e:
|
|
logger.error(f"Error storing task info: {e}")
|
|
|
|
|
|
def get_task_info(task_id):
|
|
"""Get task information from Redis"""
|
|
try:
|
|
task_info = redis_client.get(f"task:{task_id}:info")
|
|
if task_info:
|
|
return json.loads(task_info.decode("utf-8"))
|
|
return {}
|
|
except Exception as e:
|
|
logger.error(f"Error getting task info: {e}")
|
|
return {}
|
|
|
|
|
|
def get_all_tasks():
|
|
"""Get all active task IDs and their full info"""
|
|
try:
|
|
# Use SCAN for better performance than KEYS in production
|
|
task_ids = [
|
|
key.decode("utf-8").split(":")[1]
|
|
for key in redis_client.scan_iter("task:*:info")
|
|
]
|
|
|
|
tasks = []
|
|
for task_id in task_ids:
|
|
task_info = get_task_info(task_id)
|
|
last_status = get_last_task_status(task_id)
|
|
|
|
if task_info and last_status:
|
|
tasks.append(
|
|
{
|
|
"task_id": task_id,
|
|
"task_info": task_info, # Pass full info
|
|
"last_status": last_status, # Pass last status
|
|
# Keep original fields for backward compatibility
|
|
"type": task_info.get("type", "unknown"),
|
|
"name": task_info.get("name", "Unknown"),
|
|
"artist": task_info.get("artist", ""),
|
|
"download_type": task_info.get("download_type", "unknown"),
|
|
"status": last_status.get("status", "unknown"),
|
|
"timestamp": last_status.get("timestamp", 0),
|
|
}
|
|
)
|
|
|
|
return tasks
|
|
except Exception as e:
|
|
logger.error(f"Error getting all tasks: {e}")
|
|
return []
|
|
|
|
|
|
def cancel_task(task_id):
|
|
"""Cancel a task by its ID"""
|
|
try:
|
|
# Mark the task as cancelled in Redis
|
|
store_task_status(
|
|
task_id,
|
|
{
|
|
"status": ProgressState.CANCELLED,
|
|
"error": "Task cancelled by user",
|
|
"status_info": {
|
|
"status": ProgressState.CANCELLED,
|
|
"error": "Task cancelled by user",
|
|
"timestamp": time.time(),
|
|
},
|
|
},
|
|
)
|
|
|
|
# Try to revoke the Celery task if it hasn't started yet
|
|
celery_app.control.revoke(task_id, terminate=True, signal="SIGTERM")
|
|
|
|
# Schedule deletion of task data after 3 seconds
|
|
delayed_delete_task_data.apply_async(
|
|
args=[task_id, "Task cancelled by user and auto-cleaned."], countdown=3
|
|
)
|
|
logger.info(
|
|
f"Task {task_id} cancelled by user. Data scheduled for deletion in 3s."
|
|
)
|
|
|
|
return {"status": "cancelled", "task_id": task_id}
|
|
except Exception as e:
|
|
logger.error(f"Error cancelling task {task_id}: {e}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
|
|
def retry_task(task_id):
|
|
"""Retry a failed task"""
|
|
try:
|
|
# Get task info
|
|
task_info = get_task_info(task_id)
|
|
if not task_info:
|
|
return {"status": "error", "error": f"Task {task_id} not found"}
|
|
|
|
# Check if task has error status
|
|
last_status = get_last_task_status(task_id)
|
|
if not last_status or last_status.get("status") != ProgressState.ERROR:
|
|
return {"status": "error", "error": "Task is not in a failed state"}
|
|
|
|
# Get current retry count
|
|
retry_count = last_status.get("retry_count", 0)
|
|
|
|
# Get retry configuration from config
|
|
config_params = get_config_params()
|
|
max_retries = config_params.get("maxRetries", 3)
|
|
initial_retry_delay = config_params.get("retryDelaySeconds", 5)
|
|
retry_delay_increase = config_params.get("retry_delay_increase", 5)
|
|
|
|
# Check if we've exceeded max retries
|
|
if retry_count >= max_retries:
|
|
return {
|
|
"status": "error",
|
|
"error": f"Maximum retry attempts ({max_retries}) exceeded",
|
|
}
|
|
|
|
# Calculate retry delay
|
|
retry_delay = initial_retry_delay + (retry_count * retry_delay_increase)
|
|
|
|
# Create a new task_id for the retry
|
|
new_task_id = f"{task_id}_retry{retry_count + 1}"
|
|
|
|
# Update task info for the retry
|
|
task_info["retry_count"] = retry_count + 1
|
|
task_info["retry_of"] = task_id
|
|
|
|
# Use retry_url if available, otherwise use the original url
|
|
if "retry_url" in task_info and task_info["retry_url"]:
|
|
task_info["url"] = task_info["retry_url"]
|
|
|
|
# Get service configuration
|
|
service = config_params.get("service")
|
|
fallback_enabled = config_params.get("fallback", False)
|
|
|
|
# Update service settings
|
|
if service == "spotify":
|
|
if fallback_enabled:
|
|
task_info["main"] = config_params.get("spotify", "")
|
|
task_info["fallback"] = config_params.get("deezer", "")
|
|
task_info["quality"] = config_params.get("deezerQuality", "MP3_128")
|
|
task_info["fall_quality"] = config_params.get(
|
|
"spotifyQuality", "NORMAL"
|
|
)
|
|
else:
|
|
task_info["main"] = config_params.get("spotify", "")
|
|
task_info["fallback"] = None
|
|
task_info["quality"] = config_params.get("spotifyQuality", "NORMAL")
|
|
task_info["fall_quality"] = None
|
|
elif service == "deezer":
|
|
task_info["main"] = config_params.get("deezer", "")
|
|
task_info["fallback"] = None
|
|
task_info["quality"] = config_params.get("deezerQuality", "MP3_128")
|
|
task_info["fall_quality"] = None
|
|
else:
|
|
task_info["main"] = config_params.get("spotify", "")
|
|
task_info["fallback"] = None
|
|
task_info["quality"] = config_params.get("spotifyQuality", "NORMAL")
|
|
task_info["fall_quality"] = None
|
|
|
|
# Ensure service comes from config for the retry
|
|
task_info["service"] = service
|
|
|
|
# Update other config-derived parameters
|
|
task_info["real_time"] = task_info.get(
|
|
"real_time", config_params.get("realTime", False)
|
|
)
|
|
task_info["custom_dir_format"] = task_info.get(
|
|
"custom_dir_format",
|
|
config_params.get("customDirFormat", "%ar_album%/%album%"),
|
|
)
|
|
task_info["custom_track_format"] = task_info.get(
|
|
"custom_track_format",
|
|
config_params.get("customTrackFormat", "%tracknum%. %music%"),
|
|
)
|
|
task_info["pad_tracks"] = task_info.get(
|
|
"pad_tracks", config_params.get("tracknum_padding", True)
|
|
)
|
|
|
|
# Store the updated task info
|
|
store_task_info(new_task_id, task_info)
|
|
|
|
# Create a queued status
|
|
store_task_status(
|
|
new_task_id,
|
|
{
|
|
"status": ProgressState.QUEUED,
|
|
"type": task_info.get("type", "unknown"),
|
|
"name": task_info.get("name", "Unknown"),
|
|
"artist": task_info.get("artist", ""),
|
|
"retry_count": retry_count + 1,
|
|
"max_retries": max_retries,
|
|
"retry_delay": retry_delay,
|
|
"timestamp": time.time(),
|
|
},
|
|
)
|
|
|
|
# Launch the appropriate task based on download_type
|
|
download_type = task_info.get("download_type", "unknown")
|
|
new_celery_task_obj = None
|
|
|
|
logger.info(
|
|
f"Retrying task {task_id} as {new_task_id} (retry {retry_count + 1}/{max_retries})"
|
|
)
|
|
|
|
if download_type == "track":
|
|
new_celery_task_obj = download_track.apply_async(
|
|
kwargs=task_info, task_id=new_task_id, queue="downloads"
|
|
)
|
|
elif download_type == "album":
|
|
new_celery_task_obj = download_album.apply_async(
|
|
kwargs=task_info, task_id=new_task_id, queue="downloads"
|
|
)
|
|
elif download_type == "playlist":
|
|
new_celery_task_obj = download_playlist.apply_async(
|
|
kwargs=task_info, task_id=new_task_id, queue="downloads"
|
|
)
|
|
else:
|
|
logger.error(f"Unknown download type for retry: {download_type}")
|
|
store_task_status(
|
|
new_task_id,
|
|
{
|
|
"status": ProgressState.ERROR,
|
|
"error": f"Cannot retry: Unknown download type '{download_type}' for original task {task_id}",
|
|
"timestamp": time.time(),
|
|
},
|
|
)
|
|
return {
|
|
"status": "error",
|
|
"error": f"Unknown download type: {download_type}",
|
|
}
|
|
|
|
# If retry was successfully submitted, update the original task's status
|
|
if new_celery_task_obj:
|
|
store_task_status(
|
|
task_id,
|
|
{
|
|
"status": "ERROR_RETRIED",
|
|
"error": f"Task superseded by retry: {new_task_id}",
|
|
"retried_as_task_id": new_task_id,
|
|
"timestamp": time.time(),
|
|
},
|
|
)
|
|
logger.info(
|
|
f"Original task {task_id} status updated to ERROR_RETRIED, superseded by {new_task_id}"
|
|
)
|
|
else:
|
|
logger.error(
|
|
f"Retry submission for task {task_id} (as {new_task_id}) did not return a Celery AsyncResult. Original task not marked as ERROR_RETRIED."
|
|
)
|
|
|
|
return {
|
|
"status": "requeued",
|
|
"task_id": new_task_id,
|
|
"retry_count": retry_count + 1,
|
|
"max_retries": max_retries,
|
|
"retry_delay": retry_delay,
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error retrying task {task_id}: {e}", exc_info=True)
|
|
return {"status": "error", "error": str(e)}
|
|
|
|
|
|
class ProgressTrackingTask(Task):
|
|
"""Base task class that tracks progress through callbacks"""
|
|
|
|
def progress_callback(self, progress_data):
|
|
"""
|
|
Process progress data from deezspot library callbacks using the optimized approach
|
|
based on known status types and flow patterns.
|
|
|
|
Args:
|
|
progress_data: Dictionary containing progress information from deezspot
|
|
"""
|
|
# Store a copy of the original, unprocessed callback data
|
|
raw_callback_data = progress_data.copy()
|
|
|
|
task_id = self.request.id
|
|
|
|
# Ensure ./logs/tasks directory exists
|
|
logs_tasks_dir = Path("./logs/tasks")
|
|
try:
|
|
logs_tasks_dir.mkdir(parents=True, exist_ok=True)
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Task {task_id}: Could not create log directory {logs_tasks_dir}: {e}"
|
|
)
|
|
|
|
# Define log file path
|
|
log_file_path = logs_tasks_dir / f"{task_id}.log"
|
|
|
|
# Log progress_data to the task-specific file
|
|
try:
|
|
with open(log_file_path, "a") as log_file:
|
|
log_entry = progress_data.copy()
|
|
if "timestamp" not in log_entry:
|
|
log_entry["timestamp"] = time.time()
|
|
print(json.dumps(log_entry), file=log_file)
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Task {task_id}: Could not write to task log file {log_file_path}: {e}"
|
|
)
|
|
|
|
if "timestamp" not in progress_data:
|
|
progress_data["timestamp"] = time.time()
|
|
|
|
# Extract status from status_info (deezspot callback format)
|
|
status_info = progress_data.get("status_info", {})
|
|
status = status_info.get("status", progress_data.get("status", "unknown"))
|
|
task_info = get_task_info(task_id)
|
|
|
|
logger.debug(f"Task {task_id}: Extracted status: '{status}' from callback")
|
|
|
|
if logger.isEnabledFor(logging.DEBUG):
|
|
logger.debug(
|
|
f"Task {task_id}: Raw progress data: {json.dumps(progress_data)}"
|
|
)
|
|
|
|
if status == "initializing":
|
|
self._handle_initializing(task_id, progress_data, task_info)
|
|
elif status == "downloading":
|
|
self._handle_downloading(task_id, progress_data, task_info)
|
|
elif status == "progress":
|
|
self._handle_progress(task_id, progress_data, task_info)
|
|
elif status in ["real_time", "track_progress"]:
|
|
self._handle_real_time(task_id, progress_data)
|
|
elif status == "skipped":
|
|
# Re-fetch task_info to ensure we have the latest children_table info
|
|
task_info = get_task_info(task_id)
|
|
self._handle_skipped(task_id, progress_data, task_info)
|
|
elif status == "retrying":
|
|
self._handle_retrying(task_id, progress_data, task_info)
|
|
elif status == "error":
|
|
# Re-fetch task_info to ensure we have the latest children_table info
|
|
task_info = get_task_info(task_id)
|
|
self._handle_error(task_id, progress_data, task_info)
|
|
elif status == "done":
|
|
# Re-fetch task_info to ensure we have the latest children_table info
|
|
task_info = get_task_info(task_id)
|
|
self._handle_done(task_id, progress_data, task_info)
|
|
else:
|
|
logger.info(
|
|
f"Task {task_id} {status}: {progress_data.get('message', 'No details')}"
|
|
)
|
|
|
|
progress_data["raw_callback"] = raw_callback_data
|
|
store_task_status(task_id, progress_data)
|
|
|
|
def _handle_initializing(self, task_id, data, task_info):
|
|
"""Handle initializing status from deezspot"""
|
|
logger.info(f"Task {task_id} initializing...")
|
|
|
|
# Initializing object is now very basic, mainly for acknowledging the start.
|
|
# More detailed info comes with 'progress' or 'downloading' states.
|
|
data["status"] = ProgressState.INITIALIZING
|
|
|
|
# Store initial history entry for download start
|
|
try:
|
|
# Check for album/playlist FIRST since their callbacks contain both parent and track info
|
|
if "album" in data:
|
|
# Album download - create children table and store name in task info
|
|
logger.info(f"Task {task_id}: Creating album children table")
|
|
children_table = history_manager.store_album_history(
|
|
data, task_id, "in_progress"
|
|
)
|
|
if children_table:
|
|
task_info["children_table"] = children_table
|
|
store_task_info(task_id, task_info)
|
|
logger.info(
|
|
f"Task {task_id}: Created and stored children table '{children_table}' in task info"
|
|
)
|
|
else:
|
|
logger.error(
|
|
f"Task {task_id}: Failed to create album children table"
|
|
)
|
|
elif "playlist" in data:
|
|
# Playlist download - create children table and store name in task info
|
|
logger.info(f"Task {task_id}: Creating playlist children table")
|
|
children_table = history_manager.store_playlist_history(
|
|
data, task_id, "in_progress"
|
|
)
|
|
if children_table:
|
|
task_info["children_table"] = children_table
|
|
store_task_info(task_id, task_info)
|
|
logger.info(
|
|
f"Task {task_id}: Created and stored children table '{children_table}' in task info"
|
|
)
|
|
else:
|
|
logger.error(
|
|
f"Task {task_id}: Failed to create playlist children table"
|
|
)
|
|
elif "track" in data:
|
|
# Individual track download - check if it's part of an album/playlist
|
|
children_table = task_info.get("children_table")
|
|
if children_table:
|
|
# Track is part of album/playlist - don't store in main table during initialization
|
|
logger.info(
|
|
f"Task {task_id}: Skipping track initialization storage (part of album/playlist, children table: {children_table})"
|
|
)
|
|
else:
|
|
# Individual track download - store in main table
|
|
logger.info(
|
|
f"Task {task_id}: Storing individual track history (initializing)"
|
|
)
|
|
history_manager.store_track_history(data, task_id, "in_progress")
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Failed to store initial history for task {task_id}: {e}",
|
|
exc_info=True,
|
|
)
|
|
|
|
def _handle_downloading(self, task_id, data, task_info):
|
|
"""Handle downloading status from deezspot"""
|
|
track_obj = data.get("track", {})
|
|
track_name = track_obj.get("title", "Unknown")
|
|
|
|
artists = track_obj.get("artists", [])
|
|
artist_name = artists[0].get("name", "") if artists else ""
|
|
|
|
album_obj = track_obj.get("album", {})
|
|
album_name = album_obj.get("title", "")
|
|
|
|
logger.info(
|
|
f"Task {task_id}: Starting download for track '{track_name}' by {artist_name}"
|
|
)
|
|
|
|
data["status"] = ProgressState.DOWNLOADING
|
|
data["song"] = track_name
|
|
data["artist"] = artist_name
|
|
data["album"] = album_name
|
|
|
|
def _handle_progress(self, task_id, data, task_info):
|
|
"""Handle progress status for albums/playlists from deezspot"""
|
|
item = data.get("playlist") or data.get("album", {})
|
|
track = data.get("track", {})
|
|
|
|
item_name = item.get("title", "Unknown Item")
|
|
total_tracks = item.get("total_tracks", 0)
|
|
|
|
track_name = track.get("title", "Unknown Track")
|
|
artists = track.get("artists", [])
|
|
artist_name = artists[0].get("name", "") if artists else ""
|
|
|
|
# The 'progress' field in the callback is the track number being processed
|
|
current_track_num = data.get("progress", 0)
|
|
|
|
if total_tracks > 0:
|
|
task_info["total_tracks"] = total_tracks
|
|
task_info["completed_tracks"] = current_track_num - 1
|
|
task_info["current_track_num"] = current_track_num
|
|
store_task_info(task_id, task_info)
|
|
|
|
overall_progress = min(
|
|
int(((current_track_num - 1) / total_tracks) * 100), 100
|
|
)
|
|
data["overall_progress"] = overall_progress
|
|
data["parsed_current_track"] = current_track_num
|
|
data["parsed_total_tracks"] = total_tracks
|
|
|
|
logger.info(
|
|
f"Task {task_id}: Progress on '{item_name}': Processing track {current_track_num}/{total_tracks} - '{track_name}'"
|
|
)
|
|
|
|
data["status"] = ProgressState.PROGRESS
|
|
data["song"] = track_name
|
|
data["artist"] = artist_name
|
|
data["current_track"] = f"{current_track_num}/{total_tracks}"
|
|
|
|
def _handle_real_time(self, task_id, data):
|
|
"""Handle real-time progress status from deezspot"""
|
|
track_obj = data.get("track", {})
|
|
track_name = track_obj.get("title", "Unknown Track")
|
|
percentage = data.get("percentage", 0)
|
|
|
|
logger.debug(
|
|
f"Task {task_id}: Real-time progress for '{track_name}': {percentage}%"
|
|
)
|
|
|
|
data["song"] = track_name
|
|
artist = data.get("artist", "Unknown")
|
|
|
|
# Handle percent formatting
|
|
percent = data.get("percent", data.get("percentage", 0))
|
|
if isinstance(percent, float) and percent <= 1.0:
|
|
percent = int(percent * 100)
|
|
data["percent"] = percent
|
|
|
|
# Calculate download rate if bytes_received is available
|
|
if "bytes_received" in data:
|
|
last_update = data.get("last_update_time", data["timestamp"])
|
|
bytes_received = data["bytes_received"]
|
|
last_bytes = data.get("last_bytes_received", 0)
|
|
time_diff = data["timestamp"] - last_update
|
|
|
|
if time_diff > 0 and bytes_received > last_bytes:
|
|
bytes_diff = bytes_received - last_bytes
|
|
download_rate = bytes_diff / time_diff
|
|
data["download_rate"] = download_rate
|
|
data["last_update_time"] = data["timestamp"]
|
|
data["last_bytes_received"] = bytes_received
|
|
|
|
# Format download rate for display
|
|
if download_rate < 1024:
|
|
data["download_rate_formatted"] = f"{download_rate:.2f} B/s"
|
|
elif download_rate < 1024 * 1024:
|
|
data["download_rate_formatted"] = f"{download_rate / 1024:.2f} KB/s"
|
|
else:
|
|
data["download_rate_formatted"] = (
|
|
f"{download_rate / (1024 * 1024):.2f} MB/s"
|
|
)
|
|
|
|
# Log at debug level
|
|
logger.debug(
|
|
f"Task {task_id} track progress: {track_name} by {artist}: {percent}%"
|
|
)
|
|
|
|
def _handle_skipped(self, task_id, data, task_info):
|
|
"""Handle skipped status from deezspot"""
|
|
|
|
# Store skipped history for deezspot callback format
|
|
try:
|
|
if "track" in data:
|
|
# Individual track skipped - check if we should use children table
|
|
children_table = task_info.get("children_table")
|
|
logger.debug(
|
|
f"Task {task_id}: Skipped track, children_table = '{children_table}'"
|
|
)
|
|
if children_table:
|
|
# Part of album/playlist - store progressively in children table
|
|
logger.info(
|
|
f"Task {task_id}: Storing skipped track in children table '{children_table}' (progressive)"
|
|
)
|
|
history_manager.store_track_history(
|
|
data, task_id, "skipped", children_table
|
|
)
|
|
else:
|
|
# Individual track download - store in main table
|
|
logger.info(
|
|
f"Task {task_id}: Storing skipped track in main table (individual download)"
|
|
)
|
|
history_manager.store_track_history(data, task_id, "skipped")
|
|
except Exception as e:
|
|
logger.error(f"Failed to store skipped history for task {task_id}: {e}")
|
|
|
|
# Extract track info (legacy format support)
|
|
title = data.get("song", "Unknown")
|
|
artist = data.get("artist", "Unknown")
|
|
reason = data.get("reason", "Unknown reason")
|
|
|
|
# Log skip
|
|
logger.info(f"Task {task_id} skipped: {artist} - {title}")
|
|
logger.debug(f"Task {task_id} skip reason: {reason}")
|
|
|
|
# Update task info
|
|
skipped_tracks = task_info.get("skipped_tracks", 0) + 1
|
|
task_info["skipped_tracks"] = skipped_tracks
|
|
store_task_info(task_id, task_info)
|
|
|
|
# Check if part of album/playlist
|
|
parent_type = task_info.get("type", "").lower()
|
|
if parent_type in ["album", "playlist"]:
|
|
total_tracks = task_info.get("total_tracks", 0)
|
|
processed_tracks = task_info.get("completed_tracks", 0) + skipped_tracks
|
|
|
|
if total_tracks > 0:
|
|
overall_progress = min(
|
|
int((processed_tracks / total_tracks) * 100), 100
|
|
)
|
|
|
|
# Create parent progress update
|
|
progress_update = {
|
|
"status": ProgressState.PROGRESS,
|
|
"type": parent_type,
|
|
"track": title,
|
|
"current_track": f"{processed_tracks}/{total_tracks}",
|
|
"album": data.get("album", ""),
|
|
"artist": artist,
|
|
"timestamp": data["timestamp"],
|
|
"parsed_current_track": processed_tracks,
|
|
"parsed_total_tracks": total_tracks,
|
|
"overall_progress": overall_progress,
|
|
"track_skipped": True,
|
|
"skip_reason": reason,
|
|
"parent_task": True,
|
|
}
|
|
|
|
# Store progress update
|
|
store_task_status(task_id, progress_update)
|
|
|
|
# Set status
|
|
# data["status"] = ProgressState.SKIPPED
|
|
|
|
def _handle_retrying(self, task_id, data, task_info):
|
|
"""Handle retrying status from deezspot"""
|
|
# Extract retry info
|
|
song = data.get("song", "Unknown")
|
|
artist = data.get("artist", "Unknown")
|
|
retry_count = data.get("retry_count", 0)
|
|
seconds_left = data.get("seconds_left", 0)
|
|
error = data.get("error", "Unknown error")
|
|
|
|
# Log retry
|
|
logger.warning(
|
|
f"Task {task_id} retrying: {artist} - {song} (Attempt {retry_count}, waiting {seconds_left}s)"
|
|
)
|
|
logger.debug(f"Task {task_id} retry reason: {error}")
|
|
|
|
# Update task info
|
|
retry_count_total = task_info.get("retry_count", 0) + 1
|
|
task_info["retry_count"] = retry_count_total
|
|
store_task_info(task_id, task_info)
|
|
|
|
# Set status
|
|
# data["status"] = ProgressState.RETRYING
|
|
|
|
def _handle_error(self, task_id, data, task_info):
|
|
"""Handle error status from deezspot"""
|
|
|
|
# Store error history for deezspot callback format
|
|
try:
|
|
# Check for album/playlist FIRST since their callbacks contain both parent and track info
|
|
if "album" in data:
|
|
# Album failed - store in main table
|
|
logger.info(f"Task {task_id}: Storing album history (failed)")
|
|
history_manager.store_album_history(data, task_id, "failed")
|
|
elif "playlist" in data:
|
|
# Playlist failed - store in main table
|
|
logger.info(f"Task {task_id}: Storing playlist history (failed)")
|
|
history_manager.store_playlist_history(data, task_id, "failed")
|
|
elif "track" in data:
|
|
# Individual track failed - check if we should use children table
|
|
children_table = task_info.get("children_table")
|
|
logger.debug(
|
|
f"Task {task_id}: Failed track, children_table = '{children_table}'"
|
|
)
|
|
if children_table:
|
|
# Part of album/playlist - store progressively in children table
|
|
logger.info(
|
|
f"Task {task_id}: Storing failed track in children table '{children_table}' (progressive)"
|
|
)
|
|
history_manager.store_track_history(
|
|
data, task_id, "failed", children_table
|
|
)
|
|
else:
|
|
# Individual track download - store in main table
|
|
logger.info(
|
|
f"Task {task_id}: Storing failed track in main table (individual download)"
|
|
)
|
|
history_manager.store_track_history(data, task_id, "failed")
|
|
except Exception as e:
|
|
logger.error(f"Failed to store error history for task {task_id}: {e}")
|
|
|
|
# Extract error info (legacy format support)
|
|
message = data.get("message", "Unknown error")
|
|
|
|
# Log error
|
|
logger.error(f"Task {task_id} error: {message}")
|
|
|
|
# Update task info
|
|
error_count = task_info.get("error_count", 0) + 1
|
|
task_info["error_count"] = error_count
|
|
store_task_info(task_id, task_info)
|
|
|
|
# Set status and error message
|
|
# data["status"] = ProgressState.ERROR
|
|
data["error"] = message
|
|
|
|
def _handle_done(self, task_id, data, task_info):
|
|
"""Handle done status from deezspot"""
|
|
|
|
# Store completion history for deezspot callback format
|
|
try:
|
|
# Check for album/playlist FIRST since their callbacks contain both parent and track info
|
|
if "album" in data:
|
|
# Album completion with summary - store in main table
|
|
logger.info(f"Task {task_id}: Storing album history (completed)")
|
|
history_manager.store_album_history(data, task_id, "completed")
|
|
elif "playlist" in data:
|
|
# Playlist completion with summary - store in main table
|
|
logger.info(f"Task {task_id}: Storing playlist history (completed)")
|
|
history_manager.store_playlist_history(data, task_id, "completed")
|
|
elif "track" in data:
|
|
# Individual track completion - check if we should use children table
|
|
children_table = task_info.get("children_table")
|
|
logger.debug(
|
|
f"Task {task_id}: Completed track, children_table = '{children_table}'"
|
|
)
|
|
if children_table:
|
|
# Part of album/playlist - store progressively in children table
|
|
logger.info(
|
|
f"Task {task_id}: Storing completed track in children table '{children_table}' (progressive)"
|
|
)
|
|
history_manager.store_track_history(
|
|
data, task_id, "completed", children_table
|
|
)
|
|
else:
|
|
# Individual track download - store in main table
|
|
logger.info(
|
|
f"Task {task_id}: Storing completed track in main table (individual download)"
|
|
)
|
|
history_manager.store_track_history(data, task_id, "completed")
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Failed to store completion history for task {task_id}: {e}",
|
|
exc_info=True,
|
|
)
|
|
|
|
# Extract data (legacy format support)
|
|
content_type = data.get("type", "").lower()
|
|
album = data.get("album", "")
|
|
artist = data.get("artist", "")
|
|
song = data.get("song", "")
|
|
|
|
# Handle based on content type
|
|
if content_type == "track":
|
|
# For track completions
|
|
if artist and song:
|
|
logger.info(f"Task {task_id} completed: Track '{song}' by {artist}")
|
|
else:
|
|
logger.info(f"Task {task_id} completed: Track '{song}'")
|
|
|
|
# Update status to track_complete
|
|
# data["status"] = ProgressState.TRACK_COMPLETE
|
|
|
|
# Update task info
|
|
completed_tracks = task_info.get("completed_tracks", 0) + 1
|
|
task_info["completed_tracks"] = completed_tracks
|
|
store_task_info(task_id, task_info)
|
|
|
|
# If part of album/playlist, update progress
|
|
parent_type = task_info.get("type", "").lower()
|
|
if parent_type in ["album", "playlist"]:
|
|
total_tracks = task_info.get("total_tracks", 0)
|
|
if total_tracks > 0:
|
|
completion_percent = int((completed_tracks / total_tracks) * 100)
|
|
|
|
# Create progress update
|
|
progress_update = {
|
|
"status": ProgressState.PROGRESS,
|
|
"type": parent_type,
|
|
"track": song,
|
|
"current_track": f"{completed_tracks}/{total_tracks}",
|
|
"album": album,
|
|
"artist": artist,
|
|
"timestamp": data["timestamp"],
|
|
"parsed_current_track": completed_tracks,
|
|
"parsed_total_tracks": total_tracks,
|
|
"overall_progress": completion_percent,
|
|
"track_complete": True,
|
|
"parent_task": True,
|
|
}
|
|
|
|
# Store progress update
|
|
store_task_status(task_id, progress_update)
|
|
|
|
elif content_type in ["album", "playlist"]:
|
|
# Get completion counts
|
|
completed_tracks = task_info.get("completed_tracks", 0)
|
|
skipped_tracks = task_info.get("skipped_tracks", 0)
|
|
error_count = task_info.get("error_count", 0)
|
|
|
|
# Log completion
|
|
if album and artist:
|
|
logger.info(
|
|
f"Task {task_id} completed: {content_type.upper()} '{album}' by {artist}"
|
|
)
|
|
elif album:
|
|
logger.info(
|
|
f"Task {task_id} completed: {content_type.upper()} '{album}'"
|
|
)
|
|
else:
|
|
name = data.get("name", "")
|
|
if name:
|
|
logger.info(
|
|
f"Task {task_id} completed: {content_type.upper()} '{name}'"
|
|
)
|
|
else:
|
|
logger.info(f"Task {task_id} completed: {content_type.upper()}")
|
|
|
|
# Add summary
|
|
# data["status"] = ProgressState.COMPLETE
|
|
summary_obj = data.get("summary")
|
|
|
|
if summary_obj:
|
|
total_successful = summary_obj.get("total_successful", 0)
|
|
total_skipped = summary_obj.get("total_skipped", 0)
|
|
total_failed = summary_obj.get("total_failed", 0)
|
|
# data[
|
|
# "message"
|
|
# ] = f"Download complete: {total_successful} tracks downloaded, {total_skipped} skipped, {total_failed} failed."
|
|
# Log summary from the summary object
|
|
logger.info(
|
|
f"Task {task_id} summary: {total_successful} successful, {total_skipped} skipped, {total_failed} failed."
|
|
)
|
|
else:
|
|
# data["message"] = (
|
|
# f"Download complete: {completed_tracks} tracks downloaded, {skipped_tracks} skipped"
|
|
# )
|
|
# Log summary
|
|
logger.info(
|
|
f"Task {task_id} summary: {completed_tracks} completed, {skipped_tracks} skipped, {error_count} errors"
|
|
)
|
|
# Schedule deletion for completed multi-track downloads
|
|
delayed_delete_task_data.apply_async(
|
|
args=[task_id, "Task completed successfully and auto-cleaned."],
|
|
countdown=3, # Delay in seconds
|
|
)
|
|
|
|
# If from playlist_watch and successful, add track to DB
|
|
original_request = task_info.get("original_request", {})
|
|
if (
|
|
original_request.get("source") == "playlist_watch"
|
|
and task_info.get("download_type") == "track"
|
|
): # ensure it's a track for playlist
|
|
playlist_id = original_request.get("playlist_id")
|
|
track_item_for_db = original_request.get("track_item_for_db")
|
|
|
|
if playlist_id and track_item_for_db and track_item_for_db.get("track"):
|
|
logger.info(
|
|
f"Task {task_id} was from playlist watch for playlist {playlist_id}. Adding track to DB."
|
|
)
|
|
try:
|
|
add_single_track_to_playlist_db(playlist_id, track_item_for_db)
|
|
except Exception as db_add_err:
|
|
logger.error(
|
|
f"Failed to add track to DB for playlist {playlist_id} after successful download task {task_id}: {db_add_err}",
|
|
exc_info=True,
|
|
)
|
|
else:
|
|
logger.warning(
|
|
f"Task {task_id} was from playlist_watch but missing playlist_id or track_item_for_db for DB update. Original Request: {original_request}"
|
|
)
|
|
|
|
# If from artist_watch and successful, update album in DB
|
|
if (
|
|
original_request.get("source") == "artist_watch"
|
|
and task_info.get("download_type") == "album"
|
|
):
|
|
artist_spotify_id = original_request.get("artist_spotify_id")
|
|
album_data_for_db = original_request.get("album_data_for_db")
|
|
|
|
if (
|
|
artist_spotify_id
|
|
and album_data_for_db
|
|
and album_data_for_db.get("id")
|
|
):
|
|
album_spotify_id = album_data_for_db.get("id")
|
|
logger.info(
|
|
f"Task {task_id} was from artist watch for artist {artist_spotify_id}, album {album_spotify_id}. Updating album in DB as complete."
|
|
)
|
|
try:
|
|
add_or_update_album_for_artist(
|
|
artist_spotify_id=artist_spotify_id,
|
|
album_data=album_data_for_db,
|
|
task_id=task_id,
|
|
is_download_complete=True,
|
|
)
|
|
except Exception as db_update_err:
|
|
logger.error(
|
|
f"Failed to update album {album_spotify_id} in DB for artist {artist_spotify_id} after successful download task {task_id}: {db_update_err}",
|
|
exc_info=True,
|
|
)
|
|
else:
|
|
logger.warning(
|
|
f"Task {task_id} was from artist_watch (album) but missing key data (artist_spotify_id or album_data_for_db) for DB update. Original Request: {original_request}"
|
|
)
|
|
|
|
else:
|
|
# Generic done for other types
|
|
logger.info(f"Task {task_id} completed: {content_type.upper()}")
|
|
# data["status"] = ProgressState.COMPLETE
|
|
# data["message"] = "Download complete"
|
|
|
|
|
|
# Celery signal handlers
|
|
@task_prerun.connect
|
|
def task_prerun_handler(task_id=None, task=None, *args, **kwargs):
|
|
"""Signal handler when a task begins running"""
|
|
try:
|
|
# Skip verbose logging for SSE tasks
|
|
if task and hasattr(task, "name") and task.name in ["trigger_sse_update_task"]:
|
|
return
|
|
|
|
task_info = get_task_info(task_id)
|
|
|
|
# Update task status to processing
|
|
store_task_status(
|
|
task_id,
|
|
{
|
|
"status": ProgressState.PROCESSING,
|
|
"timestamp": time.time(),
|
|
"type": task_info.get("type", "unknown"),
|
|
"name": task_info.get("name", "Unknown"),
|
|
"artist": task_info.get("artist", ""),
|
|
},
|
|
)
|
|
|
|
logger.info(
|
|
f"Task {task_id} started processing: {task_info.get('name', 'Unknown')}"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error in task_prerun_handler: {e}")
|
|
|
|
|
|
@task_postrun.connect
|
|
def task_postrun_handler(
|
|
task_id=None, task=None, retval=None, state=None, *args, **kwargs
|
|
):
|
|
"""Signal handler when a task finishes"""
|
|
try:
|
|
# Skip verbose logging for SSE tasks
|
|
if task and hasattr(task, "name") and task.name in ["trigger_sse_update_task"]:
|
|
return
|
|
|
|
last_status_for_history = get_last_task_status(task_id)
|
|
if last_status_for_history and last_status_for_history.get("status") in [
|
|
ProgressState.COMPLETE,
|
|
ProgressState.ERROR,
|
|
ProgressState.CANCELLED,
|
|
"ERROR_RETRIED",
|
|
"ERROR_AUTO_CLEANED",
|
|
]:
|
|
if (
|
|
state == states.REVOKED
|
|
and last_status_for_history.get("status") != ProgressState.CANCELLED
|
|
):
|
|
logger.info(f"Task {task_id} was REVOKED (likely cancelled).")
|
|
# return # Let status update proceed if necessary
|
|
|
|
task_info = get_task_info(task_id)
|
|
current_redis_status = (
|
|
last_status_for_history.get("status") if last_status_for_history else None
|
|
)
|
|
|
|
# If task was cancelled/revoked, finalize parent history with partial summary
|
|
try:
|
|
if (
|
|
state == states.REVOKED
|
|
or current_redis_status == ProgressState.CANCELLED
|
|
):
|
|
parent_type = (
|
|
task_info.get("download_type") or task_info.get("type") or ""
|
|
).lower()
|
|
if parent_type in ["album", "playlist"]:
|
|
# Build detailed summary from the task log
|
|
summary = build_partial_summary_from_task_log(task_id, parent_type)
|
|
status_info = {"status": "done", "summary": summary}
|
|
title = task_info.get("name", "Unknown")
|
|
total_tracks = task_info.get("total_tracks", 0)
|
|
|
|
# Try to enrich parent payload with initial callback object (to capture artists, ids, images)
|
|
try:
|
|
log_lines = _read_task_log_json_lines(task_id)
|
|
initial_parent = _extract_initial_parent_object(
|
|
log_lines, parent_type
|
|
)
|
|
except Exception:
|
|
initial_parent = None
|
|
|
|
if parent_type == "album":
|
|
album_payload = {"title": title, "total_tracks": total_tracks}
|
|
if isinstance(initial_parent, dict):
|
|
for k in [
|
|
"artists",
|
|
"ids",
|
|
"images",
|
|
"release_date",
|
|
"genres",
|
|
"album_type",
|
|
"tracks",
|
|
]:
|
|
if k in initial_parent:
|
|
album_payload[k] = initial_parent.get(k)
|
|
# Ensure a main history entry exists even on cancellation
|
|
history_manager.store_album_history(
|
|
{"album": album_payload, "status_info": status_info},
|
|
task_id,
|
|
"failed",
|
|
)
|
|
else:
|
|
playlist_payload = {"title": title}
|
|
if isinstance(initial_parent, dict):
|
|
for k in [
|
|
"owner",
|
|
"ids",
|
|
"images",
|
|
"tracks",
|
|
"description",
|
|
]:
|
|
if k in initial_parent:
|
|
playlist_payload[k] = initial_parent.get(k)
|
|
history_manager.store_playlist_history(
|
|
{"playlist": playlist_payload, "status_info": status_info},
|
|
task_id,
|
|
"failed",
|
|
)
|
|
except Exception as finalize_err:
|
|
logger.error(
|
|
f"Failed to finalize partial history for cancelled task {task_id}: {finalize_err}",
|
|
exc_info=True,
|
|
)
|
|
|
|
if state == states.SUCCESS:
|
|
if current_redis_status not in [ProgressState.COMPLETE, "done"]:
|
|
# The final status is now set by the 'done' callback from deezspot.
|
|
# We no longer need to store a generic 'COMPLETE' status here.
|
|
# This ensures the raw callback data is the last thing in the log.
|
|
pass
|
|
logger.info(
|
|
f"Task {task_id} completed successfully: {task_info.get('name', 'Unknown')}"
|
|
)
|
|
|
|
if (
|
|
task_info.get("download_type") == "track"
|
|
): # Applies to single track downloads and tracks from playlists/albums
|
|
delayed_delete_task_data.apply_async(
|
|
args=[task_id, "Task completed successfully and auto-cleaned."],
|
|
countdown=3,
|
|
)
|
|
|
|
original_request = task_info.get("original_request", {})
|
|
# Handle successful track from playlist watch
|
|
if (
|
|
original_request.get("source") == "playlist_watch"
|
|
and task_info.get("download_type") == "track"
|
|
):
|
|
playlist_id = original_request.get("playlist_id")
|
|
track_item_for_db = original_request.get("track_item_for_db")
|
|
|
|
if playlist_id and track_item_for_db and track_item_for_db.get("track"):
|
|
logger.info(
|
|
f"Task {task_id} was from playlist watch for playlist {playlist_id}. Adding track to DB."
|
|
)
|
|
try:
|
|
# Use task_id as primary source for metadata extraction
|
|
add_single_track_to_playlist_db(
|
|
playlist_spotify_id=playlist_id,
|
|
track_item_for_db=track_item_for_db, # Keep as fallback
|
|
task_id=task_id, # Primary source for metadata
|
|
)
|
|
|
|
# Update the playlist's m3u file after successful track addition
|
|
try:
|
|
from routes.utils.watch.manager import (
|
|
update_playlist_m3u_file,
|
|
)
|
|
|
|
logger.info(
|
|
f"Updating m3u file for playlist {playlist_id} after successful track download."
|
|
)
|
|
update_playlist_m3u_file(playlist_id)
|
|
except Exception as m3u_update_err:
|
|
logger.error(
|
|
f"Failed to update m3u file for playlist {playlist_id} after successful track download task {task_id}: {m3u_update_err}",
|
|
exc_info=True,
|
|
)
|
|
|
|
except Exception as db_add_err:
|
|
logger.error(
|
|
f"Failed to add track to DB for playlist {playlist_id} after successful download task {task_id}: {db_add_err}",
|
|
exc_info=True,
|
|
)
|
|
else:
|
|
logger.warning(
|
|
f"Task {task_id} was from playlist_watch but missing playlist_id or track_item_for_db for DB update. Original Request: {original_request}"
|
|
)
|
|
|
|
# Handle successful album from artist watch
|
|
if (
|
|
original_request.get("source") == "artist_watch"
|
|
and task_info.get("download_type") == "album"
|
|
):
|
|
artist_spotify_id = original_request.get("artist_spotify_id")
|
|
album_data_for_db = original_request.get("album_data_for_db")
|
|
|
|
if (
|
|
artist_spotify_id
|
|
and album_data_for_db
|
|
and album_data_for_db.get("id")
|
|
):
|
|
album_spotify_id = album_data_for_db.get("id")
|
|
logger.info(
|
|
f"Task {task_id} was from artist watch for artist {artist_spotify_id}, album {album_spotify_id}. Updating album in DB as complete."
|
|
)
|
|
try:
|
|
add_or_update_album_for_artist(
|
|
artist_spotify_id=artist_spotify_id,
|
|
album_data=album_data_for_db,
|
|
task_id=task_id,
|
|
is_download_complete=True,
|
|
)
|
|
except Exception as db_update_err:
|
|
logger.error(
|
|
f"Failed to update album {album_spotify_id} in DB for artist {artist_spotify_id} after successful download task {task_id}: {db_update_err}",
|
|
exc_info=True,
|
|
)
|
|
else:
|
|
logger.warning(
|
|
f"Task {task_id} was from artist_watch (album) but missing key data (artist_spotify_id or album_data_for_db) for DB update. Original Request: {original_request}"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in task_postrun_handler: {e}", exc_info=True)
|
|
|
|
|
|
@task_failure.connect
|
|
def task_failure_handler(
|
|
task_id=None, exception=None, traceback=None, sender=None, *args, **kwargs
|
|
):
|
|
"""Signal handler when a task fails"""
|
|
try:
|
|
# Skip if Retry exception
|
|
if isinstance(exception, Retry):
|
|
return
|
|
|
|
# Get task info and status
|
|
task_info = get_task_info(task_id)
|
|
last_status = get_last_task_status(task_id)
|
|
|
|
# Get retry count
|
|
retry_count = 0
|
|
if last_status:
|
|
retry_count = last_status.get("retry_count", 0)
|
|
|
|
# Get retry configuration
|
|
config_params = get_config_params()
|
|
max_retries = config_params.get("maxRetries", 3)
|
|
|
|
# Check if we can retry
|
|
can_retry = retry_count < max_retries
|
|
|
|
# Update task status to error in Redis if not already an error
|
|
if last_status and last_status.get("status") != ProgressState.ERROR:
|
|
store_task_status(
|
|
task_id,
|
|
{
|
|
"status": ProgressState.ERROR,
|
|
"timestamp": time.time(),
|
|
"type": task_info.get("type", "unknown"),
|
|
"name": task_info.get("name", "Unknown"),
|
|
"artist": task_info.get("artist", ""),
|
|
"error": str(exception),
|
|
"traceback": str(traceback),
|
|
"can_retry": can_retry,
|
|
"retry_count": retry_count,
|
|
"max_retries": max_retries,
|
|
},
|
|
)
|
|
|
|
logger.error(f"Task {task_id} failed: {str(exception)}")
|
|
|
|
if can_retry:
|
|
logger.info(f"Task {task_id} can be retried ({retry_count}/{max_retries})")
|
|
else:
|
|
# If task cannot be retried, schedule its data for deletion
|
|
logger.info(
|
|
f"Task {task_id} failed and cannot be retried. Data scheduled for deletion in 3s."
|
|
)
|
|
delayed_delete_task_data.apply_async(
|
|
args=[
|
|
task_id,
|
|
f"Task failed ({str(exception)}) and max retries reached. Auto-cleaned.",
|
|
],
|
|
countdown=3,
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in task_failure_handler: {e}")
|
|
|
|
|
|
@worker_ready.connect
|
|
def worker_ready_handler(**kwargs):
|
|
"""Signal handler when a worker starts up"""
|
|
logger.info("Celery worker ready and listening for tasks")
|
|
|
|
# Check Redis connection
|
|
try:
|
|
redis_client.ping()
|
|
logger.info("Redis connection successful")
|
|
except Exception as e:
|
|
logger.error(f"Redis connection failed: {e}")
|
|
|
|
|
|
# Define the download tasks
|
|
@celery_app.task(
|
|
bind=True, base=ProgressTrackingTask, name="download_track", queue="downloads"
|
|
)
|
|
def download_track(self, **task_data):
|
|
"""
|
|
Task to download a track
|
|
|
|
Args:
|
|
**task_data: Dictionary containing all task parameters
|
|
"""
|
|
try:
|
|
logger.info(
|
|
f"Processing track download task: {task_data.get('name', 'Unknown')}"
|
|
)
|
|
from routes.utils.track import download_track as download_track_func
|
|
|
|
# Get config parameters
|
|
config_params = get_config_params()
|
|
service = config_params.get("service")
|
|
fallback_enabled = config_params.get("fallback", False)
|
|
|
|
# Determine service parameters
|
|
if service == "spotify":
|
|
if fallback_enabled:
|
|
main = config_params.get("spotify", "")
|
|
fallback = config_params.get("deezer", "")
|
|
quality = config_params.get("deezerQuality", "MP3_128")
|
|
fall_quality = config_params.get("spotifyQuality", "NORMAL")
|
|
else:
|
|
main = config_params.get("spotify", "")
|
|
fallback = None
|
|
quality = config_params.get("spotifyQuality", "NORMAL")
|
|
fall_quality = None
|
|
elif service == "deezer":
|
|
main = config_params.get("deezer", "")
|
|
fallback = None
|
|
quality = config_params.get("deezerQuality", "MP3_128")
|
|
fall_quality = None
|
|
else:
|
|
main = config_params.get("spotify", "")
|
|
fallback = None
|
|
quality = config_params.get("spotifyQuality", "NORMAL")
|
|
fall_quality = None
|
|
|
|
# Get remaining parameters
|
|
url = task_data.get("url", "")
|
|
real_time = task_data.get("real_time", config_params.get("realTime", False))
|
|
custom_dir_format = task_data.get(
|
|
"custom_dir_format",
|
|
config_params.get("customDirFormat", "%ar_album%/%album%"),
|
|
)
|
|
custom_track_format = task_data.get(
|
|
"custom_track_format",
|
|
config_params.get("customTrackFormat", "%tracknum%. %music%"),
|
|
)
|
|
pad_tracks = task_data.get(
|
|
"pad_tracks", config_params.get("tracknum_padding", True)
|
|
)
|
|
save_cover = task_data.get("save_cover", config_params.get("save_cover", True))
|
|
convert_to = task_data.get("convertTo", config_params.get("convertTo"))
|
|
bitrate = task_data.get("bitrate", config_params.get("bitrate"))
|
|
recursive_quality = task_data.get(
|
|
"recursive_quality", config_params.get("recursive_quality", False)
|
|
)
|
|
artist_separator = task_data.get(
|
|
"artist_separator", config_params.get("artist_separator", "; ")
|
|
)
|
|
|
|
# Execute the download - service is now determined from URL
|
|
download_track_func(
|
|
url=url,
|
|
main=main,
|
|
fallback=fallback if fallback_enabled else None,
|
|
quality=quality,
|
|
fall_quality=fall_quality,
|
|
real_time=real_time,
|
|
custom_dir_format=custom_dir_format,
|
|
custom_track_format=custom_track_format,
|
|
pad_tracks=pad_tracks,
|
|
save_cover=save_cover,
|
|
progress_callback=self.progress_callback,
|
|
convert_to=convert_to,
|
|
bitrate=bitrate,
|
|
recursive_quality=recursive_quality,
|
|
artist_separator=artist_separator,
|
|
_is_celery_task_execution=True, # Skip duplicate check inside Celery task (consistency)
|
|
)
|
|
|
|
return {"status": "success", "message": "Track download completed"}
|
|
except Exception as e:
|
|
logger.error(f"Error in download_track task: {e}")
|
|
traceback.print_exc()
|
|
raise
|
|
|
|
|
|
@celery_app.task(
|
|
bind=True, base=ProgressTrackingTask, name="download_album", queue="downloads"
|
|
)
|
|
def download_album(self, **task_data):
|
|
"""
|
|
Task to download an album
|
|
|
|
Args:
|
|
**task_data: Dictionary containing all task parameters
|
|
"""
|
|
try:
|
|
logger.info(
|
|
f"Processing album download task: {task_data.get('name', 'Unknown')}"
|
|
)
|
|
from routes.utils.album import download_album as download_album_func
|
|
|
|
# Get config parameters
|
|
config_params = get_config_params()
|
|
service = config_params.get("service")
|
|
fallback_enabled = config_params.get("fallback", False)
|
|
|
|
# Determine service parameters
|
|
if service == "spotify":
|
|
if fallback_enabled:
|
|
main = config_params.get("spotify", "")
|
|
fallback = config_params.get("deezer", "")
|
|
quality = config_params.get("deezerQuality", "MP3_128")
|
|
fall_quality = config_params.get("spotifyQuality", "NORMAL")
|
|
else:
|
|
main = config_params.get("spotify", "")
|
|
fallback = None
|
|
quality = config_params.get("spotifyQuality", "NORMAL")
|
|
fall_quality = None
|
|
elif service == "deezer":
|
|
main = config_params.get("deezer", "")
|
|
fallback = None
|
|
quality = config_params.get("deezerQuality", "MP3_128")
|
|
fall_quality = None
|
|
else:
|
|
main = config_params.get("spotify", "")
|
|
fallback = None
|
|
quality = config_params.get("spotifyQuality", "NORMAL")
|
|
fall_quality = None
|
|
|
|
# Get remaining parameters
|
|
url = task_data.get("url", "")
|
|
real_time = task_data.get("real_time", config_params.get("realTime", False))
|
|
custom_dir_format = task_data.get(
|
|
"custom_dir_format",
|
|
config_params.get("customDirFormat", "%ar_album%/%album%"),
|
|
)
|
|
custom_track_format = task_data.get(
|
|
"custom_track_format",
|
|
config_params.get("customTrackFormat", "%tracknum%. %music%"),
|
|
)
|
|
pad_tracks = task_data.get(
|
|
"pad_tracks", config_params.get("tracknum_padding", True)
|
|
)
|
|
save_cover = task_data.get("save_cover", config_params.get("save_cover", True))
|
|
convert_to = task_data.get("convertTo", config_params.get("convertTo"))
|
|
bitrate = task_data.get("bitrate", config_params.get("bitrate"))
|
|
recursive_quality = task_data.get(
|
|
"recursive_quality", config_params.get("recursive_quality", False)
|
|
)
|
|
artist_separator = task_data.get(
|
|
"artist_separator", config_params.get("artist_separator", "; ")
|
|
)
|
|
|
|
# Execute the download - service is now determined from URL
|
|
download_album_func(
|
|
url=url,
|
|
main=main,
|
|
fallback=fallback if fallback_enabled else None,
|
|
quality=quality,
|
|
fall_quality=fall_quality,
|
|
real_time=real_time,
|
|
custom_dir_format=custom_dir_format,
|
|
custom_track_format=custom_track_format,
|
|
pad_tracks=pad_tracks,
|
|
save_cover=save_cover,
|
|
progress_callback=self.progress_callback,
|
|
convert_to=convert_to,
|
|
bitrate=bitrate,
|
|
recursive_quality=recursive_quality,
|
|
artist_separator=artist_separator,
|
|
_is_celery_task_execution=True, # Skip duplicate check inside Celery task
|
|
)
|
|
|
|
return {"status": "success", "message": "Album download completed"}
|
|
except Exception as e:
|
|
logger.error(f"Error in download_album task: {e}")
|
|
traceback.print_exc()
|
|
raise
|
|
|
|
|
|
@celery_app.task(
|
|
bind=True, base=ProgressTrackingTask, name="download_playlist", queue="downloads"
|
|
)
|
|
def download_playlist(self, **task_data):
|
|
"""
|
|
Task to download a playlist
|
|
|
|
Args:
|
|
**task_data: Dictionary containing all task parameters
|
|
"""
|
|
try:
|
|
logger.info(
|
|
f"Processing playlist download task: {task_data.get('name', 'Unknown')}"
|
|
)
|
|
from routes.utils.playlist import download_playlist as download_playlist_func
|
|
|
|
# Get config parameters
|
|
config_params = get_config_params()
|
|
service = config_params.get("service")
|
|
fallback_enabled = config_params.get("fallback", False)
|
|
|
|
# Determine service parameters
|
|
if service == "spotify":
|
|
if fallback_enabled:
|
|
main = config_params.get("spotify", "")
|
|
fallback = config_params.get("deezer", "")
|
|
quality = config_params.get("deezerQuality", "MP3_128")
|
|
fall_quality = config_params.get("spotifyQuality", "NORMAL")
|
|
else:
|
|
main = config_params.get("spotify", "")
|
|
fallback = None
|
|
quality = config_params.get("spotifyQuality", "NORMAL")
|
|
fall_quality = None
|
|
elif service == "deezer":
|
|
main = config_params.get("deezer", "")
|
|
fallback = None
|
|
quality = config_params.get("deezerQuality", "MP3_128")
|
|
fall_quality = None
|
|
else:
|
|
main = config_params.get("spotify", "")
|
|
fallback = None
|
|
quality = config_params.get("spotifyQuality", "NORMAL")
|
|
fall_quality = None
|
|
|
|
# Get remaining parameters
|
|
url = task_data.get("url", "")
|
|
real_time = task_data.get("real_time", config_params.get("realTime", False))
|
|
custom_dir_format = task_data.get(
|
|
"custom_dir_format",
|
|
config_params.get("customDirFormat", "%ar_album%/%album%"),
|
|
)
|
|
custom_track_format = task_data.get(
|
|
"custom_track_format",
|
|
config_params.get("customTrackFormat", "%tracknum%. %music%"),
|
|
)
|
|
pad_tracks = task_data.get(
|
|
"pad_tracks", config_params.get("tracknum_padding", True)
|
|
)
|
|
save_cover = task_data.get("save_cover", config_params.get("save_cover", True))
|
|
convert_to = task_data.get("convertTo", config_params.get("convertTo"))
|
|
bitrate = task_data.get("bitrate", config_params.get("bitrate"))
|
|
recursive_quality = task_data.get(
|
|
"recursive_quality", config_params.get("recursive_quality", False)
|
|
)
|
|
artist_separator = task_data.get(
|
|
"artist_separator", config_params.get("artist_separator", "; ")
|
|
)
|
|
|
|
# Get retry parameters
|
|
initial_retry_delay = task_data.get(
|
|
"initial_retry_delay", config_params.get("retryDelaySeconds", 5)
|
|
)
|
|
retry_delay_increase = task_data.get(
|
|
"retry_delay_increase", config_params.get("retry_delay_increase", 5)
|
|
)
|
|
max_retries = task_data.get("max_retries", config_params.get("maxRetries", 3))
|
|
|
|
# Execute the download - service is now determined from URL
|
|
download_playlist_func(
|
|
url=url,
|
|
main=main,
|
|
fallback=fallback if fallback_enabled else None,
|
|
quality=quality,
|
|
fall_quality=fall_quality,
|
|
real_time=real_time,
|
|
custom_dir_format=custom_dir_format,
|
|
custom_track_format=custom_track_format,
|
|
pad_tracks=pad_tracks,
|
|
save_cover=save_cover,
|
|
initial_retry_delay=initial_retry_delay,
|
|
retry_delay_increase=retry_delay_increase,
|
|
max_retries=max_retries,
|
|
progress_callback=self.progress_callback,
|
|
convert_to=convert_to,
|
|
bitrate=bitrate,
|
|
recursive_quality=recursive_quality,
|
|
artist_separator=artist_separator,
|
|
_is_celery_task_execution=True, # Skip duplicate check inside Celery task
|
|
)
|
|
|
|
return {"status": "success", "message": "Playlist download completed"}
|
|
except Exception as e:
|
|
logger.error(f"Error in download_playlist task: {e}")
|
|
traceback.print_exc()
|
|
raise
|
|
|
|
|
|
# Helper function to fully delete task data from Redis
|
|
def delete_task_data_and_log(task_id, reason="Task data deleted"):
|
|
"""
|
|
Marks a task as cancelled (if not already) and deletes all its data from Redis.
|
|
"""
|
|
try:
|
|
task_info = get_task_info(task_id) # Get info before deleting
|
|
last_status = get_last_task_status(task_id)
|
|
current_status_val = last_status.get("status") if last_status else None
|
|
|
|
# Determine the final status for Redis before deletion
|
|
# The reason passed to this function indicates why it's being deleted.
|
|
final_redis_status = (
|
|
ProgressState.ERROR_AUTO_CLEANED
|
|
) # Default for most cleanup scenarios
|
|
error_message_for_status = reason
|
|
|
|
if reason == "Task completed successfully and auto-cleaned.":
|
|
final_redis_status = ProgressState.COMPLETE # It was already complete
|
|
error_message_for_status = "Task completed and auto-cleaned."
|
|
elif reason == "Task cancelled by user and auto-cleaned.":
|
|
final_redis_status = ProgressState.CANCELLED # It was already cancelled
|
|
error_message_for_status = "Task cancelled and auto-cleaned."
|
|
elif "Task failed" in reason and "max retries reached" in reason:
|
|
final_redis_status = (
|
|
ProgressState.ERROR
|
|
) # It was already an error (non-retryable)
|
|
error_message_for_status = reason
|
|
elif reason == "Task interrupted by application restart and auto-cleaned.":
|
|
final_redis_status = (
|
|
ProgressState.ERROR
|
|
) # It was marked as ERROR (interrupted)
|
|
error_message_for_status = reason
|
|
# Add more specific conditions if needed based on other reasons `delayed_delete_task_data` might be called with.
|
|
|
|
# Update Redis status one last time if it's not already reflecting the final intended state for this cleanup.
|
|
# This is mainly for cases where cleanup is initiated for tasks not yet in a fully terminal state by other handlers.
|
|
if current_status_val not in [
|
|
ProgressState.COMPLETE,
|
|
ProgressState.CANCELLED,
|
|
ProgressState.ERROR_RETRIED,
|
|
ProgressState.ERROR_AUTO_CLEANED,
|
|
final_redis_status,
|
|
]:
|
|
store_task_status(
|
|
task_id,
|
|
{
|
|
"status": final_redis_status,
|
|
"error": error_message_for_status, # Use the reason as the error/message for this status
|
|
"timestamp": time.time(),
|
|
},
|
|
)
|
|
|
|
# Delete Redis keys associated with the task
|
|
redis_client.delete(f"task:{task_id}:info")
|
|
redis_client.delete(f"task:{task_id}:status")
|
|
redis_client.delete(f"task:{task_id}:status:next_id")
|
|
|
|
logger.info(
|
|
f"Data for task {task_id} ('{task_info.get('name', 'Unknown')}') deleted from Redis. Reason: {reason}"
|
|
)
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error deleting task data for {task_id}: {e}", exc_info=True)
|
|
return False
|
|
|
|
|
|
@celery_app.task(
|
|
name="cleanup_stale_errors", queue="utility_tasks"
|
|
) # Put on utility_tasks queue
|
|
def cleanup_stale_errors():
|
|
"""
|
|
Periodically checks for tasks in ERROR state for more than 1 minute and cleans them up.
|
|
"""
|
|
logger.info("Running cleanup_stale_errors task...")
|
|
cleaned_count = 0
|
|
try:
|
|
task_keys = redis_client.keys("task:*:info")
|
|
if not task_keys:
|
|
logger.info("No task keys found for cleanup.")
|
|
return {"status": "complete", "message": "No tasks to check."}
|
|
|
|
current_time = time.time()
|
|
stale_threshold = 60 # 1 minute
|
|
|
|
for key_bytes in task_keys:
|
|
task_id = key_bytes.decode("utf-8").split(":")[1]
|
|
last_status = get_last_task_status(task_id)
|
|
|
|
if last_status and last_status.get("status") == ProgressState.ERROR:
|
|
error_timestamp = last_status.get("timestamp", 0)
|
|
if (current_time - error_timestamp) > stale_threshold:
|
|
# Check again to ensure it wasn't retried just before cleanup
|
|
current_last_status_before_delete = get_last_task_status(task_id)
|
|
if (
|
|
current_last_status_before_delete
|
|
and current_last_status_before_delete.get("status")
|
|
== ProgressState.ERROR_RETRIED
|
|
):
|
|
logger.info(
|
|
f"Task {task_id} was retried just before cleanup. Skipping delete."
|
|
)
|
|
continue
|
|
|
|
logger.info(
|
|
f"Task {task_id} is in ERROR state for more than {stale_threshold}s. Cleaning up."
|
|
)
|
|
if delete_task_data_and_log(
|
|
task_id,
|
|
reason=f"Auto-cleaned: Task was in ERROR state for over {stale_threshold} seconds without manual retry.",
|
|
):
|
|
cleaned_count += 1
|
|
|
|
logger.info(
|
|
f"cleanup_stale_errors task finished. Cleaned up {cleaned_count} stale errored tasks."
|
|
)
|
|
return {"status": "complete", "cleaned_count": cleaned_count}
|
|
except Exception as e:
|
|
logger.error(f"Error during cleanup_stale_errors: {e}", exc_info=True)
|
|
return {"status": "error", "error": str(e)}
|
|
|
|
|
|
@celery_app.task(
|
|
name="delayed_delete_task_data", queue="utility_tasks"
|
|
) # Use utility_tasks queue
|
|
def delayed_delete_task_data(task_id, reason):
|
|
"""
|
|
Celery task to delete task data after a delay.
|
|
"""
|
|
logger.info(f"Executing delayed deletion for task {task_id}. Reason: {reason}")
|
|
delete_task_data_and_log(task_id, reason)
|
|
|
|
|
|
@celery_app.task(name="trigger_sse_update_task", queue="utility_tasks", bind=True)
|
|
def trigger_sse_update_task(self, task_id: str, reason: str = "status_update"):
|
|
"""
|
|
Dedicated Celery task for triggering SSE task summary updates.
|
|
Uses Redis pub/sub to communicate with the main FastAPI process.
|
|
"""
|
|
try:
|
|
# Send task summary update via Redis pub/sub
|
|
logger.debug(
|
|
f"SSE Task: Processing summary update for task {task_id} (reason: {reason})"
|
|
)
|
|
|
|
event_data = {
|
|
"task_id": task_id,
|
|
"reason": reason,
|
|
"timestamp": time.time(),
|
|
"change_type": "task_summary",
|
|
"event_type": "summary_update",
|
|
}
|
|
|
|
# Use Redis pub/sub for cross-process communication
|
|
redis_client.publish("sse_events", json.dumps(event_data))
|
|
logger.debug(f"SSE Task: Published summary update for task {task_id}")
|
|
|
|
except Exception as e:
|
|
# Only log errors, not success cases
|
|
logger.error(
|
|
f"SSE Task: Failed to publish summary update for task {task_id}: {e}",
|
|
exc_info=True,
|
|
)
|
|
# Don't raise exception to avoid task retry - SSE updates are best-effort
|
|
|
|
|
|
def _extract_initial_parent_object(log_lines: list, parent_type: str) -> dict | None:
|
|
"""Return the first album/playlist object from the log's initializing callback, if present."""
|
|
key = (
|
|
"album"
|
|
if parent_type == "album"
|
|
else ("playlist" if parent_type == "playlist" else None)
|
|
)
|
|
if not key:
|
|
return None
|
|
for obj in log_lines:
|
|
if key in obj and isinstance(obj[key], dict):
|
|
return obj[key]
|
|
return None
|