fix: Distroless ffmpeg support

Xoconoch · 2025-08-23 12:12:48 -06:00
commit 262eefd66d · parent cb1b061297
8 changed files with 485 additions and 284 deletions

View File

@@ -18,20 +18,29 @@ RUN uv pip install --target /python -r requirements.txt
 FROM debian:stable-slim AS ffmpeg
 ARG TARGETARCH
 RUN apt-get update && apt-get install -y --no-install-recommends \
-    ca-certificates curl xz-utils \
+    ca-certificates curl xz-utils jq \
     && rm -rf /var/lib/apt/lists/*
-RUN case "$TARGETARCH" in \
-    amd64) FFMPEG_PKG=ffmpeg-master-latest-linux64-gpl.tar.xz ;; \
-    arm64) FFMPEG_PKG=ffmpeg-master-latest-linuxarm64-gpl.tar.xz ;; \
+RUN set -euo pipefail; \
+    case "$TARGETARCH" in \
+    amd64) ARCH_SUFFIX=linux64 ;; \
+    arm64) ARCH_SUFFIX=linuxarm64 ;; \
     *) echo "Unsupported arch: $TARGETARCH" && exit 1 ;; \
-    esac && \
-    curl -fsSL -o /tmp/ffmpeg.tar.xz https://github.com/BtbN/FFmpeg-Builds/releases/latest/download/${FFMPEG_PKG} && \
-    tar -xJf /tmp/ffmpeg.tar.xz -C /tmp && \
+    esac; \
+    ASSET_URL=$(curl -fsSL https://api.github.com/repos/BtbN/FFmpeg-Builds/releases/latest \
+      | jq -r ".assets[] | select(.name | endswith(\"${ARCH_SUFFIX}-gpl.tar.xz\")) | .browser_download_url" \
+      | head -n1); \
+    if [ -z "$ASSET_URL" ]; then \
+      echo "Failed to resolve FFmpeg asset for arch ${ARCH_SUFFIX}" && exit 1; \
+    fi; \
+    echo "Fetching FFmpeg from: $ASSET_URL"; \
+    curl -fsSL -o /tmp/ffmpeg.tar.xz "$ASSET_URL"; \
+    tar -xJf /tmp/ffmpeg.tar.xz -C /tmp; \
     mv /tmp/ffmpeg-* /ffmpeg

 # Stage 4: Prepare world-writable runtime directories
 FROM busybox:1.36.1-musl AS runtime-dirs
 RUN mkdir -p /artifact/downloads /artifact/data/config /artifact/data/creds /artifact/data/watch /artifact/data/history /artifact/logs/tasks \
+    && touch /artifact/.cache \
     && chmod -R 0777 /artifact

 # Stage 5: Final application image (distroless)

@@ -44,6 +53,9 @@ WORKDIR /app
 # Ensure Python finds vendored site-packages and unbuffered output
 ENV PYTHONPATH=/python
 ENV PYTHONUNBUFFERED=1
+ENV PYTHONUTF8=1
+ENV LANG=C.UTF-8
+ENV LC_ALL=C.UTF-8

 # Copy application code
 COPY --chown=65532:65532 . .
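The ffmpeg stage now resolves the latest BtbN build through the GitHub releases API instead of a hard-coded asset name. A minimal Python sketch of the same lookup, useful for checking what the build stage would fetch (assumption: run outside the image with network access; ARCH_SUFFIX mirrors the $TARGETARCH mapping above):

# Sketch only: mirrors the jq query in the ffmpeg stage, picking the latest asset
# whose name ends with "<arch>-gpl.tar.xz" and printing its download URL.
import json
import urllib.request

ARCH_SUFFIX = "linux64"  # or "linuxarm64", matching the $TARGETARCH case above

with urllib.request.urlopen(
    "https://api.github.com/repos/BtbN/FFmpeg-Builds/releases/latest"
) as resp:
    release = json.load(resp)

urls = [
    asset["browser_download_url"]
    for asset in release.get("assets", [])
    if asset["name"].endswith(f"{ARCH_SUFFIX}-gpl.tar.xz")
]
print(urls[0] if urls else "no matching asset")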

View File

@@ -1,7 +1,7 @@
 fastapi==0.116.1
 uvicorn[standard]==0.35.0
 celery==5.5.3
-deezspot-spotizerr==2.7.4
+deezspot-spotizerr==2.7.6
 httpx==0.28.1
 bcrypt==4.2.1
 PyJWT==2.10.1

View File

@@ -1,10 +1,14 @@
 from fastapi import APIRouter, HTTPException, Request, Depends
-from fastapi.responses import JSONResponse, StreamingResponse
+from fastapi.responses import StreamingResponse
 import logging
 import time
 import json
 import asyncio
-from typing import Dict, Set
+from typing import Set
+import redis
+import threading
+
+from routes.utils.celery_config import REDIS_URL

 from routes.utils.celery_tasks import (
     get_task_info,
@@ -12,17 +16,23 @@ from routes.utils.celery_tasks import (
     get_last_task_status,
     get_all_tasks,
     cancel_task,
+    delete_task_data_and_log,
     ProgressState,
 )

 # Import authentication dependencies
-from routes.auth.middleware import require_auth_from_state, get_current_user_from_state, User
+from routes.auth.middleware import (
+    require_auth_from_state,
+    get_current_user_from_state,
+    User,
+)

 # Configure logging
 logger = logging.getLogger(__name__)

 router = APIRouter()


 # Global SSE Event Broadcaster
 class SSEBroadcaster:
     def __init__(self):
@@ -40,7 +50,9 @@ class SSEBroadcaster:

     async def broadcast_event(self, event_data: dict):
         """Broadcast an event to all connected clients"""
-        logger.debug(f"SSE Broadcaster: Attempting to broadcast to {len(self.clients)} clients")
+        logger.debug(
+            f"SSE Broadcaster: Attempting to broadcast to {len(self.clients)} clients"
+        )

         if not self.clients:
             logger.debug("SSE Broadcaster: No clients connected, skipping broadcast")
@@ -51,7 +63,9 @@
         event_json = json.dumps(enhanced_event_data)
         sse_data = f"data: {event_json}\n\n"

-        logger.debug(f"SSE Broadcaster: Broadcasting event: {enhanced_event_data.get('change_type', 'unknown')} with {enhanced_event_data.get('active_tasks', 0)} active tasks")
+        logger.debug(
+            f"SSE Broadcaster: Broadcasting event: {enhanced_event_data.get('change_type', 'unknown')} with {enhanced_event_data.get('active_tasks', 0)} active tasks"
+        )

         # Send to all clients, remove disconnected ones
         disconnected = set()
@@ -60,7 +74,7 @@
             try:
                 await client_queue.put(sse_data)
                 sent_count += 1
-                logger.debug(f"SSE: Successfully sent to client queue")
+                logger.debug("SSE: Successfully sent to client queue")
             except Exception as e:
                 logger.error(f"SSE: Failed to send to client: {e}")
                 disconnected.add(client_queue)
@@ -69,21 +83,23 @@
         for client in disconnected:
             self.clients.discard(client)

-        logger.info(f"SSE Broadcaster: Successfully sent to {sent_count} clients, removed {len(disconnected)} disconnected clients")
+        logger.info(
+            f"SSE Broadcaster: Successfully sent to {sent_count} clients, removed {len(disconnected)} disconnected clients"
+        )


 # Global broadcaster instance
 sse_broadcaster = SSEBroadcaster()

 # Redis subscriber for cross-process SSE events
-import redis
-import threading
-from routes.utils.celery_config import REDIS_URL

 # Redis client for SSE pub/sub
 sse_redis_client = redis.Redis.from_url(REDIS_URL)


 def start_sse_redis_subscriber():
     """Start Redis subscriber to listen for SSE events from Celery workers"""

     def redis_subscriber_thread():
         try:
             pubsub = sse_redis_client.pubsub()
@@ -91,33 +107,49 @@ def start_sse_redis_subscriber():
             logger.info("SSE Redis Subscriber: Started listening for events")

             for message in pubsub.listen():
-                if message['type'] == 'message':
+                if message["type"] == "message":
                     try:
-                        event_data = json.loads(message['data'].decode('utf-8'))
-                        event_type = event_data.get('event_type', 'unknown')
-                        task_id = event_data.get('task_id', 'unknown')
+                        event_data = json.loads(message["data"].decode("utf-8"))
+                        event_type = event_data.get("event_type", "unknown")
+                        task_id = event_data.get("task_id", "unknown")

-                        logger.debug(f"SSE Redis Subscriber: Received {event_type} for task {task_id}")
+                        logger.debug(
+                            f"SSE Redis Subscriber: Received {event_type} for task {task_id}"
+                        )

                         # Handle different event types
-                        if event_type == 'progress_update':
+                        if event_type == "progress_update":
                             # Transform callback data into task format expected by frontend
                             loop = asyncio.new_event_loop()
                             asyncio.set_event_loop(loop)
                             try:
-                                broadcast_data = loop.run_until_complete(transform_callback_to_task_format(task_id, event_data))
+                                broadcast_data = loop.run_until_complete(
+                                    transform_callback_to_task_format(
+                                        task_id, event_data
+                                    )
+                                )
                                 if broadcast_data:
-                                    loop.run_until_complete(sse_broadcaster.broadcast_event(broadcast_data))
-                                    logger.debug(f"SSE Redis Subscriber: Broadcasted callback to {len(sse_broadcaster.clients)} clients")
+                                    loop.run_until_complete(
+                                        sse_broadcaster.broadcast_event(broadcast_data)
+                                    )
+                                    logger.debug(
+                                        f"SSE Redis Subscriber: Broadcasted callback to {len(sse_broadcaster.clients)} clients"
+                                    )
                             finally:
                                 loop.close()
-                        elif event_type == 'summary_update':
+                        elif event_type == "summary_update":
                             # Task summary update - use existing trigger_sse_update logic
                             loop = asyncio.new_event_loop()
                             asyncio.set_event_loop(loop)
                             try:
-                                loop.run_until_complete(trigger_sse_update(task_id, event_data.get('reason', 'update')))
-                                logger.debug(f"SSE Redis Subscriber: Processed summary update for {task_id}")
+                                loop.run_until_complete(
+                                    trigger_sse_update(
+                                        task_id, event_data.get("reason", "update")
+                                    )
+                                )
+                                logger.debug(
+                                    f"SSE Redis Subscriber: Processed summary update for {task_id}"
+                                )
                             finally:
                                 loop.close()
                         else:
@@ -125,13 +157,20 @@ def start_sse_redis_subscriber():
                             loop = asyncio.new_event_loop()
                             asyncio.set_event_loop(loop)
                             try:
-                                loop.run_until_complete(sse_broadcaster.broadcast_event(event_data))
-                                logger.debug(f"SSE Redis Subscriber: Broadcasted {event_type} to {len(sse_broadcaster.clients)} clients")
+                                loop.run_until_complete(
+                                    sse_broadcaster.broadcast_event(event_data)
+                                )
+                                logger.debug(
+                                    f"SSE Redis Subscriber: Broadcasted {event_type} to {len(sse_broadcaster.clients)} clients"
+                                )
                             finally:
                                 loop.close()
                     except Exception as e:
-                        logger.error(f"SSE Redis Subscriber: Error processing message: {e}", exc_info=True)
+                        logger.error(
+                            f"SSE Redis Subscriber: Error processing message: {e}",
+                            exc_info=True,
+                        )
         except Exception as e:
             logger.error(f"SSE Redis Subscriber: Fatal error: {e}", exc_info=True)
@@ -141,11 +180,12 @@ def start_sse_redis_subscriber():
     thread.start()
     logger.info("SSE Redis Subscriber: Background thread started")


 async def transform_callback_to_task_format(task_id: str, event_data: dict) -> dict:
     """Transform callback event data into the task format expected by frontend"""
     try:
         # Import here to avoid circular imports
-        from routes.utils.celery_tasks import get_task_info, get_all_tasks
+        from routes.utils.celery_tasks import get_task_info

         # Get task info to build complete task object
         task_info = get_task_info(task_id)
@@ -154,19 +194,19 @@ async def transform_callback_to_task_format(task_id: str, event_data: dict) -> d
             return None

         # Extract callback data
-        callback_data = event_data.get('callback_data', {})
+        callback_data = event_data.get("callback_data", {})

         # Build task object in the format expected by frontend
         task_object = {
             "task_id": task_id,
             "original_url": f"http://localhost:7171/api/{task_info.get('download_type', 'track')}/download/{task_info.get('url', '').split('/')[-1] if task_info.get('url') else ''}",
             "last_line": callback_data,  # This is what frontend expects for callback data
-            "timestamp": event_data.get('timestamp', time.time()),
-            "download_type": task_info.get('download_type', 'track'),
-            "type": task_info.get('type', task_info.get('download_type', 'track')),
-            "name": task_info.get('name', 'Unknown'),
-            "artist": task_info.get('artist', ''),
-            "created_at": task_info.get('created_at'),
+            "timestamp": event_data.get("timestamp", time.time()),
+            "download_type": task_info.get("download_type", "track"),
+            "type": task_info.get("type", task_info.get("download_type", "track")),
+            "name": task_info.get("name", "Unknown"),
+            "artist": task_info.get("artist", ""),
+            "created_at": task_info.get("created_at"),
         }

         # Build minimal event data - global counts will be added at broadcast time
@@ -176,16 +216,21 @@ async def transform_callback_to_task_format(task_id: str, event_data: dict) -> d
             "current_timestamp": time.time(),
             "updated_count": 1,
             "since_timestamp": time.time(),
-            "trigger_reason": "callback_update"
+            "trigger_reason": "callback_update",
         }

     except Exception as e:
-        logger.error(f"SSE Transform: Error transforming callback for task {task_id}: {e}", exc_info=True)
+        logger.error(
+            f"SSE Transform: Error transforming callback for task {task_id}: {e}",
+            exc_info=True,
+        )
         return None


 # Start the Redis subscriber when module loads
 start_sse_redis_subscriber()


 async def trigger_sse_update(task_id: str, reason: str = "task_update"):
     """Trigger an immediate SSE update for a specific task"""
     try:
@@ -200,13 +245,14 @@ async def trigger_sse_update(task_id: str, reason: str = "task_update"):
         last_status = get_last_task_status(task_id)

         # Create a dummy request for the _build_task_response function
-        from fastapi import Request
         class DummyRequest:
             def __init__(self):
                 self.base_url = "http://localhost:7171"

         dummy_request = DummyRequest()
-        task_response = _build_task_response(task_info, last_status, task_id, current_time, dummy_request)
+        task_response = _build_task_response(
+            task_info, last_status, task_id, current_time, dummy_request
+        )

         # Create minimal event data - global counts will be added at broadcast time
         event_data = {
@@ -214,7 +260,7 @@ async def trigger_sse_update(task_id: str, reason: str = "task_update"):
             "current_timestamp": current_time,
             "since_timestamp": current_time,
             "change_type": "realtime",
-            "trigger_reason": reason
+            "trigger_reason": reason,
         }

         await sse_broadcaster.broadcast_event(event_data)
@@ -223,29 +269,31 @@ async def trigger_sse_update(task_id: str, reason: str = "task_update"):
     except Exception as e:
         logger.error(f"SSE: Failed to trigger update for task {task_id}: {e}")


 # Define active task states using ProgressState constants
 ACTIVE_TASK_STATES = {
     ProgressState.INITIALIZING,  # "initializing" - task is starting up
     ProgressState.PROCESSING,  # "processing" - task is being processed
     ProgressState.DOWNLOADING,  # "downloading" - actively downloading
     ProgressState.PROGRESS,  # "progress" - album/playlist progress updates
     ProgressState.TRACK_PROGRESS,  # "track_progress" - real-time track progress
     ProgressState.REAL_TIME,  # "real_time" - real-time download progress
     ProgressState.RETRYING,  # "retrying" - task is retrying after error
     "real-time",  # "real-time" - real-time download progress (hyphenated version)
     ProgressState.QUEUED,  # "queued" - task is queued and waiting
     "pending",  # "pending" - legacy queued status
 }

 # Define terminal task states that should be included when recently completed
 TERMINAL_TASK_STATES = {
     ProgressState.COMPLETE,  # "complete" - task completed successfully
     ProgressState.DONE,  # "done" - task finished processing
     ProgressState.ERROR,  # "error" - task failed
     ProgressState.CANCELLED,  # "cancelled" - task was cancelled
     ProgressState.SKIPPED,  # "skipped" - task was skipped
 }


 def get_task_status_from_last_status(last_status):
     """
     Extract the task status from last_status, checking both possible locations.
@@ -306,7 +354,7 @@ def get_global_task_counts():
         "error": 0,
         "cancelled": 0,
         "retrying": 0,
-        "skipped": 0
+        "skipped": 0,
     }

     try:
@@ -342,7 +390,9 @@ def get_global_task_counts():
             elif is_active_task:
                 task_counts["active"] += 1

-        logger.debug(f"Global task counts: {task_counts} (total: {len(all_tasks)} tasks)")
+        logger.debug(
+            f"Global task counts: {task_counts} (total: {len(all_tasks)} tasks)"
+        )

     except Exception as e:
         logger.error(f"Error getting global task counts: {e}", exc_info=True)
@@ -373,7 +423,9 @@ def add_global_task_counts_to_event(event_data):
         return event_data

     except Exception as e:
-        logger.error(f"Error adding global task counts to SSE event: {e}", exc_info=True)
+        logger.error(
+            f"Error adding global task counts to SSE event: {e}", exc_info=True
+        )
         return event_data
@@ -397,10 +449,9 @@ def _build_error_callback_object(last_status):
         callback_object["album"] = {
             "type": "album",
             "title": name,
-            "artists": [{
-                "type": "artistAlbum",
-                "name": artist_or_owner
-            }] if artist_or_owner else [],
+            "artists": [{"type": "artistAlbum", "name": artist_or_owner}]
+            if artist_or_owner
+            else [],
         }
     elif download_type == "playlist":
         playlist_payload = {"type": "playlist", "title": name}
@@ -411,10 +462,9 @@ def _build_error_callback_object(last_status):
         callback_object["track"] = {
             "type": "track",
             "title": name,
-            "artists": [{
-                "type": "artistTrack",
-                "name": artist_or_owner
-            }] if artist_or_owner else [],
+            "artists": [{"type": "artistTrack", "name": artist_or_owner}]
+            if artist_or_owner
+            else [],
         }
     else:
         # Fallback for unknown types to avoid breaking the client, returning a basic error structure.
@@ -431,7 +481,9 @@ def _build_error_callback_object(last_status):
     return callback_object


-def _build_task_response(task_info, last_status, task_id, current_time, request: Request):
+def _build_task_response(
+    task_info, last_status, task_id, current_time, request: Request
+):
     """
     Helper function to build a standardized task response object.
     """
@@ -465,6 +517,27 @@ def _build_task_response(task_info, last_status, task_id, current_time, request:
         logger.warning(
             f"Missing download_type ('{download_type}') or item_url ('{item_url}') in task_info for task {task_id}. Falling back for original_url."
         )
+        # Auto-delete faulty task data to keep the queue clean
+        try:
+            delete_task_data_and_log(
+                task_id,
+                reason="Auto-cleaned: Missing download_type or url in task_info.",
+            )
+            # Trigger SSE so clients refresh their task lists
+            try:
+                # Avoid circular import at top-level
+                import asyncio as _asyncio

+                # Fire-and-forget; if no event loop available, ignore
+                loop = _asyncio.get_event_loop()
+                if loop.is_running():
+                    _asyncio.create_task(
+                        trigger_sse_update(task_id, "auto_deleted_faulty")
+                    )
+            except Exception:
+                pass
+        except Exception as _e:
+            logger.error(f"Auto-delete failed for faulty task {task_id}: {_e}")

     original_request_obj = task_info.get("original_request", {})
     dynamic_original_url = original_request_obj.get("original_url", "")
@@ -478,13 +551,18 @@ def _build_task_response(task_info, last_status, task_id, current_time, request:
     else:
         last_line_content = last_status

+    # Normalize created_at to a numeric timestamp
+    created_at_value = task_info.get("created_at")
+    if not isinstance(created_at_value, (int, float)):
+        created_at_value = current_time
+
     task_response = {
         "original_url": dynamic_original_url,
         "last_line": last_line_content,
         "timestamp": last_status.get("timestamp") if last_status else current_time,
         "task_id": task_id,
         "status_count": status_count,
-        "created_at": task_info.get("created_at"),
+        "created_at": created_at_value,
         "name": task_info.get("name"),
         "artist": task_info.get("artist"),
         "type": task_info.get("type"),
@@ -496,7 +574,9 @@ def _build_task_response(task_info, last_status, task_id, current_time, request:
     return task_response


-async def get_paginated_tasks(page=1, limit=20, active_only=False, request: Request = None):
+async def get_paginated_tasks(
+    page=1, limit=20, active_only=False, request: Request = None
+):
     """
     Get paginated list of tasks.
     """
@@ -523,7 +603,9 @@ async def get_paginated_tasks(page=1, limit=20, active_only=False, request: Requ
             task_status = get_task_status_from_last_status(last_status)
             is_active_task = is_task_active(task_status)

-            task_response = _build_task_response(task_info, last_status, task_id, time.time(), request)
+            task_response = _build_task_response(
+                task_info, last_status, task_id, time.time(), request
+            )

             if is_active_task:
                 active_tasks.append(task_response)
@@ -531,7 +613,7 @@ async def get_paginated_tasks(page=1, limit=20, active_only=False, request: Requ
                 other_tasks.append(task_response)

         # Sort other tasks by creation time (newest first)
-        other_tasks.sort(key=lambda x: x.get("created_at", 0), reverse=True)
+        other_tasks.sort(key=lambda x: (x.get("created_at") or 0.0), reverse=True)

         if active_only:
             paginated_tasks = active_tasks
@@ -540,12 +622,12 @@ async def get_paginated_tasks(page=1, limit=20, active_only=False, request: Requ
                 "limit": limit,
                 "total_non_active": 0,
                 "has_more": False,
-                "returned_non_active": 0
+                "returned_non_active": 0,
             }
         else:
             # Apply pagination to non-active tasks
             offset = (page - 1) * limit
-            paginated_other_tasks = other_tasks[offset:offset + limit]
+            paginated_other_tasks = other_tasks[offset : offset + limit]
             paginated_tasks = active_tasks + paginated_other_tasks

             pagination_info = {
@@ -553,32 +635,38 @@ async def get_paginated_tasks(page=1, limit=20, active_only=False, request: Requ
                 "limit": limit,
                 "total_non_active": len(other_tasks),
                 "has_more": len(other_tasks) > offset + limit,
-                "returned_non_active": len(paginated_other_tasks)
+                "returned_non_active": len(paginated_other_tasks),
             }

         response = {
             "tasks": paginated_tasks,
             "current_timestamp": time.time(),
-            "total_tasks": task_counts["active"] + task_counts["retrying"],  # Only active/retrying tasks for counter
+            "total_tasks": task_counts["active"]
+            + task_counts["retrying"],  # Only active/retrying tasks for counter
             "all_tasks_count": len(all_tasks),  # Total count of all tasks
             "task_counts": task_counts,  # Categorized counts
             "active_tasks": len(active_tasks),
             "updated_count": len(paginated_tasks),
-            "pagination": pagination_info
+            "pagination": pagination_info,
         }

         return response

     except Exception as e:
         logger.error(f"Error in get_paginated_tasks: {e}", exc_info=True)
-        raise HTTPException(status_code=500, detail={"error": "Failed to retrieve paginated tasks"})
+        raise HTTPException(
+            status_code=500, detail={"error": "Failed to retrieve paginated tasks"}
+        )


 # IMPORTANT: Specific routes MUST come before parameterized routes in FastAPI
 # Otherwise "updates" gets matched as a {task_id} parameter!
 @router.get("/list")
-async def list_tasks(request: Request, current_user: User = Depends(require_auth_from_state)):
+async def list_tasks(
+    request: Request, current_user: User = Depends(require_auth_from_state)
+):
     """
     Retrieve a paginated list of all tasks in the system.
     Returns a detailed list of task objects including status and metadata.
@@ -590,9 +678,9 @@ async def list_tasks(request: Request, current_user: User = Depends(require_auth
     """
     try:
         # Get query parameters
-        page = int(request.query_params.get('page', 1))
-        limit = min(int(request.query_params.get('limit', 50)), 100)  # Cap at 100
-        active_only = request.query_params.get('active_only', '').lower() == 'true'
+        page = int(request.query_params.get("page", 1))
+        limit = min(int(request.query_params.get("limit", 50)), 100)  # Cap at 100
+        active_only = request.query_params.get("active_only", "").lower() == "true"

         tasks = get_all_tasks()
         active_tasks = []
@@ -606,7 +694,7 @@ async def list_tasks(request: Request, current_user: User = Depends(require_auth
             "error": 0,
             "cancelled": 0,
             "retrying": 0,
-            "skipped": 0
+            "skipped": 0,
         }

         for task_summary in tasks:
@@ -625,7 +713,10 @@ async def list_tasks(request: Request, current_user: User = Depends(require_auth
             # Categorize tasks by status using ProgressState constants
             if task_status == ProgressState.RETRYING:
                 task_counts["retrying"] += 1
-            elif task_status in {ProgressState.QUEUED, "pending"}:  # Keep "pending" for backward compatibility
+            elif task_status in {
+                ProgressState.QUEUED,
+                "pending",
+            }:  # Keep "pending" for backward compatibility
                 task_counts["queued"] += 1
             elif task_status in {ProgressState.COMPLETE, ProgressState.DONE}:
                 task_counts["completed"] += 1
@@ -638,7 +729,9 @@ async def list_tasks(request: Request, current_user: User = Depends(require_auth
             elif is_active_task:
                 task_counts["active"] += 1

-            task_response = _build_task_response(task_info, last_status, task_id, time.time(), request)
+            task_response = _build_task_response(
+                task_info, last_status, task_id, time.time(), request
+            )

             if is_active_task:
                 active_tasks.append(task_response)
@@ -646,7 +739,7 @@ async def list_tasks(request: Request, current_user: User = Depends(require_auth
                 other_tasks.append(task_response)

         # Sort other tasks by creation time (newest first)
-        other_tasks.sort(key=lambda x: x.get("created_at", 0), reverse=True)
+        other_tasks.sort(key=lambda x: (x.get("created_at") or 0.0), reverse=True)

         if active_only:
             # Return only active tasks without pagination
@@ -656,7 +749,7 @@ async def list_tasks(request: Request, current_user: User = Depends(require_auth
                 "limit": limit,
                 "total_items": len(active_tasks),
                 "total_pages": 1,
-                "has_more": False
+                "has_more": False,
             }
         else:
             # Apply pagination to non-active tasks and combine with active tasks
@@ -666,7 +759,7 @@ async def list_tasks(request: Request, current_user: User = Depends(require_auth
             if page == 1:
                 # For first page, include active tasks + first batch of other tasks
                 available_space = limit - len(active_tasks)
-                paginated_other_tasks = other_tasks[:max(0, available_space)]
+                paginated_other_tasks = other_tasks[: max(0, available_space)]
                 response_tasks = active_tasks + paginated_other_tasks
             else:
                 # For subsequent pages, only include other tasks
@@ -674,7 +767,9 @@ async def list_tasks(request: Request, current_user: User = Depends(require_auth
                 adjusted_offset = offset - len(active_tasks)
                 if adjusted_offset < 0:
                     adjusted_offset = 0
-                paginated_other_tasks = other_tasks[adjusted_offset:adjusted_offset + limit]
+                paginated_other_tasks = other_tasks[
+                    adjusted_offset : adjusted_offset + limit
+                ]
                 response_tasks = paginated_other_tasks

             total_items = len(active_tasks) + len(other_tasks)
@@ -687,27 +782,32 @@ async def list_tasks(request: Request, current_user: User = Depends(require_auth
                 "total_pages": total_pages,
                 "has_more": page < total_pages,
                 "active_tasks": len(active_tasks),
-                "total_other_tasks": len(other_tasks)
+                "total_other_tasks": len(other_tasks),
             }

         response = {
             "tasks": response_tasks,
             "pagination": pagination_info,
-            "total_tasks": task_counts["active"] + task_counts["retrying"],  # Only active/retrying tasks for counter
+            "total_tasks": task_counts["active"]
+            + task_counts["retrying"],  # Only active/retrying tasks for counter
             "all_tasks_count": len(tasks),  # Total count of all tasks
             "task_counts": task_counts,  # Categorized counts
             "active_tasks": len(active_tasks),
-            "timestamp": time.time()
+            "timestamp": time.time(),
         }

         return response

     except Exception as e:
         logger.error(f"Error in /api/prgs/list: {e}", exc_info=True)
-        raise HTTPException(status_code=500, detail={"error": "Failed to retrieve task list"})
+        raise HTTPException(
+            status_code=500, detail={"error": "Failed to retrieve task list"}
+        )


 @router.get("/updates")
-async def get_task_updates(request: Request, current_user: User = Depends(require_auth_from_state)):
+async def get_task_updates(
+    request: Request, current_user: User = Depends(require_auth_from_state)
+):
     """
     Retrieve only tasks that have been updated since the specified timestamp.
     This endpoint is optimized for polling to reduce unnecessary data transfer.
@@ -728,10 +828,10 @@ async def get_task_updates(request: Request, current_user: User = Depends(requir
     """
     try:
         # Get query parameters
-        since_param = request.query_params.get('since')
-        page = int(request.query_params.get('page', 1))
-        limit = min(int(request.query_params.get('limit', 20)), 100)  # Cap at 100
-        active_only = request.query_params.get('active_only', '').lower() == 'true'
+        since_param = request.query_params.get("since")
+        page = int(request.query_params.get("page", 1))
+        limit = min(int(request.query_params.get("limit", 20)), 100)  # Cap at 100
+        active_only = request.query_params.get("active_only", "").lower() == "true"

         if not since_param:
             # If no 'since' parameter, return paginated tasks (fallback behavior)
@@ -741,7 +841,9 @@ async def get_task_updates(request: Request, current_user: User = Depends(requir
         try:
             since_timestamp = float(since_param)
         except (ValueError, TypeError):
-            raise HTTPException(status_code=400, detail={"error": "Invalid 'since' timestamp format"})
+            raise HTTPException(
+                status_code=400, detail={"error": "Invalid 'since' timestamp format"}
+            )

         # Get all tasks
         all_tasks = get_all_tasks()
@@ -768,16 +870,28 @@ async def get_task_updates(request: Request, current_user: User = Depends(requir
             is_active_task = is_task_active(task_status)

             # Check if task has been updated since the given timestamp
-            task_timestamp = last_status.get("timestamp") if last_status else task_info.get("created_at", 0)
+            task_timestamp = (
+                last_status.get("timestamp")
+                if last_status
+                else task_info.get("created_at", 0)
+            )

             # Always include active tasks in updates, apply filtering to others
             # Also include recently completed/terminal tasks to ensure "done" status gets sent
-            is_recently_terminal = task_status in TERMINAL_TASK_STATES and task_timestamp > since_timestamp
-            should_include = is_active_task or (task_timestamp > since_timestamp and not active_only) or is_recently_terminal
+            is_recently_terminal = (
+                task_status in TERMINAL_TASK_STATES and task_timestamp > since_timestamp
+            )
+            should_include = (
+                is_active_task
+                or (task_timestamp > since_timestamp and not active_only)
+                or is_recently_terminal
+            )

             if should_include:
                 # Construct the same detailed task object as in list_tasks()
-                task_response = _build_task_response(task_info, last_status, task_id, current_time, request)
+                task_response = _build_task_response(
+                    task_info, last_status, task_id, current_time, request
+                )

                 if is_active_task:
                     active_tasks.append(task_response)
@@ -786,21 +900,26 @@ async def get_task_updates(request: Request, current_user: User = Depends(requir

         # Apply pagination to non-active tasks
         offset = (page - 1) * limit
-        paginated_updated_tasks = updated_tasks[offset:offset + limit] if not active_only else []
+        paginated_updated_tasks = (
+            updated_tasks[offset : offset + limit] if not active_only else []
+        )

         # Combine active tasks (always shown) with paginated updated tasks
         all_returned_tasks = active_tasks + paginated_updated_tasks

         # Sort by priority (active first, then by creation time)
-        all_returned_tasks.sort(key=lambda x: (
-            0 if x.get("task_id") in [t["task_id"] for t in active_tasks] else 1,
-            -(x.get("created_at") or 0)
-        ))
+        all_returned_tasks.sort(
+            key=lambda x: (
+                0 if x.get("task_id") in [t["task_id"] for t in active_tasks] else 1,
+                -(x.get("created_at") or 0),
+            )
+        )

         response = {
             "tasks": all_returned_tasks,
             "current_timestamp": current_time,
-            "total_tasks": task_counts["active"] + task_counts["retrying"],  # Only active/retrying tasks for counter
+            "total_tasks": task_counts["active"]
+            + task_counts["retrying"],  # Only active/retrying tasks for counter
             "all_tasks_count": len(all_tasks),  # Total count of all tasks
             "task_counts": task_counts,  # Categorized counts
             "active_tasks": len(active_tasks),
@@ -811,18 +930,22 @@ async def get_task_updates(request: Request, current_user: User = Depends(requir
                 "limit": limit,
                 "total_non_active": len(updated_tasks),
                 "has_more": len(updated_tasks) > offset + limit,
-                "returned_non_active": len(paginated_updated_tasks)
-            }
+                "returned_non_active": len(paginated_updated_tasks),
+            },
         }

-        logger.debug(f"Returning {len(active_tasks)} active + {len(paginated_updated_tasks)} paginated tasks out of {len(all_tasks)} total")
+        logger.debug(
+            f"Returning {len(active_tasks)} active + {len(paginated_updated_tasks)} paginated tasks out of {len(all_tasks)} total"
+        )
         return response

     except HTTPException:
         raise
     except Exception as e:
         logger.error(f"Error in /api/prgs/updates: {e}", exc_info=True)
-        raise HTTPException(status_code=500, detail={"error": "Failed to retrieve task updates"})
+        raise HTTPException(
+            status_code=500, detail={"error": "Failed to retrieve task updates"}
+        )


 @router.post("/cancel/all")
@@ -855,11 +978,15 @@ async def cancel_all_tasks(current_user: User = Depends(require_auth_from_state)
         return response

     except Exception as e:
         logger.error(f"Error in /api/prgs/cancel/all: {e}", exc_info=True)
-        raise HTTPException(status_code=500, detail={"error": "Failed to cancel all tasks"})
+        raise HTTPException(
+            status_code=500, detail={"error": "Failed to cancel all tasks"}
+        )


 @router.post("/cancel/{task_id}")
-async def cancel_task_endpoint(task_id: str, current_user: User = Depends(require_auth_from_state)):
+async def cancel_task_endpoint(
+    task_id: str, current_user: User = Depends(require_auth_from_state)
+):
     """
     Cancel a running or queued task.
@@ -888,7 +1015,7 @@ async def cancel_task_endpoint(task_id: str, current_user: User = Depends(requir
             detail={
                 "status": "error",
                 "message": "Cancellation for old system is not supported in the new API. Please use the new task ID format.",
-            }
+            },
         )
     except HTTPException:
         raise
@@ -897,7 +1024,9 @@ async def cancel_task_endpoint(task_id: str, current_user: User = Depends(requir

 @router.delete("/delete/{task_id}")
-async def delete_task(task_id: str, current_user: User = Depends(require_auth_from_state)):
+async def delete_task(
+    task_id: str, current_user: User = Depends(require_auth_from_state)
+):
     """
     Delete a task's information and history.
@@ -916,7 +1045,9 @@ async def delete_task(task_id: str, current_user: User = Depends(require_auth_fr

 @router.get("/stream")
-async def stream_task_updates(request: Request, current_user: User = Depends(get_current_user_from_state)):
+async def stream_task_updates(
+    request: Request, current_user: User = Depends(get_current_user_from_state)
+):
     """
     Stream real-time task updates via Server-Sent Events (SSE).
     Now uses event-driven architecture for true real-time updates.
@@ -930,7 +1061,7 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get
     """
     # Get query parameters
-    active_only = request.query_params.get('active_only', '').lower() == 'true'
+    active_only = request.query_params.get("active_only", "").lower() == "true"

     async def event_generator():
         # Create a queue for this client
@@ -938,12 +1069,16 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get
         try:
             # Register this client with the broadcaster
-            logger.info(f"SSE Stream: New client connecting...")
+            logger.info("SSE Stream: New client connecting...")
             await sse_broadcaster.add_client(client_queue)
-            logger.info(f"SSE Stream: Client registered successfully, total clients: {len(sse_broadcaster.clients)}")
+            logger.info(
+                f"SSE Stream: Client registered successfully, total clients: {len(sse_broadcaster.clients)}"
+            )

             # Send initial data immediately upon connection
-            initial_data = await generate_task_update_event(time.time(), active_only, request)
+            initial_data = await generate_task_update_event(
+                time.time(), active_only, request
+            )
             yield initial_data

             # Also send any active tasks as callback-style events to newly connected clients
@@ -961,7 +1096,9 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get
                 task_status = get_task_status_from_last_status(last_status)

                 # Send recent callback data for active or recently completed tasks
-                if is_task_active(task_status) or (last_status and last_status.get("timestamp", 0) > time.time() - 30):
+                if is_task_active(task_status) or (
+                    last_status and last_status.get("timestamp", 0) > time.time() - 30
+                ):
                     if last_status and "raw_callback" in last_status:
                         callback_event = {
                             "task_id": task_id,
@@ -969,11 +1106,13 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get
                             "timestamp": last_status.get("timestamp", time.time()),
                             "change_type": "callback",
                             "event_type": "progress_update",
-                            "replay": True  # Mark as replay for client
+                            "replay": True,  # Mark as replay for client
                         }
                         event_json = json.dumps(callback_event)
                         yield f"data: {event_json}\n\n"
-                        logger.info(f"SSE Stream: Sent replay callback for task {task_id}")
+                        logger.info(
+                            f"SSE Stream: Sent replay callback for task {task_id}"
+                        )

             # Send periodic heartbeats and listen for real-time events
             last_heartbeat = time.time()
@@ -983,7 +1122,9 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get
                 try:
                     # Wait for either an event or timeout for heartbeat
                     try:
-                        event_data = await asyncio.wait_for(client_queue.get(), timeout=heartbeat_interval)
+                        event_data = await asyncio.wait_for(
+                            client_queue.get(), timeout=heartbeat_interval
+                        )
                         # Send the real-time event
                         yield event_data
                         last_heartbeat = time.time()
@@ -993,7 +1134,15 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get
                         if current_time - last_heartbeat >= heartbeat_interval:
                             # Generate current task counts for heartbeat
                             all_tasks = get_all_tasks()
-                            task_counts = {"active": 0, "queued": 0, "completed": 0, "error": 0, "cancelled": 0, "retrying": 0, "skipped": 0}
+                            task_counts = {
+                                "active": 0,
+                                "queued": 0,
+                                "completed": 0,
+                                "error": 0,
+                                "cancelled": 0,
+                                "retrying": 0,
+                                "skipped": 0,
+                            }

                             for task_summary in all_tasks:
                                 task_id = task_summary.get("task_id")
@@ -1003,13 +1152,18 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get
                                 if not task_info:
                                     continue
                                 last_status = get_last_task_status(task_id)
-                                task_status = get_task_status_from_last_status(last_status)
+                                task_status = get_task_status_from_last_status(
+                                    last_status
+                                )

                                 if task_status == ProgressState.RETRYING:
                                     task_counts["retrying"] += 1
                                 elif task_status in {ProgressState.QUEUED, "pending"}:
                                     task_counts["queued"] += 1
-                                elif task_status in {ProgressState.COMPLETE, ProgressState.DONE}:
+                                elif task_status in {
+                                    ProgressState.COMPLETE,
+                                    ProgressState.DONE,
+                                }:
                                     task_counts["completed"] += 1
                                 elif task_status == ProgressState.ERROR:
                                     task_counts["error"] += 1
@@ -1022,9 +1176,10 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get
                             heartbeat_data = {
                                 "current_timestamp": current_time,
-                                "total_tasks": task_counts["active"] + task_counts["retrying"],
+                                "total_tasks": task_counts["active"]
+                                + task_counts["retrying"],
                                 "task_counts": task_counts,
-                                "change_type": "heartbeat"
+                                "change_type": "heartbeat",
                             }

                             event_json = json.dumps(heartbeat_data)
@@ -1034,7 +1189,13 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get
                 except Exception as e:
                     logger.error(f"Error in SSE event streaming: {e}", exc_info=True)
                     # Send error event and continue
-                    error_data = json.dumps({"error": "Internal server error", "timestamp": time.time(), "change_type": "error"})
+                    error_data = json.dumps(
+                        {
+                            "error": "Internal server error",
+                            "timestamp": time.time(),
+                            "change_type": "error",
+                        }
+                    )
                     yield f"data: {error_data}\n\n"
                     await asyncio.sleep(1)
@@ -1056,12 +1217,14 @@ async def stream_task_updates(request: Request, current_user: User = Depends(get
             "Connection": "keep-alive",
             "Content-Type": "text/event-stream",
             "Access-Control-Allow-Origin": "*",
-            "Access-Control-Allow-Headers": "Cache-Control"
-        }
+            "Access-Control-Allow-Headers": "Cache-Control",
+        },
     )


-async def generate_task_update_event(since_timestamp: float, active_only: bool, request: Request) -> str:
+async def generate_task_update_event(
+    since_timestamp: float, active_only: bool, request: Request
+) -> str:
     """
     Generate initial task update event for SSE connection.
     This replicates the logic from get_task_updates but for SSE format.
@@ -1089,16 +1252,28 @@ async def generate_task_update_event(since_timestamp: float, active_only: bool,
             is_active_task = is_task_active(task_status)

             # Check if task has been updated since the given timestamp
-            task_timestamp = last_status.get("timestamp") if last_status else task_info.get("created_at", 0)
+            task_timestamp = (
+                last_status.get("timestamp")
+                if last_status
+                else task_info.get("created_at", 0)
+            )

             # Always include active tasks in updates, apply filtering to others
             # Also include recently completed/terminal tasks to ensure "done" status gets sent
-            is_recently_terminal = task_status in TERMINAL_TASK_STATES and task_timestamp > since_timestamp
-            should_include = is_active_task or (task_timestamp > since_timestamp and not active_only) or is_recently_terminal
+            is_recently_terminal = (
+                task_status in TERMINAL_TASK_STATES and task_timestamp > since_timestamp
+            )
+            should_include = (
+                is_active_task
+                or (task_timestamp > since_timestamp and not active_only)
+                or is_recently_terminal
+            )

             if should_include:
                 # Construct the same detailed task object as in updates endpoint
-                task_response = _build_task_response(task_info, last_status, task_id, current_time, request)
+                task_response = _build_task_response(
+                    task_info, last_status, task_id, current_time, request
+                )

                 if is_active_task:
                     active_tasks.append(task_response)
@@ -1109,17 +1284,19 @@ async def generate_task_update_event(since_timestamp: float, active_only: bool,
         all_returned_tasks = active_tasks + updated_tasks

         # Sort by priority (active first, then by creation time)
-        all_returned_tasks.sort(key=lambda x: (
-            0 if x.get("task_id") in [t["task_id"] for t in active_tasks] else 1,
-            -(x.get("created_at") or 0)
-        ))
+        all_returned_tasks.sort(
+            key=lambda x: (
+                0 if x.get("task_id") in [t["task_id"] for t in active_tasks] else 1,
+                -(x.get("created_at") or 0),
+            )
+        )

         initial_data = {
             "tasks": all_returned_tasks,
             "current_timestamp": current_time,
             "updated_count": len(updated_tasks),
             "since_timestamp": since_timestamp,
-            "initial": True  # Mark as initial load
+            "initial": True,  # Mark as initial load
         }

         # Add global task counts since this bypasses the broadcaster
@@ -1130,14 +1307,20 @@ async def generate_task_update_event(since_timestamp: float, active_only: bool,
     except Exception as e:
         logger.error(f"Error generating initial SSE event: {e}", exc_info=True)
-        error_data = json.dumps({"error": "Failed to load initial data", "timestamp": time.time()})
+        error_data = json.dumps(
+            {"error": "Failed to load initial data", "timestamp": time.time()}
+        )
         return f"data: {error_data}\n\n"


 # IMPORTANT: This parameterized route MUST come AFTER all specific routes
 # Otherwise FastAPI will match specific routes like "/updates" as task_id parameters
 @router.get("/{task_id}")
-async def get_task_details(task_id: str, request: Request, current_user: User = Depends(require_auth_from_state)):
+async def get_task_details(
+    task_id: str,
+    request: Request,
+    current_user: User = Depends(require_auth_from_state),
+):
     """
     Return a JSON object with the resource type, its name (title),
     the last progress update, and, if available, the original request parameters.
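The reworked routes module streams task updates as `data: {json}` SSE frames (initial snapshot, callback replays, heartbeats). A minimal client sketch for following the stream; the /api/prgs/stream path and the localhost:7171 base URL are assumptions inferred from the log messages and defaults above, and authentication is ignored here:

# Sketch only: consume the SSE stream with httpx (already in requirements.txt)
# and print the change_type and task counts carried by each event.
import json
import httpx

def follow_task_updates(base_url: str = "http://localhost:7171") -> None:
    with httpx.stream("GET", f"{base_url}/api/prgs/stream", timeout=None) as response:
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue  # skip the blank separators between SSE events
            event = json.loads(line[len("data: "):])
            print(event.get("change_type"), event.get("task_counts"))

if __name__ == "__main__":
    follow_task_updates()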

View File

@@ -101,7 +101,7 @@ def download_album(
             )
             dl.download_albumspo(
                 link_album=url,  # Spotify URL
-                output_dir="./downloads",
+                output_dir="/app/downloads",
                 quality_download=quality,  # Deezer quality
                 recursive_quality=recursive_quality,
                 recursive_download=False,
@@ -159,7 +159,7 @@ def download_album(
             )
             spo.download_album(
                 link_album=url,  # Spotify URL
-                output_dir="./downloads",
+                output_dir="/app/downloads",
                 quality_download=fall_quality,  # Spotify quality
                 recursive_quality=recursive_quality,
                 recursive_download=False,
@@ -216,7 +216,7 @@ def download_album(
             )
             spo.download_album(
                 link_album=url,
-                output_dir="./downloads",
+                output_dir="/app/downloads",
                 quality_download=quality,
                 recursive_quality=recursive_quality,
                 recursive_download=False,
@@ -260,7 +260,7 @@ def download_album(
             )
             dl.download_albumdee(  # Deezer URL, download via Deezer
                 link_album=url,
-                output_dir="./downloads",
+                output_dir="/app/downloads",
                 quality_download=quality,
                 recursive_quality=recursive_quality,
                 recursive_download=False,

View File

@@ -2,6 +2,7 @@ import subprocess
 import logging
 import time
 import threading
+import sys

 # Import Celery task utilities
 from .celery_config import get_config_params, MAX_CONCURRENT_DL
@@ -46,6 +47,8 @@ class CeleryManager:
         # %h is replaced by celery with the actual hostname.
         hostname = f"worker_{worker_name_suffix}@%h"
         command = [
+            sys.executable,
+            "-m",
             "celery",
             "-A",
             self.app_name,
@@ -73,7 +76,10 @@
                 log_method = logger.info  # Default log method
                 if error:  # This is a stderr stream
-                    if " - ERROR - " in line_stripped or " - CRITICAL - " in line_stripped:
+                    if (
+                        " - ERROR - " in line_stripped
+                        or " - CRITICAL - " in line_stripped
+                    ):
                         log_method = logger.error
                     elif " - WARNING - " in line_stripped:
                         log_method = logger.warning
@@ -151,7 +157,7 @@
             queues="utility_tasks,default",  # Listen to utility and default
             concurrency=5,  # Increased concurrency for SSE updates and utility tasks
             worker_name_suffix="utw",  # Utility Worker
-            log_level="ERROR"  # Reduce log verbosity for utility worker (only errors)
+            log_level="ERROR",  # Reduce log verbosity for utility worker (only errors)
         )
         logger.info(
             f"Starting Celery Utility Worker with command: {' '.join(utility_cmd)}"

View File

@@ -98,7 +98,7 @@ def download_playlist(
             )
             dl.download_playlistspo(
                 link_playlist=url,  # Spotify URL
-                output_dir="./downloads",
+                output_dir="/app/downloads",
                 quality_download=quality,  # Deezer quality
                 recursive_quality=recursive_quality,
                 recursive_download=False,
@@ -161,7 +161,7 @@ def download_playlist(
             )
             spo.download_playlist(
                 link_playlist=url,  # Spotify URL
-                output_dir="./downloads",
+                output_dir="/app/downloads",
                 quality_download=fall_quality,  # Spotify quality
                 recursive_quality=recursive_quality,
                 recursive_download=False,
@@ -224,7 +224,7 @@ def download_playlist(
             )
             spo.download_playlist(
                 link_playlist=url,
-                output_dir="./downloads",
+                output_dir="/app/downloads",
                 quality_download=quality,
                 recursive_quality=recursive_quality,
                 recursive_download=False,
@@ -268,7 +268,7 @@ def download_playlist(
             )
             dl.download_playlistdee(  # Deezer URL, download via Deezer
                 link_playlist=url,
-                output_dir="./downloads",
+                output_dir="/app/downloads",
                 quality_download=quality,
                 recursive_quality=recursive_quality,  # Usually False for playlists to get individual track qualities
                 recursive_download=False,

View File

@@ -94,7 +94,7 @@ def download_track(
             # download_trackspo means: Spotify URL, download via Deezer
             dl.download_trackspo(
                 link_track=url,  # Spotify URL
-                output_dir="./downloads",
+                output_dir="/app/downloads",
                 quality_download=quality,  # Deezer quality
                 recursive_quality=recursive_quality,
                 recursive_download=False,
@@ -153,7 +153,7 @@ def download_track(
             )
             spo.download_track(
                 link_track=url,  # Spotify URL
-                output_dir="./downloads",
+                output_dir="/app/downloads",
                 quality_download=fall_quality,  # Spotify quality
                 recursive_quality=recursive_quality,
                 recursive_download=False,
@@ -169,7 +169,7 @@ def download_track(
                 convert_to=convert_to,
                 bitrate=bitrate,
                 artist_separator=artist_separator,
-                real_time_multiplier=real_time_multiplier,
+                spotify_metadata=spotify_metadata,
                 pad_number_width=pad_number_width,
             )
             print(
@@ -211,7 +211,7 @@ def download_track(
             )
             spo.download_track(
                 link_track=url,
-                output_dir="./downloads",
+                output_dir="/app/downloads",
                 quality_download=quality,
                 recursive_quality=recursive_quality,
                 recursive_download=False,
@@ -254,7 +254,7 @@ def download_track(
             )
             dl.download_trackdee(  # Deezer URL, download via Deezer
                 link_track=url,
-                output_dir="./downloads",
+                output_dir="/app/downloads",
                 quality_download=quality,
                 recursive_quality=recursive_quality,
                 recursive_download=False,

View File

@@ -1098,7 +1098,7 @@ def update_playlist_m3u_file(playlist_spotify_id: str):
     # Get configuration settings
     output_dir = (
-        "./downloads"  # This matches the output_dir used in download functions
+        "/app/downloads"  # This matches the output_dir used in download functions
     )

     # Get all tracks for the playlist
@@ -1125,14 +1125,14 @@
     skipped_missing_final_path = 0
     for track in tracks:
-        # Use final_path from deezspot summary and convert from ./downloads to ../ relative path
+        # Use final_path from deezspot summary and convert from /app/downloads to ../ relative path
         final_path = track.get("final_path")
         if not final_path:
             skipped_missing_final_path += 1
             continue
         normalized = str(final_path).replace("\\", "/")
-        if normalized.startswith("./downloads/"):
-            relative_path = normalized.replace("./downloads/", "../", 1)
+        if normalized.startswith("/app/downloads/"):
+            relative_path = normalized.replace("/app/downloads/", "../", 1)
         elif "/downloads/" in normalized.lower():
             idx = normalized.lower().rfind("/downloads/")
             relative_path = "../" + normalized[idx + len("/downloads/") :]