Files
deezspot-spotizerr-dev/deezspot/spotloader/__init__.py
2025-06-04 14:44:14 -06:00

646 lines
27 KiB
Python

#!/usr/bin/python3
import traceback
from os.path import isfile
from deezspot.easy_spoty import Spo
from librespot.core import Session
from deezspot.exceptions import InvalidLink, MarketAvailabilityError
from deezspot.spotloader.__spo_api__ import tracking, tracking_album, tracking_episode
from deezspot.spotloader.spotify_settings import stock_quality, stock_market
from deezspot.libutils.utils import (
get_ids,
link_is_valid,
what_kind,
)
from deezspot.models import (
Track,
Album,
Playlist,
Preferences,
Smart,
Episode
)
from deezspot.spotloader.__download__ import (
DW_TRACK,
DW_ALBUM,
DW_PLAYLIST,
DW_EPISODE,
Download_JOB,
)
from deezspot.libutils.others_settings import (
stock_output,
stock_recursive_quality,
stock_recursive_download,
stock_not_interface,
stock_zip,
stock_save_cover,
stock_real_time_dl,
stock_market
)
from deezspot.libutils.logging_utils import logger, ProgressReporter
class SpoLogin:
def __init__(
self,
credentials_path: str,
spotify_client_id: str = None,
spotify_client_secret: str = None,
progress_callback = None,
silent: bool = False
) -> None:
self.credentials_path = credentials_path
self.spotify_client_id = spotify_client_id
self.spotify_client_secret = spotify_client_secret
# Initialize Spotify API with credentials if provided
if spotify_client_id and spotify_client_secret:
Spo.__init__(client_id=spotify_client_id, client_secret=spotify_client_secret)
logger.info("Initialized Spotify API with provided credentials")
# Configure progress reporting
self.progress_reporter = ProgressReporter(callback=progress_callback, silent=silent)
self.__initialize_session()
def report_progress(self, progress_data):
"""Report progress using the configured reporter."""
self.progress_reporter.report(progress_data)
def __initialize_session(self) -> None:
try:
session_builder = Session.Builder()
session_builder.conf.stored_credentials_file = self.credentials_path
if isfile(self.credentials_path):
session = session_builder.stored_file().create()
logger.info("Successfully initialized Spotify session")
else:
logger.error("Credentials file not found")
raise FileNotFoundError("Please fill your credentials.json location!")
Download_JOB(session)
Download_JOB.set_progress_reporter(self.progress_reporter)
except Exception as e:
logger.error(f"Failed to initialize Spotify session: {str(e)}")
raise
def download_track(
self, link_track,
output_dir=stock_output,
quality_download=stock_quality,
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
real_time_dl=stock_real_time_dl,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None,
bitrate=None,
save_cover=stock_save_cover,
market: list[str] | None = stock_market
) -> Track:
try:
link_is_valid(link_track)
ids = get_ids(link_track)
song_metadata = tracking(ids, market=market)
if song_metadata is None:
raise Exception(f"Could not retrieve metadata for track {link_track}. It might not be available or an API error occurred.")
logger.info(f"Starting download for track: {song_metadata.get('music', 'Unknown')} - {song_metadata.get('artist', 'Unknown')}")
preferences = Preferences()
preferences.real_time_dl = real_time_dl
preferences.link = link_track
preferences.song_metadata = song_metadata
preferences.quality_download = quality_download
preferences.output_dir = output_dir
preferences.ids = ids
preferences.recursive_quality = recursive_quality
preferences.recursive_download = recursive_download
preferences.not_interface = not_interface
preferences.is_episode = False
preferences.custom_dir_format = custom_dir_format
preferences.custom_track_format = custom_track_format
preferences.pad_tracks = pad_tracks
preferences.initial_retry_delay = initial_retry_delay
preferences.retry_delay_increase = retry_delay_increase
preferences.max_retries = max_retries
if convert_to is None:
preferences.convert_to = None
preferences.bitrate = None
else:
preferences.convert_to = convert_to
preferences.bitrate = bitrate
preferences.save_cover = save_cover
preferences.market = market
track = DW_TRACK(preferences).dw()
return track
except MarketAvailabilityError as e:
logger.error(f"Track download failed due to market availability: {str(e)}")
raise
except Exception as e:
logger.error(f"Failed to download track: {str(e)}")
traceback.print_exc()
raise e
def download_album(
self, link_album,
output_dir=stock_output,
quality_download=stock_quality,
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
real_time_dl=stock_real_time_dl,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None,
bitrate=None,
save_cover=stock_save_cover,
market: list[str] | None = stock_market
) -> Album:
try:
link_is_valid(link_album)
ids = get_ids(link_album)
album_json = Spo.get_album(ids)
if not album_json:
raise Exception(f"Could not retrieve album data for {link_album}.")
song_metadata = tracking_album(album_json, market=market)
if song_metadata is None:
raise Exception(f"Could not process album metadata for {link_album}. It might not be available in the specified market(s) or an API error occurred.")
logger.info(f"Starting download for album: {song_metadata.get('album', 'Unknown')} - {song_metadata.get('ar_album', 'Unknown')}")
preferences = Preferences()
preferences.real_time_dl = real_time_dl
preferences.link = link_album
preferences.song_metadata = song_metadata
preferences.quality_download = quality_download
preferences.output_dir = output_dir
preferences.ids = ids
preferences.json_data = album_json
preferences.recursive_quality = recursive_quality
preferences.recursive_download = recursive_download
preferences.not_interface = not_interface
preferences.make_zip = make_zip
preferences.is_episode = False
preferences.custom_dir_format = custom_dir_format
preferences.custom_track_format = custom_track_format
preferences.pad_tracks = pad_tracks
preferences.initial_retry_delay = initial_retry_delay
preferences.retry_delay_increase = retry_delay_increase
preferences.max_retries = max_retries
if convert_to is None:
preferences.convert_to = None
preferences.bitrate = None
else:
preferences.convert_to = convert_to
preferences.bitrate = bitrate
preferences.save_cover = save_cover
preferences.market = market
album = DW_ALBUM(preferences).dw()
return album
except MarketAvailabilityError as e:
logger.error(f"Album download failed due to market availability: {str(e)}")
raise
except Exception as e:
logger.error(f"Failed to download album: {str(e)}")
traceback.print_exc()
raise e
def download_playlist(
self, link_playlist,
output_dir=stock_output,
quality_download=stock_quality,
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
real_time_dl=stock_real_time_dl,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None,
bitrate=None,
save_cover=stock_save_cover,
market: list[str] | None = stock_market
) -> Playlist:
try:
link_is_valid(link_playlist)
ids = get_ids(link_playlist)
song_metadata = []
playlist_json = Spo.get_playlist(ids)
if not playlist_json:
raise Exception(f"Could not retrieve playlist data for {link_playlist}.")
logger.info(f"Starting download for playlist: {playlist_json.get('name', 'Unknown')}")
for track_item_wrapper in playlist_json['tracks']['items']:
track_info = track_item_wrapper.get('track')
c_song_metadata = None # Initialize for each item
if not track_info:
logger.warning(f"Skipping an item in playlist {playlist_json.get('name', 'Unknown Playlist')} as it does not appear to be a valid track object.")
# Create a placeholder for this unidentifiable item
c_song_metadata = {
'name': 'Unknown Skipped Item',
'ids': None,
'error_type': 'InvalidItemStructure',
'error_message': 'Playlist item was not a valid track object.'
}
song_metadata.append(c_song_metadata)
continue
track_name_for_logs = track_info.get('name', 'Unknown Track')
track_id_for_logs = track_info.get('id', 'Unknown ID') # Track's own ID if available
external_urls = track_info.get('external_urls')
if not external_urls or not external_urls.get('spotify'):
logger.warning(f"Track \"{track_name_for_logs}\" (ID: {track_id_for_logs}) in playlist {playlist_json.get('name', 'Unknown Playlist')} is not available on Spotify or has no URL.")
c_song_metadata = {
'name': track_name_for_logs,
'ids': track_id_for_logs, # Use track's own ID if available, otherwise will be None
'error_type': 'MissingTrackURL',
'error_message': f"Track \"{track_name_for_logs}\" is not available on Spotify or has no URL."
}
else:
track_spotify_url = external_urls['spotify']
track_ids_from_url = get_ids(track_spotify_url) # This is the ID used for fetching with 'tracking'
try:
# Market check for each track is done within tracking()
# Pass market. tracking() will raise MarketAvailabilityError if unavailable.
fetched_metadata = tracking(track_ids_from_url, market=market)
if fetched_metadata:
c_song_metadata = fetched_metadata
else:
# tracking() returned None, but didn't raise MarketAvailabilityError. General fetch error.
logger.warning(f"Could not retrieve full metadata for track {track_name_for_logs} (ID: {track_ids_from_url}, URL: {track_spotify_url}) in playlist {playlist_json.get('name', 'Unknown Playlist')}. API error or other issue.")
c_song_metadata = {
'name': track_name_for_logs,
'ids': track_ids_from_url,
'error_type': 'MetadataFetchError',
'error_message': f"Failed to fetch full metadata for track {track_name_for_logs}."
}
except MarketAvailabilityError as e:
logger.warning(f"Track {track_name_for_logs} (ID: {track_ids_from_url}, URL: {track_spotify_url}) in playlist {playlist_json.get('name', 'Unknown Playlist')} is not available in the specified market(s). Skipping. Error: {str(e)}")
c_song_metadata = {
'name': track_name_for_logs,
'ids': track_ids_from_url,
'error_type': 'MarketAvailabilityError',
'error_message': str(e)
}
except Exception as e_tracking: # Catch any other unexpected error from tracking()
logger.error(f"Unexpected error fetching metadata for track {track_name_for_logs} (ID: {track_ids_from_url}, URL: {track_spotify_url}): {str(e_tracking)}")
c_song_metadata = {
'name': track_name_for_logs,
'ids': track_ids_from_url,
'error_type': 'UnexpectedTrackingError',
'error_message': f"Unexpected error fetching metadata: {str(e_tracking)}"
}
if c_song_metadata: # Ensure something is appended
song_metadata.append(c_song_metadata)
else:
# This case should ideally not be reached if logic above is complete
logger.error(f"Logic error: c_song_metadata remained None for track {track_name_for_logs} in playlist {playlist_json.get('name', 'Unknown Playlist')}")
song_metadata.append({
'name': track_name_for_logs,
'ids': track_id_for_logs or track_ids_from_url,
'error_type': 'InternalLogicError',
'error_message': 'Internal error processing playlist track metadata.'
})
preferences = Preferences()
preferences.real_time_dl = real_time_dl
preferences.link = link_playlist
preferences.song_metadata = song_metadata
preferences.quality_download = quality_download
preferences.output_dir = output_dir
preferences.ids = ids
preferences.json_data = playlist_json
preferences.recursive_quality = recursive_quality
preferences.recursive_download = recursive_download
preferences.not_interface = not_interface
preferences.make_zip = make_zip
preferences.is_episode = False
preferences.custom_dir_format = custom_dir_format
preferences.custom_track_format = custom_track_format
preferences.pad_tracks = pad_tracks
preferences.initial_retry_delay = initial_retry_delay
preferences.retry_delay_increase = retry_delay_increase
preferences.max_retries = max_retries
if convert_to is None:
preferences.convert_to = None
preferences.bitrate = None
else:
preferences.convert_to = convert_to
preferences.bitrate = bitrate
preferences.save_cover = save_cover
preferences.market = market
playlist = DW_PLAYLIST(preferences).dw()
return playlist
except MarketAvailabilityError as e:
logger.error(f"Playlist download failed due to market availability issues with one or more tracks: {str(e)}")
raise
except Exception as e:
logger.error(f"Failed to download playlist: {str(e)}")
traceback.print_exc()
raise e
def download_episode(
self, link_episode,
output_dir=stock_output,
quality_download=stock_quality,
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
real_time_dl=stock_real_time_dl,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None,
bitrate=None,
save_cover=stock_save_cover,
market: list[str] | None = stock_market
) -> Episode:
try:
link_is_valid(link_episode)
ids = get_ids(link_episode)
episode_json = Spo.get_episode(ids)
if not episode_json:
raise Exception(f"Could not retrieve episode data for {link_episode} from API.")
episode_metadata = tracking_episode(ids, market=market)
if episode_metadata is None:
raise Exception(f"Could not process episode metadata for {link_episode}. It might not be available in the specified market(s) or an API error occurred.")
logger.info(f"Starting download for episode: {episode_metadata.get('name', 'Unknown')} - {episode_metadata.get('show', 'Unknown')}")
preferences = Preferences()
preferences.real_time_dl = real_time_dl
preferences.link = link_episode
preferences.song_metadata = episode_metadata
preferences.output_dir = output_dir
preferences.ids = ids
preferences.json_data = episode_json
preferences.recursive_quality = recursive_quality
preferences.recursive_download = recursive_download
preferences.not_interface = not_interface
preferences.is_episode = True
preferences.custom_dir_format = custom_dir_format
preferences.custom_track_format = custom_track_format
preferences.pad_tracks = pad_tracks
preferences.initial_retry_delay = initial_retry_delay
preferences.retry_delay_increase = retry_delay_increase
preferences.max_retries = max_retries
if convert_to is None:
preferences.convert_to = None
preferences.bitrate = None
else:
preferences.convert_to = convert_to
preferences.bitrate = bitrate
preferences.save_cover = save_cover
preferences.market = market
episode = DW_EPISODE(preferences).dw()
return episode
except MarketAvailabilityError as e:
logger.error(f"Episode download failed due to market availability: {str(e)}")
raise
except Exception as e:
logger.error(f"Failed to download episode: {str(e)}")
traceback.print_exc()
raise e
def download_artist(
self, link_artist,
album_type: str = 'album,single,compilation,appears_on',
limit: int = 50,
output_dir=stock_output,
quality_download=stock_quality,
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
real_time_dl=stock_real_time_dl,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None,
bitrate=None,
market: list[str] | None = stock_market,
save_cover=stock_save_cover
):
"""
Download all albums (or a subset based on album_type and limit) from an artist.
"""
try:
link_is_valid(link_artist)
ids = get_ids(link_artist)
discography = Spo.get_artist(ids, album_type=album_type, limit=limit)
albums = discography.get('items', [])
if not albums:
logger.warning("No albums found for the provided artist")
raise Exception("No albums found for the provided artist.")
logger.info(f"Starting download for artist discography: {discography.get('name', 'Unknown')}")
downloaded_albums = []
for album in albums:
album_url = album.get('external_urls', {}).get('spotify')
if not album_url:
logger.warning(f"No URL found for album: {album.get('name', 'Unknown')}")
continue
downloaded_album = self.download_album(
album_url,
output_dir=output_dir,
quality_download=quality_download,
recursive_quality=recursive_quality,
recursive_download=recursive_download,
not_interface=not_interface,
make_zip=make_zip,
real_time_dl=real_time_dl,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries,
convert_to=convert_to,
bitrate=bitrate,
market=market,
save_cover=save_cover
)
downloaded_albums.append(downloaded_album)
return downloaded_albums
except Exception as e:
logger.error(f"Failed to download artist discography: {str(e)}")
traceback.print_exc()
raise e
def download_smart(
self, link,
output_dir=stock_output,
quality_download=stock_quality,
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
real_time_dl=stock_real_time_dl,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None,
bitrate=None,
save_cover=stock_save_cover,
market: list[str] | None = stock_market
) -> Smart:
try:
link_is_valid(link)
link = what_kind(link)
smart = Smart()
if "spotify.com" in link:
source = "https://spotify.com"
smart.source = source
logger.info(f"Starting smart download for: {link}")
if "track/" in link:
if not "spotify.com" in link:
raise InvalidLink(link)
track = self.download_track(
link,
output_dir=output_dir,
quality_download=quality_download,
recursive_quality=recursive_quality,
recursive_download=recursive_download,
not_interface=not_interface,
real_time_dl=real_time_dl,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries,
convert_to=convert_to,
bitrate=bitrate,
save_cover=save_cover,
market=market
)
smart.type = "track"
smart.track = track
elif "album/" in link:
if not "spotify.com" in link:
raise InvalidLink(link)
album = self.download_album(
link,
output_dir=output_dir,
quality_download=quality_download,
recursive_quality=recursive_quality,
recursive_download=recursive_download,
not_interface=not_interface,
make_zip=make_zip,
real_time_dl=real_time_dl,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries,
convert_to=convert_to,
bitrate=bitrate,
save_cover=save_cover,
market=market
)
smart.type = "album"
smart.album = album
elif "playlist/" in link:
if not "spotify.com" in link:
raise InvalidLink(link)
playlist = self.download_playlist(
link,
output_dir=output_dir,
quality_download=quality_download,
recursive_quality=recursive_quality,
recursive_download=recursive_download,
not_interface=not_interface,
make_zip=make_zip,
real_time_dl=real_time_dl,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries,
convert_to=convert_to,
bitrate=bitrate,
save_cover=save_cover,
market=market
)
smart.type = "playlist"
smart.playlist = playlist
elif "episode/" in link:
if not "spotify.com" in link:
raise InvalidLink(link)
episode = self.download_episode(
link,
output_dir=output_dir,
quality_download=quality_download,
recursive_quality=recursive_quality,
recursive_download=recursive_download,
not_interface=not_interface,
real_time_dl=real_time_dl,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries,
convert_to=convert_to,
bitrate=bitrate,
save_cover=save_cover,
market=market
)
smart.type = "episode"
smart.episode = episode
return smart
except Exception as e:
logger.error(f"Failed to perform smart download: {str(e)}")
traceback.print_exc()
raise e