diff --git a/routes/prgs.py b/routes/prgs.py
index 1858ba4..1f25f97 100755
--- a/routes/prgs.py
+++ b/routes/prgs.py
@@ -3,7 +3,6 @@ import os
import json
import logging
import time
-import random
from routes.utils.celery_tasks import (
get_task_info,
@@ -413,8 +412,8 @@ def stream_task_status(task_id):
# Sort updates by id
sorted_updates = sorted(all_updates, key=lambda x: x.get("id", 0))
- # Limit to send only the 5 most recent updates to reduce initial payload
- for i, update in enumerate(sorted_updates[-5:]):
+    # Replay up to the 10 most recent updates before live streaming begins
+ for i, update in enumerate(sorted_updates[-10:]):
# Add the task_id to each update message
update["task_id"] = task_id
yield f"event: update\ndata: {json.dumps(update)}\n\n"
@@ -430,76 +429,65 @@ def stream_task_status(task_id):
# Hold the connection open and check for updates
last_heartbeat = time.time()
- heartbeat_interval = 30 # Increased from 15 to 30 seconds to reduce overhead
-
- # Optimize polling with a more efficient loop structure
- check_interval = 0.2 # Check for messages every 200ms instead of continuously
- message_batch_size = 5 # Process up to 5 messages at a time
+ heartbeat_interval = 15 # Send heartbeat every 15 seconds
while True:
- # Process a batch of messages to reduce CPU usage
- messages_processed = 0
- while messages_processed < message_batch_size:
- # Check for new updates via Redis Pub/Sub with a timeout
- message = redis_pubsub.get_message(timeout=check_interval)
-
- if not message:
- break # No more messages to process
-
- if message['type'] == 'message':
- messages_processed += 1
- # Got a new message from Redis Pub/Sub
- try:
- data = json.loads(message['data'].decode('utf-8'))
- status_id = data.get('status_id', 0)
+ # Check for new updates via Redis Pub/Sub
+ message = redis_pubsub.get_message(timeout=1.0)
+
+ if message and message['type'] == 'message':
+ # Got a new message from Redis Pub/Sub
+ try:
+ data = json.loads(message['data'].decode('utf-8'))
+ status_id = data.get('status_id', 0)
+
+                    # Fetch status data only if this update is newer than the last one sent
+ if status_id > last_sent_id:
+ all_status = redis_client.lrange(f"task:{task_id}:status", 0, -1)
- # Only process if this is a new status update
- if status_id > last_sent_id:
- # Efficient fetch - only get the specific status update we need
- for idx in range(-10, 0): # Check last 10 entries for efficiency
- status_data = redis_client.lindex(f"task:{task_id}:status", idx)
- if status_data:
- status = json.loads(status_data.decode('utf-8'))
- if status.get("id") == status_id:
- # Add the task_id to the update
- status["task_id"] = task_id
-
- # Choose the appropriate event type based on status
- status_type = status.get("status", "")
- event_type = "update"
-
- if status_type == ProgressState.COMPLETE or status_type == ProgressState.DONE:
- event_type = "complete"
- elif status_type == ProgressState.TRACK_COMPLETE:
- # Create a distinct event type for track completion to prevent UI issues
- event_type = "track_complete"
- elif status_type == ProgressState.ERROR:
- event_type = "error"
- elif status_type in [ProgressState.TRACK_PROGRESS, ProgressState.REAL_TIME]:
- event_type = "progress"
-
- # Send the update
- yield f"event: {event_type}\ndata: {json.dumps(status)}\n\n"
- last_sent_id = status_id
- break
- except Exception as e:
- logger.error(f"Error processing Redis Pub/Sub message: {e}")
+ for status_data in all_status:
+ try:
+ status = json.loads(status_data.decode('utf-8'))
+ if status.get("id") == status_id:
+ # Add the task_id to the update
+ status["task_id"] = task_id
+
+ # Choose the appropriate event type based on status
+ status_type = status.get("status", "")
+ event_type = "update"
+
+ if status_type == ProgressState.COMPLETE or status_type == ProgressState.DONE:
+ event_type = "complete"
+ elif status_type == ProgressState.TRACK_COMPLETE:
+ # Create a distinct event type for track completion to prevent UI issues
+ event_type = "track_complete"
+ elif status_type == ProgressState.ERROR:
+ event_type = "error"
+ elif status_type in [ProgressState.TRACK_PROGRESS, ProgressState.REAL_TIME]:
+ event_type = "progress"
+
+ # Send the update
+ yield f"event: {event_type}\ndata: {json.dumps(status)}\n\n"
+ last_sent_id = status_id
+ break
+ except Exception as e:
+ logger.error(f"Error parsing status data: {e}")
+ except Exception as e:
+ logger.error(f"Error processing Redis Pub/Sub message: {e}")
# Check if task is complete, error, or cancelled - if so, end the stream
- # Only do this check every 5 loops to reduce load
- if random.random() < 0.2: # ~20% chance to check terminal status each loop
- last_status = get_last_task_status(task_id)
- if last_status and last_status.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED, ProgressState.DONE]:
- # Send final message
- final_data = {
- "event": "end",
- "task_id": task_id,
- "status": last_status.get("status"),
- "message": last_status.get("message", "Download complete"),
- "timestamp": time.time()
- }
- yield f"event: end\ndata: {json.dumps(final_data)}\n\n"
- break
+ last_status = get_last_task_status(task_id)
+ if last_status and last_status.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED, ProgressState.DONE]:
+ # Send final message
+ final_data = {
+ "event": "end",
+ "task_id": task_id,
+ "status": last_status.get("status"),
+ "message": last_status.get("message", "Download complete"),
+ "timestamp": time.time()
+ }
+ yield f"event: end\ndata: {json.dumps(final_data)}\n\n"
+ break
# Send a heartbeat periodically to keep the connection alive
now = time.time()
@@ -507,8 +495,8 @@ def stream_task_status(task_id):
yield f"event: heartbeat\ndata: {json.dumps({'timestamp': now})}\n\n"
last_heartbeat = now
- # More efficient sleep between batch checks
- time.sleep(check_interval)
+ # Small sleep to prevent CPU spinning
+ time.sleep(0.1)
except Exception as e:
logger.error(f"Error in SSE stream: {e}")
diff --git a/routes/utils/celery_queue_manager.py b/routes/utils/celery_queue_manager.py
index 075be2e..3317814 100644
--- a/routes/utils/celery_queue_manager.py
+++ b/routes/utils/celery_queue_manager.py
@@ -29,11 +29,11 @@ CONFIG_PATH = './config/main.json'
try:
with open(CONFIG_PATH, 'r') as f:
config_data = json.load(f)
- MAX_CONCURRENT_DL = config_data.get("maxConcurrentDownloads", 10)
+ MAX_CONCURRENT_DL = config_data.get("maxConcurrentDownloads", 3)
except Exception as e:
print(f"Error loading configuration: {e}")
# Fallback default
- MAX_CONCURRENT_DL = 10
+ MAX_CONCURRENT_DL = 3
def get_config_params():
"""
diff --git a/routes/utils/celery_tasks.py b/routes/utils/celery_tasks.py
index 8abf183..17cb915 100644
--- a/routes/utils/celery_tasks.py
+++ b/routes/utils/celery_tasks.py
@@ -85,9 +85,6 @@ def store_task_status(task_id, status_data):
# Convert to JSON and store in Redis
redis_client.rpush(f"task:{task_id}:status", json.dumps(status_data))
- # Trim the list to keep only the most recent 100 updates to avoid excessive memory usage
- redis_client.ltrim(f"task:{task_id}:status", -100, -1)
-
# Set expiry for the list to avoid filling up Redis with old data
redis_client.expire(f"task:{task_id}:status", 60 * 60 * 24 * 7) # 7 days
redis_client.expire(f"task:{task_id}:status:next_id", 60 * 60 * 24 * 7) # 7 days
diff --git a/static/css/queue.css b/static/css/queue.css
new file mode 100644
index 0000000..0519ecb
--- /dev/null
+++ b/static/css/queue.css
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/static/js/queue.js b/static/js/queue.js
index 2de054a..c8df3bb 100644
--- a/static/js/queue.js
+++ b/static/js/queue.js
@@ -20,14 +20,12 @@ class DownloadQueue {
this.MAX_RETRIES = 3; // Default max retries
this.RETRY_DELAY = 5; // Default retry delay in seconds
this.RETRY_DELAY_INCREASE = 5; // Default retry delay increase in seconds
- this.MAX_SSE_CONNECTIONS = 5; // Maximum number of active SSE connections
this.downloadQueue = {}; // keyed by unique queueId
this.currentConfig = {}; // Cache for current config
// EventSource connections for SSE tracking
this.sseConnections = {}; // keyed by prgFile/task_id
- this.pendingForSSE = []; // Queue of entries waiting for SSE connections
// Load the saved visible count (or default to 10)
const storedVisibleCount = localStorage.getItem("downloadQueueVisibleCount");
@@ -36,9 +34,6 @@ class DownloadQueue {
// Load the cached status info (object keyed by prgFile)
this.queueCache = JSON.parse(localStorage.getItem("downloadQueueCache") || "{}");
- // Add a throttled update method to reduce UI updates
- this.throttledUpdateQueue = this.throttle(this.updateQueueOrder.bind(this), 500);
-
// Wait for initDOM to complete before setting up event listeners and loading existing PRG files.
this.initDOM().then(() => {
this.initEventListeners();
@@ -46,25 +41,6 @@ class DownloadQueue {
});
}
- /* Utility method to throttle frequent function calls */
- throttle(func, delay) {
- let lastCall = 0;
- let timeout;
- return function(...args) {
- const now = Date.now();
- if (now - lastCall < delay) {
- clearTimeout(timeout);
- timeout = setTimeout(() => {
- lastCall = now;
- func(...args);
- }, delay);
- } else {
- lastCall = now;
- func(...args);
- }
- };
- }
-
/* DOM Management */
async initDOM() {
// New HTML structure for the download queue.
@@ -559,8 +535,6 @@ class DownloadQueue {
updateQueueOrder() {
const container = document.getElementById('queueItems');
const footer = document.getElementById('queueFooter');
- if (!container || !footer) return;
-
const entries = Object.values(this.downloadQueue);
// Sorting: errors/canceled first (group 0), ongoing next (group 1), queued last (group 2, sorted by position).
@@ -595,10 +569,7 @@ class DownloadQueue {
const activeEntries = entries.filter(e => !e.hasEnded).length;
// Update the header with detailed count
- const countEl = document.getElementById('queueTotalCount');
- if (countEl) {
- countEl.textContent = totalEntries;
- }
+ document.getElementById('queueTotalCount').textContent = totalEntries;
// Update subtitle with detailed stats if we have entries
if (totalEntries > 0) {
@@ -634,45 +605,57 @@ class DownloadQueue {
}
}
- // Use DocumentFragment for better performance when updating the DOM
- const fragment = document.createDocumentFragment();
+            // Rebuild the list from the currently visible entries
+ const visibleEntries = entries.slice(0, this.visibleCount);
// Handle empty state
if (entries.length === 0) {
- const emptyDiv = document.createElement('div');
- emptyDiv.className = 'queue-empty';
-                emptyDiv.innerHTML = `
-                    Your download queue is empty
-                `;
+                container.innerHTML = `
+                    Your download queue is empty
+                `;
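
End to end, any SSE-capable client can consume the stream that `stream_task_status` produces. A minimal Python consumer sketch, assuming the `requests` package and an illustrative endpoint URL (the real route path is defined in `routes/prgs.py`):

```python
import requests

# URL is an assumption for illustration; adjust to the deployed route.
url = "http://localhost:5000/api/prgs/stream/abc123"
with requests.get(url, stream=True) as resp:
    event_type = "message"  # SSE default when no "event:" line precedes data
    for line in resp.iter_lines(decode_unicode=True):
        if line.startswith("event: "):
            event_type = line[len("event: "):]
        elif line.startswith("data: "):
            print(event_type, line[len("data: "):])
            if event_type == "end":
                break  # server closes the stream after the final status
```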