From 85e15fb851a95a4d80dc6dfd31f6a8e8de7b5a4b Mon Sep 17 00:00:00 2001 From: "architect.in.git" Date: Sun, 23 Mar 2025 20:04:57 -0600 Subject: [PATCH] optimizations(? --- routes/prgs.py | 128 ++++----- routes/utils/celery_queue_manager.py | 4 +- routes/utils/celery_tasks.py | 3 - static/css/queue.css | 1 + static/js/queue.js | 401 +++++++++++++++------------ 5 files changed, 289 insertions(+), 248 deletions(-) create mode 100644 static/css/queue.css diff --git a/routes/prgs.py b/routes/prgs.py index 1858ba4..1f25f97 100755 --- a/routes/prgs.py +++ b/routes/prgs.py @@ -3,7 +3,6 @@ import os import json import logging import time -import random from routes.utils.celery_tasks import ( get_task_info, @@ -413,8 +412,8 @@ def stream_task_status(task_id): # Sort updates by id sorted_updates = sorted(all_updates, key=lambda x: x.get("id", 0)) - # Limit to send only the 5 most recent updates to reduce initial payload - for i, update in enumerate(sorted_updates[-5:]): + # Send the most recent updates first (up to 10) + for i, update in enumerate(sorted_updates[-10:]): # Add the task_id to each update message update["task_id"] = task_id yield f"event: update\ndata: {json.dumps(update)}\n\n" @@ -430,76 +429,65 @@ def stream_task_status(task_id): # Hold the connection open and check for updates last_heartbeat = time.time() - heartbeat_interval = 30 # Increased from 15 to 30 seconds to reduce overhead - - # Optimize polling with a more efficient loop structure - check_interval = 0.2 # Check for messages every 200ms instead of continuously - message_batch_size = 5 # Process up to 5 messages at a time + heartbeat_interval = 15 # Send heartbeat every 15 seconds while True: - # Process a batch of messages to reduce CPU usage - messages_processed = 0 - while messages_processed < message_batch_size: - # Check for new updates via Redis Pub/Sub with a timeout - message = redis_pubsub.get_message(timeout=check_interval) - - if not message: - break # No more messages to process - - if message['type'] == 'message': - messages_processed += 1 - # Got a new message from Redis Pub/Sub - try: - data = json.loads(message['data'].decode('utf-8')) - status_id = data.get('status_id', 0) + # Check for new updates via Redis Pub/Sub + message = redis_pubsub.get_message(timeout=1.0) + + if message and message['type'] == 'message': + # Got a new message from Redis Pub/Sub + try: + data = json.loads(message['data'].decode('utf-8')) + status_id = data.get('status_id', 0) + + # Fetch the actual status data + if status_id > last_sent_id: + all_status = redis_client.lrange(f"task:{task_id}:status", 0, -1) - # Only process if this is a new status update - if status_id > last_sent_id: - # Efficient fetch - only get the specific status update we need - for idx in range(-10, 0): # Check last 10 entries for efficiency - status_data = redis_client.lindex(f"task:{task_id}:status", idx) - if status_data: - status = json.loads(status_data.decode('utf-8')) - if status.get("id") == status_id: - # Add the task_id to the update - status["task_id"] = task_id - - # Choose the appropriate event type based on status - status_type = status.get("status", "") - event_type = "update" - - if status_type == ProgressState.COMPLETE or status_type == ProgressState.DONE: - event_type = "complete" - elif status_type == ProgressState.TRACK_COMPLETE: - # Create a distinct event type for track completion to prevent UI issues - event_type = "track_complete" - elif status_type == ProgressState.ERROR: - event_type = "error" - elif status_type in 
[ProgressState.TRACK_PROGRESS, ProgressState.REAL_TIME]: - event_type = "progress" - - # Send the update - yield f"event: {event_type}\ndata: {json.dumps(status)}\n\n" - last_sent_id = status_id - break - except Exception as e: - logger.error(f"Error processing Redis Pub/Sub message: {e}") + for status_data in all_status: + try: + status = json.loads(status_data.decode('utf-8')) + if status.get("id") == status_id: + # Add the task_id to the update + status["task_id"] = task_id + + # Choose the appropriate event type based on status + status_type = status.get("status", "") + event_type = "update" + + if status_type == ProgressState.COMPLETE or status_type == ProgressState.DONE: + event_type = "complete" + elif status_type == ProgressState.TRACK_COMPLETE: + # Create a distinct event type for track completion to prevent UI issues + event_type = "track_complete" + elif status_type == ProgressState.ERROR: + event_type = "error" + elif status_type in [ProgressState.TRACK_PROGRESS, ProgressState.REAL_TIME]: + event_type = "progress" + + # Send the update + yield f"event: {event_type}\ndata: {json.dumps(status)}\n\n" + last_sent_id = status_id + break + except Exception as e: + logger.error(f"Error parsing status data: {e}") + except Exception as e: + logger.error(f"Error processing Redis Pub/Sub message: {e}") # Check if task is complete, error, or cancelled - if so, end the stream - # Only do this check every 5 loops to reduce load - if random.random() < 0.2: # ~20% chance to check terminal status each loop - last_status = get_last_task_status(task_id) - if last_status and last_status.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED, ProgressState.DONE]: - # Send final message - final_data = { - "event": "end", - "task_id": task_id, - "status": last_status.get("status"), - "message": last_status.get("message", "Download complete"), - "timestamp": time.time() - } - yield f"event: end\ndata: {json.dumps(final_data)}\n\n" - break + last_status = get_last_task_status(task_id) + if last_status and last_status.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED, ProgressState.DONE]: + # Send final message + final_data = { + "event": "end", + "task_id": task_id, + "status": last_status.get("status"), + "message": last_status.get("message", "Download complete"), + "timestamp": time.time() + } + yield f"event: end\ndata: {json.dumps(final_data)}\n\n" + break # Send a heartbeat periodically to keep the connection alive now = time.time() @@ -507,8 +495,8 @@ def stream_task_status(task_id): yield f"event: heartbeat\ndata: {json.dumps({'timestamp': now})}\n\n" last_heartbeat = now - # More efficient sleep between batch checks - time.sleep(check_interval) + # Small sleep to prevent CPU spinning + time.sleep(0.1) except Exception as e: logger.error(f"Error in SSE stream: {e}") diff --git a/routes/utils/celery_queue_manager.py b/routes/utils/celery_queue_manager.py index 075be2e..3317814 100644 --- a/routes/utils/celery_queue_manager.py +++ b/routes/utils/celery_queue_manager.py @@ -29,11 +29,11 @@ CONFIG_PATH = './config/main.json' try: with open(CONFIG_PATH, 'r') as f: config_data = json.load(f) - MAX_CONCURRENT_DL = config_data.get("maxConcurrentDownloads", 10) + MAX_CONCURRENT_DL = config_data.get("maxConcurrentDownloads", 3) except Exception as e: print(f"Error loading configuration: {e}") # Fallback default - MAX_CONCURRENT_DL = 10 + MAX_CONCURRENT_DL = 3 def get_config_params(): """ diff --git a/routes/utils/celery_tasks.py 
b/routes/utils/celery_tasks.py index 8abf183..17cb915 100644 --- a/routes/utils/celery_tasks.py +++ b/routes/utils/celery_tasks.py @@ -85,9 +85,6 @@ def store_task_status(task_id, status_data): # Convert to JSON and store in Redis redis_client.rpush(f"task:{task_id}:status", json.dumps(status_data)) - # Trim the list to keep only the most recent 100 updates to avoid excessive memory usage - redis_client.ltrim(f"task:{task_id}:status", -100, -1) - # Set expiry for the list to avoid filling up Redis with old data redis_client.expire(f"task:{task_id}:status", 60 * 60 * 24 * 7) # 7 days redis_client.expire(f"task:{task_id}:status:next_id", 60 * 60 * 24 * 7) # 7 days diff --git a/static/css/queue.css b/static/css/queue.css new file mode 100644 index 0000000..0519ecb --- /dev/null +++ b/static/css/queue.css @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/static/js/queue.js b/static/js/queue.js index 2de054a..c8df3bb 100644 --- a/static/js/queue.js +++ b/static/js/queue.js @@ -20,14 +20,12 @@ class DownloadQueue { this.MAX_RETRIES = 3; // Default max retries this.RETRY_DELAY = 5; // Default retry delay in seconds this.RETRY_DELAY_INCREASE = 5; // Default retry delay increase in seconds - this.MAX_SSE_CONNECTIONS = 5; // Maximum number of active SSE connections this.downloadQueue = {}; // keyed by unique queueId this.currentConfig = {}; // Cache for current config // EventSource connections for SSE tracking this.sseConnections = {}; // keyed by prgFile/task_id - this.pendingForSSE = []; // Queue of entries waiting for SSE connections // Load the saved visible count (or default to 10) const storedVisibleCount = localStorage.getItem("downloadQueueVisibleCount"); @@ -36,9 +34,6 @@ class DownloadQueue { // Load the cached status info (object keyed by prgFile) this.queueCache = JSON.parse(localStorage.getItem("downloadQueueCache") || "{}"); - // Add a throttled update method to reduce UI updates - this.throttledUpdateQueue = this.throttle(this.updateQueueOrder.bind(this), 500); - // Wait for initDOM to complete before setting up event listeners and loading existing PRG files. this.initDOM().then(() => { this.initEventListeners(); @@ -46,25 +41,6 @@ class DownloadQueue { }); } - /* Utility method to throttle frequent function calls */ - throttle(func, delay) { - let lastCall = 0; - let timeout; - return function(...args) { - const now = Date.now(); - if (now - lastCall < delay) { - clearTimeout(timeout); - timeout = setTimeout(() => { - lastCall = now; - func(...args); - }, delay); - } else { - lastCall = now; - func(...args); - } - }; - } - /* DOM Management */ async initDOM() { // New HTML structure for the download queue. @@ -559,8 +535,6 @@ class DownloadQueue { updateQueueOrder() { const container = document.getElementById('queueItems'); const footer = document.getElementById('queueFooter'); - if (!container || !footer) return; - const entries = Object.values(this.downloadQueue); // Sorting: errors/canceled first (group 0), ongoing next (group 1), queued last (group 2, sorted by position). 
@@ -595,10 +569,7 @@ class DownloadQueue { const activeEntries = entries.filter(e => !e.hasEnded).length; // Update the header with detailed count - const countEl = document.getElementById('queueTotalCount'); - if (countEl) { - countEl.textContent = totalEntries; - } + document.getElementById('queueTotalCount').textContent = totalEntries; // Update subtitle with detailed stats if we have entries if (totalEntries > 0) { @@ -634,45 +605,57 @@ class DownloadQueue { } } - // Use DocumentFragment for better performance when updating the DOM - const fragment = document.createDocumentFragment(); + // Only recreate the container content if really needed + const visibleEntries = entries.slice(0, this.visibleCount); // Handle empty state if (entries.length === 0) { - const emptyDiv = document.createElement('div'); - emptyDiv.className = 'queue-empty'; - emptyDiv.innerHTML = ` - Empty queue -

-          Your download queue is empty
-        `;
+        container.innerHTML = `
+          Empty queue
+          Your download queue is empty
+
`; - container.innerHTML = ''; - container.appendChild(emptyDiv); } else { - // Get the visible entries slice - const visibleEntries = entries.slice(0, this.visibleCount); + // Get currently visible items + const visibleItems = Array.from(container.children).filter(el => el.classList.contains('queue-item')); - // Create a map of current DOM elements by queue ID - const existingElements = container.querySelectorAll('.queue-item'); - const existingElementMap = {}; - Array.from(existingElements).forEach(el => { - const cancelBtn = el.querySelector('.cancel-btn'); - if (cancelBtn) { - const queueId = cancelBtn.dataset.queueid; + // Update container more efficiently + if (visibleItems.length === 0) { + // No items in container, append all visible entries + container.innerHTML = ''; // Clear any empty state + visibleEntries.forEach(entry => { + // We no longer automatically start monitoring here + // Monitoring is now explicitly started by the methods that create downloads + container.appendChild(entry.element); + }); + } else { + // Container already has items, update more efficiently + + // Create a map of current DOM elements by queue ID + const existingElementMap = {}; + visibleItems.forEach(el => { + const queueId = el.querySelector('.cancel-btn')?.dataset.queueid; if (queueId) existingElementMap[queueId] = el; - } - }); - - // Add visible entries to the fragment in the correct order - visibleEntries.forEach(entry => { - fragment.appendChild(entry.element); - entry.isNew = false; - }); - - // Clear container and append the fragment - container.innerHTML = ''; - container.appendChild(fragment); + }); + + // Clear container to re-add in correct order + container.innerHTML = ''; + + // Add visible entries in correct order + visibleEntries.forEach(entry => { + // We no longer automatically start monitoring here + container.appendChild(entry.element); + + // Mark the entry as not new anymore + entry.isNew = false; + }); + } } + // We no longer start or stop monitoring based on visibility changes here + // This allows the explicit monitoring control from the download methods + // Update footer footer.innerHTML = ''; if (entries.length > this.visibleCount) { @@ -1418,17 +1401,6 @@ class DownloadQueue { // Close any existing connection this.closeSSEConnection(queueId); - // Check if we're at the connection limit - const activeConnectionCount = Object.keys(this.sseConnections).length; - if (activeConnectionCount >= this.MAX_SSE_CONNECTIONS) { - // Add to pending queue instead of creating connection now - if (!this.pendingForSSE.includes(queueId)) { - this.pendingForSSE.push(queueId); - console.log(`Queued SSE connection for ${queueId} (max connections reached)`); - } - return; - } - // Create a new EventSource connection try { const sse = new EventSource(`/api/prgs/stream/${entry.prgFile}`); @@ -1466,44 +1438,96 @@ class DownloadQueue { entry.status = data.status; }); - // Combined handler for all update-style events - const updateHandler = (event) => { + sse.addEventListener('update', (event) => { const data = JSON.parse(event.data); - const eventType = event.type; + console.log('SSE update event:', data); + this.handleSSEUpdate(queueId, data); + }); + + sse.addEventListener('progress', (event) => { + const data = JSON.parse(event.data); + console.log('SSE progress event:', data); + this.handleSSEUpdate(queueId, data); + }); + + // Add specific handler for track_complete events + sse.addEventListener('track_complete', (event) => { + const data = JSON.parse(event.data); + console.log('SSE 
track_complete event:', data); + console.log(`Current entry type: ${entry.type}`); - if (eventType === 'track_complete') { - // Special handling for track completions - console.log('SSE track_complete event:', data); + // Mark this status as a track completion + data.status = 'track_complete'; + + // Only update the log message without changing status colors + const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); + if (logElement) { + let message = `Completed track: ${data.title || data.track || 'Unknown'}`; + if (data.artist) message += ` by ${data.artist}`; + logElement.textContent = message; + } + + // For single track downloads, track_complete is a terminal state + if (entry.type === 'track') { + console.log('Single track download completed - terminating'); + // Mark the track as ended + entry.hasEnded = true; - // Mark this status as a track completion - data.status = 'track_complete'; + // Handle as a terminal state + setTimeout(() => { + this.closeSSEConnection(queueId); + this.cleanupEntry(queueId); + }, 5000); + } else { + console.log(`Album/playlist track completed - continuing download (type: ${entry.type})`); + // For albums/playlists, just update entry data without changing status + entry.lastStatus = data; + entry.lastUpdated = Date.now(); - // Only update the log message without changing status colors - const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); - if (logElement) { - let message = `Completed track: ${data.title || data.track || 'Unknown'}`; - if (data.artist) message += ` by ${data.artist}`; - logElement.textContent = message; - } + // Save to cache + this.queueCache[entry.prgFile] = data; + localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); + } + }); + + // Also handle 'done' events which can come for individual tracks + sse.addEventListener('done', (event) => { + const data = JSON.parse(event.data); + console.log('SSE done event (individual track):', data); + console.log(`Current entry type: ${entry.type}`); + + // Only update the log message without changing status colors for album tracks + const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); + if (logElement) { + let message = `Completed track: ${data.song || data.title || data.track || 'Unknown'}`; + if (data.artist) message += ` by ${data.artist}`; + logElement.textContent = message; + } + + // For single track downloads, done is a terminal state + if (entry.type === 'track') { + console.log('Single track download completed (done) - terminating'); + // Mark the track as ended + entry.hasEnded = true; - // For single track downloads, track_complete is a terminal state - if (entry.type === 'track') { - entry.hasEnded = true; - setTimeout(() => { - this.closeSSEConnection(queueId); - this.cleanupEntry(queueId); - }, 5000); - } else { - // For albums/playlists, just update entry data without changing status - entry.lastStatus = data; - entry.lastUpdated = Date.now(); - this.queueCache[entry.prgFile] = data; - localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); - } - } else if (eventType === 'complete' || eventType === 'done') { - // Terminal state handling - console.log(`SSE ${eventType} event:`, data); + // Handle as a terminal state + setTimeout(() => { + this.closeSSEConnection(queueId); + this.cleanupEntry(queueId); + }, 5000); + } else if (data.song) { + console.log(`Album/playlist individual track done - continuing download (type: ${entry.type})`); + // For 
albums/playlists, just update entry data without changing status + data._isIndividualTrack = true; // Mark it for special handling in update logic + entry.lastStatus = data; + entry.lastUpdated = Date.now(); + // Save to cache + this.queueCache[entry.prgFile] = data; + localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); + } else { + // This is a real done event for the entire album/playlist + console.log(`Entire ${entry.type} completed - finalizing`); this.handleSSEUpdate(queueId, data); entry.hasEnded = true; @@ -1511,38 +1535,91 @@ class DownloadQueue { this.closeSSEConnection(queueId); this.cleanupEntry(queueId); }, 5000); - } else if (eventType === 'error') { - // Error state handling - console.log('SSE error event:', data); - this.handleSSEUpdate(queueId, data); - entry.hasEnded = true; - this.closeSSEConnection(queueId); - } else if (eventType === 'end') { - // End event handling - console.log('SSE end event:', data); - - // Update with final status - this.handleSSEUpdate(queueId, data); - entry.hasEnded = true; - this.closeSSEConnection(queueId); - - if (data.status === 'complete' || data.status === 'done') { - setTimeout(() => this.cleanupEntry(queueId), 5000); - } - } else { - // Standard update handling - this.handleSSEUpdate(queueId, data); } - }; + }); - // Set up shared handler for all events - sse.addEventListener('update', updateHandler); - sse.addEventListener('progress', updateHandler); - sse.addEventListener('track_complete', updateHandler); - sse.addEventListener('complete', updateHandler); - sse.addEventListener('done', updateHandler); - sse.addEventListener('error', updateHandler); - sse.addEventListener('end', updateHandler); + sse.addEventListener('complete', (event) => { + const data = JSON.parse(event.data); + console.log('SSE complete event:', data); + console.log(`Current entry type: ${entry.type}`); + + // Skip terminal processing for track_complete status in albums/playlists + // Also skip for "done" status when it's for an individual track in an album/playlist + if ((data.status === 'track_complete' && entry.type !== 'track') || + (data.status === 'done' && data.song && entry.type !== 'track')) { + console.log(`Track ${data.status} in ${entry.type} download - continuing`); + // Don't process individual track completion events here + return; + } + + // Make sure the status is set to 'complete' for UI purposes + if (!data.status || data.status === '') { + data.status = 'complete'; + } + + // For track downloads, make sure we have a proper name + if (entry.type === 'track' && !data.name && entry.lastStatus) { + data.name = entry.lastStatus.name || ''; + data.artist = entry.lastStatus.artist || ''; + } + + this.handleSSEUpdate(queueId, data); + + // Always mark as terminal state for 'complete' events (except individual track completions in albums) + entry.hasEnded = true; + + // Close the connection after a short delay + setTimeout(() => { + this.closeSSEConnection(queueId); + this.cleanupEntry(queueId); + }, 5000); + }); + + sse.addEventListener('error', (event) => { + const data = JSON.parse(event.data); + console.log('SSE error event:', data); + this.handleSSEUpdate(queueId, data); + + // Mark the download as ended with error + entry.hasEnded = true; + + // Close the connection, but don't automatically clean up the entry + // to allow for potential retry + this.closeSSEConnection(queueId); + }); + + sse.addEventListener('end', (event) => { + const data = JSON.parse(event.data); + console.log('SSE end event:', data); + + // For track 
downloads, ensure we have the proper fields for UI display + if (entry.type === 'track') { + // If the end event doesn't have a name/artist, copy from lastStatus + if ((!data.name || !data.artist) && entry.lastStatus) { + data.name = data.name || entry.lastStatus.name || ''; + data.artist = data.artist || entry.lastStatus.artist || ''; + } + + // Force status to 'complete' if not provided + if (!data.status || data.status === '') { + data.status = 'complete'; + } + } + + // Update with final status + this.handleSSEUpdate(queueId, data); + + // Mark the download as ended + entry.hasEnded = true; + + // Close the connection + this.closeSSEConnection(queueId); + + // Clean up the entry after a delay if it's a success + if (data.status === 'complete' || data.status === 'done') { + setTimeout(() => this.cleanupEntry(queueId), 5000); + } + }); // Handle connection error sse.onerror = (error) => { @@ -1577,13 +1654,6 @@ class DownloadQueue { console.error('Error closing SSE connection:', error); } delete this.sseConnections[queueId]; - - // Now that we've freed a slot, check if any entries are waiting for an SSE connection - if (this.pendingForSSE.length > 0) { - const nextQueueId = this.pendingForSSE.shift(); - console.log(`Starting SSE connection for queued entry ${nextQueueId}`); - this.setupSSEConnection(nextQueueId); - } } } @@ -1599,6 +1669,8 @@ class DownloadQueue { return; } + console.log(`handleSSEUpdate for ${queueId} with type ${entry.type} and status ${data.status}`); + // Track completion is special - don't change visible status ONLY for albums/playlists // Check for both 'track_complete' and 'done' statuses for individual tracks in albums const isTrackCompletion = data.status === 'track_complete' || @@ -1619,46 +1691,29 @@ class DownloadQueue { entry.status = data.status; } - // Update status message in the UI - use a more efficient approach - this.updateEntryStatusUI(entry, data, skipStatusChange); + // Update status message in the UI + const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); + if (logElement) { + const statusMessage = this.getStatusMessage(data); + logElement.textContent = statusMessage; + } - // Save updated status to cache - debounce these writes to reduce storage operations - clearTimeout(entry.cacheWriteTimeout); - entry.cacheWriteTimeout = setTimeout(() => { - this.queueCache[entry.prgFile] = data; - localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); - }, 500); + // Apply appropriate CSS classes based on status only if not skipping status change + if (!skipStatusChange) { + this.applyStatusClasses(entry, data); + } + + // Save updated status to cache + this.queueCache[entry.prgFile] = data; + localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); // Special handling for error status if (data.status === 'error') { this.handleTerminalState(entry, queueId, data); } - // Throttle UI updates to improve performance with multiple downloads - this.throttledUpdateQueue(); - } - - // Optimized method to update the entry status in the UI - updateEntryStatusUI(entry, data, skipStatusChange) { - // First, update the log message text if the element exists - const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); - if (logElement) { - // Only modify the text content if it doesn't already have child elements - // (which would be the case for error states with retry buttons) - if (!logElement.querySelector('.error-message')) { - const statusMessage = 
this.getStatusMessage(data); - - // Only update DOM if the text has changed - if (logElement.textContent !== statusMessage) { - logElement.textContent = statusMessage; - } - } - } - - // Apply CSS classes for status indication only if we're not skipping status changes - if (!skipStatusChange) { - this.applyStatusClasses(entry, data); - } + // Update the queue order + this.updateQueueOrder(); } /* Close all active SSE connections */
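
Note on the prgs.py change: the hunk above replaces the batched, randomized polling loop with a plain one-second Pub/Sub wait, a full LRANGE rescan per notification, an unconditional terminal-status check, and a 15-second heartbeat. The following is a minimal Python sketch of that restored shape, not the route itself; the Pub/Sub channel name and the lowercase status strings are assumptions (the real route subscribes earlier and compares against ProgressState constants), and the history replay that precedes the loop is omitted.

```python
import json
import time

import redis
from flask import Response

redis_client = redis.Redis()

def stream_task_status_sketch(task_id):
    def generate():
        pubsub = redis_client.pubsub()
        pubsub.subscribe(f"task:{task_id}:updates")  # assumed channel name
        last_sent_id = 0
        last_heartbeat = time.time()
        terminal = ("complete", "error", "cancelled", "done")  # stands in for ProgressState constants
        try:
            while True:
                # Wait up to one second for a Pub/Sub notification carrying the new status id
                message = pubsub.get_message(timeout=1.0)
                if message and message["type"] == "message":
                    status_id = json.loads(message["data"]).get("status_id", 0)
                    if status_id > last_sent_id:
                        # Rescan the whole status list for the matching entry
                        for raw in redis_client.lrange(f"task:{task_id}:status", 0, -1):
                            status = json.loads(raw)
                            if status.get("id") == status_id:
                                status["task_id"] = task_id
                                # The real code also maps complete/done, track_complete, error
                                # and progress statuses to distinct SSE event types
                                yield f"event: update\ndata: {json.dumps(status)}\n\n"
                                last_sent_id = status_id
                                break

                # Terminal check on every pass (the randomized, batched check was removed);
                # the real code calls get_last_task_status instead of LINDEX
                raw_last = redis_client.lindex(f"task:{task_id}:status", -1)
                if raw_last and json.loads(raw_last).get("status") in terminal:
                    yield f"event: end\ndata: {json.dumps({'task_id': task_id})}\n\n"
                    break

                # Heartbeat every 15 seconds to keep proxies from dropping the connection
                now = time.time()
                if now - last_heartbeat > 15:
                    yield f"event: heartbeat\ndata: {json.dumps({'timestamp': now})}\n\n"
                    last_heartbeat = now

                time.sleep(0.1)  # small sleep to prevent CPU spinning
        finally:
            pubsub.close()

    return Response(generate(), mimetype="text/event-stream")
```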
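Note on the celery_tasks.py change: dropping the LTRIM keeps the full status history available for the LRANGE rescan above, leaving only the 7-day expiry to bound memory. A minimal sketch of the resulting helper, assuming the same redis_client and key names shown in the diff:

```python
import json
import redis

redis_client = redis.Redis()

def store_task_status(task_id, status_data):
    """Append a status update and keep the whole history (no LTRIM) for 7 days."""
    redis_client.rpush(f"task:{task_id}:status", json.dumps(status_data))
    # Expire both the status list and its id counter so stale tasks age out of Redis
    redis_client.expire(f"task:{task_id}:status", 60 * 60 * 24 * 7)
    redis_client.expire(f"task:{task_id}:status:next_id", 60 * 60 * 24 * 7)
```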
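Note on the queue.js change: the client now registers one EventSource per task at /api/prgs/stream/<prgFile> with separate listeners for update, progress, track_complete, complete, done, error and end events, instead of routing everything through a shared handler. For exercising that wire format outside a browser, a rough Python reader might look like the following; the base URL is a placeholder and this is not a full SSE parser (it assumes single-line data fields and ignores retry directives):

```python
import requests

def read_task_events(base_url, task_id):
    """Rough SSE reader for manual testing of the /api/prgs/stream endpoint."""
    url = f"{base_url}/api/prgs/stream/{task_id}"
    event = None
    with requests.get(url, stream=True) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines(decode_unicode=True):
            if not line:
                event = None  # a blank line terminates one SSE event
                continue
            if line.startswith("event: "):
                event = line[len("event: "):]
            elif line.startswith("data: "):
                print(event or "message", line[len("data: "):])
                if event == "end":
                    return  # the server closes the stream after the final status

# Example (placeholder URL and task id):
# read_task_events("http://localhost:5000", "some-task-id")
```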