I think we good
This commit is contained in:
@@ -18,6 +18,9 @@ from routes.utils.celery_config import REDIS_URL, REDIS_BACKEND, REDIS_PASSWORD,
|
||||
# Import for playlist watch DB update
|
||||
from routes.utils.watch.db import add_single_track_to_playlist_db
|
||||
|
||||
# Import history manager function
|
||||
from .history_manager import add_entry_to_history
|
||||
|
||||
# Initialize Celery app
|
||||
celery_app = Celery('download_tasks',
|
||||
broker=REDIS_URL,
|
||||
@@ -146,6 +149,50 @@ def get_task_info(task_id):
|
||||
logger.error(f"Error getting task info: {e}")
|
||||
return {}
|
||||
|
||||
# --- History Logging Helper ---
|
||||
def _log_task_to_history(task_id, final_status_str, error_msg=None):
|
||||
"""Helper function to gather task data and log it to the history database."""
|
||||
try:
|
||||
task_info = get_task_info(task_id)
|
||||
last_status_obj = get_last_task_status(task_id)
|
||||
|
||||
if not task_info:
|
||||
logger.warning(f"History: No task_info found for task_id {task_id}. Cannot log to history.")
|
||||
return
|
||||
|
||||
# Extract Spotify ID from item URL if possible
|
||||
spotify_id = None
|
||||
item_url = task_info.get('url', '')
|
||||
if item_url:
|
||||
try:
|
||||
spotify_id = item_url.split('/')[-1]
|
||||
# Further validation if it looks like a Spotify ID (e.g., 22 chars, alphanumeric)
|
||||
if not (spotify_id and len(spotify_id) == 22 and spotify_id.isalnum()):
|
||||
spotify_id = None # Reset if not a valid-looking ID
|
||||
except Exception:
|
||||
spotify_id = None # Ignore errors in parsing
|
||||
|
||||
history_entry = {
|
||||
'task_id': task_id,
|
||||
'download_type': task_info.get('download_type'),
|
||||
'item_name': task_info.get('name'),
|
||||
'item_artist': task_info.get('artist'),
|
||||
'item_album': task_info.get('album', task_info.get('name') if task_info.get('download_type') == 'album' else None),
|
||||
'item_url': item_url,
|
||||
'spotify_id': spotify_id,
|
||||
'status_final': final_status_str,
|
||||
'error_message': error_msg if error_msg else (last_status_obj.get('error') if last_status_obj else None),
|
||||
'timestamp_added': task_info.get('created_at', time.time()),
|
||||
'timestamp_completed': last_status_obj.get('timestamp', time.time()) if last_status_obj else time.time(),
|
||||
'original_request_json': json.dumps(task_info.get('original_request', {})),
|
||||
'last_status_obj_json': json.dumps(last_status_obj if last_status_obj else {})
|
||||
}
|
||||
add_entry_to_history(history_entry)
|
||||
except Exception as e:
|
||||
logger.error(f"History: Error preparing or logging history for task {task_id}: {e}", exc_info=True)
|
||||
|
||||
# --- End History Logging Helper ---
|
||||
|
||||
def cancel_task(task_id):
|
||||
"""Cancel a task by its ID"""
|
||||
try:
|
||||
@@ -159,7 +206,16 @@ def cancel_task(task_id):
|
||||
# Try to revoke the Celery task if it hasn't started yet
|
||||
celery_app.control.revoke(task_id, terminate=True, signal='SIGTERM')
|
||||
|
||||
logger.info(f"Task {task_id} cancelled by user")
|
||||
# Log cancellation to history
|
||||
_log_task_to_history(task_id, 'CANCELLED', "Task cancelled by user")
|
||||
|
||||
# Schedule deletion of task data after 30 seconds
|
||||
delayed_delete_task_data.apply_async(
|
||||
args=[task_id, "Task cancelled by user and auto-cleaned."],
|
||||
countdown=30
|
||||
)
|
||||
logger.info(f"Task {task_id} cancelled by user. Data scheduled for deletion in 30s.")
|
||||
|
||||
return {"status": "cancelled", "task_id": task_id}
|
||||
except Exception as e:
|
||||
logger.error(f"Error cancelling task {task_id}: {e}")
|
||||
@@ -440,17 +496,6 @@ class ProgressTrackingTask(Task):
|
||||
# Store the processed status update
|
||||
store_task_status(task_id, stored_data)
|
||||
|
||||
# Immediately delete task info from Redis after marking as complete
|
||||
if stored_data.get("status") == ProgressState.COMPLETE:
|
||||
logger.info(f"Task {task_id} completed. Deleting task data from Redis.")
|
||||
try:
|
||||
redis_client.delete(f"task:{task_id}:info")
|
||||
redis_client.delete(f"task:{task_id}:status")
|
||||
redis_client.delete(f"task:{task_id}:status:next_id") # Also delete the counter
|
||||
logger.info(f"Successfully deleted Redis data for completed task {task_id}.")
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting Redis data for completed task {task_id}: {e}", exc_info=True)
|
||||
|
||||
def _handle_initializing(self, task_id, data, task_info):
|
||||
"""Handle initializing status from deezspot"""
|
||||
# Extract relevant fields
|
||||
@@ -789,6 +834,11 @@ class ProgressTrackingTask(Task):
|
||||
|
||||
# Log summary
|
||||
logger.info(f"Task {task_id} summary: {completed_tracks} completed, {skipped_tracks} skipped, {error_count} errors")
|
||||
# Schedule deletion for completed multi-track downloads
|
||||
delayed_delete_task_data.apply_async(
|
||||
args=[task_id, "Task completed successfully and auto-cleaned."],
|
||||
countdown=30 # Delay in seconds
|
||||
)
|
||||
|
||||
else:
|
||||
# Generic done for other types
|
||||
@@ -796,20 +846,6 @@ class ProgressTrackingTask(Task):
|
||||
data["status"] = ProgressState.COMPLETE
|
||||
data["message"] = "Download complete"
|
||||
|
||||
# Store the processed status update
|
||||
store_task_status(task_id, data)
|
||||
|
||||
# Immediately delete task info from Redis after marking as complete
|
||||
if data.get("status") == ProgressState.COMPLETE:
|
||||
logger.info(f"Task {task_id} ({task_info.get('name', 'Unknown')}) completed. Deleting task data from Redis.")
|
||||
try:
|
||||
redis_client.delete(f"task:{task_id}:info")
|
||||
redis_client.delete(f"task:{task_id}:status")
|
||||
redis_client.delete(f"task:{task_id}:status:next_id") # Also delete the counter
|
||||
logger.info(f"Successfully deleted Redis data for completed task {task_id}.")
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting Redis data for completed task {task_id}: {e}", exc_info=True)
|
||||
|
||||
# Celery signal handlers
|
||||
@task_prerun.connect
|
||||
def task_prerun_handler(task_id=None, task=None, *args, **kwargs):
|
||||
@@ -834,25 +870,40 @@ def task_prerun_handler(task_id=None, task=None, *args, **kwargs):
|
||||
def task_postrun_handler(task_id=None, task=None, retval=None, state=None, *args, **kwargs):
|
||||
"""Signal handler when a task finishes"""
|
||||
try:
|
||||
# Skip if task is already marked as complete or error in Redis
|
||||
last_status = get_last_task_status(task_id)
|
||||
if last_status and last_status.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR]:
|
||||
return
|
||||
|
||||
# Get task info
|
||||
# Skip if task is already marked as complete or error in Redis for history logging purposes
|
||||
last_status_for_history = get_last_task_status(task_id)
|
||||
if last_status_for_history and last_status_for_history.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED, "ERROR_RETRIED", "ERROR_AUTO_CLEANED"]:
|
||||
# Check if it was a REVOKED (cancelled) task, if so, ensure it's logged.
|
||||
if state == states.REVOKED and last_status_for_history.get("status") != ProgressState.CANCELLED:
|
||||
logger.info(f"Task {task_id} was REVOKED (likely cancelled), logging to history.")
|
||||
_log_task_to_history(task_id, 'CANCELLED', "Task was revoked/cancelled.")
|
||||
# else:
|
||||
# logger.debug(f"History: Task {task_id} already in terminal state {last_status_for_history.get('status')} in Redis. History logging likely handled.")
|
||||
# return # Do not return here, let the normal status update proceed for Redis if necessary
|
||||
|
||||
task_info = get_task_info(task_id)
|
||||
|
||||
current_redis_status = last_status_for_history.get("status") if last_status_for_history else None
|
||||
|
||||
# Update task status based on Celery task state
|
||||
if state == states.SUCCESS:
|
||||
store_task_status(task_id, {
|
||||
"status": ProgressState.COMPLETE,
|
||||
"timestamp": time.time(),
|
||||
"type": task_info.get("type", "unknown"),
|
||||
"name": task_info.get("name", "Unknown"),
|
||||
"artist": task_info.get("artist", ""),
|
||||
"message": "Download completed successfully."
|
||||
})
|
||||
if current_redis_status != ProgressState.COMPLETE:
|
||||
store_task_status(task_id, {
|
||||
"status": ProgressState.COMPLETE,
|
||||
"timestamp": time.time(),
|
||||
"type": task_info.get("type", "unknown"),
|
||||
"name": task_info.get("name", "Unknown"),
|
||||
"artist": task_info.get("artist", ""),
|
||||
"message": "Download completed successfully."
|
||||
})
|
||||
logger.info(f"Task {task_id} completed successfully: {task_info.get('name', 'Unknown')}")
|
||||
_log_task_to_history(task_id, 'COMPLETED')
|
||||
|
||||
# If the task was a single track, schedule its data for deletion after a delay
|
||||
if task_info.get("download_type") == "track":
|
||||
delayed_delete_task_data.apply_async(
|
||||
args=[task_id, "Task completed successfully and auto-cleaned."],
|
||||
countdown=30 # Delay in seconds
|
||||
)
|
||||
|
||||
# If from playlist_watch and successful, add track to DB
|
||||
original_request = task_info.get("original_request", {})
|
||||
@@ -896,24 +947,34 @@ def task_failure_handler(task_id=None, exception=None, traceback=None, *args, **
|
||||
# Check if we can retry
|
||||
can_retry = retry_count < max_retries
|
||||
|
||||
# Update task status to error
|
||||
error_message_str = str(exception)
|
||||
store_task_status(task_id, {
|
||||
"status": ProgressState.ERROR,
|
||||
"timestamp": time.time(),
|
||||
"type": task_info.get("type", "unknown"),
|
||||
"name": task_info.get("name", "Unknown"),
|
||||
"artist": task_info.get("artist", ""),
|
||||
"error": error_message_str,
|
||||
"traceback": str(traceback),
|
||||
"can_retry": can_retry,
|
||||
"retry_count": retry_count,
|
||||
"max_retries": max_retries
|
||||
})
|
||||
# Update task status to error in Redis if not already an error
|
||||
if last_status and last_status.get("status") != ProgressState.ERROR:
|
||||
store_task_status(task_id, {
|
||||
"status": ProgressState.ERROR,
|
||||
"timestamp": time.time(),
|
||||
"type": task_info.get("type", "unknown"),
|
||||
"name": task_info.get("name", "Unknown"),
|
||||
"artist": task_info.get("artist", ""),
|
||||
"error": str(exception),
|
||||
"traceback": str(traceback),
|
||||
"can_retry": can_retry,
|
||||
"retry_count": retry_count,
|
||||
"max_retries": max_retries
|
||||
})
|
||||
|
||||
logger.error(f"Task {task_id} failed: {error_message_str}")
|
||||
logger.error(f"Task {task_id} failed: {str(exception)}")
|
||||
_log_task_to_history(task_id, 'ERROR', str(exception))
|
||||
|
||||
if can_retry:
|
||||
logger.info(f"Task {task_id} can be retried ({retry_count}/{max_retries})")
|
||||
else:
|
||||
# If task cannot be retried, schedule its data for deletion
|
||||
logger.info(f"Task {task_id} failed and cannot be retried. Data scheduled for deletion in 30s.")
|
||||
delayed_delete_task_data.apply_async(
|
||||
args=[task_id, f"Task failed ({str(exception)}) and max retries reached. Auto-cleaned."],
|
||||
countdown=30
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in task_failure_handler: {e}")
|
||||
|
||||
@@ -1147,15 +1208,40 @@ def delete_task_data_and_log(task_id, reason="Task data deleted"):
|
||||
try:
|
||||
task_info = get_task_info(task_id) # Get info before deleting
|
||||
last_status = get_last_task_status(task_id)
|
||||
current_status_val = last_status.get("status") if last_status else None
|
||||
|
||||
# Update status to cancelled if it's not already in a terminal state that implies deletion is okay
|
||||
if not last_status or last_status.get("status") not in [ProgressState.CANCELLED, ProgressState.ERROR_RETRIED, ProgressState.ERROR_AUTO_CLEANED]:
|
||||
# Determine the final status for Redis before deletion
|
||||
# The reason passed to this function indicates why it's being deleted.
|
||||
final_redis_status = ProgressState.ERROR_AUTO_CLEANED # Default for most cleanup scenarios
|
||||
error_message_for_status = reason
|
||||
|
||||
if reason == "Task completed successfully and auto-cleaned.":
|
||||
final_redis_status = ProgressState.COMPLETE # It was already complete
|
||||
error_message_for_status = "Task completed and auto-cleaned."
|
||||
elif reason == "Task cancelled by user and auto-cleaned.":
|
||||
final_redis_status = ProgressState.CANCELLED # It was already cancelled
|
||||
error_message_for_status = "Task cancelled and auto-cleaned."
|
||||
elif "Task failed" in reason and "max retries reached" in reason:
|
||||
final_redis_status = ProgressState.ERROR # It was already an error (non-retryable)
|
||||
error_message_for_status = reason
|
||||
elif reason == "Task interrupted by application restart and auto-cleaned.":
|
||||
final_redis_status = ProgressState.ERROR # It was marked as ERROR (interrupted)
|
||||
error_message_for_status = reason
|
||||
# Add more specific conditions if needed based on other reasons `delayed_delete_task_data` might be called with.
|
||||
|
||||
# Update Redis status one last time if it's not already reflecting the final intended state for this cleanup.
|
||||
# This is mainly for cases where cleanup is initiated for tasks not yet in a fully terminal state by other handlers.
|
||||
if current_status_val not in [ProgressState.COMPLETE, ProgressState.CANCELLED, ProgressState.ERROR_RETRIED, ProgressState.ERROR_AUTO_CLEANED, final_redis_status]:
|
||||
store_task_status(task_id, {
|
||||
"status": ProgressState.ERROR_AUTO_CLEANED, # Use specific status
|
||||
"error": reason,
|
||||
"status": final_redis_status,
|
||||
"error": error_message_for_status, # Use the reason as the error/message for this status
|
||||
"timestamp": time.time()
|
||||
})
|
||||
|
||||
# History logging for COMPLETION, CANCELLATION, or definitive ERROR should have occurred when those states were first reached.
|
||||
# If this cleanup is for a task that *wasn't* in such a state (e.g. stale, still processing), log it now.
|
||||
if final_redis_status == ProgressState.ERROR_AUTO_CLEANED:
|
||||
_log_task_to_history(task_id, 'ERROR', error_message_for_status) # Or a more specific status if desired
|
||||
|
||||
# Delete Redis keys associated with the task
|
||||
redis_client.delete(f"task:{task_id}:info")
|
||||
redis_client.delete(f"task:{task_id}:status")
|
||||
@@ -1204,4 +1290,12 @@ def cleanup_stale_errors():
|
||||
return {"status": "complete", "cleaned_count": cleaned_count}
|
||||
except Exception as e:
|
||||
logger.error(f"Error during cleanup_stale_errors: {e}", exc_info=True)
|
||||
return {"status": "error", "error": str(e)}
|
||||
return {"status": "error", "error": str(e)}
|
||||
|
||||
@celery_app.task(name="delayed_delete_task_data", queue="default") # Use default queue for utility tasks
|
||||
def delayed_delete_task_data(task_id, reason):
|
||||
"""
|
||||
Celery task to delete task data after a delay.
|
||||
"""
|
||||
logger.info(f"Executing delayed deletion for task {task_id}. Reason: {reason}")
|
||||
delete_task_data_and_log(task_id, reason)
|
||||
Reference in New Issue
Block a user