diff --git a/.env.example b/.env.example
index e229325..50a5425 100644
--- a/.env.example
+++ b/.env.example
@@ -4,7 +4,7 @@
 ### can leave the defaults as they are.
 ###
 ### If you plan on using for a server,
-### see [insert docs url]
+### see https://spotizerr.rtfd.io
 ###

 # Interface to bind to. Unless you know what you're doing, don't change this
@@ -62,4 +62,4 @@ GITHUB_CLIENT_SECRET=
 # Log level for application logging.
 # Possible values: debug, info, warning, error, critical
 # Set to 'info' or 'warning' for general use. Use 'debug' for troubleshooting.
-LOG_LEVEL=info
\ No newline at end of file
+LOG_LEVEL=info
diff --git a/.github/workflows/pr-build.yml b/.github/workflows/pr-build.yml
new file mode 100644
index 0000000..f4cea3a
--- /dev/null
+++ b/.github/workflows/pr-build.yml
@@ -0,0 +1,60 @@
+name: PR Dev/Test Container
+
+on:
+  pull_request:
+    types: [opened, synchronize, reopened]
+  workflow_dispatch:
+    inputs:
+      pr_number:
+        description: 'Pull request number (optional, for manual runs)'
+        required: false
+      branch:
+        description: 'Branch to build (optional, defaults to PR head or main)'
+        required: false
+
+env:
+  REGISTRY: ghcr.io
+  IMAGE_NAME: ${{ github.repository }}
+
+jobs:
+  build-and-push:
+    runs-on: ubuntu-latest
+    permissions:
+      contents: read
+      packages: write
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
+        with:
+          ref: ${{ github.event.inputs.branch || github.head_ref || github.ref }}
+
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v2
+
+      - name: Login to GHCR
+        uses: docker/login-action@v2
+        with:
+          registry: ${{ env.REGISTRY }}
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+
+      # Extract Docker metadata
+      - name: Extract Docker metadata
+        id: meta
+        uses: docker/metadata-action@v4
+        with:
+          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
+          tags: |
+            type=raw,value=dev-pr-${{ github.event.inputs.pr_number || github.event.pull_request.number }}
+
+      # Build and push multi-arch dev image
+      - name: Build and push
+        uses: docker/build-push-action@v4
+        with:
+          context: .
+          platforms: linux/amd64,linux/arm64
+          push: true
+          tags: ${{ steps.meta.outputs.tags }}
+          labels: ${{ steps.meta.outputs.labels }}
+          cache-from: type=gha
+          cache-to: type=gha,mode=max
diff --git a/README.md b/README.md
index 5627571..80c7030 100644
--- a/README.md
+++ b/README.md
@@ -27,6 +27,10 @@ If you self-host a music server with other users than yourself, you almost certa

 image

+## How do I start?
+
+Docs are available at: https://spotizerr.rtfd.io
+
 ### Common Issues

 **Downloads not starting?**
diff --git a/routes/system/progress.py b/routes/system/progress.py
index 6001762..812455a 100755
--- a/routes/system/progress.py
+++ b/routes/system/progress.py
@@ -8,7 +8,7 @@ from typing import Set, Optional
 import redis
 import threading

-from routes.utils.celery_config import REDIS_URL
+from routes.utils.celery_config import REDIS_URL, get_config_params
 from routes.utils.celery_tasks import (
     get_task_info,
@@ -37,6 +37,11 @@ router = APIRouter()
 class SSEBroadcaster:
     def __init__(self):
         self.clients: Set[asyncio.Queue] = set()
+        # Per-task throttling/batching/deduplication state
+        self._task_state = {}  # task_id -> dict with last_sent, last_event, last_send_time, scheduled_handle
+        # Load the configurable SSE update interval (seconds)
+        config = get_config_params()
+        self.sse_update_interval = float(config.get("sseUpdateIntervalSeconds", 1))

     async def add_client(self, queue: asyncio.Queue):
         """Add a new SSE client"""
@@ -49,51 +54,105 @@ class SSEBroadcaster:
             logger.debug(f"SSE: Client disconnected (total: {len(self.clients)})")

     async def broadcast_event(self, event_data: dict):
-        """Broadcast an event to all connected clients"""
-        logger.debug(
-            f"SSE Broadcaster: Attempting to broadcast to {len(self.clients)} clients"
-        )
-
+        """
+        Throttle, batch, and deduplicate SSE events per task.
+        Emit at most one update per task per configured interval, aggregate within the window, and suppress redundant updates.
+        """
         if not self.clients:
             logger.debug("SSE Broadcaster: No clients connected, skipping broadcast")
             return

+        # Defensive: always work with a list of tasks
+        tasks = event_data.get("tasks", [])
+        if not isinstance(tasks, list):
+            tasks = [tasks]
+
+        # For each task, throttle/batch/dedupe
+        for task in tasks:
+            task_id = task.get("task_id")
+            if not task_id:
+                continue
+
+            now = time.time()
+            state = self._task_state.setdefault(task_id, {
+                "last_sent": None,
+                "last_event": None,
+                "last_send_time": 0,
+                "scheduled_handle": None,
+            })
+
+            # Deduplication: skip events identical to the last one sent
+            if state["last_sent"] is not None and self._events_equal(state["last_sent"], task):
+                logger.debug(f"SSE: Deduped event for task {task_id}")
+                continue
+
+            # Throttling: within the interval, store as last_event and schedule a trailing send
+            elapsed = now - state["last_send_time"]
+            if elapsed < self.sse_update_interval:
+                state["last_event"] = task
+                if state["scheduled_handle"] is None:
+                    delay = self.sse_update_interval - elapsed
+                    loop = asyncio.get_running_loop()
+                    state["scheduled_handle"] = loop.call_later(  # bind tid now: a bare closure would late-bind task_id
+                        delay, lambda tid=task_id: asyncio.create_task(self._send_batched_event(tid))
+                    )
+                continue
+
+            # Otherwise, send immediately
+            await self._send_event(task_id, task)
+            state["last_send_time"] = now
+            state["last_sent"] = task
+            state["last_event"] = None
+            if state["scheduled_handle"]:
+                state["scheduled_handle"].cancel()
+                state["scheduled_handle"] = None
+
+    async def _send_batched_event(self, task_id):
+        state = self._task_state.get(task_id)
+        if not state or not state["last_event"]:
+            return
+        await self._send_event(task_id, state["last_event"])
+        state["last_send_time"] = time.time()
+        state["last_sent"] = state["last_event"]
+        state["last_event"] = None
+        state["scheduled_handle"] = None
+
+    async def _send_event(self, task_id, task):
+        # Compose event_data for this task
+        event_data = {
+            "tasks": [task],
+            "current_timestamp": time.time(),
+            "change_type": "update",
+        }
+        enhanced_event_data = add_global_task_counts_to_event(event_data.copy())
-        # Add global task counts right before broadcasting - this is the single source of truth
-        # Skip expensive count recomputation for high-frequency callback/progress updates
-        try:
-            trigger_reason = event_data.get("trigger_reason")
-        except Exception:
-            trigger_reason = None
-        if trigger_reason and trigger_reason in {"callback_update", "progress_update"}:
-            enhanced_event_data = event_data.copy()
-        else:
-            enhanced_event_data = add_global_task_counts_to_event(event_data.copy())
         event_json = json.dumps(enhanced_event_data)
         sse_data = f"data: {event_json}\n\n"

-        logger.debug(
-            f"SSE Broadcaster: Broadcasting event: {enhanced_event_data.get('change_type', 'unknown')} with {enhanced_event_data.get('active_tasks', 0)} active tasks"
-        )
-
-        # Send to all clients, remove disconnected ones
         disconnected = set()
         sent_count = 0
         for client_queue in self.clients.copy():
             try:
                 await client_queue.put(sse_data)
                 sent_count += 1
-                logger.debug("SSE: Successfully sent to client queue")
             except Exception as e:
                 logger.error(f"SSE: Failed to send to client: {e}")
                 disconnected.add(client_queue)
-
-        # Clean up disconnected clients
         for client in disconnected:
             self.clients.discard(client)

         logger.debug(
-            f"SSE Broadcaster: Successfully sent to {sent_count} clients, removed {len(disconnected)} disconnected clients"
+            f"SSE Broadcaster: Sent throttled/batched event for task {task_id} to {sent_count} clients"
         )

+    def _events_equal(self, a, b):
+        # Compare two task dicts for deduplication (ignore volatile timestamps)
+        if not isinstance(a, dict) or not isinstance(b, dict):
+            return False
+        a_copy = dict(a)
+        b_copy = dict(b)
+        a_copy.pop("timestamp", None)
+        b_copy.pop("timestamp", None)
+        return a_copy == b_copy
+

 # Global broadcaster instance
 sse_broadcaster = SSEBroadcaster()
diff --git a/routes/utils/celery_config.py b/routes/utils/celery_config.py
index 9a2cc86..d6e9f21 100644
--- a/routes/utils/celery_config.py
+++ b/routes/utils/celery_config.py
@@ -54,6 +54,7 @@ DEFAULT_MAIN_CONFIG = {
     "watch": {},
     "realTimeMultiplier": 0,
     "padNumberWidth": 3,
+    "sseUpdateIntervalSeconds": 1,  # SSE throttle window in seconds (default: 1)
 }


@@ -190,7 +191,7 @@ task_annotations = {
         "rate_limit": f"{MAX_CONCURRENT_DL}/m",
     },
     "routes.utils.celery_tasks.trigger_sse_update_task": {
-        "rate_limit": "500/m",  # Allow high rate for real-time SSE updates
+        "rate_limit": "60/m",  # ~1 dispatch/sec per worker, matching the SSE throttle window
         "default_retry_delay": 1,  # Quick retry for SSE updates
         "max_retries": 1,  # Limited retries for best-effort delivery
         "ignore_result": True,  # Don't store results for SSE tasks
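
---

Reviewer note on the throttling change: the flow in `broadcast_event` (send the first event per task immediately, coalesce everything inside the window into one trailing send, drop exact duplicates) is easiest to sanity-check in isolation. Below is a minimal, runnable sketch of the same pattern under simplified, hypothetical names (`Throttle`, `push`, `_flush`; `INTERVAL` stands in for the `sseUpdateIntervalSeconds` config key) — it is not code from this PR, just the timing logic reduced to its core:

```python
import asyncio
import time

INTERVAL = 1.0  # stands in for sseUpdateIntervalSeconds


class Throttle:
    """Per-key throttle: the first event goes out immediately, later events
    inside the window collapse into one trailing send, duplicates drop."""

    def __init__(self, send):
        self.send = send   # async callback(key, event)
        self.state = {}    # key -> {"last_sent", "pending", "last_time", "handle"}

    async def push(self, key, event):
        st = self.state.setdefault(
            key, {"last_sent": None, "pending": None, "last_time": 0.0, "handle": None}
        )
        if event == st["last_sent"]:  # dedupe: identical to the last sent event
            return
        now = time.monotonic()
        if now - st["last_time"] < INTERVAL:  # inside the window: batch
            st["pending"] = event
            if st["handle"] is None:  # schedule one trailing send, binding key now
                delay = INTERVAL - (now - st["last_time"])
                st["handle"] = asyncio.get_running_loop().call_later(
                    delay, lambda k=key: asyncio.ensure_future(self._flush(k))
                )
            return
        await self.send(key, event)  # outside the window: send immediately
        st["last_sent"], st["last_time"] = event, now

    async def _flush(self, key):
        st = self.state[key]
        st["handle"] = None
        if st["pending"] is not None and st["pending"] != st["last_sent"]:
            await self.send(key, st["pending"])
            st["last_sent"], st["last_time"] = st["pending"], time.monotonic()
        st["pending"] = None


async def main():
    async def send(key, event):
        print(f"{time.monotonic():.2f} sent {key}: {event}")

    t = Throttle(send)
    for pct in range(0, 100, 10):  # ten rapid progress updates
        await t.push("task-1", {"progress": pct})
        await asyncio.sleep(0.05)
    await asyncio.sleep(1.5)  # let the trailing send fire


asyncio.run(main())
```

Ten pushes produce two sends: the immediate one (`progress: 0`) and one trailing batched send carrying the latest pending payload. That is the cap of one update per task per interval the new `SSEBroadcaster` enforces, with the `60/m` rate limit on `trigger_sse_update_task` keeping the Celery side roughly in step.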