Finally implemented SSE

This commit is contained in:
Xoconoch
2025-08-02 12:46:36 -06:00
parent a80c4c846e
commit 9fdc0bde42
12 changed files with 1588 additions and 1298 deletions

View File

@@ -43,10 +43,12 @@ export function QueueProvider({ children }: { children: ReactNode }) {
const pollingIntervals = useRef<Record<string, number>>({});
const cancelledRemovalTimers = useRef<Record<string, number>>({});
// Smart polling state
const smartPollingInterval = useRef<number | null>(null);
const lastUpdateTimestamp = useRef<number>(0);
// SSE connection state
const sseConnection = useRef<EventSource | null>(null);
const isInitialized = useRef<boolean>(false);
const reconnectTimeoutRef = useRef<number | null>(null);
const maxReconnectAttempts = 5;
const reconnectAttempts = useRef<number>(0);
// Pagination state
const [currentPage, setCurrentPage] = useState(1);
@@ -150,97 +152,135 @@ export function QueueProvider({ children }: { children: ReactNode }) {
}, [scheduleCancelledTaskRemoval]);
const startSmartPolling = useCallback(() => {
if (smartPollingInterval.current) return; // Already polling
if (sseConnection.current) return; // Already connected
console.log("Starting smart polling");
console.log("Starting SSE connection");
const intervalId = window.setInterval(async () => {
const connectSSE = () => {
try {
const response = await apiClient.get<{
tasks: any[];
current_timestamp: number;
total_tasks: number;
active_tasks: number;
updated_count: number;
task_counts?: {
active: number;
queued: number;
retrying: number;
completed: number;
error: number;
cancelled: number;
skipped: number;
};
}>(`/prgs/updates?since=${lastUpdateTimestamp.current}&active_only=true`);
// Create SSE connection
const eventSource = new EventSource(`/api/prgs/stream?active_only=true`);
sseConnection.current = eventSource;
const { tasks: updatedTasks, current_timestamp, total_tasks, task_counts } = response.data;
// Update the last timestamp for next poll
lastUpdateTimestamp.current = current_timestamp;
// Update total tasks count - use active + queued if task_counts available
const calculatedTotal = task_counts ?
(task_counts.active + task_counts.queued) :
(total_tasks || 0);
setTotalTasks(calculatedTotal);
eventSource.onopen = () => {
console.log("SSE connection established");
reconnectAttempts.current = 0; // Reset reconnect attempts on successful connection
};
if (updatedTasks.length > 0) {
console.log(`Smart polling: ${updatedTasks.length} tasks updated (${response.data.active_tasks} active) out of ${response.data.total_tasks} total`);
// Create a map of updated tasks by task_id for efficient lookup
const updatedTasksMap = new Map(updatedTasks.map(task => [task.task_id, task]));
setItems(prev => {
// Update existing items with new data, and add any new active tasks
const updatedItems = prev.map(item => {
const updatedTaskData = updatedTasksMap.get(item.taskId || item.id);
if (updatedTaskData) {
return updateItemFromPrgs(item, updatedTaskData);
}
return item;
});
eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
// Handle error events
if (data.error) {
console.error("SSE error event:", data.error);
toast.error("Connection error: " + data.error);
return;
}
// Only add new active tasks that aren't in our current items and aren't in terminal state
const currentTaskIds = new Set(prev.map(item => item.taskId || item.id));
const newActiveTasks = updatedTasks
.filter(task => {
const isNew = !currentTaskIds.has(task.task_id);
const status = task.last_line?.status_info?.status || task.last_line?.status || "unknown";
const isActive = isActiveTaskStatus(status);
const isTerminal = ["completed", "error", "cancelled", "skipped", "done"].includes(status);
return isNew && isActive && !isTerminal;
})
.map(task => {
const spotifyId = task.original_url?.split("/").pop() || "";
const baseItem: QueueItem = {
id: task.task_id,
taskId: task.task_id,
name: task.name || "Unknown",
type: task.download_type || "track",
spotifyId: spotifyId,
status: "initializing",
artist: task.artist,
};
return updateItemFromPrgs(baseItem, task);
const { tasks: updatedTasks, current_timestamp, total_tasks, task_counts } = data;
// Update total tasks count - use active + queued if task_counts available
const calculatedTotal = task_counts ?
(task_counts.active + task_counts.queued) :
(total_tasks || 0);
setTotalTasks(calculatedTotal);
if (updatedTasks && updatedTasks.length > 0) {
console.log(`SSE: ${updatedTasks.length} tasks updated (${data.active_tasks} active) out of ${data.total_tasks} total`);
// Create a map of updated tasks by task_id for efficient lookup
const updatedTasksMap = new Map(updatedTasks.map((task: any) => [task.task_id, task]));
setItems(prev => {
// Update existing items with new data, and add any new active tasks
const updatedItems = prev.map(item => {
const updatedTaskData = updatedTasksMap.get(item.taskId || item.id);
if (updatedTaskData) {
return updateItemFromPrgs(item, updatedTaskData);
}
return item;
});
// Only add new active tasks that aren't in our current items and aren't in terminal state
const currentTaskIds = new Set(prev.map(item => item.taskId || item.id));
const newActiveTasks = updatedTasks
.filter((task: any) => {
const isNew = !currentTaskIds.has(task.task_id);
const status = task.last_line?.status_info?.status || task.last_line?.status || "unknown";
const isActive = isActiveTaskStatus(status);
const isTerminal = ["completed", "error", "cancelled", "skipped", "done"].includes(status);
return isNew && isActive && !isTerminal;
})
.map((task: any) => {
const spotifyId = task.original_url?.split("/").pop() || "";
const baseItem: QueueItem = {
id: task.task_id,
taskId: task.task_id,
name: task.name || "Unknown",
type: task.download_type || "track",
spotifyId: spotifyId,
status: "initializing",
artist: task.artist,
};
return updateItemFromPrgs(baseItem, task);
});
return newActiveTasks.length > 0 ? [...newActiveTasks, ...updatedItems] : updatedItems;
});
}
} catch (error) {
console.error("Failed to parse SSE message:", error);
}
};
eventSource.onerror = (error) => {
console.error("SSE connection error:", error);
// Close the connection
eventSource.close();
sseConnection.current = null;
// Attempt to reconnect with exponential backoff
if (reconnectAttempts.current < maxReconnectAttempts) {
reconnectAttempts.current++;
const delay = Math.min(1000 * Math.pow(2, reconnectAttempts.current - 1), 30000); // Max 30 seconds
console.log(`SSE reconnecting in ${delay}ms (attempt ${reconnectAttempts.current}/${maxReconnectAttempts})`);
reconnectTimeoutRef.current = window.setTimeout(() => {
connectSSE();
}, delay);
} else {
console.error("SSE max reconnection attempts reached");
toast.error("Connection lost. Please refresh the page.");
}
};
return newActiveTasks.length > 0 ? [...newActiveTasks, ...updatedItems] : updatedItems;
});
}
} catch (error) {
console.error("Smart polling failed:", error);
console.error("Failed to create SSE connection:", error);
toast.error("Failed to establish real-time connection");
}
}, 2000); // Poll every 2 seconds
};
smartPollingInterval.current = intervalId;
connectSSE();
}, [updateItemFromPrgs]);
const stopSmartPolling = useCallback(() => {
if (smartPollingInterval.current) {
console.log("Stopping smart polling");
clearInterval(smartPollingInterval.current);
smartPollingInterval.current = null;
if (sseConnection.current) {
console.log("Closing SSE connection");
sseConnection.current.close();
sseConnection.current = null;
}
// Clear any pending reconnection timeout
if (reconnectTimeoutRef.current) {
clearTimeout(reconnectTimeoutRef.current);
reconnectTimeoutRef.current = null;
}
// Reset reconnection attempts
reconnectAttempts.current = 0;
}, []);
const loadMoreTasks = useCallback(async () => {
@@ -312,7 +352,7 @@ export function QueueProvider({ children }: { children: ReactNode }) {
const startPolling = useCallback(
(taskId: string) => {
// Legacy function - now just ensures smart polling is active
// Legacy function - now just ensures SSE connection is active
startSmartPolling();
},
[startSmartPolling],
@@ -373,10 +413,9 @@ export function QueueProvider({ children }: { children: ReactNode }) {
setTotalTasks(calculatedTotal);
// Set initial timestamp to current time
lastUpdateTimestamp.current = timestamp;
isInitialized.current = true;
// Start smart polling for real-time updates
// Start SSE connection for real-time updates
startSmartPolling();
} catch (error) {
console.error("Failed to fetch queue from backend:", error);
@@ -386,7 +425,7 @@ export function QueueProvider({ children }: { children: ReactNode }) {
fetchQueue();
// Cleanup function to stop polling when component unmounts
// Cleanup function to stop SSE connection when component unmounts
return () => {
stopSmartPolling();
// Clean up any remaining individual polling intervals (legacy cleanup)