added queue system

This commit is contained in:
cool.gitter.choco
2025-02-09 16:30:33 -06:00
parent e27dc15475
commit ed23ad0f48
7 changed files with 389 additions and 879 deletions

View File

@@ -1,126 +0,0 @@
import json
import traceback
from deezspot.easy_spoty import Spo
from deezspot.libutils.utils import get_ids, link_is_valid
from routes.utils.album import download_album # Assumes album.py is in routes/utils/
def log_json(message_dict):
    """Serialize *message_dict* to JSON and print it on its own stdout line."""
    encoded = json.dumps(message_dict)
    print(encoded)
def get_artist_discography(url, album_type='album,single,compilation,appears_on'):
    """Fetch an artist's discography from Spotify.

    Validates *url*, extracts the artist ID, then queries the Spo client.
    Raises ValueError when the URL is missing/invalid or the ID cannot be
    extracted; fetch failures are logged and re-raised unchanged.
    """
    if not url:
        message = "No artist URL provided."
        log_json({"status": "error", "message": message})
        raise ValueError(message)

    # Validate the URL (the helper raises on an invalid link).
    try:
        link_is_valid(link=url)
    except Exception as validation_error:
        message = f"Link validation failed: {validation_error}"
        log_json({"status": "error", "message": message})
        raise ValueError(message)

    # Pull the artist ID out of the URL.
    try:
        artist_id = get_ids(url)
    except Exception as id_error:
        message = f"Failed to extract artist ID from URL: {id_error}"
        log_json({"status": "error", "message": message})
        raise ValueError(message)

    # Query the discography; log and propagate any failure as-is.
    try:
        return Spo.get_artist(artist_id, album_type=album_type)
    except Exception as fetch_error:
        message = f"An error occurred while fetching the discography: {fetch_error}"
        log_json({"status": "error", "message": message})
        raise
def download_artist_albums(service, artist_url, main, fallback=None, quality=None,
                           fall_quality=None, real_time=False, album_type='album,single,compilation,appears_on',
                           custom_dir_format="%ar_album%/%album%/%copyright%",
                           custom_track_format="%tracknum%. %music% - %artist%"):
    """Download every album in an artist's discography.

    Retrieves the discography, then calls download_album for each entry.
    Per-album failures are logged and do not abort the run; progress and
    status messages are emitted as JSON lines on stdout via log_json.
    """
    try:
        discography = get_artist_discography(artist_url, album_type=album_type)
    except Exception as e:
        log_json({"status": "error", "message": f"Error retrieving artist discography: {e}"})
        raise

    albums = discography.get('items', [])

    # Best-effort artist name: taken from the first album's artist list,
    # falling back to the raw URL when unavailable.
    artist_name = artist_url
    if albums:
        leading_artists = albums[0].get('artists', [])
        if leading_artists:
            artist_name = leading_artists[0].get('name', artist_url)

    if not albums:
        log_json({
            "status": "done",
            "type": "artist",
            "artist": artist_name,
            "album_type": album_type,
            "message": "No albums found for the artist."
        })
        return

    log_json({
        "status": "initializing",
        "type": "artist",
        "artist": artist_name,
        "total_albums": len(albums),
        "album_type": album_type
    })

    for album in albums:
        try:
            album_url = album.get('external_urls', {}).get('spotify')
            album_name = album.get('name', 'Unknown Album')
            # Collect artist names when present on the album entry.
            artists = []
            if "artists" in album:
                artists = [entry.get("name", "Unknown") for entry in album["artists"]]
            if not album_url:
                log_json({
                    "status": "warning",
                    "type": "album",
                    "album": album_name,
                    "artist": artists,
                    "message": "No Spotify URL found; skipping."
                })
                continue
            download_album(
                service=service,
                url=album_url,
                main=main,
                fallback=fallback,
                quality=quality,
                fall_quality=fall_quality,
                real_time=real_time,
                custom_dir_format=custom_dir_format,
                custom_track_format=custom_track_format
            )
        except Exception as album_error:
            log_json({
                "status": "error",
                "type": "album",
                "album": album.get('name', 'Unknown'),
                "error": str(album_error)
            })
            traceback.print_exc()

    # Everything processed: emit the final status line.
    log_json({
        "status": "done",
        "type": "artist",
        "artist": artist_name,
        "album_type": album_type
    })

256
routes/utils/queue.py Normal file
View File

