diff --git a/routes/prgs.py b/routes/prgs.py
index d6a2745..9ac9c1a 100755
--- a/routes/prgs.py
+++ b/routes/prgs.py
@@ -31,6 +31,15 @@ ACTIVE_TASK_STATES = {
"real-time", # "real-time" - real-time download progress (hyphenated version)
}
+# Define terminal task states that should be included when recently completed
+TERMINAL_TASK_STATES = {
+ ProgressState.COMPLETE, # "complete" - task completed successfully
+ ProgressState.DONE, # "done" - task finished processing
+ ProgressState.ERROR, # "error" - task failed
+ ProgressState.CANCELLED, # "cancelled" - task was cancelled
+ ProgressState.SKIPPED, # "skipped" - task was skipped
+}
+
def get_task_status_from_last_status(last_status):
"""
Extract the task status from last_status, checking both possible locations.
@@ -506,7 +515,9 @@ async def get_task_updates(request: Request):
task_counts["active"] += 1
# Always include active tasks in updates, apply filtering to others
- should_include = is_active_task or (task_timestamp > since_timestamp and not active_only)
+ # Also include recently completed/terminal tasks to ensure "done" status gets sent
+ is_recently_terminal = task_status in TERMINAL_TASK_STATES and task_timestamp > since_timestamp
+ should_include = is_active_task or (task_timestamp > since_timestamp and not active_only) or is_recently_terminal
if should_include:
# Construct the same detailed task object as in list_tasks()
@@ -661,21 +672,40 @@ async def stream_task_updates(request: Request):
active_only = request.query_params.get('active_only', '').lower() == 'true'
async def event_generator():
- # Track last update timestamp for this client connection
+ # Track last known state of each task to detect actual changes
+ last_task_states = {} # task_id -> {"status": str, "timestamp": float, "status_count": int}
last_update_timestamp = time.time()
+ last_heartbeat = time.time()
+ heartbeat_interval = 10.0 # Reduced from 30s to 10s for faster connection monitoring
+ burst_mode_until = 0 # Timestamp until which we stay in burst mode
try:
# Send initial data immediately upon connection
- yield await generate_task_update_event(last_update_timestamp, active_only, request)
+ initial_data = await generate_task_update_event(last_update_timestamp, active_only, request)
+ yield initial_data
+
+ # Initialize task states from initial data
+ try:
+ initial_json = json.loads(initial_data.removeprefix("data: ").strip())
+ for task in initial_json.get("tasks", []):
+ task_id = task.get("task_id")
+ if task_id:
+ last_task_states[task_id] = {
+ "status": get_task_status_from_last_status(task.get("last_line")),
+ "timestamp": task.get("timestamp", last_update_timestamp),
+ "status_count": task.get("status_count", 0)
+ }
+ except Exception:
+ pass # Continue if initial state parsing fails
+
last_update_timestamp = time.time()
- # Continuous monitoring loop
+ # Optimized monitoring loop - only send when changes detected
while True:
try:
- # Check for updates since last timestamp
current_time = time.time()
- # Get all tasks and check for updates
+ # Get all tasks and detect actual changes
all_tasks = get_all_tasks()
updated_tasks = []
active_tasks = []
@@ -691,27 +721,26 @@ async def stream_task_updates(request: Request):
"skipped": 0
}
- has_updates = False
+ has_actual_changes = False
+ current_task_ids = set()
for task_summary in all_tasks:
task_id = task_summary.get("task_id")
if not task_id:
continue
+ current_task_ids.add(task_id)
task_info = get_task_info(task_id)
if not task_info:
continue
last_status = get_last_task_status(task_id)
-
- # Check if task has been updated since the given timestamp
task_timestamp = last_status.get("timestamp") if last_status else task_info.get("created_at", 0)
-
- # Determine task status and categorize
task_status = get_task_status_from_last_status(last_status)
is_active_task = is_task_active(task_status)
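+ # Count of recorded status updates; an increase signals new progress to report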
+ status_count = len(get_task_status(task_id))
- # Categorize tasks by status using ProgressState constants
+ # Categorize tasks by status
if task_status == ProgressState.RETRYING:
task_counts["retrying"] += 1
elif task_status in {ProgressState.QUEUED, "pending"}:
@@ -727,22 +756,102 @@ async def stream_task_updates(request: Request):
elif is_active_task:
task_counts["active"] += 1
- # Always include active tasks in updates, apply filtering to others
- should_include = is_active_task or (task_timestamp > last_update_timestamp and not active_only)
+ # Check if this task has actually changed
+ previous_state = last_task_states.get(task_id)
+
+ # Determine if task has meaningful changes
+ task_changed = False
+ is_new_task = previous_state is None
+ just_became_terminal = False
+
+ if is_new_task:
+ # Include new non-terminal tasks immediately; for terminal tasks,
+ # only include recent ones (avoid replaying old completed/cancelled
+ # tasks to a freshly connected client)
+ if task_status not in TERMINAL_TASK_STATES:
+ task_changed = True
+ # Trigger burst mode for new active tasks to catch rapid completions
+ burst_mode_until = current_time + 10.0 # 10 seconds of frequent polling
+ logger.debug(f"SSE: New active task detected: {task_id} - entering burst mode")
+ else:
+ # Check if terminal task is recent (completed within last 30 seconds)
+ is_recently_terminal = (current_time - task_timestamp) <= 30.0
+ if is_recently_terminal:
+ task_changed = True
+ logger.info(f"SSE: New recently terminal task detected: {task_id} (status: {task_status}, age: {current_time - task_timestamp:.1f}s)")
+ else:
+ logger.debug(f"SSE: Skipping old terminal task: {task_id} (status: {task_status}, age: {current_time - task_timestamp:.1f}s)")
+ else:
+ # Check for status changes
+ status_changed = previous_state["status"] != task_status
+ # Check for new status updates (more detailed progress)
+ status_count_changed = previous_state["status_count"] != status_count
+ # Check for significant timestamp changes (new activity)
+ significant_timestamp_change = task_timestamp > previous_state["timestamp"]
+
+ if status_changed:
+ task_changed = True
+ # Check if this is a transition TO terminal state
+ was_terminal = previous_state["status"] in TERMINAL_TASK_STATES
+ is_now_terminal = task_status in TERMINAL_TASK_STATES
+ just_became_terminal = not was_terminal and is_now_terminal
+
+ # Extend burst mode on significant status changes
+ if not is_now_terminal:
+ burst_mode_until = max(burst_mode_until, current_time + 5.0) # 5 more seconds
+
+ logger.debug(f"SSE: Status changed for {task_id}: {previous_state['status']} -> {task_status}")
+ if just_became_terminal:
+ logger.debug(f"SSE: Task {task_id} just became terminal")
+ elif status_count_changed and significant_timestamp_change and task_status not in TERMINAL_TASK_STATES:
+ # Only track progress updates for non-terminal tasks
+ task_changed = True
+ logger.debug(f"SSE: Progress update for {task_id}: status_count {previous_state['status_count']} -> {status_count}")
+
+ # Include task if it changed and meets criteria
+ should_include = False
+ if task_changed:
+ # For terminal-state tasks, include transitions to terminal and new
+ # tasks that finished recently (task_changed is only set in those cases)
+ if task_status in TERMINAL_TASK_STATES:
+ if just_became_terminal or is_new_task:
+ should_include = True
+ has_actual_changes = True
+ logger.debug(f"SSE: Including terminal task {task_id} (just transitioned or recently finished)")
+ else:
+ # Non-terminal tasks are always included when they change
+ should_include = True
+ has_actual_changes = True
+ elif is_active_task and not active_only:
+ # Periodically re-send unchanged active tasks so the frontend can
+ # re-sync its state, at most once every 10 seconds
+ if current_time - last_update_timestamp > 10.0:
+ should_include = True
+ has_actual_changes = True
if should_include:
- has_updates = True
- # Construct the same detailed task object as in updates endpoint
+ # Update our tracked state
+ last_task_states[task_id] = {
+ "status": task_status,
+ "timestamp": task_timestamp,
+ "status_count": status_count
+ }
+
+ # Build response
task_response = _build_task_response(task_info, last_status, task_id, current_time, request)
if is_active_task:
active_tasks.append(task_response)
else:
updated_tasks.append(task_response)
+
+ # Clean up states for tasks that no longer exist
+ removed_tasks = set(last_task_states.keys()) - current_task_ids
+ for removed_task_id in removed_tasks:
+ del last_task_states[removed_task_id]
+ has_actual_changes = True
+ logger.debug(f"SSE: Task removed: {removed_task_id}")
- # Only send update if there are changes
- if has_updates:
- # Combine active tasks (always shown) with updated tasks
+ # Send update only if there are actual changes
+ if has_actual_changes:
all_returned_tasks = active_tasks + updated_tasks
# Sort by priority (active first, then by creation time)
@@ -760,24 +869,55 @@ async def stream_task_updates(request: Request):
"active_tasks": len(active_tasks),
"updated_count": len(updated_tasks),
"since_timestamp": last_update_timestamp,
+ "change_type": "update"
}
# Send SSE event with update data
event_data = json.dumps(update_data)
yield f"data: {event_data}\n\n"
- logger.debug(f"SSE: Sent {len(active_tasks)} active + {len(updated_tasks)} updated tasks")
+ # Log details about what was sent
+ task_statuses = [f"{task.get('task_id', 'unknown')}:{get_task_status_from_last_status(task.get('last_line'))}" for task in all_returned_tasks]
+ logger.info(f"SSE: Sent {len(active_tasks)} active + {len(updated_tasks)} updated tasks: {task_statuses}")
- # Update last timestamp
last_update_timestamp = current_time
+ last_heartbeat = current_time
+
+ # Send heartbeat if no updates for a while (keeps connection alive)
+ elif current_time - last_heartbeat > heartbeat_interval:
+ heartbeat_data = {
+ "current_timestamp": current_time,
+ "total_tasks": task_counts["active"] + task_counts["retrying"],
+ "task_counts": task_counts,
+ "change_type": "heartbeat"
+ }
+
+ event_data = json.dumps(heartbeat_data)
+ yield f"data: {event_data}\n\n"
+
+ last_heartbeat = current_time
+ logger.debug("SSE: Sent heartbeat")
- # Wait before next check (much shorter than polling interval)
- await asyncio.sleep(0.5) # Check every 500ms for real-time feel
+ # Responsive polling - much faster for real-time updates
+ active_task_count = task_counts["active"] + task_counts["retrying"]
+
+ if current_time < burst_mode_until:
+ # Burst mode: poll every 100ms to catch rapid task completions
+ await asyncio.sleep(0.1)
+ elif has_actual_changes or active_task_count > 0:
+ # When there are changes or active tasks, poll very frequently
+ await asyncio.sleep(0.2) # 200ms for immediate responsiveness
+ elif current_time - last_update_timestamp < 30.0:
+ # For 30 seconds after last update, poll more frequently to catch fast completions
+ await asyncio.sleep(0.5) # 500ms to catch fast transitions
+ else:
+ # Only when truly idle for >30s, use longer interval
+ await asyncio.sleep(2.0) # 2 seconds max when completely idle
except Exception as e:
logger.error(f"Error in SSE event generation: {e}", exc_info=True)
# Send error event and continue
- error_data = json.dumps({"error": "Internal server error", "timestamp": time.time()})
+ error_data = json.dumps({"error": "Internal server error", "timestamp": time.time(), "change_type": "error"})
yield f"data: {error_data}\n\n"
await asyncio.sleep(1) # Wait longer on error
@@ -859,7 +999,9 @@ async def generate_task_update_event(since_timestamp: float, active_only: bool,
task_counts["active"] += 1
# Always include active tasks in updates, apply filtering to others
- should_include = is_active_task or (task_timestamp > since_timestamp and not active_only)
+ # Also include recently completed/terminal tasks to ensure "done" status gets sent
+ is_recently_terminal = task_status in TERMINAL_TASK_STATES and task_timestamp > since_timestamp
+ should_include = is_active_task or (task_timestamp > since_timestamp and not active_only) or is_recently_terminal
if should_include:
# Construct the same detailed task object as in updates endpoint
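
For reference, a minimal sketch of how a client could consume this stream and branch on the new `change_type` field. The endpoint path, port, and the `requests`-based loop are assumptions; the `data: ` framing and the `"update"`/`"heartbeat"`/`"error"` values come from the hunks above.

```python
import json
import requests  # assumed HTTP client for this sketch

def follow_task_stream(base_url: str = "http://localhost:7171") -> None:
    # Hypothetical endpoint path; adjust to the actual route for stream_task_updates
    with requests.get(f"{base_url}/api/prgs/stream", stream=True, timeout=(5, None)) as resp:
        resp.raise_for_status()
        for raw in resp.iter_lines(decode_unicode=True):
            if not raw or not raw.startswith("data: "):
                continue  # skip SSE keep-alive blanks and non-data fields
            event = json.loads(raw[len("data: "):])
            kind = event.get("change_type", "update")
            if kind == "heartbeat":
                continue  # connection alive, nothing changed
            if kind == "error":
                print("stream error:", event.get("error"))
                continue
            for task in event.get("tasks", []):
                print(task.get("task_id"), task.get("last_line"))
```

Because the server now only emits events on actual changes (plus 10-second heartbeats), a client can treat every `"update"` event as meaningful instead of diffing payloads itself.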
diff --git a/routes/utils/celery_tasks.py b/routes/utils/celery_tasks.py
index c8811c7..a6268ce 100644
--- a/routes/utils/celery_tasks.py
+++ b/routes/utils/celery_tasks.py
@@ -235,12 +235,12 @@ def cancel_task(task_id):
# Try to revoke the Celery task if it hasn't started yet
celery_app.control.revoke(task_id, terminate=True, signal="SIGTERM")
- # Schedule deletion of task data after 30 seconds
+ # Schedule deletion of task data after 3 seconds
delayed_delete_task_data.apply_async(
- args=[task_id, "Task cancelled by user and auto-cleaned."], countdown=30
+ args=[task_id, "Task cancelled by user and auto-cleaned."], countdown=3
)
logger.info(
- f"Task {task_id} cancelled by user. Data scheduled for deletion in 30s."
+ f"Task {task_id} cancelled by user. Data scheduled for deletion in 3s."
)
return {"status": "cancelled", "task_id": task_id}
@@ -917,7 +917,7 @@ class ProgressTrackingTask(Task):
# Schedule deletion for completed multi-track downloads
delayed_delete_task_data.apply_async(
args=[task_id, "Task completed successfully and auto-cleaned."],
- countdown=30, # Delay in seconds
+ countdown=3, # Delay in seconds
)
# If from playlist_watch and successful, add track to DB
@@ -1055,7 +1055,7 @@ def task_postrun_handler(
): # Applies to single track downloads and tracks from playlists/albums
delayed_delete_task_data.apply_async(
args=[task_id, "Task completed successfully and auto-cleaned."],
- countdown=30,
+ countdown=3,
)
original_request = task_info.get("original_request", {})
@@ -1175,14 +1175,14 @@ def task_failure_handler(
else:
# If task cannot be retried, schedule its data for deletion
logger.info(
- f"Task {task_id} failed and cannot be retried. Data scheduled for deletion in 30s."
+ f"Task {task_id} failed and cannot be retried. Data scheduled for deletion in 3s."
)
delayed_delete_task_data.apply_async(
args=[
task_id,
f"Task failed ({str(exception)}) and max retries reached. Auto-cleaned.",
],
- countdown=30,
+ countdown=3,
)
except Exception as e:
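
The shorter countdown only works because `delayed_delete_task_data` runs as a scheduled Celery task. A minimal sketch of that pattern, with an assumed Redis key layout (the real implementation lives elsewhere in routes/utils/celery_tasks.py):

```python
import logging

import redis
from celery import shared_task

logger = logging.getLogger(__name__)
redis_client = redis.Redis()  # assumed backing store for task data

@shared_task
def delayed_delete_task_data(task_id: str, reason: str) -> None:
    # Hypothetical cleanup body; the key names are illustrative only
    logger.info(f"Deleting data for task {task_id}: {reason}")
    redis_client.delete(f"task:{task_id}", f"task:{task_id}:status")

# countdown defers execution on a worker, e.g.:
# delayed_delete_task_data.apply_async(args=[task_id, "auto-cleaned"], countdown=3)
```

Note the trade-off: with a 3-second window instead of 30, clients must observe the terminal status almost immediately, which is what the SSE burst-mode polling above is meant to guarantee.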
diff --git a/spotizerr-ui/src/components/Queue.tsx b/spotizerr-ui/src/components/Queue.tsx
index 94551d0..356e191 100644
--- a/spotizerr-ui/src/components/Queue.tsx
+++ b/spotizerr-ui/src/components/Queue.tsx
@@ -1,179 +1,26 @@
import { useContext, useState, useRef, useEffect } from "react";
-import {
- FaTimes,
- FaSync,
- FaCheckCircle,
- FaExclamationCircle,
- FaHourglassHalf,
- FaMusic,
- FaCompactDisc,
-} from "react-icons/fa";
-import { QueueContext, type QueueItem, type QueueStatus, isActiveTaskStatus } from "@/contexts/queue-context";
-
-const isTerminalStatus = (status: QueueStatus) =>
- ["completed", "error", "cancelled", "skipped", "done"].includes(status);
-
-const statusStyles: Record<
- QueueStatus,
- { icon: React.ReactNode; color: string; bgColor: string; borderColor: string; name: string }
-> = {
- queued: {
- icon: <FaHourglassHalf />,
- color: "text-content-muted dark:text-content-muted-dark",
- bgColor: "bg-gradient-to-r from-surface-muted to-surface-accent dark:from-surface-muted-dark dark:to-surface-accent-dark",
- borderColor: "border-border dark:border-border-dark",
- name: "Queued",
- },
- initializing: {
- icon: <FaSync />,
- color: "text-info",
- bgColor: "bg-gradient-to-r from-blue-50 to-blue-100 dark:from-blue-900/20 dark:to-blue-800/30",
- borderColor: "border-info/30 dark:border-info/40",
- name: "Initializing",
- },
- downloading: {
- icon: <FaMusic />,
- color: "text-info",
- bgColor: "bg-gradient-to-r from-blue-50 to-blue-100 dark:from-blue-900/20 dark:to-blue-800/30",
- borderColor: "border-info/30 dark:border-info/40",
- name: "Downloading",
- },
- processing: {
- icon: <FaCompactDisc />,
- color: "text-processing",
- bgColor: "bg-gradient-to-r from-purple-50 to-purple-100 dark:from-purple-900/20 dark:to-purple-800/30",
- borderColor: "border-processing/30 dark:border-processing/40",
- name: "Processing",
- },
- retrying: {
- icon: <FaSync />,
- color: "text-warning",
- bgColor: "bg-gradient-to-r from-orange-50 to-orange-100 dark:from-orange-900/20 dark:to-orange-800/30",
- borderColor: "border-warning/30 dark:border-warning/40",
- name: "Retrying",
- },
- completed: {
- icon: <FaCheckCircle />,
- color: "text-success",
- bgColor: "bg-gradient-to-r from-green-50 to-green-100 dark:from-green-900/20 dark:to-green-800/30",
- borderColor: "border-success/30 dark:border-success/40",
- name: "Completed",
- },
- done: {
- icon: <FaCheckCircle />,
- color: "text-success",
- bgColor: "bg-gradient-to-r from-green-50 to-green-100 dark:from-green-900/20 dark:to-green-800/30",
- borderColor: "border-success/30 dark:border-success/40",
- name: "Done",
- },
- error: {
- icon: <FaExclamationCircle />,
- color: "text-error",
- bgColor: "bg-gradient-to-r from-red-50 to-red-100 dark:from-red-900/20 dark:to-red-800/30",
- borderColor: "border-error/30 dark:border-error/40",
- name: "Error",
- },
- cancelled: {
- icon: <FaTimes />,
- color: "text-warning",
- bgColor: "bg-gradient-to-r from-orange-50 to-orange-100 dark:from-orange-900/20 dark:to-orange-800/30",
- borderColor: "border-warning/30 dark:border-warning/40",
- name: "Cancelled",
- },
- skipped: {
- icon: <FaTimes />,
- color: "text-content-muted dark:text-content-muted-dark",
- bgColor: "bg-gradient-to-r from-surface-muted to-surface-accent dark:from-surface-muted-dark dark:to-surface-accent-dark",
- borderColor: "border-border dark:border-border-dark",
- name: "Skipped",
- },
- pending: {
- icon: <FaHourglassHalf />,
- color: "text-content-muted dark:text-content-muted-dark",
- bgColor: "bg-gradient-to-r from-surface-muted to-surface-accent dark:from-surface-muted-dark dark:to-surface-accent-dark",
- borderColor: "border-border dark:border-border-dark",
- name: "Pending",
- },
- "real-time": {
- icon: <FaMusic />,
- color: "text-info",
- bgColor: "bg-gradient-to-r from-blue-50 to-blue-100 dark:from-blue-900/20 dark:to-blue-800/30",
- borderColor: "border-info/30 dark:border-info/40",
- name: "Real-time Download",
- },
- progress: {
- icon: <FaSync />,
- color: "text-info",
- bgColor: "bg-gradient-to-r from-blue-50 to-blue-100 dark:from-blue-900/20 dark:to-blue-800/30",
- borderColor: "border-info/30 dark:border-info/40",
- name: "Progress",
- },
- track_progress: {
- icon: <FaMusic />,
- color: "text-info",
- bgColor: "bg-gradient-to-r from-blue-50 to-blue-100 dark:from-blue-900/20 dark:to-blue-800/30",
- borderColor: "border-info/30 dark:border-info/40",
- name: "Track Progress",
- },
-};
+import { FaTimes, FaSync, FaCheckCircle, FaExclamationCircle, FaHourglassHalf, FaMusic, FaCompactDisc } from "react-icons/fa";
+import { QueueContext, type QueueItem, getStatus, getProgress, getCurrentTrackInfo, isActiveStatus, isTerminalStatus } from "@/contexts/queue-context";
// Circular Progress Component
const CircularProgress = ({
progress,
isCompleted = false,
- isRealProgress = false,
size = 60,
- strokeWidth = 6,
- className = ""
+ strokeWidth = 6
}: {
progress: number;
isCompleted?: boolean;
- isRealProgress?: boolean;
size?: number;
strokeWidth?: number;
- className?: string;
}) => {
- // Apply a logarithmic curve to make progress slower near the end - ONLY for fake progress
- const getAdjustedProgress = (rawProgress: number) => {
- if (isCompleted) return 100;
- if (rawProgress <= 0) return 0;
-
- // If this is real progress data, show it as-is without any artificial manipulation
- if (isRealProgress) {
- return Math.min(Math.max(rawProgress, 0), 100);
- }
-
- // Only apply logarithmic curve for fake/simulated progress
- // Use a logarithmic curve that slows down significantly near 100%
- // This creates the effect of filling more slowly as it approaches completion
- const normalized = Math.min(Math.max(rawProgress, 0), 100) / 100;
-
- // Apply easing function that slows down dramatically near the end
- const eased = 1 - Math.pow(1 - normalized, 3); // Cubic ease-out
- const logarithmic = Math.log(normalized * 9 + 1) / Math.log(10); // Logarithmic scaling
-
- // Combine both for a very slow approach to 100%
- const combined = (eased * 0.7 + logarithmic * 0.3) * 95; // Cap at 95% during download
-
- // Ensure minimum visibility for any progress > 0
- const minVisible = rawProgress > 0 ? Math.max(combined, 8) : 0;
-
- return Math.min(minVisible, 95); // Never quite reach 100% during download
- };
-
- const adjustedProgress = getAdjustedProgress(progress);
const radius = (size - strokeWidth) / 2;
const circumference = radius * 2 * Math.PI;
- const strokeDasharray = circumference;
- const strokeDashoffset = circumference - (adjustedProgress / 100) * circumference;
+ const strokeDashoffset = circumference - (progress / 100) * circumference;
return (
-
-