diff --git a/deezspot/deezloader/__init__.py b/deezspot/deezloader/__init__.py index c7c97a3..e2351cd 100644 --- a/deezspot/deezloader/__init__.py +++ b/deezspot/deezloader/__init__.py @@ -47,6 +47,7 @@ from deezspot.libutils.others_settings import ( ) from deezspot.libutils.logging_utils import ProgressReporter, logger, report_progress import requests +from librespot.core import Session from deezspot.models.callback.callbacks import ( trackCallbackObject, @@ -97,6 +98,8 @@ class DeeLogin: # Store Spotify credentials self.spotify_client_id = spotify_client_id self.spotify_client_secret = spotify_client_secret + # Optional path to Spotify credentials.json (env override or CWD default) + self.spotify_credentials_path = os.environ.get("SPOTIFY_CREDENTIALS_PATH") or os.path.join(os.getcwd(), "credentials.json") # Initialize Spotify API if credentials are provided if spotify_client_id and spotify_client_secret: @@ -120,6 +123,27 @@ class DeeLogin: # Set the progress reporter for Download_JOB Download_JOB.set_progress_reporter(self.progress_reporter) + def _ensure_spotify_session(self) -> None: + """Ensure Spo has an attached librespot Session. Used only by spo->dee flows.""" + try: + # Check if Spo already has a session (accessing private attr is ok internally) + has_session = getattr(Spo, f"_{Spo.__name__}__session", None) is not None + if has_session: + return + except Exception: + pass + + cred_path = self.spotify_credentials_path + if not os.path.isfile(cred_path): + raise FileNotFoundError( + f"Spotify session not initialized. Missing credentials.json at '{cred_path}'. " + "Set SPOTIFY_CREDENTIALS_PATH or place credentials.json in the working directory." + ) + builder = Session.Builder() + builder.conf.stored_credentials_file = cred_path + session = builder.stored_file().create() + Spo.set_session(session) + def download_trackdee( self, link_track, output_dir=stock_output, @@ -424,6 +448,8 @@ class DeeLogin: return names def convert_spoty_to_dee_link_track(self, link_track): + # Ensure Spotify session only when using spo->dee conversion + self._ensure_spotify_session() link_is_valid(link_track) ids = get_ids(link_track) @@ -516,6 +542,8 @@ class DeeLogin: return track_link_dee def convert_spoty_to_dee_link_album(self, link_album): + # Ensure Spotify session only when using spo->dee conversion + self._ensure_spotify_session() link_is_valid(link_album) ids = get_ids(link_album) @@ -656,6 +684,8 @@ class DeeLogin: spotify_album_obj = None if spotify_metadata: + # Only initialize Spotify session when we actually need Spotify metadata + self._ensure_spotify_session() try: # Fetch full Spotify album with tracks once and convert to albumObject from deezspot.spotloader.__spo_api__ import tracking_album as spo_tracking_album @@ -716,7 +746,34 @@ class DeeLogin: link_is_valid(link_playlist) ids = get_ids(link_playlist) + # Ensure Spotify session for fetching playlist and tracks + self._ensure_spotify_session() + playlist_json = Spo.get_playlist(ids) + # Ensure we keep the playlist ID for callbacks + if 'id' not in playlist_json: + playlist_json['id'] = ids + + # Enrich items with full track objects so downstream expects Web API shape + try: + items = playlist_json.get('tracks', {}).get('items', []) or [] + track_ids = [it.get('track', {}).get('id') for it in items if it.get('track') and it['track'].get('id')] + full = Spo.get_tracks(track_ids) if track_ids else {'tracks': []} + full_list = full.get('tracks') or [] + full_by_id = {t.get('id'): t for t in full_list if t and t.get('id')} + new_items = [] + for it in items: + tid = (it.get('track') or {}).get('id') + full_track = full_by_id.get(tid) + if full_track: + new_items.append({'track': full_track}) + else: + new_items.append(it) + playlist_json['tracks']['items'] = new_items + except Exception: + # If enrichment fails, continue with minimal ids + pass + # Extract track metadata for playlist callback object playlist_tracks_for_callback = [] @@ -793,7 +850,7 @@ class DeeLogin: playlist_obj = playlistCbObject( title=playlist_json['name'], owner=userObject(name=playlist_json.get('owner', {}).get('display_name', 'Unknown Owner')), - ids=IDs(spotify=playlist_json['id']), + ids=IDs(spotify=playlist_json.get('id', ids)), tracks=playlist_tracks_for_callback # Populate tracks array with track objects ) @@ -833,6 +890,10 @@ class DeeLogin: track_name = track_info.get('name', 'Unknown Track') artist_name = track_info['artists'][0]['name'] if track_info.get('artists') else 'Unknown Artist' link_track = track_info.get('external_urls', {}).get('spotify') + if not link_track: + tid = track_info.get('id') + if tid: + link_track = f"https://open.spotify.com/track/{tid}" if not link_track: logger.warning(f"The track \"{track_name}\" is not available on Spotify :(") diff --git a/deezspot/easy_spoty.py b/deezspot/easy_spoty.py index dde84be..c4be1bc 100644 --- a/deezspot/easy_spoty.py +++ b/deezspot/easy_spoty.py @@ -1,342 +1,551 @@ #!/usr/bin/python3 -from spotipy import Spotify +from librespot.core import Session, SearchManager +from librespot.metadata import TrackId, AlbumId, ArtistId, EpisodeId, ShowId, PlaylistId from deezspot.exceptions import InvalidLink -from spotipy.exceptions import SpotifyException -from spotipy.oauth2 import SpotifyClientCredentials -import os +from typing import Any, Dict, List, Optional + +# Note: We intentionally avoid importing spotipy. This module is now a +# thin shim over librespot's internal API, returning Web-API-shaped dicts +# consumed by spotloader's converters. class Spo: __error_codes = [404, 400] - - # Class-level API instance and credentials - __api = None - __client_id = None - __client_secret = None + + # Class-level references + __session: Optional[Session] = None __initialized = False @classmethod - def __init__(cls, client_id, client_secret): - """ - Initialize the Spotify API client. - - Args: - client_id (str): Spotify API client ID. - client_secret (str): Spotify API client secret. - """ - if not client_id or not client_secret: - raise ValueError("Spotify API credentials required. Provide client_id and client_secret.") - - client_credentials_manager = SpotifyClientCredentials( - client_id=client_id, - client_secret=client_secret - ) - - # Store the credentials and API instance - cls.__client_id = client_id - cls.__client_secret = client_secret - cls.__api = Spotify( - auth_manager=client_credentials_manager - ) + def set_session(cls, session: Session): + """Attach an active librespot Session for metadata/search operations.""" + cls.__session = session + cls.__initialized = True + + @classmethod + def __init__(cls, client_id=None, client_secret=None): + """Kept for compatibility; no longer used (librespot session is used).""" cls.__initialized = True @classmethod def __check_initialized(cls): - """Check if the class has been initialized with credentials""" - if not cls.__initialized: - raise ValueError("Spotify API not initialized. Call Spo.__init__(client_id, client_secret) first.") + if not cls.__initialized or cls.__session is None: + raise ValueError("Spotify session not initialized. Ensure SpoLogin created a librespot Session and called Spo.set_session(session).") - @classmethod - def __get_api(cls, client_id=None, client_secret=None): - """ - Get a Spotify API instance with the provided credentials or use stored credentials. - - Args: - client_id (str, optional): Spotify API client ID - client_secret (str, optional): Spotify API client secret - - Returns: - A Spotify API instance - """ - # If new credentials are provided, create a new API instance - if client_id and client_secret: - client_credentials_manager = SpotifyClientCredentials( - client_id=client_id, - client_secret=client_secret - ) - return Spotify(auth_manager=client_credentials_manager) - - # Otherwise, use the existing class-level API - cls.__check_initialized() - return cls.__api - - @classmethod - def __lazy(cls, results, api=None): - """Process paginated results and extend the initial page's items in-place.""" - api = api or cls.__api - if not results or 'items' not in results: - return results - items_ref = results['items'] - - while results.get('next'): - results = api.next(results) - if results and 'items' in results: - items_ref.extend(results['items']) + # ------------------------- helpers ------------------------- + @staticmethod + def __base62_from_gid(gid_bytes: bytes, kind: str) -> Optional[str]: + if not gid_bytes: + return None + hex_id = gid_bytes.hex() + try: + if kind == 'track': + obj = TrackId.from_hex(hex_id) + elif kind == 'album': + obj = AlbumId.from_hex(hex_id) + elif kind == 'artist': + obj = ArtistId.from_hex(hex_id) + elif kind == 'episode': + obj = EpisodeId.from_hex(hex_id) + elif kind == 'show': + obj = ShowId.from_hex(hex_id) + elif kind == 'playlist': + # PlaylistId typically not hex-backed in same way, avoid for playlists here + return None else: - break + return None + uri = obj.to_spotify_uri() + return uri.split(":")[-1] + except Exception: + return None - return results + @staticmethod + def __external_ids_to_dict(external_ids) -> Dict[str, str]: + # Map repeated ExternalId { type, id } to a simple dict + result: Dict[str, str] = {} + try: + for ext in external_ids or []: + t = getattr(ext, 'type', None) + v = getattr(ext, 'id', None) + if t and v: + result[t.lower()] = v + except Exception: + pass + return result + + @staticmethod + def __images_from_group(img_group) -> List[Dict[str, Any]]: + images: List[Dict[str, Any]] = [] + try: + for im in getattr(img_group, 'image', []) or []: + fid = getattr(im, 'file_id', None) + if fid: + hex_id = fid.hex() + images.append({ + 'url': f"https://i.scdn.co/image/{hex_id}", + 'width': getattr(im, 'width', 0), + 'height': getattr(im, 'height', 0) + }) + except Exception: + pass + return images + + @staticmethod + def __images_from_repeated(imgs) -> List[Dict[str, Any]]: + images: List[Dict[str, Any]] = [] + try: + for im in imgs or []: + fid = getattr(im, 'file_id', None) + if fid: + hex_id = fid.hex() + images.append({ + 'url': f"https://i.scdn.co/image/{hex_id}", + 'width': getattr(im, 'width', 0), + 'height': getattr(im, 'height', 0) + }) + except Exception: + pass + return images @classmethod - def __fetch_all_album_tracks(cls, album_id: str, api: Spotify) -> dict: - """ - Fetch all tracks for an album using album_tracks pagination. - Returns a dict shaped like Spotify's 'tracks' object with all items merged. - """ - all_items = [] - limit = 50 - offset = 0 - first_page = None - while True: - page = api.album_tracks(album_id, limit=limit, offset=offset) - if first_page is None: - first_page = dict(page) if page is not None else None - items = page.get('items', []) if page else [] - if not items: - break - all_items.extend(items) - offset += len(items) - if page.get('next') is None: - break - if first_page is None: - return {'items': [], 'total': 0, 'limit': limit, 'offset': 0} - # Build a consolidated tracks object - total_val = first_page.get('total', len(all_items)) + def __artist_proto_to_dict(cls, a_proto) -> Dict[str, Any]: + gid = getattr(a_proto, 'gid', None) return { - 'items': all_items, - 'total': total_val, - 'limit': limit, - 'offset': 0, - 'next': None, - 'previous': None + 'id': cls.__base62_from_gid(gid, 'artist'), + 'name': getattr(a_proto, 'name', '') } @classmethod - def get_track(cls, ids, client_id=None, client_secret=None): - """ - Get track information by ID. - - Args: - ids (str): Spotify track ID - client_id (str, optional): Optional custom Spotify client ID - client_secret (str, optional): Optional custom Spotify client secret - - Returns: - dict: Track information - """ - api = cls.__get_api(client_id, client_secret) - try: - track_json = api.track(ids) - except SpotifyException as error: - if error.http_status in cls.__error_codes: - raise InvalidLink(ids) + def __track_proto_to_web_dict(cls, t_proto, parent_album: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + if t_proto is None: + return {} + gid = getattr(t_proto, 'gid', None) + artists = [cls.__artist_proto_to_dict(a) for a in getattr(t_proto, 'artist', [])] + external_ids_map = cls.__external_ids_to_dict(getattr(t_proto, 'external_id', [])) + # Album for a track inside Album.disc.track is often simplified in proto + album_dict = parent_album or {} + return { + 'id': cls.__base62_from_gid(gid, 'track'), + 'name': getattr(t_proto, 'name', ''), + 'duration_ms': getattr(t_proto, 'duration', 0), + 'explicit': getattr(t_proto, 'explicit', False), + 'track_number': getattr(t_proto, 'number', 1), + 'disc_number': getattr(t_proto, 'disc_number', 1), + 'artists': artists, + 'external_ids': external_ids_map, + 'available_markets': None, # Not derived; market check code handles None by warning and continuing + 'album': album_dict + } - return track_json + @classmethod + def __album_proto_to_web_dict(cls, a_proto) -> Dict[str, Any]: + if a_proto is None: + return {} + gid = getattr(a_proto, 'gid', None) + # Album basic fields + title = getattr(a_proto, 'name', '') + album_type = None + try: + # Map enum to typical Web API strings when possible + t_val = getattr(a_proto, 'type', None) + if t_val is not None: + # Common mapping heuristic + # 1 = ALBUM, 2 = SINGLE, 3 = COMPILATION (values may differ by proto) + mapping = {1: 'album', 2: 'single', 3: 'compilation'} + album_type = mapping.get(int(t_val), None) + except Exception: + album_type = None + + # Date + release_date_str = '' + release_date_precision = 'day' + try: + date = getattr(a_proto, 'date', None) + year = getattr(date, 'year', 0) if date else 0 + month = getattr(date, 'month', 0) if date else 0 + day = getattr(date, 'day', 0) if date else 0 + if year and month and day: + release_date_str = f"{year:04d}-{month:02d}-{day:02d}" + release_date_precision = 'day' + elif year and month: + release_date_str = f"{year:04d}-{month:02d}" + release_date_precision = 'month' + elif year: + release_date_str = f"{year:04d}" + release_date_precision = 'year' + except Exception: + pass + + # Artists + artists = [cls.__artist_proto_to_dict(a) for a in getattr(a_proto, 'artist', [])] + + # Genres + genres = list(getattr(a_proto, 'genre', []) or []) + + # External IDs (e.g., upc) + external_ids_map = cls.__external_ids_to_dict(getattr(a_proto, 'external_id', [])) + + # Images + images: List[Dict[str, Any]] = [] + try: + cg = getattr(a_proto, 'cover_group', None) + if cg: + images = cls.__images_from_group(cg) + if not images: + images = cls.__images_from_repeated(getattr(a_proto, 'cover', []) or []) + except Exception: + images = [] + + # Tracks from discs + items: List[Dict[str, Any]] = [] + total_tracks = 0 + try: + for disc in getattr(a_proto, 'disc', []) or []: + disc_number = getattr(disc, 'number', 1) + for t in getattr(disc, 'track', []) or []: + total_tracks += 1 + # Album context passed minimally for track mapper + parent_album_min = { + 'id': cls.__base62_from_gid(gid, 'album'), + 'name': title, + 'album_type': album_type, + 'release_date': release_date_str, + 'release_date_precision': release_date_precision, + 'total_tracks': None, + 'images': images, + 'genres': genres, + 'artists': artists, + 'external_ids': external_ids_map, + 'available_markets': None + } + # Ensure numbering aligns with album context + setattr(t, 'disc_number', disc_number) + item = cls.__track_proto_to_web_dict(t, parent_album=parent_album_min) + # Override with correct numbering if proto uses different fields + item['disc_number'] = disc_number + if 'track_number' not in item or not item['track_number']: + item['track_number'] = getattr(t, 'number', 1) + items.append(item) + except Exception: + items = [] + + album_dict: Dict[str, Any] = { + 'id': cls.__base62_from_gid(gid, 'album'), + 'name': title, + 'album_type': album_type, + 'release_date': release_date_str, + 'release_date_precision': release_date_precision, + 'total_tracks': total_tracks or getattr(a_proto, 'num_tracks', 0), + 'genres': genres, + 'images': images, # Web API-like images with i.scdn.co URLs + 'copyrights': [], + 'available_markets': None, + 'external_ids': external_ids_map, + 'artists': artists, + 'tracks': { + 'items': items, + 'total': len(items), + 'limit': len(items), + 'offset': 0, + 'next': None, + 'previous': None + } + } + return album_dict + + # ------------------------- public API ------------------------- + @classmethod + def get_track(cls, ids, client_id=None, client_secret=None): + cls.__check_initialized() + try: + t_id = TrackId.from_base62(ids) + t_proto = cls.__session.api().get_metadata_4_track(t_id) + if not t_proto: + raise InvalidLink(ids) + # Build minimal album context from nested album proto if present + album_proto = getattr(t_proto, 'album', None) + album_ctx = None + try: + if album_proto is not None: + agid = getattr(album_proto, 'gid', None) + # Images for embedded album + images: List[Dict[str, Any]] = [] + try: + cg = getattr(album_proto, 'cover_group', None) + if cg: + images = cls.__images_from_group(cg) + if not images: + images = cls.__images_from_repeated(getattr(album_proto, 'cover', []) or []) + except Exception: + images = [] + album_ctx = { + 'id': cls.__base62_from_gid(agid, 'album'), + 'name': getattr(album_proto, 'name', ''), + 'images': images, + 'genres': [], + 'available_markets': None + } + except Exception: + album_ctx = None + return cls.__track_proto_to_web_dict(t_proto, parent_album=album_ctx) + except InvalidLink: + raise + except Exception: + raise InvalidLink(ids) @classmethod def get_tracks(cls, ids: list, market: str = None, client_id=None, client_secret=None): - """ - Get information for multiple tracks by a list of IDs. - Handles chunking by 50 IDs per request and merges results while preserving order. - - Args: - ids (list): A list of Spotify track IDs. - market (str, optional): An ISO 3166-1 alpha-2 country code. - client_id (str, optional): Optional custom Spotify client ID. - client_secret (str, optional): Optional custom Spotify client secret. - - Returns: - dict: A dictionary containing a list of track information under key 'tracks'. - """ if not ids: return {'tracks': []} - - api = cls.__get_api(client_id, client_secret) - all_tracks = [] - chunk_size = 50 - try: - for i in range(0, len(ids), chunk_size): - chunk = ids[i:i + chunk_size] - resp = api.tracks(chunk, market=market) if market else api.tracks(chunk) - # Spotify returns {'tracks': [...]} for each chunk - chunk_tracks = resp.get('tracks', []) if resp else [] - all_tracks.extend(chunk_tracks) - except SpotifyException as error: - if error.http_status in cls.__error_codes: - # Create a string of the first few IDs for the error message - ids_preview = ', '.join(ids[:3]) + ('...' if len(ids) > 3 else '') - raise InvalidLink(f"one or more IDs in the list: [{ids_preview}]") - else: - raise - - return {'tracks': all_tracks} + cls.__check_initialized() + tracks: List[Dict[str, Any]] = [] + for tid in ids: + try: + tracks.append(cls.get_track(tid)) + except Exception: + # Preserve order with None entries similar to Web API behavior on bad IDs + tracks.append(None) + return {'tracks': tracks} @classmethod def get_album(cls, ids, client_id=None, client_secret=None): - """ - Get album information by ID and include all tracks (paged if needed). - - Args: - ids (str): Spotify album ID - client_id (str, optional): Optional custom Spotify client ID - client_secret (str, optional): Optional custom Spotify client secret - - Returns: - dict: Album information with full 'tracks.items' - """ - api = cls.__get_api(client_id, client_secret) + cls.__check_initialized() try: - album_json = api.album(ids) - except SpotifyException as error: - if error.http_status in cls.__error_codes: + a_id = AlbumId.from_base62(ids) + a_proto = cls.__session.api().get_metadata_4_album(a_id) + if not a_proto: raise InvalidLink(ids) - else: - raise - - # Replace/ensure tracks contains all items via dedicated pagination endpoint - try: - full_tracks_obj = cls.__fetch_all_album_tracks(ids, api) - if isinstance(album_json, dict): - album_json['tracks'] = full_tracks_obj + return cls.__album_proto_to_web_dict(a_proto) + except InvalidLink: + raise except Exception: - # Fallback to lazy-paging over embedded 'tracks' if available - try: - tracks = album_json.get('tracks') if isinstance(album_json, dict) else None - if tracks: - cls.__lazy(tracks, api) - except Exception: - pass - - return album_json + raise InvalidLink(ids) @classmethod def get_playlist(cls, ids, client_id=None, client_secret=None): - """ - Get playlist information by ID. - - Args: - ids (str): Spotify playlist ID - client_id (str, optional): Optional custom Spotify client ID - client_secret (str, optional): Optional custom Spotify client secret - - Returns: - dict: Playlist information - """ - api = cls.__get_api(client_id, client_secret) + cls.__check_initialized() try: - playlist_json = api.playlist(ids) - except SpotifyException as error: - if error.http_status in cls.__error_codes: + # PlaylistId accepts base62-ish/id string directly + p_id = PlaylistId(ids) + p_proto = cls.__session.api().get_playlist(p_id) + if not p_proto: raise InvalidLink(ids) + # Minimal mapping sufficient for current playlist flow + name = None + try: + attrs = getattr(p_proto, 'attributes', None) + name = getattr(attrs, 'name', None) if attrs else None + except Exception: + name = None + owner_name = getattr(p_proto, 'owner_username', None) or 'Unknown Owner' + items = [] + try: + contents = getattr(p_proto, 'contents', None) + for it in getattr(contents, 'items', []) or []: + # Attempt to obtain a track gid/id from multiple potential locations + tref = getattr(it, 'track', None) + gid = getattr(tref, 'gid', None) if tref else None + base62 = cls.__base62_from_gid(gid, 'track') if gid else None - tracks = playlist_json['tracks'] - cls.__lazy(tracks, api) + # Some playlists can reference an "original_track" field + if not base62: + orig = getattr(it, 'original_track', None) + ogid = getattr(orig, 'gid', None) if orig else None + base62 = cls.__base62_from_gid(ogid, 'track') if ogid else None - return playlist_json + # As an additional fallback, try to parse a spotify:track: URI if exposed on the nested track + if not base62: + uri = getattr(tref, 'uri', None) if tref else None + if isinstance(uri, str) and uri.startswith("spotify:track:"): + try: + parts = uri.split(":") + maybe_id = parts[-1] if parts else None + if maybe_id and len(maybe_id) == 22: + base62 = maybe_id + elif maybe_id and len(maybe_id) in (32, 40): + from librespot.metadata import TrackId + tid = TrackId.from_hex(maybe_id) + base62 = tid.to_spotify_uri().split(":")[-1] + except Exception: + base62 = None + + # Fallback: some implementations expose the URI at the item level + if not base62: + item_uri = getattr(it, 'uri', None) + if isinstance(item_uri, str) and item_uri.startswith("spotify:track:"): + try: + parts = item_uri.split(":") + maybe_id = parts[-1] if parts else None + if maybe_id and len(maybe_id) == 22: + base62 = maybe_id + elif maybe_id and len(maybe_id) in (32, 40): + from librespot.metadata import TrackId + tid = TrackId.from_hex(maybe_id) + base62 = tid.to_spotify_uri().split(":")[-1] + except Exception: + base62 = None + + # Fallback: check original_track uri if provided + if not base62: + orig = getattr(it, 'original_track', None) + o_uri = getattr(orig, 'uri', None) if orig else None + if isinstance(o_uri, str) and o_uri.startswith("spotify:track:"): + try: + parts = o_uri.split(":") + maybe_id = parts[-1] if parts else None + if maybe_id and len(maybe_id) == 22: + base62 = maybe_id + elif maybe_id and len(maybe_id) in (32, 40): + from librespot.metadata import TrackId + tid = TrackId.from_hex(maybe_id) + base62 = tid.to_spotify_uri().split(":")[-1] + except Exception: + base62 = None + + if base62: + items.append({'track': {'id': base62}}) + except Exception: + items = [] + return { + 'name': name or 'Unknown Playlist', + 'owner': {'display_name': owner_name}, + 'images': [], + 'tracks': {'items': items, 'total': len(items)} + } + except InvalidLink: + raise + except Exception: + raise InvalidLink(ids) @classmethod def get_episode(cls, ids, client_id=None, client_secret=None): - """ - Get episode information by ID. - - Args: - ids (str): Spotify episode ID - client_id (str, optional): Optional custom Spotify client ID - client_secret (str, optional): Optional custom Spotify client secret - - Returns: - dict: Episode information - """ - api = cls.__get_api(client_id, client_secret) + cls.__check_initialized() try: - episode_json = api.episode(ids) - except SpotifyException as error: - if error.http_status in cls.__error_codes: + e_id = EpisodeId.from_base62(ids) + e_proto = cls.__session.api().get_metadata_4_episode(e_id) + if not e_proto: raise InvalidLink(ids) - - return episode_json + # Map show info + show_proto = getattr(e_proto, 'show', None) + show_id = None + show_name = '' + publisher = '' + try: + sgid = getattr(show_proto, 'gid', None) if show_proto else None + show_id = cls.__base62_from_gid(sgid, 'show') if sgid else None + show_name = getattr(show_proto, 'name', '') if show_proto else '' + publisher = getattr(show_proto, 'publisher', '') if show_proto else '' + except Exception: + pass + # Images for episode (cover_image ImageGroup) + images: List[Dict[str, Any]] = [] + try: + images = cls.__images_from_group(getattr(e_proto, 'cover_image', None)) + except Exception: + images = [] + return { + 'id': cls.__base62_from_gid(getattr(e_proto, 'gid', None), 'episode'), + 'name': getattr(e_proto, 'name', ''), + 'duration_ms': getattr(e_proto, 'duration', 0), + 'explicit': getattr(e_proto, 'explicit', False), + 'images': images, + 'available_markets': None, + 'show': { + 'id': show_id, + 'name': show_name, + 'publisher': publisher + } + } + except InvalidLink: + raise + except Exception: + raise InvalidLink(ids) @classmethod - def search(cls, query, search_type='track', limit=10, client_id=None, client_secret=None): - """ - Search for tracks, albums, artists, or playlists. - - Args: - query (str): Search query - search_type (str, optional): Type of search ('track', 'album', 'artist', 'playlist') - limit (int, optional): Maximum number of results to return - client_id (str, optional): Optional custom Spotify client ID - client_secret (str, optional): Optional custom Spotify client secret - - Returns: - dict: Search results - """ - api = cls.__get_api(client_id, client_secret) - search = api.search(q=query, type=search_type, limit=limit) - return search + def get_artist(cls, ids, album_type='album,single,compilation,appears_on', limit: int = 50, client_id=None, client_secret=None): + """Return a dict with artist name and an 'items' list of albums matching album_type. + Each item contains an external_urls.spotify link, minimally enough for download_artist.""" + cls.__check_initialized() + try: + ar_id = ArtistId.from_base62(ids) + ar_proto = cls.__session.api().get_metadata_4_artist(ar_id) + if not ar_proto: + raise InvalidLink(ids) + # Parse requested groups + requested = [s.strip().lower() for s in str(album_type).split(',') if s.strip()] + order = ['album', 'single', 'compilation', 'appears_on'] + items: List[Dict[str, Any]] = [] + for group_name in order: + if requested and group_name not in requested: + continue + attr = f"{group_name}_group" + grp = getattr(ar_proto, attr, None) + if not grp: + continue + # grp is repeated AlbumGroup; each has 'album' repeated Album + try: + for ag in grp: + albums = getattr(ag, 'album', []) or [] + for a in albums: + gid = getattr(a, 'gid', None) + base62 = cls.__base62_from_gid(gid, 'album') if gid else None + name = getattr(a, 'name', '') + if base62: + items.append({ + 'name': name, + 'external_urls': {'spotify': f"https://open.spotify.com/album/{base62}"} + }) + if limit and len(items) >= int(limit): + break + if limit and len(items) >= int(limit): + break + except Exception: + continue + if limit and len(items) >= int(limit): + break + return { + 'id': cls.__base62_from_gid(getattr(ar_proto, 'gid', None), 'artist'), + 'name': getattr(ar_proto, 'name', ''), + 'items': items + } + except InvalidLink: + raise + except Exception: + raise InvalidLink(ids) + + # ------------------------- search (optional) ------------------------- + @classmethod + def __get_session_country_code(cls) -> str: + try: + if cls.__session is None: + return "" + cc = getattr(cls.__session, "_Session__country_code", None) + if isinstance(cc, str) and len(cc) == 2: + return cc + cc2 = getattr(cls.__session, "country_code", None) + if isinstance(cc2, str) and len(cc2) == 2: + return cc2 + except Exception: + pass + return "" @classmethod - def get_artist(cls, ids, client_id=None, client_secret=None): - """ - Get artist information by ID. - - Args: - ids (str): Spotify artist ID - client_id (str, optional): Optional custom Spotify client ID - client_secret (str, optional): Optional custom Spotify client secret - - Returns: - dict: Artist information - """ - api = cls.__get_api(client_id, client_secret) - try: - artist_json = api.artist(ids) - except SpotifyException as error: - if error.http_status in cls.__error_codes: - raise InvalidLink(ids) - - return artist_json - - @classmethod - def get_artist_discography(cls, ids, album_type='album,single,compilation,appears_on', limit=50, offset=0, client_id=None, client_secret=None): - """ - Get artist information and discography by ID. - - Args: - ids (str): Spotify artist ID - album_type (str, optional): Types of albums to include - limit (int, optional): Maximum number of results - client_id (str, optional): Optional custom Spotify client ID - client_secret (str, optional): Optional custom Spotify client secret - - Returns: - dict: Artist discography - """ - api = cls.__get_api(client_id, client_secret) - try: - # Request all types of releases by the artist. - discography = api.artist_albums( - ids, - album_type=album_type, - limit=limit, - offset=offset - ) - except SpotifyException as error: - if error.http_status in cls.__error_codes: - raise InvalidLink(ids) - else: - raise - - # Ensure that all pages of results are fetched. - cls.__lazy(discography, api) - return discography + def search(cls, query, search_type='track', limit=10, country: Optional[str] = None, locale: Optional[str] = None, catalogue: Optional[str] = None, image_size: Optional[str] = None, client_id=None, client_secret=None): + cls.__check_initialized() + # Map simple type value; librespot returns a combined JSON-like response + req = SearchManager.SearchRequest(query).set_limit(limit) + # Country precedence: explicit country > session country + if country: + req.set_country(country) + else: + cc = cls.__get_session_country_code() + if cc: + req.set_country(cc) + if locale: + req.set_locale(locale) + if catalogue: + req.set_catalogue(catalogue) + if image_size: + req.set_image_size(image_size) + res = cls.__session.search().request(req) + return res diff --git a/deezspot/spotloader/__init__.py b/deezspot/spotloader/__init__.py index a92bbf8..1c65a4d 100644 --- a/deezspot/spotloader/__init__.py +++ b/deezspot/spotloader/__init__.py @@ -7,648 +7,650 @@ from deezspot.exceptions import InvalidLink, MarketAvailabilityError from deezspot.spotloader.__spo_api__ import tracking, tracking_album, tracking_episode from deezspot.spotloader.spotify_settings import stock_quality, stock_market from deezspot.libutils.utils import ( - get_ids, - link_is_valid, - what_kind, + get_ids, + link_is_valid, + what_kind, ) from deezspot.models.download import ( - Track, - Album, - Playlist, - Preferences, - Smart, - Episode + Track, + Album, + Playlist, + Preferences, + Smart, + Episode ) from deezspot.models.callback import trackCallbackObject, errorObject from deezspot.spotloader.__download__ import ( - DW_TRACK, - DW_ALBUM, - DW_PLAYLIST, - DW_EPISODE, - Download_JOB, + DW_TRACK, + DW_ALBUM, + DW_PLAYLIST, + DW_EPISODE, + Download_JOB, ) from deezspot.libutils.others_settings import ( - stock_output, - stock_recursive_quality, - stock_recursive_download, - stock_not_interface, - stock_zip, - stock_save_cover, - stock_real_time_dl, - stock_market, - stock_real_time_multiplier + stock_output, + stock_recursive_quality, + stock_recursive_download, + stock_not_interface, + stock_zip, + stock_save_cover, + stock_real_time_dl, + stock_market, + stock_real_time_multiplier ) from deezspot.libutils.logging_utils import logger, ProgressReporter, report_progress class SpoLogin: - def __init__( - self, - credentials_path: str, - spotify_client_id: str = None, - spotify_client_secret: str = None, - progress_callback = None, - silent: bool = False - ) -> None: - self.credentials_path = credentials_path - self.spotify_client_id = spotify_client_id - self.spotify_client_secret = spotify_client_secret - - # Initialize Spotify API with credentials if provided - if spotify_client_id and spotify_client_secret: - Spo.__init__(client_id=spotify_client_id, client_secret=spotify_client_secret) - logger.info("Initialized Spotify API with provided credentials") - - # Configure progress reporting - self.progress_reporter = ProgressReporter(callback=progress_callback, silent=silent) - - self.__initialize_session() + def __init__( + self, + credentials_path: str, + spotify_client_id: str = None, + spotify_client_secret: str = None, + progress_callback = None, + silent: bool = False + ) -> None: + self.credentials_path = credentials_path + self.spotify_client_id = spotify_client_id + self.spotify_client_secret = spotify_client_secret + + # Initialize Spotify API with credentials if provided (kept no-op for compatibility) + if spotify_client_id and spotify_client_secret: + Spo.__init__(client_id=spotify_client_id, client_secret=spotify_client_secret) + logger.info("Initialized Spotify API compatibility shim (librespot-backed)") + + # Configure progress reporting + self.progress_reporter = ProgressReporter(callback=progress_callback, silent=silent) + + self.__initialize_session() - def __initialize_session(self) -> None: - try: - session_builder = Session.Builder() - session_builder.conf.stored_credentials_file = self.credentials_path + def __initialize_session(self) -> None: + try: + session_builder = Session.Builder() + session_builder.conf.stored_credentials_file = self.credentials_path - if isfile(self.credentials_path): - session = session_builder.stored_file().create() - logger.info("Successfully initialized Spotify session") - else: - logger.error("Credentials file not found") - raise FileNotFoundError("Please fill your credentials.json location!") + if isfile(self.credentials_path): + session = session_builder.stored_file().create() + logger.info("Successfully initialized Spotify session") + else: + logger.error("Credentials file not found") + raise FileNotFoundError("Please fill your credentials.json location!") - Download_JOB(session) - Download_JOB.set_progress_reporter(self.progress_reporter) - except Exception as e: - logger.error(f"Failed to initialize Spotify session: {str(e)}") - raise + Download_JOB(session) + Download_JOB.set_progress_reporter(self.progress_reporter) + # Wire the session into Spo shim for metadata/search + Spo.set_session(session) + except Exception as e: + logger.error(f"Failed to initialize Spotify session: {str(e)}") + raise - def download_track( - self, link_track, - output_dir=stock_output, - quality_download=stock_quality, - recursive_quality=stock_recursive_quality, - recursive_download=stock_recursive_download, - not_interface=stock_not_interface, - real_time_dl=stock_real_time_dl, - real_time_multiplier: int = stock_real_time_multiplier, - custom_dir_format=None, - custom_track_format=None, - pad_tracks=True, - initial_retry_delay=30, - retry_delay_increase=30, - max_retries=5, - convert_to=None, - bitrate=None, - save_cover=stock_save_cover, - market: list[str] | None = stock_market, - artist_separator: str = "; ", - pad_number_width: int | str = 'auto' - ) -> Track: - song_metadata = None - try: - link_is_valid(link_track) - ids = get_ids(link_track) - song_metadata = tracking(ids, market=market) - - if song_metadata is None: - raise Exception(f"Could not retrieve metadata for track {link_track}. It might not be available or an API error occurred.") + def download_track( + self, link_track, + output_dir=stock_output, + quality_download=stock_quality, + recursive_quality=stock_recursive_quality, + recursive_download=stock_recursive_download, + not_interface=stock_not_interface, + real_time_dl=stock_real_time_dl, + real_time_multiplier: int = stock_real_time_multiplier, + custom_dir_format=None, + custom_track_format=None, + pad_tracks=True, + initial_retry_delay=30, + retry_delay_increase=30, + max_retries=5, + convert_to=None, + bitrate=None, + save_cover=stock_save_cover, + market: list[str] | None = stock_market, + artist_separator: str = "; ", + pad_number_width: int | str = 'auto' + ) -> Track: + song_metadata = None + try: + link_is_valid(link_track) + ids = get_ids(link_track) + song_metadata = tracking(ids, market=market) + + if song_metadata is None: + raise Exception(f"Could not retrieve metadata for track {link_track}. It might not be available or an API error occurred.") - logger.info(f"Starting download for track: {song_metadata.title} - {artist_separator.join([a.name for a in song_metadata.artists])}") + logger.info(f"Starting download for track: {song_metadata.title} - {artist_separator.join([a.name for a in song_metadata.artists])}") - preferences = Preferences() - preferences.real_time_dl = real_time_dl - preferences.real_time_multiplier = int(real_time_multiplier) if real_time_multiplier is not None else 1 - preferences.link = link_track - preferences.song_metadata = song_metadata - preferences.quality_download = quality_download - preferences.output_dir = output_dir - preferences.ids = ids - preferences.recursive_quality = recursive_quality - preferences.recursive_download = recursive_download - preferences.not_interface = not_interface - preferences.is_episode = False - preferences.custom_dir_format = custom_dir_format - preferences.custom_track_format = custom_track_format - preferences.pad_tracks = pad_tracks - preferences.initial_retry_delay = initial_retry_delay - preferences.retry_delay_increase = retry_delay_increase - preferences.max_retries = max_retries - if convert_to is None: - preferences.convert_to = None - preferences.bitrate = None - else: - preferences.convert_to = convert_to - preferences.bitrate = bitrate - preferences.save_cover = save_cover - preferences.market = market - preferences.artist_separator = artist_separator - preferences.pad_number_width = pad_number_width + preferences = Preferences() + preferences.real_time_dl = real_time_dl + preferences.real_time_multiplier = int(real_time_multiplier) if real_time_multiplier is not None else 1 + preferences.link = link_track + preferences.song_metadata = song_metadata + preferences.quality_download = quality_download + preferences.output_dir = output_dir + preferences.ids = ids + preferences.recursive_quality = recursive_quality + preferences.recursive_download = recursive_download + preferences.not_interface = not_interface + preferences.is_episode = False + preferences.custom_dir_format = custom_dir_format + preferences.custom_track_format = custom_track_format + preferences.pad_tracks = pad_tracks + preferences.initial_retry_delay = initial_retry_delay + preferences.retry_delay_increase = retry_delay_increase + preferences.max_retries = max_retries + if convert_to is None: + preferences.convert_to = None + preferences.bitrate = None + else: + preferences.convert_to = convert_to + preferences.bitrate = bitrate + preferences.save_cover = save_cover + preferences.market = market + preferences.artist_separator = artist_separator + preferences.pad_number_width = pad_number_width - track = DW_TRACK(preferences).dw() + track = DW_TRACK(preferences).dw() - return track - except MarketAvailabilityError as e: - logger.error(f"Track download failed due to market availability: {str(e)}") - if song_metadata: - status_obj = errorObject(ids=song_metadata.ids, error=str(e)) - callback_obj = trackCallbackObject(track=song_metadata, status_info=status_obj) - report_progress( - reporter=self.progress_reporter, - callback_obj=callback_obj - ) - raise - except Exception as e: - logger.error(f"Failed to download track: {str(e)}") - traceback.print_exc() - if song_metadata: - status_obj = errorObject(ids=song_metadata.ids, error=str(e)) - callback_obj = trackCallbackObject(track=song_metadata, status_info=status_obj) - report_progress( - reporter=self.progress_reporter, - callback_obj=callback_obj - ) - raise e + return track + except MarketAvailabilityError as e: + logger.error(f"Track download failed due to market availability: {str(e)}") + if song_metadata: + status_obj = errorObject(ids=song_metadata.ids, error=str(e)) + callback_obj = trackCallbackObject(track=song_metadata, status_info=status_obj) + report_progress( + reporter=self.progress_reporter, + callback_obj=callback_obj + ) + raise + except Exception as e: + logger.error(f"Failed to download track: {str(e)}") + traceback.print_exc() + if song_metadata: + status_obj = errorObject(ids=song_metadata.ids, error=str(e)) + callback_obj = trackCallbackObject(track=song_metadata, status_info=status_obj) + report_progress( + reporter=self.progress_reporter, + callback_obj=callback_obj + ) + raise e - def download_album( - self, link_album, - output_dir=stock_output, - quality_download=stock_quality, - recursive_quality=stock_recursive_quality, - recursive_download=stock_recursive_download, - not_interface=stock_not_interface, - make_zip=stock_zip, - real_time_dl=stock_real_time_dl, - real_time_multiplier: int = stock_real_time_multiplier, - custom_dir_format=None, - custom_track_format=None, - pad_tracks=True, - initial_retry_delay=30, - retry_delay_increase=30, - max_retries=5, - convert_to=None, - bitrate=None, - save_cover=stock_save_cover, - market: list[str] | None = stock_market, - artist_separator: str = "; ", - pad_number_width: int | str = 'auto' - ) -> Album: - try: - link_is_valid(link_album) - ids = get_ids(link_album) - album_json = Spo.get_album(ids) - if not album_json: - raise Exception(f"Could not retrieve album data for {link_album}.") - - song_metadata = tracking_album(album_json, market=market) - if song_metadata is None: - raise Exception(f"Could not process album metadata for {link_album}. It might not be available in the specified market(s) or an API error occurred.") + def download_album( + self, link_album, + output_dir=stock_output, + quality_download=stock_quality, + recursive_quality=stock_recursive_quality, + recursive_download=stock_recursive_download, + not_interface=stock_not_interface, + make_zip=stock_zip, + real_time_dl=stock_real_time_dl, + real_time_multiplier: int = stock_real_time_multiplier, + custom_dir_format=None, + custom_track_format=None, + pad_tracks=True, + initial_retry_delay=30, + retry_delay_increase=30, + max_retries=5, + convert_to=None, + bitrate=None, + save_cover=stock_save_cover, + market: list[str] | None = stock_market, + artist_separator: str = "; ", + pad_number_width: int | str = 'auto' + ) -> Album: + try: + link_is_valid(link_album) + ids = get_ids(link_album) + album_json = Spo.get_album(ids) + if not album_json: + raise Exception(f"Could not retrieve album data for {link_album}.") + + song_metadata = tracking_album(album_json, market=market) + if song_metadata is None: + raise Exception(f"Could not process album metadata for {link_album}. It might not be available in the specified market(s) or an API error occurred.") - logger.info(f"Starting download for album: {song_metadata.title} - {artist_separator.join([a.name for a in song_metadata.artists])}") + logger.info(f"Starting download for album: {song_metadata.title} - {artist_separator.join([a.name for a in song_metadata.artists])}") - preferences = Preferences() - preferences.real_time_dl = real_time_dl - preferences.real_time_multiplier = int(real_time_multiplier) if real_time_multiplier is not None else 1 - preferences.link = link_album - preferences.song_metadata = song_metadata - preferences.quality_download = quality_download - preferences.output_dir = output_dir - preferences.ids = ids - preferences.json_data = album_json - preferences.recursive_quality = recursive_quality - preferences.recursive_download = recursive_download - preferences.not_interface = not_interface - preferences.make_zip = make_zip - preferences.is_episode = False - preferences.custom_dir_format = custom_dir_format - preferences.custom_track_format = custom_track_format - preferences.pad_tracks = pad_tracks - preferences.initial_retry_delay = initial_retry_delay - preferences.retry_delay_increase = retry_delay_increase - preferences.max_retries = max_retries - if convert_to is None: - preferences.convert_to = None - preferences.bitrate = None - else: - preferences.convert_to = convert_to - preferences.bitrate = bitrate - preferences.save_cover = save_cover - preferences.market = market - preferences.artist_separator = artist_separator - preferences.pad_number_width = pad_number_width + preferences = Preferences() + preferences.real_time_dl = real_time_dl + preferences.real_time_multiplier = int(real_time_multiplier) if real_time_multiplier is not None else 1 + preferences.link = link_album + preferences.song_metadata = song_metadata + preferences.quality_download = quality_download + preferences.output_dir = output_dir + preferences.ids = ids + preferences.json_data = album_json + preferences.recursive_quality = recursive_quality + preferences.recursive_download = recursive_download + preferences.not_interface = not_interface + preferences.make_zip = make_zip + preferences.is_episode = False + preferences.custom_dir_format = custom_dir_format + preferences.custom_track_format = custom_track_format + preferences.pad_tracks = pad_tracks + preferences.initial_retry_delay = initial_retry_delay + preferences.retry_delay_increase = retry_delay_increase + preferences.max_retries = max_retries + if convert_to is None: + preferences.convert_to = None + preferences.bitrate = None + else: + preferences.convert_to = convert_to + preferences.bitrate = bitrate + preferences.save_cover = save_cover + preferences.market = market + preferences.artist_separator = artist_separator + preferences.pad_number_width = pad_number_width - album = DW_ALBUM(preferences).dw() + album = DW_ALBUM(preferences).dw() - return album - except MarketAvailabilityError as e: - logger.error(f"Album download failed due to market availability: {str(e)}") - raise - except Exception as e: - logger.error(f"Failed to download album: {str(e)}") - traceback.print_exc() - raise e + return album + except MarketAvailabilityError as e: + logger.error(f"Album download failed due to market availability: {str(e)}") + raise + except Exception as e: + logger.error(f"Failed to download album: {str(e)}") + traceback.print_exc() + raise e - def download_playlist( - self, link_playlist, - output_dir=stock_output, - quality_download=stock_quality, - recursive_quality=stock_recursive_quality, - recursive_download=stock_recursive_download, - not_interface=stock_not_interface, - make_zip=stock_zip, - real_time_dl=stock_real_time_dl, - real_time_multiplier: int = stock_real_time_multiplier, - custom_dir_format=None, - custom_track_format=None, - pad_tracks=True, - initial_retry_delay=30, - retry_delay_increase=30, - max_retries=5, - convert_to=None, - bitrate=None, - save_cover=stock_save_cover, - market: list[str] | None = stock_market, - artist_separator: str = "; ", - pad_number_width: int | str = 'auto' - ) -> Playlist: - try: - link_is_valid(link_playlist) - ids = get_ids(link_playlist) + def download_playlist( + self, link_playlist, + output_dir=stock_output, + quality_download=stock_quality, + recursive_quality=stock_recursive_quality, + recursive_download=stock_recursive_download, + not_interface=stock_not_interface, + make_zip=stock_zip, + real_time_dl=stock_real_time_dl, + real_time_multiplier: int = stock_real_time_multiplier, + custom_dir_format=None, + custom_track_format=None, + pad_tracks=True, + initial_retry_delay=30, + retry_delay_increase=30, + max_retries=5, + convert_to=None, + bitrate=None, + save_cover=stock_save_cover, + market: list[str] | None = stock_market, + artist_separator: str = "; ", + pad_number_width: int | str = 'auto' + ) -> Playlist: + try: + link_is_valid(link_playlist) + ids = get_ids(link_playlist) - song_metadata = [] - playlist_json = Spo.get_playlist(ids) - if not playlist_json: - raise Exception(f"Could not retrieve playlist data for {link_playlist}.") - - logger.info(f"Starting download for playlist: {playlist_json.get('name', 'Unknown')}") + song_metadata = [] + playlist_json = Spo.get_playlist(ids) + if not playlist_json: + raise Exception(f"Could not retrieve playlist data for {link_playlist}.") + + logger.info(f"Starting download for playlist: {playlist_json.get('name', 'Unknown')}") - playlist_tracks_data = playlist_json.get('tracks', {}).get('items', []) - if not playlist_tracks_data: - logger.warning(f"Playlist {link_playlist} has no tracks or could not be fetched.") - # We can still proceed to create an empty playlist object for consistency - - song_metadata_list = [] - for item in playlist_tracks_data: - if not item or 'track' not in item or not item['track']: - # Log a warning for items that are not valid tracks (e.g., local files, etc.) - logger.warning(f"Skipping an item in playlist {link_playlist} as it does not appear to be a valid track object.") - song_metadata_list.append({'error_type': 'invalid_track_object', 'error_message': 'Playlist item was not a valid track object.', 'name': 'Unknown Skipped Item', 'ids': None}) - continue - - track_data = item['track'] - track_id = track_data.get('id') - - if not track_id: - logger.warning(f"Skipping an item in playlist {link_playlist} because it has no track ID.") - song_metadata_list.append({'error_type': 'missing_track_id', 'error_message': 'Playlist item is missing a track ID.', 'name': track_data.get('name', 'Unknown Track without ID'), 'ids': None}) - continue + playlist_tracks_data = playlist_json.get('tracks', {}).get('items', []) + if not playlist_tracks_data: + logger.warning(f"Playlist {link_playlist} has no tracks or could not be fetched.") + # We can still proceed to create an empty playlist object for consistency + + song_metadata_list = [] + for item in playlist_tracks_data: + if not item or 'track' not in item or not item['track']: + # Log a warning for items that are not valid tracks (e.g., local files, etc.) + logger.warning(f"Skipping an item in playlist {link_playlist} as it does not appear to be a valid track object.") + song_metadata_list.append({'error_type': 'invalid_track_object', 'error_message': 'Playlist item was not a valid track object.', 'name': 'Unknown Skipped Item', 'ids': None}) + continue + + track_data = item['track'] + track_id = track_data.get('id') + + if not track_id: + logger.warning(f"Skipping an item in playlist {link_playlist} because it has no track ID.") + song_metadata_list.append({'error_type': 'missing_track_id', 'error_message': 'Playlist item is missing a track ID.', 'name': track_data.get('name', 'Unknown Track without ID'), 'ids': None}) + continue - try: - song_metadata = tracking(track_id, market=market) - if song_metadata: - song_metadata_list.append(song_metadata) - else: - # Create a placeholder for tracks that fail metadata fetching - failed_track_info = {'error_type': 'metadata_fetch_failed', 'error_message': f"Failed to fetch metadata for track ID: {track_id}", 'name': track_data.get('name', f'Track ID {track_id}'), 'ids': track_id} - song_metadata_list.append(failed_track_info) - logger.warning(f"Could not retrieve metadata for track {track_id} in playlist {link_playlist}.") - except MarketAvailabilityError as e: - failed_track_info = {'error_type': 'market_availability_error', 'error_message': str(e), 'name': track_data.get('name', f'Track ID {track_id}'), 'ids': track_id} - song_metadata_list.append(failed_track_info) - logger.warning(str(e)) + try: + song_metadata = tracking(track_id, market=market) + if song_metadata: + song_metadata_list.append(song_metadata) + else: + # Create a placeholder for tracks that fail metadata fetching + failed_track_info = {'error_type': 'metadata_fetch_failed', 'error_message': f"Failed to fetch metadata for track ID: {track_id}", 'name': track_data.get('name', f'Track ID {track_id}'), 'ids': track_id} + song_metadata_list.append(failed_track_info) + logger.warning(f"Could not retrieve metadata for track {track_id} in playlist {link_playlist}.") + except MarketAvailabilityError as e: + failed_track_info = {'error_type': 'market_availability_error', 'error_message': str(e), 'name': track_data.get('name', f'Track ID {track_id}'), 'ids': track_id} + song_metadata_list.append(failed_track_info) + logger.warning(str(e)) - preferences = Preferences() - preferences.real_time_dl = real_time_dl - preferences.real_time_multiplier = int(real_time_multiplier) if real_time_multiplier is not None else 1 - preferences.link = link_playlist - preferences.song_metadata = song_metadata_list - preferences.quality_download = quality_download - preferences.output_dir = output_dir - preferences.ids = ids - preferences.json_data = playlist_json - preferences.playlist_tracks_json = playlist_tracks_data - preferences.recursive_quality = recursive_quality - preferences.recursive_download = recursive_download - preferences.not_interface = not_interface - preferences.make_zip = make_zip - preferences.is_episode = False - preferences.custom_dir_format = custom_dir_format - preferences.custom_track_format = custom_track_format - preferences.pad_tracks = pad_tracks - preferences.initial_retry_delay = initial_retry_delay - preferences.retry_delay_increase = retry_delay_increase - preferences.max_retries = max_retries - if convert_to is None: - preferences.convert_to = None - preferences.bitrate = None - else: - preferences.convert_to = convert_to - preferences.bitrate = bitrate - preferences.save_cover = save_cover - preferences.market = market - preferences.artist_separator = artist_separator - preferences.pad_number_width = pad_number_width - - playlist = DW_PLAYLIST(preferences).dw() + preferences = Preferences() + preferences.real_time_dl = real_time_dl + preferences.real_time_multiplier = int(real_time_multiplier) if real_time_multiplier is not None else 1 + preferences.link = link_playlist + preferences.song_metadata = song_metadata_list + preferences.quality_download = quality_download + preferences.output_dir = output_dir + preferences.ids = ids + preferences.json_data = playlist_json + preferences.playlist_tracks_json = playlist_tracks_data + preferences.recursive_quality = recursive_quality + preferences.recursive_download = recursive_download + preferences.not_interface = not_interface + preferences.make_zip = make_zip + preferences.is_episode = False + preferences.custom_dir_format = custom_dir_format + preferences.custom_track_format = custom_track_format + preferences.pad_tracks = pad_tracks + preferences.initial_retry_delay = initial_retry_delay + preferences.retry_delay_increase = retry_delay_increase + preferences.max_retries = max_retries + if convert_to is None: + preferences.convert_to = None + preferences.bitrate = None + else: + preferences.convert_to = convert_to + preferences.bitrate = bitrate + preferences.save_cover = save_cover + preferences.market = market + preferences.artist_separator = artist_separator + preferences.pad_number_width = pad_number_width + + playlist = DW_PLAYLIST(preferences).dw() - return playlist - except MarketAvailabilityError as e: - logger.error(f"Playlist download failed due to market availability issues with one or more tracks: {str(e)}") - raise - except Exception as e: - logger.error(f"Failed to download playlist: {str(e)}") - traceback.print_exc() - raise e + return playlist + except MarketAvailabilityError as e: + logger.error(f"Playlist download failed due to market availability issues with one or more tracks: {str(e)}") + raise + except Exception as e: + logger.error(f"Failed to download playlist: {str(e)}") + traceback.print_exc() + raise e - def download_episode( - self, link_episode, - output_dir=stock_output, - quality_download=stock_quality, - recursive_quality=stock_recursive_quality, - recursive_download=stock_recursive_download, - not_interface=stock_not_interface, - real_time_dl=stock_real_time_dl, - real_time_multiplier: int = stock_real_time_multiplier, - custom_dir_format=None, - custom_track_format=None, - pad_tracks=True, - initial_retry_delay=30, - retry_delay_increase=30, - max_retries=5, - convert_to=None, - bitrate=None, - save_cover=stock_save_cover, - market: list[str] | None = stock_market, - artist_separator: str = "; " - ) -> Episode: - try: - link_is_valid(link_episode) - ids = get_ids(link_episode) - episode_json = Spo.get_episode(ids) - if not episode_json: - raise Exception(f"Could not retrieve episode data for {link_episode} from API.") + def download_episode( + self, link_episode, + output_dir=stock_output, + quality_download=stock_quality, + recursive_quality=stock_recursive_quality, + recursive_download=stock_recursive_download, + not_interface=stock_not_interface, + real_time_dl=stock_real_time_dl, + real_time_multiplier: int = stock_real_time_multiplier, + custom_dir_format=None, + custom_track_format=None, + pad_tracks=True, + initial_retry_delay=30, + retry_delay_increase=30, + max_retries=5, + convert_to=None, + bitrate=None, + save_cover=stock_save_cover, + market: list[str] | None = stock_market, + artist_separator: str = "; " + ) -> Episode: + try: + link_is_valid(link_episode) + ids = get_ids(link_episode) + episode_json = Spo.get_episode(ids) + if not episode_json: + raise Exception(f"Could not retrieve episode data for {link_episode} from API.") - episode_metadata = tracking_episode(ids, market=market) - if episode_metadata is None: - raise Exception(f"Could not process episode metadata for {link_episode}. It might not be available in the specified market(s) or an API error occurred.") - - logger.info(f"Starting download for episode: {episode_metadata.title} - {episode_metadata.album.title}") + episode_metadata = tracking_episode(ids, market=market) + if episode_metadata is None: + raise Exception(f"Could not process episode metadata for {link_episode}. It might not be available in the specified market(s) or an API error occurred.") + + logger.info(f"Starting download for episode: {episode_metadata.title} - {episode_metadata.album.title}") - preferences = Preferences() - preferences.real_time_dl = real_time_dl - preferences.real_time_multiplier = int(real_time_multiplier) if real_time_multiplier is not None else 1 - preferences.link = link_episode - preferences.song_metadata = episode_metadata - preferences.output_dir = output_dir - preferences.ids = ids - preferences.json_data = episode_json - preferences.recursive_quality = recursive_quality - preferences.recursive_download = recursive_download - preferences.not_interface = not_interface - preferences.is_episode = True - preferences.custom_dir_format = custom_dir_format - preferences.custom_track_format = custom_track_format - preferences.pad_tracks = pad_tracks - preferences.initial_retry_delay = initial_retry_delay - preferences.retry_delay_increase = retry_delay_increase - preferences.max_retries = max_retries - if convert_to is None: - preferences.convert_to = None - preferences.bitrate = None - else: - preferences.convert_to = convert_to - preferences.bitrate = bitrate - preferences.save_cover = save_cover - preferences.market = market - preferences.artist_separator = artist_separator + preferences = Preferences() + preferences.real_time_dl = real_time_dl + preferences.real_time_multiplier = int(real_time_multiplier) if real_time_multiplier is not None else 1 + preferences.link = link_episode + preferences.song_metadata = episode_metadata + preferences.output_dir = output_dir + preferences.ids = ids + preferences.json_data = episode_json + preferences.recursive_quality = recursive_quality + preferences.recursive_download = recursive_download + preferences.not_interface = not_interface + preferences.is_episode = True + preferences.custom_dir_format = custom_dir_format + preferences.custom_track_format = custom_track_format + preferences.pad_tracks = pad_tracks + preferences.initial_retry_delay = initial_retry_delay + preferences.retry_delay_increase = retry_delay_increase + preferences.max_retries = max_retries + if convert_to is None: + preferences.convert_to = None + preferences.bitrate = None + else: + preferences.convert_to = convert_to + preferences.bitrate = bitrate + preferences.save_cover = save_cover + preferences.market = market + preferences.artist_separator = artist_separator - episode = DW_EPISODE(preferences).dw() + episode = DW_EPISODE(preferences).dw() - return episode - except MarketAvailabilityError as e: - logger.error(f"Episode download failed due to market availability: {str(e)}") - raise - except Exception as e: - logger.error(f"Failed to download episode: {str(e)}") - traceback.print_exc() - raise e + return episode + except MarketAvailabilityError as e: + logger.error(f"Episode download failed due to market availability: {str(e)}") + raise + except Exception as e: + logger.error(f"Failed to download episode: {str(e)}") + traceback.print_exc() + raise e - def download_artist( - self, link_artist, - album_type: str = 'album,single,compilation,appears_on', - limit: int = 50, - output_dir=stock_output, - quality_download=stock_quality, - recursive_quality=stock_recursive_quality, - recursive_download=stock_recursive_download, - not_interface=stock_not_interface, - make_zip=stock_zip, - real_time_dl=stock_real_time_dl, - real_time_multiplier: int = stock_real_time_multiplier, - custom_dir_format=None, - custom_track_format=None, - pad_tracks=True, - initial_retry_delay=30, - retry_delay_increase=30, - max_retries=5, - convert_to=None, - bitrate=None, - market: list[str] | None = stock_market, - save_cover=stock_save_cover, - artist_separator: str = "; " - ): - """ - Download all albums (or a subset based on album_type and limit) from an artist. - """ - try: - link_is_valid(link_artist) - ids = get_ids(link_artist) - discography = Spo.get_artist(ids, album_type=album_type, limit=limit) - albums = discography.get('items', []) - if not albums: - logger.warning("No albums found for the provided artist") - raise Exception("No albums found for the provided artist.") - - logger.info(f"Starting download for artist discography: {discography.get('name', 'Unknown')}") - - downloaded_albums = [] - for album in albums: - album_url = album.get('external_urls', {}).get('spotify') - if not album_url: - logger.warning(f"No URL found for album: {album.get('name', 'Unknown')}") - continue - downloaded_album = self.download_album( - album_url, - output_dir=output_dir, - quality_download=quality_download, - recursive_quality=recursive_quality, - recursive_download=recursive_download, - not_interface=not_interface, - make_zip=make_zip, - real_time_dl=real_time_dl, - real_time_multiplier=real_time_multiplier, - custom_dir_format=custom_dir_format, - custom_track_format=custom_track_format, - pad_tracks=pad_tracks, - initial_retry_delay=initial_retry_delay, - retry_delay_increase=retry_delay_increase, - max_retries=max_retries, - convert_to=convert_to, - bitrate=bitrate, - market=market, - save_cover=save_cover, - artist_separator=artist_separator - ) - downloaded_albums.append(downloaded_album) - return downloaded_albums - except Exception as e: - logger.error(f"Failed to download artist discography: {str(e)}") - traceback.print_exc() - raise e + def download_artist( + self, link_artist, + album_type: str = 'album,single,compilation,appears_on', + limit: int = 50, + output_dir=stock_output, + quality_download=stock_quality, + recursive_quality=stock_recursive_quality, + recursive_download=stock_recursive_download, + not_interface=stock_not_interface, + make_zip=stock_zip, + real_time_dl=stock_real_time_dl, + real_time_multiplier: int = stock_real_time_multiplier, + custom_dir_format=None, + custom_track_format=None, + pad_tracks=True, + initial_retry_delay=30, + retry_delay_increase=30, + max_retries=5, + convert_to=None, + bitrate=None, + market: list[str] | None = stock_market, + save_cover=stock_save_cover, + artist_separator: str = "; " + ): + """ + Download all albums (or a subset based on album_type and limit) from an artist. + """ + try: + link_is_valid(link_artist) + ids = get_ids(link_artist) + discography = Spo.get_artist(ids, album_type=album_type, limit=limit) + albums = discography.get('items', []) + if not albums: + logger.warning("No albums found for the provided artist") + raise Exception("No albums found for the provided artist.") + + logger.info(f"Starting download for artist discography: {discography.get('name', 'Unknown')}") + + downloaded_albums = [] + for album in albums: + album_url = album.get('external_urls', {}).get('spotify') + if not album_url: + logger.warning(f"No URL found for album: {album.get('name', 'Unknown')}") + continue + downloaded_album = self.download_album( + album_url, + output_dir=output_dir, + quality_download=quality_download, + recursive_quality=recursive_quality, + recursive_download=recursive_download, + not_interface=not_interface, + make_zip=make_zip, + real_time_dl=real_time_dl, + real_time_multiplier=real_time_multiplier, + custom_dir_format=custom_dir_format, + custom_track_format=custom_track_format, + pad_tracks=pad_tracks, + initial_retry_delay=initial_retry_delay, + retry_delay_increase=retry_delay_increase, + max_retries=max_retries, + convert_to=convert_to, + bitrate=bitrate, + market=market, + save_cover=save_cover, + artist_separator=artist_separator + ) + downloaded_albums.append(downloaded_album) + return downloaded_albums + except Exception as e: + logger.error(f"Failed to download artist discography: {str(e)}") + traceback.print_exc() + raise e - def download_smart( - self, link, - output_dir=stock_output, - quality_download=stock_quality, - recursive_quality=stock_recursive_quality, - recursive_download=stock_recursive_download, - not_interface=stock_not_interface, - make_zip=stock_zip, - real_time_dl=stock_real_time_dl, - real_time_multiplier: int = stock_real_time_multiplier, - custom_dir_format=None, - custom_track_format=None, - pad_tracks=True, - initial_retry_delay=30, - retry_delay_increase=30, - max_retries=5, - convert_to=None, - bitrate=None, - save_cover=stock_save_cover, - market: list[str] | None = stock_market, - artist_separator: str = "; " - ) -> Smart: - try: - link_is_valid(link) - link = what_kind(link) - smart = Smart() + def download_smart( + self, link, + output_dir=stock_output, + quality_download=stock_quality, + recursive_quality=stock_recursive_quality, + recursive_download=stock_recursive_download, + not_interface=stock_not_interface, + make_zip=stock_zip, + real_time_dl=stock_real_time_dl, + real_time_multiplier: int = stock_real_time_multiplier, + custom_dir_format=None, + custom_track_format=None, + pad_tracks=True, + initial_retry_delay=30, + retry_delay_increase=30, + max_retries=5, + convert_to=None, + bitrate=None, + save_cover=stock_save_cover, + market: list[str] | None = stock_market, + artist_separator: str = "; " + ) -> Smart: + try: + link_is_valid(link) + link = what_kind(link) + smart = Smart() - if "spotify.com" in link: - source = "https://spotify.com" - smart.source = source - - logger.info(f"Starting smart download for: {link}") + if "spotify.com" in link: + source = "https://spotify.com" + smart.source = source + + logger.info(f"Starting smart download for: {link}") - if "track/" in link: - if not "spotify.com" in link: - raise InvalidLink(link) - track = self.download_track( - link, - output_dir=output_dir, - quality_download=quality_download, - recursive_quality=recursive_quality, - recursive_download=recursive_download, - not_interface=not_interface, - real_time_dl=real_time_dl, - real_time_multiplier=real_time_multiplier, - custom_dir_format=custom_dir_format, - custom_track_format=custom_track_format, - pad_tracks=pad_tracks, - initial_retry_delay=initial_retry_delay, - retry_delay_increase=retry_delay_increase, - max_retries=max_retries, - convert_to=convert_to, - bitrate=bitrate, - save_cover=save_cover, - market=market, - artist_separator=artist_separator - ) - smart.type = "track" - smart.track = track + if "track/" in link: + if not "spotify.com" in link: + raise InvalidLink(link) + track = self.download_track( + link, + output_dir=output_dir, + quality_download=quality_download, + recursive_quality=recursive_quality, + recursive_download=recursive_download, + not_interface=not_interface, + real_time_dl=real_time_dl, + real_time_multiplier=real_time_multiplier, + custom_dir_format=custom_dir_format, + custom_track_format=custom_track_format, + pad_tracks=pad_tracks, + initial_retry_delay=initial_retry_delay, + retry_delay_increase=retry_delay_increase, + max_retries=max_retries, + convert_to=convert_to, + bitrate=bitrate, + save_cover=save_cover, + market=market, + artist_separator=artist_separator + ) + smart.type = "track" + smart.track = track - elif "album/" in link: - if not "spotify.com" in link: - raise InvalidLink(link) - album = self.download_album( - link, - output_dir=output_dir, - quality_download=quality_download, - recursive_quality=recursive_quality, - recursive_download=recursive_download, - not_interface=not_interface, - make_zip=make_zip, - real_time_dl=real_time_dl, - real_time_multiplier=real_time_multiplier, - custom_dir_format=custom_dir_format, - custom_track_format=custom_track_format, - pad_tracks=pad_tracks, - initial_retry_delay=initial_retry_delay, - retry_delay_increase=retry_delay_increase, - max_retries=max_retries, - convert_to=convert_to, - bitrate=bitrate, - save_cover=save_cover, - market=market, - artist_separator=artist_separator - ) - smart.type = "album" - smart.album = album + elif "album/" in link: + if not "spotify.com" in link: + raise InvalidLink(link) + album = self.download_album( + link, + output_dir=output_dir, + quality_download=quality_download, + recursive_quality=recursive_quality, + recursive_download=recursive_download, + not_interface=not_interface, + make_zip=make_zip, + real_time_dl=real_time_dl, + real_time_multiplier=real_time_multiplier, + custom_dir_format=custom_dir_format, + custom_track_format=custom_track_format, + pad_tracks=pad_tracks, + initial_retry_delay=initial_retry_delay, + retry_delay_increase=retry_delay_increase, + max_retries=max_retries, + convert_to=convert_to, + bitrate=bitrate, + save_cover=save_cover, + market=market, + artist_separator=artist_separator + ) + smart.type = "album" + smart.album = album - elif "playlist/" in link: - if not "spotify.com" in link: - raise InvalidLink(link) - playlist = self.download_playlist( - link, - output_dir=output_dir, - quality_download=quality_download, - recursive_quality=recursive_quality, - recursive_download=recursive_download, - not_interface=not_interface, - make_zip=make_zip, - real_time_dl=real_time_dl, - real_time_multiplier=real_time_multiplier, - custom_dir_format=custom_dir_format, - custom_track_format=custom_track_format, - pad_tracks=pad_tracks, - initial_retry_delay=initial_retry_delay, - retry_delay_increase=retry_delay_increase, - max_retries=max_retries, - convert_to=convert_to, - bitrate=bitrate, - save_cover=save_cover, - market=market, - artist_separator=artist_separator - ) - smart.type = "playlist" - smart.playlist = playlist + elif "playlist/" in link: + if not "spotify.com" in link: + raise InvalidLink(link) + playlist = self.download_playlist( + link, + output_dir=output_dir, + quality_download=quality_download, + recursive_quality=recursive_quality, + recursive_download=recursive_download, + not_interface=not_interface, + make_zip=make_zip, + real_time_dl=real_time_dl, + real_time_multiplier=real_time_multiplier, + custom_dir_format=custom_dir_format, + custom_track_format=custom_track_format, + pad_tracks=pad_tracks, + initial_retry_delay=initial_retry_delay, + retry_delay_increase=retry_delay_increase, + max_retries=max_retries, + convert_to=convert_to, + bitrate=bitrate, + save_cover=save_cover, + market=market, + artist_separator=artist_separator + ) + smart.type = "playlist" + smart.playlist = playlist - elif "episode/" in link: - if not "spotify.com" in link: - raise InvalidLink(link) - episode = self.download_episode( - link, - output_dir=output_dir, - quality_download=quality_download, - recursive_quality=recursive_quality, - recursive_download=recursive_download, - not_interface=not_interface, - real_time_dl=real_time_dl, - real_time_multiplier=real_time_multiplier, - custom_dir_format=custom_dir_format, - custom_track_format=custom_track_format, - pad_tracks=pad_tracks, - initial_retry_delay=initial_retry_delay, - retry_delay_increase=retry_delay_increase, - max_retries=max_retries, - convert_to=convert_to, - bitrate=bitrate, - save_cover=save_cover, - market=market, - artist_separator=artist_separator - ) - smart.type = "episode" - smart.episode = episode + elif "episode/" in link: + if not "spotify.com" in link: + raise InvalidLink(link) + episode = self.download_episode( + link, + output_dir=output_dir, + quality_download=quality_download, + recursive_quality=recursive_quality, + recursive_download=recursive_download, + not_interface=not_interface, + real_time_dl=real_time_dl, + real_time_multiplier=real_time_multiplier, + custom_dir_format=custom_dir_format, + custom_track_format=custom_track_format, + pad_tracks=pad_tracks, + initial_retry_delay=initial_retry_delay, + retry_delay_increase=retry_delay_increase, + max_retries=max_retries, + convert_to=convert_to, + bitrate=bitrate, + save_cover=save_cover, + market=market, + artist_separator=artist_separator + ) + smart.type = "episode" + smart.episode = episode - return smart - except Exception as e: - logger.error(f"Failed to perform smart download: {str(e)}") - traceback.print_exc() - raise e + return smart + except Exception as e: + logger.error(f"Failed to perform smart download: {str(e)}") + traceback.print_exc() + raise e