added spot fallback

This commit is contained in:
cool.gitter.choco
2025-01-26 08:39:06 -06:00
parent 6cce067d0e
commit 48db9a1606
11 changed files with 525 additions and 277 deletions

6
.gitignore vendored
View File

@@ -3,4 +3,8 @@
/venv
/downloads/
/creds/
Test.py
/Test.py
/prgs/
/flask_server.log
routes/__pycache__/
routes/utils/__pycache__/

54
app.py
View File

@@ -1,16 +1,66 @@
from flask import Flask
from flask import Flask, request
from flask_cors import CORS
from routes.search import search_bp
from routes.credentials import credentials_bp
from routes.album import album_bp
from routes.track import track_bp
from routes.playlist import playlist_bp
import logging
from datetime import datetime
import time
def create_app():
app = Flask(__name__)
# Configure basic logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s',
handlers=[
logging.FileHandler('flask_server.log'),
logging.StreamHandler()
]
)
# Get Flask's logger
logger = logging.getLogger('werkzeug')
logger.setLevel(logging.INFO)
CORS(app)
# Register blueprints
app.register_blueprint(search_bp, url_prefix='/api')
app.register_blueprint(credentials_bp, url_prefix='/api/credentials')
app.register_blueprint(album_bp, url_prefix='/api/album')
app.register_blueprint(track_bp, url_prefix='/api/track')
app.register_blueprint(playlist_bp, url_prefix='/api/playlist')
# Add request logging middleware
@app.before_request
def log_request():
request.start_time = time.time()
logger.info(f"Request: {request.method} {request.path}")
@app.after_request
def log_response(response):
duration = round((time.time() - request.start_time) * 1000, 2)
logger.info(f"Response: {response.status} | Duration: {duration}ms")
return response
# Error logging
@app.errorhandler(Exception)
def handle_exception(e):
logger.error(f"Server error: {str(e)}", exc_info=True)
return "Internal Server Error", 500
return app
if __name__ == '__main__':
from waitress import serve
# Configure waitress logger
logger = logging.getLogger('waitress')
logger.setLevel(logging.INFO)
app = create_app()
logging.info("Starting Flask server on port 5000")
from waitress import serve
serve(app, host='0.0.0.0', port=5000)

87
routes/album.py Normal file
View File

@@ -0,0 +1,87 @@
from flask import Blueprint, Response, request
import json
import os
import random
import string
import sys
from threading import Thread
import traceback
album_bp = Blueprint('album', __name__)
def generate_random_filename(length=6):
chars = string.ascii_lowercase + string.digits
return ''.join(random.choice(chars) for _ in range(length)) + '.prg'
class FlushingFileWrapper:
def __init__(self, file):
self.file = file
def write(self, text):
self.file.write(text)
self.file.flush()
def flush(self):
self.file.flush()
@album_bp.route('/download', methods=['GET'])
def handle_download():
service = request.args.get('service')
url = request.args.get('url')
main = request.args.get('main')
fallback = request.args.get('fallback') # New fallback parameter
if not all([service, url, main]):
return Response(
json.dumps({"error": "Missing parameters"}),
status=400,
mimetype='application/json'
)
filename = generate_random_filename()
prg_dir = './prgs'
os.makedirs(prg_dir, exist_ok=True)
prg_path = os.path.join(prg_dir, filename)
def download_task():
try:
from routes.utils.album import download_album
with open(prg_path, 'w') as f:
flushing_file = FlushingFileWrapper(f)
original_stdout = sys.stdout
sys.stdout = flushing_file
try:
# Pass fallback parameter to download_album
download_album(
service=service,
url=url,
main=main,
fallback=fallback
)
flushing_file.write(json.dumps({"status": "complete"}) + "\n")
except Exception as e:
error_data = json.dumps({
"status": "error",
"message": str(e),
"traceback": traceback.format_exc()
})
flushing_file.write(error_data + "\n")
finally:
sys.stdout = original_stdout
except Exception as e:
with open(prg_path, 'w') as f:
error_data = json.dumps({
"status": "error",
"message": str(e),
"traceback": traceback.format_exc()
})
f.write(error_data + "\n")
Thread(target=download_task).start()
return Response(
json.dumps({"prg_file": filename}),
status=202,
mimetype='application/json'
)

