Merge pull request #338 from Phlogi/performance-improvements
enh(api): add per-task sse throttling and batching for robust updates
This commit is contained in:
@@ -4,7 +4,7 @@
|
||||
### can leave the defaults as they are.
|
||||
###
|
||||
### If you plan on using for a server,
|
||||
### see https://spotizerr.rtfd.io
|
||||
### see https://spotizerr.rtfd.io
|
||||
###
|
||||
|
||||
# Interface to bind to. Unless you know what you're doing, don't change this
|
||||
@@ -62,4 +62,4 @@ GITHUB_CLIENT_SECRET=
|
||||
# Log level for application logging.
|
||||
# Possible values: debug, info, warning, error, critical
|
||||
# Set to 'info' or 'warning' for general use. Use 'debug' for troubleshooting.
|
||||
LOG_LEVEL=info
|
||||
LOG_LEVEL=info
|
||||
|
||||
60
.github/workflows/pr-build.yml
vendored
Normal file
60
.github/workflows/pr-build.yml
vendored
Normal file
@@ -0,0 +1,60 @@
|
||||
# Builds and publishes a multi-arch dev/test container image for pull requests
# (or manual dispatch), tagged dev-pr-<number>, to GitHub Container Registry.
name: PR Dev/Test Container

on:
  pull_request:
    types: [opened, synchronize, reopened]
  workflow_dispatch:
    inputs:
      pr_number:
        description: 'Pull request number (optional, for manual runs)'
        required: false
      branch:
        description: 'Branch to build (optional, defaults to PR head or main)'
        required: false

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  build-and-push:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write   # required to push images to GHCR
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
        with:
          # Manual-dispatch branch wins, then the PR head ref, then the current ref.
          ref: ${{ github.event.inputs.branch || github.head_ref || github.ref }}

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2

      - name: Login to GHCR
        uses: docker/login-action@v2
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      # Extract Docker metadata
      - name: Extract Docker metadata
        id: meta
        uses: docker/metadata-action@v4
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=raw,value=dev-pr-${{ github.event.inputs.pr_number || github.event.pull_request.number }}

      # Build and push multi-arch dev image
      - name: Build and push
        uses: docker/build-push-action@v4
        with:
          context: .
          platforms: linux/amd64,linux/arm64
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max
|
||||
@@ -27,6 +27,10 @@ If you self-host a music server with other users than yourself, you almost certa
|
||||
<img width="1588" height="994" alt="image" src="https://github.com/user-attachments/assets/e34d7dbb-29e3-4d75-bcbd-0cee03fa57dc" />
|
||||
</details>
|
||||
|
||||
## How do I start?
|
||||
|
||||
Docs are available at: https://spotizerr.rtfd.io
|
||||
|
||||
### Common Issues
|
||||
|
||||
**Downloads not starting?**
|
||||
|
||||
@@ -8,7 +8,7 @@ from typing import Set, Optional
|
||||
|
||||
import redis
|
||||
import threading
|
||||
from routes.utils.celery_config import REDIS_URL
|
||||
from routes.utils.celery_config import REDIS_URL, get_config_params
|
||||
|
||||
from routes.utils.celery_tasks import (
|
||||
get_task_info,
|
||||
@@ -37,6 +37,11 @@ router = APIRouter()
|
||||
class SSEBroadcaster:
|
||||
def __init__(self):
    """Initialize client registry and per-task throttle/batch state."""
    # Queues of connected SSE clients; one queue per connection.
    self.clients: Set[asyncio.Queue] = set()
    # Per-task throttling/batching/deduplication state:
    # task_id -> dict with last_sent, last_event, last_send_time, scheduled_handle
    # NOTE(review): entries are never removed, so this dict grows with the
    # number of distinct task_ids seen — consider pruning finished tasks.
    self._task_state = {}
    # Load configurable interval: minimum seconds between consecutive
    # events for the same task (default 1s).
    config = get_config_params()
    self.sse_update_interval = float(config.get("sseUpdateIntervalSeconds", 1))
|
||||
|
||||
async def add_client(self, queue: asyncio.Queue):
|
||||
"""Add a new SSE client"""
|
||||
@@ -49,51 +54,105 @@ class SSEBroadcaster:
|
||||
logger.debug(f"SSE: Client disconnected (total: {len(self.clients)})")
|
||||
|
||||
async def broadcast_event(self, event_data: dict):
    """Broadcast an event to all connected clients.

    Throttles, batches and deduplicates SSE events per task: at most one
    update per ``sse_update_interval`` seconds per task, aggregating events
    that arrive inside the window and suppressing redundant (identical)
    updates.
    """
    logger.debug(
        f"SSE Broadcaster: Attempting to broadcast to {len(self.clients)} clients"
    )
    if not self.clients:
        logger.debug("SSE Broadcaster: No clients connected, skipping broadcast")
        return

    # Defensive: always work with a list of tasks.
    tasks = event_data.get("tasks", [])
    if not isinstance(tasks, list):
        tasks = [tasks]

    # For each task, throttle/batch/dedupe.
    for task in tasks:
        task_id = task.get("task_id")
        if not task_id:
            continue

        now = time.time()
        state = self._task_state.setdefault(task_id, {
            "last_sent": None,         # last event actually delivered
            "last_event": None,        # pending event batched for delayed delivery
            "last_send_time": 0,       # wall-clock time of last delivery
            "scheduled_handle": None,  # call_later handle for the pending flush
        })

        # Deduplication: if event is identical to last sent, skip.
        if state["last_sent"] is not None and self._events_equal(state["last_sent"], task):
            logger.debug(f"SSE: Deduped event for task {task_id}")
            continue

        # Throttling: within the interval, remember only the latest event and
        # schedule a single delayed flush for this task.
        elapsed = now - state["last_send_time"]
        if elapsed < self.sse_update_interval:
            state["last_event"] = task
            if state["scheduled_handle"] is None:
                delay = self.sse_update_interval - elapsed
                loop = asyncio.get_running_loop()
                # Bind task_id as a default argument: a plain closure would
                # late-bind the loop variable and flush the wrong (last) task.
                state["scheduled_handle"] = loop.call_later(
                    delay,
                    lambda tid=task_id: asyncio.create_task(
                        self._send_batched_event(tid)
                    ),
                )
            continue

        # Otherwise, send immediately and reset the throttle state.
        await self._send_event(task_id, task)
        state["last_send_time"] = now
        state["last_sent"] = task
        state["last_event"] = None
        if state["scheduled_handle"]:
            state["scheduled_handle"].cancel()
            state["scheduled_handle"] = None
||||
|
||||
async def _send_batched_event(self, task_id):
    """Flush the pending (batched) event for *task_id*, if any.

    Invoked via ``loop.call_later`` from ``broadcast_event`` once the
    throttle window for the task has elapsed.
    """
    state = self._task_state.get(task_id)
    # Nothing pending (already flushed, superseded, or unknown task).
    if not state or not state["last_event"]:
        return
    await self._send_event(task_id, state["last_event"])
    # Record the delivery and clear the batching slot.
    state["last_send_time"] = time.time()
    state["last_sent"] = state["last_event"]
    state["last_event"] = None
    state["scheduled_handle"] = None
|
||||
|
||||
async def _send_event(self, task_id, task):
    """Serialize a single-task update and push it to every connected client.

    Client queues that raise on ``put`` are treated as disconnected and
    pruned from the registry.
    """
    # Compose event_data for this task.
    event_data = {
        "tasks": [task],
        "current_timestamp": time.time(),
        "change_type": "update",
    }

    # Add global task counts right before broadcasting - this is the single
    # source of truth. Skip the expensive count recomputation for
    # high-frequency callback/progress updates.
    # NOTE(review): event_data is built locally above and never contains
    # "trigger_reason", so the fast path below can never trigger here —
    # confirm whether the reason should be propagated from the source event.
    trigger_reason = event_data.get("trigger_reason")
    if trigger_reason in {"callback_update", "progress_update"}:
        enhanced_event_data = event_data.copy()
    else:
        enhanced_event_data = add_global_task_counts_to_event(event_data.copy())

    event_json = json.dumps(enhanced_event_data)
    sse_data = f"data: {event_json}\n\n"

    logger.debug(
        f"SSE Broadcaster: Broadcasting event: {enhanced_event_data.get('change_type', 'unknown')} with {enhanced_event_data.get('active_tasks', 0)} active tasks"
    )

    # Send to all clients, remove disconnected ones.
    disconnected = set()
    sent_count = 0
    for client_queue in self.clients.copy():
        try:
            await client_queue.put(sse_data)
            sent_count += 1
            logger.debug("SSE: Successfully sent to client queue")
        except Exception as e:
            logger.error(f"SSE: Failed to send to client: {e}")
            disconnected.add(client_queue)

    # Clean up disconnected clients.
    for client in disconnected:
        self.clients.discard(client)
    logger.debug(
        f"SSE Broadcaster: Sent throttled/batched event for task {task_id} to {sent_count} clients"
    )
|
||||
|
||||
def _events_equal(self, a, b):
|
||||
# Compare two task dicts for deduplication (ignore timestamps)
|
||||
if not isinstance(a, dict) or not isinstance(b, dict):
|
||||
return False
|
||||
a_copy = dict(a)
|
||||
b_copy = dict(b)
|
||||
a_copy.pop("timestamp", None)
|
||||
b_copy.pop("timestamp", None)
|
||||
return a_copy == b_copy
|
||||
|
||||
|
||||
# Global broadcaster instance
|
||||
sse_broadcaster = SSEBroadcaster()
|
||||
|
||||
@@ -54,6 +54,7 @@ DEFAULT_MAIN_CONFIG = {
|
||||
"watch": {},
|
||||
"realTimeMultiplier": 0,
|
||||
"padNumberWidth": 3,
|
||||
"sseUpdateIntervalSeconds": 1, # Configurable SSE update interval (default: 1s)
|
||||
}
|
||||
|
||||
|
||||
@@ -190,7 +191,7 @@ task_annotations = {
|
||||
"rate_limit": f"{MAX_CONCURRENT_DL}/m",
|
||||
},
|
||||
"routes.utils.celery_tasks.trigger_sse_update_task": {
|
||||
"rate_limit": "500/m", # Allow high rate for real-time SSE updates
|
||||
"rate_limit": "60/m", # Throttle to 1 update/sec per task (matches SSE throttle)
|
||||
"default_retry_delay": 1, # Quick retry for SSE updates
|
||||
"max_retries": 1, # Limited retries for best-effort delivery
|
||||
"ignore_result": True, # Don't store results for SSE tasks
|
||||
|
||||
Reference in New Issue
Block a user