#!/usr/bin/python3
"""Spotify download entry points (track, album, playlist, episode, artist)."""

import traceback
from os.path import isfile
from deezspot.easy_spoty import Spo
from librespot.core import Session
from deezspot.exceptions import InvalidLink, MarketAvailabilityError
from deezspot.spotloader.__spo_api__ import tracking, tracking_album, tracking_episode
from deezspot.spotloader.spotify_settings import stock_quality, stock_market
from deezspot.libutils.utils import (
    get_ids,
    link_is_valid,
    what_kind,
)
from deezspot.models.download import (
    Track,
    Album,
    Playlist,
    Preferences,
    Smart,
    Episode
)
from deezspot.models.callback import trackCallbackObject, errorObject
from deezspot.spotloader.__download__ import (
    DW_TRACK,
    DW_ALBUM,
    DW_PLAYLIST,
    DW_EPISODE,
    Download_JOB,
)
# NOTE: stock_market is imported twice (spotify_settings above and
# others_settings below); the later import is the binding in effect.
from deezspot.libutils.others_settings import (
    stock_output,
    stock_recursive_quality,
    stock_recursive_download,
    stock_not_interface,
    stock_zip,
    stock_save_cover,
    stock_real_time_dl,
    stock_market
)
from deezspot.libutils.logging_utils import logger, ProgressReporter, report_progress


