From c38f10957c72ebf459dbfcc98437923325d1cbf6 Mon Sep 17 00:00:00 2001 From: Xoconoch Date: Tue, 26 Aug 2025 22:02:38 -0600 Subject: [PATCH] feat: implement public librespot api class, see docs --- deezspot/easy_spoty.py | 624 ++++++++++++-------------- deezspot/libutils/__init__.py | 1 + deezspot/libutils/librespot.py | 611 +++++++++++++++++++++++++ deezspot/models/__init__.py | 1 + deezspot/models/librespot/__init__.py | 23 + deezspot/models/librespot/album.py | 92 ++++ deezspot/models/librespot/artist.py | 63 +++ deezspot/models/librespot/playlist.py | 207 +++++++++ deezspot/models/librespot/track.py | 82 ++++ deezspot/models/librespot/types.py | 153 +++++++ docs/librespot_client.md | 241 ++++++++++ 11 files changed, 1761 insertions(+), 337 deletions(-) create mode 100644 deezspot/libutils/librespot.py create mode 100644 deezspot/models/librespot/__init__.py create mode 100644 deezspot/models/librespot/album.py create mode 100644 deezspot/models/librespot/artist.py create mode 100644 deezspot/models/librespot/playlist.py create mode 100644 deezspot/models/librespot/track.py create mode 100644 deezspot/models/librespot/types.py create mode 100644 docs/librespot_client.md diff --git a/deezspot/easy_spoty.py b/deezspot/easy_spoty.py index c4be1bc..512c68a 100644 --- a/deezspot/easy_spoty.py +++ b/deezspot/easy_spoty.py @@ -9,17 +9,27 @@ from typing import Any, Dict, List, Optional # thin shim over librespot's internal API, returning Web-API-shaped dicts # consumed by spotloader's converters. +from deezspot.libutils import LibrespotClient + class Spo: __error_codes = [404, 400] # Class-level references __session: Optional[Session] = None + __client: Optional[LibrespotClient] = None __initialized = False @classmethod def set_session(cls, session: Session): - """Attach an active librespot Session for metadata/search operations.""" + """Attach an active librespot Session for metadata/search operations. + Also initializes the LibrespotClient wrapper used for metadata fetches. + """ cls.__session = session + try: + cls.__client = LibrespotClient(session=session) + except Exception: + # Fallback: allow partial functionality (episode/search) via raw session + cls.__client = None cls.__initialized = True @classmethod @@ -29,8 +39,8 @@ class Spo: @classmethod def __check_initialized(cls): - if not cls.__initialized or cls.__session is None: - raise ValueError("Spotify session not initialized. Ensure SpoLogin created a librespot Session and called Spo.set_session(session).") + if not cls.__initialized or (cls.__session is None and cls.__client is None): + raise ValueError("Spotify session/client not initialized. Ensure SpoLogin created a librespot Session and called Spo.set_session(session).") # ------------------------- helpers ------------------------- @staticmethod @@ -60,234 +70,91 @@ class Spo: return None @staticmethod - def __external_ids_to_dict(external_ids) -> Dict[str, str]: - # Map repeated ExternalId { type, id } to a simple dict - result: Dict[str, str] = {} - try: - for ext in external_ids or []: - t = getattr(ext, 'type', None) - v = getattr(ext, 'id', None) - if t and v: - result[t.lower()] = v - except Exception: - pass - return result - - @staticmethod - def __images_from_group(img_group) -> List[Dict[str, Any]]: - images: List[Dict[str, Any]] = [] - try: - for im in getattr(img_group, 'image', []) or []: - fid = getattr(im, 'file_id', None) - if fid: - hex_id = fid.hex() - images.append({ - 'url': f"https://i.scdn.co/image/{hex_id}", - 'width': getattr(im, 'width', 0), - 'height': getattr(im, 'height', 0) - }) - except Exception: - pass - return images - - @staticmethod - def __images_from_repeated(imgs) -> List[Dict[str, Any]]: - images: List[Dict[str, Any]] = [] - try: - for im in imgs or []: - fid = getattr(im, 'file_id', None) - if fid: - hex_id = fid.hex() - images.append({ - 'url': f"https://i.scdn.co/image/{hex_id}", - 'width': getattr(im, 'width', 0), - 'height': getattr(im, 'height', 0) - }) - except Exception: - pass - return images - - @classmethod - def __artist_proto_to_dict(cls, a_proto) -> Dict[str, Any]: - gid = getattr(a_proto, 'gid', None) - return { - 'id': cls.__base62_from_gid(gid, 'artist'), - 'name': getattr(a_proto, 'name', '') - } - - @classmethod - def __track_proto_to_web_dict(cls, t_proto, parent_album: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: - if t_proto is None: - return {} - gid = getattr(t_proto, 'gid', None) - artists = [cls.__artist_proto_to_dict(a) for a in getattr(t_proto, 'artist', [])] - external_ids_map = cls.__external_ids_to_dict(getattr(t_proto, 'external_id', [])) - # Album for a track inside Album.disc.track is often simplified in proto - album_dict = parent_album or {} - return { - 'id': cls.__base62_from_gid(gid, 'track'), - 'name': getattr(t_proto, 'name', ''), - 'duration_ms': getattr(t_proto, 'duration', 0), - 'explicit': getattr(t_proto, 'explicit', False), - 'track_number': getattr(t_proto, 'number', 1), - 'disc_number': getattr(t_proto, 'disc_number', 1), - 'artists': artists, - 'external_ids': external_ids_map, - 'available_markets': None, # Not derived; market check code handles None by warning and continuing - 'album': album_dict - } - - @classmethod - def __album_proto_to_web_dict(cls, a_proto) -> Dict[str, Any]: - if a_proto is None: - return {} - gid = getattr(a_proto, 'gid', None) - # Album basic fields - title = getattr(a_proto, 'name', '') - album_type = None - try: - # Map enum to typical Web API strings when possible - t_val = getattr(a_proto, 'type', None) - if t_val is not None: - # Common mapping heuristic - # 1 = ALBUM, 2 = SINGLE, 3 = COMPILATION (values may differ by proto) - mapping = {1: 'album', 2: 'single', 3: 'compilation'} - album_type = mapping.get(int(t_val), None) - except Exception: - album_type = None - - # Date - release_date_str = '' - release_date_precision = 'day' - try: - date = getattr(a_proto, 'date', None) - year = getattr(date, 'year', 0) if date else 0 - month = getattr(date, 'month', 0) if date else 0 - day = getattr(date, 'day', 0) if date else 0 - if year and month and day: - release_date_str = f"{year:04d}-{month:02d}-{day:02d}" - release_date_precision = 'day' - elif year and month: - release_date_str = f"{year:04d}-{month:02d}" - release_date_precision = 'month' - elif year: - release_date_str = f"{year:04d}" - release_date_precision = 'year' - except Exception: - pass - - # Artists - artists = [cls.__artist_proto_to_dict(a) for a in getattr(a_proto, 'artist', [])] - - # Genres - genres = list(getattr(a_proto, 'genre', []) or []) - - # External IDs (e.g., upc) - external_ids_map = cls.__external_ids_to_dict(getattr(a_proto, 'external_id', [])) - - # Images - images: List[Dict[str, Any]] = [] - try: - cg = getattr(a_proto, 'cover_group', None) - if cg: - images = cls.__images_from_group(cg) - if not images: - images = cls.__images_from_repeated(getattr(a_proto, 'cover', []) or []) - except Exception: - images = [] - - # Tracks from discs - items: List[Dict[str, Any]] = [] - total_tracks = 0 - try: - for disc in getattr(a_proto, 'disc', []) or []: - disc_number = getattr(disc, 'number', 1) - for t in getattr(disc, 'track', []) or []: - total_tracks += 1 - # Album context passed minimally for track mapper - parent_album_min = { - 'id': cls.__base62_from_gid(gid, 'album'), - 'name': title, - 'album_type': album_type, - 'release_date': release_date_str, - 'release_date_precision': release_date_precision, - 'total_tracks': None, - 'images': images, - 'genres': genres, - 'artists': artists, - 'external_ids': external_ids_map, - 'available_markets': None - } - # Ensure numbering aligns with album context - setattr(t, 'disc_number', disc_number) - item = cls.__track_proto_to_web_dict(t, parent_album=parent_album_min) - # Override with correct numbering if proto uses different fields - item['disc_number'] = disc_number - if 'track_number' not in item or not item['track_number']: - item['track_number'] = getattr(t, 'number', 1) - items.append(item) - except Exception: - items = [] - - album_dict: Dict[str, Any] = { - 'id': cls.__base62_from_gid(gid, 'album'), - 'name': title, - 'album_type': album_type, - 'release_date': release_date_str, - 'release_date_precision': release_date_precision, - 'total_tracks': total_tracks or getattr(a_proto, 'num_tracks', 0), - 'genres': genres, - 'images': images, # Web API-like images with i.scdn.co URLs - 'copyrights': [], - 'available_markets': None, - 'external_ids': external_ids_map, - 'artists': artists, - 'tracks': { - 'items': items, - 'total': len(items), - 'limit': len(items), - 'offset': 0, - 'next': None, - 'previous': None - } - } - return album_dict + def __images_from_album_obj(album_obj: Dict[str, Any]) -> List[Dict[str, Any]]: + imgs = album_obj.get('images') + return imgs if isinstance(imgs, list) else [] # ------------------------- public API ------------------------- @classmethod def get_track(cls, ids, client_id=None, client_secret=None): cls.__check_initialized() try: - t_id = TrackId.from_base62(ids) - t_proto = cls.__session.api().get_metadata_4_track(t_id) - if not t_proto: - raise InvalidLink(ids) - # Build minimal album context from nested album proto if present - album_proto = getattr(t_proto, 'album', None) - album_ctx = None - try: - if album_proto is not None: - agid = getattr(album_proto, 'gid', None) - # Images for embedded album - images: List[Dict[str, Any]] = [] - try: - cg = getattr(album_proto, 'cover_group', None) - if cg: - images = cls.__images_from_group(cg) - if not images: - images = cls.__images_from_repeated(getattr(album_proto, 'cover', []) or []) - except Exception: - images = [] - album_ctx = { - 'id': cls.__base62_from_gid(agid, 'album'), - 'name': getattr(album_proto, 'name', ''), - 'images': images, - 'genres': [], - 'available_markets': None - } - except Exception: + if cls.__client is None: + # Fallback to previous proto logic if client is unavailable + t_id = TrackId.from_base62(ids) + t_proto = cls.__session.api().get_metadata_4_track(t_id) # type: ignore[union-attr] + if not t_proto: + raise InvalidLink(ids) + # Minimal album context from nested album proto if present + album_proto = getattr(t_proto, 'album', None) album_ctx = None - return cls.__track_proto_to_web_dict(t_proto, parent_album=album_ctx) + try: + if album_proto is not None: + agid = getattr(album_proto, 'gid', None) + images: List[Dict[str, Any]] = [] + try: + cg = getattr(album_proto, 'cover_group', None) + if cg: + # Map image group + for im in getattr(cg, 'image', []) or []: + fid = getattr(im, 'file_id', None) + if fid: + images.append({ + 'url': f"https://i.scdn.co/image/{fid.hex()}", + 'width': getattr(im, 'width', 0), + 'height': getattr(im, 'height', 0) + }) + if not images: + for im in getattr(album_proto, 'cover', []) or []: + fid = getattr(im, 'file_id', None) + if fid: + images.append({ + 'url': f"https://i.scdn.co/image/{fid.hex()}", + 'width': getattr(im, 'width', 0), + 'height': getattr(im, 'height', 0) + }) + except Exception: + images = [] + album_ctx = { + 'id': cls.__base62_from_gid(agid, 'album'), + 'name': getattr(album_proto, 'name', ''), + 'images': images, + 'genres': [], + 'available_markets': None + } + except Exception: + album_ctx = None + # Build track dict + artists = [] + try: + for a in getattr(t_proto, 'artist', []) or []: + artists.append({'id': cls.__base62_from_gid(getattr(a, 'gid', None), 'artist'), 'name': getattr(a, 'name', '')}) + except Exception: + pass + external_ids_map: Dict[str, str] = {} + try: + for ext in getattr(t_proto, 'external_id', []) or []: + t = getattr(ext, 'type', None) + v = getattr(ext, 'id', None) + if t and v: + external_ids_map[str(t).lower()] = v + except Exception: + pass + return { + 'id': cls.__base62_from_gid(getattr(t_proto, 'gid', None), 'track'), + 'name': getattr(t_proto, 'name', ''), + 'duration_ms': getattr(t_proto, 'duration', 0), + 'explicit': getattr(t_proto, 'explicit', False), + 'track_number': getattr(t_proto, 'number', 1), + 'disc_number': getattr(t_proto, 'disc_number', 1), + 'artists': artists, + 'external_ids': external_ids_map, + 'available_markets': None, + 'album': album_ctx + } + # Preferred: LibrespotClient + obj = cls.__client.get_track(ids) + return obj except InvalidLink: raise except Exception: @@ -303,7 +170,6 @@ class Spo: try: tracks.append(cls.get_track(tid)) except Exception: - # Preserve order with None entries similar to Web API behavior on bad IDs tracks.append(None) return {'tracks': tracks} @@ -311,11 +177,92 @@ class Spo: def get_album(cls, ids, client_id=None, client_secret=None): cls.__check_initialized() try: - a_id = AlbumId.from_base62(ids) - a_proto = cls.__session.api().get_metadata_4_album(a_id) - if not a_proto: - raise InvalidLink(ids) - return cls.__album_proto_to_web_dict(a_proto) + if cls.__client is None: + # Fallback to previous behavior using proto mapping + a_id = AlbumId.from_base62(ids) + a_proto = cls.__session.api().get_metadata_4_album(a_id) # type: ignore[union-attr] + if not a_proto: + raise InvalidLink(ids) + # Reuse existing private mapper for proto shape + # NOTE: import annotations above provided earlier methods; to avoid duplication, call through get_track for items + # Basic fields + title = getattr(a_proto, 'name', '') + # Images + images: List[Dict[str, Any]] = [] + try: + cg = getattr(a_proto, 'cover_group', None) + if cg: + for im in getattr(cg, 'image', []) or []: + fid = getattr(im, 'file_id', None) + if fid: + images.append({'url': f"https://i.scdn.co/image/{fid.hex()}", 'width': getattr(im, 'width', 0), 'height': getattr(im, 'height', 0)}) + if not images: + for im in getattr(a_proto, 'cover', []) or []: + fid = getattr(im, 'file_id', None) + if fid: + images.append({'url': f"https://i.scdn.co/image/{fid.hex()}", 'width': getattr(im, 'width', 0), 'height': getattr(im, 'height', 0)}) + except Exception: + images = [] + # Build simplified tracks list by disc order + items: List[Dict[str, Any]] = [] + total_tracks = 0 + try: + for disc in getattr(a_proto, 'disc', []) or []: + disc_number = getattr(disc, 'number', 1) + for t in getattr(disc, 'track', []) or []: + total_tracks += 1 + setattr(t, 'disc_number', disc_number) + item = cls.get_track(cls.__base62_from_gid(getattr(t, 'gid', None), 'track') or "") + if isinstance(item, dict): + # Ensure numbering + item['disc_number'] = disc_number + if not item.get('track_number'): + item['track_number'] = getattr(t, 'number', 1) + items.append(item) + except Exception: + items = [] + return { + 'id': cls.__base62_from_gid(getattr(a_proto, 'gid', None), 'album'), + 'name': title, + 'images': images, + 'tracks': { + 'items': items, + 'total': len(items), + 'limit': len(items), + 'offset': 0, + 'next': None, + 'previous': None + } + } + # Preferred: LibrespotClient, then reshape to Spo-compatible album dict + album_obj = cls.__client.get_album(ids, include_tracks=True) + # album_obj['tracks'] is a list of full track objects; convert to Spo shape + items = [] + for tr in album_obj.get('tracks', []) or []: + if isinstance(tr, dict): + items.append(tr) + result = { + 'id': album_obj.get('id'), + 'name': album_obj.get('name'), + 'album_type': album_obj.get('album_type'), + 'release_date': album_obj.get('release_date'), + 'release_date_precision': album_obj.get('release_date_precision'), + 'total_tracks': album_obj.get('total_tracks') or len(items), + 'genres': album_obj.get('genres') or [], + 'images': cls.__images_from_album_obj(album_obj), + 'available_markets': album_obj.get('available_markets'), + 'external_ids': album_obj.get('external_ids') or {}, + 'artists': album_obj.get('artists') or [], + 'tracks': { + 'items': items, + 'total': len(items), + 'limit': len(items), + 'offset': 0, + 'next': None, + 'previous': None + } + } + return result except InvalidLink: raise except Exception: @@ -325,91 +272,54 @@ class Spo: def get_playlist(cls, ids, client_id=None, client_secret=None): cls.__check_initialized() try: - # PlaylistId accepts base62-ish/id string directly - p_id = PlaylistId(ids) - p_proto = cls.__session.api().get_playlist(p_id) - if not p_proto: - raise InvalidLink(ids) - # Minimal mapping sufficient for current playlist flow - name = None - try: - attrs = getattr(p_proto, 'attributes', None) - name = getattr(attrs, 'name', None) if attrs else None - except Exception: + if cls.__client is None: + # Fallback to previous behavior (proto mapping) + p_id = PlaylistId(ids) + p_proto = cls.__session.api().get_playlist(p_id) # type: ignore[union-attr] + if not p_proto: + raise InvalidLink(ids) name = None - owner_name = getattr(p_proto, 'owner_username', None) or 'Unknown Owner' + try: + attrs = getattr(p_proto, 'attributes', None) + name = getattr(attrs, 'name', None) if attrs else None + except Exception: + name = None + owner_name = getattr(p_proto, 'owner_username', None) or 'Unknown Owner' + items = [] + try: + contents = getattr(p_proto, 'contents', None) + for it in getattr(contents, 'items', []) or []: + tref = getattr(it, 'track', None) + gid = getattr(tref, 'gid', None) if tref else None + base62 = cls.__base62_from_gid(gid, 'track') if gid else None + if base62: + items.append({'track': {'id': base62}}) + except Exception: + items = [] + return { + 'name': name or 'Unknown Playlist', + 'owner': {'display_name': owner_name}, + 'images': [], + 'tracks': {'items': items, 'total': len(items)} + } + # Preferred: LibrespotClient, reshape minimally to prior output + pl_obj = cls.__client.get_playlist(ids, expand_items=False) items = [] try: - contents = getattr(p_proto, 'contents', None) - for it in getattr(contents, 'items', []) or []: - # Attempt to obtain a track gid/id from multiple potential locations - tref = getattr(it, 'track', None) - gid = getattr(tref, 'gid', None) if tref else None - base62 = cls.__base62_from_gid(gid, 'track') if gid else None - - # Some playlists can reference an "original_track" field - if not base62: - orig = getattr(it, 'original_track', None) - ogid = getattr(orig, 'gid', None) if orig else None - base62 = cls.__base62_from_gid(ogid, 'track') if ogid else None - - # As an additional fallback, try to parse a spotify:track: URI if exposed on the nested track - if not base62: - uri = getattr(tref, 'uri', None) if tref else None - if isinstance(uri, str) and uri.startswith("spotify:track:"): - try: - parts = uri.split(":") - maybe_id = parts[-1] if parts else None - if maybe_id and len(maybe_id) == 22: - base62 = maybe_id - elif maybe_id and len(maybe_id) in (32, 40): - from librespot.metadata import TrackId - tid = TrackId.from_hex(maybe_id) - base62 = tid.to_spotify_uri().split(":")[-1] - except Exception: - base62 = None - - # Fallback: some implementations expose the URI at the item level - if not base62: - item_uri = getattr(it, 'uri', None) - if isinstance(item_uri, str) and item_uri.startswith("spotify:track:"): - try: - parts = item_uri.split(":") - maybe_id = parts[-1] if parts else None - if maybe_id and len(maybe_id) == 22: - base62 = maybe_id - elif maybe_id and len(maybe_id) in (32, 40): - from librespot.metadata import TrackId - tid = TrackId.from_hex(maybe_id) - base62 = tid.to_spotify_uri().split(":")[-1] - except Exception: - base62 = None - - # Fallback: check original_track uri if provided - if not base62: - orig = getattr(it, 'original_track', None) - o_uri = getattr(orig, 'uri', None) if orig else None - if isinstance(o_uri, str) and o_uri.startswith("spotify:track:"): - try: - parts = o_uri.split(":") - maybe_id = parts[-1] if parts else None - if maybe_id and len(maybe_id) == 22: - base62 = maybe_id - elif maybe_id and len(maybe_id) in (32, 40): - from librespot.metadata import TrackId - tid = TrackId.from_hex(maybe_id) - base62 = tid.to_spotify_uri().split(":")[-1] - except Exception: - base62 = None - - if base62: - items.append({'track': {'id': base62}}) + # pl_obj['tracks']['items'] have 'track' possibly as stub dict already + trks = pl_obj.get('tracks', {}).get('items', []) + for it in trks: + tr = it.get('track') if isinstance(it, dict) else None + if isinstance(tr, dict): + tid = tr.get('id') + if tid: + items.append({'track': {'id': tid}}) except Exception: items = [] return { - 'name': name or 'Unknown Playlist', - 'owner': {'display_name': owner_name}, - 'images': [], + 'name': pl_obj.get('name') or 'Unknown Playlist', + 'owner': {'display_name': pl_obj.get('owner', {}).get('display_name') or 'Unknown Owner'}, + 'images': pl_obj.get('images') or [], 'tracks': {'items': items, 'total': len(items)} } except InvalidLink: @@ -421,11 +331,11 @@ class Spo: def get_episode(cls, ids, client_id=None, client_secret=None): cls.__check_initialized() try: + # Episodes not supported by LibrespotClient wrapper yet; use raw session e_id = EpisodeId.from_base62(ids) - e_proto = cls.__session.api().get_metadata_4_episode(e_id) + e_proto = cls.__session.api().get_metadata_4_episode(e_id) # type: ignore[union-attr] if not e_proto: raise InvalidLink(ids) - # Map show info show_proto = getattr(e_proto, 'show', None) show_id = None show_name = '' @@ -437,10 +347,18 @@ class Spo: publisher = getattr(show_proto, 'publisher', '') if show_proto else '' except Exception: pass - # Images for episode (cover_image ImageGroup) images: List[Dict[str, Any]] = [] try: - images = cls.__images_from_group(getattr(e_proto, 'cover_image', None)) + # cover_image is an ImageGroup + cg = getattr(e_proto, 'cover_image', None) + for im in getattr(cg, 'image', []) or []: + fid = getattr(im, 'file_id', None) + if fid: + images.append({ + 'url': f"https://i.scdn.co/image/{fid.hex()}", + 'width': getattr(im, 'width', 0), + 'height': getattr(im, 'height', 0) + }) except Exception: images = [] return { @@ -467,36 +385,68 @@ class Spo: Each item contains an external_urls.spotify link, minimally enough for download_artist.""" cls.__check_initialized() try: - ar_id = ArtistId.from_base62(ids) - ar_proto = cls.__session.api().get_metadata_4_artist(ar_id) - if not ar_proto: - raise InvalidLink(ids) - # Parse requested groups + if cls.__client is None: + ar_id = ArtistId.from_base62(ids) + ar_proto = cls.__session.api().get_metadata_4_artist(ar_id) # type: ignore[union-attr] + if not ar_proto: + raise InvalidLink(ids) + requested = [s.strip().lower() for s in str(album_type).split(',') if s.strip()] + order = ['album', 'single', 'compilation', 'appears_on'] + items: List[Dict[str, Any]] = [] + for group_name in order: + if requested and group_name not in requested: + continue + attr = f"{group_name}_group" + grp = getattr(ar_proto, attr, None) + if not grp: + continue + try: + for ag in grp: + albums = getattr(ag, 'album', []) or [] + for a in albums: + gid = getattr(a, 'gid', None) + base62 = cls.__base62_from_gid(gid, 'album') if gid else None + name = getattr(a, 'name', '') + if base62: + items.append({ + 'name': name, + 'external_urls': {'spotify': f"https://open.spotify.com/album/{base62}"} + }) + if limit and len(items) >= int(limit): + break + if limit and len(items) >= int(limit): + break + except Exception: + continue + if limit and len(items) >= int(limit): + break + return { + 'id': cls.__base62_from_gid(getattr(ar_proto, 'gid', None), 'artist'), + 'name': getattr(ar_proto, 'name', ''), + 'items': items + } + # Preferred: LibrespotClient then map to legacy items shape + artist_obj = cls.__client.get_artist(ids) requested = [s.strip().lower() for s in str(album_type).split(',') if s.strip()] order = ['album', 'single', 'compilation', 'appears_on'] items: List[Dict[str, Any]] = [] for group_name in order: if requested and group_name not in requested: continue - attr = f"{group_name}_group" - grp = getattr(ar_proto, attr, None) + key = f"{group_name}_group" + grp = artist_obj.get(key) if isinstance(artist_obj, dict) else None if not grp: continue - # grp is repeated AlbumGroup; each has 'album' repeated Album try: - for ag in grp: - albums = getattr(ag, 'album', []) or [] - for a in albums: - gid = getattr(a, 'gid', None) - base62 = cls.__base62_from_gid(gid, 'album') if gid else None - name = getattr(a, 'name', '') - if base62: - items.append({ - 'name': name, - 'external_urls': {'spotify': f"https://open.spotify.com/album/{base62}"} - }) - if limit and len(items) >= int(limit): - break + # LibrespotClient flattens to arrays of album ids for groups + # We only need external_urls.spotify links + for album_id in grp: + if not album_id: + continue + items.append({ + 'name': None, + 'external_urls': {'spotify': f"https://open.spotify.com/album/{album_id}"} + }) if limit and len(items) >= int(limit): break except Exception: @@ -504,8 +454,8 @@ class Spo: if limit and len(items) >= int(limit): break return { - 'id': cls.__base62_from_gid(getattr(ar_proto, 'gid', None), 'artist'), - 'name': getattr(ar_proto, 'name', ''), + 'id': artist_obj.get('id') if isinstance(artist_obj, dict) else None, + 'name': artist_obj.get('name') if isinstance(artist_obj, dict) else '', 'items': items } except InvalidLink: @@ -547,5 +497,5 @@ class Spo: req.set_catalogue(catalogue) if image_size: req.set_image_size(image_size) - res = cls.__session.search().request(req) + res = cls.__session.search().request(req) # type: ignore[union-attr] return res diff --git a/deezspot/libutils/__init__.py b/deezspot/libutils/__init__.py index e69de29..9b3231c 100644 --- a/deezspot/libutils/__init__.py +++ b/deezspot/libutils/__init__.py @@ -0,0 +1 @@ +from .librespot import LibrespotClient diff --git a/deezspot/libutils/librespot.py b/deezspot/libutils/librespot.py new file mode 100644 index 0000000..e5e4266 --- /dev/null +++ b/deezspot/libutils/librespot.py @@ -0,0 +1,611 @@ +from __future__ import annotations + +import base64 +import datetime +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Dict, List, Optional, Union + +from google.protobuf.descriptor import FieldDescriptor +from google.protobuf.message import Message + +from librespot.core import Session +from librespot.metadata import AlbumId, ArtistId, PlaylistId, TrackId +from librespot import util +from librespot.proto import Metadata_pb2 as Metadata +from librespot.proto import Playlist4External_pb2 as P4 + + +class LibrespotClient: + """ + Thin wrapper around the internal librespot API, exposing convenient helpers that + return Web API-like dictionaries for albums, tracks, artists, and playlists. + + Typical usage: + + client = LibrespotClient(stored_credentials_path="/path/to/credentials.json") + album = client.get_album("spotify:album:...", include_tracks=True) + track = client.get_track("...base62...") + playlist = client.get_playlist("spotify:playlist:...", expand_items=True) + client.close() + """ + + def __init__( + self, + stored_credentials_path: Optional[str] = None, + session: Optional[Session] = None, + max_workers: int = 16, + ) -> None: + self._session: Session = session if session is not None else self._create_session(stored_credentials_path) + self._max_workers: int = max(1, min(32, max_workers)) + self._track_object_cache: Dict[str, Optional[Dict[str, Any]]] = {} + + # ---------- Public API ---------- + + def close(self) -> None: + if hasattr(self, "_session") and self._session is not None: + try: + self._session.close() + except Exception: + pass + + def get_album(self, album: Union[str, AlbumId], include_tracks: bool = False) -> Dict[str, Any]: + album_id = self._ensure_album_id(album) + album_proto = self._session.api().get_metadata_4_album(album_id) + return self._album_proto_to_object(album_proto, include_tracks=include_tracks, for_embed=False) + + def get_track(self, track: Union[str, TrackId]) -> Dict[str, Any]: + track_id = self._ensure_track_id(track) + track_proto = self._session.api().get_metadata_4_track(track_id) + return self._track_proto_to_object(track_proto) + + def get_artist(self, artist: Union[str, ArtistId]) -> Dict[str, Any]: + artist_id = self._ensure_artist_id(artist) + artist_proto = self._session.api().get_metadata_4_artist(artist_id) + return self._proto_to_full_json(artist_proto) + + def get_playlist(self, playlist: Union[str, PlaylistId], expand_items: bool = False) -> Dict[str, Any]: + playlist_id = self._ensure_playlist_id(playlist) + playlist_proto = self._session.api().get_playlist(playlist_id) + return self._playlist_proto_to_object(playlist_proto, include_track_objects=expand_items) + + # ---------- ID parsing helpers ---------- + + @staticmethod + def parse_input_id(kind: str, value: str) -> Union[TrackId, AlbumId, ArtistId, PlaylistId]: + s = value.strip() + if kind == "track": + if s.startswith("spotify:track:"): + return TrackId.from_uri(s) + if "open.spotify.com/track/" in s: + base = s.split("open.spotify.com/track/")[-1].split("?")[0].split("#")[0].strip("/") + return TrackId.from_base62(base) + return TrackId.from_base62(s) + if kind == "album": + if s.startswith("spotify:album:"): + return AlbumId.from_uri(s) + if "open.spotify.com/album/" in s: + base = s.split("open.spotify.com/album/")[-1].split("?")[0].split("#")[0].strip("/") + return AlbumId.from_base62(base) + return AlbumId.from_base62(s) + if kind == "artist": + if s.startswith("spotify:artist:"): + return ArtistId.from_uri(s) + if "open.spotify.com/artist/" in s: + base = s.split("open.spotify.com/artist/")[-1].split("?")[0].split("#")[0].strip("/") + return ArtistId.from_base62(base) + return ArtistId.from_base62(s) + if kind == "playlist": + if s.startswith("spotify:playlist:"): + return PlaylistId.from_uri(s) + if "open.spotify.com/playlist/" in s: + base = s.split("open.spotify.com/playlist/")[-1].split("?")[0].split("#")[0].strip("/") + return PlaylistId(base) + return PlaylistId(s) + raise RuntimeError(f"Unknown kind: {kind}") + + # ---------- Private: session ---------- + + @staticmethod + def _create_session(stored_credentials_path: Optional[str]) -> Session: + if not stored_credentials_path: + raise ValueError("stored_credentials_path is required when no Session is provided") + conf = ( + Session.Configuration.Builder() + .set_stored_credential_file(stored_credentials_path) + .build() + ) + builder = Session.Builder(conf) + builder.stored_file(stored_credentials_path) + return builder.create() + + # ---------- Private: ID coercion ---------- + + def _ensure_track_id(self, v: Union[str, TrackId]) -> TrackId: + if isinstance(v, TrackId): + return v + return self.parse_input_id("track", v) # type: ignore[return-value] + + def _ensure_album_id(self, v: Union[str, AlbumId]) -> AlbumId: + if isinstance(v, AlbumId): + return v + return self.parse_input_id("album", v) # type: ignore[return-value] + + def _ensure_artist_id(self, v: Union[str, ArtistId]) -> ArtistId: + if isinstance(v, ArtistId): + return v + return self.parse_input_id("artist", v) # type: ignore[return-value] + + def _ensure_playlist_id(self, v: Union[str, PlaylistId]) -> PlaylistId: + if isinstance(v, PlaylistId): + return v + return self.parse_input_id("playlist", v) # type: ignore[return-value] + + # ---------- Private: conversions ---------- + + @staticmethod + def _bytes_to_base62(b: bytes) -> str: + try: + return TrackId.base62.encode(b).decode("ascii") + except Exception: + return "" + + def _proto_to_full_json(self, msg: Any) -> Any: + if isinstance(msg, Message): + msg_name = msg.DESCRIPTOR.name if hasattr(msg, "DESCRIPTOR") else "" + if msg_name == "Image": + url = self._image_url_from_file_id(getattr(msg, "file_id", b"")) + width = getattr(msg, "width", 0) or 0 + height = getattr(msg, "height", 0) or 0 + return self._prune_empty({"url": url, "width": width, "height": height}) + + if msg_name == "TopTracks": + country = getattr(msg, "country", "") or "" + ids: List[str] = [] + try: + for t in getattr(msg, "track", []): + ids.append(self._bytes_to_base62(getattr(t, "gid", b""))) + except Exception: + pass + return self._prune_empty({"country": country or None, "track": ids}) + + if hasattr(msg, "album"): + try: + albums = getattr(msg, "album", []) + ids: List[str] = [] + for a in albums: + ids.append(self._bytes_to_base62(getattr(a, "gid", b""))) + return {"album": ids} + except Exception: + pass + + out: Dict[str, Any] = {} + for field, value in msg.ListFields(): + name = field.name + if name in ("album_group", "single_group", "appears_on_group"): + try: + ids = [] + for ag in value: # repeated AlbumGroup + for a in getattr(ag, "album", []): + ids.append(self._bytes_to_base62(getattr(a, "gid", b""))) + out[name] = ids + continue + except Exception: + pass + name_out = "id" if name == "gid" else name + if field.type == FieldDescriptor.TYPE_BYTES: + if field.label == FieldDescriptor.LABEL_REPEATED: + out[name_out] = [self._bytes_to_base62(v) for v in value] + else: + out[name_out] = self._bytes_to_base62(value) + elif field.type == FieldDescriptor.TYPE_MESSAGE: + if field.label == FieldDescriptor.LABEL_REPEATED: + out[name_out] = [self._proto_to_full_json(v) for v in value] + else: + out[name_out] = self._proto_to_full_json(value) + else: + if field.label == FieldDescriptor.LABEL_REPEATED: + out[name_out] = list(value) + else: + out[name_out] = value + return out + if isinstance(msg, (bytes, bytearray)): + return self._bytes_to_base62(bytes(msg)) + if isinstance(msg, (list, tuple)): + return [self._proto_to_full_json(v) for v in msg] + return msg + + @staticmethod + def _prune_empty(obj: Any) -> Any: + if isinstance(obj, dict): + return {k: LibrespotClient._prune_empty(v) for k, v in obj.items() if v not in (None, "", [], {})} + if isinstance(obj, list): + return [LibrespotClient._prune_empty(v) for v in obj if v not in (None, "", [], {})] + return obj + + @staticmethod + def _image_url_from_file_id(file_id: bytes) -> Optional[str]: + if not file_id: + return None + return f"https://i.scdn.co/image/{util.bytes_to_hex(file_id)}" + + @staticmethod + def _split_countries(countries: str) -> List[str]: + if not countries: + return [] + s = countries.strip() + if " " in s: + return [c for c in s.split(" ") if c] + return [s[i : i + 2] for i in range(0, len(s), 2) if len(s[i : i + 2]) == 2] + + def _restrictions_to_available_markets(self, restrictions: List[Metadata.Restriction]) -> List[str]: + for r in restrictions: + allowed = getattr(r, "countries_allowed", "") + if isinstance(allowed, str) and allowed: + return self._split_countries(allowed) + return [] + + @staticmethod + def _external_ids_to_dict(ext_ids: List[Metadata.ExternalId]) -> Dict[str, str]: + out: Dict[str, str] = {} + for e in ext_ids: + t = getattr(e, "type", "").lower() + v = getattr(e, "id", "") + if t and v and t not in out: + out[t] = v + return out + + @staticmethod + def _album_type_to_str(a: Metadata.Album) -> str: + type_str = getattr(a, "type_str", "") + if isinstance(type_str, str) and type_str: + return type_str.lower() + t = getattr(a, "type", None) + if t is None: + return "album" + try: + mapping = { + Metadata.Album.ALBUM: "album", + Metadata.Album.SINGLE: "single", + Metadata.Album.COMPILATION: "compilation", + Metadata.Album.EP: "ep", + } + return mapping.get(t, "album") + except Exception: + return "album" + + @staticmethod + def _date_to_release_fields(d: Optional[Metadata.Date]) -> (str, str): + if d is None: + return "", "day" + y = getattr(d, "year", 0) + m = getattr(d, "month", 0) + day = getattr(d, "day", 0) + if y and m and day: + return f"{y:04d}-{m:02d}-{day:02d}", "day" + if y and m: + return f"{y:04d}-{m:02d}", "month" + if y: + return f"{y:04d}", "year" + return "", "day" + + def _images_from_group( + self, + group: Optional[Metadata.ImageGroup], + fallback_images: Optional[List[Metadata.Image]] = None, + ) -> List[Dict[str, Any]]: + images: List[Dict[str, Any]] = [] + seq = [] + if group is not None: + seq = getattr(group, "image", []) + elif fallback_images is not None: + seq = fallback_images + for im in seq: + url = self._image_url_from_file_id(getattr(im, "file_id", b"")) + if not url: + continue + width = getattr(im, "width", 0) or 0 + height = getattr(im, "height", 0) or 0 + images.append({"url": url, "width": width, "height": height}) + seen = set() + uniq: List[Dict[str, Any]] = [] + for it in images: + u = it.get("url") + if u in seen: + continue + seen.add(u) + uniq.append(it) + return uniq + + def _artist_ref_to_object(self, a: Metadata.Artist) -> Dict[str, Any]: + gid = getattr(a, "gid", b"") + hex_id = util.bytes_to_hex(gid) if gid else "" + uri = "" + base62 = "" + if hex_id: + try: + aid = ArtistId.from_hex(hex_id) + uri = aid.to_spotify_uri() + base62 = uri.split(":")[-1] + except Exception: + pass + return { + "external_urls": {"spotify": f"https://open.spotify.com/artist/{base62}" if base62 else ""}, + "id": base62, + "name": getattr(a, "name", "") or "", + "type": "artist", + "uri": uri or "", + } + + def _album_proto_to_object( + self, + a: Metadata.Album, + include_tracks: bool = False, + for_embed: bool = False, + ) -> Dict[str, Any]: + gid = getattr(a, "gid", b"") + hex_id = util.bytes_to_hex(gid) if gid else "" + uri = "" + base62 = "" + if hex_id: + try: + aid = AlbumId.from_hex(hex_id) + uri = aid.to_spotify_uri() + base62 = uri.split(":")[-1] + except Exception: + pass + + available = self._restrictions_to_available_markets(getattr(a, "restriction", [])) + release_date, release_precision = self._date_to_release_fields(getattr(a, "date", None)) + + artists = [self._artist_ref_to_object(ar) for ar in getattr(a, "artist", [])] + + track_ids: List[str] = [] + if not for_embed: + for d in getattr(a, "disc", []): + for t in getattr(d, "track", []): + tid_hex = util.bytes_to_hex(getattr(t, "gid", b"")) if getattr(t, "gid", b"") else "" + if not tid_hex: + continue + try: + tid = TrackId.from_hex(tid_hex) + t_uri = tid.to_spotify_uri() + t_base62 = t_uri.split(":")[-1] + if t_base62: + track_ids.append(t_base62) + except Exception: + pass + + track_list_value: Optional[List[Any]] = None + if not for_embed: + if include_tracks and self._session is not None and track_ids: + fetched = self._fetch_track_objects(track_ids) + expanded: List[Dict[str, Any]] = [] + for b62 in track_ids: + obj = fetched.get(b62) + if obj is not None: + expanded.append(obj) + else: + expanded.append({ + "id": b62, + "uri": f"spotify:track:{b62}", + "type": "track", + "external_urls": {"spotify": f"https://open.spotify.com/track/{b62}"}, + }) + track_list_value = expanded + else: + track_list_value = track_ids + + images = self._images_from_group(getattr(a, "cover_group", None), getattr(a, "cover", [])) + + result: Dict[str, Any] = { + "album_type": self._album_type_to_str(a) or None, + **({"total_tracks": sum(len(getattr(d, "track", [])) for d in getattr(a, "disc", []))} if not for_embed else {}), + "available_markets": available, + "external_urls": {"spotify": f"https://open.spotify.com/album/{base62}"} if base62 else {}, + "id": base62 or None, + "images": images or None, + "name": getattr(a, "name", "") or None, + "release_date": release_date or None, + "release_date_precision": release_precision or None, + "type": "album", + "uri": uri or None, + "artists": artists or None, + **({"tracks": track_list_value} if (not for_embed and track_list_value is not None) else {}), + "copyrights": [{ + "text": getattr(c, "text", ""), + "type": str(getattr(c, "type", "")), + } for c in getattr(a, "copyright", [])], + "external_ids": self._external_ids_to_dict(getattr(a, "external_id", [])) or None, + "label": getattr(a, "label", "") or None, + "popularity": getattr(a, "popularity", 0) or 0, + } + return self._prune_empty(result) + + def _track_proto_to_object(self, t: Metadata.Track) -> Dict[str, Any]: + tid_hex = util.bytes_to_hex(getattr(t, "gid", b"")) if getattr(t, "gid", b"") else "" + uri = "" + base62 = "" + if tid_hex: + try: + tid = TrackId.from_hex(tid_hex) + uri = tid.to_spotify_uri() + base62 = uri.split(":")[-1] + except Exception: + pass + + album_obj = self._album_proto_to_object(getattr(t, "album", None), include_tracks=False, for_embed=True) if getattr(t, "album", None) else None + + preview_url = None + previews = getattr(t, "preview", []) + if previews: + pf = previews[0] + pf_id = getattr(pf, "file_id", b"") + if pf_id: + try: + preview_url = f"https://p.scdn.co/mp3-preview/{util.bytes_to_hex(pf_id)}" + except Exception: + preview_url = None + + licensor_uuid = None + licensor = getattr(t, "licensor", None) + if licensor is not None: + licensor_uuid = util.bytes_to_hex(getattr(licensor, "uuid", b"")) if getattr(licensor, "uuid", b"") else None + + result = { + "album": album_obj, + "artists": [self._artist_ref_to_object(a) for a in getattr(t, "artist", [])], + "available_markets": self._restrictions_to_available_markets(getattr(t, "restriction", [])), + "disc_number": getattr(t, "disc_number", 0) or None, + "duration_ms": getattr(t, "duration", 0) or None, + "explicit": bool(getattr(t, "explicit", False)) or None, + "external_ids": self._external_ids_to_dict(getattr(t, "external_id", [])) or None, + "external_urls": {"spotify": f"https://open.spotify.com/track/{base62}"} if base62 else {}, + "id": base62 or None, + "name": getattr(t, "name", "") or None, + "popularity": getattr(t, "popularity", 0) or None, + "track_number": getattr(t, "number", 0) or None, + "type": "track", + "uri": uri or None, + "preview_url": preview_url, + "earliest_live_timestamp": getattr(t, "earliest_live_timestamp", 0) or None, + "has_lyrics": bool(getattr(t, "has_lyrics", False)) or None, + "licensor_uuid": licensor_uuid, + } + return self._prune_empty(result) + + def _playlist_proto_to_object(self, p: P4.SelectedListContent, include_track_objects: bool) -> Dict[str, Any]: + attrs = getattr(p, "attributes", None) + name = getattr(attrs, "name", "") if attrs else "" + description = getattr(attrs, "description", "") if attrs else "" + collaborative = bool(getattr(attrs, "collaborative", False)) if attrs else False + picture_bytes = getattr(attrs, "picture", b"") if attrs else b"" + + images: List[Dict[str, Any]] = [] + if isinstance(picture_bytes, (bytes, bytearray)) and len(picture_bytes) in (16, 20): + url = self._image_url_from_file_id(picture_bytes) + if url: + images.append({"url": url, "width": 0, "height": 0}) + + owner_username = getattr(p, "owner_username", "") or "" + + items: List[Dict[str, Any]] = [] + contents = getattr(p, "contents", None) + + fetched_tracks: Dict[str, Optional[Dict[str, Any]]] = {} + if include_track_objects and self._session is not None and contents is not None: + to_fetch: List[str] = [] + for it in getattr(contents, "items", []): + uri = getattr(it, "uri", "") or "" + if uri.startswith("spotify:track:"): + b62 = uri.split(":")[-1] + to_fetch.append(b62) + if to_fetch: + fetched_tracks = self._fetch_track_objects(to_fetch) + + if contents is not None: + for it in getattr(contents, "items", []): + uri = getattr(it, "uri", "") or "" + attrs_it = getattr(it, "attributes", None) + added_by = getattr(attrs_it, "added_by", "") if attrs_it else "" + ts_ms = getattr(attrs_it, "timestamp", 0) if attrs_it else 0 + item_id_bytes = getattr(attrs_it, "item_id", b"") if attrs_it else b"" + added_at_iso = None + if isinstance(ts_ms, int) and ts_ms > 0: + try: + added_at_iso = datetime.datetime.utcfromtimestamp(ts_ms / 1000.0).isoformat() + "Z" + except Exception: + added_at_iso = None + track_obj: Optional[Dict[str, Any]] = None + if include_track_objects and uri.startswith("spotify:track:"): + b62 = uri.split(":")[-1] + obj = fetched_tracks.get(b62) + if obj is not None: + track_obj = obj + else: + track_obj = { + "id": b62, + "uri": uri, + "type": "track", + "external_urls": {"spotify": f"https://open.spotify.com/track/{b62}"}, + } + else: + if uri.startswith("spotify:track:"): + b62 = uri.split(":")[-1] + track_obj = { + "id": b62, + "uri": uri, + "type": "track", + "external_urls": {"spotify": f"https://open.spotify.com/track/{b62}"}, + } + item_obj: Dict[str, Any] = { + "added_at": added_at_iso, + "added_by": { + "id": added_by, + "type": "user", + "uri": f"spotify:user:{added_by}" if added_by else "", + "external_urls": {"spotify": f"https://open.spotify.com/user/{added_by}"} if added_by else {}, + "display_name": added_by or None, + }, + "is_local": False, + "track": track_obj, + } + if isinstance(item_id_bytes, (bytes, bytearray)) and item_id_bytes: + item_obj["item_id"] = util.bytes_to_hex(item_id_bytes) + items.append(self._prune_empty(item_obj)) + + tracks_obj = self._prune_empty({ + "offset": 0, + "total": len(items), + "items": items, + }) + + rev_bytes = getattr(p, "revision", b"") if hasattr(p, "revision") else b"" + snapshot_b64 = base64.b64encode(rev_bytes).decode("ascii") if rev_bytes else None + + result = { + "name": name or None, + "description": description or None, + "collaborative": collaborative or None, + "images": images or None, + "owner": self._prune_empty({ + "id": owner_username, + "type": "user", + "uri": f"spotify:user:{owner_username}" if owner_username else "", + "external_urls": {"spotify": f"https://open.spotify.com/user/{owner_username}"} if owner_username else {}, + "display_name": owner_username or None, + }), + "snapshot_id": snapshot_b64, + "tracks": tracks_obj, + "type": "playlist", + } + return self._prune_empty(result) + + # ---------- Private: fetching ---------- + + def _fetch_single_track_object(self, base62_id: str) -> None: + try: + tid = TrackId.from_base62(base62_id) + t_proto = self._session.api().get_metadata_4_track(tid) + self._track_object_cache[base62_id] = self._track_proto_to_object(t_proto) + except Exception: + self._track_object_cache[base62_id] = None + + def _fetch_track_objects(self, base62_ids: List[str]) -> Dict[str, Optional[Dict[str, Any]]]: + seen = set() + unique: List[str] = [] + for b in base62_ids: + if not b: + continue + if b not in seen: + seen.add(b) + if b not in self._track_object_cache: + unique.append(b) + if unique: + max_workers = min(self._max_workers, max(1, len(unique))) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + for b in unique: + executor.submit(self._fetch_single_track_object, b) + return {b: self._track_object_cache.get(b) for b in base62_ids if b} + + +__all__ = ["LibrespotClient"] \ No newline at end of file diff --git a/deezspot/models/__init__.py b/deezspot/models/__init__.py index 26b1a6f..1990845 100644 --- a/deezspot/models/__init__.py +++ b/deezspot/models/__init__.py @@ -7,3 +7,4 @@ Deezspot models package. # Import subpackages to ensure they're included from deezspot.models import callback from deezspot.models import download +from deezspot.models import librespot diff --git a/deezspot/models/librespot/__init__.py b/deezspot/models/librespot/__init__.py new file mode 100644 index 0000000..0f93d6b --- /dev/null +++ b/deezspot/models/librespot/__init__.py @@ -0,0 +1,23 @@ +#!/usr/bin/python3 + +from .types import Image, ExternalUrls, ArtistRef, AlbumRef +from .track import Track +from .album import Album +from .playlist import Playlist, PlaylistItem, TrackStub, TracksPage, Owner, UserMini +from .artist import Artist + +__all__ = [ + "Image", + "ExternalUrls", + "ArtistRef", + "AlbumRef", + "Track", + "Album", + "Playlist", + "PlaylistItem", + "TrackStub", + "TracksPage", + "Owner", + "UserMini", + "Artist", +] \ No newline at end of file diff --git a/deezspot/models/librespot/album.py b/deezspot/models/librespot/album.py new file mode 100644 index 0000000..6c7021a --- /dev/null +++ b/deezspot/models/librespot/album.py @@ -0,0 +1,92 @@ +#!/usr/bin/python3 + +from dataclasses import dataclass, field +from typing import Optional, Dict, Any, List, Union + +from .types import ExternalUrls, Image, ArtistRef, _str, _int +from .track import Track as TrackModel + + +@dataclass +class Album: + id: Optional[str] = None + name: Optional[str] = None + uri: Optional[str] = None + type: str = "album" + album_type: Optional[str] = None + release_date: Optional[str] = None + release_date_precision: Optional[str] = None + total_tracks: Optional[int] = None + label: Optional[str] = None + popularity: Optional[int] = None + external_urls: ExternalUrls = field(default_factory=ExternalUrls) + external_ids: Dict[str, str] = field(default_factory=dict) + available_markets: Optional[List[str]] = None + images: Optional[List[Image]] = None + artists: List[ArtistRef] = field(default_factory=list) + tracks: Optional[List[Union[str, TrackModel]]] = None + copyrights: Optional[List[Dict[str, Any]]] = None + + @staticmethod + def from_dict(obj: Any) -> "Album": + if not isinstance(obj, dict): + return Album() + imgs: List[Image] = [] + for im in obj.get("images", []) or []: + im_obj = Image.from_dict(im) + if im_obj: + imgs.append(im_obj) + artists: List[ArtistRef] = [] + for a in obj.get("artists", []) or []: + artists.append(ArtistRef.from_dict(a)) + # Tracks can be base62 strings or full track dicts + tracks_in: List[Union[str, TrackModel]] = [] + if isinstance(obj.get("tracks"), list): + for t in obj.get("tracks"): + if isinstance(t, dict): + tracks_in.append(TrackModel.from_dict(t)) + else: + ts = _str(t) + if ts: + tracks_in.append(ts) + return Album( + id=_str(obj.get("id")), + name=_str(obj.get("name")), + uri=_str(obj.get("uri")), + type=_str(obj.get("type")) or "album", + album_type=_str(obj.get("album_type")), + release_date=_str(obj.get("release_date")), + release_date_precision=_str(obj.get("release_date_precision")), + total_tracks=_int(obj.get("total_tracks")), + label=_str(obj.get("label")), + popularity=_int(obj.get("popularity")), + external_urls=ExternalUrls.from_dict(obj.get("external_urls", {})), + external_ids=dict(obj.get("external_ids", {}) or {}), + available_markets=list(obj.get("available_markets", []) or []), + images=imgs or None, + artists=artists, + tracks=tracks_in or None, + copyrights=list(obj.get("copyrights", []) or []), + ) + + def to_dict(self) -> Dict[str, Any]: + out = { + "id": self.id, + "name": self.name, + "uri": self.uri, + "type": self.type, + "album_type": self.album_type, + "release_date": self.release_date, + "release_date_precision": self.release_date_precision, + "total_tracks": self.total_tracks, + "label": self.label, + "popularity": self.popularity, + "external_urls": self.external_urls.to_dict(), + "external_ids": self.external_ids or {}, + "available_markets": self.available_markets or [], + "images": [im.to_dict() for im in (self.images or [])], + "artists": [a.to_dict() for a in (self.artists or [])], + "tracks": [t.to_dict() if isinstance(t, TrackModel) else t for t in (self.tracks or [])], + "copyrights": self.copyrights or [], + } + return {k: v for k, v in out.items() if v not in (None, {}, [], "")} \ No newline at end of file diff --git a/deezspot/models/librespot/artist.py b/deezspot/models/librespot/artist.py new file mode 100644 index 0000000..f37a9ae --- /dev/null +++ b/deezspot/models/librespot/artist.py @@ -0,0 +1,63 @@ +#!/usr/bin/python3 + +from dataclasses import dataclass, field +from typing import Optional, Dict, Any, List + +from .types import ExternalUrls, Image, _str, _int + + +@dataclass +class Artist: + id: Optional[str] = None + name: Optional[str] = None + uri: Optional[str] = None + type: str = "artist" + genres: List[str] = field(default_factory=list) + images: Optional[List[Image]] = None + popularity: Optional[int] = None + external_urls: ExternalUrls = field(default_factory=ExternalUrls) + album_group: List[str] = field(default_factory=list) + single_group: List[str] = field(default_factory=list) + compilation_group: List[str] = field(default_factory=list) + appears_on_group: List[str] = field(default_factory=list) + + @staticmethod + def from_dict(obj: Any) -> "Artist": + if not isinstance(obj, dict): + return Artist() + imgs: List[Image] = [] + for im in obj.get("images", []) or []: + im_obj = Image.from_dict(im) + if im_obj: + imgs.append(im_obj) + return Artist( + id=_str(obj.get("id")), + name=_str(obj.get("name")), + uri=_str(obj.get("uri")), + type=_str(obj.get("type")) or "artist", + genres=list(obj.get("genres", []) or []), + images=imgs or None, + popularity=_int(obj.get("popularity")), + external_urls=ExternalUrls.from_dict(obj.get("external_urls", {})), + album_group=list(obj.get("album_group", []) or []), + single_group=list(obj.get("single_group", []) or []), + compilation_group=list(obj.get("compilation_group", []) or []), + appears_on_group=list(obj.get("appears_on_group", []) or []), + ) + + def to_dict(self) -> Dict[str, Any]: + out = { + "id": self.id, + "name": self.name, + "uri": self.uri, + "type": self.type, + "genres": self.genres or [], + "images": [im.to_dict() for im in (self.images or [])], + "popularity": self.popularity, + "external_urls": self.external_urls.to_dict(), + "album_group": self.album_group or [], + "single_group": self.single_group or [], + "compilation_group": self.compilation_group or [], + "appears_on_group": self.appears_on_group or [], + } + return {k: v for k, v in out.items() if v not in (None, {}, [], "")} \ No newline at end of file diff --git a/deezspot/models/librespot/playlist.py b/deezspot/models/librespot/playlist.py new file mode 100644 index 0000000..6e8751d --- /dev/null +++ b/deezspot/models/librespot/playlist.py @@ -0,0 +1,207 @@ +#!/usr/bin/python3 + +from dataclasses import dataclass, field +from typing import Optional, Dict, Any, List, Union + +from .types import ExternalUrls, Image, _str, _int +from .track import Track as TrackModel + + +@dataclass +class UserMini: + id: Optional[str] = None + type: str = "user" + uri: Optional[str] = None + display_name: Optional[str] = None + external_urls: ExternalUrls = field(default_factory=ExternalUrls) + + @staticmethod + def from_dict(obj: Any) -> "UserMini": + if not isinstance(obj, dict): + return UserMini() + return UserMini( + id=_str(obj.get("id")), + type=_str(obj.get("type")) or "user", + uri=_str(obj.get("uri")), + display_name=_str(obj.get("display_name")), + external_urls=ExternalUrls.from_dict(obj.get("external_urls", {})), + ) + + def to_dict(self) -> Dict[str, Any]: + out = { + "id": self.id, + "type": self.type, + "uri": self.uri, + "display_name": self.display_name, + "external_urls": self.external_urls.to_dict(), + } + return {k: v for k, v in out.items() if v not in (None, {}, [], "")} + + +@dataclass +class TrackStub: + id: Optional[str] = None + type: str = "track" + uri: Optional[str] = None + external_urls: ExternalUrls = field(default_factory=ExternalUrls) + + @staticmethod + def from_dict(obj: Any) -> "TrackStub": + if not isinstance(obj, dict): + return TrackStub() + return TrackStub( + id=_str(obj.get("id")), + type=_str(obj.get("type")) or "track", + uri=_str(obj.get("uri")), + external_urls=ExternalUrls.from_dict(obj.get("external_urls", {})), + ) + + def to_dict(self) -> Dict[str, Any]: + out = { + "id": self.id, + "type": self.type, + "uri": self.uri, + "external_urls": self.external_urls.to_dict(), + } + return {k: v for k, v in out.items() if v not in (None, {}, [], "")} + + +@dataclass +class PlaylistItem: + added_at: Optional[str] = None + added_by: UserMini = field(default_factory=UserMini) + is_local: bool = False + track: Optional[Union[TrackModel, TrackStub]] = None + item_id: Optional[str] = None + + @staticmethod + def from_dict(obj: Any) -> "PlaylistItem": + if not isinstance(obj, dict): + return PlaylistItem() + track_obj = None + trk = obj.get("track") + if isinstance(trk, dict): + if trk.get("duration_ms") is not None: + track_obj = TrackModel.from_dict(trk) + else: + track_obj = TrackStub.from_dict(trk) + return PlaylistItem( + added_at=_str(obj.get("added_at")), + added_by=UserMini.from_dict(obj.get("added_by", {})), + is_local=bool(obj.get("is_local", False)), + track=track_obj, + item_id=_str(obj.get("item_id")), + ) + + def to_dict(self) -> Dict[str, Any]: + out = { + "added_at": self.added_at, + "added_by": self.added_by.to_dict(), + "is_local": self.is_local, + "track": self.track.to_dict() if hasattr(self.track, 'to_dict') and self.track else None, + "item_id": self.item_id, + } + return {k: v for k, v in out.items() if v not in (None, {}, [], "")} + + +@dataclass +class TracksPage: + offset: int = 0 + total: int = 0 + items: List[PlaylistItem] = field(default_factory=list) + + @staticmethod + def from_dict(obj: Any) -> "TracksPage": + if not isinstance(obj, dict): + return TracksPage(items=[]) + items: List[PlaylistItem] = [] + for it in obj.get("items", []) or []: + items.append(PlaylistItem.from_dict(it)) + return TracksPage( + offset=_int(obj.get("offset")) or 0, + total=_int(obj.get("total")) or len(items), + items=items, + ) + + def to_dict(self) -> Dict[str, Any]: + return { + "offset": self.offset, + "total": self.total, + "items": [it.to_dict() for it in (self.items or [])] + } + + +@dataclass +class Owner: + id: Optional[str] = None + type: str = "user" + uri: Optional[str] = None + display_name: Optional[str] = None + external_urls: ExternalUrls = field(default_factory=ExternalUrls) + + @staticmethod + def from_dict(obj: Any) -> "Owner": + if not isinstance(obj, dict): + return Owner() + return Owner( + id=_str(obj.get("id")), + type=_str(obj.get("type")) or "user", + uri=_str(obj.get("uri")), + display_name=_str(obj.get("display_name")), + external_urls=ExternalUrls.from_dict(obj.get("external_urls", {})), + ) + + def to_dict(self) -> Dict[str, Any]: + out = { + "id": self.id, + "type": self.type, + "uri": self.uri, + "display_name": self.display_name, + "external_urls": self.external_urls.to_dict(), + } + return {k: v for k, v in out.items() if v not in (None, {}, [], "")} + + +@dataclass +class Playlist: + name: Optional[str] = None + description: Optional[str] = None + collaborative: Optional[bool] = None + images: Optional[List[Image]] = None + owner: Owner = field(default_factory=Owner) + snapshot_id: Optional[str] = None + tracks: TracksPage = field(default_factory=lambda: TracksPage(items=[])) + type: str = "playlist" + + @staticmethod + def from_dict(obj: Any) -> "Playlist": + if not isinstance(obj, dict): + return Playlist(tracks=TracksPage(items=[])) + imgs: List[Image] = [] + for im in obj.get("images", []) or []: + im_obj = Image.from_dict(im) + if im_obj: + imgs.append(im_obj) + return Playlist( + name=_str(obj.get("name")), + description=_str(obj.get("description")), + collaborative=bool(obj.get("collaborative")) if obj.get("collaborative") is not None else None, + images=imgs or None, + owner=Owner.from_dict(obj.get("owner", {})), + snapshot_id=_str(obj.get("snapshot_id")), + tracks=TracksPage.from_dict(obj.get("tracks", {})), + type=_str(obj.get("type")) or "playlist", + ) + + def to_dict(self) -> Dict[str, Any]: + out = { + "name": self.name, + "description": self.description, + "collaborative": self.collaborative, + "images": [im.to_dict() for im in (self.images or [])], + "owner": self.owner.to_dict(), + "snapshot_id": self.snapshot_id, + "tracks": self.tracks.to_dict(), + "type": self.type, + } + return {k: v for k, v in out.items() if v not in (None, {}, [], "")} \ No newline at end of file diff --git a/deezspot/models/librespot/track.py b/deezspot/models/librespot/track.py new file mode 100644 index 0000000..dec2081 --- /dev/null +++ b/deezspot/models/librespot/track.py @@ -0,0 +1,82 @@ +#!/usr/bin/python3 + +from dataclasses import dataclass, field +from typing import Optional, Dict, Any, List + +from .types import ExternalUrls, Image, ArtistRef, AlbumRef, _str, _int, _bool + + +@dataclass +class Track: + id: Optional[str] = None + name: Optional[str] = None + uri: Optional[str] = None + type: str = "track" + duration_ms: Optional[int] = None + explicit: Optional[bool] = None + track_number: Optional[int] = None + disc_number: Optional[int] = None + popularity: Optional[int] = None + preview_url: Optional[str] = None + earliest_live_timestamp: Optional[int] = None + has_lyrics: Optional[bool] = None + licensor_uuid: Optional[str] = None + external_urls: ExternalUrls = field(default_factory=ExternalUrls) + external_ids: Dict[str, str] = field(default_factory=dict) + available_markets: Optional[List[str]] = None + artists: List[ArtistRef] = field(default_factory=list) + album: Optional[AlbumRef] = None + + @staticmethod + def from_dict(obj: Any) -> "Track": + if not isinstance(obj, dict): + return Track() + artists: List[ArtistRef] = [] + for a in obj.get("artists", []) or []: + artists.append(ArtistRef.from_dict(a)) + album_ref = None + if isinstance(obj.get("album"), dict): + album_ref = AlbumRef.from_dict(obj.get("album")) + return Track( + id=_str(obj.get("id")), + name=_str(obj.get("name")), + uri=_str(obj.get("uri")), + type=_str(obj.get("type")) or "track", + duration_ms=_int(obj.get("duration_ms")), + explicit=_bool(obj.get("explicit")), + track_number=_int(obj.get("track_number")), + disc_number=_int(obj.get("disc_number")), + popularity=_int(obj.get("popularity")), + preview_url=_str(obj.get("preview_url")), + earliest_live_timestamp=_int(obj.get("earliest_live_timestamp")), + has_lyrics=_bool(obj.get("has_lyrics")), + licensor_uuid=_str(obj.get("licensor_uuid")), + external_urls=ExternalUrls.from_dict(obj.get("external_urls", {})), + external_ids=dict(obj.get("external_ids", {}) or {}), + available_markets=list(obj.get("available_markets", []) or []), + artists=artists, + album=album_ref, + ) + + def to_dict(self) -> Dict[str, Any]: + out = { + "id": self.id, + "name": self.name, + "uri": self.uri, + "type": self.type, + "duration_ms": self.duration_ms, + "explicit": self.explicit, + "track_number": self.track_number, + "disc_number": self.disc_number, + "popularity": self.popularity, + "preview_url": self.preview_url, + "earliest_live_timestamp": self.earliest_live_timestamp, + "has_lyrics": self.has_lyrics, + "licensor_uuid": self.licensor_uuid, + "external_urls": self.external_urls.to_dict(), + "external_ids": self.external_ids or {}, + "available_markets": self.available_markets or [], + "artists": [a.to_dict() for a in (self.artists or [])], + "album": self.album.to_dict() if self.album else None, + } + return {k: v for k, v in out.items() if v not in (None, {}, [], "")} \ No newline at end of file diff --git a/deezspot/models/librespot/types.py b/deezspot/models/librespot/types.py new file mode 100644 index 0000000..c099cab --- /dev/null +++ b/deezspot/models/librespot/types.py @@ -0,0 +1,153 @@ +#!/usr/bin/python3 + +from dataclasses import dataclass, field +from typing import Optional, Dict, Any, List + + +def _str(v: Any) -> Optional[str]: + if v is None: + return None + try: + s = str(v) + return s + except Exception: + return None + + +def _int(v: Any) -> Optional[int]: + try: + if v is None: + return None + return int(v) + except Exception: + return None + + +def _bool(v: Any) -> Optional[bool]: + if isinstance(v, bool): + return v + if v in ("true", "True", "1", 1): + return True + if v in ("false", "False", "0", 0): + return False + return None + + +def _list_str(v: Any) -> Optional[List[str]]: + if v is None: + return [] + if isinstance(v, list): + out: List[str] = [] + for it in v: + s = _str(it) + if s is not None: + out.append(s) + return out + return [] + + +@dataclass +class ExternalUrls: + spotify: Optional[str] = None + + @staticmethod + def from_dict(obj: Any) -> "ExternalUrls": + if not isinstance(obj, dict): + return ExternalUrls() + return ExternalUrls( + spotify=_str(obj.get("spotify")) + ) + + def to_dict(self) -> Dict[str, Any]: + return {"spotify": self.spotify} if self.spotify else {} + + +@dataclass +class Image: + url: str + width: int = 0 + height: int = 0 + + @staticmethod + def from_dict(obj: Any) -> Optional["Image"]: + if not isinstance(obj, dict): + return None + url = _str(obj.get("url")) + if not url: + return None + w = _int(obj.get("width")) or 0 + h = _int(obj.get("height")) or 0 + return Image(url=url, width=w, height=h) + + def to_dict(self) -> Dict[str, Any]: + return {"url": self.url, "width": self.width, "height": self.height} + + +@dataclass +class ArtistRef: + id: Optional[str] = None + name: str = "" + type: str = "artist" + uri: Optional[str] = None + external_urls: ExternalUrls = field(default_factory=ExternalUrls) + + @staticmethod + def from_dict(obj: Any) -> "ArtistRef": + if not isinstance(obj, dict): + return ArtistRef() + return ArtistRef( + id=_str(obj.get("id")), + name=_str(obj.get("name")) or "", + type=_str(obj.get("type")) or "artist", + uri=_str(obj.get("uri")), + external_urls=ExternalUrls.from_dict(obj.get("external_urls", {})), + ) + + def to_dict(self) -> Dict[str, Any]: + out = { + "id": self.id, + "name": self.name, + "type": self.type, + "uri": self.uri, + "external_urls": self.external_urls.to_dict() + } + return {k: v for k, v in out.items() if v not in (None, {}, [], "")} + + +@dataclass +class AlbumRef: + id: Optional[str] = None + name: Optional[str] = None + type: str = "album" + uri: Optional[str] = None + external_urls: ExternalUrls = field(default_factory=ExternalUrls) + images: Optional[List[Image]] = None + + @staticmethod + def from_dict(obj: Any) -> "AlbumRef": + if not isinstance(obj, dict): + return AlbumRef() + imgs: List[Image] = [] + for im in obj.get("images", []) or []: + im_obj = Image.from_dict(im) + if im_obj: + imgs.append(im_obj) + return AlbumRef( + id=_str(obj.get("id")), + name=_str(obj.get("name")), + type=_str(obj.get("type")) or "album", + uri=_str(obj.get("uri")), + external_urls=ExternalUrls.from_dict(obj.get("external_urls", {})), + images=imgs or None, + ) + + def to_dict(self) -> Dict[str, Any]: + out = { + "id": self.id, + "name": self.name, + "type": self.type, + "uri": self.uri, + "external_urls": self.external_urls.to_dict(), + "images": [im.to_dict() for im in (self.images or [])] + } + return {k: v for k, v in out.items() if v not in (None, {}, [], "")} \ No newline at end of file diff --git a/docs/librespot_client.md b/docs/librespot_client.md new file mode 100644 index 0000000..eb2d4ee --- /dev/null +++ b/docs/librespot_client.md @@ -0,0 +1,241 @@ +### LibrespotClient wrapper + +A thin, high-level wrapper over the internal librespot API that returns Web API-like dictionaries for albums, tracks, artists, and playlists. Use this to standardize access to Spotify metadata throughout the codebase. + +- Import path: `from deezspot.libutils import LibrespotClient` +- Backed by: `librespot` internal API (Session + Metadata/Playlist protos) +- Thread-safe for read operations; uses an internal in-memory cache for track object expansions + + +## Initialization + +```python +from deezspot.libutils import LibrespotClient + +# 1) Create with stored credentials file (recommended) +client = LibrespotClient(stored_credentials_path="/absolute/path/to/credentials.json") + +# 2) Or reuse an existing librespot Session +# from librespot.core import Session +# session = ... +# client = LibrespotClient(session=session) +``` + +- **stored_credentials_path**: path to JSON created by librespot credential flow +- **session**: optional existing `librespot.core.Session` (if provided, `stored_credentials_path` is not required) +- **max_workers**: optional concurrency cap for track expansion (default 16, bounded [1, 32]) + +Always dispose when done: + +```python +client.close() +``` + + +## ID/URI inputs + +All data-fetching methods accept either: +- A Spotify URI (e.g., `spotify:album:...`, `spotify:track:...`, etc) +- A base62 ID (e.g., `3KuXEGcqLcnEYWnn3OEGy0`) +- A public `open.spotify.com/...` URL (album/track/artist/playlist) +- Or the corresponding `librespot.metadata.*Id` class + +You can also use the helper if needed: + +```python +# kind in {"track", "album", "artist", "playlist"} +track_id = LibrespotClient.parse_input_id("track", "https://open.spotify.com/track/...") +``` + + +## Public API + +### get_album(album, include_tracks=False) -> dict +Fetches album metadata. + +- **album**: URI/base62/URL or `AlbumId` +- **include_tracks**: when True, expands the album's tracks to full track objects using concurrent fetches; when False, returns `tracks` as an array of track base62 IDs + +Return shape (subset): + +```json +{ + "album_type": "album", + "total_tracks": 10, + "available_markets": ["US", "GB"], + "external_urls": {"spotify": "https://open.spotify.com/album/{id}"}, + "id": "{base62}", + "images": [{"url": "https://...", "width": 640, "height": 640}], + "name": "...", + "release_date": "2020-05-01", + "release_date_precision": "day", + "type": "album", + "uri": "spotify:album:{base62}", + "artists": [{"id": "...", "name": "...", "type": "artist", "uri": "..."}], + "tracks": [ + // include_tracks=False -> ["trackBase62", ...] + // include_tracks=True -> [{ track object }, ...] + ], + "copyrights": [{"text": "...", "type": "..."}], + "external_ids": {"upc": "..."}, + "label": "...", + "popularity": 57 +} +``` + +Usage: + +```python +album = client.get_album("spotify:album:...", include_tracks=True) +``` + + +### get_track(track) -> dict +Fetches track metadata. + +- **track**: URI/base62/URL or `TrackId` + +Return shape (subset): + +```json +{ + "album": { /* embedded album (no tracks) */ }, + "artists": [{"id": "...", "name": "..."}], + "available_markets": ["US", "GB"], + "disc_number": 1, + "duration_ms": 221000, + "explicit": false, + "external_ids": {"isrc": "..."}, + "external_urls": {"spotify": "https://open.spotify.com/track/{id}"}, + "id": "{base62}", + "name": "...", + "popularity": 65, + "track_number": 1, + "type": "track", + "uri": "spotify:track:{base62}", + "preview_url": "https://p.scdn.co/mp3-preview/{hex}", + "has_lyrics": true, + "earliest_live_timestamp": 0, + "licensor_uuid": "{hex}" // when available +} +``` + +Usage: + +```python +track = client.get_track("3KuXEGcqLcnEYWnn3OEGy0") +``` + + +### get_artist(artist) -> dict +Fetches artist metadata and returns a full JSON-like mapping of the protobuf (pruned of empty fields). + +- **artist**: URI/base62/URL or `ArtistId` + +Usage: + +```python +artist = client.get_artist("https://open.spotify.com/artist/...") +``` + + +### get_playlist(playlist, expand_items=False) -> dict +Fetches playlist contents. + +- **playlist**: URI/URL/ID or `PlaylistId` (Spotify uses non-base62 playlist IDs) +- **expand_items**: when True, playlist items containing tracks are expanded to full track objects (concurrent fetch with caching); otherwise, items contain minimal track stubs with `id`, `uri`, `type`, and `external_urls` + +Return shape (subset): + +```json +{ + "name": "My Playlist", + "description": "...", + "collaborative": false, + "images": [{"url": "https://..."}], + "owner": { + "id": "username", + "type": "user", + "uri": "spotify:user:username", + "external_urls": {"spotify": "https://open.spotify.com/user/username"}, + "display_name": "username" + }, + "snapshot_id": "base64Revision==", + "tracks": { + "offset": 0, + "total": 42, + "items": [ + { + "added_at": "2023-01-01T12:34:56Z", + "added_by": {"id": "...", "type": "user", "uri": "...", "external_urls": {"spotify": "..."}, "display_name": "..."}, + "is_local": false, + "track": { + // expand_items=False -> {"id": "...", "uri": "spotify:track:...", "type": "track", "external_urls": {"spotify": "..."}} + // expand_items=True -> full track object + }, + "item_id": "{hex}" // additional reference, not a Web API field + } + ] + }, + "type": "playlist" +} +``` + +Usage: + +```python +playlist = client.get_playlist("spotify:playlist:...") +playlist_expanded = client.get_playlist("spotify:playlist:...", expand_items=True) +``` + + +## Concurrency and caching + +- When expanding tracks for albums/playlists, the client concurrently fetches missing track objects using a `ThreadPoolExecutor` with up to `max_workers` threads (default 16). +- A per-instance in-memory cache stores fetched track objects keyed by base62 ID to avoid duplicate network calls in the same process. + + +## Error handling + +- Underlying network/protobuf errors are not swallowed; wrap your calls if you need custom handling. +- Empty/missing fields are pruned from output structures where appropriate. + +Example: + +```python +try: + data = client.get_album("spotify:album:...") +except Exception as exc: + # handle failure (retry/backoff/logging) + raise +``` + + +## Migration guide (from direct librespot usage) + +Before (direct protobuf access): + +```python +album_id = AlbumId.from_base62(base62) +proto = session.api().get_metadata_4_album(album_id) +# manual traversal over `proto`... +``` + +After (wrapper): + +```python +from deezspot.libutils import LibrespotClient + +client = LibrespotClient(stored_credentials_path="/path/to/credentials.json") +try: + album = client.get_album(base62, include_tracks=True) +finally: + client.close() +``` + + +## Notes + +- Image URLs are derived from internal `file_id` bytes using the public Spotify image host. +- Playlist IDs are not base62; pass the raw ID, URI, or URL. +- For performance-critical paths, reuse a single `LibrespotClient` instance (and its cache) per worker. \ No newline at end of file