Improved SSE and queue handling. Updated to use callbacks.ts

This commit is contained in:
Xoconoch
2025-08-02 17:12:44 -06:00
parent 9fdc0bde42
commit 5abc62d8be
5 changed files with 1078 additions and 1179 deletions
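For context on how a consumer keys off these events: every payload emitted by the stream endpoint below carries a change_type field ("update", "heartbeat", or "error"), which is presumably what the new callbacks.ts dispatches on. The following is a minimal illustrative sketch only (written in Python to match the server code; the endpoint URL and the "tasks" field name for update events are assumptions, not confirmed by this diff):

import json

import httpx

STREAM_URL = "http://localhost:8000/tasks/stream"  # placeholder; point at the real stream_task_updates route


def consume_task_stream(url: str = STREAM_URL) -> None:
    # Read the SSE stream line by line and react to each change_type the server emits.
    with httpx.stream("GET", url, timeout=None) as response:
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue  # skip the blank separator lines between events
            event = json.loads(line[len("data: "):])
            change_type = event.get("change_type")
            if change_type == "update":
                for task in event.get("tasks", []):  # "tasks" key assumed from the initial payload below
                    print(task.get("task_id"), task.get("last_line"))
            elif change_type == "heartbeat":
                pass  # connection is alive; nothing to render
            elif change_type == "error":
                print("stream error:", event.get("error"))


if __name__ == "__main__":
    consume_task_stream()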

View File

@@ -31,6 +31,15 @@ ACTIVE_TASK_STATES = {
"real-time", # "real-time" - real-time download progress (hyphenated version)
}
# Define terminal task states that should be included when recently completed
TERMINAL_TASK_STATES = {
ProgressState.COMPLETE, # "complete" - task completed successfully
ProgressState.DONE, # "done" - task finished processing
ProgressState.ERROR, # "error" - task failed
ProgressState.CANCELLED, # "cancelled" - task was cancelled
ProgressState.SKIPPED, # "skipped" - task was skipped
}
def get_task_status_from_last_status(last_status):
"""
Extract the task status from last_status, checking both possible locations.
@@ -506,7 +515,9 @@ async def get_task_updates(request: Request):
task_counts["active"] += 1
# Always include active tasks in updates, apply filtering to others
should_include = is_active_task or (task_timestamp > since_timestamp and not active_only)
# Also include recently completed/terminal tasks to ensure "done" status gets sent
is_recently_terminal = task_status in TERMINAL_TASK_STATES and task_timestamp > since_timestamp
should_include = is_active_task or (task_timestamp > since_timestamp and not active_only) or is_recently_terminal
if should_include:
# Construct the same detailed task object as in list_tasks()
@@ -661,21 +672,40 @@ async def stream_task_updates(request: Request):
active_only = request.query_params.get('active_only', '').lower() == 'true'
async def event_generator():
# Track last update timestamp for this client connection
# Track last known state of each task to detect actual changes
last_task_states = {} # task_id -> {"status": str, "timestamp": float, "status_count": int}
last_update_timestamp = time.time()
last_heartbeat = time.time()
heartbeat_interval = 10.0 # Reduced from 30s to 10s for faster connection monitoring
burst_mode_until = 0 # Timestamp until which we stay in burst mode
try:
# Send initial data immediately upon connection
yield await generate_task_update_event(last_update_timestamp, active_only, request)
initial_data = await generate_task_update_event(last_update_timestamp, active_only, request)
yield initial_data
# Initialize task states from initial data
try:
initial_json = json.loads(initial_data.replace("data: ", "").strip())
for task in initial_json.get("tasks", []):
task_id = task.get("task_id")
if task_id:
last_task_states[task_id] = {
"status": get_task_status_from_last_status(task.get("last_line")),
"timestamp": task.get("timestamp", last_update_timestamp),
"status_count": task.get("status_count", 0)
}
except (json.JSONDecodeError, AttributeError, TypeError):
pass  # Continue if initial state parsing fails
last_update_timestamp = time.time()
# Continuous monitoring loop
# Optimized monitoring loop - only send when changes detected
while True:
try:
# Check for updates since last timestamp
current_time = time.time()
# Get all tasks and check for updates
# Get all tasks and detect actual changes
all_tasks = get_all_tasks()
updated_tasks = []
active_tasks = []
@@ -691,27 +721,26 @@ async def stream_task_updates(request: Request):
"skipped": 0
}
has_updates = False
has_actual_changes = False
current_task_ids = set()
for task_summary in all_tasks:
task_id = task_summary.get("task_id")
if not task_id:
continue
current_task_ids.add(task_id)
task_info = get_task_info(task_id)
if not task_info:
continue
last_status = get_last_task_status(task_id)
# Check if task has been updated since the given timestamp
task_timestamp = last_status.get("timestamp") if last_status else task_info.get("created_at", 0)
# Determine task status and categorize
task_status = get_task_status_from_last_status(last_status)
is_active_task = is_task_active(task_status)
status_count = len(get_task_status(task_id))
# Categorize tasks by status using ProgressState constants
# Categorize tasks by status
if task_status == ProgressState.RETRYING:
task_counts["retrying"] += 1
elif task_status in {ProgressState.QUEUED, "pending"}:
@@ -727,22 +756,102 @@ async def stream_task_updates(request: Request):
elif is_active_task:
task_counts["active"] += 1
# Always include active tasks in updates, apply filtering to others
should_include = is_active_task or (task_timestamp > last_update_timestamp and not active_only)
# Check if this task has actually changed
previous_state = last_task_states.get(task_id)
# Determine if task has meaningful changes
task_changed = False
is_new_task = previous_state is None
just_became_terminal = False
if is_new_task:
# Include new tasks if they're active OR if they're recently terminal
# (avoid sending old completed/cancelled tasks on connection)
if task_status not in TERMINAL_TASK_STATES:
task_changed = True
# Trigger burst mode for new active tasks to catch rapid completions
burst_mode_until = current_time + 10.0 # 10 seconds of frequent polling
logger.debug(f"SSE: New active task detected: {task_id} - entering burst mode")
else:
# Check if terminal task is recent (completed within last 30 seconds)
is_recently_terminal = (current_time - task_timestamp) <= 30.0
if is_recently_terminal:
task_changed = True
logger.info(f"SSE: New recently terminal task detected: {task_id} (status: {task_status}, age: {current_time - task_timestamp:.1f}s)")
else:
logger.debug(f"SSE: Skipping old terminal task: {task_id} (status: {task_status}, age: {current_time - task_timestamp:.1f}s)")
else:
# Check for status changes
status_changed = previous_state["status"] != task_status
# Check for new status updates (more detailed progress)
status_count_changed = previous_state["status_count"] != status_count
# Check for significant timestamp changes (new activity)
significant_timestamp_change = task_timestamp > previous_state["timestamp"]
if status_changed:
task_changed = True
# Check if this is a transition TO terminal state
was_terminal = previous_state["status"] in TERMINAL_TASK_STATES
is_now_terminal = task_status in TERMINAL_TASK_STATES
just_became_terminal = not was_terminal and is_now_terminal
# Extend burst mode on significant status changes
if not is_now_terminal:
burst_mode_until = max(burst_mode_until, current_time + 5.0) # 5 more seconds
logger.debug(f"SSE: Status changed for {task_id}: {previous_state['status']} -> {task_status}")
if just_became_terminal:
logger.debug(f"SSE: Task {task_id} just became terminal")
elif status_count_changed and significant_timestamp_change and task_status not in TERMINAL_TASK_STATES:
# Only track progress updates for non-terminal tasks
task_changed = True
logger.debug(f"SSE: Progress update for {task_id}: status_count {previous_state['status_count']} -> {status_count}")
# Include task if it changed and meets criteria
should_include = False
if task_changed:
# For terminal state tasks, only include if they just became terminal
if task_status in TERMINAL_TASK_STATES:
if just_became_terminal:
should_include = True
has_actual_changes = True
logger.debug(f"SSE: Including terminal task {task_id} (just transitioned)")
# Note: we don't include new terminal tasks (handled above)
else:
# Non-terminal tasks are always included when they change
should_include = True
has_actual_changes = True
elif is_active_task and not active_only:
# For non-active_only streams, include active tasks periodically for frontend state sync
# But only if significant time has passed since last update
if current_time - last_update_timestamp > 10.0: # Every 10 seconds max
should_include = True
if should_include:
has_updates = True
# Construct the same detailed task object as in updates endpoint
# Update our tracked state
last_task_states[task_id] = {
"status": task_status,
"timestamp": task_timestamp,
"status_count": status_count
}
# Build response
task_response = _build_task_response(task_info, last_status, task_id, current_time, request)
if is_active_task:
active_tasks.append(task_response)
else:
updated_tasks.append(task_response)
# Clean up states for tasks that no longer exist
removed_tasks = set(last_task_states.keys()) - current_task_ids
for removed_task_id in removed_tasks:
del last_task_states[removed_task_id]
has_actual_changes = True
logger.debug(f"SSE: Task removed: {removed_task_id}")
# Only send update if there are changes
if has_updates:
# Combine active tasks (always shown) with updated tasks
# Send update only if there are actual changes
if has_actual_changes:
all_returned_tasks = active_tasks + updated_tasks
# Sort by priority (active first, then by creation time)
@@ -760,24 +869,55 @@ async def stream_task_updates(request: Request):
"active_tasks": len(active_tasks),
"updated_count": len(updated_tasks),
"since_timestamp": last_update_timestamp,
"change_type": "update"
}
# Send SSE event with update data
event_data = json.dumps(update_data)
yield f"data: {event_data}\n\n"
logger.debug(f"SSE: Sent {len(active_tasks)} active + {len(updated_tasks)} updated tasks")
# Log details about what was sent
task_statuses = [f"{task.get('task_id', 'unknown')}:{get_task_status_from_last_status(task.get('last_line'))}" for task in all_returned_tasks]
logger.info(f"SSE: Sent {len(active_tasks)} active + {len(updated_tasks)} updated tasks: {task_statuses}")
# Update last timestamp
last_update_timestamp = current_time
last_heartbeat = current_time
# Send heartbeat if no updates for a while (keeps connection alive)
elif current_time - last_heartbeat > heartbeat_interval:
heartbeat_data = {
"current_timestamp": current_time,
"total_tasks": task_counts["active"] + task_counts["retrying"],
"task_counts": task_counts,
"change_type": "heartbeat"
}
event_data = json.dumps(heartbeat_data)
yield f"data: {event_data}\n\n"
last_heartbeat = current_time
logger.debug("SSE: Sent heartbeat")
# Wait before next check (much shorter than polling interval)
await asyncio.sleep(0.5) # Check every 500ms for real-time feel
# Responsive polling - much faster for real-time updates
active_task_count = task_counts["active"] + task_counts["retrying"]
if current_time < burst_mode_until:
# Burst mode: poll every 100ms to catch rapid task completions
await asyncio.sleep(0.1)
elif has_actual_changes or active_task_count > 0:
# When there are changes or active tasks, poll very frequently
await asyncio.sleep(0.2) # 200ms for immediate responsiveness
elif current_time - last_update_timestamp < 30.0:
# For 30 seconds after last update, poll more frequently to catch fast completions
await asyncio.sleep(0.5) # 500ms to catch fast transitions
else:
# Only when truly idle for >30s, use longer interval
await asyncio.sleep(2.0) # 2 seconds max when completely idle
except Exception as e:
logger.error(f"Error in SSE event generation: {e}", exc_info=True)
# Send error event and continue
error_data = json.dumps({"error": "Internal server error", "timestamp": time.time()})
error_data = json.dumps({"error": "Internal server error", "timestamp": time.time(), "change_type": "error"})
yield f"data: {error_data}\n\n"
await asyncio.sleep(1) # Wait longer on error
@@ -859,7 +999,9 @@ async def generate_task_update_event(since_timestamp: float, active_only: bool,
task_counts["active"] += 1
# Always include active tasks in updates, apply filtering to others
should_include = is_active_task or (task_timestamp > since_timestamp and not active_only)
# Also include recently completed/terminal tasks to ensure "done" status gets sent
is_recently_terminal = task_status in TERMINAL_TASK_STATES and task_timestamp > since_timestamp
should_include = is_active_task or (task_timestamp > since_timestamp and not active_only) or is_recently_terminal
if should_include:
# Construct the same detailed task object as in updates endpoint
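The core of the reworked streaming loop is per-task change detection plus an adaptive polling cadence. Distilled into standalone functions purely for illustration (thresholds are taken from the diff above; the dict shape matches last_task_states, and plain strings stand in for the ProgressState constants):

from typing import Optional, Tuple

# The real TERMINAL_TASK_STATES uses ProgressState constants; string literals
# are used here only to keep the sketch self-contained.
TERMINAL_TASK_STATES = {"complete", "done", "error", "cancelled", "skipped"}


def classify_task_change(previous: Optional[dict], status: str, status_count: int,
                         timestamp: float, now: float) -> Tuple[bool, bool]:
    # Returns (changed, just_became_terminal) for one task, mirroring the
    # per-task bookkeeping in the streaming loop above.
    if previous is None:
        if status not in TERMINAL_TASK_STATES:
            return True, False                   # brand-new active task
        return (now - timestamp) <= 30.0, False  # only surface recently finished tasks
    if previous["status"] != status:
        became_terminal = (previous["status"] not in TERMINAL_TASK_STATES
                           and status in TERMINAL_TASK_STATES)
        return True, became_terminal
    progressed = (previous["status_count"] != status_count
                  and timestamp > previous["timestamp"]
                  and status not in TERMINAL_TASK_STATES)
    return progressed, False


def next_poll_interval(now: float, burst_mode_until: float, has_actual_changes: bool,
                       active_task_count: int, last_update_timestamp: float) -> float:
    # Sleep interval between iterations of the monitoring loop.
    if now < burst_mode_until:
        return 0.1   # burst mode: catch rapid task completions
    if has_actual_changes or active_task_count > 0:
        return 0.2   # work in flight: stay highly responsive
    if now - last_update_timestamp < 30.0:
        return 0.5   # recently active: fast transitions are still likely
    return 2.0       # fully idle: back off to the slowest cadence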

View File

@@ -235,12 +235,12 @@ def cancel_task(task_id):
# Try to revoke the Celery task if it hasn't started yet
celery_app.control.revoke(task_id, terminate=True, signal="SIGTERM")
# Schedule deletion of task data after 30 seconds
# Schedule deletion of task data after 3 seconds
delayed_delete_task_data.apply_async(
args=[task_id, "Task cancelled by user and auto-cleaned."], countdown=30
args=[task_id, "Task cancelled by user and auto-cleaned."], countdown=3
)
logger.info(
f"Task {task_id} cancelled by user. Data scheduled for deletion in 30s."
f"Task {task_id} cancelled by user. Data scheduled for deletion in 3s."
)
return {"status": "cancelled", "task_id": task_id}
@@ -917,7 +917,7 @@ class ProgressTrackingTask(Task):
# Schedule deletion for completed multi-track downloads
delayed_delete_task_data.apply_async(
args=[task_id, "Task completed successfully and auto-cleaned."],
countdown=30, # Delay in seconds
countdown=3, # Delay in seconds
)
# If from playlist_watch and successful, add track to DB
@@ -1055,7 +1055,7 @@ def task_postrun_handler(
): # Applies to single track downloads and tracks from playlists/albums
delayed_delete_task_data.apply_async(
args=[task_id, "Task completed successfully and auto-cleaned."],
countdown=30,
countdown=3,
)
original_request = task_info.get("original_request", {})
@@ -1175,14 +1175,14 @@ def task_failure_handler(
else:
# If task cannot be retried, schedule its data for deletion
logger.info(
f"Task {task_id} failed and cannot be retried. Data scheduled for deletion in 30s."
f"Task {task_id} failed and cannot be retried. Data scheduled for deletion in 3s."
)
delayed_delete_task_data.apply_async(
args=[
task_id,
f"Task failed ({str(exception)}) and max retries reached. Auto-cleaned.",
],
countdown=30,
countdown=3,
)
except Exception as e:
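The second file's changes shorten the auto-cleanup delay from 30 seconds to 3 seconds everywhere delayed_delete_task_data is scheduled. The task's definition is not part of this diff; purely as an illustration of how a countdown-based Celery cleanup task is declared and scheduled (every name here except apply_async and countdown is an assumption), it might look like:

import logging

from celery import Celery

celery_app = Celery("downloader", broker="redis://localhost:6379/0")  # placeholder app/broker
logger = logging.getLogger(__name__)


@celery_app.task
def delayed_delete_task_data(task_id: str, reason: str) -> None:
    # The real task presumably purges the task's stored status/progress data;
    # here we only log, to keep the sketch self-contained.
    logger.info("Cleaning up task %s: %s", task_id, reason)


# Scheduling mirrors the diff: Celery runs the task roughly `countdown` seconds from now.
delayed_delete_task_data.apply_async(
    args=["some-task-id", "Task completed successfully and auto-cleaned."],
    countdown=3,
)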