Implemented Deezspot-compliant error reporting on the backend

Xoconoch
2025-06-14 19:45:10 -07:00
parent 2e966b4245
commit 6bd4cadd37
15 changed files with 606 additions and 377 deletions

View File

@@ -6,6 +6,7 @@ import time
from routes.utils.celery_queue_manager import download_queue_manager
from routes.utils.celery_tasks import store_task_info, store_task_status, ProgressState
from routes.utils.get_info import get_spotify_info
from routes.utils.errors import DuplicateDownloadError
album_bp = Blueprint("album", __name__)
@@ -74,6 +75,17 @@ def handle_download(album_id):
"orig_request": orig_params,
}
)
except DuplicateDownloadError as e:
return Response(
json.dumps(
{
"error": "Duplicate download detected.",
"existing_task": e.existing_task,
}
),
status=409,
mimetype="application/json",
)
except Exception as e:
# Generic error handling for other issues during task submission
# Create an error task ID if add_task itself fails before returning an ID
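For reference, a client that submits the same album twice can treat the 409 as a pointer to the already-running task rather than a hard failure. A minimal sketch with the requests library; the host, port, album ID, and exact route prefix are placeholders/assumptions, not part of this commit:

    import requests

    BASE = "http://localhost:8000"  # placeholder host/port

    resp = requests.get(f"{BASE}/api/album/download/<albumId>")  # hypothetical album ID
    if resp.status_code == 409:
        existing = resp.json()["existing_task"]
        # Re-use the existing task instead of queueing a duplicate.
        progress = requests.get(f"{BASE}/api/prgs/{existing}").json()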

View File

@@ -27,6 +27,7 @@ from routes.utils.watch.manager import (
check_watched_playlists,
get_watch_config,
) # For manual trigger & config
from routes.utils.errors import DuplicateDownloadError
logger = logging.getLogger(__name__) # Added logger initialization
playlist_bp = Blueprint("playlist", __name__, url_prefix="/api/playlist")
@@ -97,7 +98,17 @@ def handle_download(playlist_id):
"orig_request": orig_params,
}
)
# Removed DuplicateDownloadError handling, add_task now manages this by creating an error task.
except DuplicateDownloadError as e:
return Response(
json.dumps(
{
"error": "Duplicate download detected.",
"existing_task": e.existing_task,
}
),
status=409,
mimetype="application/json",
)
except Exception as e:
# Generic error handling for other issues during task submission
error_task_id = str(uuid.uuid4())

View File

@@ -10,6 +10,7 @@ from routes.utils.celery_tasks import (
cancel_task,
retry_task,
redis_client,
delete_task_data,
)
# Configure logging
@@ -20,6 +21,60 @@ prgs_bp = Blueprint("prgs", __name__, url_prefix="/api/prgs")
# (Old .prg file system removed. Using new task system only.)
def _build_error_callback_object(last_status):
"""
Constructs a structured error callback object based on the last status of a task.
This conforms to the CallbackObject types in the frontend.
"""
# The 'type' from the status update corresponds to the download_type (album, playlist, track)
download_type = last_status.get("type")
name = last_status.get("name")
# The 'artist' field from the status may contain artist names or a playlist owner's name
artist_or_owner = last_status.get("artist")
error_message = last_status.get("error", "An unknown error occurred.")
status_info = {"status": "error", "error": error_message}
callback_object = {"status_info": status_info}
if download_type == "album":
callback_object["album"] = {
"type": "album",
"title": name,
"artists": [{
"type": "artistAlbum",
"name": artist_or_owner
}] if artist_or_owner else [],
}
elif download_type == "playlist":
playlist_payload = {"type": "playlist", "title": name}
if artist_or_owner:
playlist_payload["owner"] = {"type": "user", "name": artist_or_owner}
callback_object["playlist"] = playlist_payload
elif download_type == "track":
callback_object["track"] = {
"type": "track",
"title": name,
"artists": [{
"type": "artistTrack",
"name": artist_or_owner
}] if artist_or_owner else [],
}
else:
# Fallback for unknown types to avoid breaking the client, returning a basic error structure.
return {
"status_info": status_info,
"unstructured_error": True,
"details": {
"type": download_type,
"name": name,
"artist_or_owner": artist_or_owner,
},
}
return callback_object
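As an illustration (values invented), a failed track download run through the helper above yields a callback-shaped object like this:

    last_status = {
        "type": "track",
        "name": "Some Track",
        "artist": "Some Artist",
        "status": "error",
        "error": "Track not available in the selected market",
    }
    _build_error_callback_object(last_status)
    # => {
    #      "status_info": {"status": "error", "error": "Track not available in the selected market"},
    #      "track": {
    #          "type": "track",
    #          "title": "Some Track",
    #          "artists": [{"type": "artistTrack", "name": "Some Artist"}],
    #      },
    #    }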
@prgs_bp.route("/<task_id>", methods=["GET"])
def get_task_details(task_id):
"""
@@ -77,17 +132,26 @@ def get_task_details(task_id):
last_status = get_last_task_status(task_id)
status_count = len(get_task_status(task_id))
# Default to the full last_status object, then check for the raw callback
last_line_content = last_status
# Determine last_line content
if last_status and "raw_callback" in last_status:
last_line_content = last_status["raw_callback"]
elif last_status and last_status.get("status") == "error":
last_line_content = _build_error_callback_object(last_status)
else:
# Fallback for non-error, no raw_callback, or if last_status is None
last_line_content = last_status
response = {
"original_url": dynamic_original_url,
"last_line": last_line_content,
"timestamp": time.time(),
"timestamp": last_status.get("timestamp") if last_status else time.time(),
"task_id": task_id,
"status_count": status_count,
"created_at": task_info.get("created_at"),
"name": task_info.get("name"),
"artist": task_info.get("artist"),
"type": task_info.get("type"),
"download_type": task_info.get("download_type"),
}
if last_status and last_status.get("summary"):
response["summary"] = last_status["summary"]
@@ -106,9 +170,13 @@ def delete_task(task_id):
task_info = get_task_info(task_id)
if not task_info:
abort(404, "Task not found")
# First, cancel the task if it's running
cancel_task(task_id)
redis_client.delete(f"task:{task_id}:info")
redis_client.delete(f"task:{task_id}:status")
# Then, delete all associated data from Redis
delete_task_data(task_id)
return {"message": f"Task {task_id} deleted successfully"}, 200
@@ -116,8 +184,7 @@ def delete_task(task_id):
def list_tasks():
"""
Retrieve a list of all tasks in the system.
Returns a detailed list of task objects including status and metadata,
formatted according to the callback documentation.
Returns a detailed list of task objects including status and metadata.
By default, it returns active tasks. Use ?include_finished=true to include completed tasks.
"""
try:
@@ -133,90 +200,86 @@ def list_tasks():
continue
task_info = get_task_info(task_id)
last_status = get_last_task_status(task_id)
if not task_info:
continue
if task_info and last_status:
# Start with the last status object as the base.
# This object should conform to one of the callback types.
task_details = last_status.copy()
# Dynamically construct original_url
dynamic_original_url = ""
download_type = task_info.get("download_type")
# The 'url' field in task_info stores the Spotify/Deezer URL of the item
# e.g., https://open.spotify.com/album/albumId or https://www.deezer.com/track/trackId
item_url = task_info.get("url")
# Add essential metadata to the task details
task_details["task_id"] = task_id
task_details["original_request"] = task_info.get(
"original_request", {}
)
task_details["created_at"] = task_info.get("created_at", 0)
# Ensure core properties from task_info are present if not in status
if "type" not in task_details:
task_details["type"] = task_info.get("type", "unknown")
if "name" not in task_details:
task_details["name"] = task_info.get("name", "Unknown")
if "artist" not in task_details:
task_details["artist"] = task_info.get("artist", "")
if "download_type" not in task_details:
task_details["download_type"] = task_info.get(
"download_type", "unknown"
if download_type and item_url:
try:
# Extract the ID from the item_url (last part of the path)
item_id = item_url.split("/")[-1]
if item_id: # Ensure item_id is not empty
base_url = request.host_url.rstrip("/")
dynamic_original_url = (
f"{base_url}/api/{download_type}/download/{item_id}"
)
else:
logger.warning(
f"Could not extract item ID from URL: {item_url} for task {task_id}. Falling back for original_url."
)
original_request_obj = task_info.get("original_request", {})
dynamic_original_url = original_request_obj.get(
"original_url", ""
)
except Exception as e:
logger.error(
f"Error constructing dynamic original_url for task {task_id}: {e}",
exc_info=True,
)
detailed_tasks.append(task_details)
elif (
task_info
): # If last_status is somehow missing, still provide some info
detailed_tasks.append(
{
"task_id": task_id,
"type": task_info.get("type", "unknown"),
"name": task_info.get("name", "Unknown"),
"artist": task_info.get("artist", ""),
"download_type": task_info.get("download_type", "unknown"),
"status": "unknown",
"original_request": task_info.get("original_request", {}),
"created_at": task_info.get("created_at", 0),
"timestamp": task_info.get("created_at", 0),
}
original_request_obj = task_info.get("original_request", {})
dynamic_original_url = original_request_obj.get(
"original_url", ""
) # Fallback on any error
else:
logger.warning(
f"Missing download_type ('{download_type}') or item_url ('{item_url}') in task_info for task {task_id}. Falling back for original_url."
)
original_request_obj = task_info.get("original_request", {})
dynamic_original_url = original_request_obj.get("original_url", "")
# Sort tasks by creation time (newest first, or by timestamp if creation time is missing)
detailed_tasks.sort(
key=lambda x: x.get("timestamp", x.get("created_at", 0)), reverse=True
)
last_status = get_last_task_status(task_id)
status_count = len(get_task_status(task_id))
# Determine last_line content
if last_status and "raw_callback" in last_status:
last_line_content = last_status["raw_callback"]
elif last_status and last_status.get("status") == "error":
last_line_content = _build_error_callback_object(last_status)
else:
# Fallback for non-error, no raw_callback, or if last_status is None
last_line_content = last_status
response = {
"original_url": dynamic_original_url,
"last_line": last_line_content,
"timestamp": last_status.get("timestamp") if last_status else time.time(),
"task_id": task_id,
"status_count": status_count,
"created_at": task_info.get("created_at"),
"name": task_info.get("name"),
"artist": task_info.get("artist"),
"type": task_info.get("type"),
"download_type": task_info.get("download_type"),
}
if last_status and last_status.get("summary"):
response["summary"] = last_status["summary"]
detailed_tasks.append(response)
# Sort tasks by creation time (newest first)
detailed_tasks.sort(key=lambda x: x.get("created_at", 0), reverse=True)
return jsonify(detailed_tasks)
except Exception as e:
logger.error(f"Error in /api/prgs/list: {e}", exc_info=True)
return jsonify({"error": "Failed to retrieve task list"}), 500
@prgs_bp.route("/retry/<task_id>", methods=["POST"])
def retry_task_endpoint(task_id):
"""
Retry a failed task.
Args:
task_id: The ID of the task to retry
"""
try:
# First check if this is a task ID in the new system
task_info = get_task_info(task_id)
if task_info:
# This is a task ID in the new system
result = retry_task(task_id)
return jsonify(result)
# If not found in new system, we need to handle the old system retry
# For now, return an error as we're transitioning to the new system
return jsonify(
{
"status": "error",
"message": "Retry for old system is not supported in the new API. Please use the new task ID format.",
}
), 400
except Exception as e:
abort(500, f"An error occurred: {e}")
@prgs_bp.route("/cancel/<task_id>", methods=["POST"])
def cancel_task_endpoint(task_id):
"""
@@ -244,3 +307,36 @@ def cancel_task_endpoint(task_id):
), 400
except Exception as e:
abort(500, f"An error occurred: {e}")
@prgs_bp.route("/cancel/all", methods=["POST"])
def cancel_all_tasks():
"""
Cancel all active (running or queued) tasks.
"""
try:
tasks_to_cancel = get_all_tasks(include_finished=False)
cancelled_count = 0
errors = []
for task_summary in tasks_to_cancel:
task_id = task_summary.get("task_id")
if not task_id:
continue
try:
cancel_task(task_id)
cancelled_count += 1
except Exception as e:
error_message = f"Failed to cancel task {task_id}: {e}"
logger.error(error_message)
errors.append(error_message)
response = {
"message": f"Attempted to cancel all active tasks. {cancelled_count} tasks cancelled.",
"cancelled_count": cancelled_count,
"errors": errors,
}
return jsonify(response), 200
except Exception as e:
logger.error(f"Error in /api/prgs/cancel/all: {e}", exc_info=True)
return jsonify({"error": "Failed to cancel all tasks"}), 500

View File

@@ -3,7 +3,10 @@ import json
import traceback
import uuid # For generating error task IDs
import time # For timestamps
from routes.utils.celery_queue_manager import download_queue_manager
from routes.utils.celery_queue_manager import (
download_queue_manager,
get_existing_task_id,
)
from routes.utils.celery_tasks import (
store_task_info,
store_task_status,
@@ -81,6 +84,20 @@ def handle_download(track_id):
mimetype="application/json",
)
# Check for existing task before adding to the queue
existing_task = get_existing_task_id(url)
if existing_task:
return Response(
json.dumps(
{
"error": "Duplicate download detected.",
"existing_task": existing_task,
}
),
status=409,
mimetype="application/json",
)
try:
task_id = download_queue_manager.add_task(
{

View File

@@ -6,6 +6,8 @@ from routes.utils.credentials import (
_get_global_spotify_api_creds,
get_spotify_blob_path,
)
from routes.utils.celery_queue_manager import get_existing_task_id
from routes.utils.errors import DuplicateDownloadError
def download_album(
@@ -25,7 +27,15 @@ def download_album(
progress_callback=None,
convert_to=None,
bitrate=None,
_is_celery_task_execution=False, # Added to skip duplicate check from Celery task
):
if not _is_celery_task_execution:
existing_task = get_existing_task_id(url) # Check for duplicates only if not called by Celery task
if existing_task:
raise DuplicateDownloadError(
f"Download for this URL is already in progress.",
existing_task=existing_task,
)
try:
# Detect URL source (Spotify or Deezer) from URL
is_spotify_url = "open.spotify.com" in url.lower()

View File

@@ -4,7 +4,7 @@ from flask import url_for
from routes.utils.celery_queue_manager import download_queue_manager
from routes.utils.get_info import get_spotify_info
from routes.utils.credentials import get_credential, _get_global_spotify_api_creds
from routes.utils.celery_tasks import get_last_task_status, ProgressState
from routes.utils.errors import DuplicateDownloadError
from deezspot.easy_spoty import Spo
from deezspot.libutils.utils import get_ids, link_is_valid
@@ -112,48 +112,34 @@ def download_artist_albums(
if not url:
raise ValueError("Missing required parameter: url")
# Extract artist ID from URL
artist_id = url.split("/")[-1]
if "?" in artist_id:
artist_id = artist_id.split("?")[0]
logger.info(f"Fetching artist info for ID: {artist_id}")
# Detect URL source (only Spotify is supported for artists)
is_spotify_url = "open.spotify.com" in url.lower()
# Artist functionality only works with Spotify URLs currently
if not is_spotify_url:
if "open.spotify.com" not in url.lower():
error_msg = (
"Invalid URL: Artist functionality only supports open.spotify.com URLs"
)
logger.error(error_msg)
raise ValueError(error_msg)
# Get artist info with albums
artist_data = get_spotify_info(artist_id, "artist_discography")
# Debug logging to inspect the structure of artist_data
logger.debug(
f"Artist data structure has keys: {list(artist_data.keys() if isinstance(artist_data, dict) else [])}"
)
if not artist_data or "items" not in artist_data:
raise ValueError(
f"Failed to retrieve artist data or no albums found for artist ID {artist_id}"
)
# Parse the album types to filter by
allowed_types = [t.strip().lower() for t in album_type.split(",")]
logger.info(f"Filtering albums by types: {allowed_types}")
# Filter albums by the specified types
filtered_albums = []
for album in artist_data.get("items", []):
album_type_value = album.get("album_type", "").lower()
album_group_value = album.get("album_group", "").lower()
# Apply filtering logic based on album_type and album_group
if (
(
"album" in allowed_types
@@ -174,116 +160,54 @@ def download_artist_albums(
logger.warning(f"No albums match the specified types: {album_type}")
return [], []
# Queue each album as a separate download task
album_task_ids = []
successfully_queued_albums = []
duplicate_albums = [] # To store info about albums that were duplicates
duplicate_albums = []
for album in filtered_albums:
# Add detailed logging to inspect each album's structure and URLs
logger.debug(f"Processing album: {album.get('name', 'Unknown')}")
logger.debug(f"Album structure has keys: {list(album.keys())}")
external_urls = album.get("external_urls", {})
logger.debug(f"Album external_urls: {external_urls}")
album_url = external_urls.get("spotify", "")
album_url = album.get("external_urls", {}).get("spotify", "")
album_name = album.get("name", "Unknown Album")
album_artists = album.get("artists", [])
album_artist = (
album_artists[0].get("name", "Unknown Artist")
if album_artists
else "Unknown Artist"
album_artists[0].get("name", "Unknown Artist") if album_artists else "Unknown Artist"
)
album_id = album.get("id")
logger.debug(f"Extracted album URL: {album_url}")
logger.debug(f"Extracted album ID: {album_id}")
if not album_url or not album_id:
logger.warning(f"Skipping album without URL or ID: {album_name}")
if not album_url:
logger.warning(f"Skipping album '{album_name}' because it has no Spotify URL.")
continue
# Create album-specific request args instead of using original artist request
album_request_args = {
task_data = {
"download_type": "album",
"url": album_url,
"name": album_name,
"artist": album_artist,
"type": "album",
# URL source will be automatically detected in the download functions
"parent_artist_url": url,
"parent_request_type": "artist",
"orig_request": request_args,
}
# Include original download URL for this album task
album_request_args["original_url"] = url_for(
"album.handle_download", album_id=album_id, _external=True
)
# Create task for this album
task_data = {
"download_type": "album",
"type": "album", # Type for the download task
"url": album_url, # Important: use the album URL, not artist URL
"retry_url": album_url, # Use album URL for retry logic, not artist URL
"name": album_name,
"artist": album_artist,
"orig_request": album_request_args, # Store album-specific request params
}
# Debug log the task data being sent to the queue
logger.debug(
f"Album task data: url={task_data['url']}, retry_url={task_data['retry_url']}"
)
try:
task_id = download_queue_manager.add_task(task_data)
# Check the status of the newly added task to see if it was marked as a duplicate error
last_status = get_last_task_status(task_id)
if (
last_status
and last_status.get("status") == ProgressState.ERROR
and last_status.get("existing_task_id")
):
logger.warning(
f"Album {album_name} (URL: {album_url}) is a duplicate. Error task ID: {task_id}. Existing task ID: {last_status.get('existing_task_id')}"
)
duplicate_albums.append(
{
"name": album_name,
"artist": album_artist,
"url": album_url,
"error_task_id": task_id, # This is the ID of the task marked as a duplicate error
"existing_task_id": last_status.get("existing_task_id"),
"message": last_status.get(
"message", "Duplicate download attempt."
),
}
)
else:
# If not a duplicate error, it was successfully queued (or failed for other reasons handled by add_task)
# We only add to successfully_queued_albums if it wasn't a duplicate error from add_task
# Other errors from add_task (like submission failure) would also result in an error status for task_id
# but won't have 'existing_task_id'. The client can check the status of this task_id.
album_task_ids.append(
task_id
) # Keep track of all task_ids returned by add_task
successfully_queued_albums.append(
{
"name": album_name,
"artist": album_artist,
"url": album_url,
"task_id": task_id,
}
)
logger.info(f"Queued album download: {album_name} ({task_id})")
except Exception as e: # Catch any other unexpected error from add_task itself (though it should be rare now)
logger.error(
f"Failed to queue album {album_name} due to an unexpected error in add_task: {str(e)}"
successfully_queued_albums.append(
{
"name": album_name,
"artist": album_artist,
"url": album_url,
"task_id": task_id,
}
)
# Optionally, collect these errors. For now, just logging and continuing.
except DuplicateDownloadError as e:
logger.warning(
f"Skipping duplicate album {album_name} (URL: {album_url}). Existing task: {e.existing_task}"
)
duplicate_albums.append(
{
"name": album_name,
"artist": album_artist,
"url": album_url,
"existing_task": e.existing_task,
"message": str(e),
}
)
except Exception as e:
logger.error(f"Failed to queue album {album_name} for an unknown reason: {e}")
logger.info(
f"Artist album processing: {len(successfully_queued_albums)} queued, {len(duplicate_albums)} duplicates found."

View File

@@ -149,7 +149,7 @@ task_max_retries = MAX_RETRIES
# Task result settings
task_track_started = True
result_expires = 60 * 60 * 24 * 7 # 7 days
result_expires = 3600 # 1 hour
# Configure visibility timeout for task messages
broker_transport_options = {
@@ -167,3 +167,11 @@ broker_pool_limit = 10
worker_prefetch_multiplier = 1 # Process one task at a time per worker
worker_max_tasks_per_child = 100 # Restart worker after 100 tasks
worker_disable_rate_limits = False
# Celery Beat schedule
beat_schedule = {
'cleanup-old-tasks': {
'task': 'routes.utils.celery_tasks.cleanup_old_tasks',
'schedule': 3600.0, # Run every hour
},
}
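The plain 3600-second interval works as-is; if the cleanup should instead fire at a fixed point in the hour, Celery's crontab schedule is a drop-in alternative (a sketch, not part of this commit):

    from celery.schedules import crontab

    beat_schedule = {
        "cleanup-old-tasks": {
            "task": "routes.utils.celery_tasks.cleanup_old_tasks",
            "schedule": crontab(minute=0),  # top of every hour instead of every 3600s
        },
    }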

View File

@@ -83,6 +83,89 @@ def get_config_params():
}
def get_existing_task_id(url, download_type=None):
"""
Check if an active task with the same URL (and optionally, type) already exists.
This function ignores tasks that are in a terminal state (e.g., completed, cancelled, or failed).
Args:
url (str): The URL to check for duplicates.
download_type (str, optional): The type of download to check. Defaults to None.
Returns:
str | None: The task ID of the existing active task, or None if no active duplicate is found.
"""
logger.debug(f"GET_EXISTING_TASK_ID: Checking for URL='{url}', type='{download_type}'")
if not url:
logger.debug("GET_EXISTING_TASK_ID: No URL provided, returning None.")
return None
# Define terminal states. Tasks in these states are considered inactive and will be ignored.
TERMINAL_STATES = {
ProgressState.COMPLETE,
ProgressState.DONE,
ProgressState.CANCELLED,
ProgressState.ERROR,
ProgressState.ERROR_RETRIED,
ProgressState.ERROR_AUTO_CLEANED,
}
logger.debug(f"GET_EXISTING_TASK_ID: Terminal states defined as: {TERMINAL_STATES}")
all_existing_tasks_summary = get_all_tasks() # This function already filters by default based on its own TERMINAL_STATES
logger.debug(f"GET_EXISTING_TASK_ID: Found {len(all_existing_tasks_summary)} tasks from get_all_tasks(). Iterating...")
for task_summary in all_existing_tasks_summary:
existing_task_id = task_summary.get("task_id")
if not existing_task_id:
logger.debug("GET_EXISTING_TASK_ID: Skipping summary with no task_id.")
continue
logger.debug(f"GET_EXISTING_TASK_ID: Processing existing task_id='{existing_task_id}' from summary.")
# First, check the status of the task directly from its latest status record.
# get_all_tasks() might have its own view of terminal, but we re-check here for absolute certainty.
existing_last_status_obj = get_last_task_status(existing_task_id)
if not existing_last_status_obj:
logger.debug(f"GET_EXISTING_TASK_ID: No last status object for task_id='{existing_task_id}'. Skipping.")
continue
existing_status = existing_last_status_obj.get("status")
logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}', last_status_obj='{existing_last_status_obj}', extracted status='{existing_status}'.")
# If the task is in a terminal state, ignore it and move to the next one.
if existing_status in TERMINAL_STATES:
logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}' has terminal status='{existing_status}'. Skipping.")
continue
logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}' has ACTIVE status='{existing_status}'. Proceeding to check URL/type.")
# If the task is active, then check if its URL and type match.
existing_task_info = get_task_info(existing_task_id)
if not existing_task_info:
logger.debug(f"GET_EXISTING_TASK_ID: No task info for active task_id='{existing_task_id}'. Skipping.")
continue
existing_url = existing_task_info.get("url")
logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}', info_url='{existing_url}'. Comparing with target_url='{url}'.")
if existing_url != url:
logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}' URL mismatch. Skipping.")
continue
if download_type:
existing_type = existing_task_info.get("download_type")
logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}', info_type='{existing_type}'. Comparing with target_type='{download_type}'.")
if existing_type != download_type:
logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}' type mismatch. Skipping.")
continue
# Found an active task that matches the criteria.
logger.info(f"GET_EXISTING_TASK_ID: Found ACTIVE duplicate: task_id='{existing_task_id}' for URL='{url}', type='{download_type}'. Returning this ID.")
return existing_task_id
logger.debug(f"GET_EXISTING_TASK_ID: No active duplicate found for URL='{url}', type='{download_type}'. Returning None.")
return None
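The pre-flight pattern the routes and download helpers build on boils down to a few lines; a sketch assuming a Spotify track URL, mirroring what the album and playlist download helpers do:

    from routes.utils.errors import DuplicateDownloadError

    url = "https://open.spotify.com/track/<trackId>"  # placeholder URL
    existing = get_existing_task_id(url, download_type="track")
    if existing:
        # Surface the conflict with the ID of the task already handling this URL.
        raise DuplicateDownloadError(
            "Download for this URL is already in progress.",
            existing_task=existing,
        )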
class CeleryDownloadQueueManager:
"""
Manages a queue of download tasks using Celery.
@@ -125,14 +208,14 @@ class CeleryDownloadQueueManager:
"Task being added with no URL. Duplicate check might be unreliable."
)
NON_BLOCKING_STATES = [
TERMINAL_STATES = { # Renamed and converted to a set for consistency
ProgressState.COMPLETE,
ProgressState.DONE,
ProgressState.CANCELLED,
ProgressState.ERROR,
ProgressState.ERROR_RETRIED,
ProgressState.ERROR_AUTO_CLEANED,
]
}
all_existing_tasks_summary = get_all_tasks()
if incoming_url:
@@ -154,7 +237,7 @@ class CeleryDownloadQueueManager:
if (
existing_url == incoming_url
and existing_type == incoming_type
and existing_status not in NON_BLOCKING_STATES
and existing_status not in TERMINAL_STATES
):
message = f"Duplicate download: URL '{incoming_url}' (type: {incoming_type}) is already being processed by task {existing_task_id} (status: {existing_status})."
logger.warning(message)

View File

@@ -181,6 +181,70 @@ def get_task_info(task_id):
return {}
def delete_task_data(task_id):
"""Deletes all Redis data associated with a task_id."""
try:
redis_client.delete(f"task:{task_id}:info")
redis_client.delete(f"task:{task_id}:status")
redis_client.delete(f"task:{task_id}:status:next_id")
logger.info(f"Deleted data for task {task_id}")
except Exception as e:
logger.error(f"Error deleting data for task {task_id}: {e}")
CLEANUP_THRESHOLD_SECONDS = 3600 # 1 hour
@celery_app.task(name="routes.utils.celery_tasks.cleanup_old_tasks")
def cleanup_old_tasks():
"""
Periodically cleans up old, finished tasks from Redis to prevent data buildup.
"""
logger.info("Starting cleanup of old finished tasks...")
# Define terminal states that are safe to clean up
TERMINAL_STATES = {
ProgressState.COMPLETE,
ProgressState.DONE,
ProgressState.CANCELLED,
ProgressState.ERROR,
ProgressState.ERROR_RETRIED,
ProgressState.ERROR_AUTO_CLEANED,
}
cleaned_count = 0
# Scan for all task info keys, which serve as the master record for a task's existence
task_info_keys = redis_client.keys("task:*:info")
for key in task_info_keys:
try:
task_id = key.decode("utf-8").split(":")[1]
last_status = get_last_task_status(task_id)
if not last_status:
# If there's no status, we can't determine age or state.
# Could be an orphaned task info key. Consider a separate cleanup for these.
continue
status = last_status.get("status")
timestamp = last_status.get("timestamp", 0)
# Check if the task is in a terminal state and has expired
if status in TERMINAL_STATES:
if (time.time() - timestamp) > CLEANUP_THRESHOLD_SECONDS:
logger.info(
f"Cleaning up expired task {task_id} (status: {status}, age: {time.time() - timestamp}s)"
)
delete_task_data(task_id)
cleaned_count += 1
except Exception as e:
logger.error(
f"Error processing task key {key} for cleanup: {e}", exc_info=True
)
logger.info(f"Finished cleanup of old tasks. Removed {cleaned_count} tasks.")
# --- History Logging Helper ---
def _log_task_to_history(task_id, final_status_str, error_msg=None):
"""Helper function to gather task data and log it to the history database."""
@@ -1324,6 +1388,7 @@ def download_track(self, **task_data):
progress_callback=self.progress_callback,
convert_to=convert_to,
bitrate=bitrate,
_is_celery_task_execution=True, # Skip duplicate check inside Celery task (consistency)
)
return {"status": "success", "message": "Track download completed"}
@@ -1410,6 +1475,7 @@ def download_album(self, **task_data):
progress_callback=self.progress_callback,
convert_to=convert_to,
bitrate=bitrate,
_is_celery_task_execution=True, # Skip duplicate check inside Celery task
)
return {"status": "success", "message": "Album download completed"}
@@ -1508,6 +1574,7 @@ def download_playlist(self, **task_data):
progress_callback=self.progress_callback,
convert_to=convert_to,
bitrate=bitrate,
_is_celery_task_execution=True, # Skip duplicate check inside Celery task
)
return {"status": "success", "message": "Playlist download completed"}

routes/utils/errors.py (new file, 6 lines added)
View File

@@ -0,0 +1,6 @@
class DuplicateDownloadError(Exception):
def __init__(self, message, existing_task=None):
if existing_task:
message = f"{message} (Conflicting Task ID: {existing_task})"
super().__init__(message)
self.existing_task = existing_task
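Catching the exception gives a caller both a readable message and the conflicting task ID; a minimal usage sketch:

    from routes.utils.errors import DuplicateDownloadError

    try:
        raise DuplicateDownloadError(
            "Download for this URL is already in progress.",
            existing_task="abc123",
        )
    except DuplicateDownloadError as e:
        print(e)                # Download for this URL is already in progress. (Conflicting Task ID: abc123)
        print(e.existing_task)  # abc123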

View File

@@ -3,6 +3,8 @@ from deezspot.spotloader import SpoLogin
from deezspot.deezloader import DeeLogin
from pathlib import Path
from routes.utils.credentials import get_credential, _get_global_spotify_api_creds
from routes.utils.celery_queue_manager import get_existing_task_id
from routes.utils.errors import DuplicateDownloadError
def download_playlist(
@@ -22,7 +24,15 @@ def download_playlist(
progress_callback=None,
convert_to=None,
bitrate=None,
_is_celery_task_execution=False, # Added to skip duplicate check from Celery task
):
if not _is_celery_task_execution:
existing_task = get_existing_task_id(url) # Check for duplicates only if not called by Celery task
if existing_task:
raise DuplicateDownloadError(
f"Download for this URL is already in progress.",
existing_task=existing_task,
)
try:
# Detect URL source (Spotify or Deezer) from URL
is_spotify_url = "open.spotify.com" in url.lower()

View File

@@ -25,6 +25,7 @@ def download_track(
progress_callback=None,
convert_to=None,
bitrate=None,
_is_celery_task_execution=False, # Added for consistency, not currently used for duplicate check
):
try:
# Detect URL source (Spotify or Deezer) from URL