diff --git a/app.py b/app.py index 773652b..1d9da05 100755 --- a/app.py +++ b/app.py @@ -5,7 +5,6 @@ from routes.credentials import credentials_bp from routes.album import album_bp from routes.track import track_bp from routes.playlist import playlist_bp -from routes.artist import artist_bp from routes.prgs import prgs_bp import logging import time @@ -40,7 +39,6 @@ def create_app(): app.register_blueprint(album_bp, url_prefix='/api/album') app.register_blueprint(track_bp, url_prefix='/api/track') app.register_blueprint(playlist_bp, url_prefix='/api/playlist') - app.register_blueprint(artist_bp, url_prefix='/api/artist') app.register_blueprint(prgs_bp, url_prefix='/api/prgs') # Serve frontend diff --git a/routes/album.py b/routes/album.py index 27e1e7b..24bef6e 100755 --- a/routes/album.py +++ b/routes/album.py @@ -1,100 +1,14 @@ from flask import Blueprint, Response, request import json import os -import random -import string -import sys import traceback -from multiprocessing import Process +from routes.utils.queue import download_queue_manager album_bp = Blueprint('album', __name__) -# Global dictionary to keep track of running download processes. -download_processes = {} - -def generate_random_filename(length=6): - chars = string.ascii_lowercase + string.digits - return ''.join(random.choice(chars) for _ in range(length)) + '.album.prg' - -class FlushingFileWrapper: - def __init__(self, file): - self.file = file - - def write(self, text): - # Process each line separately. - for line in text.split('\n'): - line = line.strip() - # Only process non-empty lines that look like JSON objects. - if line and line.startswith('{'): - try: - obj = json.loads(line) - # Skip writing if the JSON object has a "type" of "track" - if obj.get("type") == "track": - continue - except ValueError: - # If not valid JSON, write the line as is. - pass - self.file.write(line + '\n') - self.file.flush() - - def flush(self): - self.file.flush() - -def download_task(service, url, main, fallback, quality, fall_quality, real_time, prg_path, orig_request, - custom_dir_format, custom_track_format): - """ - The download task writes out the original request data into the progress file - and then runs the album download. - """ - try: - from routes.utils.album import download_album - with open(prg_path, 'w') as f: - flushing_file = FlushingFileWrapper(f) - original_stdout = sys.stdout - sys.stdout = flushing_file # Redirect stdout - - # Write the original request data into the progress file. - try: - flushing_file.write(json.dumps({"original_request": orig_request}) + "\n") - except Exception as e: - flushing_file.write(json.dumps({ - "status": "error", - "message": f"Failed to write original request data: {str(e)}" - }) + "\n") - - try: - download_album( - service=service, - url=url, - main=main, - fallback=fallback, - quality=quality, - fall_quality=fall_quality, - real_time=real_time, - custom_dir_format=custom_dir_format, - custom_track_format=custom_track_format - ) - flushing_file.write(json.dumps({"status": "complete"}) + "\n") - except Exception as e: - error_data = json.dumps({ - "status": "error", - "message": str(e), - "traceback": traceback.format_exc() - }) - flushing_file.write(error_data + "\n") - finally: - sys.stdout = original_stdout # Restore stdout - except Exception as e: - with open(prg_path, 'w') as f: - error_data = json.dumps({ - "status": "error", - "message": str(e), - "traceback": traceback.format_exc() - }) - f.write(error_data + "\n") - @album_bp.route('/download', methods=['GET']) def handle_download(): + # Retrieve parameters from the request. service = request.args.get('service') url = request.args.get('url') main = request.args.get('main') @@ -102,7 +16,7 @@ def handle_download(): quality = request.args.get('quality') fall_quality = request.args.get('fall_quality') - # Retrieve and normalize the real_time parameter; defaults to False. + # Normalize the real_time parameter; default to False. real_time_arg = request.args.get('real_time', 'false') real_time = real_time_arg.lower() in ['true', '1', 'yes'] @@ -110,7 +24,7 @@ def handle_download(): custom_dir_format = request.args.get('custom_dir_format', "%ar_album%/%album%") custom_track_format = request.args.get('custom_track_format', "%tracknum%. %music% - %artist%") - # Sanitize main and fallback to prevent directory traversal + # Sanitize main and fallback to prevent directory traversal. if main: main = os.path.basename(main) if fallback: @@ -123,11 +37,11 @@ def handle_download(): mimetype='application/json' ) - # Validate credentials based on service and fallback + # Validate credentials based on service and fallback. try: if service == 'spotify': if fallback: - # Validate Deezer main and Spotify fallback credentials + # Validate Deezer main and Spotify fallback credentials. deezer_creds_path = os.path.abspath(os.path.join('./creds/deezer', main, 'credentials.json')) if not os.path.isfile(deezer_creds_path): return Response( @@ -143,7 +57,7 @@ def handle_download(): mimetype='application/json' ) else: - # Validate Spotify main credentials + # Validate Spotify main credentials. spotify_creds_path = os.path.abspath(os.path.join('./creds/spotify', main, 'credentials.json')) if not os.path.isfile(spotify_creds_path): return Response( @@ -152,7 +66,7 @@ def handle_download(): mimetype='application/json' ) elif service == 'deezer': - # Validate Deezer main credentials + # Validate Deezer main credentials. deezer_creds_path = os.path.abspath(os.path.join('./creds/deezer', main, 'credentials.json')) if not os.path.isfile(deezer_creds_path): return Response( @@ -173,36 +87,32 @@ def handle_download(): mimetype='application/json' ) - filename = generate_random_filename() - prg_dir = './prgs' - os.makedirs(prg_dir, exist_ok=True) - prg_path = os.path.join(prg_dir, filename) + # Build the task dictionary. + # Note: The new keys "type", "name", and "artist" will be merged into the original_request + # message by the queue handler. + task = { + "download_type": "album", # tells the queue handler which download function to call + "service": service, + "url": url, + "main": main, + "fallback": fallback, + "quality": quality, + "fall_quality": fall_quality, + "real_time": real_time, + "custom_dir_format": custom_dir_format, + "custom_track_format": custom_track_format, + "orig_request": request.args.to_dict(), + # New additional parameters: + "type": "album", + "name": request.args.get('name'), + "artist": request.args.get('artist') + } + + # Add the task to the queue and get the generated prg filename. + prg_filename = download_queue_manager.add_task(task) - # Capture the original request parameters as a dictionary. - orig_request = request.args.to_dict() - - # Create and start the download process, and track it in the global dictionary. - process = Process( - target=download_task, - args=( - service, - url, - main, - fallback, - quality, - fall_quality, - real_time, - prg_path, - orig_request, - custom_dir_format, - custom_track_format - ) - ) - process.start() - download_processes[filename] = process - return Response( - json.dumps({"prg_file": filename}), + json.dumps({"prg_file": prg_filename}), status=202, mimetype='application/json' ) @@ -210,7 +120,7 @@ def handle_download(): @album_bp.route('/download/cancel', methods=['GET']) def cancel_download(): """ - Cancel a running download process by its process id (prg file name). + Cancel a running download process by its prg file name. """ prg_file = request.args.get('prg_file') if not prg_file: @@ -220,42 +130,16 @@ def cancel_download(): mimetype='application/json' ) - process = download_processes.get(prg_file) - prg_dir = './prgs' - prg_path = os.path.join(prg_dir, prg_file) + # Use the queue manager's cancellation method. + result = download_queue_manager.cancel_task(prg_file) + status_code = 200 if result.get("status") == "cancelled" else 404 - if process and process.is_alive(): - # Terminate the running process - process.terminate() - process.join() # Wait for process termination - # Remove it from our global tracking dictionary - del download_processes[prg_file] + return Response( + json.dumps(result), + status=status_code, + mimetype='application/json' + ) - # Append a cancellation status to the log file - try: - with open(prg_path, 'a') as f: - f.write(json.dumps({"status": "cancel"}) + "\n") - except Exception as e: - # If writing fails, we log the error in the response. - return Response( - json.dumps({"error": f"Failed to write cancel status to file: {str(e)}"}), - status=500, - mimetype='application/json' - ) - - return Response( - json.dumps({"status": "cancel"}), - status=200, - mimetype='application/json' - ) - else: - return Response( - json.dumps({"error": "Process not found or already terminated"}), - status=404, - mimetype='application/json' - ) - -# NEW ENDPOINT: Get Album Information @album_bp.route('/info', methods=['GET']) def get_album_info(): """ @@ -271,9 +155,8 @@ def get_album_info(): ) try: - # Import the get_spotify_info function from the utility module. + # Import and use the get_spotify_info function from the utility module. from routes.utils.get_info import get_spotify_info - # Call the function with the album type. album_info = get_spotify_info(spotify_id, "album") return Response( json.dumps(album_info), diff --git a/routes/artist.py b/routes/artist.py deleted file mode 100644 index a083ee5..0000000 --- a/routes/artist.py +++ /dev/null @@ -1,309 +0,0 @@ -#!/usr/bin/env python3 -""" -Artist endpoint blueprint. -""" - -from flask import Blueprint, Response, request -import json -import os -import random -import string -import sys -import traceback -from multiprocessing import Process - -artist_bp = Blueprint('artist', __name__) - -# Global dictionary to keep track of running download processes. -download_processes = {} - -def generate_random_filename(length=6): - chars = string.ascii_lowercase + string.digits - return ''.join(random.choice(chars) for _ in range(length)) + '.artist.prg' - -class FlushingFileWrapper: - def __init__(self, file): - self.file = file - - def write(self, text): - # Only write lines that start with '{' - for line in text.split('\n'): - line = line.strip() - if line and line.startswith('{'): - try: - obj = json.loads(line) - # Skip writing if the JSON object has type "track" - if obj.get("type") == "track": - continue - except ValueError: - # If not valid JSON, write the line as is. - pass - self.file.write(line + '\n') - self.file.flush() - - def flush(self): - self.file.flush() - -def download_artist_task(service, artist_url, main, fallback, quality, fall_quality, real_time, - album_type, prg_path, orig_request, custom_dir_format, custom_track_format): - """ - This function wraps the call to download_artist_albums, writes the original - request data to the progress file, and then writes JSON status updates. - """ - try: - from routes.utils.artist import download_artist_albums - with open(prg_path, 'w') as f: - flushing_file = FlushingFileWrapper(f) - original_stdout = sys.stdout - sys.stdout = flushing_file # Redirect stdout to our flushing file wrapper - - # Write the original request data to the progress file. - try: - flushing_file.write(json.dumps({"original_request": orig_request}) + "\n") - except Exception as e: - flushing_file.write(json.dumps({ - "status": "error", - "message": f"Failed to write original request data: {str(e)}" - }) + "\n") - - try: - download_artist_albums( - service=service, - artist_url=artist_url, - main=main, - fallback=fallback, - quality=quality, - fall_quality=fall_quality, - real_time=real_time, - album_type=album_type, - custom_dir_format=custom_dir_format, - custom_track_format=custom_track_format - ) - flushing_file.write(json.dumps({"status": "complete"}) + "\n") - except Exception as e: - error_data = json.dumps({ - "status": "error", - "message": str(e), - "traceback": traceback.format_exc() - }) - flushing_file.write(error_data + "\n") - finally: - sys.stdout = original_stdout # Restore stdout - except Exception as e: - with open(prg_path, 'w') as f: - error_data = json.dumps({ - "status": "error", - "message": str(e), - "traceback": traceback.format_exc() - }) - f.write(error_data + "\n") - -@artist_bp.route('/download', methods=['GET']) -def handle_artist_download(): - """ - Starts the artist album download process. - Expected query parameters: - - artist_url: string (e.g., a Spotify artist URL) - - service: string (e.g., "deezer" or "spotify") - - main: string (e.g., "MX") - - fallback: string (optional, e.g., "JP") - - quality: string (e.g., "MP3_128") - - fall_quality: string (optional, e.g., "HIGH") - - real_time: bool (e.g., "true" or "false") - - album_type: string(s); one or more of "album", "single", "appears_on", "compilation" (if multiple, comma-separated) - - custom_dir_format: string (optional, default: "%ar_album%/%album%/%copyright%") - - custom_track_format: string (optional, default: "%tracknum%. %music% - %artist%") - """ - service = request.args.get('service') - artist_url = request.args.get('artist_url') - main = request.args.get('main') - fallback = request.args.get('fallback') - quality = request.args.get('quality') - fall_quality = request.args.get('fall_quality') - album_type = request.args.get('album_type') - real_time_arg = request.args.get('real_time', 'false') - real_time = real_time_arg.lower() in ['true', '1', 'yes'] - - # New query parameters for custom formatting. - custom_dir_format = request.args.get('custom_dir_format', "%ar_album%/%album%") - custom_track_format = request.args.get('custom_track_format', "%tracknum%. %music% - %artist%") - - # Sanitize main and fallback to prevent directory traversal - if main: - main = os.path.basename(main) - if fallback: - fallback = os.path.basename(fallback) - - # Check for required parameters. - if not all([service, artist_url, main, quality, album_type]): - return Response( - json.dumps({"error": "Missing parameters"}), - status=400, - mimetype='application/json' - ) - - # Validate credentials based on the selected service. - try: - if service == 'spotify': - if fallback: - # When using Spotify as the main service with a fallback, assume main credentials for Deezer and fallback for Spotify. - deezer_creds_path = os.path.abspath(os.path.join('./creds/deezer', main, 'credentials.json')) - if not os.path.isfile(deezer_creds_path): - return Response( - json.dumps({"error": "Invalid Deezer credentials directory"}), - status=400, - mimetype='application/json' - ) - spotify_fallback_path = os.path.abspath(os.path.join('./creds/spotify', fallback, 'credentials.json')) - if not os.path.isfile(spotify_fallback_path): - return Response( - json.dumps({"error": "Invalid Spotify fallback credentials directory"}), - status=400, - mimetype='application/json' - ) - else: - # Validate Spotify main credentials. - spotify_creds_path = os.path.abspath(os.path.join('./creds/spotify', main, 'credentials.json')) - if not os.path.isfile(spotify_creds_path): - return Response( - json.dumps({"error": "Invalid Spotify credentials directory"}), - status=400, - mimetype='application/json' - ) - elif service == 'deezer': - # Validate Deezer main credentials. - deezer_creds_path = os.path.abspath(os.path.join('./creds/deezer', main, 'credentials.json')) - if not os.path.isfile(deezer_creds_path): - return Response( - json.dumps({"error": "Invalid Deezer credentials directory"}), - status=400, - mimetype='application/json' - ) - else: - return Response( - json.dumps({"error": "Unsupported service"}), - status=400, - mimetype='application/json' - ) - except Exception as e: - return Response( - json.dumps({"error": f"Credential validation failed: {str(e)}"}), - status=500, - mimetype='application/json' - ) - - # Create a random filename for the progress file. - filename = generate_random_filename() - prg_dir = './prgs' - os.makedirs(prg_dir, exist_ok=True) - prg_path = os.path.join(prg_dir, filename) - - # Capture the original request parameters as a dictionary. - orig_request = request.args.to_dict() - - # Create and start the download process. - process = Process( - target=download_artist_task, - args=( - service, - artist_url, - main, - fallback, - quality, - fall_quality, - real_time, - album_type, - prg_path, - orig_request, - custom_dir_format, - custom_track_format - ) - ) - process.start() - download_processes[filename] = process - - return Response( - json.dumps({"prg_file": filename}), - status=202, - mimetype='application/json' - ) - -@artist_bp.route('/download/cancel', methods=['GET']) -def cancel_artist_download(): - """ - Cancel a running artist download process by its prg file name. - """ - prg_file = request.args.get('prg_file') - if not prg_file: - return Response( - json.dumps({"error": "Missing process id (prg_file) parameter"}), - status=400, - mimetype='application/json' - ) - - process = download_processes.get(prg_file) - prg_dir = './prgs' - prg_path = os.path.join(prg_dir, prg_file) - - if process and process.is_alive(): - process.terminate() - process.join() # Wait for termination - del download_processes[prg_file] - - try: - with open(prg_path, 'a') as f: - f.write(json.dumps({"status": "cancel"}) + "\n") - except Exception as e: - return Response( - json.dumps({"error": f"Failed to write cancel status to file: {str(e)}"}), - status=500, - mimetype='application/json' - ) - - return Response( - json.dumps({"status": "cancel"}), - status=200, - mimetype='application/json' - ) - else: - return Response( - json.dumps({"error": "Process not found or already terminated"}), - status=404, - mimetype='application/json' - ) - -# NEW ENDPOINT: Get Artist Information -@artist_bp.route('/info', methods=['GET']) -def get_artist_info(): - """ - Retrieve Spotify artist metadata given a Spotify ID. - Expects a query parameter 'id' that contains the Spotify artist ID. - """ - spotify_id = request.args.get('id') - if not spotify_id: - return Response( - json.dumps({"error": "Missing parameter: id"}), - status=400, - mimetype='application/json' - ) - - try: - # Import the get_spotify_info function from the utility module. - from routes.utils.get_info import get_spotify_info - # Call the function with the artist type. - artist_info = get_spotify_info(spotify_id, "artist") - return Response( - json.dumps(artist_info), - status=200, - mimetype='application/json' - ) - except Exception as e: - error_data = { - "error": str(e), - "traceback": traceback.format_exc() - } - return Response( - json.dumps(error_data), - status=500, - mimetype='application/json' - ) diff --git a/routes/playlist.py b/routes/playlist.py index 1bb6e4a..1cd50a5 100755 --- a/routes/playlist.py +++ b/routes/playlist.py @@ -2,97 +2,13 @@ from flask import Blueprint, Response, request import os import json import traceback -from deezspot.spotloader import SpoLogin -from deezspot.deezloader import DeeLogin -from multiprocessing import Process -import random -import string -import sys +from routes.utils.queue import download_queue_manager playlist_bp = Blueprint('playlist', __name__) -# Global dictionary to track running playlist download processes -playlist_processes = {} - -def generate_random_filename(length=6): - chars = string.ascii_lowercase + string.digits - return ''.join(random.choice(chars) for _ in range(length)) + '.playlist.prg' - -class FlushingFileWrapper: - def __init__(self, file): - self.file = file - - def write(self, text): - for line in text.split('\n'): - line = line.strip() - # Only process non-empty lines that start with '{' - if line and line.startswith('{'): - try: - # Try to parse the line as JSON - obj = json.loads(line) - # If the object has a "type" key with the value "track", skip writing it. - if obj.get("type") == "track": - continue - except ValueError: - # If the line isn't valid JSON, we don't filter it. - pass - self.file.write(line + '\n') - self.file.flush() - - def flush(self): - self.file.flush() - -def download_task(service, url, main, fallback, quality, fall_quality, real_time, - prg_path, orig_request, custom_dir_format, custom_track_format): - try: - from routes.utils.playlist import download_playlist - with open(prg_path, 'w') as f: - flushing_file = FlushingFileWrapper(f) - original_stdout = sys.stdout - sys.stdout = flushing_file # Process-specific stdout - - # Write the original request data into the progress file. - try: - flushing_file.write(json.dumps({"original_request": orig_request}) + "\n") - except Exception as e: - flushing_file.write(json.dumps({ - "status": "error", - "message": f"Failed to write original request data: {str(e)}" - }) + "\n") - - try: - download_playlist( - service=service, - url=url, - main=main, - fallback=fallback, - quality=quality, - fall_quality=fall_quality, - real_time=real_time, - custom_dir_format=custom_dir_format, - custom_track_format=custom_track_format - ) - flushing_file.write(json.dumps({"status": "complete"}) + "\n") - except Exception as e: - error_data = json.dumps({ - "status": "error", - "message": str(e), - "traceback": traceback.format_exc() - }) - flushing_file.write(error_data + "\n") - finally: - sys.stdout = original_stdout # Restore original stdout - except Exception as e: - with open(prg_path, 'w') as f: - error_data = json.dumps({ - "status": "error", - "message": str(e), - "traceback": traceback.format_exc() - }) - f.write(error_data + "\n") - @playlist_bp.route('/download', methods=['GET']) def handle_download(): + # Retrieve parameters from the request. service = request.args.get('service') url = request.args.get('url') main = request.args.get('main') @@ -100,15 +16,20 @@ def handle_download(): quality = request.args.get('quality') fall_quality = request.args.get('fall_quality') - # Retrieve the real_time parameter from the request query string. - # Here, if real_time is provided as "true", "1", or "yes" (case-insensitive) it will be interpreted as True. - real_time_str = request.args.get('real_time', 'false').lower() - real_time = real_time_str in ['true', '1', 'yes'] + # Normalize the real_time parameter; default to False. + real_time_arg = request.args.get('real_time', 'false') + real_time = real_time_arg.lower() in ['true', '1', 'yes'] - # New custom formatting parameters, with defaults. + # New custom formatting parameters (with defaults) custom_dir_format = request.args.get('custom_dir_format', "%ar_album%/%album%/%copyright%") custom_track_format = request.args.get('custom_track_format', "%tracknum%. %music% - %artist%") + # Sanitize main and fallback to prevent directory traversal. + if main: + main = os.path.basename(main) + if fallback: + fallback = os.path.basename(fallback) + if not all([service, url, main]): return Response( json.dumps({"error": "Missing parameters"}), @@ -116,27 +37,31 @@ def handle_download(): mimetype='application/json' ) - filename = generate_random_filename() - prg_dir = './prgs' - os.makedirs(prg_dir, exist_ok=True) - prg_path = os.path.join(prg_dir, filename) + # Build the task dictionary. + # Note: the key "download_type" tells the queue handler which download function to call. + task = { + "download_type": "playlist", + "service": service, + "url": url, + "main": main, + "fallback": fallback, + "quality": quality, + "fall_quality": fall_quality, + "real_time": real_time, + "custom_dir_format": custom_dir_format, + "custom_track_format": custom_track_format, + "orig_request": request.args.to_dict(), + # If provided, these additional parameters can be used by your download function. + "type": "playlist", + "name": request.args.get('name'), + "artist": request.args.get('artist') + } - # Capture the original request parameters as a dictionary. - orig_request = request.args.to_dict() - - process = Process( - target=download_task, - args=( - service, url, main, fallback, quality, fall_quality, real_time, - prg_path, orig_request, custom_dir_format, custom_track_format - ) - ) - process.start() - # Track the running process using the generated filename. - playlist_processes[filename] = process + # Add the task to the queue and get the generated process (prg) filename. + prg_filename = download_queue_manager.add_task(task) return Response( - json.dumps({"prg_file": filename}), + json.dumps({"prg_file": prg_filename}), status=202, mimetype='application/json' ) @@ -144,7 +69,7 @@ def handle_download(): @playlist_bp.route('/download/cancel', methods=['GET']) def cancel_download(): """ - Cancel a running playlist download process by its process id (prg file name). + Cancel a running playlist download process by its prg file name. """ prg_file = request.args.get('prg_file') if not prg_file: @@ -154,45 +79,20 @@ def cancel_download(): mimetype='application/json' ) - process = playlist_processes.get(prg_file) - prg_dir = './prgs' - prg_path = os.path.join(prg_dir, prg_file) + # Use the queue manager's cancellation method. + result = download_queue_manager.cancel_task(prg_file) + status_code = 200 if result.get("status") == "cancelled" else 404 + + return Response( + json.dumps(result), + status=status_code, + mimetype='application/json' + ) - if process and process.is_alive(): - # Terminate the running process and wait for it to finish - process.terminate() - process.join() - # Remove it from our tracking dictionary - del playlist_processes[prg_file] - - # Append a cancellation status to the log file - try: - with open(prg_path, 'a') as f: - f.write(json.dumps({"status": "cancel"}) + "\n") - except Exception as e: - return Response( - json.dumps({"error": f"Failed to write cancel status to file: {str(e)}"}), - status=500, - mimetype='application/json' - ) - - return Response( - json.dumps({"status": "cancel"}), - status=200, - mimetype='application/json' - ) - else: - return Response( - json.dumps({"error": "Process not found or already terminated"}), - status=404, - mimetype='application/json' - ) - -# NEW ENDPOINT: Get Playlist Information @playlist_bp.route('/info', methods=['GET']) def get_playlist_info(): """ - Retrieve Spotify playlist metadata given a Spotify ID. + Retrieve Spotify playlist metadata given a Spotify playlist ID. Expects a query parameter 'id' that contains the Spotify playlist ID. """ spotify_id = request.args.get('id') @@ -204,9 +104,8 @@ def get_playlist_info(): ) try: - # Import the get_spotify_info function from the utility module. + # Import and use the get_spotify_info function from the utility module. from routes.utils.get_info import get_spotify_info - # Call the function with the playlist type. playlist_info = get_spotify_info(spotify_id, "playlist") return Response( json.dumps(playlist_info), diff --git a/routes/track.py b/routes/track.py index 11b18ec..b86f296 100755 --- a/routes/track.py +++ b/routes/track.py @@ -1,86 +1,14 @@ from flask import Blueprint, Response, request -import json import os -import random -import string -import sys +import json import traceback -from multiprocessing import Process +from routes.utils.queue import download_queue_manager track_bp = Blueprint('track', __name__) -# Global dictionary to track running track download processes. -track_processes = {} - -def generate_random_filename(length=6): - chars = string.ascii_lowercase + string.digits - return ''.join(random.choice(chars) for _ in range(length)) + '.track.prg' - -class FlushingFileWrapper: - def __init__(self, file): - self.file = file - - def write(self, text): - # Write only lines that start with a JSON object. - for line in text.split('\n'): - if line.startswith('{'): - self.file.write(line + '\n') - self.file.flush() - - def flush(self): - self.file.flush() - -def download_task(service, url, main, fallback, quality, fall_quality, real_time, - prg_path, orig_request, custom_dir_format, custom_track_format): - try: - from routes.utils.track import download_track - with open(prg_path, 'w') as f: - flushing_file = FlushingFileWrapper(f) - original_stdout = sys.stdout - sys.stdout = flushing_file # Redirect stdout for this process - - # Write the original request data into the progress file. - try: - flushing_file.write(json.dumps({"original_request": orig_request}) + "\n") - except Exception as e: - flushing_file.write(json.dumps({ - "status": "error", - "message": f"Failed to write original request data: {str(e)}" - }) + "\n") - - try: - download_track( - service=service, - url=url, - main=main, - fallback=fallback, - quality=quality, - fall_quality=fall_quality, - real_time=real_time, - custom_dir_format=custom_dir_format, - custom_track_format=custom_track_format - ) - flushing_file.write(json.dumps({"status": "complete"}) + "\n") - except Exception as e: - error_data = json.dumps({ - "status": "error", - "message": str(e), - "traceback": traceback.format_exc() - }) - flushing_file.write(error_data + "\n") - finally: - sys.stdout = original_stdout # Restore original stdout - except Exception as e: - with open(prg_path, 'w') as f: - error_data = json.dumps({ - "status": "error", - "message": str(e), - "traceback": traceback.format_exc() - }) - f.write(error_data + "\n") - @track_bp.route('/download', methods=['GET']) def handle_download(): + # Retrieve parameters from the request. service = request.args.get('service') url = request.args.get('url') main = request.args.get('main') @@ -88,20 +16,20 @@ def handle_download(): quality = request.args.get('quality') fall_quality = request.args.get('fall_quality') - # Retrieve and normalize the real_time parameter; defaults to False. + # Normalize the real_time parameter; default to False. real_time_arg = request.args.get('real_time', 'false') real_time = real_time_arg.lower() in ['true', '1', 'yes'] - - # New query parameters for custom formatting. + + # New custom formatting parameters (with defaults). custom_dir_format = request.args.get('custom_dir_format', "%ar_album%/%album%/%copyright%") custom_track_format = request.args.get('custom_track_format', "%tracknum%. %music% - %artist%") - - # Sanitize main and fallback to prevent directory traversal + + # Sanitize main and fallback to prevent directory traversal. if main: main = os.path.basename(main) if fallback: fallback = os.path.basename(fallback) - + if not all([service, url, main]): return Response( json.dumps({"error": "Missing parameters"}), @@ -109,7 +37,7 @@ def handle_download(): mimetype='application/json' ) - # Validate credentials based on service and fallback + # Validate credentials based on service and fallback. try: if service == 'spotify': if fallback: @@ -158,28 +86,35 @@ def handle_download(): status=500, mimetype='application/json' ) - - filename = generate_random_filename() - prg_dir = './prgs' - os.makedirs(prg_dir, exist_ok=True) - prg_path = os.path.join(prg_dir, filename) - # Capture the original request parameters as a dictionary. + # Capture the original request parameters. orig_request = request.args.to_dict() - - process = Process( - target=download_task, - args=( - service, url, main, fallback, quality, fall_quality, real_time, - prg_path, orig_request, custom_dir_format, custom_track_format - ) - ) - process.start() - # Track the running process using the generated filename. - track_processes[filename] = process + + # Build the task dictionary. + # The key "download_type" tells the queue handler which download function to call. + task = { + "download_type": "track", + "service": service, + "url": url, + "main": main, + "fallback": fallback, + "quality": quality, + "fall_quality": fall_quality, + "real_time": real_time, + "custom_dir_format": custom_dir_format, + "custom_track_format": custom_track_format, + "orig_request": orig_request, + # Additional parameters if needed. + "type": "track", + "name": request.args.get('name'), + "artist": request.args.get('artist') + } + + # Add the task to the queue and get the generated process (prg) filename. + prg_filename = download_queue_manager.add_task(task) return Response( - json.dumps({"prg_file": filename}), + json.dumps({"prg_file": prg_filename}), status=202, mimetype='application/json' ) @@ -197,41 +132,16 @@ def cancel_download(): mimetype='application/json' ) - process = track_processes.get(prg_file) - prg_dir = './prgs' - prg_path = os.path.join(prg_dir, prg_file) + # Use the queue manager's cancellation method. + result = download_queue_manager.cancel_task(prg_file) + status_code = 200 if result.get("status") == "cancelled" else 404 + + return Response( + json.dumps(result), + status=status_code, + mimetype='application/json' + ) - if process and process.is_alive(): - # Terminate the running process and wait for it to finish - process.terminate() - process.join() - # Remove it from our tracking dictionary - del track_processes[prg_file] - - # Append a cancellation status to the log file - try: - with open(prg_path, 'a') as f: - f.write(json.dumps({"status": "cancel"}) + "\n") - except Exception as e: - return Response( - json.dumps({"error": f"Failed to write cancel status to file: {str(e)}"}), - status=500, - mimetype='application/json' - ) - - return Response( - json.dumps({"status": "cancel"}), - status=200, - mimetype='application/json' - ) - else: - return Response( - json.dumps({"error": "Process not found or already terminated"}), - status=404, - mimetype='application/json' - ) - -# NEW ENDPOINT: Get Track Information @track_bp.route('/info', methods=['GET']) def get_track_info(): """ @@ -247,9 +157,8 @@ def get_track_info(): ) try: - # Import the get_spotify_info function from the utility module. + # Import and use the get_spotify_info function from the utility module. from routes.utils.get_info import get_spotify_info - # Call the function with the track type. track_info = get_spotify_info(spotify_id, "track") return Response( json.dumps(track_info), diff --git a/routes/utils/artist.py b/routes/utils/artist.py deleted file mode 100644 index c988f10..0000000 --- a/routes/utils/artist.py +++ /dev/null @@ -1,126 +0,0 @@ -import json -import traceback - -from deezspot.easy_spoty import Spo -from deezspot.libutils.utils import get_ids, link_is_valid -from routes.utils.album import download_album # Assumes album.py is in routes/utils/ - -def log_json(message_dict): - """Helper function to output a JSON-formatted log message.""" - print(json.dumps(message_dict)) - - -def get_artist_discography(url, album_type='album,single,compilation,appears_on'): - if not url: - message = "No artist URL provided." - log_json({"status": "error", "message": message}) - raise ValueError(message) - - try: - # Validate the URL (this function should raise an error if invalid). - link_is_valid(link=url) - except Exception as validation_error: - message = f"Link validation failed: {validation_error}" - log_json({"status": "error", "message": message}) - raise ValueError(message) - - try: - # Extract the artist ID from the URL. - artist_id = get_ids(url) - except Exception as id_error: - message = f"Failed to extract artist ID from URL: {id_error}" - log_json({"status": "error", "message": message}) - raise ValueError(message) - - try: - # Retrieve the discography using the artist ID. - discography = Spo.get_artist(artist_id, album_type=album_type) - return discography - except Exception as fetch_error: - message = f"An error occurred while fetching the discography: {fetch_error}" - log_json({"status": "error", "message": message}) - raise - - -def download_artist_albums(service, artist_url, main, fallback=None, quality=None, - fall_quality=None, real_time=False, album_type='album,single,compilation,appears_on', - custom_dir_format="%ar_album%/%album%/%copyright%", - custom_track_format="%tracknum%. %music% - %artist%"): - try: - discography = get_artist_discography(artist_url, album_type=album_type) - except Exception as e: - log_json({"status": "error", "message": f"Error retrieving artist discography: {e}"}) - raise - albums = discography.get('items', []) - # Extract artist name from the first album's artists as fallback. - artist_name = artist_url - if albums: - first_album = albums[0] - artists = first_album.get('artists', []) - if artists: - artist_name = artists[0].get('name', artist_url) - - if not albums: - log_json({ - "status": "done", - "type": "artist", - "artist": artist_name, - "album_type": album_type, - "message": "No albums found for the artist." - }) - return - - log_json({ - "status": "initializing", - "type": "artist", - "artist": artist_name, - "total_albums": len(albums), - "album_type": album_type - }) - - for album in albums: - try: - album_url = album.get('external_urls', {}).get('spotify') - album_name = album.get('name', 'Unknown Album') - # Extract artist names if available. - artists = [] - if "artists" in album: - artists = [artist.get("name", "Unknown") for artist in album["artists"]] - if not album_url: - log_json({ - "status": "warning", - "type": "album", - "album": album_name, - "artist": artists, - "message": "No Spotify URL found; skipping." - }) - continue - - download_album( - service=service, - url=album_url, - main=main, - fallback=fallback, - quality=quality, - fall_quality=fall_quality, - real_time=real_time, - custom_dir_format=custom_dir_format, - custom_track_format=custom_track_format - ) - - except Exception as album_error: - log_json({ - "status": "error", - "type": "album", - "album": album.get('name', 'Unknown'), - "error": str(album_error) - }) - traceback.print_exc() - - # When everything has been processed, print the final status. - log_json({ - "status": "done", - "type": "artist", - "artist": artist_name, - "album_type": album_type - }) diff --git a/routes/utils/queue.py b/routes/utils/queue.py new file mode 100644 index 0000000..44fcc28 --- /dev/null +++ b/routes/utils/queue.py @@ -0,0 +1,256 @@ +import os +import sys +import json +import time +import string +import random +import traceback +import threading +from multiprocessing import Process +from queue import Queue, Empty + +# ------------------------------------------------------------------------------ +# Configuration +# ------------------------------------------------------------------------------ +MAX_CONCURRENT_DL = 3 # maximum number of concurrent download processes +PRG_DIR = './prgs' # directory where .prg files will be stored + +# ------------------------------------------------------------------------------ +# Utility Functions and Classes +# ------------------------------------------------------------------------------ + +def generate_random_filename(length=6, extension=".prg"): + """Generate a random filename with the given extension.""" + chars = string.ascii_lowercase + string.digits + return ''.join(random.choice(chars) for _ in range(length)) + extension + +class FlushingFileWrapper: + """ + A file wrapper that flushes after writing each line and + skips lines whose JSON content has a "type" of "track". + """ + def __init__(self, file): + self.file = file + + def write(self, text): + for line in text.split('\n'): + line = line.strip() + if line and line.startswith('{'): + try: + obj = json.loads(line) + if obj.get("type") == "track": + continue # skip lines that represent track messages + except ValueError: + pass # not valid JSON; write the line as is + self.file.write(line + '\n') + self.file.flush() + + def flush(self): + self.file.flush() + +def run_download_task(task, prg_path): + """ + This function is executed in a separate process. + It opens the given prg file (in append mode), calls the appropriate download + function (album, track, or playlist), and writes a completion or error status + to the file. + """ + try: + # Determine which download function to use based on task type. + download_type = task.get("download_type") + if download_type == "album": + from routes.utils.album import download_album + download_func = download_album + elif download_type == "track": + from routes.utils.track import download_track + download_func = download_track + elif download_type == "playlist": + from routes.utils.playlist import download_playlist + download_func = download_playlist + else: + raise ValueError(f"Unsupported download type: {download_type}") + + # Open the .prg file in append mode so as not to overwrite the queued lines. + with open(prg_path, 'a') as f: + flushing_file = FlushingFileWrapper(f) + original_stdout = sys.stdout + sys.stdout = flushing_file + + try: + # Call the appropriate download function with parameters from the task. + download_func( + service=task.get("service"), + url=task.get("url"), + main=task.get("main"), + fallback=task.get("fallback"), + quality=task.get("quality"), + fall_quality=task.get("fall_quality"), + real_time=task.get("real_time", False), + custom_dir_format=task.get("custom_dir_format", "%ar_album%/%album%/%copyright%"), + custom_track_format=task.get("custom_track_format", "%tracknum%. %music% - %artist%") + ) + flushing_file.write(json.dumps({"status": "complete"}) + "\n") + except Exception as e: + flushing_file.write(json.dumps({ + "status": "error", + "message": str(e), + "traceback": traceback.format_exc() + }) + "\n") + finally: + sys.stdout = original_stdout # restore original stdout + except Exception as e: + # If something fails even before opening the prg file properly. + with open(prg_path, 'a') as f: + error_data = json.dumps({ + "status": "error", + "message": str(e), + "traceback": traceback.format_exc() + }) + f.write(error_data + "\n") + +# ------------------------------------------------------------------------------ +# Download Queue Manager Class +# ------------------------------------------------------------------------------ + +class DownloadQueueManager: + """ + Manages a queue of download tasks, ensuring that no more than + MAX_CONCURRENT_DL downloads run concurrently. + """ + def __init__(self, max_concurrent=MAX_CONCURRENT_DL, prg_dir=PRG_DIR): + self.max_concurrent = max_concurrent + self.prg_dir = prg_dir + os.makedirs(self.prg_dir, exist_ok=True) + + self.pending_tasks = Queue() # holds tasks waiting to run + self.running_downloads = {} # maps prg_filename -> Process instance + self.lock = threading.Lock() # protects access to running_downloads + self.worker_thread = threading.Thread(target=self.queue_worker, daemon=True) + self.running = False + + def start(self): + """Start the worker thread that monitors the queue.""" + self.running = True + self.worker_thread.start() + + def stop(self): + """Stop the worker thread gracefully.""" + self.running = False + self.worker_thread.join() + + def add_task(self, task): + """ + Adds a new download task to the queue. + The task is expected to be a dictionary with all necessary parameters, + including a "download_type" key (album, track, or playlist). + A .prg file is created for progress logging with an initial two entries: + 1. The original request (merged with the extra keys: type, name, artist) + 2. A queued status entry (including type, name, artist, and the task's position in the queue) + + Returns the generated prg filename so that the caller can later + check the status or request cancellation. + """ + prg_filename = generate_random_filename() + prg_path = os.path.join(self.prg_dir, prg_filename) + task['prg_path'] = prg_path + + # Compute the overall position in the queue: + # position = (number of running tasks) + (number of pending tasks) + 1. + position = len(self.running_downloads) + self.pending_tasks.qsize() + 1 + + # Create and immediately write the initial entries to the .prg file. + try: + with open(prg_path, 'w') as f: + # Merge extra keys into the original request. + original_request = task.get("orig_request", {}) + for key in ["type", "name", "artist"]: + if key in task and task[key] is not None: + original_request[key] = task[key] + f.write(json.dumps({"original_request": original_request}) + "\n") + + # Write a queued status entry with the extra parameters and queue position. + queued_entry = { + "status": "queued", + "name": task.get("name"), + "type": task.get("type"), + "artist": task.get("artist"), + "position": position + } + f.write(json.dumps(queued_entry) + "\n") + except Exception as e: + print("Error writing prg file:", e) + + self.pending_tasks.put((prg_filename, task)) + return prg_filename + + def cancel_task(self, prg_filename): + """ + Cancel a running download task by terminating its process. + If the task is found and alive, it is terminated and a cancellation + status is appended to its .prg file. + + Returns a dictionary indicating the result. + """ + with self.lock: + process = self.running_downloads.get(prg_filename) + if process and process.is_alive(): + process.terminate() + process.join() + del self.running_downloads[prg_filename] + prg_path = os.path.join(self.prg_dir, prg_filename) + try: + with open(prg_path, 'a') as f: + f.write(json.dumps({"status": "cancel"}) + "\n") + except Exception as e: + return {"error": f"Failed to write cancel status: {str(e)}"} + return {"status": "cancelled"} + else: + return {"error": "Task not found or already terminated"} + + def queue_worker(self): + """ + Worker thread that continuously monitors the pending_tasks queue. + It cleans up finished download processes and starts new ones if the + number of running downloads is less than the allowed maximum. + """ + while self.running: + # First, clean up any finished processes. + with self.lock: + finished = [] + for prg_filename, process in self.running_downloads.items(): + if not process.is_alive(): + finished.append(prg_filename) + for prg_filename in finished: + del self.running_downloads[prg_filename] + + # Start new tasks if there is available capacity. + if len(self.running_downloads) < self.max_concurrent: + try: + prg_filename, task = self.pending_tasks.get(timeout=1) + except Empty: + time.sleep(0.5) + continue + + prg_path = task.get('prg_path') + # Create and start a new process for the task. + p = Process( + target=run_download_task, + args=(task, prg_path) + ) + with self.lock: + self.running_downloads[prg_filename] = p + p.start() + else: + # At capacity; sleep briefly. + time.sleep(1) + + # Small sleep to avoid a tight loop. + time.sleep(0.1) + +# ------------------------------------------------------------------------------ +# Global Instance +# ------------------------------------------------------------------------------ + +# Create and start a global instance of the queue manager. +download_queue_manager = DownloadQueueManager() +download_queue_manager.start()