I'm too stupid for sse

This commit is contained in:
coolgitternotin
2025-03-23 22:05:44 -06:00
parent 85e15fb851
commit b4c2c69b76
4 changed files with 312 additions and 596 deletions

View File

@@ -62,13 +62,28 @@ def get_prg_file(task_id):
"display_title": original_request.get("display_title", task_info.get("name", "")), "display_title": original_request.get("display_title", task_info.get("name", "")),
"display_type": original_request.get("display_type", task_info.get("type", "")), "display_type": original_request.get("display_type", task_info.get("type", "")),
"display_artist": original_request.get("display_artist", task_info.get("artist", "")), "display_artist": original_request.get("display_artist", task_info.get("artist", "")),
"status_count": status_count "status_count": status_count,
"task_id": task_id,
"timestamp": time.time()
} }
# Handle different status types # Handle different status types
if last_status: if last_status:
status_type = last_status.get("status", "unknown") status_type = last_status.get("status", "unknown")
# Set event type based on status (like in the previous SSE implementation)
event_type = "update"
if status_type in [ProgressState.COMPLETE, ProgressState.DONE]:
event_type = "complete"
elif status_type == ProgressState.TRACK_COMPLETE:
event_type = "track_complete"
elif status_type == ProgressState.ERROR:
event_type = "error"
elif status_type in [ProgressState.TRACK_PROGRESS, ProgressState.REAL_TIME]:
event_type = "progress"
response["event"] = event_type
# For terminal statuses (complete, error, cancelled) # For terminal statuses (complete, error, cancelled)
if status_type in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED]: if status_type in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED]:
response["progress_message"] = last_status.get("message", f"Download {status_type}") response["progress_message"] = last_status.get("message", f"Download {status_type}")
@@ -161,7 +176,9 @@ def get_prg_file(task_id):
"original_request": None, "original_request": None,
"display_title": "", "display_title": "",
"display_type": "", "display_type": "",
"display_artist": "" "display_artist": "",
"task_id": task_id,
"event": "unknown"
}) })
# Attempt to extract the original request from the first line. # Attempt to extract the original request from the first line.
@@ -223,7 +240,10 @@ def get_prg_file(task_id):
"original_request": original_request, "original_request": original_request,
"display_title": display_title, "display_title": display_title,
"display_type": display_type, "display_type": display_type,
"display_artist": display_artist "display_artist": display_artist,
"task_id": task_id,
"event": "unknown", # Old files don't have event types
"timestamp": time.time()
}) })
except FileNotFoundError: except FileNotFoundError:
abort(404, "Task or file not found") abort(404, "Task or file not found")
@@ -357,164 +377,3 @@ def cancel_task_endpoint(task_id):
}), 400 }), 400
except Exception as e: except Exception as e:
abort(500, f"An error occurred: {e}") abort(500, f"An error occurred: {e}")
@prgs_bp.route('/stream/<task_id>', methods=['GET'])
def stream_task_status(task_id):
"""
Stream task status updates as Server-Sent Events (SSE).
This endpoint opens a persistent connection and sends updates in real-time.
Args:
task_id: The ID of the task to stream updates for
"""
def generate():
try:
# Get initial task info to send as the opening message
task_info = get_task_info(task_id)
if not task_info:
# Check if this is an old PRG file
if os.path.exists(os.path.join(PRGS_DIR, task_id)):
# Return error - SSE not supported for old PRG files
yield f"event: error\ndata: {json.dumps({'error': 'SSE streaming not supported for old PRG files'})}\n\n"
return
else:
# Task not found
yield f"event: error\ndata: {json.dumps({'error': 'Task not found'})}\n\n"
return
# Get the original request and other basic info for the opening message
original_request = task_info.get("original_request", {})
download_type = task_info.get("type", "")
name = task_info.get("name", "")
artist = task_info.get("artist", "")
# Prepare the opening message with the required information
opening_data = {
"event": "start",
"task_id": task_id,
"type": download_type,
"name": name,
"artist": artist,
"url": original_request.get("url", ""),
"service": original_request.get("service", ""),
"timestamp": time.time(),
"status": "initializing",
"message": f"Starting {download_type} download: {name}" + (f" by {artist}" if artist else "")
}
# Send the opening message
yield f"event: start\ndata: {json.dumps(opening_data)}\n\n"
# Get existing status updates to catch up (most recent first)
all_updates = get_task_status(task_id)
# Sort updates by id
sorted_updates = sorted(all_updates, key=lambda x: x.get("id", 0))
# Send the most recent updates first (up to 10)
for i, update in enumerate(sorted_updates[-10:]):
# Add the task_id to each update message
update["task_id"] = task_id
yield f"event: update\ndata: {json.dumps(update)}\n\n"
# Keep track of the last update ID we've sent
last_sent_id = 0
if sorted_updates:
last_sent_id = sorted_updates[-1].get("id", 0)
# Create a Redis connection for subscribing to updates
redis_pubsub = redis_client.pubsub()
redis_pubsub.subscribe(f"task_updates:{task_id}")
# Hold the connection open and check for updates
last_heartbeat = time.time()
heartbeat_interval = 15 # Send heartbeat every 15 seconds
while True:
# Check for new updates via Redis Pub/Sub
message = redis_pubsub.get_message(timeout=1.0)
if message and message['type'] == 'message':
# Got a new message from Redis Pub/Sub
try:
data = json.loads(message['data'].decode('utf-8'))
status_id = data.get('status_id', 0)
# Fetch the actual status data
if status_id > last_sent_id:
all_status = redis_client.lrange(f"task:{task_id}:status", 0, -1)
for status_data in all_status:
try:
status = json.loads(status_data.decode('utf-8'))
if status.get("id") == status_id:
# Add the task_id to the update
status["task_id"] = task_id
# Choose the appropriate event type based on status
status_type = status.get("status", "")
event_type = "update"
if status_type == ProgressState.COMPLETE or status_type == ProgressState.DONE:
event_type = "complete"
elif status_type == ProgressState.TRACK_COMPLETE:
# Create a distinct event type for track completion to prevent UI issues
event_type = "track_complete"
elif status_type == ProgressState.ERROR:
event_type = "error"
elif status_type in [ProgressState.TRACK_PROGRESS, ProgressState.REAL_TIME]:
event_type = "progress"
# Send the update
yield f"event: {event_type}\ndata: {json.dumps(status)}\n\n"
last_sent_id = status_id
break
except Exception as e:
logger.error(f"Error parsing status data: {e}")
except Exception as e:
logger.error(f"Error processing Redis Pub/Sub message: {e}")
# Check if task is complete, error, or cancelled - if so, end the stream
last_status = get_last_task_status(task_id)
if last_status and last_status.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED, ProgressState.DONE]:
# Send final message
final_data = {
"event": "end",
"task_id": task_id,
"status": last_status.get("status"),
"message": last_status.get("message", "Download complete"),
"timestamp": time.time()
}
yield f"event: end\ndata: {json.dumps(final_data)}\n\n"
break
# Send a heartbeat periodically to keep the connection alive
now = time.time()
if now - last_heartbeat >= heartbeat_interval:
yield f"event: heartbeat\ndata: {json.dumps({'timestamp': now})}\n\n"
last_heartbeat = now
# Small sleep to prevent CPU spinning
time.sleep(0.1)
except Exception as e:
logger.error(f"Error in SSE stream: {e}")
yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
finally:
# Clean up: unsubscribe and close Redis Pub/Sub connection
if 'redis_pubsub' in locals():
try:
redis_pubsub.unsubscribe()
redis_pubsub.close()
except Exception as e:
logger.error(f"Error closing Redis Pub/Sub: {e}")
return Response(
stream_with_context(generate()),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no' # Disable Nginx buffering
}
)