@@ -0,0 +1,256 @@
import os
import sys
import json
import time
import string
import random
import traceback
import threading
from multiprocessing import Process
from queue import Queue, Empty
# ------------------------------------------------------------------------------
# Configuration
# ------------------------------------------------------------------------------
MAX_CONCURRENT_DL = 3 # maximum number of concurrent download processes
PRG_DIR = './prgs' # directory where .prg files will be stored (created on demand)
# ------------------------------------------------------------------------------
# Utility Functions and Classes
# ------------------------------------------------------------------------------
def generate_random_filename(length=6, extension=".prg"):
    """Build a random lowercase-alphanumeric filename ending in *extension*."""
    alphabet = string.ascii_lowercase + string.digits
    stem = ''.join(random.choices(alphabet, k=length))
    return stem + extension
class FlushingFileWrapper:
    """
    A file wrapper that flushes after writing each line and
    skips lines whose JSON content has a "type" of "track".
    """

    def __init__(self, file):
        # Underlying writable file object.
        self.file = file

    def write(self, text):
        """Write each non-empty line of *text*, dropping track-progress JSON."""
        for raw in text.split('\n'):
            line = raw.strip()
            if not line:
                continue
            if line.startswith('{'):
                try:
                    payload = json.loads(line)
                except ValueError:
                    pass  # not valid JSON; write the line as is
                else:
                    if payload.get("type") == "track":
                        continue  # suppress per-track progress messages
            self.file.write(line + '\n')
            self.file.flush()

    def flush(self):
        self.file.flush()
def run_download_task(task, prg_path):
    """
    Entry point for a child download process.

    Dispatches *task* to the matching download function (album, track, or
    playlist) while stdout is redirected into the .prg file at *prg_path*
    (via FlushingFileWrapper), then appends a final complete/error status
    line.  Failures occurring before the redirect is in place are written
    straight to the .prg file.
    """
    try:
        kind = task.get("download_type")
        # Import lazily so the child process only loads the module it needs.
        if kind == "album":
            from routes.utils.album import download_album as download_func
        elif kind == "track":
            from routes.utils.track import download_track as download_func
        elif kind == "playlist":
            from routes.utils.playlist import download_playlist as download_func
        else:
            raise ValueError(f"Unsupported download type: {kind}")

        # Append mode so the queued/original-request lines are preserved.
        with open(prg_path, 'a') as f:
            flushing_file = FlushingFileWrapper(f)
            original_stdout = sys.stdout
            sys.stdout = flushing_file  # downloader reports progress on stdout
            try:
                download_func(
                    service=task.get("service"),
                    url=task.get("url"),
                    main=task.get("main"),
                    fallback=task.get("fallback"),
                    quality=task.get("quality"),
                    fall_quality=task.get("fall_quality"),
                    real_time=task.get("real_time", False),
                    custom_dir_format=task.get("custom_dir_format", "%ar_album%/%album%/%copyright%"),
                    custom_track_format=task.get("custom_track_format", "%tracknum%. %music% - %artist%")
                )
                flushing_file.write(json.dumps({"status": "complete"}) + "\n")
            except Exception as e:
                flushing_file.write(json.dumps({
                    "status": "error",
                    "message": str(e),
                    "traceback": traceback.format_exc()
                }) + "\n")
            finally:
                sys.stdout = original_stdout  # always restore stdout
    except Exception as e:
        # Something failed even before the prg file was set up for redirect.
        with open(prg_path, 'a') as f:
            f.write(json.dumps({
                "status": "error",
                "message": str(e),
                "traceback": traceback.format_exc()
            }) + "\n")