class SpoLogin:
    """Facade that opens a Spotify session and exposes download helpers.

    Construction builds a librespot ``Session`` from a stored credentials
    file and hands it to ``Download_JOB``. Each ``download_*`` method
    validates the link, gathers metadata, fills a ``Preferences`` object
    and delegates the actual work to the matching ``DW_*`` class.
    """

    def __init__(
        self,
        credentials_path: str,
        spotify_client_id: str | None = None,
        spotify_client_secret: str | None = None,
        progress_callback=None,
        silent: bool = False
    ) -> None:
        """Store credentials, configure progress reporting and open a session.

        Args:
            credentials_path: Path of the stored-credentials file used to
                build the session.
            spotify_client_id: Optional Spotify API client id.
            spotify_client_secret: Optional Spotify API client secret.
            progress_callback: Passed to ``ProgressReporter`` as ``callback``.
            silent: Passed to ``ProgressReporter`` as ``silent``.

        Raises:
            FileNotFoundError: If ``credentials_path`` is not an existing file.
        """
        self.credentials_path = credentials_path
        self.spotify_client_id = spotify_client_id
        self.spotify_client_secret = spotify_client_secret

        # The Spotify API wrapper is only (re)initialised when both halves of
        # the credential pair are supplied.
        if spotify_client_id and spotify_client_secret:
            Spo.__init__(client_id=spotify_client_id, client_secret=spotify_client_secret)
            logger.info("Initialized Spotify API with provided credentials")

        self.progress_reporter = ProgressReporter(callback=progress_callback, silent=silent)

        self.__initialize_session()

    def __initialize_session(self) -> None:
        """Create the session from stored credentials and register it.

        Raises:
            FileNotFoundError: If the credentials file does not exist.
            Exception: Any error raised while building the session is logged
                and re-raised unchanged.
        """
        try:
            session_builder = Session.Builder()
            session_builder.conf.stored_credentials_file = self.credentials_path

            if not isfile(self.credentials_path):
                logger.error("Credentials file not found")
                raise FileNotFoundError("Please fill your credentials.json location!")

            session = session_builder.stored_file().create()
            logger.info("Successfully initialized Spotify session")

            Download_JOB(session)
            Download_JOB.set_progress_reporter(self.progress_reporter)
        except Exception as e:
            logger.error(f"Failed to initialize Spotify session: {str(e)}")
            raise

    @staticmethod
    def _build_preferences(
        *,
        link,
        song_metadata,
        ids,
        output_dir,
        quality_download,
        recursive_quality,
        recursive_download,
        not_interface,
        real_time_dl,
        custom_dir_format,
        custom_track_format,
        pad_tracks,
        initial_retry_delay,
        retry_delay_increase,
        max_retries,
        convert_to,
        bitrate,
        save_cover,
        market,
        is_episode: bool = False
    ) -> Preferences:
        """Return a ``Preferences`` object filled with the shared options.

        Every ``download_*`` method needs the same core attributes; building
        them in one place keeps the methods from drifting apart. Callers add
        their kind-specific attributes (``json_data``, ``make_zip``, ...)
        on the returned object.
        """
        preferences = Preferences()
        preferences.real_time_dl = real_time_dl
        preferences.link = link
        preferences.song_metadata = song_metadata
        preferences.quality_download = quality_download
        preferences.output_dir = output_dir
        preferences.ids = ids
        preferences.recursive_quality = recursive_quality
        preferences.recursive_download = recursive_download
        preferences.not_interface = not_interface
        preferences.is_episode = is_episode
        preferences.custom_dir_format = custom_dir_format
        preferences.custom_track_format = custom_track_format
        preferences.pad_tracks = pad_tracks
        preferences.initial_retry_delay = initial_retry_delay
        preferences.retry_delay_increase = retry_delay_increase
        preferences.max_retries = max_retries
        # A bitrate is only kept when a conversion target is requested.
        preferences.convert_to = convert_to
        preferences.bitrate = None if convert_to is None else bitrate
        preferences.save_cover = save_cover
        preferences.market = market
        return preferences

    def _report_track_error(self, song_metadata, error: Exception) -> None:
        """Send an error callback for a track, if its metadata was fetched."""
        if not song_metadata:
            return
        status_obj = errorObject(ids=song_metadata.ids, error=str(error))
        callback_obj = trackCallbackObject(track=song_metadata, status_info=status_obj)
        report_progress(
            reporter=self.progress_reporter,
            callback_obj=callback_obj
        )

    def download_track(
        self, link_track,
        output_dir=stock_output,
        quality_download=stock_quality,
        recursive_quality=stock_recursive_quality,
        recursive_download=stock_recursive_download,
        not_interface=stock_not_interface,
        real_time_dl=stock_real_time_dl,
        custom_dir_format=None,
        custom_track_format=None,
        pad_tracks=True,
        initial_retry_delay=30,
        retry_delay_increase=30,
        max_retries=5,
        convert_to=None,
        bitrate=None,
        save_cover=stock_save_cover,
        market: list[str] | None = stock_market
    ) -> Track:
        """Download a single track and return the ``DW_TRACK`` result.

        Args:
            link_track: Track link; checked with ``link_is_valid``.
            market: Passed to ``tracking`` and stored on the preferences.
            Remaining keyword arguments are copied onto ``Preferences``
            for the downloader.

        Raises:
            MarketAvailabilityError: Logged, reported through the progress
                reporter when metadata is available, then re-raised.
            Exception: Any other failure, including missing metadata, is
                logged, reported the same way and re-raised.
        """
        song_metadata = None
        try:
            link_is_valid(link_track)
            ids = get_ids(link_track)
            song_metadata = tracking(ids, market=market)

            if song_metadata is None:
                raise Exception(f"Could not retrieve metadata for track {link_track}. It might not be available or an API error occurred.")

            logger.info(f"Starting download for track: {song_metadata.title} - {'; '.join([a.name for a in song_metadata.artists])}")

            preferences = self._build_preferences(
                link=link_track,
                song_metadata=song_metadata,
                ids=ids,
                output_dir=output_dir,
                quality_download=quality_download,
                recursive_quality=recursive_quality,
                recursive_download=recursive_download,
                not_interface=not_interface,
                real_time_dl=real_time_dl,
                custom_dir_format=custom_dir_format,
                custom_track_format=custom_track_format,
                pad_tracks=pad_tracks,
                initial_retry_delay=initial_retry_delay,
                retry_delay_increase=retry_delay_increase,
                max_retries=max_retries,
                convert_to=convert_to,
                bitrate=bitrate,
                save_cover=save_cover,
                market=market
            )

            return DW_TRACK(preferences).dw()
        except MarketAvailabilityError as e:
            logger.error(f"Track download failed due to market availability: {str(e)}")
            self._report_track_error(song_metadata, e)
            raise
        except Exception as e:
            logger.error(f"Failed to download track: {str(e)}")
            traceback.print_exc()
            self._report_track_error(song_metadata, e)
            raise

    def download_album(
        self, link_album,
        output_dir=stock_output,
        quality_download=stock_quality,
        recursive_quality=stock_recursive_quality,
        recursive_download=stock_recursive_download,
        not_interface=stock_not_interface,
        make_zip=stock_zip,
        real_time_dl=stock_real_time_dl,
        custom_dir_format=None,
        custom_track_format=None,
        pad_tracks=True,
        initial_retry_delay=30,
        retry_delay_increase=30,
        max_retries=5,
        convert_to=None,
        bitrate=None,
        save_cover=stock_save_cover,
        market: list[str] | None = stock_market
    ) -> Album:
        """Download an album and return the ``DW_ALBUM`` result.

        The raw album payload from ``Spo.get_album`` is kept on the
        preferences as ``json_data``; its processed form from
        ``tracking_album`` becomes ``song_metadata``.

        Raises:
            MarketAvailabilityError: Logged and re-raised.
            Exception: Raised when the album payload or its metadata is
                missing; any other failure is logged and re-raised.
        """
        try:
            link_is_valid(link_album)
            ids = get_ids(link_album)
            album_json = Spo.get_album(ids)
            if not album_json:
                raise Exception(f"Could not retrieve album data for {link_album}.")

            song_metadata = tracking_album(album_json, market=market)
            if song_metadata is None:
                raise Exception(f"Could not process album metadata for {link_album}. It might not be available in the specified market(s) or an API error occurred.")

            logger.info(f"Starting download for album: {song_metadata.title} - {'; '.join([a.name for a in song_metadata.artists])}")

            preferences = self._build_preferences(
                link=link_album,
                song_metadata=song_metadata,
                ids=ids,
                output_dir=output_dir,
                quality_download=quality_download,
                recursive_quality=recursive_quality,
                recursive_download=recursive_download,
                not_interface=not_interface,
                real_time_dl=real_time_dl,
                custom_dir_format=custom_dir_format,
                custom_track_format=custom_track_format,
                pad_tracks=pad_tracks,
                initial_retry_delay=initial_retry_delay,
                retry_delay_increase=retry_delay_increase,
                max_retries=max_retries,
                convert_to=convert_to,
                bitrate=bitrate,
                save_cover=save_cover,
                market=market
            )
            preferences.json_data = album_json
            preferences.make_zip = make_zip

            return DW_ALBUM(preferences).dw()
        except MarketAvailabilityError as e:
            logger.error(f"Album download failed due to market availability: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Failed to download album: {str(e)}")
            traceback.print_exc()
            raise

    def download_playlist(
        self, link_playlist,
        output_dir=stock_output,
        quality_download=stock_quality,
        recursive_quality=stock_recursive_quality,
        recursive_download=stock_recursive_download,
        not_interface=stock_not_interface,
        make_zip=stock_zip,
        real_time_dl=stock_real_time_dl,
        custom_dir_format=None,
        custom_track_format=None,
        pad_tracks=True,
        initial_retry_delay=30,
        retry_delay_increase=30,
        max_retries=5,
        convert_to=None,
        bitrate=None,
        save_cover=stock_save_cover,
        market: list[str] | None = stock_market
    ) -> Playlist:
        """Download a playlist and return the ``DW_PLAYLIST`` result.

        Metadata is fetched per playlist item. Items that cannot be resolved
        (no track object, no id, no metadata, or a market restriction) are
        not dropped: a placeholder dict with ``error_type``,
        ``error_message``, ``name`` and ``ids`` keys takes their place in the
        metadata list handed to the downloader.

        Raises:
            MarketAvailabilityError: Logged and re-raised when it escapes the
                per-item handling.
            Exception: Raised when the playlist payload is missing; any other
                failure is logged and re-raised.
        """
        try:
            link_is_valid(link_playlist)
            ids = get_ids(link_playlist)
            playlist_json = Spo.get_playlist(ids)
            if not playlist_json:
                raise Exception(f"Could not retrieve playlist data for {link_playlist}.")

            logger.info(f"Starting download for playlist: {playlist_json.get('name', 'Unknown')}")

            playlist_tracks_data = playlist_json.get('tracks', {}).get('items', [])
            if not playlist_tracks_data:
                # An empty playlist is not an error: the download proceeds
                # with an empty metadata list.
                logger.warning(f"Playlist {link_playlist} has no tracks or could not be fetched.")

            song_metadata_list = []
            for item in playlist_tracks_data:
                if not item or 'track' not in item or not item['track']:
                    # Items that are not valid tracks (e.g., local files).
                    logger.warning(f"Skipping an item in playlist {link_playlist} as it does not appear to be a valid track object.")
                    song_metadata_list.append({'error_type': 'invalid_track_object', 'error_message': 'Playlist item was not a valid track object.', 'name': 'Unknown Skipped Item', 'ids': None})
                    continue

                track_data = item['track']
                track_id = track_data.get('id')

                if not track_id:
                    logger.warning(f"Skipping an item in playlist {link_playlist} because it has no track ID.")
                    song_metadata_list.append({'error_type': 'missing_track_id', 'error_message': 'Playlist item is missing a track ID.', 'name': track_data.get('name', 'Unknown Track without ID'), 'ids': None})
                    continue

                try:
                    track_metadata = tracking(track_id, market=market)
                    if track_metadata:
                        song_metadata_list.append(track_metadata)
                    else:
                        # Placeholder for tracks whose metadata fetch failed.
                        failed_track_info = {'error_type': 'metadata_fetch_failed', 'error_message': f"Failed to fetch metadata for track ID: {track_id}", 'name': track_data.get('name', f'Track ID {track_id}'), 'ids': track_id}
                        song_metadata_list.append(failed_track_info)
                        logger.warning(f"Could not retrieve metadata for track {track_id} in playlist {link_playlist}.")
                except MarketAvailabilityError as e:
                    failed_track_info = {'error_type': 'market_availability_error', 'error_message': str(e), 'name': track_data.get('name', f'Track ID {track_id}'), 'ids': track_id}
                    song_metadata_list.append(failed_track_info)
                    logger.warning(str(e))

            preferences = self._build_preferences(
                link=link_playlist,
                song_metadata=song_metadata_list,
                ids=ids,
                output_dir=output_dir,
                quality_download=quality_download,
                recursive_quality=recursive_quality,
                recursive_download=recursive_download,
                not_interface=not_interface,
                real_time_dl=real_time_dl,
                custom_dir_format=custom_dir_format,
                custom_track_format=custom_track_format,
                pad_tracks=pad_tracks,
                initial_retry_delay=initial_retry_delay,
                retry_delay_increase=retry_delay_increase,
                max_retries=max_retries,
                convert_to=convert_to,
                bitrate=bitrate,
                save_cover=save_cover,
                market=market
            )
            preferences.json_data = playlist_json
            preferences.playlist_tracks_json = playlist_tracks_data
            preferences.make_zip = make_zip

            return DW_PLAYLIST(preferences).dw()
        except MarketAvailabilityError as e:
            logger.error(f"Playlist download failed due to market availability issues with one or more tracks: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Failed to download playlist: {str(e)}")
            traceback.print_exc()
            raise

    def download_episode(
        self, link_episode,
        output_dir=stock_output,
        quality_download=stock_quality,
        recursive_quality=stock_recursive_quality,
        recursive_download=stock_recursive_download,
        not_interface=stock_not_interface,
        real_time_dl=stock_real_time_dl,
        custom_dir_format=None,
        custom_track_format=None,
        pad_tracks=True,
        initial_retry_delay=30,
        retry_delay_increase=30,
        max_retries=5,
        convert_to=None,
        bitrate=None,
        save_cover=stock_save_cover,
        market: list[str] | None = stock_market
    ) -> Episode:
        """Download a podcast episode and return the ``DW_EPISODE`` result.

        ``quality_download`` is stored on the preferences like in the other
        download methods; it was previously accepted but never forwarded.

        Raises:
            MarketAvailabilityError: Logged and re-raised.
            Exception: Raised when the episode payload or its metadata is
                missing; any other failure is logged and re-raised.
        """
        try:
            link_is_valid(link_episode)
            ids = get_ids(link_episode)
            episode_json = Spo.get_episode(ids)
            if not episode_json:
                raise Exception(f"Could not retrieve episode data for {link_episode} from API.")

            episode_metadata = tracking_episode(ids, market=market)
            if episode_metadata is None:
                raise Exception(f"Could not process episode metadata for {link_episode}. It might not be available in the specified market(s) or an API error occurred.")

            logger.info(f"Starting download for episode: {episode_metadata.title} - {episode_metadata.album.title}")

            preferences = self._build_preferences(
                link=link_episode,
                song_metadata=episode_metadata,
                ids=ids,
                output_dir=output_dir,
                quality_download=quality_download,
                recursive_quality=recursive_quality,
                recursive_download=recursive_download,
                not_interface=not_interface,
                real_time_dl=real_time_dl,
                custom_dir_format=custom_dir_format,
                custom_track_format=custom_track_format,
                pad_tracks=pad_tracks,
                initial_retry_delay=initial_retry_delay,
                retry_delay_increase=retry_delay_increase,
                max_retries=max_retries,
                convert_to=convert_to,
                bitrate=bitrate,
                save_cover=save_cover,
                market=market,
                is_episode=True
            )
            preferences.json_data = episode_json

            return DW_EPISODE(preferences).dw()
        except MarketAvailabilityError as e:
            logger.error(f"Episode download failed due to market availability: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Failed to download episode: {str(e)}")
            traceback.print_exc()
            raise

    def download_artist(
        self, link_artist,
        album_type: str = 'album,single,compilation,appears_on',
        limit: int = 50,
        output_dir=stock_output,
        quality_download=stock_quality,
        recursive_quality=stock_recursive_quality,
        recursive_download=stock_recursive_download,
        not_interface=stock_not_interface,
        make_zip=stock_zip,
        real_time_dl=stock_real_time_dl,
        custom_dir_format=None,
        custom_track_format=None,
        pad_tracks=True,
        initial_retry_delay=30,
        retry_delay_increase=30,
        max_retries=5,
        convert_to=None,
        bitrate=None,
        market: list[str] | None = stock_market,
        save_cover=stock_save_cover
    ) -> list[Album]:
        """
        Download all albums (or a subset based on album_type and limit) from an artist.

        Each album in the ``items`` list returned by ``Spo.get_artist`` is
        downloaded with ``download_album``; albums without a Spotify URL are
        skipped with a warning. A failure in any album aborts the whole run.

        Returns:
            The ``download_album`` results, in discography order.

        Raises:
            Exception: When the discography is empty or unavailable; any
                other failure is logged and re-raised.
        """
        try:
            link_is_valid(link_artist)
            ids = get_ids(link_artist)
            discography = Spo.get_artist(ids, album_type=album_type, limit=limit)
            # A missing response is treated like an empty discography so the
            # caller gets the explicit "No albums found" error below.
            albums = (discography or {}).get('items', [])
            if not albums:
                logger.warning("No albums found for the provided artist")
                raise Exception("No albums found for the provided artist.")

            logger.info(f"Starting download for artist discography: {discography.get('name', 'Unknown')}")

            downloaded_albums = []
            for album in albums:
                album_url = album.get('external_urls', {}).get('spotify')
                if not album_url:
                    logger.warning(f"No URL found for album: {album.get('name', 'Unknown')}")
                    continue
                downloaded_albums.append(
                    self.download_album(
                        album_url,
                        output_dir=output_dir,
                        quality_download=quality_download,
                        recursive_quality=recursive_quality,
                        recursive_download=recursive_download,
                        not_interface=not_interface,
                        make_zip=make_zip,
                        real_time_dl=real_time_dl,
                        custom_dir_format=custom_dir_format,
                        custom_track_format=custom_track_format,
                        pad_tracks=pad_tracks,
                        initial_retry_delay=initial_retry_delay,
                        retry_delay_increase=retry_delay_increase,
                        max_retries=max_retries,
                        convert_to=convert_to,
                        bitrate=bitrate,
                        market=market,
                        save_cover=save_cover
                    )
                )
            return downloaded_albums
        except Exception as e:
            logger.error(f"Failed to download artist discography: {str(e)}")
            traceback.print_exc()
            raise

    def download_smart(
        self, link,
        output_dir=stock_output,
        quality_download=stock_quality,
        recursive_quality=stock_recursive_quality,
        recursive_download=stock_recursive_download,
        not_interface=stock_not_interface,
        make_zip=stock_zip,
        real_time_dl=stock_real_time_dl,
        custom_dir_format=None,
        custom_track_format=None,
        pad_tracks=True,
        initial_retry_delay=30,
        retry_delay_increase=30,
        max_retries=5,
        convert_to=None,
        bitrate=None,
        save_cover=stock_save_cover,
        market: list[str] | None = stock_market
    ) -> Smart:
        """Dispatch a link to the matching ``download_*`` method.

        The link is normalised with ``what_kind`` and routed on the first of
        ``track/``, ``album/``, ``playlist/`` or ``episode/`` it contains.
        A Spotify link matching none of them yields a ``Smart`` object with
        only ``source`` set.

        Raises:
            InvalidLink: If the link does not contain ``spotify.com``.
            Exception: Any failure from the delegated download is logged
                and re-raised.
        """
        try:
            link_is_valid(link)
            link = what_kind(link)

            # One up-front check replaces the identical check that used to be
            # repeated inside every branch.
            if "spotify.com" not in link:
                raise InvalidLink(link)

            smart = Smart()
            smart.source = "https://spotify.com"

            logger.info(f"Starting smart download for: {link}")

            # Options accepted by every download_* method; make_zip is added
            # only for the kinds that take it.
            shared = dict(
                output_dir=output_dir,
                quality_download=quality_download,
                recursive_quality=recursive_quality,
                recursive_download=recursive_download,
                not_interface=not_interface,
                real_time_dl=real_time_dl,
                custom_dir_format=custom_dir_format,
                custom_track_format=custom_track_format,
                pad_tracks=pad_tracks,
                initial_retry_delay=initial_retry_delay,
                retry_delay_increase=retry_delay_increase,
                max_retries=max_retries,
                convert_to=convert_to,
                bitrate=bitrate,
                save_cover=save_cover,
                market=market
            )

            if "track/" in link:
                track = self.download_track(link, **shared)
                smart.type = "track"
                smart.track = track
            elif "album/" in link:
                album = self.download_album(link, make_zip=make_zip, **shared)
                smart.type = "album"
                smart.album = album
            elif "playlist/" in link:
                playlist = self.download_playlist(link, make_zip=make_zip, **shared)
                smart.type = "playlist"
                smart.playlist = playlist
            elif "episode/" in link:
                episode = self.download_episode(link, **shared)
                smart.type = "episode"
                smart.episode = episode

            return smart
        except Exception as e:
            logger.error(f"Failed to perform smart download: {str(e)}")
            traceback.print_exc()
            raise