refactor(api): replace direct celery tasks with queue manager in bulk add
This commit is contained in:
@@ -1,12 +1,15 @@
|
||||
import re
|
||||
from typing import List, Dict, Any
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from fastapi import APIRouter, Request, Depends
|
||||
from pydantic import BaseModel
|
||||
import logging
|
||||
|
||||
# Assuming these imports are available for queue management and Spotify info
|
||||
# Import authentication dependencies
|
||||
from routes.auth.middleware import require_auth_from_state, User
|
||||
|
||||
# Import queue management and Spotify info
|
||||
from routes.utils.get_info import get_spotify_info
|
||||
from routes.utils.celery_tasks import download_track, download_album, download_playlist
|
||||
from routes.utils.celery_queue_manager import download_queue_manager
|
||||
|
||||
router = APIRouter()
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -15,7 +18,7 @@ class BulkAddLinksRequest(BaseModel):
|
||||
links: List[str]
|
||||
|
||||
@router.post("/bulk-add-spotify-links")
|
||||
async def bulk_add_spotify_links(request: BulkAddLinksRequest):
|
||||
async def bulk_add_spotify_links(request: BulkAddLinksRequest, req: Request, current_user: User = Depends(require_auth_from_state)):
|
||||
added_count = 0
|
||||
failed_links = []
|
||||
total_links = len(request.links)
|
||||
@@ -32,6 +35,7 @@ async def bulk_add_spotify_links(request: BulkAddLinksRequest):
|
||||
|
||||
spotify_type = match.group(1)
|
||||
spotify_id = match.group(2)
|
||||
logger.debug(f"Extracted from link: spotify_type={spotify_type}, spotify_id={spotify_id}")
|
||||
|
||||
try:
|
||||
# Get basic info to confirm existence and get name/artist
|
||||
@@ -54,41 +58,27 @@ async def bulk_add_spotify_links(request: BulkAddLinksRequest):
|
||||
# Construct URL for the download task
|
||||
spotify_url = f"https://open.spotify.com/{spotify_type}/{spotify_id}"
|
||||
|
||||
# Add to Celery queue based on type
|
||||
if spotify_type == "track":
|
||||
download_track.delay(
|
||||
url=spotify_url,
|
||||
spotify_id=spotify_id,
|
||||
type=spotify_type,
|
||||
name=item_name,
|
||||
artist=artist_name,
|
||||
download_type="track",
|
||||
)
|
||||
elif spotify_type == "album":
|
||||
download_album.delay(
|
||||
url=spotify_url,
|
||||
spotify_id=spotify_id,
|
||||
type=spotify_type,
|
||||
name=item_name,
|
||||
artist=artist_name,
|
||||
download_type="album",
|
||||
)
|
||||
elif spotify_type == "playlist":
|
||||
download_playlist.delay(
|
||||
url=spotify_url,
|
||||
spotify_id=spotify_id,
|
||||
type=spotify_type,
|
||||
name=item_name,
|
||||
artist=artist_name,
|
||||
download_type="playlist",
|
||||
)
|
||||
else:
|
||||
logger.warning(f"Unsupported Spotify type for download: {spotify_type} for link: {link}")
|
||||
failed_links.append(link)
|
||||
continue
|
||||
# Prepare task data for the queue manager
|
||||
task_data = {
|
||||
"download_type": spotify_type,
|
||||
"url": spotify_url,
|
||||
"name": item_name,
|
||||
"artist": artist_name,
|
||||
"spotify_id": spotify_id,
|
||||
"type": spotify_type,
|
||||
"username": current_user.username,
|
||||
"orig_request": dict(req.query_params),
|
||||
}
|
||||
|
||||
added_count += 1
|
||||
logger.debug(f"Added {added_count+1}/{total_links} {spotify_type} '{item_name}' ({spotify_id}) to queue.")
|
||||
# Add to download queue using the queue manager
|
||||
task_id = download_queue_manager.add_task(task_data)
|
||||
|
||||
if task_id:
|
||||
added_count += 1
|
||||
logger.debug(f"Added {added_count}/{total_links} {spotify_type} '{item_name}' ({spotify_id}) to queue with task_id: {task_id}.")
|
||||
else:
|
||||
logger.warning(f"Failed to add {spotify_type} '{item_name}' ({spotify_id}) to queue.")
|
||||
failed_links.append(link)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing Spotify link {link}: {e}", exc_info=True)
|
||||
|
||||
Reference in New Issue
Block a user