@@ -29,7 +29,7 @@ from routes.utils.watch.db import (
|
||||
)
|
||||
|
||||
# Import history manager function
|
||||
from .history_manager import add_entry_to_history
|
||||
from .history_manager import add_entry_to_history, add_tracks_from_summary
|
||||
|
||||
# Create Redis connection for storing task data that's not part of the Celery result backend
|
||||
import redis
|
||||
@@ -238,6 +238,9 @@ def _log_task_to_history(task_id, final_status_str, error_msg=None):
|
||||
except Exception:
|
||||
spotify_id = None # Ignore errors in parsing
|
||||
|
||||
# Check for the new summary object in the last status
|
||||
summary_obj = last_status_obj.get("summary") if last_status_obj else None
|
||||
|
||||
history_entry = {
|
||||
"task_id": task_id,
|
||||
"download_type": task_info.get("download_type"),
|
||||
@@ -271,15 +274,34 @@ def _log_task_to_history(task_id, final_status_str, error_msg=None):
|
||||
"bitrate": bitrate_str
|
||||
if bitrate_str
|
||||
else None, # Store None if empty string
|
||||
"summary_json": json.dumps(summary_obj) if summary_obj else None,
|
||||
"total_successful": summary_obj.get("total_successful")
|
||||
if summary_obj
|
||||
else None,
|
||||
"total_skipped": summary_obj.get("total_skipped") if summary_obj else None,
|
||||
"total_failed": summary_obj.get("total_failed") if summary_obj else None,
|
||||
}
|
||||
|
||||
# Add the main history entry for the task
|
||||
add_entry_to_history(history_entry)
|
||||
|
||||
# Process track-level entries from summary if this is a multi-track download
|
||||
if summary_obj and task_info.get("download_type") in ["album", "playlist"]:
|
||||
tracks_processed = add_tracks_from_summary(
|
||||
summary_data=summary_obj,
|
||||
parent_task_id=task_id,
|
||||
parent_history_data=history_entry
|
||||
)
|
||||
logger.info(
|
||||
f"Track-level history: Processed {tracks_processed['successful']} successful, "
|
||||
f"{tracks_processed['skipped']} skipped, and {tracks_processed['failed']} failed tracks for task {task_id}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"History: Error preparing or logging history for task {task_id}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
|
||||
# --- End History Logging Helper ---
|
||||
|
||||
|
||||
@@ -536,6 +558,9 @@ class ProgressTrackingTask(Task):
|
||||
Args:
|
||||
progress_data: Dictionary containing progress information from deezspot
|
||||
"""
|
||||
# Store a copy of the original, unprocessed callback data
|
||||
raw_callback_data = progress_data.copy()
|
||||
|
||||
task_id = self.request.id
|
||||
|
||||
# Ensure ./logs/tasks directory exists
|
||||
@@ -570,9 +595,6 @@ class ProgressTrackingTask(Task):
|
||||
# Get status type
|
||||
status = progress_data.get("status", "unknown")
|
||||
|
||||
# Create a work copy of the data to avoid modifying the original
|
||||
stored_data = progress_data.copy()
|
||||
|
||||
# Get task info for context
|
||||
task_info = get_task_info(task_id)
|
||||
|
||||
@@ -585,44 +607,47 @@ class ProgressTrackingTask(Task):
|
||||
# Process based on status type using a more streamlined approach
|
||||
if status == "initializing":
|
||||
# --- INITIALIZING: Start of a download operation ---
|
||||
self._handle_initializing(task_id, stored_data, task_info)
|
||||
self._handle_initializing(task_id, progress_data, task_info)
|
||||
|
||||
elif status == "downloading":
|
||||
# --- DOWNLOADING: Track download started ---
|
||||
self._handle_downloading(task_id, stored_data, task_info)
|
||||
self._handle_downloading(task_id, progress_data, task_info)
|
||||
|
||||
elif status == "progress":
|
||||
# --- PROGRESS: Album/playlist track progress ---
|
||||
self._handle_progress(task_id, stored_data, task_info)
|
||||
self._handle_progress(task_id, progress_data, task_info)
|
||||
|
||||
elif status == "real_time" or status == "track_progress":
|
||||
# --- REAL_TIME/TRACK_PROGRESS: Track download real-time progress ---
|
||||
self._handle_real_time(task_id, stored_data)
|
||||
self._handle_real_time(task_id, progress_data)
|
||||
|
||||
elif status == "skipped":
|
||||
# --- SKIPPED: Track was skipped ---
|
||||
self._handle_skipped(task_id, stored_data, task_info)
|
||||
self._handle_skipped(task_id, progress_data, task_info)
|
||||
|
||||
elif status == "retrying":
|
||||
# --- RETRYING: Download failed and being retried ---
|
||||
self._handle_retrying(task_id, stored_data, task_info)
|
||||
self._handle_retrying(task_id, progress_data, task_info)
|
||||
|
||||
elif status == "error":
|
||||
# --- ERROR: Error occurred during download ---
|
||||
self._handle_error(task_id, stored_data, task_info)
|
||||
self._handle_error(task_id, progress_data, task_info)
|
||||
|
||||
elif status == "done":
|
||||
# --- DONE: Download operation completed ---
|
||||
self._handle_done(task_id, stored_data, task_info)
|
||||
self._handle_done(task_id, progress_data, task_info)
|
||||
|
||||
else:
|
||||
# --- UNKNOWN: Unrecognized status ---
|
||||
logger.info(
|
||||
f"Task {task_id} {status}: {stored_data.get('message', 'No details')}"
|
||||
f"Task {task_id} {status}: {progress_data.get('message', 'No details')}"
|
||||
)
|
||||
|
||||
# Embed the raw callback data into the status object before storing
|
||||
progress_data["raw_callback"] = raw_callback_data
|
||||
|
||||
# Store the processed status update
|
||||
store_task_status(task_id, stored_data)
|
||||
store_task_status(task_id, progress_data)
|
||||
|
||||
def _handle_initializing(self, task_id, data, task_info):
|
||||
"""Handle initializing status from deezspot"""
|
||||
@@ -663,7 +688,7 @@ class ProgressTrackingTask(Task):
|
||||
store_task_info(task_id, task_info)
|
||||
|
||||
# Update status in data
|
||||
data["status"] = ProgressState.INITIALIZING
|
||||
# data["status"] = ProgressState.INITIALIZING
|
||||
|
||||
def _handle_downloading(self, task_id, data, task_info):
|
||||
"""Handle downloading status from deezspot"""
|
||||
@@ -720,7 +745,7 @@ class ProgressTrackingTask(Task):
|
||||
logger.info(f"Task {task_id} downloading: '{track_name}'")
|
||||
|
||||
# Update status
|
||||
data["status"] = ProgressState.DOWNLOADING
|
||||
# data["status"] = ProgressState.DOWNLOADING
|
||||
|
||||
def _handle_progress(self, task_id, data, task_info):
|
||||
"""Handle progress status from deezspot"""
|
||||
@@ -776,7 +801,7 @@ class ProgressTrackingTask(Task):
|
||||
logger.error(f"Error parsing track numbers '{current_track_raw}': {e}")
|
||||
|
||||
# Ensure correct status
|
||||
data["status"] = ProgressState.PROGRESS
|
||||
# data["status"] = ProgressState.PROGRESS
|
||||
|
||||
def _handle_real_time(self, task_id, data):
|
||||
"""Handle real-time progress status from deezspot"""
|
||||
@@ -818,11 +843,11 @@ class ProgressTrackingTask(Task):
|
||||
logger.debug(f"Task {task_id} track progress: {title} by {artist}: {percent}%")
|
||||
|
||||
# Set appropriate status
|
||||
data["status"] = (
|
||||
ProgressState.REAL_TIME
|
||||
if data.get("status") == "real_time"
|
||||
else ProgressState.TRACK_PROGRESS
|
||||
)
|
||||
# data["status"] = (
|
||||
# ProgressState.REAL_TIME
|
||||
# if data.get("status") == "real_time"
|
||||
# else ProgressState.TRACK_PROGRESS
|
||||
# )
|
||||
|
||||
def _handle_skipped(self, task_id, data, task_info):
|
||||
"""Handle skipped status from deezspot"""
|
||||
@@ -872,7 +897,7 @@ class ProgressTrackingTask(Task):
|
||||
store_task_status(task_id, progress_update)
|
||||
|
||||
# Set status
|
||||
data["status"] = ProgressState.SKIPPED
|
||||
# data["status"] = ProgressState.SKIPPED
|
||||
|
||||
def _handle_retrying(self, task_id, data, task_info):
|
||||
"""Handle retrying status from deezspot"""
|
||||
@@ -895,7 +920,7 @@ class ProgressTrackingTask(Task):
|
||||
store_task_info(task_id, task_info)
|
||||
|
||||
# Set status
|
||||
data["status"] = ProgressState.RETRYING
|
||||
# data["status"] = ProgressState.RETRYING
|
||||
|
||||
def _handle_error(self, task_id, data, task_info):
|
||||
"""Handle error status from deezspot"""
|
||||
@@ -911,7 +936,7 @@ class ProgressTrackingTask(Task):
|
||||
store_task_info(task_id, task_info)
|
||||
|
||||
# Set status and error message
|
||||
data["status"] = ProgressState.ERROR
|
||||
# data["status"] = ProgressState.ERROR
|
||||
data["error"] = message
|
||||
|
||||
def _handle_done(self, task_id, data, task_info):
|
||||
@@ -931,7 +956,7 @@ class ProgressTrackingTask(Task):
|
||||
logger.info(f"Task {task_id} completed: Track '{song}'")
|
||||
|
||||
# Update status to track_complete
|
||||
data["status"] = ProgressState.TRACK_COMPLETE
|
||||
# data["status"] = ProgressState.TRACK_COMPLETE
|
||||
|
||||
# Update task info
|
||||
completed_tracks = task_info.get("completed_tracks", 0) + 1
|
||||
@@ -989,15 +1014,28 @@ class ProgressTrackingTask(Task):
|
||||
logger.info(f"Task {task_id} completed: {content_type.upper()}")
|
||||
|
||||
# Add summary
|
||||
data["status"] = ProgressState.COMPLETE
|
||||
data["message"] = (
|
||||
f"Download complete: {completed_tracks} tracks downloaded, {skipped_tracks} skipped"
|
||||
)
|
||||
# data["status"] = ProgressState.COMPLETE
|
||||
summary_obj = data.get("summary")
|
||||
|
||||
# Log summary
|
||||
logger.info(
|
||||
f"Task {task_id} summary: {completed_tracks} completed, {skipped_tracks} skipped, {error_count} errors"
|
||||
)
|
||||
if summary_obj:
|
||||
total_successful = summary_obj.get("total_successful", 0)
|
||||
total_skipped = summary_obj.get("total_skipped", 0)
|
||||
total_failed = summary_obj.get("total_failed", 0)
|
||||
# data[
|
||||
# "message"
|
||||
# ] = f"Download complete: {total_successful} tracks downloaded, {total_skipped} skipped, {total_failed} failed."
|
||||
# Log summary from the summary object
|
||||
logger.info(
|
||||
f"Task {task_id} summary: {total_successful} successful, {total_skipped} skipped, {total_failed} failed."
|
||||
)
|
||||
else:
|
||||
# data["message"] = (
|
||||
# f"Download complete: {completed_tracks} tracks downloaded, {skipped_tracks} skipped"
|
||||
# )
|
||||
# Log summary
|
||||
logger.info(
|
||||
f"Task {task_id} summary: {completed_tracks} completed, {skipped_tracks} skipped, {error_count} errors"
|
||||
)
|
||||
# Schedule deletion for completed multi-track downloads
|
||||
delayed_delete_task_data.apply_async(
|
||||
args=[task_id, "Task completed successfully and auto-cleaned."],
|
||||
@@ -1066,8 +1104,8 @@ class ProgressTrackingTask(Task):
|
||||
else:
|
||||
# Generic done for other types
|
||||
logger.info(f"Task {task_id} completed: {content_type.upper()}")
|
||||
data["status"] = ProgressState.COMPLETE
|
||||
data["message"] = "Download complete"
|
||||
# data["status"] = ProgressState.COMPLETE
|
||||
# data["message"] = "Download complete"
|
||||
|
||||
|
||||
# Celery signal handlers
|
||||
@@ -1134,18 +1172,11 @@ def task_postrun_handler(
|
||||
)
|
||||
|
||||
if state == states.SUCCESS:
|
||||
if current_redis_status != ProgressState.COMPLETE:
|
||||
store_task_status(
|
||||
task_id,
|
||||
{
|
||||
"status": ProgressState.COMPLETE,
|
||||
"timestamp": time.time(),
|
||||
"type": task_info.get("type", "unknown"),
|
||||
"name": task_info.get("name", "Unknown"),
|
||||
"artist": task_info.get("artist", ""),
|
||||
"message": "Download completed successfully.",
|
||||
},
|
||||
)
|
||||
if current_redis_status not in [ProgressState.COMPLETE, "done"]:
|
||||
# The final status is now set by the 'done' callback from deezspot.
|
||||
# We no longer need to store a generic 'COMPLETE' status here.
|
||||
# This ensures the raw callback data is the last thing in the log.
|
||||
pass
|
||||
logger.info(
|
||||
f"Task {task_id} completed successfully: {task_info.get('name', 'Unknown')}"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user