88
routes/playlist.py Normal file
View File

@@ -0,0 +1,88 @@
from flask import Blueprint, Response, request
import json
import os
import random
import string
import sys
from threading import Thread
import traceback
playlist_bp = Blueprint('playlist', __name__)
def generate_random_filename(length=6):
chars = string.ascii_lowercase + string.digits
return ''.join(random.choice(chars) for _ in range(length)) + '.prg'
class FlushingFileWrapper:
def __init__(self, file):
self.file = file
def write(self, text):
self.file.write(text)
self.file.flush()
def flush(self):
self.file.flush()
@playlist_bp.route('/download', methods=['GET'])
def handle_download():
service = request.args.get('service')
url = request.args.get('url')
main = request.args.get('main') # Changed from 'account'
fallback = request.args.get('fallback') # New parameter
# Validate required parameters (main instead of account)
if not all([service, url, main]):
return Response(
json.dumps({"error": "Missing parameters"}),
status=400,
mimetype='application/json'
)
filename = generate_random_filename()
prg_dir = './prgs'
os.makedirs(prg_dir, exist_ok=True)
prg_path = os.path.join(prg_dir, filename)
def download_task():
try:
from routes.utils.playlist import download_playlist
with open(prg_path, 'w') as f:
flushing_file = FlushingFileWrapper(f)
original_stdout = sys.stdout
sys.stdout = flushing_file
try:
# Updated call with main/fallback parameters
download_playlist(
service=service,
url=url,
main=main,
fallback=fallback
)
flushing_file.write(json.dumps({"status": "complete"}) + "\n")
except Exception as e:
error_data = json.dumps({
"status": "error",
"message": str(e),
"traceback": traceback.format_exc()
})
flushing_file.write(error_data + "\n")
finally:
sys.stdout = original_stdout
except Exception as e:
with open(prg_path, 'w') as f:
error_data = json.dumps({
"status": "error",
"message": str(e),
"traceback": traceback.format_exc()
})
f.write(error_data + "\n")
Thread(target=download_task).start()
return Response(
json.dumps({"prg_file": filename}),
status=202,
mimetype='application/json'
)

View File