View File

@@ -148,4 +148,16 @@ result_expires = 60 * 60 * 24 * 7 # 7 days
# Configure visibility timeout for task messages # Configure visibility timeout for task messages
broker_transport_options = { broker_transport_options = {
'visibility_timeout': 3600, # 1 hour 'visibility_timeout': 3600, # 1 hour
} 'fanout_prefix': True,
'fanout_patterns': True,
'priority_steps': [0, 3, 6, 9],
}
# Important broker connection settings
broker_connection_retry = True
broker_connection_retry_on_startup = True
broker_connection_max_retries = 10
broker_pool_limit = 10
worker_prefetch_multiplier = 1 # Process one task at a time per worker
worker_max_tasks_per_child = 100 # Restart worker after 100 tasks
worker_disable_rate_limits = False

View File

@@ -9,6 +9,7 @@ from pathlib import Path
import threading import threading
import queue import queue
import sys import sys
import uuid
# Configure logging # Configure logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -102,16 +103,36 @@ class CeleryManager:
# Stop existing workers if running # Stop existing workers if running
if self.celery_process: if self.celery_process:
try: try:
logger.info("Stopping existing Celery workers...")
os.killpg(os.getpgid(self.celery_process.pid), signal.SIGTERM) os.killpg(os.getpgid(self.celery_process.pid), signal.SIGTERM)
self.celery_process.wait(timeout=5) self.celery_process.wait(timeout=5)
except (subprocess.TimeoutExpired, ProcessLookupError): except (subprocess.TimeoutExpired, ProcessLookupError):
try: try:
logger.warning("Forcibly killing Celery workers with SIGKILL")
os.killpg(os.getpgid(self.celery_process.pid), signal.SIGKILL) os.killpg(os.getpgid(self.celery_process.pid), signal.SIGKILL)
except ProcessLookupError: except ProcessLookupError:
pass pass
# Clear output threads list # Clear output threads list
self.output_threads = [] self.output_threads = []
# Wait a moment to ensure processes are terminated
time.sleep(2)
# Additional cleanup - find and kill any stray Celery processes
try:
# This runs a shell command to find and kill all celery processes
subprocess.run(
"ps aux | grep 'celery -A routes.utils.celery_tasks.celery_app worker' | grep -v grep | awk '{print $2}' | xargs -r kill -9",
shell=True,
stderr=subprocess.PIPE
)
logger.info("Killed any stray Celery processes")
# Wait a moment to ensure processes are terminated
time.sleep(1)
except Exception as e:
logger.error(f"Error during stray process cleanup: {e}")
# Start new workers with updated concurrency # Start new workers with updated concurrency
try: try:
@@ -127,13 +148,16 @@ class CeleryManager:
'--loglevel=info', '--loglevel=info',
f'--concurrency={new_worker_count}', f'--concurrency={new_worker_count}',
'-Q', 'downloads', '-Q', 'downloads',
# Add timestamp to Celery logs
'--logfile=-', # Output logs to stdout '--logfile=-', # Output logs to stdout
'--without-heartbeat', # Reduce log noise '--without-heartbeat', # Reduce log noise
'--without-gossip', # Reduce log noise '--without-gossip', # Reduce log noise
'--without-mingle' # Reduce log noise '--without-mingle', # Reduce log noise
# Add unique worker name to prevent conflicts
f'--hostname=worker@%h-{uuid.uuid4()}'
] ]
logger.info(f"Starting new Celery workers with command: {' '.join(cmd)}")
self.celery_process = subprocess.Popen( self.celery_process = subprocess.Popen(
cmd, cmd,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
@@ -145,7 +169,23 @@ class CeleryManager:
) )
self.current_worker_count = new_worker_count self.current_worker_count = new_worker_count
logger.info(f"Started Celery workers with concurrency {new_worker_count}") logger.info(f"Started Celery workers with concurrency {new_worker_count}, PID: {self.celery_process.pid}")
# Verify the process started correctly
time.sleep(2)
if self.celery_process.poll() is not None:
# Process exited prematurely
stdout, stderr = "", ""
try:
stdout, stderr = self.celery_process.communicate(timeout=1)
except subprocess.TimeoutExpired:
pass
logger.error(f"Celery workers failed to start. Exit code: {self.celery_process.poll()}")
logger.error(f"Stdout: {stdout}")
logger.error(f"Stderr: {stderr}")
self.celery_process = None
raise RuntimeError("Celery workers failed to start")
# Start non-blocking output reader threads for both stdout and stderr # Start non-blocking output reader threads for both stdout and stderr
stdout_thread = threading.Thread( stdout_thread = threading.Thread(
@@ -166,6 +206,13 @@ class CeleryManager:
except Exception as e: except Exception as e:
logger.error(f"Error starting Celery workers: {e}") logger.error(f"Error starting Celery workers: {e}")
# In case of failure, make sure we don't leave orphaned processes
if self.celery_process and self.celery_process.poll() is None:
try:
os.killpg(os.getpgid(self.celery_process.pid), signal.SIGKILL)
except (ProcessLookupError, OSError):
pass
self.celery_process = None
def _process_output_reader(self, pipe, stream_name): def _process_output_reader(self, pipe, stream_name):
"""Read and log output from the process""" """Read and log output from the process"""

View File

@@ -21,12 +21,24 @@ class DownloadQueue {
this.RETRY_DELAY = 5; // Default retry delay in seconds this.RETRY_DELAY = 5; // Default retry delay in seconds
this.RETRY_DELAY_INCREASE = 5; // Default retry delay increase in seconds this.RETRY_DELAY_INCREASE = 5; // Default retry delay increase in seconds
this.downloadQueue = {}; // keyed by unique queueId // Cache for queue items
this.currentConfig = {}; // Cache for current config this.queueCache = {};
// Queue entry objects
this.queueEntries = {};
// EventSource connections for SSE tracking // EventSource connections for SSE tracking
this.sseConnections = {}; // keyed by prgFile/task_id this.sseConnections = {};
// DOM elements cache
this.elements = {};
// Event handlers
this.eventHandlers = {};
// Configuration
this.config = null;
// Load the saved visible count (or default to 10) // Load the saved visible count (or default to 10)
const storedVisibleCount = localStorage.getItem("downloadQueueVisibleCount"); const storedVisibleCount = localStorage.getItem("downloadQueueVisibleCount");
this.visibleCount = storedVisibleCount ? parseInt(storedVisibleCount, 10) : 10; this.visibleCount = storedVisibleCount ? parseInt(storedVisibleCount, 10) : 10;
@@ -75,17 +87,17 @@ class DownloadQueue {
// Override the server value with locally persisted queue visibility (if present). // Override the server value with locally persisted queue visibility (if present).
const storedVisible = localStorage.getItem("downloadQueueVisible"); const storedVisible = localStorage.getItem("downloadQueueVisible");
if (storedVisible !== null) { if (storedVisible !== null) {
this.currentConfig.downloadQueueVisible = storedVisible === "true"; this.config.downloadQueueVisible = storedVisible === "true";
} }
const queueSidebar = document.getElementById('downloadQueue'); const queueSidebar = document.getElementById('downloadQueue');
queueSidebar.hidden = !this.currentConfig.downloadQueueVisible; queueSidebar.hidden = !this.config.downloadQueueVisible;
queueSidebar.classList.toggle('active', this.currentConfig.downloadQueueVisible); queueSidebar.classList.toggle('active', this.config.downloadQueueVisible);
// Initialize the queue icon based on sidebar visibility // Initialize the queue icon based on sidebar visibility
const queueIcon = document.getElementById('queueIcon'); const queueIcon = document.getElementById('queueIcon');
if (queueIcon) { if (queueIcon) {
if (this.currentConfig.downloadQueueVisible) { if (this.config.downloadQueueVisible) {
queueIcon.innerHTML = '<span class="queue-x">&times;</span>'; queueIcon.innerHTML = '<span class="queue-x">&times;</span>';
queueIcon.setAttribute('aria-expanded', 'true'); queueIcon.setAttribute('aria-expanded', 'true');
queueIcon.classList.add('queue-icon-active'); // Add red tint class queueIcon.classList.add('queue-icon-active'); // Add red tint class
@@ -111,8 +123,8 @@ class DownloadQueue {
const cancelAllBtn = document.getElementById('cancelAllBtn'); const cancelAllBtn = document.getElementById('cancelAllBtn');
if (cancelAllBtn) { if (cancelAllBtn) {
cancelAllBtn.addEventListener('click', () => { cancelAllBtn.addEventListener('click', () => {
for (const queueId in this.downloadQueue) { for (const queueId in this.queueEntries) {
const entry = this.downloadQueue[queueId]; const entry = this.queueEntries[queueId];
if (!entry.hasEnded) { if (!entry.hasEnded) {
fetch(`/api/${entry.type}/download/cancel?prg_file=${entry.prgFile}`) fetch(`/api/${entry.type}/download/cancel?prg_file=${entry.prgFile}`)
.then(response => response.json()) .then(response => response.json())
@@ -191,7 +203,7 @@ class DownloadQueue {
try { try {
await this.loadConfig(); await this.loadConfig();
const updatedConfig = { ...this.currentConfig, downloadQueueVisible: isVisible }; const updatedConfig = { ...this.config, downloadQueueVisible: isVisible };
await this.saveConfig(updatedConfig); await this.saveConfig(updatedConfig);
this.dispatchEvent('queueVisibilityChanged', { visible: isVisible }); this.dispatchEvent('queueVisibilityChanged', { visible: isVisible });
} catch (error) { } catch (error) {
@@ -230,7 +242,7 @@ class DownloadQueue {
addDownload(item, type, prgFile, requestUrl = null, startMonitoring = false) { addDownload(item, type, prgFile, requestUrl = null, startMonitoring = false) {
const queueId = this.generateQueueId(); const queueId = this.generateQueueId();
const entry = this.createQueueEntry(item, type, prgFile, queueId, requestUrl); const entry = this.createQueueEntry(item, type, prgFile, queueId, requestUrl);
this.downloadQueue[queueId] = entry; this.queueEntries[queueId] = entry;
// Re-render and update which entries are processed. // Re-render and update which entries are processed.
this.updateQueueOrder(); this.updateQueueOrder();
@@ -245,7 +257,7 @@ class DownloadQueue {
/* Start processing the entry only if it is visible. */ /* Start processing the entry only if it is visible. */
async startEntryMonitoring(queueId) { async startEntryMonitoring(queueId) {
const entry = this.downloadQueue[queueId]; const entry = this.queueEntries[queueId];
if (!entry || entry.hasEnded) return; if (!entry || entry.hasEnded) return;
// Don't restart monitoring if SSE connection already exists // Don't restart monitoring if SSE connection already exists
@@ -417,7 +429,7 @@ class DownloadQueue {
} }
// Store it in our queue object // Store it in our queue object
this.downloadQueue[queueId] = entry; this.queueEntries[queueId] = entry;
return entry; return entry;
} }
@@ -456,31 +468,42 @@ class DownloadQueue {
// Add a helper method to apply the right CSS classes based on status // Add a helper method to apply the right CSS classes based on status
applyStatusClasses(entry, status) { applyStatusClasses(entry, status) {
if (!entry || !entry.element || !status) return; // If no element, nothing to do
if (!entry.element) return;
// Clear existing status classes // Remove all status classes first
entry.element.classList.remove('queue-item--processing', 'queue-item--error', 'download-success'); entry.element.classList.remove(
'queued', 'initializing', 'downloading', 'processing',
'error', 'complete', 'cancelled', 'progress'
);
// Apply appropriate class based on status // Handle various status types
if (status.status === 'processing' || status.status === 'downloading' || status.status === 'progress') { switch (status) {
entry.element.classList.add('queue-item--processing'); case 'queued':
} else if (status.status === 'error') { entry.element.classList.add('queued');
entry.element.classList.add('queue-item--error'); break;
entry.hasEnded = true; case 'initializing':
} else if (status.status === 'complete' || status.status === 'done') { entry.element.classList.add('initializing');
entry.element.classList.add('download-success'); break;
entry.hasEnded = true; case 'processing':
// Distinguish 'track_complete' from final 'complete' state case 'downloading':
} else if (status.status === 'track_complete') { entry.element.classList.add('processing');
// Don't mark as ended, just show it's in progress break;
entry.element.classList.add('queue-item--processing'); case 'progress':
} else if (status.status === 'cancel' || status.status === 'interrupted') { case 'track_progress':
entry.hasEnded = true; case 'real_time':
} entry.element.classList.add('progress');
break;
// Special case for retry status case 'error':
if (status.retrying || status.status === 'retrying') { entry.element.classList.add('error');
entry.element.classList.add('queue-item--processing'); break;
case 'complete':
case 'done':
entry.element.classList.add('complete');
break;
case 'cancelled':
entry.element.classList.add('cancelled');
break;
} }
} }
@@ -495,7 +518,7 @@ class DownloadQueue {
if (data.status === "cancel") { if (data.status === "cancel") {
const logElement = document.getElementById(`log-${queueid}-${prg}`); const logElement = document.getElementById(`log-${queueid}-${prg}`);
logElement.textContent = "Download cancelled"; logElement.textContent = "Download cancelled";
const entry = this.downloadQueue[queueid]; const entry = this.queueEntries[queueid];
if (entry) { if (entry) {
entry.hasEnded = true; entry.hasEnded = true;
@@ -535,7 +558,7 @@ class DownloadQueue {
updateQueueOrder() { updateQueueOrder() {
const container = document.getElementById('queueItems'); const container = document.getElementById('queueItems');
const footer = document.getElementById('queueFooter'); const footer = document.getElementById('queueFooter');
const entries = Object.values(this.downloadQueue); const entries = Object.values(this.queueEntries);
// Sorting: errors/canceled first (group 0), ongoing next (group 1), queued last (group 2, sorted by position). // Sorting: errors/canceled first (group 0), ongoing next (group 1), queued last (group 2, sorted by position).
entries.sort((a, b) => { entries.sort((a, b) => {
@@ -673,7 +696,7 @@ class DownloadQueue {
/* Checks if an entry is visible in the queue display. */ /* Checks if an entry is visible in the queue display. */
isEntryVisible(queueId) { isEntryVisible(queueId) {
const entries = Object.values(this.downloadQueue); const entries = Object.values(this.queueEntries);
entries.sort((a, b) => { entries.sort((a, b) => {
const getGroup = (entry) => { const getGroup = (entry) => {
if (entry.lastStatus && (entry.lastStatus.status === "error" || entry.lastStatus.status === "cancel")) { if (entry.lastStatus && (entry.lastStatus.status === "error" || entry.lastStatus.status === "cancel")) {
@@ -702,7 +725,7 @@ class DownloadQueue {
} }
async cleanupEntry(queueId) { async cleanupEntry(queueId) {
const entry = this.downloadQueue[queueId]; const entry = this.queueEntries[queueId];
if (entry) { if (entry) {
// Close any SSE connection // Close any SSE connection
this.closeSSEConnection(queueId); this.closeSSEConnection(queueId);
@@ -719,7 +742,7 @@ class DownloadQueue {
entry.element.remove(); entry.element.remove();
// Delete from in-memory queue // Delete from in-memory queue
delete this.downloadQueue[queueId]; delete this.queueEntries[queueId];
// Remove the cached info // Remove the cached info
if (this.queueCache[entry.prgFile]) { if (this.queueCache[entry.prgFile]) {
@@ -886,102 +909,26 @@ class DownloadQueue {
/* New Methods to Handle Terminal State, Inactivity and Auto-Retry */ /* New Methods to Handle Terminal State, Inactivity and Auto-Retry */
handleTerminalState(entry, queueId, progress) { handleTerminalState(entry, queueId, progress) {
// Mark the entry as ended
entry.hasEnded = true; entry.hasEnded = true;
clearInterval(entry.intervalId);
const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`);
if (!logElement) return;
// Save the terminal state to the cache for persistence across reloads // Update progress bar if available
this.queueCache[entry.prgFile] = progress; if (typeof progress === 'number') {
localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); const progressBar = entry.element.querySelector('.progress-bar');
if (progressBar) {
// Add status classes without triggering animations progressBar.style.width = '100%';
this.applyStatusClasses(entry, progress); progressBar.setAttribute('aria-valuenow', 100);
progressBar.classList.add('bg-success');
if (progress.status === 'error') {
const cancelBtn = entry.element.querySelector('.cancel-btn');
if (cancelBtn) {
cancelBtn.style.display = 'none';
} }
// Check if we're under the max retries threshold for auto-retry
const canRetry = entry.retryCount < this.MAX_RETRIES;
if (canRetry) {
logElement.innerHTML = `
<div class="error-message">${this.getStatusMessage(progress)}</div>
<div class="error-buttons">
<button class="close-error-btn" title="Close">&times;</button>
<button class="retry-btn" title="Retry">Retry</button>
</div>
`;
logElement.querySelector('.close-error-btn').addEventListener('click', () => {
if (entry.autoRetryInterval) {
clearInterval(entry.autoRetryInterval);
entry.autoRetryInterval = null;
}
this.cleanupEntry(queueId);
});
logElement.querySelector('.retry-btn').addEventListener('click', async () => {
if (entry.autoRetryInterval) {
clearInterval(entry.autoRetryInterval);
entry.autoRetryInterval = null;
}
this.retryDownload(queueId, logElement);
});
// Implement auto-retry if we have the original request URL
if (entry.requestUrl) {
const maxRetries = this.MAX_RETRIES;
if (entry.retryCount < maxRetries) {
// Calculate the delay based on retry count (exponential backoff)
const baseDelay = this.RETRY_DELAY || 5; // seconds, use server's retry delay or default to 5
const increase = this.RETRY_DELAY_INCREASE || 5;
const retryDelay = baseDelay + (entry.retryCount * increase);
let secondsLeft = retryDelay;
entry.autoRetryInterval = setInterval(() => {
secondsLeft--;
const errorMsgEl = logElement.querySelector('.error-message');
if (errorMsgEl) {
errorMsgEl.textContent = `Error: ${progress.message || 'Unknown error'}. Retrying in ${secondsLeft} seconds... (attempt ${entry.retryCount + 1}/${maxRetries})`;
}
if (secondsLeft <= 0) {
clearInterval(entry.autoRetryInterval);
entry.autoRetryInterval = null;
this.retryDownload(queueId, logElement);
}
}, 1000);
}
}
} else {
// Cannot be retried - just show the error
logElement.innerHTML = `
<div class="error-message">${this.getStatusMessage(progress)}</div>
<div class="error-buttons">
<button class="close-error-btn" title="Close">&times;</button>
</div>
`;
logElement.querySelector('.close-error-btn').addEventListener('click', () => {
this.cleanupEntry(queueId);
});
}
return;
} else if (progress.status === 'interrupted') {
logElement.textContent = 'Download was interrupted';
setTimeout(() => this.cleanupEntry(queueId), 5000);
} else if (progress.status === 'complete') {
logElement.textContent = 'Download completed successfully';
// Hide the cancel button
const cancelBtn = entry.element.querySelector('.cancel-btn');
if (cancelBtn) {
cancelBtn.style.display = 'none';
}
setTimeout(() => this.cleanupEntry(queueId), 5000);
} else {
logElement.textContent = this.getStatusMessage(progress);
setTimeout(() => this.cleanupEntry(queueId), 5000);
} }
// Stop polling
this.closeSSEConnection(queueId);
// Clean up after a delay
setTimeout(() => {
this.cleanupEntry(queueId);
}, 5000);
} }
handleInactivity(entry, queueId, logElement) { handleInactivity(entry, queueId, logElement) {
@@ -1003,7 +950,7 @@ class DownloadQueue {
} }
async retryDownload(queueId, logElement) { async retryDownload(queueId, logElement) {
const entry = this.downloadQueue[queueId]; const entry = this.queueEntries[queueId];
if (!entry) return; if (!entry) return;
logElement.textContent = 'Retrying download...'; logElement.textContent = 'Retrying download...';
@@ -1075,8 +1022,8 @@ class DownloadQueue {
* Start monitoring for all active entries in the queue that are visible * Start monitoring for all active entries in the queue that are visible
*/ */
startMonitoringActiveEntries() { startMonitoringActiveEntries() {
for (const queueId in this.downloadQueue) { for (const queueId in this.queueEntries) {
const entry = this.downloadQueue[queueId]; const entry = this.queueEntries[queueId];
// Only start monitoring if the entry is not in a terminal state and is visible // Only start monitoring if the entry is not in a terminal state and is visible
if (!entry.hasEnded && this.isEntryVisible(queueId) && !this.sseConnections[queueId]) { if (!entry.hasEnded && this.isEntryVisible(queueId) && !this.sseConnections[queueId]) {
this.setupSSEConnection(queueId); this.setupSSEConnection(queueId);
@@ -1170,7 +1117,7 @@ class DownloadQueue {
// Set up SSE connections for each entry // Set up SSE connections for each entry
for (const {queueId, prgFile} of queueIds) { for (const {queueId, prgFile} of queueIds) {
const entry = this.downloadQueue[queueId]; const entry = this.queueEntries[queueId];
if (entry && !entry.hasEnded) { if (entry && !entry.hasEnded) {
this.setupSSEConnection(queueId); this.setupSSEConnection(queueId);
} }
@@ -1188,7 +1135,7 @@ class DownloadQueue {
await new Promise(resolve => setTimeout(resolve, 1000)); await new Promise(resolve => setTimeout(resolve, 1000));
// Set up SSE connection // Set up SSE connection
const entry = this.downloadQueue[queueId]; const entry = this.queueEntries[queueId];
if (entry && !entry.hasEnded) { if (entry && !entry.hasEnded) {
this.setupSSEConnection(queueId); this.setupSSEConnection(queueId);
} }
@@ -1209,13 +1156,13 @@ class DownloadQueue {
async loadExistingPrgFiles() { async loadExistingPrgFiles() {
try { try {
// Clear existing queue entries first to avoid duplicates when refreshing // Clear existing queue entries first to avoid duplicates when refreshing
for (const queueId in this.downloadQueue) { for (const queueId in this.queueEntries) {
const entry = this.downloadQueue[queueId]; const entry = this.queueEntries[queueId];
// Close any active connections // Close any active connections
this.closeSSEConnection(queueId); this.closeSSEConnection(queueId);
// Don't remove the entry from DOM - we'll rebuild it entirely // Don't remove the entry from DOM - we'll rebuild it entirely
delete this.downloadQueue[queueId]; delete this.queueEntries[queueId];
} }
const response = await fetch('/api/prgs/list'); const response = await fetch('/api/prgs/list');
@@ -1329,7 +1276,7 @@ class DownloadQueue {
this.applyStatusClasses(entry, prgData.last_line); this.applyStatusClasses(entry, prgData.last_line);
} }
this.downloadQueue[queueId] = entry; this.queueEntries[queueId] = entry;
} catch (error) { } catch (error) {
console.error("Error fetching details for", prgFile, error); console.error("Error fetching details for", prgFile, error);
} }
@@ -1353,23 +1300,23 @@ class DownloadQueue {
try { try {
const response = await fetch('/api/config'); const response = await fetch('/api/config');
if (!response.ok) throw new Error('Failed to fetch config'); if (!response.ok) throw new Error('Failed to fetch config');
this.currentConfig = await response.json(); this.config = await response.json();
// Update our retry constants from the server config // Update our retry constants from the server config
if (this.currentConfig.maxRetries !== undefined) { if (this.config.maxRetries !== undefined) {
this.MAX_RETRIES = this.currentConfig.maxRetries; this.MAX_RETRIES = this.config.maxRetries;
} }
if (this.currentConfig.retryDelaySeconds !== undefined) { if (this.config.retryDelaySeconds !== undefined) {
this.RETRY_DELAY = this.currentConfig.retryDelaySeconds; this.RETRY_DELAY = this.config.retryDelaySeconds;
} }
if (this.currentConfig.retry_delay_increase !== undefined) { if (this.config.retry_delay_increase !== undefined) {
this.RETRY_DELAY_INCREASE = this.currentConfig.retry_delay_increase; this.RETRY_DELAY_INCREASE = this.config.retry_delay_increase;
} }
console.log(`Loaded retry settings from config: max=${this.MAX_RETRIES}, delay=${this.RETRY_DELAY}, increase=${this.RETRY_DELAY_INCREASE}`); console.log(`Loaded retry settings from config: max=${this.MAX_RETRIES}, delay=${this.RETRY_DELAY}, increase=${this.RETRY_DELAY_INCREASE}`);
} catch (error) { } catch (error) {
console.error('Error loading config:', error); console.error('Error loading config:', error);
this.currentConfig = {}; this.config = {};
} }
} }
@@ -1381,7 +1328,7 @@ class DownloadQueue {
body: JSON.stringify(updatedConfig) body: JSON.stringify(updatedConfig)
}); });
if (!response.ok) throw new Error('Failed to save config'); if (!response.ok) throw new Error('Failed to save config');
this.currentConfig = await response.json(); this.config = await response.json();
} catch (error) { } catch (error) {
console.error('Error saving config:', error); console.error('Error saving config:', error);
throw error; throw error;
@@ -1390,330 +1337,181 @@ class DownloadQueue {
// Add a method to check if explicit filter is enabled // Add a method to check if explicit filter is enabled
isExplicitFilterEnabled() { isExplicitFilterEnabled() {
return !!this.currentConfig.explicitFilter; return !!this.config.explicitFilter;
} }
/* Sets up a Server-Sent Events connection for real-time status updates */ /* Sets up a Server-Sent Events connection for real-time status updates */
setupSSEConnection(queueId) { setupSSEConnection(queueId) {
const entry = this.downloadQueue[queueId]; console.log(`Setting up polling for ${queueId}`);
if (!entry || entry.hasEnded) return; const entry = this.queueEntries[queueId];
if (!entry || !entry.prgFile) {
console.warn(`No entry or prgFile for ${queueId}`);
return;
}
// Close any existing connection // Close any existing connection
this.closeSSEConnection(queueId); this.closeSSEConnection(queueId);
// Create a new EventSource connection
try { try {
const sse = new EventSource(`/api/prgs/stream/${entry.prgFile}`); // Immediately fetch initial data
this.fetchTaskStatus(queueId);
// Store the connection // Create a polling interval of 1 second
this.sseConnections[queueId] = sse; const intervalId = setInterval(() => {
this.fetchTaskStatus(queueId);
}, 1000);
// Set up event handlers // Store the interval ID for later cleanup
sse.addEventListener('start', (event) => { this.sseConnections[queueId] = intervalId;
const data = JSON.parse(event.data); } catch (error) {
console.log('SSE start event:', data); console.error(`Error creating polling for ${queueId}:`, error);
const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`);
const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); if (logElement) {
if (logElement) { logElement.textContent = `Error with download: ${error.message}`;
logElement.textContent = `Starting ${data.type} download: ${data.name}${data.artist ? ` by ${data.artist}` : ''}`; entry.element.classList.add('error');
} }
}
// IMPORTANT: Save the download type from the start event }
if (data.type) {
console.log(`Setting entry type to: ${data.type}`); async fetchTaskStatus(queueId) {
entry.type = data.type; const entry = this.queueEntries[queueId];
if (!entry || !entry.prgFile) {
// Update type display if element exists console.warn(`No entry or prgFile for ${queueId}`);
const typeElement = entry.element.querySelector('.type'); return;
if (typeElement) { }
typeElement.textContent = data.type.charAt(0).toUpperCase() + data.type.slice(1);
// Update type class without triggering animation try {
typeElement.className = `type ${data.type}`; const response = await fetch(`/api/prgs/${entry.prgFile}`);
} if (!response.ok) {
} throw new Error(`HTTP error: ${response.status}`);
}
// Store the initial status
entry.lastStatus = data;
entry.lastUpdated = Date.now();
entry.status = data.status;
});
sse.addEventListener('update', (event) => { const data = await response.json();
const data = JSON.parse(event.data);
console.log('SSE update event:', data);
this.handleSSEUpdate(queueId, data);
});
sse.addEventListener('progress', (event) => { // Initialize the download type if needed
const data = JSON.parse(event.data); if (data.type && !entry.type) {
console.log('SSE progress event:', data); console.log(`Setting entry type to: ${data.type}`);
this.handleSSEUpdate(queueId, data); entry.type = data.type;
});
// Update type display if element exists
const typeElement = entry.element.querySelector('.type');
if (typeElement) {
typeElement.textContent = data.type.charAt(0).toUpperCase() + data.type.slice(1);
// Update type class without triggering animation
typeElement.className = `type ${data.type}`;
}
}
// Add specific handler for track_complete events // Process the update
sse.addEventListener('track_complete', (event) => { this.handleSSEUpdate(queueId, data);
const data = JSON.parse(event.data);
console.log('SSE track_complete event:', data);
console.log(`Current entry type: ${entry.type}`);
// Mark this status as a track completion
data.status = 'track_complete';
// Only update the log message without changing status colors
const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`);
if (logElement) {
let message = `Completed track: ${data.title || data.track || 'Unknown'}`;
if (data.artist) message += ` by ${data.artist}`;
logElement.textContent = message;
}
// For single track downloads, track_complete is a terminal state
if (entry.type === 'track') {
console.log('Single track download completed - terminating');
// Mark the track as ended
entry.hasEnded = true;
// Handle as a terminal state
setTimeout(() => {
this.closeSSEConnection(queueId);
this.cleanupEntry(queueId);
}, 5000);
} else {
console.log(`Album/playlist track completed - continuing download (type: ${entry.type})`);
// For albums/playlists, just update entry data without changing status
entry.lastStatus = data;
entry.lastUpdated = Date.now();
// Save to cache
this.queueCache[entry.prgFile] = data;
localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache));
}
});
// Also handle 'done' events which can come for individual tracks // Handle terminal states
sse.addEventListener('done', (event) => { if (data.last_line && ['complete', 'error', 'cancelled', 'done'].includes(data.last_line.status)) {
const data = JSON.parse(event.data); console.log(`Terminal state detected: ${data.last_line.status} for ${queueId}`);
console.log('SSE done event (individual track):', data);
console.log(`Current entry type: ${entry.type}`);
// Only update the log message without changing status colors for album tracks
const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`);
if (logElement) {
let message = `Completed track: ${data.song || data.title || data.track || 'Unknown'}`;
if (data.artist) message += ` by ${data.artist}`;
logElement.textContent = message;
}
// For single track downloads, done is a terminal state
if (entry.type === 'track') {
console.log('Single track download completed (done) - terminating');
// Mark the track as ended
entry.hasEnded = true;
// Handle as a terminal state
setTimeout(() => {
this.closeSSEConnection(queueId);
this.cleanupEntry(queueId);
}, 5000);
} else if (data.song) {
console.log(`Album/playlist individual track done - continuing download (type: ${entry.type})`);
// For albums/playlists, just update entry data without changing status
data._isIndividualTrack = true; // Mark it for special handling in update logic
entry.lastStatus = data;
entry.lastUpdated = Date.now();
// Save to cache
this.queueCache[entry.prgFile] = data;
localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache));
} else {
// This is a real done event for the entire album/playlist
console.log(`Entire ${entry.type} completed - finalizing`);
this.handleSSEUpdate(queueId, data);
entry.hasEnded = true;
setTimeout(() => {
this.closeSSEConnection(queueId);
this.cleanupEntry(queueId);
}, 5000);
}
});
sse.addEventListener('complete', (event) => {
const data = JSON.parse(event.data);
console.log('SSE complete event:', data);
console.log(`Current entry type: ${entry.type}`);
// Skip terminal processing for track_complete status in albums/playlists
// Also skip for "done" status when it's for an individual track in an album/playlist
if ((data.status === 'track_complete' && entry.type !== 'track') ||
(data.status === 'done' && data.song && entry.type !== 'track')) {
console.log(`Track ${data.status} in ${entry.type} download - continuing`);
// Don't process individual track completion events here
return;
}
// Make sure the status is set to 'complete' for UI purposes
if (!data.status || data.status === '') {
data.status = 'complete';
}
// For track downloads, make sure we have a proper name
if (entry.type === 'track' && !data.name && entry.lastStatus) {
data.name = entry.lastStatus.name || '';
data.artist = entry.lastStatus.artist || '';
}
this.handleSSEUpdate(queueId, data);
// Always mark as terminal state for 'complete' events (except individual track completions in albums)
entry.hasEnded = true; entry.hasEnded = true;
// Close the connection after a short delay
setTimeout(() => { setTimeout(() => {
this.closeSSEConnection(queueId); this.closeSSEConnection(queueId);
this.cleanupEntry(queueId); this.cleanupEntry(queueId);
}, 5000); }, 5000);
}); }
sse.addEventListener('error', (event) => {
const data = JSON.parse(event.data);
console.log('SSE error event:', data);
this.handleSSEUpdate(queueId, data);
// Mark the download as ended with error
entry.hasEnded = true;
// Close the connection, but don't automatically clean up the entry
// to allow for potential retry
this.closeSSEConnection(queueId);
});
sse.addEventListener('end', (event) => {
const data = JSON.parse(event.data);
console.log('SSE end event:', data);
// For track downloads, ensure we have the proper fields for UI display
if (entry.type === 'track') {
// If the end event doesn't have a name/artist, copy from lastStatus
if ((!data.name || !data.artist) && entry.lastStatus) {
data.name = data.name || entry.lastStatus.name || '';
data.artist = data.artist || entry.lastStatus.artist || '';
}
// Force status to 'complete' if not provided
if (!data.status || data.status === '') {
data.status = 'complete';
}
}
// Update with final status
this.handleSSEUpdate(queueId, data);
// Mark the download as ended
entry.hasEnded = true;
// Close the connection
this.closeSSEConnection(queueId);
// Clean up the entry after a delay if it's a success
if (data.status === 'complete' || data.status === 'done') {
setTimeout(() => this.cleanupEntry(queueId), 5000);
}
});
// Handle connection error
sse.onerror = (error) => {
console.error('SSE connection error:', error);
// If the connection is closed, try to reconnect after a delay
if (sse.readyState === EventSource.CLOSED) {
console.log('SSE connection closed, will try to reconnect');
// Only attempt to reconnect if the entry is still active
if (entry && !entry.hasEnded) {
setTimeout(() => {
this.setupSSEConnection(queueId);
}, 5000);
}
}
};
return sse;
} catch (error) { } catch (error) {
console.error('Error setting up SSE connection:', error); console.error(`Error fetching status for ${queueId}:`, error);
return null;
// Show error in log
const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`);
if (logElement) {
logElement.textContent = `Error updating status: ${error.message}`;
}
} }
} }
/* Close an existing SSE connection */
closeSSEConnection(queueId) { closeSSEConnection(queueId) {
if (this.sseConnections[queueId]) { if (this.sseConnections[queueId]) {
console.log(`Stopping polling for ${queueId}`);
try { try {
this.sseConnections[queueId].close(); // Clear the interval instead of closing the SSE connection
clearInterval(this.sseConnections[queueId]);
} catch (error) { } catch (error) {
console.error('Error closing SSE connection:', error); console.error(`Error stopping polling for ${queueId}:`, error);
} }
delete this.sseConnections[queueId]; delete this.sseConnections[queueId];
} }
} }
/* Handle SSE update events */ /* Handle SSE update events */
handleSSEUpdate(queueId, data) { handleSSEUpdate(queueId, data) {
const entry = this.downloadQueue[queueId]; const entry = this.queueEntries[queueId];
if (!entry) return; if (!entry) {
console.warn(`No entry for ${queueId}`);
// Skip if the status hasn't changed
if (entry.lastStatus &&
entry.lastStatus.id === data.id &&
entry.lastStatus.status === data.status) {
return; return;
} }
console.log(`handleSSEUpdate for ${queueId} with type ${entry.type} and status ${data.status}`); // Get status from the appropriate location in the data structure
// For the new polling API, data is structured differently than the SSE events
let status, message, progress;
// Track completion is special - don't change visible status ONLY for albums/playlists // Extract the actual status data from the API response
// Check for both 'track_complete' and 'done' statuses for individual tracks in albums const statusData = data.last_line || {};
const isTrackCompletion = data.status === 'track_complete' || status = statusData.status || data.event || 'unknown';
(data.status === 'done' && data.song && entry.type !== 'track');
const isAlbumOrPlaylist = entry.type !== 'track'; // Anything that's not a track is treated as multi-track
const skipStatusChange = isTrackCompletion && isAlbumOrPlaylist;
if (skipStatusChange) { // For new polling API structure
console.log(`Skipping status change for ${data.status} in ${entry.type} download - track: ${data.song || data.track || 'Unknown'}`); if (data.progress_message) {
message = data.progress_message;
} else if (statusData.message) {
message = statusData.message;
} else {
message = `Status: ${status}`;
} }
// Update the entry // Track progress data
entry.lastStatus = data; if (data.progress_percent) {
progress = data.progress_percent;
} else if (statusData.overall_progress) {
progress = statusData.overall_progress;
} else if (statusData.progress) {
progress = statusData.progress;
}
// Update the log element with the latest message
const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`);
if (logElement && message) {
logElement.textContent = message;
}
// Set the proper status classes on the list item
this.applyStatusClasses(entry, status);
// Handle progress indicators
const progressBar = entry.element.querySelector('.progress-bar');
if (progressBar && typeof progress === 'number') {
progressBar.style.width = `${progress}%`;
progressBar.setAttribute('aria-valuenow', progress);
if (progress >= 100) {
progressBar.classList.add('bg-success');
} else {
progressBar.classList.remove('bg-success');
}
}
// Store the last status update
entry.lastStatus = {
...statusData,
message: message,
status: status
};
entry.lastUpdated = Date.now(); entry.lastUpdated = Date.now();
// Only update visible status if not skipping status change // Store in cache
if (!skipStatusChange) { this.queueCache[entry.prgFile] = entry.lastStatus;
entry.status = data.status;
}
// Update status message in the UI
const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`);
if (logElement) {
const statusMessage = this.getStatusMessage(data);
logElement.textContent = statusMessage;
}
// Apply appropriate CSS classes based on status only if not skipping status change
if (!skipStatusChange) {
this.applyStatusClasses(entry, data);
}
// Save updated status to cache
this.queueCache[entry.prgFile] = data;
localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache));
// Special handling for error status // Handle terminal states
if (data.status === 'error') { if (['complete', 'error', 'cancelled', 'done'].includes(status)) {
this.handleTerminalState(entry, queueId, data); this.handleTerminalState(entry, queueId, progress);
} }
// Update the queue order
this.updateQueueOrder();
} }
/* Close all active SSE connections */ /* Close all active SSE connections */