# ------------------------------------------------------------------------------
# Download Queue Manager Class
# ------------------------------------------------------------------------------
class DownloadQueueManager:
    """
    Manages a queue of download tasks, ensuring that no more than
    MAX_CONCURRENT_DL downloads run concurrently.

    Tasks are dictionaries describing one download; each gets a .prg progress
    file in *prg_dir*.  A daemon worker thread launches one child Process per
    task via run_download_task.
    """

    def __init__(self, max_concurrent=MAX_CONCURRENT_DL, prg_dir=PRG_DIR):
        self.max_concurrent = max_concurrent
        self.prg_dir = prg_dir
        os.makedirs(self.prg_dir, exist_ok=True)
        self.pending_tasks = Queue()      # holds (prg_filename, task) waiting to run
        self.running_downloads = {}       # maps prg_filename -> Process instance
        self.lock = threading.Lock()      # protects access to running_downloads
        self.worker_thread = threading.Thread(target=self.queue_worker, daemon=True)
        self.running = False

    def start(self):
        """Start the worker thread that monitors the queue."""
        self.running = True
        self.worker_thread.start()

    def stop(self):
        """Stop the worker thread gracefully (safe even if never started)."""
        self.running = False
        # join() on a never-started thread raises RuntimeError; guard it.
        if self.worker_thread.is_alive():
            self.worker_thread.join()

    def add_task(self, task):
        """
        Adds a new download task to the queue.

        The task is expected to be a dictionary with all necessary parameters,
        including a "download_type" key (album, track, or playlist).
        A .prg file is created for progress logging with an initial two entries:
          1. The original request (merged with the extra keys: type, name, artist)
          2. A queued status entry (including type, name, artist, and the task's
             position in the queue)
        Returns the generated prg filename so that the caller can later
        check the status or request cancellation.
        """
        prg_filename = generate_random_filename()
        prg_path = os.path.join(self.prg_dir, prg_filename)
        task['prg_path'] = prg_path
        # Overall queue position = running + pending + 1; read the running
        # count under the lock so it cannot race with the worker/cancel.
        with self.lock:
            position = len(self.running_downloads) + self.pending_tasks.qsize() + 1
        # Best-effort: write the two initial entries; a failure here is logged
        # but the task is still queued (matching the original behavior).
        try:
            with open(prg_path, 'w') as f:
                # Merge extra keys into the original request.
                original_request = task.get("orig_request", {})
                for key in ["type", "name", "artist"]:
                    if key in task and task[key] is not None:
                        original_request[key] = task[key]
                f.write(json.dumps({"original_request": original_request}) + "\n")
                # Queued status entry with the extra parameters and position.
                queued_entry = {
                    "status": "queued",
                    "name": task.get("name"),
                    "type": task.get("type"),
                    "artist": task.get("artist"),
                    "position": position
                }
                f.write(json.dumps(queued_entry) + "\n")
        except Exception as e:
            print("Error writing prg file:", e)
        self.pending_tasks.put((prg_filename, task))
        return prg_filename

    def cancel_task(self, prg_filename):
        """
        Cancel a running download task by terminating its process.

        If the task is found and alive, it is terminated and a cancellation
        status is appended to its .prg file.  Pending (not yet started) tasks
        cannot be cancelled here.  Returns a dictionary with the result.
        """
        with self.lock:
            process = self.running_downloads.get(prg_filename)
            if process and process.is_alive():
                process.terminate()
                process.join()
                del self.running_downloads[prg_filename]
                prg_path = os.path.join(self.prg_dir, prg_filename)
                try:
                    with open(prg_path, 'a') as f:
                        f.write(json.dumps({"status": "cancel"}) + "\n")
                except Exception as e:
                    return {"error": f"Failed to write cancel status: {str(e)}"}
                return {"status": "cancelled"}
            else:
                return {"error": "Task not found or already terminated"}

    def queue_worker(self):
        """
        Worker thread loop: reap finished download processes and start new
        ones while capacity (max_concurrent) allows.
        """
        while self.running:
            # Reap finished processes and check capacity under the lock so
            # the count cannot race with cancel_task.
            with self.lock:
                finished = [name for name, proc in self.running_downloads.items()
                            if not proc.is_alive()]
                for name in finished:
                    del self.running_downloads[name]
                has_capacity = len(self.running_downloads) < self.max_concurrent
            if has_capacity:
                try:
                    prg_filename, task = self.pending_tasks.get(timeout=1)
                except Empty:
                    # get() already blocked up to 1s; no extra sleep needed.
                    continue
                # Run the task in its own process so it can be terminated.
                p = Process(
                    target=run_download_task,
                    args=(task, task.get('prg_path'))
                )
                with self.lock:
                    self.running_downloads[prg_filename] = p
                p.start()
            else:
                # At capacity; wait a moment before re-checking.
                time.sleep(1)
            # Small sleep to avoid a tight loop.
            time.sleep(0.1)
# ------------------------------------------------------------------------------
# Global Instance
# ------------------------------------------------------------------------------
# Create and start a global instance of the queue manager.
# NOTE(review): this starts a daemon worker thread as an import-time side
# effect — every importer of this module gets a live queue; confirm intended.
download_queue_manager = DownloadQueueManager()
download_queue_manager.start()