Implemented queue parsing for deezspot 2.0

This commit is contained in:
Xoconoch
2025-06-14 20:49:19 +00:00
parent 93d867127a
commit 2e966b4245
9 changed files with 1942 additions and 1987 deletions

View File

@@ -513,37 +513,59 @@ def retry_task(task_id):
return {"status": "error", "error": str(e)}
def get_all_tasks():
"""Get all active task IDs"""
def get_all_tasks(include_finished=False):
"""Get all active task IDs, with an option to include finished tasks."""
try:
# Get all keys matching the task info pattern
task_keys = redis_client.keys("task:*:info")
# Extract task IDs from the keys
task_ids = [key.decode("utf-8").split(":")[1] for key in task_keys]
# Get info for each task
task_keys = redis_client.scan_iter("task:*:info")
tasks = []
for task_id in task_ids:
task_info = get_task_info(task_id)
last_status = get_last_task_status(task_id)
if task_info and last_status:
tasks.append(
{
"task_id": task_id,
"type": task_info.get("type", "unknown"),
"name": task_info.get("name", "Unknown"),
"artist": task_info.get("artist", ""),
"download_type": task_info.get("download_type", "unknown"),
"status": last_status.get("status", "unknown"),
"timestamp": last_status.get("timestamp", 0),
}
)
TERMINAL_STATES = {
ProgressState.COMPLETE,
ProgressState.DONE,
ProgressState.CANCELLED,
ProgressState.ERROR,
ProgressState.ERROR_AUTO_CLEANED,
ProgressState.ERROR_RETRIED,
}
for key in task_keys:
task_id = key.decode("utf-8").split(":")[1]
last_status = get_last_task_status(task_id)
current_status = None
if last_status:
# Accommodate for status being nested inside 'status_info' or at the top level
if "status" in last_status:
current_status = last_status.get("status")
elif isinstance(last_status.get("status_info"), dict):
current_status = last_status.get("status_info", {}).get("status")
is_terminal = current_status in TERMINAL_STATES
if not include_finished and is_terminal:
continue # Skip terminal tasks if not requested
task_info = get_task_info(task_id)
if task_info:
task_summary = {
"task_id": task_id,
"type": task_info.get("type", "unknown"),
"name": task_info.get("name", "Unknown"),
"artist": task_info.get("artist", ""),
"download_type": task_info.get("download_type", "unknown"),
"created_at": task_info.get("created_at", 0),
}
if last_status:
task_summary["status"] = current_status if current_status else "unknown"
task_summary["summary"] = last_status.get("summary")
else:
task_summary["status"] = "unknown"
task_summary["summary"] = None
tasks.append(task_summary)
return tasks
except Exception as e:
logger.error(f"Error getting all tasks: {e}")
logger.error(f"Error getting all tasks: {e}", exc_info=True)
return []
@@ -564,7 +586,7 @@ class ProgressTrackingTask(Task):
task_id = self.request.id
# Ensure ./logs/tasks directory exists
logs_tasks_dir = Path("./logs/tasks") # Using relative path as per your update
logs_tasks_dir = Path("./logs/tasks")
try:
logs_tasks_dir.mkdir(parents=True, exist_ok=True)
except Exception as e:
@@ -578,235 +600,118 @@ class ProgressTrackingTask(Task):
# Log progress_data to the task-specific file
try:
with open(log_file_path, "a") as log_file:
# Add a timestamp to the log entry if not present, for consistency in the file
log_entry = progress_data.copy()
if "timestamp" not in log_entry:
log_entry["timestamp"] = time.time()
print(json.dumps(log_entry), file=log_file) # Use print to file
print(json.dumps(log_entry), file=log_file)
except Exception as e:
logger.error(
f"Task {task_id}: Could not write to task log file {log_file_path}: {e}"
)
# Add timestamp if not present
if "timestamp" not in progress_data:
progress_data["timestamp"] = time.time()
# Get status type
status = progress_data.get("status", "unknown")
# Get task info for context
task_info = get_task_info(task_id)
# Log raw progress data at debug level
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"Task {task_id}: Raw progress data: {json.dumps(progress_data)}"
)
# Process based on status type using a more streamlined approach
if status == "initializing":
# --- INITIALIZING: Start of a download operation ---
self._handle_initializing(task_id, progress_data, task_info)
elif status == "downloading":
# --- DOWNLOADING: Track download started ---
self._handle_downloading(task_id, progress_data, task_info)
elif status == "progress":
# --- PROGRESS: Album/playlist track progress ---
self._handle_progress(task_id, progress_data, task_info)
elif status == "real_time" or status == "track_progress":
# --- REAL_TIME/TRACK_PROGRESS: Track download real-time progress ---
elif status in ["real_time", "track_progress"]:
self._handle_real_time(task_id, progress_data)
elif status == "skipped":
# --- SKIPPED: Track was skipped ---
self._handle_skipped(task_id, progress_data, task_info)
elif status == "retrying":
# --- RETRYING: Download failed and being retried ---
self._handle_retrying(task_id, progress_data, task_info)
elif status == "error":
# --- ERROR: Error occurred during download ---
self._handle_error(task_id, progress_data, task_info)
elif status == "done":
# --- DONE: Download operation completed ---
self._handle_done(task_id, progress_data, task_info)
else:
# --- UNKNOWN: Unrecognized status ---
logger.info(
f"Task {task_id} {status}: {progress_data.get('message', 'No details')}"
)
# Embed the raw callback data into the status object before storing
progress_data["raw_callback"] = raw_callback_data
# Store the processed status update
store_task_status(task_id, progress_data)
def _handle_initializing(self, task_id, data, task_info):
"""Handle initializing status from deezspot"""
# Extract relevant fields
content_type = data.get("type", "").upper()
name = data.get("name", "")
album_name = data.get("album", "")
artist = data.get("artist", "")
total_tracks = data.get("total_tracks", 0)
# Use album name as name if name is empty
if not name and album_name:
data["name"] = album_name
# Log initialization with appropriate detail level
if album_name and artist:
logger.info(
f"Task {task_id} initializing: {content_type} '{album_name}' by {artist} with {total_tracks} tracks"
)
elif album_name:
logger.info(
f"Task {task_id} initializing: {content_type} '{album_name}' with {total_tracks} tracks"
)
elif name:
logger.info(
f"Task {task_id} initializing: {content_type} '{name}' with {total_tracks} tracks"
)
else:
logger.info(
f"Task {task_id} initializing: {content_type} with {total_tracks} tracks"
)
# Update task info with total tracks count
if total_tracks > 0:
task_info["total_tracks"] = total_tracks
task_info["completed_tracks"] = task_info.get("completed_tracks", 0)
task_info["skipped_tracks"] = task_info.get("skipped_tracks", 0)
store_task_info(task_id, task_info)
# Update status in data
# data["status"] = ProgressState.INITIALIZING
logger.info(f"Task {task_id} initializing...")
# Initializing object is now very basic, mainly for acknowledging the start.
# More detailed info comes with 'progress' or 'downloading' states.
data["status"] = ProgressState.INITIALIZING
def _handle_downloading(self, task_id, data, task_info):
"""Handle downloading status from deezspot"""
# Extract relevant fields
track_name = data.get("song", "Unknown")
artist = data.get("artist", "")
album = data.get("album", "")
download_type = data.get("type", "")
track_obj = data.get("track", {})
track_name = track_obj.get("title", "Unknown")
artists = track_obj.get("artists", [])
artist_name = artists[0].get("name", "") if artists else ""
album_obj = track_obj.get("album", {})
album_name = album_obj.get("title", "")
# Get parent task context
parent_type = task_info.get("type", "").lower()
logger.info(f"Task {task_id}: Starting download for track '{track_name}' by {artist_name}")
# If this is a track within an album/playlist, update progress
if parent_type in ["album", "playlist"] and download_type == "track":
total_tracks = task_info.get("total_tracks", 0)
current_track = task_info.get("current_track_num", 0) + 1
# Update task info
task_info["current_track_num"] = current_track
task_info["current_track"] = track_name
task_info["current_artist"] = artist
store_task_info(task_id, task_info)
# Only calculate progress if we have total tracks
if total_tracks > 0:
overall_progress = min(int((current_track / total_tracks) * 100), 100)
data["overall_progress"] = overall_progress
data["parsed_current_track"] = current_track
data["parsed_total_tracks"] = total_tracks
# Create a progress update for the album/playlist
progress_update = {
"status": ProgressState.DOWNLOADING,
"type": parent_type,
"track": track_name,
"current_track": f"{current_track}/{total_tracks}",
"album": album,
"artist": artist,
"timestamp": data["timestamp"],
"parent_task": True,
}
# Store separate progress update
store_task_status(task_id, progress_update)
# Log with appropriate detail level
if artist and album:
logger.info(
f"Task {task_id} downloading: '{track_name}' by {artist} from {album}"
)
elif artist:
logger.info(f"Task {task_id} downloading: '{track_name}' by {artist}")
else:
logger.info(f"Task {task_id} downloading: '{track_name}'")
# Update status
# data["status"] = ProgressState.DOWNLOADING
data["status"] = ProgressState.DOWNLOADING
data["song"] = track_name
data["artist"] = artist_name
data["album"] = album_name
def _handle_progress(self, task_id, data, task_info):
"""Handle progress status from deezspot"""
# Extract track info
track_name = data.get("track", data.get("song", "Unknown track"))
current_track_raw = data.get("current_track", "0")
album = data.get("album", "")
artist = data.get("artist", "")
"""Handle progress status for albums/playlists from deezspot"""
item = data.get("playlist") or data.get("album", {})
track = data.get("track", {})
item_name = item.get("title", "Unknown Item")
total_tracks = item.get("total_tracks", 0)
track_name = track.get("title", "Unknown Track")
artists = track.get("artists", [])
artist_name = artists[0].get("name", "") if artists else ""
# The 'progress' field in the callback is the track number being processed
current_track_num = data.get("progress", 0)
# Process artist if it's a list
if isinstance(artist, list) and len(artist) > 0:
data["artist_name"] = artist[0]
elif isinstance(artist, str):
data["artist_name"] = artist
if total_tracks > 0:
task_info["total_tracks"] = total_tracks
task_info["completed_tracks"] = current_track_num - 1
task_info["current_track_num"] = current_track_num
store_task_info(task_id, task_info)
overall_progress = min(int(((current_track_num -1) / total_tracks) * 100), 100)
data["overall_progress"] = overall_progress
data["parsed_current_track"] = current_track_num
data["parsed_total_tracks"] = total_tracks
# Parse track numbers from "current/total" format
if isinstance(current_track_raw, str) and "/" in current_track_raw:
try:
parts = current_track_raw.split("/")
current_track = int(parts[0])
total_tracks = int(parts[1])
logger.info(f"Task {task_id}: Progress on '{item_name}': Processing track {current_track_num}/{total_tracks} - '{track_name}'")
# Update with parsed values
data["parsed_current_track"] = current_track
data["parsed_total_tracks"] = total_tracks
# Calculate percentage
overall_progress = min(int((current_track / total_tracks) * 100), 100)
data["overall_progress"] = overall_progress
# Update task info
task_info["current_track_num"] = current_track
task_info["total_tracks"] = total_tracks
task_info["current_track"] = track_name
store_task_info(task_id, task_info)
# Log progress with appropriate detail
artist_name = data.get("artist_name", artist)
if album and artist_name:
logger.info(
f"Task {task_id} progress: [{current_track}/{total_tracks}] {overall_progress}% - {track_name} by {artist_name} from {album}"
)
elif album:
logger.info(
f"Task {task_id} progress: [{current_track}/{total_tracks}] {overall_progress}% - {track_name} from {album}"
)
else:
logger.info(
f"Task {task_id} progress: [{current_track}/{total_tracks}] {overall_progress}% - {track_name}"
)
except (ValueError, IndexError) as e:
logger.error(f"Error parsing track numbers '{current_track_raw}': {e}")
# Ensure correct status
# data["status"] = ProgressState.PROGRESS
data["status"] = ProgressState.PROGRESS
data["song"] = track_name
data["artist"] = artist_name
data["current_track"] = f"{current_track_num}/{total_tracks}"
def _handle_real_time(self, task_id, data):
"""Handle real-time progress status from deezspot"""
# Extract track info
title = data.get("title", data.get("song", "Unknown"))
track_obj = data.get("track", {})
track_name = track_obj.get("title", "Unknown Track")
percentage = data.get("percentage", 0)
logger.debug(f"Task {task_id}: Real-time progress for '{track_name}': {percentage}%")
data["status"] = ProgressState.TRACK_PROGRESS
data["song"] = track_name
artist = data.get("artist", "Unknown")
# Handle percent formatting