Files
deezspot-spotizerr-dev/deezspot/deezloader/__init__.py

1089 lines
43 KiB
Python

#!/usr/bin/python3
import os
import json
import logging
import re
from deezspot.deezloader.dee_api import API
from deezspot.easy_spoty import Spo
from deezspot.deezloader.deegw_api import API_GW
from deezspot.deezloader.deezer_settings import stock_quality
from deezspot.models.download import (
Track,
Album,
Playlist,
Preferences,
Smart,
Episode,
)
from deezspot.deezloader.__download__ import (
DW_TRACK,
DW_ALBUM,
DW_PLAYLIST,
DW_EPISODE,
Download_JOB,
)
from deezspot.exceptions import (
InvalidLink,
TrackNotFound,
NoDataApi,
AlbumNotFound,
MarketAvailabilityError,
)
from deezspot.libutils.utils import (
create_zip,
get_ids,
link_is_valid,
what_kind,
sanitize_name
)
from deezspot.libutils.others_settings import (
stock_output,
stock_recursive_quality,
stock_recursive_download,
stock_not_interface,
stock_zip,
stock_save_cover,
stock_market
)
from deezspot.libutils.logging_utils import ProgressReporter, logger, report_progress
import requests
from difflib import SequenceMatcher
from deezspot.models.callback.callbacks import (
trackCallbackObject,
albumCallbackObject,
playlistCallbackObject,
errorObject,
summaryObject,
failedTrackObject,
initializingObject,
doneObject,
)
from deezspot.models.callback.track import trackObject as trackCbObject, artistTrackObject
from deezspot.models.callback.album import albumObject as albumCbObject
from deezspot.models.callback.playlist import playlistObject as playlistCbObject
from deezspot.models.callback.common import IDs
from deezspot.models.callback.user import userObject
def _sim(a: str, b: str) -> float:
a = (a or '').strip().lower()
b = (b or '').strip().lower()
if not a or not b:
return 0.0
return SequenceMatcher(None, a, b).ratio()
# Clean for searching on Deezer
def _remove_parentheses(string: str) -> str:
# remove () and [] and {}, as well as anything inside
return re.sub(r'\{[^)]*\}', '', re.sub(r'\[[^)]*\]', '', re.sub(r'\([^)]*\)', '', string)))
API()
# Create a logger for the deezspot library
logger = logging.getLogger('deezspot')
class DeeLogin:
def __init__(
self,
arl=None,
email=None,
password=None,
spotify_client_id=None,
spotify_client_secret=None,
progress_callback=None,
silent=False
) -> None:
# Store Spotify credentials
self.spotify_client_id = spotify_client_id
self.spotify_client_secret = spotify_client_secret
# Initialize Spotify API if credentials are provided
if spotify_client_id and spotify_client_secret:
Spo.__init__(client_id=spotify_client_id, client_secret=spotify_client_secret)
# Initialize Deezer API
if arl:
self.__gw_api = API_GW(arl=arl)
else:
self.__gw_api = API_GW(
email=email,
password=password
)
# Reference to the Spotify search functionality
self.__spo = Spo
# Configure progress reporting
self.progress_reporter = ProgressReporter(callback=progress_callback, silent=silent)
# Set the progress reporter for Download_JOB
Download_JOB.set_progress_reporter(self.progress_reporter)
def download_trackdee(
self, link_track,
output_dir=stock_output,
quality_download=stock_quality,
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None,
bitrate=None,
save_cover=stock_save_cover,
market=stock_market,
playlist_context=None,
artist_separator: str = "; ",
spotify_metadata: bool = False
) -> Track:
link_is_valid(link_track)
ids = get_ids(link_track)
track_obj = None
def report_error(e, current_ids, url):
error_status = errorObject(ids=IDs(deezer=current_ids), error=str(e))
summary = summaryObject(
failed_tracks=[failedTrackObject(track=trackCbObject(title=f"Track ID {current_ids}"), reason=str(e))],
total_failed=1
)
error_status.summary = summary
callback_obj = trackCallbackObject(
track=trackCbObject(title=f"Track ID {current_ids}", ids=IDs(deezer=current_ids)),
status_info=error_status
)
report_progress(reporter=self.progress_reporter, callback_obj=callback_obj)
try:
# Default: Get standardized Deezer track object for tagging
track_obj = API.get_track(ids)
except (NoDataApi, MarketAvailabilityError) as e:
# Try to get fallback track information
infos = self.__gw_api.get_song_data(ids)
if "FALLBACK" not in infos:
report_error(e, ids, link_track)
raise TrackNotFound(link_track) from e
fallback_id = infos['FALLBACK']['SNG_ID']
try:
# Try again with fallback ID
track_obj = API.get_track(fallback_id)
if not track_obj or not track_obj.available:
raise MarketAvailabilityError(f"Fallback track {fallback_id} not available.")
# Update the ID to use the fallback
ids = fallback_id
except (NoDataApi, MarketAvailabilityError) as e_fallback:
report_error(e_fallback, fallback_id, link_track)
raise TrackNotFound(url=link_track, message=str(e_fallback)) from e_fallback
if not track_obj:
e = TrackNotFound(f"Could not retrieve track metadata for {link_track}")
report_error(e, ids, link_track)
raise e
# If requested and provided via context, override with Spotify metadata for tagging
if spotify_metadata and playlist_context and playlist_context.get('spotify_track_obj'):
track_obj_for_tagging = playlist_context.get('spotify_track_obj')
else:
track_obj_for_tagging = track_obj
# Set up download preferences
preferences = Preferences()
preferences.link = link_track
preferences.song_metadata = track_obj_for_tagging # Use selected track object (Spotify or Deezer) for tagging
preferences.quality_download = quality_download
preferences.output_dir = output_dir
preferences.ids = ids
preferences.recursive_quality = recursive_quality
preferences.recursive_download = recursive_download
preferences.not_interface = not_interface
preferences.custom_dir_format = custom_dir_format
preferences.custom_track_format = custom_track_format
preferences.pad_tracks = pad_tracks
preferences.initial_retry_delay = initial_retry_delay
preferences.retry_delay_increase = retry_delay_increase
preferences.max_retries = max_retries
preferences.convert_to = convert_to
preferences.bitrate = bitrate
preferences.save_cover = save_cover
preferences.market = market
preferences.artist_separator = artist_separator
preferences.spotify_metadata = bool(spotify_metadata)
preferences.spotify_track_obj = playlist_context.get('spotify_track_obj') if (playlist_context and playlist_context.get('spotify_track_obj')) else None
if playlist_context:
preferences.json_data = playlist_context.get('json_data')
preferences.track_number = playlist_context.get('track_number')
preferences.total_tracks = playlist_context.get('total_tracks')
preferences.spotify_url = playlist_context.get('spotify_url')
try:
parent = 'playlist' if (playlist_context and playlist_context.get('json_data')) else None
track = DW_TRACK(preferences, parent=parent).dw()
return track
except Exception as e:
logger.error(f"Failed to download track: {str(e)}")
report_error(e, ids, link_track)
raise e
def download_albumdee(
self, link_album,
output_dir=stock_output,
quality_download=stock_quality,
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None,
bitrate=None,
save_cover=stock_save_cover,
market=stock_market,
playlist_context=None,
artist_separator: str = "; "
) -> Album:
link_is_valid(link_album)
ids = get_ids(link_album)
def report_error(e, current_ids, url):
error_status = errorObject(ids=IDs(deezer=current_ids), error=str(e))
callback_obj = albumCallbackObject(
album=albumCbObject(title=f"Album ID {current_ids}", ids=IDs(deezer=current_ids)),
status_info=error_status
)
report_progress(reporter=self.progress_reporter, callback_obj=callback_obj)
try:
# Get standardized album object
album_obj = API.get_album(ids)
if not album_obj:
e = AlbumNotFound(f"Could not retrieve album metadata for {link_album}")
report_error(e, ids, link_album)
raise e
except NoDataApi as e:
report_error(e, ids, link_album)
raise AlbumNotFound(link_album) from e
# Set up download preferences
preferences = Preferences()
preferences.link = link_album
preferences.song_metadata = album_obj # Using the standardized album object
preferences.quality_download = quality_download
preferences.output_dir = output_dir
preferences.ids = ids
preferences.json_data = album_obj # Pass the complete album object
preferences.recursive_quality = recursive_quality
preferences.recursive_download = recursive_download
preferences.not_interface = not_interface
preferences.make_zip = make_zip
preferences.custom_dir_format = custom_dir_format
preferences.custom_track_format = custom_track_format
preferences.pad_tracks = pad_tracks
preferences.initial_retry_delay = initial_retry_delay
preferences.retry_delay_increase = retry_delay_increase
preferences.max_retries = max_retries
preferences.convert_to = convert_to
preferences.bitrate = bitrate
preferences.save_cover = save_cover
preferences.market = market
preferences.artist_separator = artist_separator
if playlist_context:
preferences.json_data = playlist_context['json_data']
preferences.track_number = playlist_context['track_number']
preferences.total_tracks = playlist_context['total_tracks']
preferences.spotify_url = playlist_context['spotify_url']
try:
album = DW_ALBUM(preferences).dw()
return album
except Exception as e:
logger.error(f"Failed to download album: {str(e)}")
report_error(e, ids, link_album)
raise e
def download_playlistdee(
self, link_playlist,
output_dir=stock_output,
quality_download=stock_quality,
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None,
bitrate=None,
save_cover=stock_save_cover,
market=stock_market,
artist_separator: str = "; "
) -> Playlist:
link_is_valid(link_playlist)
ids = get_ids(link_playlist)
playlist_obj = API.get_playlist(ids)
if not playlist_obj:
raise NoDataApi(f"Playlist {ids} not found.")
# This part of fetching metadata track by track is now handled in __download__.py
# The logic here is simplified to pass the full playlist object.
preferences = Preferences()
preferences.link = link_playlist
# preferences.song_metadata is not needed here, DW_PLAYLIST will use json_data
preferences.quality_download = quality_download
preferences.output_dir = output_dir
preferences.ids = ids
preferences.json_data = playlist_obj
preferences.recursive_quality = recursive_quality
preferences.recursive_download = recursive_download
preferences.not_interface = not_interface
preferences.make_zip = make_zip
preferences.custom_dir_format = custom_dir_format
preferences.custom_track_format = custom_track_format
preferences.pad_tracks = pad_tracks
preferences.initial_retry_delay = initial_retry_delay
preferences.retry_delay_increase = retry_delay_increase
preferences.max_retries = max_retries
preferences.convert_to = convert_to
preferences.bitrate = bitrate
preferences.save_cover = save_cover
preferences.market = market
preferences.artist_separator = artist_separator
playlist = DW_PLAYLIST(preferences).dw()
return playlist
def download_artisttopdee(
self, link_artist,
output_dir=stock_output,
quality_download=stock_quality,
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
convert_to=None,
bitrate=None,
save_cover=stock_save_cover,
market=stock_market
) -> list[Track]:
link_is_valid(link_artist)
ids = get_ids(link_artist)
# Assuming get_artist_top_tracks returns a list of track-like dicts with a 'link'
top_tracks_json = API.get_artist_top_tracks(ids)['data']
names = [
self.download_trackdee(
track['link'], output_dir,
quality_download, recursive_quality,
recursive_download, not_interface,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
convert_to=convert_to,
bitrate=bitrate,
save_cover=save_cover,
market=market
)
for track in top_tracks_json
]
return names
def convert_spoty_to_dee_link_track(self, link_track):
link_is_valid(link_track)
ids = get_ids(link_track)
# Attempt via ISRC first
track_json = Spo.get_track(ids)
if not track_json:
raise TrackNotFound(url=link_track, message="Spotify track metadata fetch failed.")
external_ids = track_json.get('external_ids') or {}
spo_isrc = (external_ids.get('isrc') or '').upper()
spo_title = track_json.get('name', '')
spo_album_title = (track_json.get('album') or {}).get('name', '')
spo_tracknum = int(track_json.get('track_number') or 0)
spo_artists = track_json.get('artists') or []
spo_main_artist = (spo_artists[0].get('name') if spo_artists else '') or ''
try:
dz = API.get_track_json(f"isrc:{spo_isrc}")
if dz and dz.get('id'):
dz_json = dz
tn = (dz_json.get('track_position') or dz_json.get('track_number') or 0)
title_match = max(
_sim(spo_title, dz_json.get('title', '')),
_sim(spo_title, dz_json.get('title_short', '')),
)
album_match = _sim(spo_album_title, (dz_json.get('album') or {}).get('title', ''))
t_isrc = (dz_json.get('isrc') or '').upper()
# Enforce ISRC match strictly in ISRC lookup path
if (
t_isrc and spo_isrc and t_isrc == spo_isrc and
title_match >= 0.90 and album_match >= 0.90 and tn == spo_tracknum
):
return f"https://www.deezer.com/track/{dz_json.get('id')}"
except Exception:
pass
# Fallback: search by title + artist + album
query = f'"track:\'{spo_title}\' artist:\'{spo_main_artist}\' album:\'{spo_album_title}\'"'
try:
candidates = API.search_tracks_raw(query, limit=5)
except Exception:
candidates = []
for cand in candidates:
title_match_1 = max(
_sim(spo_title, dz_json.get('title', '')),
_sim(spo_title, dz_json.get('title_short', ''))
)
title_match_2 = max(
_sim(_remove_parentheses(spo_title), _remove_parentheses(dz_json.get('title', ''))),
_sim(_remove_parentheses(spo_title), _remove_parentheses(dz_json.get('title_short', '')))
)
if max(title_match_1, title_match_2) < 0.90:
continue
c_id = cand.get('id')
if not c_id:
continue
try:
dzc = API.get_track_json(str(c_id))
except Exception:
continue
# Validate using track number and ISRC to be safe
tn = (dzc.get('track_position') or dzc.get('track_number') or 0)
if tn != spo_tracknum:
continue
t_isrc = (dzc.get('isrc') or '').upper()
# Enforce ISRC strictly in fallback path as well: require present and equal
if not spo_isrc or not t_isrc or t_isrc != spo_isrc:
continue
return f"https://www.deezer.com/track/{c_id}"
raise TrackNotFound(url=link_track, message=f"Failed to find Deezer equivalent for ISRC {spo_isrc} from Spotify track {link_track}")
def convert_isrc_to_dee_link_track(self, isrc_code: str) -> str:
if not isinstance(isrc_code, str) or not isrc_code:
raise ValueError("ISRC code must be a non-empty string.")
isrc_query = f"isrc:{isrc_code}"
logger.debug(f"Attempting Deezer track search with ISRC query: {isrc_query}")
try:
track_obj = API.get_track(isrc_query)
except NoDataApi:
msg = f"⚠ The track with ISRC '{isrc_code}' can't be found on Deezer :( ⚠"
logger.warning(msg)
raise TrackNotFound(url=f"isrc:{isrc_code}", message=msg)
if not track_obj or not track_obj.type or not track_obj.ids or not track_obj.ids.deezer:
msg = f"⚠ Deezer API returned no link for ISRC '{isrc_code}' :( ⚠"
logger.warning(msg)
raise TrackNotFound(url=f"isrc:{isrc_code}", message=msg)
track_link_dee = f"https://www.deezer.com/{track_obj.type}/{track_obj.ids.deezer}"
logger.info(f"Successfully converted ISRC {isrc_code} to Deezer link: {track_link_dee}")
return track_link_dee
def convert_spoty_to_dee_link_album(self, link_album):
link_is_valid(link_album)
ids = get_ids(link_album)
spotify_album_data = Spo.get_album(ids)
if not spotify_album_data:
raise AlbumNotFound(f"Failed to fetch Spotify album metadata for {link_album}")
spo_album_title = spotify_album_data.get('name', '')
spo_artists = spotify_album_data.get('artists') or []
spo_main_artist = (spo_artists[0].get('name') if spo_artists else '') or ''
external_ids = spotify_album_data.get('external_ids') or {}
spo_upc = str(external_ids.get('upc') or '').strip()
# Try UPC first
if spo_upc:
try:
dz_album = API.get_album_json(f"upc:{spo_upc}")
if dz_album.get('id') and _sim(spo_album_title, dz_album.get('title', '')) >= 0.90:
return f"https://www.deezer.com/album/{dz_album.get('id')}"
except Exception:
pass
# Fallback: title search
q = f'"{spo_album_title}" {spo_main_artist}'.strip()
try:
candidates = API.search_albums_raw(q, limit=5)
except Exception:
candidates = []
for cand in candidates:
if _sim(spo_album_title, cand.get('title', '')) < 0.90:
continue
c_id = cand.get('id')
if not c_id:
continue
try:
dzc = API.get_album_json(str(c_id))
except Exception:
continue
upc = str(dzc.get('upc') or '').strip()
if spo_upc and upc and spo_upc != upc:
continue
link_dee = f"https://www.deezer.com/album/{c_id}"
return link_dee
raise AlbumNotFound(f"Failed to convert Spotify album link {link_album} to a Deezer link after all attempts.")
def download_trackspo(
self, link_track,
output_dir=stock_output,
quality_download=stock_quality,
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None,
bitrate=None,
save_cover=stock_save_cover,
market=stock_market,
playlist_context=None,
artist_separator: str = "; ",
spotify_metadata: bool = False
) -> Track:
link_dee = self.convert_spoty_to_dee_link_track(link_track)
# If requested, prepare Spotify track object for tagging in preferences via playlist_context
if spotify_metadata:
try:
from deezspot.spotloader.__spo_api__ import tracking as spo_tracking
spo_ids = get_ids(link_track)
spo_track_obj = spo_tracking(spo_ids)
if spo_track_obj:
if playlist_context is None:
playlist_context = {}
playlist_context = dict(playlist_context)
playlist_context['spotify_track_obj'] = spo_track_obj
except Exception:
pass
track = self.download_trackdee(
link_dee,
output_dir=output_dir,
quality_download=quality_download,
recursive_quality=recursive_quality,
recursive_download=recursive_download,
not_interface=not_interface,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries,
convert_to=convert_to,
bitrate=bitrate,
save_cover=save_cover,
market=market,
playlist_context=playlist_context,
artist_separator=artist_separator,
spotify_metadata=spotify_metadata
)
return track
def download_albumspo(
self, link_album,
output_dir=stock_output,
quality_download=stock_quality,
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None,
bitrate=None,
save_cover=stock_save_cover,
market=stock_market,
playlist_context=None,
artist_separator: str = "; "
) -> Album:
link_dee = self.convert_spoty_to_dee_link_album(link_album)
album = self.download_albumdee(
link_dee, output_dir,
quality_download, recursive_quality,
recursive_download, not_interface,
make_zip,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries,
convert_to=convert_to,
bitrate=bitrate,
save_cover=save_cover,
market=market,
playlist_context=playlist_context,
artist_separator=artist_separator
)
return album
def download_playlistspo(
self, link_playlist,
output_dir=stock_output,
quality_download=stock_quality,
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None,
bitrate=None,
save_cover=stock_save_cover,
market=stock_market,
artist_separator: str = "; ",
spotify_metadata: bool = False
) -> Playlist:
link_is_valid(link_playlist)
ids = get_ids(link_playlist)
playlist_json = Spo.get_playlist(ids)
# Extract track metadata for playlist callback object
playlist_tracks_for_callback = []
for item in playlist_json['tracks']['items']:
if not item.get('track'):
continue
track_info = item['track']
# Import the correct playlist-specific objects
from deezspot.models.callback.playlist import (
artistTrackPlaylistObject,
albumTrackPlaylistObject,
artistAlbumTrackPlaylistObject,
trackPlaylistObject
)
# Create artists with proper type
track_artists = [artistTrackPlaylistObject(
name=artist['name'],
ids=IDs(spotify=artist.get('id'))
) for artist in track_info.get('artists', [])]
# Process album with proper type and include images
album_info = track_info.get('album', {})
album_images = []
if album_info.get('images'):
album_images = [
{"url": img.get('url'), "height": img.get('height'), "width": img.get('width')}
for img in album_info.get('images', [])
]
# Process album artists
album_artists = []
if album_info.get('artists'):
album_artists = [
artistAlbumTrackPlaylistObject(
name=artist.get('name'),
ids=IDs(spotify=artist.get('id'))
)
for artist in album_info.get('artists', [])
]
album_obj = albumTrackPlaylistObject(
title=album_info.get('name', 'Unknown Album'),
ids=IDs(spotify=album_info.get('id')),
images=album_images,
artists=album_artists,
album_type=album_info.get('album_type', ''),
release_date={
"year": int(album_info.get('release_date', '0').split('-')[0]) if album_info.get('release_date') else 0,
"month": int(album_info.get('release_date', '0-0').split('-')[1]) if album_info.get('release_date') and len(album_info.get('release_date').split('-')) > 1 else 0,
"day": int(album_info.get('release_date', '0-0-0').split('-')[2]) if album_info.get('release_date') and len(album_info.get('release_date').split('-')) > 2 else 0
},
total_tracks=album_info.get('total_tracks', 0)
)
# Create track with proper playlist-specific type
track_obj = trackPlaylistObject(
title=track_info.get('name', 'Unknown Track'),
artists=track_artists,
album=album_obj,
duration_ms=track_info.get('duration_ms', 0),
explicit=track_info.get('explicit', False),
ids=IDs(
spotify=track_info.get('id'),
isrc=track_info.get('external_ids', {}).get('isrc')
),
disc_number=track_info.get('disc_number', 1),
track_number=track_info.get('track_number', 0)
)
playlist_tracks_for_callback.append(track_obj)
playlist_obj = playlistCbObject(
title=playlist_json['name'],
owner=userObject(name=playlist_json.get('owner', {}).get('display_name', 'Unknown Owner')),
ids=IDs(spotify=playlist_json['id']),
tracks=playlist_tracks_for_callback # Populate tracks array with track objects
)
status_obj_init = initializingObject(ids=playlist_obj.ids)
callback_obj_init = playlistCallbackObject(playlist=playlist_obj, status_info=status_obj_init)
report_progress(reporter=self.progress_reporter, callback_obj=callback_obj_init)
total_tracks = playlist_json['tracks']['total']
playlist_tracks = playlist_json['tracks']['items']
playlist = Playlist()
tracks = playlist.tracks
successful_tracks_cb = []
failed_tracks_cb = []
skipped_tracks_cb = []
for index, item in enumerate(playlist_tracks, 1):
is_track = item.get('track')
if not is_track:
logger.warning(f"Skipping an item in playlist {playlist_obj.title} as it's not a valid track (likely unavailable in region).")
unknown_track = trackCbObject(title="Unknown Skipped Item", artists=[artistTrackObject(name="")])
reason = "Playlist item was not a valid track object or is not available in your region."
failed_tracks_cb.append(failedTrackObject(track=unknown_track, reason=reason))
# Create a placeholder for the failed item
failed_track = Track(
tags={'music': 'Unknown Skipped Item', 'artist': 'Unknown'},
song_path=None, file_format=None, quality=None, link=None, ids=None
)
failed_track.success = False
failed_track.error_message = reason
tracks.append(failed_track)
continue
track_info = is_track
track_name = track_info.get('name', 'Unknown Track')
artist_name = track_info['artists'][0]['name'] if track_info.get('artists') else 'Unknown Artist'
link_track = track_info.get('external_urls', {}).get('spotify')
if not link_track:
logger.warning(f"The track \"{track_name}\" is not available on Spotify :(")
continue
try:
playlist_ctx = {
'json_data': playlist_json,
'track_number': index,
'total_tracks': total_tracks,
'spotify_url': link_track
}
# Attach Spotify track object for tagging if requested
if spotify_metadata:
try:
from deezspot.spotloader.__spo_api__ import json_to_track_playlist_object
playlist_ctx['spotify_track_obj'] = json_to_track_playlist_object(track_info)
except Exception:
pass
downloaded_track = self.download_trackspo(
link_track,
output_dir=output_dir, quality_download=quality_download,
recursive_quality=recursive_quality, recursive_download=recursive_download,
not_interface=not_interface, custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format, pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay, retry_delay_increase=retry_delay_increase,
max_retries=max_retries, convert_to=convert_to, bitrate=bitrate,
save_cover=save_cover, market=market, playlist_context=playlist_ctx,
artist_separator=artist_separator, spotify_metadata=spotify_metadata
)
tracks.append(downloaded_track)
# After download, check status for summary
if getattr(downloaded_track, 'was_skipped', False):
skipped_tracks_cb.append(playlist_obj.tracks[index-1])
elif downloaded_track.success:
successful_tracks_cb.append(playlist_obj.tracks[index-1])
else:
failed_tracks_cb.append(failedTrackObject(track=playlist_obj.tracks[index-1], reason=getattr(downloaded_track, 'error_message', 'Unknown reason')))
except Exception as e:
logger.error(f"Track '{track_name}' in playlist '{playlist_obj.title}' failed: {e}")
failed_tracks_cb.append(failedTrackObject(track=playlist_obj.tracks[index-1], reason=str(e)))
current_track_object = Track({'music': track_name, 'artist': artist_name}, None, None, None, link_track, None)
current_track_object.success = False
current_track_object.error_message = str(e)
tracks.append(current_track_object)
# Finalize summary and callbacks (existing logic continues below in file)...
total_from_spotify = playlist_json['tracks']['total']
processed_count = len(successful_tracks_cb) + len(skipped_tracks_cb) + len(failed_tracks_cb)
if total_from_spotify != processed_count:
logger.warning(
f"Playlist '{playlist_obj.title}' metadata reports {total_from_spotify} tracks, "
f"but only {processed_count} were processed. This might indicate that not all pages of tracks were retrieved from Spotify."
)
from deezspot.libutils.write_m3u import write_tracks_to_m3u
m3u_path = write_tracks_to_m3u(output_dir, playlist_obj.title, tracks)
summary_obj = summaryObject(
successful_tracks=successful_tracks_cb,
skipped_tracks=skipped_tracks_cb,
failed_tracks=failed_tracks_cb,
total_successful=len(successful_tracks_cb),
total_skipped=len(skipped_tracks_cb),
total_failed=len(failed_tracks_cb)
)
# Include m3u path in summary and callback
summary_obj.m3u_path = m3u_path
status_obj_done = doneObject(ids=playlist_obj.ids, summary=summary_obj)
callback_obj_done = playlistCallbackObject(playlist=playlist_obj, status_info=status_obj_done)
report_progress(reporter=self.progress_reporter, callback_obj=callback_obj_done)
if make_zip:
zip_name = f"{output_dir}/playlist_{sanitize_name(playlist_obj.title)}.zip"
create_zip(tracks, zip_name=zip_name)
playlist.zip_path = zip_name
return playlist
def download_name(
self, artist, song,
output_dir=stock_output,
quality_download=stock_quality,
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
custom_dir_format=None,
custom_track_format=None,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
pad_tracks=True,
convert_to=None,
bitrate=None,
save_cover=stock_save_cover,
market=stock_market,
artist_separator: str = "; "
) -> Track:
query = f"track:{song} artist:{artist}"
search = self.__spo.search(query)
items = search['tracks']['items']
if len(items) == 0:
msg = f"No result for {query} :("
raise TrackNotFound(message=msg)
link_track = items[0]['external_urls']['spotify']
track = self.download_trackspo(
link_track,
output_dir=output_dir,
quality_download=quality_download,
recursive_quality=recursive_quality,
recursive_download=recursive_download,
not_interface=not_interface,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries,
convert_to=convert_to,
bitrate=bitrate,
save_cover=save_cover,
market=market,
artist_separator=artist_separator
)
return track
def download_episode(
self,
link_episode,
output_dir=stock_output,
quality_download=stock_quality,
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None,
bitrate=None,
save_cover=stock_save_cover,
market=stock_market,
artist_separator: str = "; "
) -> Episode:
logger.warning("Episode download logic is not fully refactored and might not work as expected with new reporting.")
link_is_valid(link_episode)
ids = get_ids(link_episode)
try:
# This will likely fail as API.tracking is gone.
episode_metadata = API.get_episode(ids)
except (NoDataApi, MarketAvailabilityError) as e:
raise TrackNotFound(url=link_episode, message=f"Episode not available: {e}") from e
except Exception:
# Fallback to GW API if public API fails for any reason
infos = self.__gw_api.get_episode_data(ids)
if not infos:
raise TrackNotFound(f"Episode {ids} not found")
episode_metadata = {
'music': infos.get('EPISODE_TITLE', ''), 'artist': infos.get('SHOW_NAME', ''),
'album': infos.get('SHOW_NAME', ''), 'date': infos.get('EPISODE_PUBLISHED_TIMESTAMP', '').split()[0],
'genre': 'Podcast', 'explicit': infos.get('SHOW_IS_EXPLICIT', '2'),
'disc': 1, 'track': 1, 'duration': int(infos.get('DURATION', 0)), 'isrc': None,
'image': infos.get('EPISODE_IMAGE_MD5', '')
}
preferences = Preferences()
preferences.link = link_episode
preferences.song_metadata = episode_metadata
preferences.quality_download = quality_download
preferences.output_dir = output_dir
preferences.ids = ids
preferences.recursive_quality = recursive_quality
preferences.recursive_download = recursive_download
preferences.not_interface = not_interface
preferences.max_retries = max_retries
preferences.convert_to = convert_to
preferences.bitrate = bitrate
preferences.save_cover = save_cover
preferences.is_episode = True
preferences.market = market
preferences.artist_separator = artist_separator
episode = DW_EPISODE(preferences).dw()
return episode
def download_smart(
self, link,
output_dir=stock_output,
quality_download=stock_quality,
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None,
bitrate=None,
save_cover=stock_save_cover,
market=stock_market,
artist_separator: str = "; "
) -> Smart:
link_is_valid(link)
link = what_kind(link)
smart = Smart()
if "spotify.com" in link:
source = "spotify"
elif "deezer.com" in link:
source = "deezer"
else:
raise InvalidLink(link)
smart.source = source
# Smart download reporting can be enhanced later if needed
# For now, the individual download functions will do the reporting.
if "track/" in link:
func = self.download_trackspo if source == 'spotify' else self.download_trackdee
track = func(
link, output_dir=output_dir, quality_download=quality_download,
recursive_quality=recursive_quality, recursive_download=recursive_download,
not_interface=not_interface, custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format, pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay, retry_delay_increase=retry_delay_increase,
max_retries=max_retries, convert_to=convert_to, bitrate=bitrate,
save_cover=save_cover, market=market, artist_separator=artist_separator
)
smart.type = "track"
smart.track = track
elif "album/" in link:
func = self.download_albumspo if source == 'spotify' else self.download_albumdee
album = func(
link, output_dir=output_dir, quality_download=quality_download,
recursive_quality=recursive_quality, recursive_download=recursive_download,
not_interface=not_interface, make_zip=make_zip,
custom_dir_format=custom_dir_format, custom_track_format=custom_track_format,
pad_tracks=pad_tracks, initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase, max_retries=max_retries,
convert_to=convert_to, bitrate=bitrate, save_cover=save_cover,
market=market, artist_separator=artist_separator
)
smart.type = "album"
smart.album = album
elif "playlist/" in link:
func = self.download_playlistspo if source == 'spotify' else self.download_playlistdee
playlist = func(
link, output_dir=output_dir, quality_download=quality_download,
recursive_quality=recursive_quality, recursive_download=recursive_download,
not_interface=not_interface, make_zip=make_zip,
custom_dir_format=custom_dir_format, custom_track_format=custom_track_format,
pad_tracks=pad_tracks, initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase, max_retries=max_retries,
convert_to=convert_to, bitrate=bitrate, save_cover=save_cover,
market=market, artist_separator=artist_separator
)
smart.type = "playlist"
smart.playlist = playlist
return smart