@@ -1,5 +1,5 @@
from flask import Blueprint, jsonify, request
from routes.utils.search import search_and_combine
from routes.utils.search import search # Renamed import
search_bp = Blueprint('search', __name__)
@@ -9,7 +9,6 @@ def handle_search():
# Get query parameters
query = request.args.get('q', '')
search_type = request.args.get('type', 'track')
service = request.args.get('service', 'both')
limit = int(request.args.get('limit', 10))
# Validate parameters
@@ -21,16 +20,14 @@ def handle_search():
return jsonify({'error': 'Invalid search type'}), 400
# Perform the search
results = search_and_combine(
raw_results = search(
query=query,
search_type=search_type,
service=service,
limit=limit
)
return jsonify({
'results': results,
'count': len(results),
'data': raw_results,
'error': None
})

87
routes/track.py Normal file
View File

@@ -0,0 +1,87 @@
from flask import Blueprint, Response, request
import json
import os
import random
import string
import sys
from threading import Thread
import traceback
track_bp = Blueprint('track', __name__)
def generate_random_filename(length=6):
chars = string.ascii_lowercase + string.digits
return ''.join(random.choice(chars) for _ in range(length)) + '.prg'
class FlushingFileWrapper:
def __init__(self, file):
self.file = file
def write(self, text):
self.file.write(text)
self.file.flush()
def flush(self):
self.file.flush()
@track_bp.route('/download', methods=['GET'])
def handle_download():
service = request.args.get('service')
url = request.args.get('url')
main = request.args.get('main')
fallback = request.args.get('fallback') # New fallback parameter
if not all([service, url, main]):
return Response(
json.dumps({"error": "Missing parameters"}),
status=400,
mimetype='application/json'
)
filename = generate_random_filename()
prg_dir = './prgs'
os.makedirs(prg_dir, exist_ok=True)
prg_path = os.path.join(prg_dir, filename)
def download_task():
try:
from routes.utils.track import download_track
with open(prg_path, 'w') as f:
flushing_file = FlushingFileWrapper(f)
original_stdout = sys.stdout
sys.stdout = flushing_file
try:
# Pass all parameters including fallback
download_track(
service=service,
url=url,
main=main,
fallback=fallback
)
flushing_file.write(json.dumps({"status": "complete"}) + "\n")
except Exception as e:
error_data = json.dumps({
"status": "error",
"message": str(e),
"traceback": traceback.format_exc()
})
flushing_file.write(error_data + "\n")
finally:
sys.stdout = original_stdout
except Exception as e:
with open(prg_path, 'w') as f:
error_data = json.dumps({
"status": "error",
"message": str(e),
"traceback": traceback.format_exc()
})
f.write(error_data + "\n")
Thread(target=download_task).start()
return Response(
json.dumps({"prg_file": filename}),
status=202,
mimetype='application/json'
)

View File

@@ -4,57 +4,95 @@ import traceback
from deezspot.spotloader import SpoLogin
from deezspot.deezloader import DeeLogin
def download_album(service, url, account):
def download_album(service, url, main, fallback=None):
try:
if service == 'spotify':
# Construct Spotify credentials path
creds_dir = os.path.join('./creds/spotify', account)
credentials_path = os.path.abspath(os.path.join(creds_dir, 'credentials.json'))
# Initialize Spotify client
spo = SpoLogin(credentials_path=credentials_path)
# Download Spotify album
spo.download_album(
link_album=url,
output_dir="./downloads/albums",
quality_download="NORMAL",
recursive_quality=True,
recursive_download=False,
not_interface=False,
method_save=1,
make_zip=True
)
if fallback:
# First attempt: use DeeLogin's download_albumspo with the 'main' (Deezer credentials)
try:
# Load Deezer credentials from 'main' under deezer directory
deezer_creds_dir = os.path.join('./creds/deezer', main)
deezer_creds_path = os.path.abspath(os.path.join(deezer_creds_dir, 'credentials.json'))
with open(deezer_creds_path, 'r') as f:
deezer_creds = json.load(f)
# Initialize DeeLogin with Deezer credentials
dl = DeeLogin(
arl=deezer_creds.get('arl', ''),
email=deezer_creds.get('email', ''),
password=deezer_creds.get('password', '')
)
# Download using download_albumspo
dl.download_albumspo(
link_album=url,
output_dir="./downloads",
quality_download="FLAC",
recursive_quality=True,
recursive_download=False,
not_interface=False,
make_zip=True,
method_save=1
)
except Exception as e:
# If the first attempt fails, use the fallback Spotify main
print(f"Failed to download via Deezer fallback: {e}. Trying Spotify fallback main.")
# Load fallback Spotify credentials and attempt download
try:
spo_creds_dir = os.path.join('./creds/spotify', fallback)
spo_creds_path = os.path.abspath(os.path.join(spo_creds_dir, 'credentials.json'))
spo = SpoLogin(credentials_path=spo_creds_path)
spo.download_album(
link_album=url,
output_dir="./downloads",
quality_download="HIGH",
recursive_quality=True,
recursive_download=False,
not_interface=False,
method_save=1,
make_zip=False
)
except Exception as e2:
# If fallback also fails, raise an error indicating both attempts failed
raise RuntimeError(
f"Both main (Deezer) and fallback (Spotify) attempts failed. "
f"Deezer error: {e}, Spotify error: {e2}"
) from e2
else:
# Original behavior: use Spotify main
creds_dir = os.path.join('./creds/spotify', main)
credentials_path = os.path.abspath(os.path.join(creds_dir, 'credentials.json'))
spo = SpoLogin(credentials_path=credentials_path)
spo.download_album(
link_album=url,
output_dir="./downloads",
quality_download="HIGH",
recursive_quality=True,
recursive_download=False,
not_interface=False,
method_save=1,
make_zip=False
)
elif service == 'deezer':
# Construct Deezer credentials path
creds_dir = os.path.join('./creds/deezer', account)
# Existing code remains the same, ignoring fallback
creds_dir = os.path.join('./creds/deezer', main)
creds_path = os.path.abspath(os.path.join(creds_dir, 'credentials.json'))
# Load Deezer credentials
with open(creds_path, 'r') as f:
creds = json.load(f)
# Initialize Deezer client
dl = DeeLogin(
arl=creds.get('arl', ''),
email=creds.get('email', ''),
password=creds.get('password', '')
)
# Download Deezer album
dl.download_albumdee(
link_album=url,
output_dir="./downloads/albums",
output_dir="./downloads",
quality_download="FLAC",
recursive_quality=True,
recursive_download=False,
method_save=1
method_save=1,
make_zip=False
)
else:
raise ValueError(f"Unsupported service: {service}")
except Exception as e:
traceback.print_exc()
raise
raise # Re-raise the exception after logging

View File

@@ -4,57 +4,95 @@ import traceback
from deezspot.spotloader import SpoLogin
from deezspot.deezloader import DeeLogin
def download_playlist(service, url, account):
def download_playlist(service, url, main, fallback=None):
try:
if service == 'spotify':
# Construct Spotify credentials path
creds_dir = os.path.join('./creds/spotify', account)
credentials_path = os.path.abspath(os.path.join(creds_dir, 'credentials.json'))
# Initialize Spotify client
spo = SpoLogin(credentials_path=credentials_path)
# Download Spotify playlist
spo.download_playlist(
link_playlist=url,
output_dir="./downloads/playlists",
quality_download="NORMAL",
recursive_quality=True,
recursive_download=False,
not_interface=False,
method_save=1,
make_zip=True
)
if fallback:
# First attempt: use DeeLogin's download_playlistspo with the 'main' (Deezer credentials)
try:
# Load Deezer credentials from 'main' under deezer directory
deezer_creds_dir = os.path.join('./creds/deezer', main)
deezer_creds_path = os.path.abspath(os.path.join(deezer_creds_dir, 'credentials.json'))
with open(deezer_creds_path, 'r') as f:
deezer_creds = json.load(f)
# Initialize DeeLogin with Deezer credentials
dl = DeeLogin(
arl=deezer_creds.get('arl', ''),
email=deezer_creds.get('email', ''),
password=deezer_creds.get('password', '')
)
# Download using download_playlistspo
dl.download_playlistspo(
link_playlist=url,
output_dir="./downloads",
quality_download="FLAC",
recursive_quality=True,
recursive_download=False,
not_interface=False,
make_zip=True,
method_save=1
)
except Exception as e:
# If the first attempt fails, use the fallback Spotify main
print(f"Failed to download via Deezer fallback: {e}. Trying Spotify fallback main.")
# Load fallback Spotify credentials and attempt download
try:
spo_creds_dir = os.path.join('./creds/spotify', fallback)
spo_creds_path = os.path.abspath(os.path.join(spo_creds_dir, 'credentials.json'))
spo = SpoLogin(credentials_path=spo_creds_path)
spo.download_playlist(
link_playlist=url,
output_dir="./downloads",
quality_download="HIGH",
recursive_quality=True,
recursive_download=False,
not_interface=False,
method_save=1,
make_zip=False
)
except Exception as e2:
# If fallback also fails, raise an error indicating both attempts failed
raise RuntimeError(
f"Both main (Deezer) and fallback (Spotify) attempts failed. "
f"Deezer error: {e}, Spotify error: {e2}"
) from e2
else:
# Original behavior: use Spotify main
creds_dir = os.path.join('./creds/spotify', main)
credentials_path = os.path.abspath(os.path.join(creds_dir, 'credentials.json'))
spo = SpoLogin(credentials_path=credentials_path)
spo.download_playlist(
link_playlist=url,
output_dir="./downloads",
quality_download="HIGH",
recursive_quality=True,
recursive_download=False,
not_interface=False,
method_save=1,
make_zip=False
)
elif service == 'deezer':
# Construct Deezer credentials path
creds_dir = os.path.join('./creds/deezer', account)
# Existing code for Deezer, using main as Deezer account
creds_dir = os.path.join('./creds/deezer', main)
creds_path = os.path.abspath(os.path.join(creds_dir, 'credentials.json'))
# Load Deezer credentials
with open(creds_path, 'r') as f:
creds = json.load(f)
# Initialize Deezer client
dl = DeeLogin(
arl=creds.get('arl', ''),
email=creds.get('email', ''),
password=creds.get('password', '')
)
# Download Deezer playlist
dl.download_playlistdee(
link_playlist=url,
output_dir="./downloads/playlists",
output_dir="./downloads",
quality_download="FLAC",
recursive_quality=False,
recursive_download=False,
method_save=1
method_save=1,
make_zip=False
)
else:
raise ValueError(f"Unsupported service: {service}")
except Exception as e:
traceback.print_exc()
raise
raise # Re-raise the exception after logging

View File

@@ -1,178 +1,13 @@
from deezspot.easy_spoty import Spo
from deezspot.deezloader import API
import json
import difflib
from typing import List, Dict
def string_similarity(a: str, b: str) -> float:
return difflib.SequenceMatcher(None, a.lower(), b.lower()).ratio()
def normalize_item(item: Dict, service: str, item_type: str) -> Dict:
normalized = {
"service": service,
"type": item_type
}
if item_type == "track":
normalized.update({
"id": item.get('id'),
"title": item.get('title') if service == "deezer" else item.get('name'),
"artists": [{"name": item['artist']['name']}] if service == "deezer"
else [{"name": a['name']} for a in item.get('artists', [])],
"album": {
"title": item['album']['title'] if service == "deezer" else item['album']['name'],
"id": item['album']['id'] if service == "deezer" else item['album'].get('id'),
},
"duration": item.get('duration') if service == "deezer" else item.get('duration_ms'),
"url": item.get('link') if service == "deezer" else item.get('external_urls', {}).get('spotify'),
"isrc": item.get('isrc') if service == "deezer" else item.get('external_ids', {}).get('isrc')
})
elif item_type == "album":
normalized.update({
"id": item.get('id'),
"title": item.get('title') if service == "deezer" else item.get('name'),
"artists": [{"name": item['artist']['name']}] if service == "deezer"
else [{"name": a['name']} for a in item.get('artists', [])],
"total_tracks": item.get('nb_tracks') if service == "deezer" else item.get('total_tracks'),
"release_date": item.get('release_date'),
"url": item.get('link') if service == "deezer" else item.get('external_urls', {}).get('spotify'),
"images": [
{"url": item.get('cover_xl')},
{"url": item.get('cover_big')},
{"url": item.get('cover_medium')}
] if service == "deezer" else item.get('images', [])
})
elif item_type == "artist":
normalized.update({
"id": item.get('id'),
"name": item.get('name'),
"url": item.get('link') if service == "deezer" else item.get('external_urls', {}).get('spotify'),
"images": [
{"url": item.get('picture_xl')},
{"url": item.get('picture_big')},
{"url": item.get('picture_medium')}
] if service == "deezer" else item.get('images', [])
})
else: # For playlists, episodes, etc.
normalized.update({
"id": item.get('id'),
"title": item.get('title') if service == "deezer" else item.get('name'),
"url": item.get('link') if service == "deezer" else item.get('external_urls', {}).get('spotify'),
"description": item.get('description'),
"owner": item.get('user', {}).get('name') if service == "deezer" else item.get('owner', {}).get('display_name')
})
return {k: v for k, v in normalized.items() if v is not None}
def is_same_item(deezer_item: Dict, spotify_item: Dict, item_type: str) -> bool:
deezer_normalized = normalize_item(deezer_item, "deezer", item_type)
spotify_normalized = normalize_item(spotify_item, "spotify", item_type)
if item_type == "track":
title_match = string_similarity(deezer_normalized['title'], spotify_normalized['title']) >= 0.8
artist_match = string_similarity(
deezer_normalized['artists'][0]['name'],
spotify_normalized['artists'][0]['name']
) >= 0.8
album_match = string_similarity(
deezer_normalized['album']['title'],
spotify_normalized['album']['title']
) >= 0.9
return title_match and artist_match and album_match
if item_type == "album":
title_match = string_similarity(deezer_normalized['title'], spotify_normalized['title']) >= 0.8
artist_match = string_similarity(
deezer_normalized['artists'][0]['name'],
spotify_normalized['artists'][0]['name']
) >= 0.8
tracks_match = deezer_normalized['total_tracks'] == spotify_normalized['total_tracks']
return title_match and artist_match and tracks_match
if item_type == "artist":
name_match = string_similarity(deezer_normalized['name'], spotify_normalized['name']) >= 0.85
return name_match
return False
def process_results(deezer_results: Dict, spotify_results: Dict, search_type: str) -> List[Dict]:
combined = []
processed_spotify_ids = set()
for deezer_item in deezer_results.get('data', []):
match_found = False
normalized_deezer = normalize_item(deezer_item, "deezer", search_type)
for spotify_item in spotify_results.get('items', []):
if is_same_item(deezer_item, spotify_item, search_type):
processed_spotify_ids.add(spotify_item['id'])
match_found = True
break
combined.append(normalized_deezer)
for spotify_item in spotify_results.get('items', []):
if spotify_item['id'] not in processed_spotify_ids:
combined.append(normalize_item(spotify_item, "spotify", search_type))
return combined
def search_and_combine(
def search(
query: str,
search_type: str,
service: str = "both",
limit: int = 3
) -> List[Dict]:
if search_type == "playlist" and service == "both":
raise ValueError("Playlist search requires explicit service selection (deezer or spotify)")
) -> dict:
# Initialize the Spotify client
Spo.__init__()
if search_type == "episode" and service != "spotify":
raise ValueError("Episode search is only available for Spotify")
deezer_data = []
spotify_items = []
# Deezer search with limit
if service in ["both", "deezer"] and search_type != "episode":
deezer_api = API()
deezer_methods = {
'track': deezer_api.search_track,
'album': deezer_api.search_album,
'artist': deezer_api.search_artist,
'playlist': deezer_api.search_playlist
}
deezer_method = deezer_methods.get(search_type, deezer_api.search)
deezer_response = deezer_method(query, limit=limit)
deezer_data = deezer_response.get('data', [])[:limit]
if service == "deezer":
return [normalize_item(item, "deezer", search_type) for item in deezer_data]
# Spotify search with limit
if service in ["both", "spotify"]:
Spo.__init__()
spotify_response = Spo.search(query=query, search_type=search_type, limit=limit)
if search_type == "episode":
spotify_items = spotify_response.get('episodes', {}).get('items', [])[:limit]
else:
spotify_items = spotify_response.get('tracks', {}).get('items',
spotify_response.get('albums', {}).get('items',
spotify_response.get('artists', {}).get('items',
spotify_response.get('playlists', {}).get('items', []))))[:limit]
if service == "spotify":
return [normalize_item(item, "spotify", search_type) for item in spotify_items]
# Combined results
if service == "both" and search_type != "playlist":
return process_results(
{"data": deezer_data},
{"items": spotify_items},
search_type
)[:limit]
return []
# Perform the Spotify search and return the raw response
spotify_response = Spo.search(query=query, search_type=search_type, limit=limit)
return spotify_response

View File

@@ -4,56 +4,80 @@ import traceback
from deezspot.spotloader import SpoLogin
from deezspot.deezloader import DeeLogin
def download_track(service, url, account):
def download_track(service, url, main, fallback=None):
try:
if service == 'spotify':
# Construct Spotify credentials path
creds_dir = os.path.join('./creds/spotify', account)
credentials_path = os.path.abspath(os.path.join(creds_dir, 'credentials.json'))
# Initialize Spotify client
spo = SpoLogin(credentials_path=credentials_path)
# Download Spotify track
spo.download_track(
link_track=url,
output_dir="./downloads/tracks",
quality_download="NORMAL",
recursive_quality=False,
recursive_download=False,
not_interface=False,
method_save=1
)
if fallback:
# First attempt: use Deezer's download_trackspo with 'main' (Deezer credentials)
try:
deezer_creds_dir = os.path.join('./creds/deezer', main)
deezer_creds_path = os.path.abspath(os.path.join(deezer_creds_dir, 'credentials.json'))
with open(deezer_creds_path, 'r') as f:
deezer_creds = json.load(f)
dl = DeeLogin(
arl=deezer_creds.get('arl', ''),
email=deezer_creds.get('email', ''),
password=deezer_creds.get('password', '')
)
dl.download_trackspo(
link_track=url,
output_dir="./downloads",
quality_download="FLAC",
recursive_quality=False,
recursive_download=False,
not_interface=False,
method_save=1
)
except Exception as e:
# Fallback to Spotify credentials if Deezer fails
print(f"Failed to download via Deezer fallback: {e}. Trying Spotify fallback.")
spo_creds_dir = os.path.join('./creds/spotify', fallback)
spo_creds_path = os.path.abspath(os.path.join(spo_creds_dir, 'credentials.json'))
spo = SpoLogin(credentials_path=spo_creds_path)
spo.download_track(
link_track=url,
output_dir="./downloads",
quality_download="HIGH",
recursive_quality=False,
recursive_download=False,
not_interface=False,
method_save=1
)
else:
# Directly use Spotify main account
creds_dir = os.path.join('./creds/spotify', main)
credentials_path = os.path.abspath(os.path.join(creds_dir, 'credentials.json'))
spo = SpoLogin(credentials_path=credentials_path)
spo.download_track(
link_track=url,
output_dir="./downloads",
quality_download="HIGH",
recursive_quality=False,
recursive_download=False,
not_interface=False,
method_save=1
)
elif service == 'deezer':
# Construct Deezer credentials path
creds_dir = os.path.join('./creds/deezer', account)
# Deezer download logic remains unchanged
creds_dir = os.path.join('./creds/deezer', main)
creds_path = os.path.abspath(os.path.join(creds_dir, 'credentials.json'))
# Load Deezer credentials
with open(creds_path, 'r') as f:
creds = json.load(f)
# Initialize Deezer client
dl = DeeLogin(
arl=creds.get('arl', ''),
email=creds.get('email', ''),
password=creds.get('password', '')
)
# Download Deezer track
dl.download_trackdee(
link_track=url,
output_dir="./downloads/tracks",
output_dir="./downloads",
quality_download="FLAC",
recursive_quality=False,
recursive_download=False,
method_save=1
)
else:
raise ValueError(f"Unsupported service: {service}")
except Exception as e:
traceback.print_exc()
raise