changed prgs reporting logic to be SSE

This commit is contained in:
architect.in.git
2025-03-23 16:39:41 -06:00
parent 39916173a9
commit ee815b745a
5 changed files with 1169 additions and 168 deletions

View File

@@ -33,7 +33,17 @@ class ProgressState:
COMPLETE = "complete"
ERROR = "error"
RETRYING = "retrying"
CANCELLED = "cancel"
CANCELLED = "cancelled"
PROGRESS = "progress"
# Additional states from deezspot library
INITIALIZING = "initializing"
DOWNLOADING = "downloading"
TRACK_PROGRESS = "track_progress"
TRACK_COMPLETE = "track_complete"
REAL_TIME = "real_time"
SKIPPED = "skipped"
DONE = "done"
# Reuse the application's logging configuration for Celery workers
@setup_logging.connect
@@ -55,16 +65,37 @@ def worker_init_handler(**kwargs):
logger.debug("Worker Redis connection: " + REDIS_URL)
def store_task_status(task_id, status_data):
"""Store task status information in Redis"""
"""
Store task status information in Redis with a sequential ID
Args:
task_id: The task ID
status_data: Dictionary containing status information
"""
# Add timestamp if not present
if 'timestamp' not in status_data:
status_data['timestamp'] = time.time()
# Convert to JSON and store in Redis
# Generate sequential ID for this status update (atomic operation)
try:
# Get next ID for this task's status updates
status_id = redis_client.incr(f"task:{task_id}:status:next_id")
status_data['id'] = status_id
# Convert to JSON and store in Redis
redis_client.rpush(f"task:{task_id}:status", json.dumps(status_data))
# Set expiry for the list to avoid filling up Redis with old data
redis_client.expire(f"task:{task_id}:status", 60 * 60 * 24 * 7) # 7 days
redis_client.expire(f"task:{task_id}:status:next_id", 60 * 60 * 24 * 7) # 7 days
# Publish an update event to a Redis channel for subscribers
# This will be used by the SSE endpoint to push updates in real-time
update_channel = f"task_updates:{task_id}"
redis_client.publish(update_channel, json.dumps({
"task_id": task_id,
"status_id": status_id
}))
except Exception as e:
logger.error(f"Error storing task status: {e}")
traceback.print_exc()
@@ -79,14 +110,91 @@ def get_task_status(task_id):
return []
def get_last_task_status(task_id):
"""Get the most recent task status update from Redis"""
"""
Get the most recent task status update from Redis
If the task is an album or playlist, prioritize progress updates
showing current track information over generic processing status.
"""
try:
last_status = redis_client.lindex(f"task:{task_id}:status", -1)
if last_status:
return json.loads(last_status.decode('utf-8'))
return None
# Get all status updates for this task
all_statuses = redis_client.lrange(f"task:{task_id}:status", 0, -1)
if not all_statuses:
logger.debug(f"Task {task_id}: No status updates found")
return None
logger.debug(f"Task {task_id}: Found {len(all_statuses)} status updates")
# First decode and analyze all status updates
decoded_statuses = []
has_progress_updates = False
for status_json in all_statuses:
try:
status = json.loads(status_json.decode('utf-8'))
decoded_statuses.append(status)
# Check if we have any progress updates with track information
if status.get("status") == "progress" and status.get("track"):
has_progress_updates = True
logger.debug(f"Task {task_id}: Found progress update with track: {status.get('track')}")
except Exception as e:
logger.error(f"Error decoding status update: {e}")
if not has_progress_updates:
logger.debug(f"Task {task_id}: No progress updates with track info found")
# Find the latest terminal status (complete, error, cancelled)
latest_status = decoded_statuses[-1] if decoded_statuses else None
if latest_status and latest_status.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED]:
logger.debug(f"Task {task_id}: Returning terminal status: {latest_status.get('status')}")
return latest_status
# Find the most recent progress update with track information
# Start from the most recent and go backward
latest_progress = None
for status in reversed(decoded_statuses):
status_type = status.get("status")
# For album/playlist downloads, find progress updates with track information
if status_type == "progress" and status.get("track"):
latest_progress = status
logger.debug(f"Task {task_id}: Selected progress update for track: {status.get('track')}")
break
# If we found a progress update, make sure it has all the necessary fields
if latest_progress:
# Parse current_track from "X/Y" format if needed
current_track_raw = latest_progress.get("current_track", "0")
# Always reprocess the values to ensure consistency
if isinstance(current_track_raw, str) and "/" in current_track_raw:
try:
parts = current_track_raw.split("/")
current_track = int(parts[0])
total_tracks = int(parts[1])
# Calculate and update progress information
overall_progress = min(int((current_track / total_tracks) * 100), 100)
latest_progress["parsed_current_track"] = current_track
latest_progress["parsed_total_tracks"] = total_tracks
latest_progress["overall_progress"] = overall_progress
logger.debug(f"Task {task_id}: Parsed track progress {current_track}/{total_tracks} ({overall_progress}%)")
except (ValueError, IndexError) as e:
logger.error(f"Error parsing track numbers: {e}")
# Return the enhanced progress update
return latest_progress
# If no suitable progress updates found, return the most recent status
logger.debug(f"Task {task_id}: No suitable progress updates found, returning latest status")
return latest_status
except Exception as e:
logger.error(f"Error getting last task status: {e}")
logger.error(f"Error getting last task status for {task_id}: {e}")
traceback.print_exc()
return None
def store_task_info(task_id, task_info):
@@ -316,6 +424,9 @@ class ProgressTrackingTask(Task):
"""
task_id = self.request.id
# Debug log incoming progress data
logger.debug(f"Task {task_id}: Got progress data: {json.dumps(progress_data)}")
# Add timestamp if not present
if 'timestamp' not in progress_data:
progress_data['timestamp'] = time.time()
@@ -323,25 +434,269 @@ class ProgressTrackingTask(Task):
# Map deezspot status to our progress state
status = progress_data.get("status", "unknown")
# Store the progress update in Redis
store_task_status(task_id, progress_data)
# First, make a copy of the data to avoid modifying the original
stored_data = progress_data.copy()
# Log the progress update with appropriate level
message = progress_data.get("message", "Progress update")
# Process the data based on status type
if status == "initializing":
# Get content information when initializing
content_type = stored_data.get('type', '').upper()
album_name = stored_data.get('album', '')
name = stored_data.get('name', '')
artist = stored_data.get('artist', '')
total_tracks = stored_data.get('total_tracks', 0)
# Store initialization details
if not name and album_name:
stored_data['name'] = album_name
# Log initialization
if album_name:
logger.info(f"Task {task_id} initializing: {content_type} '{album_name}' with {total_tracks} tracks")
elif name:
logger.info(f"Task {task_id} initializing: {content_type} '{name}' with {total_tracks} tracks")
else:
logger.info(f"Task {task_id} initializing: {content_type} with {total_tracks} tracks")
elif status == "downloading":
# Track starting to download
track_name = stored_data.get('song', 'Unknown')
artist = stored_data.get('artist', '')
album = stored_data.get('album', '')
if artist and album:
logger.info(f"Task {task_id} downloading: '{track_name}' by {artist} from {album}")
elif artist:
logger.info(f"Task {task_id} downloading: '{track_name}' by {artist}")
else:
logger.info(f"Task {task_id} downloading: '{track_name}'")
elif status == "progress":
# For album/playlist downloads, process track progress
track_name = stored_data.get("track", stored_data.get("song", "Unknown track"))
current_track_raw = stored_data.get("current_track", "0")
album = stored_data.get("album", "")
artist = stored_data.get("artist", "")
# Process and store artist correctly
if isinstance(artist, list) and len(artist) > 0:
# Take the first artist if it's a list
artist_name = artist[0]
# Store the processed artist back in the data
stored_data["artist_name"] = artist_name
elif isinstance(artist, str):
stored_data["artist_name"] = artist
# Parse current_track and total_tracks from the format "current/total"
if isinstance(current_track_raw, str) and "/" in current_track_raw:
try:
parts = current_track_raw.split("/")
current_track = int(parts[0])
total_tracks = int(parts[1])
# Store the parsed values
stored_data["parsed_current_track"] = current_track
stored_data["parsed_total_tracks"] = total_tracks
# Calculate overall percentage
overall_progress = min(int((current_track / total_tracks) * 100), 100)
stored_data["overall_progress"] = overall_progress
logger.debug(f"Task {task_id}: Processed track progress {current_track}/{total_tracks} ({overall_progress}%) for '{track_name}'")
except (ValueError, IndexError) as e:
logger.error(f"Error parsing track numbers '{current_track_raw}': {e}")
elif isinstance(current_track_raw, int):
# Handle the case where it's already an integer
current_track = current_track_raw
total_tracks = stored_data.get("total_tracks", 0)
if total_tracks > 0:
# Calculate overall percentage
overall_progress = min(int((current_track / total_tracks) * 100), 100)
stored_data["parsed_current_track"] = current_track
stored_data["parsed_total_tracks"] = total_tracks
stored_data["overall_progress"] = overall_progress
# Log appropriate message based on available information
artist_name = stored_data.get("artist_name", artist)
if album and artist_name:
logger.info(f"Task {task_id} progress: [{stored_data.get('parsed_current_track', 0)}/{stored_data.get('parsed_total_tracks', 0)}] {stored_data.get('overall_progress', 0)}% - {track_name} by {artist_name} from {album}")
elif album:
logger.info(f"Task {task_id} progress: [{stored_data.get('parsed_current_track', 0)}/{stored_data.get('parsed_total_tracks', 0)}] {stored_data.get('overall_progress', 0)}% - {track_name} from {album}")
else:
logger.info(f"Task {task_id} progress: [{stored_data.get('parsed_current_track', 0)}/{stored_data.get('parsed_total_tracks', 0)}] {stored_data.get('overall_progress', 0)}% - {track_name}")
elif status == "track_progress" or status == "real_time":
# Track real-time progress of a file download
title = stored_data.get('title', stored_data.get('song', 'Unknown'))
artist = stored_data.get('artist', 'Unknown')
# Handle different percent formats
percent = stored_data.get('percent', stored_data.get('percentage', 0))
if isinstance(percent, float) and percent <= 1.0:
percent = int(percent * 100)
# Update bytes received information if available
if 'bytes_received' in stored_data:
# Calculate and store download rate
last_update = stored_data.get('last_update_time', stored_data['timestamp'])
bytes_received = stored_data['bytes_received']
last_bytes = stored_data.get('last_bytes_received', 0)
time_diff = stored_data['timestamp'] - last_update
if time_diff > 0 and bytes_received > last_bytes:
bytes_diff = bytes_received - last_bytes
download_rate = bytes_diff / time_diff
stored_data['download_rate'] = download_rate
stored_data['last_update_time'] = stored_data['timestamp']
stored_data['last_bytes_received'] = bytes_received
# Format download rate for display
if 'download_rate' in stored_data:
rate = stored_data['download_rate']
if rate < 1024:
stored_data['download_rate_formatted'] = f"{rate:.2f} B/s"
elif rate < 1024 * 1024:
stored_data['download_rate_formatted'] = f"{rate/1024:.2f} KB/s"
else:
stored_data['download_rate_formatted'] = f"{rate/(1024*1024):.2f} MB/s"
# Log real-time progress
logger.debug(f"Task {task_id} track progress: {title} by {artist}: {percent}%")
if status == "processing":
progress = progress_data.get("progress", 0)
elif status == "track_complete" or (status == "done" and stored_data.get('type') == 'track'):
# Track completed successfully
title = stored_data.get('title', stored_data.get('song', 'Unknown'))
artist = stored_data.get('artist', 'Unknown')
album = stored_data.get('album', 'Unknown')
quality = stored_data.get('quality', 'Unknown')
path = stored_data.get('path', '')
# Log completion with file size if available
if 'file_size' in stored_data:
size = stored_data['file_size']
if size < 1024:
stored_data['file_size_formatted'] = f"{size} B"
elif size < 1024 * 1024:
stored_data['file_size_formatted'] = f"{size/1024:.2f} KB"
elif size < 1024 * 1024 * 1024:
stored_data['file_size_formatted'] = f"{size/(1024*1024):.2f} MB"
else:
stored_data['file_size_formatted'] = f"{size/(1024*1024*1024):.2f} GB"
logger.info(f"Task {task_id} track complete: {artist} - {title} ({quality}) {stored_data.get('file_size_formatted', '')}")
else:
logger.info(f"Task {task_id} track complete: {artist} - {title} ({quality})")
if path:
logger.debug(f"Task {task_id} saved to: {path}")
# Update completion progress
task_info = get_task_info(task_id)
total_tracks = task_info.get('total_tracks', stored_data.get('total_tracks', 0))
completed_tracks = task_info.get('completed_tracks', 0) + 1
# Update task info with new completed track count
task_info['completed_tracks'] = completed_tracks
store_task_info(task_id, task_info)
# Calculate completion percentage
if total_tracks > 0:
completion_percent = int((completed_tracks / total_tracks) * 100)
stored_data['completion_percent'] = completion_percent
logger.info(f"Task {task_id} progress: {completed_tracks}/{total_tracks} tracks ({completion_percent}%)")
elif status == "skipped":
# Track was skipped (usually because it already exists)
title = stored_data.get('song', 'Unknown')
artist = stored_data.get('artist', 'Unknown')
reason = stored_data.get('reason', 'Unknown reason')
logger.info(f"Task {task_id} skipped: {artist} - {title}")
logger.debug(f"Task {task_id} skip reason: {reason}")
# Update task info with skipped track
task_info = get_task_info(task_id)
skipped_tracks = task_info.get('skipped_tracks', 0) + 1
task_info['skipped_tracks'] = skipped_tracks
store_task_info(task_id, task_info)
elif status == "retrying":
# Download failed and is being retried
song = stored_data.get('song', 'Unknown')
artist = stored_data.get('artist', 'Unknown')
retry_count = stored_data.get('retry_count', 0)
seconds_left = stored_data.get('seconds_left', 0)
error = stored_data.get('error', 'Unknown error')
logger.warning(f"Task {task_id} retrying: {artist} - {song} (Attempt {retry_count}, waiting {seconds_left}s)")
logger.debug(f"Task {task_id} retry reason: {error}")
# Update task info with retry count
task_info = get_task_info(task_id)
retry_count_total = task_info.get('retry_count', 0) + 1
task_info['retry_count'] = retry_count_total
store_task_info(task_id, task_info)
elif status == "error":
# Error occurred during download
message = stored_data.get('message', 'Unknown error')
logger.error(f"Task {task_id} error: {message}")
# Update task info with error count
task_info = get_task_info(task_id)
error_count = task_info.get('error_count', 0) + 1
task_info['error_count'] = error_count
store_task_info(task_id, task_info)
elif status == "done":
# Overall download operation completed
content_type = stored_data.get('type', '').upper()
name = stored_data.get('name', '')
album = stored_data.get('album', '')
artist = stored_data.get('artist', '')
total_tracks = stored_data.get('total_tracks', 0)
# Get task info for summary
task_info = get_task_info(task_id)
completed_tracks = task_info.get('completed_tracks', 0)
skipped_tracks = task_info.get('skipped_tracks', 0)
error_count = task_info.get('error_count', 0)
# Create completion message
if album and artist:
logger.info(f"Task {task_id} completed: {content_type} '{album}' by {artist}")
elif album:
logger.info(f"Task {task_id} completed: {content_type} '{album}'")
elif name:
logger.info(f"Task {task_id} completed: {content_type} '{name}'")
else:
logger.info(f"Task {task_id} completed")
# Log summary
logger.info(f"Task {task_id} summary: {completed_tracks} completed, {skipped_tracks} skipped, {error_count} errors")
# Update task status to complete
stored_data["status"] = ProgressState.COMPLETE
stored_data["message"] = f"Download complete: {completed_tracks} tracks downloaded"
elif status == "processing":
# Processing status - log message with progress if available
progress = stored_data.get("progress", 0)
message = stored_data.get("message", "Processing")
if progress > 0:
logger.debug(f"Task {task_id} progress: {progress}% - {message}")
logger.debug(f"Task {task_id} processing: {progress}% - {message}")
else:
logger.info(f"Task {task_id} processing: {message}")
elif status == "error":
error_message = progress_data.get("error", message)
logger.error(f"Task {task_id} error: {error_message}")
elif status == "complete":
logger.info(f"Task {task_id} completed: {message}")
else:
logger.info(f"Task {task_id} {status}: {message}")
# Unknown status - just log it
logger.info(f"Task {task_id} {status}: {stored_data.get('message', 'No details')}")
# Store the enhanced progress update in Redis
store_task_status(task_id, stored_data)
# Celery signal handlers
@task_prerun.connect