Compare commits

...

10 Commits

Author SHA1 Message Date
Xoconoch
4e2eec3bd2 feat: include image to playlist internal librespot api 2025-08-28 08:19:53 -06:00
Xoconoch
73919dbddb fix: increase connection refused 2025-08-28 07:48:09 -06:00
Xoconoch
4ef72a91e5 fix: spotify_credentials_path in deezer class 2025-08-28 07:37:34 -06:00
Xoconoch
6d33201a71 fix: make compilation_group an array of album ids in artist response 2025-08-28 07:07:13 -06:00
Xoconoch
c0cc9a6b6f fix: ditch search completely 2025-08-28 06:52:04 -06:00
Xoconoch
573098ca6e fix: ditch librespot search 2025-08-27 08:20:11 -06:00
Xoconoch
e55b15db2c feat: add data models for search results 2025-08-26 22:15:01 -06:00
Xoconoch
6c795d8d92 feat: implement librespot-powered search 2025-08-26 22:09:07 -06:00
Xoconoch
c38f10957c feat: implement public librespot api class, see docs 2025-08-26 22:02:38 -06:00
Xoconoch
0d2607e263 fix: update project build 2025-08-26 10:11:40 -06:00
15 changed files with 1885 additions and 366 deletions

1
.gitignore vendored
View File

@@ -19,3 +19,4 @@ deezspot_test.log
spotify_downloads
deezer_spo_downloads
__pycache__
test_librespot.py

View File

@@ -91,6 +91,7 @@ class DeeLogin:
password=None,
spotify_client_id=None,
spotify_client_secret=None,
spotify_credentials_path=None,
progress_callback=None,
silent=False
) -> None:
@@ -98,8 +99,12 @@ class DeeLogin:
# Store Spotify credentials
self.spotify_client_id = spotify_client_id
self.spotify_client_secret = spotify_client_secret
# Optional path to Spotify credentials.json (env override or CWD default)
self.spotify_credentials_path = os.environ.get("SPOTIFY_CREDENTIALS_PATH") or os.path.join(os.getcwd(), "credentials.json")
# Optional path to Spotify credentials.json (explicit param > env override > CWD default)
self.spotify_credentials_path = (
spotify_credentials_path
or os.environ.get("SPOTIFY_CREDENTIALS_PATH")
or os.path.join(os.getcwd(), "credentials.json")
)
# Initialize Spotify API if credentials are provided
if spotify_client_id and spotify_client_secret:

View File

@@ -1,25 +1,34 @@
#!/usr/bin/python3
from librespot.core import Session, SearchManager
from librespot.core import Session
from librespot.metadata import TrackId, AlbumId, ArtistId, EpisodeId, ShowId, PlaylistId
from deezspot.exceptions import InvalidLink
from typing import Any, Dict, List, Optional
# Note: We intentionally avoid importing spotipy. This module is now a
# thin shim over librespot's internal API, returning Web-API-shaped dicts
# consumed by spotloader's converters.
# Note: Search is handled via spotipy (Web API). Other metadata (tracks/albums/...)
# still use librespot via LibrespotClient.
from deezspot.libutils import LibrespotClient
class Spo:
__error_codes = [404, 400]
# Class-level references
__session: Optional[Session] = None
__client: Optional[LibrespotClient] = None
__initialized = False
@classmethod
def set_session(cls, session: Session):
"""Attach an active librespot Session for metadata/search operations."""
"""Attach an active librespot Session for metadata/search operations.
Also initializes the LibrespotClient wrapper used for metadata fetches.
"""
cls.__session = session
try:
cls.__client = LibrespotClient(session=session)
except Exception:
# Fallback: allow partial functionality (episode/search) via raw session
cls.__client = None
cls.__initialized = True
@classmethod
@@ -29,8 +38,8 @@ class Spo:
@classmethod
def __check_initialized(cls):
if not cls.__initialized or cls.__session is None:
raise ValueError("Spotify session not initialized. Ensure SpoLogin created a librespot Session and called Spo.set_session(session).")
if not cls.__initialized or (cls.__session is None and cls.__client is None):
raise ValueError("Spotify session/client not initialized. Ensure SpoLogin created a librespot Session and called Spo.set_session(session).")
# ------------------------- helpers -------------------------
@staticmethod
@@ -60,234 +69,91 @@ class Spo:
return None
@staticmethod
def __external_ids_to_dict(external_ids) -> Dict[str, str]:
# Map repeated ExternalId { type, id } to a simple dict
result: Dict[str, str] = {}
try:
for ext in external_ids or []:
t = getattr(ext, 'type', None)
v = getattr(ext, 'id', None)
if t and v:
result[t.lower()] = v
except Exception:
pass
return result
@staticmethod
def __images_from_group(img_group) -> List[Dict[str, Any]]:
images: List[Dict[str, Any]] = []
try:
for im in getattr(img_group, 'image', []) or []:
fid = getattr(im, 'file_id', None)
if fid:
hex_id = fid.hex()
images.append({
'url': f"https://i.scdn.co/image/{hex_id}",
'width': getattr(im, 'width', 0),
'height': getattr(im, 'height', 0)
})
except Exception:
pass
return images
@staticmethod
def __images_from_repeated(imgs) -> List[Dict[str, Any]]:
images: List[Dict[str, Any]] = []
try:
for im in imgs or []:
fid = getattr(im, 'file_id', None)
if fid:
hex_id = fid.hex()
images.append({
'url': f"https://i.scdn.co/image/{hex_id}",
'width': getattr(im, 'width', 0),
'height': getattr(im, 'height', 0)
})
except Exception:
pass
return images
@classmethod
def __artist_proto_to_dict(cls, a_proto) -> Dict[str, Any]:
gid = getattr(a_proto, 'gid', None)
return {
'id': cls.__base62_from_gid(gid, 'artist'),
'name': getattr(a_proto, 'name', '')
}
@classmethod
def __track_proto_to_web_dict(cls, t_proto, parent_album: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
if t_proto is None:
return {}
gid = getattr(t_proto, 'gid', None)
artists = [cls.__artist_proto_to_dict(a) for a in getattr(t_proto, 'artist', [])]
external_ids_map = cls.__external_ids_to_dict(getattr(t_proto, 'external_id', []))
# Album for a track inside Album.disc.track is often simplified in proto
album_dict = parent_album or {}
return {
'id': cls.__base62_from_gid(gid, 'track'),
'name': getattr(t_proto, 'name', ''),
'duration_ms': getattr(t_proto, 'duration', 0),
'explicit': getattr(t_proto, 'explicit', False),
'track_number': getattr(t_proto, 'number', 1),
'disc_number': getattr(t_proto, 'disc_number', 1),
'artists': artists,
'external_ids': external_ids_map,
'available_markets': None, # Not derived; market check code handles None by warning and continuing
'album': album_dict
}
@classmethod
def __album_proto_to_web_dict(cls, a_proto) -> Dict[str, Any]:
if a_proto is None:
return {}
gid = getattr(a_proto, 'gid', None)
# Album basic fields
title = getattr(a_proto, 'name', '')
album_type = None
try:
# Map enum to typical Web API strings when possible
t_val = getattr(a_proto, 'type', None)
if t_val is not None:
# Common mapping heuristic
# 1 = ALBUM, 2 = SINGLE, 3 = COMPILATION (values may differ by proto)
mapping = {1: 'album', 2: 'single', 3: 'compilation'}
album_type = mapping.get(int(t_val), None)
except Exception:
album_type = None
# Date
release_date_str = ''
release_date_precision = 'day'
try:
date = getattr(a_proto, 'date', None)
year = getattr(date, 'year', 0) if date else 0
month = getattr(date, 'month', 0) if date else 0
day = getattr(date, 'day', 0) if date else 0
if year and month and day:
release_date_str = f"{year:04d}-{month:02d}-{day:02d}"
release_date_precision = 'day'
elif year and month:
release_date_str = f"{year:04d}-{month:02d}"
release_date_precision = 'month'
elif year:
release_date_str = f"{year:04d}"
release_date_precision = 'year'
except Exception:
pass
# Artists
artists = [cls.__artist_proto_to_dict(a) for a in getattr(a_proto, 'artist', [])]
# Genres
genres = list(getattr(a_proto, 'genre', []) or [])
# External IDs (e.g., upc)
external_ids_map = cls.__external_ids_to_dict(getattr(a_proto, 'external_id', []))
# Images
images: List[Dict[str, Any]] = []
try:
cg = getattr(a_proto, 'cover_group', None)
if cg:
images = cls.__images_from_group(cg)
if not images:
images = cls.__images_from_repeated(getattr(a_proto, 'cover', []) or [])
except Exception:
images = []
# Tracks from discs
items: List[Dict[str, Any]] = []
total_tracks = 0
try:
for disc in getattr(a_proto, 'disc', []) or []:
disc_number = getattr(disc, 'number', 1)
for t in getattr(disc, 'track', []) or []:
total_tracks += 1
# Album context passed minimally for track mapper
parent_album_min = {
'id': cls.__base62_from_gid(gid, 'album'),
'name': title,
'album_type': album_type,
'release_date': release_date_str,
'release_date_precision': release_date_precision,
'total_tracks': None,
'images': images,
'genres': genres,
'artists': artists,
'external_ids': external_ids_map,
'available_markets': None
}
# Ensure numbering aligns with album context
setattr(t, 'disc_number', disc_number)
item = cls.__track_proto_to_web_dict(t, parent_album=parent_album_min)
# Override with correct numbering if proto uses different fields
item['disc_number'] = disc_number
if 'track_number' not in item or not item['track_number']:
item['track_number'] = getattr(t, 'number', 1)
items.append(item)
except Exception:
items = []
album_dict: Dict[str, Any] = {
'id': cls.__base62_from_gid(gid, 'album'),
'name': title,
'album_type': album_type,
'release_date': release_date_str,
'release_date_precision': release_date_precision,
'total_tracks': total_tracks or getattr(a_proto, 'num_tracks', 0),
'genres': genres,
'images': images, # Web API-like images with i.scdn.co URLs
'copyrights': [],
'available_markets': None,
'external_ids': external_ids_map,
'artists': artists,
'tracks': {
'items': items,
'total': len(items),
'limit': len(items),
'offset': 0,
'next': None,
'previous': None
}
}
return album_dict
def __images_from_album_obj(album_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
imgs = album_obj.get('images')
return imgs if isinstance(imgs, list) else []
# ------------------------- public API -------------------------
@classmethod
def get_track(cls, ids, client_id=None, client_secret=None):
cls.__check_initialized()
try:
t_id = TrackId.from_base62(ids)
t_proto = cls.__session.api().get_metadata_4_track(t_id)
if not t_proto:
raise InvalidLink(ids)
# Build minimal album context from nested album proto if present
album_proto = getattr(t_proto, 'album', None)
album_ctx = None
try:
if album_proto is not None:
agid = getattr(album_proto, 'gid', None)
# Images for embedded album
images: List[Dict[str, Any]] = []
try:
cg = getattr(album_proto, 'cover_group', None)
if cg:
images = cls.__images_from_group(cg)
if not images:
images = cls.__images_from_repeated(getattr(album_proto, 'cover', []) or [])
except Exception:
images = []
album_ctx = {
'id': cls.__base62_from_gid(agid, 'album'),
'name': getattr(album_proto, 'name', ''),
'images': images,
'genres': [],
'available_markets': None
}
except Exception:
if cls.__client is None:
# Fallback to previous proto logic if client is unavailable
t_id = TrackId.from_base62(ids)
t_proto = cls.__session.api().get_metadata_4_track(t_id) # type: ignore[union-attr]
if not t_proto:
raise InvalidLink(ids)
# Minimal album context from nested album proto if present
album_proto = getattr(t_proto, 'album', None)
album_ctx = None
return cls.__track_proto_to_web_dict(t_proto, parent_album=album_ctx)
try:
if album_proto is not None:
agid = getattr(album_proto, 'gid', None)
images: List[Dict[str, Any]] = []
try:
cg = getattr(album_proto, 'cover_group', None)
if cg:
# Map image group
for im in getattr(cg, 'image', []) or []:
fid = getattr(im, 'file_id', None)
if fid:
images.append({
'url': f"https://i.scdn.co/image/{fid.hex()}",
'width': getattr(im, 'width', 0),
'height': getattr(im, 'height', 0)
})
if not images:
for im in getattr(album_proto, 'cover', []) or []:
fid = getattr(im, 'file_id', None)
if fid:
images.append({
'url': f"https://i.scdn.co/image/{fid.hex()}",
'width': getattr(im, 'width', 0),
'height': getattr(im, 'height', 0)
})
except Exception:
images = []
album_ctx = {
'id': cls.__base62_from_gid(agid, 'album'),
'name': getattr(album_proto, 'name', ''),
'images': images,
'genres': [],
'available_markets': None
}
except Exception:
album_ctx = None
# Build track dict
artists = []
try:
for a in getattr(t_proto, 'artist', []) or []:
artists.append({'id': cls.__base62_from_gid(getattr(a, 'gid', None), 'artist'), 'name': getattr(a, 'name', '')})
except Exception:
pass
external_ids_map: Dict[str, str] = {}
try:
for ext in getattr(t_proto, 'external_id', []) or []:
t = getattr(ext, 'type', None)
v = getattr(ext, 'id', None)
if t and v:
external_ids_map[str(t).lower()] = v
except Exception:
pass
return {
'id': cls.__base62_from_gid(getattr(t_proto, 'gid', None), 'track'),
'name': getattr(t_proto, 'name', ''),
'duration_ms': getattr(t_proto, 'duration', 0),
'explicit': getattr(t_proto, 'explicit', False),
'track_number': getattr(t_proto, 'number', 1),
'disc_number': getattr(t_proto, 'disc_number', 1),
'artists': artists,
'external_ids': external_ids_map,
'available_markets': None,
'album': album_ctx
}
# Preferred: LibrespotClient
obj = cls.__client.get_track(ids)
return obj
except InvalidLink:
raise
except Exception:
@@ -303,7 +169,6 @@ class Spo:
try:
tracks.append(cls.get_track(tid))
except Exception:
# Preserve order with None entries similar to Web API behavior on bad IDs
tracks.append(None)
return {'tracks': tracks}
@@ -311,11 +176,92 @@ class Spo:
def get_album(cls, ids, client_id=None, client_secret=None):
cls.__check_initialized()
try:
a_id = AlbumId.from_base62(ids)
a_proto = cls.__session.api().get_metadata_4_album(a_id)
if not a_proto:
raise InvalidLink(ids)
return cls.__album_proto_to_web_dict(a_proto)
if cls.__client is None:
# Fallback to previous behavior using proto mapping
a_id = AlbumId.from_base62(ids)
a_proto = cls.__session.api().get_metadata_4_album(a_id) # type: ignore[union-attr]
if not a_proto:
raise InvalidLink(ids)
# Reuse existing private mapper for proto shape
# NOTE: import annotations above provided earlier methods; to avoid duplication, call through get_track for items
# Basic fields
title = getattr(a_proto, 'name', '')
# Images
images: List[Dict[str, Any]] = []
try:
cg = getattr(a_proto, 'cover_group', None)
if cg:
for im in getattr(cg, 'image', []) or []:
fid = getattr(im, 'file_id', None)
if fid:
images.append({'url': f"https://i.scdn.co/image/{fid.hex()}", 'width': getattr(im, 'width', 0), 'height': getattr(im, 'height', 0)})
if not images:
for im in getattr(a_proto, 'cover', []) or []:
fid = getattr(im, 'file_id', None)
if fid:
images.append({'url': f"https://i.scdn.co/image/{fid.hex()}", 'width': getattr(im, 'width', 0), 'height': getattr(im, 'height', 0)})
except Exception:
images = []
# Build simplified tracks list by disc order
items: List[Dict[str, Any]] = []
total_tracks = 0
try:
for disc in getattr(a_proto, 'disc', []) or []:
disc_number = getattr(disc, 'number', 1)
for t in getattr(disc, 'track', []) or []:
total_tracks += 1
setattr(t, 'disc_number', disc_number)
item = cls.get_track(cls.__base62_from_gid(getattr(t, 'gid', None), 'track') or "")
if isinstance(item, dict):
# Ensure numbering
item['disc_number'] = disc_number
if not item.get('track_number'):
item['track_number'] = getattr(t, 'number', 1)
items.append(item)
except Exception:
items = []
return {
'id': cls.__base62_from_gid(getattr(a_proto, 'gid', None), 'album'),
'name': title,
'images': images,
'tracks': {
'items': items,
'total': len(items),
'limit': len(items),
'offset': 0,
'next': None,
'previous': None
}
}
# Preferred: LibrespotClient, then reshape to Spo-compatible album dict
album_obj = cls.__client.get_album(ids, include_tracks=True)
# album_obj['tracks'] is a list of full track objects; convert to Spo shape
items = []
for tr in album_obj.get('tracks', []) or []:
if isinstance(tr, dict):
items.append(tr)
result = {
'id': album_obj.get('id'),
'name': album_obj.get('name'),
'album_type': album_obj.get('album_type'),
'release_date': album_obj.get('release_date'),
'release_date_precision': album_obj.get('release_date_precision'),
'total_tracks': album_obj.get('total_tracks') or len(items),
'genres': album_obj.get('genres') or [],
'images': cls.__images_from_album_obj(album_obj),
'available_markets': album_obj.get('available_markets'),
'external_ids': album_obj.get('external_ids') or {},
'artists': album_obj.get('artists') or [],
'tracks': {
'items': items,
'total': len(items),
'limit': len(items),
'offset': 0,
'next': None,
'previous': None
}
}
return result
except InvalidLink:
raise
except Exception:
@@ -325,91 +271,54 @@ class Spo:
def get_playlist(cls, ids, client_id=None, client_secret=None):
cls.__check_initialized()
try:
# PlaylistId accepts base62-ish/id string directly
p_id = PlaylistId(ids)
p_proto = cls.__session.api().get_playlist(p_id)
if not p_proto:
raise InvalidLink(ids)
# Minimal mapping sufficient for current playlist flow
name = None
try:
attrs = getattr(p_proto, 'attributes', None)
name = getattr(attrs, 'name', None) if attrs else None
except Exception:
if cls.__client is None:
# Fallback to previous behavior (proto mapping)
p_id = PlaylistId(ids)
p_proto = cls.__session.api().get_playlist(p_id) # type: ignore[union-attr]
if not p_proto:
raise InvalidLink(ids)
name = None
owner_name = getattr(p_proto, 'owner_username', None) or 'Unknown Owner'
try:
attrs = getattr(p_proto, 'attributes', None)
name = getattr(attrs, 'name', None) if attrs else None
except Exception:
name = None
owner_name = getattr(p_proto, 'owner_username', None) or 'Unknown Owner'
items = []
try:
contents = getattr(p_proto, 'contents', None)
for it in getattr(contents, 'items', []) or []:
tref = getattr(it, 'track', None)
gid = getattr(tref, 'gid', None) if tref else None
base62 = cls.__base62_from_gid(gid, 'track') if gid else None
if base62:
items.append({'track': {'id': base62}})
except Exception:
items = []
return {
'name': name or 'Unknown Playlist',
'owner': {'display_name': owner_name},
'images': [],
'tracks': {'items': items, 'total': len(items)}
}
# Preferred: LibrespotClient, reshape minimally to prior output
pl_obj = cls.__client.get_playlist(ids, expand_items=False)
items = []
try:
contents = getattr(p_proto, 'contents', None)
for it in getattr(contents, 'items', []) or []:
# Attempt to obtain a track gid/id from multiple potential locations
tref = getattr(it, 'track', None)
gid = getattr(tref, 'gid', None) if tref else None
base62 = cls.__base62_from_gid(gid, 'track') if gid else None
# Some playlists can reference an "original_track" field
if not base62:
orig = getattr(it, 'original_track', None)
ogid = getattr(orig, 'gid', None) if orig else None
base62 = cls.__base62_from_gid(ogid, 'track') if ogid else None
# As an additional fallback, try to parse a spotify:track: URI if exposed on the nested track
if not base62:
uri = getattr(tref, 'uri', None) if tref else None
if isinstance(uri, str) and uri.startswith("spotify:track:"):
try:
parts = uri.split(":")
maybe_id = parts[-1] if parts else None
if maybe_id and len(maybe_id) == 22:
base62 = maybe_id
elif maybe_id and len(maybe_id) in (32, 40):
from librespot.metadata import TrackId
tid = TrackId.from_hex(maybe_id)
base62 = tid.to_spotify_uri().split(":")[-1]
except Exception:
base62 = None
# Fallback: some implementations expose the URI at the item level
if not base62:
item_uri = getattr(it, 'uri', None)
if isinstance(item_uri, str) and item_uri.startswith("spotify:track:"):
try:
parts = item_uri.split(":")
maybe_id = parts[-1] if parts else None
if maybe_id and len(maybe_id) == 22:
base62 = maybe_id
elif maybe_id and len(maybe_id) in (32, 40):
from librespot.metadata import TrackId
tid = TrackId.from_hex(maybe_id)
base62 = tid.to_spotify_uri().split(":")[-1]
except Exception:
base62 = None
# Fallback: check original_track uri if provided
if not base62:
orig = getattr(it, 'original_track', None)
o_uri = getattr(orig, 'uri', None) if orig else None
if isinstance(o_uri, str) and o_uri.startswith("spotify:track:"):
try:
parts = o_uri.split(":")
maybe_id = parts[-1] if parts else None
if maybe_id and len(maybe_id) == 22:
base62 = maybe_id
elif maybe_id and len(maybe_id) in (32, 40):
from librespot.metadata import TrackId
tid = TrackId.from_hex(maybe_id)
base62 = tid.to_spotify_uri().split(":")[-1]
except Exception:
base62 = None
if base62:
items.append({'track': {'id': base62}})
# pl_obj['tracks']['items'] have 'track' possibly as stub dict already
trks = pl_obj.get('tracks', {}).get('items', [])
for it in trks:
tr = it.get('track') if isinstance(it, dict) else None
if isinstance(tr, dict):
tid = tr.get('id')
if tid:
items.append({'track': {'id': tid}})
except Exception:
items = []
return {
'name': name or 'Unknown Playlist',
'owner': {'display_name': owner_name},
'images': [],
'name': pl_obj.get('name') or 'Unknown Playlist',
'owner': {'display_name': pl_obj.get('owner', {}).get('display_name') or 'Unknown Owner'},
'images': pl_obj.get('images') or [],
'tracks': {'items': items, 'total': len(items)}
}
except InvalidLink:
@@ -421,11 +330,11 @@ class Spo:
def get_episode(cls, ids, client_id=None, client_secret=None):
cls.__check_initialized()
try:
# Episodes not supported by LibrespotClient wrapper yet; use raw session
e_id = EpisodeId.from_base62(ids)
e_proto = cls.__session.api().get_metadata_4_episode(e_id)
e_proto = cls.__session.api().get_metadata_4_episode(e_id) # type: ignore[union-attr]
if not e_proto:
raise InvalidLink(ids)
# Map show info
show_proto = getattr(e_proto, 'show', None)
show_id = None
show_name = ''
@@ -437,10 +346,18 @@ class Spo:
publisher = getattr(show_proto, 'publisher', '') if show_proto else ''
except Exception:
pass
# Images for episode (cover_image ImageGroup)
images: List[Dict[str, Any]] = []
try:
images = cls.__images_from_group(getattr(e_proto, 'cover_image', None))
# cover_image is an ImageGroup
cg = getattr(e_proto, 'cover_image', None)
for im in getattr(cg, 'image', []) or []:
fid = getattr(im, 'file_id', None)
if fid:
images.append({
'url': f"https://i.scdn.co/image/{fid.hex()}",
'width': getattr(im, 'width', 0),
'height': getattr(im, 'height', 0)
})
except Exception:
images = []
return {
@@ -467,36 +384,68 @@ class Spo:
Each item contains an external_urls.spotify link, minimally enough for download_artist."""
cls.__check_initialized()
try:
ar_id = ArtistId.from_base62(ids)
ar_proto = cls.__session.api().get_metadata_4_artist(ar_id)
if not ar_proto:
raise InvalidLink(ids)
# Parse requested groups
if cls.__client is None:
ar_id = ArtistId.from_base62(ids)
ar_proto = cls.__session.api().get_metadata_4_artist(ar_id) # type: ignore[union-attr]
if not ar_proto:
raise InvalidLink(ids)
requested = [s.strip().lower() for s in str(album_type).split(',') if s.strip()]
order = ['album', 'single', 'compilation', 'appears_on']
items: List[Dict[str, Any]] = []
for group_name in order:
if requested and group_name not in requested:
continue
attr = f"{group_name}_group"
grp = getattr(ar_proto, attr, None)
if not grp:
continue
try:
for ag in grp:
albums = getattr(ag, 'album', []) or []
for a in albums:
gid = getattr(a, 'gid', None)
base62 = cls.__base62_from_gid(gid, 'album') if gid else None
name = getattr(a, 'name', '')
if base62:
items.append({
'name': name,
'external_urls': {'spotify': f"https://open.spotify.com/album/{base62}"}
})
if limit and len(items) >= int(limit):
break
if limit and len(items) >= int(limit):
break
except Exception:
continue
if limit and len(items) >= int(limit):
break
return {
'id': cls.__base62_from_gid(getattr(ar_proto, 'gid', None), 'artist'),
'name': getattr(ar_proto, 'name', ''),
'items': items
}
# Preferred: LibrespotClient then map to legacy items shape
artist_obj = cls.__client.get_artist(ids)
requested = [s.strip().lower() for s in str(album_type).split(',') if s.strip()]
order = ['album', 'single', 'compilation', 'appears_on']
items: List[Dict[str, Any]] = []
for group_name in order:
if requested and group_name not in requested:
continue
attr = f"{group_name}_group"
grp = getattr(ar_proto, attr, None)
key = f"{group_name}_group"
grp = artist_obj.get(key) if isinstance(artist_obj, dict) else None
if not grp:
continue
# grp is repeated AlbumGroup; each has 'album' repeated Album
try:
for ag in grp:
albums = getattr(ag, 'album', []) or []
for a in albums:
gid = getattr(a, 'gid', None)
base62 = cls.__base62_from_gid(gid, 'album') if gid else None
name = getattr(a, 'name', '')
if base62:
items.append({
'name': name,
'external_urls': {'spotify': f"https://open.spotify.com/album/{base62}"}
})
if limit and len(items) >= int(limit):
break
# LibrespotClient flattens to arrays of album ids for groups
# We only need external_urls.spotify links
for album_id in grp:
if not album_id:
continue
items.append({
'name': None,
'external_urls': {'spotify': f"https://open.spotify.com/album/{album_id}"}
})
if limit and len(items) >= int(limit):
break
except Exception:
@@ -504,8 +453,8 @@ class Spo:
if limit and len(items) >= int(limit):
break
return {
'id': cls.__base62_from_gid(getattr(ar_proto, 'gid', None), 'artist'),
'name': getattr(ar_proto, 'name', ''),
'id': artist_obj.get('id') if isinstance(artist_obj, dict) else None,
'name': artist_obj.get('name') if isinstance(artist_obj, dict) else '',
'items': items
}
except InvalidLink:
@@ -531,21 +480,22 @@ class Spo:
@classmethod
def search(cls, query, search_type='track', limit=10, country: Optional[str] = None, locale: Optional[str] = None, catalogue: Optional[str] = None, image_size: Optional[str] = None, client_id=None, client_secret=None):
cls.__check_initialized()
# Map simple type value; librespot returns a combined JSON-like response
req = SearchManager.SearchRequest(query).set_limit(limit)
# Country precedence: explicit country > session country
if country:
req.set_country(country)
else:
cc = cls.__get_session_country_code()
if cc:
req.set_country(cc)
if locale:
req.set_locale(locale)
if catalogue:
req.set_catalogue(catalogue)
if image_size:
req.set_image_size(image_size)
res = cls.__session.search().request(req)
return res
# Reverted: use spotipy Web API search; librespot search is not supported here.
try:
import spotipy # type: ignore
from spotipy.oauth2 import SpotifyClientCredentials # type: ignore
except Exception as e:
raise RuntimeError("spotipy is required for search; please install spotipy") from e
try:
if client_id or client_secret:
auth_manager = SpotifyClientCredentials(client_id=client_id, client_secret=client_secret)
else:
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
type_param = ','.join([t.strip() for t in str(search_type or 'track').split(',') if t.strip()]) or 'track'
market = country or None
res = sp.search(q=query, type=type_param, market=market, limit=int(limit) if limit is not None else 10)
return res
except Exception as e:
# Surface a concise error to callers
raise RuntimeError(f"Spotify search failed: {e}")

View File

@@ -0,0 +1 @@
from .librespot import LibrespotClient

View File

@@ -0,0 +1,698 @@
from __future__ import annotations
import base64
import datetime
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Union
from google.protobuf.descriptor import FieldDescriptor
from google.protobuf.message import Message
from librespot.core import Session, SearchManager
from librespot.metadata import AlbumId, ArtistId, PlaylistId, TrackId
from librespot import util
from librespot.proto import Metadata_pb2 as Metadata
from librespot.proto import Playlist4External_pb2 as P4
class LibrespotClient:
"""
Thin wrapper around the internal librespot API, exposing convenient helpers that
return Web API-like dictionaries for albums, tracks, artists, and playlists.
Typical usage:
client = LibrespotClient(stored_credentials_path="/path/to/credentials.json")
album = client.get_album("spotify:album:...", include_tracks=True)
track = client.get_track("...base62...")
playlist = client.get_playlist("spotify:playlist:...", expand_items=True)
client.close()
"""
def __init__(
self,
stored_credentials_path: Optional[str] = None,
session: Optional[Session] = None,
max_workers: int = 16,
) -> None:
self._session: Session = session if session is not None else self._create_session(stored_credentials_path)
self._max_workers: int = max(1, min(32, max_workers))
self._track_object_cache: Dict[str, Optional[Dict[str, Any]]] = {}
# ---------- Public API ----------
def close(self) -> None:
if hasattr(self, "_session") and self._session is not None:
try:
self._session.close()
except Exception:
pass
def get_album(self, album: Union[str, AlbumId], include_tracks: bool = False) -> Dict[str, Any]:
album_id = self._ensure_album_id(album)
album_proto = self._session.api().get_metadata_4_album(album_id)
return self._album_proto_to_object(album_proto, include_tracks=include_tracks, for_embed=False)
def get_track(self, track: Union[str, TrackId]) -> Dict[str, Any]:
track_id = self._ensure_track_id(track)
track_proto = self._session.api().get_metadata_4_track(track_id)
return self._track_proto_to_object(track_proto)
def get_artist(self, artist: Union[str, ArtistId]) -> Dict[str, Any]:
artist_id = self._ensure_artist_id(artist)
artist_proto = self._session.api().get_metadata_4_artist(artist_id)
return self._proto_to_full_json(artist_proto)
def get_playlist(self, playlist: Union[str, PlaylistId], expand_items: bool = False) -> Dict[str, Any]:
playlist_id = self._ensure_playlist_id(playlist)
playlist_proto = self._session.api().get_playlist(playlist_id)
return self._playlist_proto_to_object(playlist_proto, include_track_objects=expand_items)
def search(
self,
query: str,
limit: int = 10,
country: Optional[str] = None,
locale: Optional[str] = None,
catalogue: Optional[str] = None,
image_size: Optional[str] = None,
) -> Dict[str, Any]:
"""Perform a full-featured search using librespot's SearchManager.
- country precedence: explicit country > session country code > unset
- returns the raw JSON-like mapping response provided by librespot
"""
req = SearchManager.SearchRequest(query).set_limit(limit)
# Country precedence
cc = country or self._get_session_country_code()
if cc:
req.set_country(cc)
if locale:
req.set_locale(locale)
if catalogue:
req.set_catalogue(catalogue)
if image_size:
req.set_image_size(image_size)
res = self._session.search().request(req)
return res
# ---------- ID parsing helpers ----------
@staticmethod
def parse_input_id(kind: str, value: str) -> Union[TrackId, AlbumId, ArtistId, PlaylistId]:
s = value.strip()
if kind == "track":
if s.startswith("spotify:track:"):
return TrackId.from_uri(s)
if "open.spotify.com/track/" in s:
base = s.split("open.spotify.com/track/")[-1].split("?")[0].split("#")[0].strip("/")
return TrackId.from_base62(base)
return TrackId.from_base62(s)
if kind == "album":
if s.startswith("spotify:album:"):
return AlbumId.from_uri(s)
if "open.spotify.com/album/" in s:
base = s.split("open.spotify.com/album/")[-1].split("?")[0].split("#")[0].strip("/")
return AlbumId.from_base62(base)
return AlbumId.from_base62(s)
if kind == "artist":
if s.startswith("spotify:artist:"):
return ArtistId.from_uri(s)
if "open.spotify.com/artist/" in s:
base = s.split("open.spotify.com/artist/")[-1].split("?")[0].split("#")[0].strip("/")
return ArtistId.from_base62(base)
return ArtistId.from_base62(s)
if kind == "playlist":
if s.startswith("spotify:playlist:"):
return PlaylistId.from_uri(s)
if "open.spotify.com/playlist/" in s:
base = s.split("open.spotify.com/playlist/")[-1].split("?")[0].split("#")[0].strip("/")
return PlaylistId(base)
return PlaylistId(s)
raise RuntimeError(f"Unknown kind: {kind}")
# ---------- Private: session ----------
@staticmethod
def _create_session(stored_credentials_path: Optional[str]) -> Session:
if not stored_credentials_path:
raise ValueError("stored_credentials_path is required when no Session is provided")
conf = (
Session.Configuration.Builder()
.set_stored_credential_file(stored_credentials_path)
.build()
)
builder = Session.Builder(conf)
builder.stored_file(stored_credentials_path)
last_exc: Optional[Exception] = None
for attempt in range(1, 4):
try:
return builder.create()
except Exception as exc:
last_exc = exc
if attempt < 3:
time.sleep(3)
else:
raise last_exc
def _get_session_country_code(self) -> str:
try:
cc = getattr(self._session, "_Session__country_code", None)
if isinstance(cc, str) and len(cc) == 2:
return cc
cc2 = getattr(self._session, "country_code", None)
if isinstance(cc2, str) and len(cc2) == 2:
return cc2
except Exception:
pass
return ""
# ---------- Private: ID coercion ----------
def _ensure_track_id(self, v: Union[str, TrackId]) -> TrackId:
if isinstance(v, TrackId):
return v
return self.parse_input_id("track", v) # type: ignore[return-value]
def _ensure_album_id(self, v: Union[str, AlbumId]) -> AlbumId:
if isinstance(v, AlbumId):
return v
return self.parse_input_id("album", v) # type: ignore[return-value]
def _ensure_artist_id(self, v: Union[str, ArtistId]) -> ArtistId:
if isinstance(v, ArtistId):
return v
return self.parse_input_id("artist", v) # type: ignore[return-value]
def _ensure_playlist_id(self, v: Union[str, PlaylistId]) -> PlaylistId:
if isinstance(v, PlaylistId):
return v
return self.parse_input_id("playlist", v) # type: ignore[return-value]
# ---------- Private: conversions ----------
@staticmethod
def _bytes_to_base62(b: bytes) -> str:
try:
return TrackId.base62.encode(b).decode("ascii")
except Exception:
return ""
def _proto_to_full_json(self, msg: Any) -> Any:
if isinstance(msg, Message):
msg_name = msg.DESCRIPTOR.name if hasattr(msg, "DESCRIPTOR") else ""
if msg_name == "Image":
url = self._image_url_from_file_id(getattr(msg, "file_id", b""))
width = getattr(msg, "width", 0) or 0
height = getattr(msg, "height", 0) or 0
return self._prune_empty({"url": url, "width": width, "height": height})
if msg_name == "TopTracks":
country = getattr(msg, "country", "") or ""
ids: List[str] = []
try:
for t in getattr(msg, "track", []):
ids.append(self._bytes_to_base62(getattr(t, "gid", b"")))
except Exception:
pass
return self._prune_empty({"country": country or None, "track": ids})
if hasattr(msg, "album"):
try:
albums = getattr(msg, "album", [])
ids: List[str] = []
for a in albums:
ids.append(self._bytes_to_base62(getattr(a, "gid", b"")))
return {"album": ids}
except Exception:
pass
out: Dict[str, Any] = {}
for field, value in msg.ListFields():
name = field.name
if name in ("album_group", "single_group", "compilation_group", "appears_on_group"):
try:
ids = []
for ag in value: # repeated AlbumGroup
for a in getattr(ag, "album", []):
ids.append(self._bytes_to_base62(getattr(a, "gid", b"")))
out[name] = ids
continue
except Exception:
pass
name_out = "id" if name == "gid" else name
if field.type == FieldDescriptor.TYPE_BYTES:
if field.label == FieldDescriptor.LABEL_REPEATED:
out[name_out] = [self._bytes_to_base62(v) for v in value]
else:
out[name_out] = self._bytes_to_base62(value)
elif field.type == FieldDescriptor.TYPE_MESSAGE:
if field.label == FieldDescriptor.LABEL_REPEATED:
out[name_out] = [self._proto_to_full_json(v) for v in value]
else:
out[name_out] = self._proto_to_full_json(value)
else:
if field.label == FieldDescriptor.LABEL_REPEATED:
out[name_out] = list(value)
else:
out[name_out] = value
return out
if isinstance(msg, (bytes, bytearray)):
return self._bytes_to_base62(bytes(msg))
if isinstance(msg, (list, tuple)):
return [self._proto_to_full_json(v) for v in msg]
return msg
@staticmethod
def _prune_empty(obj: Any) -> Any:
if isinstance(obj, dict):
return {k: LibrespotClient._prune_empty(v) for k, v in obj.items() if v not in (None, "", [], {})}
if isinstance(obj, list):
return [LibrespotClient._prune_empty(v) for v in obj if v not in (None, "", [], {})]
return obj
@staticmethod
def _image_url_from_file_id(file_id: bytes) -> Optional[str]:
if not file_id:
return None
return f"https://i.scdn.co/image/{util.bytes_to_hex(file_id)}"
def _get_playlist_picture_url(self, attrs: Any) -> Optional[str]:
pic = getattr(attrs, "picture", b"") if attrs else b""
try:
if not pic:
return None
data = bytes(pic)
image_id: Optional[bytes] = None
if len(data) >= 26 and data[0] == 0xAB and data[1:4] == b"gpl" and data[4:6] == b"\x00\x00":
image_id = data[6:26]
elif len(data) >= 20:
image_id = data[:20]
if image_id:
return f"https://i.scdn.co/image/{util.bytes_to_hex(image_id)}"
except Exception:
pass
return None
@staticmethod
def _split_countries(countries: str) -> List[str]:
if not countries:
return []
s = countries.strip()
if " " in s:
return [c for c in s.split(" ") if c]
return [s[i : i + 2] for i in range(0, len(s), 2) if len(s[i : i + 2]) == 2]
def _restrictions_to_available_markets(self, restrictions: List[Metadata.Restriction]) -> List[str]:
for r in restrictions:
allowed = getattr(r, "countries_allowed", "")
if isinstance(allowed, str) and allowed:
return self._split_countries(allowed)
return []
@staticmethod
def _external_ids_to_dict(ext_ids: List[Metadata.ExternalId]) -> Dict[str, str]:
out: Dict[str, str] = {}
for e in ext_ids:
t = getattr(e, "type", "").lower()
v = getattr(e, "id", "")
if t and v and t not in out:
out[t] = v
return out
@staticmethod
def _album_type_to_str(a: Metadata.Album) -> str:
type_str = getattr(a, "type_str", "")
if isinstance(type_str, str) and type_str:
return type_str.lower()
t = getattr(a, "type", None)
if t is None:
return "album"
try:
mapping = {
Metadata.Album.ALBUM: "album",
Metadata.Album.SINGLE: "single",
Metadata.Album.COMPILATION: "compilation",
Metadata.Album.EP: "ep",
}
return mapping.get(t, "album")
except Exception:
return "album"
@staticmethod
def _date_to_release_fields(d: Optional[Metadata.Date]) -> (str, str):
if d is None:
return "", "day"
y = getattr(d, "year", 0)
m = getattr(d, "month", 0)
day = getattr(d, "day", 0)
if y and m and day:
return f"{y:04d}-{m:02d}-{day:02d}", "day"
if y and m:
return f"{y:04d}-{m:02d}", "month"
if y:
return f"{y:04d}", "year"
return "", "day"
def _images_from_group(
self,
group: Optional[Metadata.ImageGroup],
fallback_images: Optional[List[Metadata.Image]] = None,
) -> List[Dict[str, Any]]:
images: List[Dict[str, Any]] = []
seq = []
if group is not None:
seq = getattr(group, "image", [])
elif fallback_images is not None:
seq = fallback_images
for im in seq:
url = self._image_url_from_file_id(getattr(im, "file_id", b""))
if not url:
continue
width = getattr(im, "width", 0) or 0
height = getattr(im, "height", 0) or 0
images.append({"url": url, "width": width, "height": height})
seen = set()
uniq: List[Dict[str, Any]] = []
for it in images:
u = it.get("url")
if u in seen:
continue
seen.add(u)
uniq.append(it)
return uniq
def _artist_ref_to_object(self, a: Metadata.Artist) -> Dict[str, Any]:
gid = getattr(a, "gid", b"")
hex_id = util.bytes_to_hex(gid) if gid else ""
uri = ""
base62 = ""
if hex_id:
try:
aid = ArtistId.from_hex(hex_id)
uri = aid.to_spotify_uri()
base62 = uri.split(":")[-1]
except Exception:
pass
return {
"external_urls": {"spotify": f"https://open.spotify.com/artist/{base62}" if base62 else ""},
"id": base62,
"name": getattr(a, "name", "") or "",
"type": "artist",
"uri": uri or "",
}
def _album_proto_to_object(
self,
a: Metadata.Album,
include_tracks: bool = False,
for_embed: bool = False,
) -> Dict[str, Any]:
gid = getattr(a, "gid", b"")
hex_id = util.bytes_to_hex(gid) if gid else ""
uri = ""
base62 = ""
if hex_id:
try:
aid = AlbumId.from_hex(hex_id)
uri = aid.to_spotify_uri()
base62 = uri.split(":")[-1]
except Exception:
pass
available = self._restrictions_to_available_markets(getattr(a, "restriction", []))
release_date, release_precision = self._date_to_release_fields(getattr(a, "date", None))
artists = [self._artist_ref_to_object(ar) for ar in getattr(a, "artist", [])]
track_ids: List[str] = []
if not for_embed:
for d in getattr(a, "disc", []):
for t in getattr(d, "track", []):
tid_hex = util.bytes_to_hex(getattr(t, "gid", b"")) if getattr(t, "gid", b"") else ""
if not tid_hex:
continue
try:
tid = TrackId.from_hex(tid_hex)
t_uri = tid.to_spotify_uri()
t_base62 = t_uri.split(":")[-1]
if t_base62:
track_ids.append(t_base62)
except Exception:
pass
track_list_value: Optional[List[Any]] = None
if not for_embed:
if include_tracks and self._session is not None and track_ids:
fetched = self._fetch_track_objects(track_ids)
expanded: List[Dict[str, Any]] = []
for b62 in track_ids:
obj = fetched.get(b62)
if obj is not None:
expanded.append(obj)
else:
expanded.append({
"id": b62,
"uri": f"spotify:track:{b62}",
"type": "track",
"external_urls": {"spotify": f"https://open.spotify.com/track/{b62}"},
})
track_list_value = expanded
else:
track_list_value = track_ids
images = self._images_from_group(getattr(a, "cover_group", None), getattr(a, "cover", []))
result: Dict[str, Any] = {
"album_type": self._album_type_to_str(a) or None,
**({"total_tracks": sum(len(getattr(d, "track", [])) for d in getattr(a, "disc", []))} if not for_embed else {}),
"available_markets": available,
"external_urls": {"spotify": f"https://open.spotify.com/album/{base62}"} if base62 else {},
"id": base62 or None,
"images": images or None,
"name": getattr(a, "name", "") or None,
"release_date": release_date or None,
"release_date_precision": release_precision or None,
"type": "album",
"uri": uri or None,
"artists": artists or None,
**({"tracks": track_list_value} if (not for_embed and track_list_value is not None) else {}),
"copyrights": [{
"text": getattr(c, "text", ""),
"type": str(getattr(c, "type", "")),
} for c in getattr(a, "copyright", [])],
"external_ids": self._external_ids_to_dict(getattr(a, "external_id", [])) or None,
"label": getattr(a, "label", "") or None,
"popularity": getattr(a, "popularity", 0) or 0,
}
return self._prune_empty(result)
def _track_proto_to_object(self, t: Metadata.Track) -> Dict[str, Any]:
tid_hex = util.bytes_to_hex(getattr(t, "gid", b"")) if getattr(t, "gid", b"") else ""
uri = ""
base62 = ""
if tid_hex:
try:
tid = TrackId.from_hex(tid_hex)
uri = tid.to_spotify_uri()
base62 = uri.split(":")[-1]
except Exception:
pass
album_obj = self._album_proto_to_object(getattr(t, "album", None), include_tracks=False, for_embed=True) if getattr(t, "album", None) else None
preview_url = None
previews = getattr(t, "preview", [])
if previews:
pf = previews[0]
pf_id = getattr(pf, "file_id", b"")
if pf_id:
try:
preview_url = f"https://p.scdn.co/mp3-preview/{util.bytes_to_hex(pf_id)}"
except Exception:
preview_url = None
licensor_uuid = None
licensor = getattr(t, "licensor", None)
if licensor is not None:
licensor_uuid = util.bytes_to_hex(getattr(licensor, "uuid", b"")) if getattr(licensor, "uuid", b"") else None
result = {
"album": album_obj,
"artists": [self._artist_ref_to_object(a) for a in getattr(t, "artist", [])],
"available_markets": self._restrictions_to_available_markets(getattr(t, "restriction", [])),
"disc_number": getattr(t, "disc_number", 0) or None,
"duration_ms": getattr(t, "duration", 0) or None,
"explicit": bool(getattr(t, "explicit", False)) or None,
"external_ids": self._external_ids_to_dict(getattr(t, "external_id", [])) or None,
"external_urls": {"spotify": f"https://open.spotify.com/track/{base62}"} if base62 else {},
"id": base62 or None,
"name": getattr(t, "name", "") or None,
"popularity": getattr(t, "popularity", 0) or None,
"track_number": getattr(t, "number", 0) or None,
"type": "track",
"uri": uri or None,
"preview_url": preview_url,
"earliest_live_timestamp": getattr(t, "earliest_live_timestamp", 0) or None,
"has_lyrics": bool(getattr(t, "has_lyrics", False)) or None,
"licensor_uuid": licensor_uuid,
}
return self._prune_empty(result)
def _playlist_proto_to_object(self, p: P4.SelectedListContent, include_track_objects: bool) -> Dict[str, Any]:
attrs = getattr(p, "attributes", None)
name = getattr(attrs, "name", "") if attrs else ""
description = getattr(attrs, "description", "") if attrs else ""
collaborative = bool(getattr(attrs, "collaborative", False)) if attrs else False
picture_bytes = getattr(attrs, "picture", b"") if attrs else b""
images: List[Dict[str, Any]] = []
picture_url: Optional[str] = None
# Derive picture URL from attributes.picture with header-aware parsing
pic_url = self._get_playlist_picture_url(attrs)
if pic_url:
picture_url = pic_url
images.append({"url": pic_url, "width": 0, "height": 0})
owner_username = getattr(p, "owner_username", "") or ""
items: List[Dict[str, Any]] = []
contents = getattr(p, "contents", None)
fetched_tracks: Dict[str, Optional[Dict[str, Any]]] = {}
# Collect all track ids to fetch durations for length computation and, if requested, for expansion
to_fetch: List[str] = []
if contents is not None:
for it in getattr(contents, "items", []):
uri = getattr(it, "uri", "") or ""
if uri.startswith("spotify:track:"):
b62 = uri.split(":")[-1]
to_fetch.append(b62)
if to_fetch and self._session is not None:
fetched_tracks = self._fetch_track_objects(to_fetch)
if contents is not None:
for it in getattr(contents, "items", []):
uri = getattr(it, "uri", "") or ""
attrs_it = getattr(it, "attributes", None)
added_by = getattr(attrs_it, "added_by", "") if attrs_it else ""
ts_ms = getattr(attrs_it, "timestamp", 0) if attrs_it else 0
item_id_bytes = getattr(attrs_it, "item_id", b"") if attrs_it else b""
added_at_iso = None
if isinstance(ts_ms, int) and ts_ms > 0:
try:
added_at_iso = datetime.datetime.utcfromtimestamp(ts_ms / 1000.0).isoformat() + "Z"
except Exception:
added_at_iso = None
track_obj: Optional[Dict[str, Any]] = None
if include_track_objects and uri.startswith("spotify:track:"):
b62 = uri.split(":")[-1]
obj = fetched_tracks.get(b62)
if obj is not None:
track_obj = obj
else:
track_obj = {
"id": b62,
"uri": uri,
"type": "track",
"external_urls": {"spotify": f"https://open.spotify.com/track/{b62}"},
}
else:
if uri.startswith("spotify:track:"):
b62 = uri.split(":")[-1]
track_obj = {
"id": b62,
"uri": uri,
"type": "track",
"external_urls": {"spotify": f"https://open.spotify.com/track/{b62}"},
}
item_obj: Dict[str, Any] = {
"added_at": added_at_iso,
"added_by": {
"id": added_by,
"type": "user",
"uri": f"spotify:user:{added_by}" if added_by else "",
"external_urls": {"spotify": f"https://open.spotify.com/user/{added_by}"} if added_by else {},
"display_name": added_by or None,
},
"is_local": False,
"track": track_obj,
}
if isinstance(item_id_bytes, (bytes, bytearray)) and item_id_bytes:
item_obj["item_id"] = util.bytes_to_hex(item_id_bytes)
items.append(self._prune_empty(item_obj))
tracks_obj = self._prune_empty({
"offset": 0,
"total": len(items),
"items": items,
})
# Compute playlist length (in seconds) by summing track durations
length_seconds: Optional[int] = None
try:
if to_fetch and fetched_tracks:
total_ms = 0
for b62 in to_fetch:
obj = fetched_tracks.get(b62)
if obj is None:
continue
dur = obj.get("duration_ms")
if isinstance(dur, int) and dur > 0:
total_ms += dur
length_seconds = (total_ms // 1000) if total_ms > 0 else 0
except Exception:
length_seconds = None
rev_bytes = getattr(p, "revision", b"") if hasattr(p, "revision") else b""
snapshot_b64 = base64.b64encode(rev_bytes).decode("ascii") if rev_bytes else None
result = {
"name": name or None,
"description": description or None,
"collaborative": collaborative or None,
"picture": picture_url or None,
"owner": self._prune_empty({
"id": owner_username,
"type": "user",
"uri": f"spotify:user:{owner_username}" if owner_username else "",
"external_urls": {"spotify": f"https://open.spotify.com/user/{owner_username}"} if owner_username else {},
"display_name": owner_username or None,
}),
"snapshot_id": snapshot_b64,
"length": length_seconds,
"tracks": tracks_obj,
"type": "playlist",
}
return self._prune_empty(result)
# ---------- Private: fetching ----------
def _fetch_single_track_object(self, base62_id: str) -> None:
try:
tid = TrackId.from_base62(base62_id)
t_proto = self._session.api().get_metadata_4_track(tid)
self._track_object_cache[base62_id] = self._track_proto_to_object(t_proto)
except Exception:
self._track_object_cache[base62_id] = None
def _fetch_track_objects(self, base62_ids: List[str]) -> Dict[str, Optional[Dict[str, Any]]]:
seen = set()
unique: List[str] = []
for b in base62_ids:
if not b:
continue
if b not in seen:
seen.add(b)
if b not in self._track_object_cache:
unique.append(b)
if unique:
max_workers = min(self._max_workers, max(1, len(unique)))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for b in unique:
executor.submit(self._fetch_single_track_object, b)
return {b: self._track_object_cache.get(b) for b in base62_ids if b}
__all__ = ["LibrespotClient"]

View File

@@ -7,3 +7,4 @@ Deezspot models package.
# Import subpackages to ensure they're included
from deezspot.models import callback
from deezspot.models import download
from deezspot.models import librespot

View File

@@ -0,0 +1,28 @@
#!/usr/bin/python3
from .types import Image, ExternalUrls, ArtistRef, AlbumRef
from .track import Track
from .album import Album
from .playlist import Playlist, PlaylistItem, TrackStub, TracksPage, Owner, UserMini
from .artist import Artist
__all__ = [
"Image",
"ExternalUrls",
"ArtistRef",
"AlbumRef",
"Track",
"Album",
"Playlist",
"PlaylistItem",
"TrackStub",
"TracksPage",
"Owner",
"UserMini",
"Artist",
"SearchResult",
"SearchTracksPage",
"SearchAlbumsPage",
"SearchArtistsPage",
"SearchPlaylistsPage",
]

View File

@@ -0,0 +1,92 @@
#!/usr/bin/python3
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List, Union
from .types import ExternalUrls, Image, ArtistRef, _str, _int
from .track import Track as TrackModel
@dataclass
class Album:
id: Optional[str] = None
name: Optional[str] = None
uri: Optional[str] = None
type: str = "album"
album_type: Optional[str] = None
release_date: Optional[str] = None
release_date_precision: Optional[str] = None
total_tracks: Optional[int] = None
label: Optional[str] = None
popularity: Optional[int] = None
external_urls: ExternalUrls = field(default_factory=ExternalUrls)
external_ids: Dict[str, str] = field(default_factory=dict)
available_markets: Optional[List[str]] = None
images: Optional[List[Image]] = None
artists: List[ArtistRef] = field(default_factory=list)
tracks: Optional[List[Union[str, TrackModel]]] = None
copyrights: Optional[List[Dict[str, Any]]] = None
@staticmethod
def from_dict(obj: Any) -> "Album":
if not isinstance(obj, dict):
return Album()
imgs: List[Image] = []
for im in obj.get("images", []) or []:
im_obj = Image.from_dict(im)
if im_obj:
imgs.append(im_obj)
artists: List[ArtistRef] = []
for a in obj.get("artists", []) or []:
artists.append(ArtistRef.from_dict(a))
# Tracks can be base62 strings or full track dicts
tracks_in: List[Union[str, TrackModel]] = []
if isinstance(obj.get("tracks"), list):
for t in obj.get("tracks"):
if isinstance(t, dict):
tracks_in.append(TrackModel.from_dict(t))
else:
ts = _str(t)
if ts:
tracks_in.append(ts)
return Album(
id=_str(obj.get("id")),
name=_str(obj.get("name")),
uri=_str(obj.get("uri")),
type=_str(obj.get("type")) or "album",
album_type=_str(obj.get("album_type")),
release_date=_str(obj.get("release_date")),
release_date_precision=_str(obj.get("release_date_precision")),
total_tracks=_int(obj.get("total_tracks")),
label=_str(obj.get("label")),
popularity=_int(obj.get("popularity")),
external_urls=ExternalUrls.from_dict(obj.get("external_urls", {})),
external_ids=dict(obj.get("external_ids", {}) or {}),
available_markets=list(obj.get("available_markets", []) or []),
images=imgs or None,
artists=artists,
tracks=tracks_in or None,
copyrights=list(obj.get("copyrights", []) or []),
)
def to_dict(self) -> Dict[str, Any]:
out = {
"id": self.id,
"name": self.name,
"uri": self.uri,
"type": self.type,
"album_type": self.album_type,
"release_date": self.release_date,
"release_date_precision": self.release_date_precision,
"total_tracks": self.total_tracks,
"label": self.label,
"popularity": self.popularity,
"external_urls": self.external_urls.to_dict(),
"external_ids": self.external_ids or {},
"available_markets": self.available_markets or [],
"images": [im.to_dict() for im in (self.images or [])],
"artists": [a.to_dict() for a in (self.artists or [])],
"tracks": [t.to_dict() if isinstance(t, TrackModel) else t for t in (self.tracks or [])],
"copyrights": self.copyrights or [],
}
return {k: v for k, v in out.items() if v not in (None, {}, [], "")}

View File

@@ -0,0 +1,63 @@
#!/usr/bin/python3
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List
from .types import ExternalUrls, Image, _str, _int
@dataclass
class Artist:
id: Optional[str] = None
name: Optional[str] = None
uri: Optional[str] = None
type: str = "artist"
genres: List[str] = field(default_factory=list)
images: Optional[List[Image]] = None
popularity: Optional[int] = None
external_urls: ExternalUrls = field(default_factory=ExternalUrls)
album_group: List[str] = field(default_factory=list)
single_group: List[str] = field(default_factory=list)
compilation_group: List[str] = field(default_factory=list)
appears_on_group: List[str] = field(default_factory=list)
@staticmethod
def from_dict(obj: Any) -> "Artist":
if not isinstance(obj, dict):
return Artist()
imgs: List[Image] = []
for im in obj.get("images", []) or []:
im_obj = Image.from_dict(im)
if im_obj:
imgs.append(im_obj)
return Artist(
id=_str(obj.get("id")),
name=_str(obj.get("name")),
uri=_str(obj.get("uri")),
type=_str(obj.get("type")) or "artist",
genres=list(obj.get("genres", []) or []),
images=imgs or None,
popularity=_int(obj.get("popularity")),
external_urls=ExternalUrls.from_dict(obj.get("external_urls", {})),
album_group=list(obj.get("album_group", []) or []),
single_group=list(obj.get("single_group", []) or []),
compilation_group=list(obj.get("compilation_group", []) or []),
appears_on_group=list(obj.get("appears_on_group", []) or []),
)
def to_dict(self) -> Dict[str, Any]:
out = {
"id": self.id,
"name": self.name,
"uri": self.uri,
"type": self.type,
"genres": self.genres or [],
"images": [im.to_dict() for im in (self.images or [])],
"popularity": self.popularity,
"external_urls": self.external_urls.to_dict(),
"album_group": self.album_group or [],
"single_group": self.single_group or [],
"compilation_group": self.compilation_group or [],
"appears_on_group": self.appears_on_group or [],
}
return {k: v for k, v in out.items() if v not in (None, {}, [], "")}

View File

@@ -0,0 +1,205 @@
#!/usr/bin/python3
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List, Union
from .types import ExternalUrls, _str, _int
from .track import Track as TrackModel
@dataclass
class UserMini:
id: Optional[str] = None
type: str = "user"
uri: Optional[str] = None
display_name: Optional[str] = None
external_urls: ExternalUrls = field(default_factory=ExternalUrls)
@staticmethod
def from_dict(obj: Any) -> "UserMini":
if not isinstance(obj, dict):
return UserMini()
return UserMini(
id=_str(obj.get("id")),
type=_str(obj.get("type")) or "user",
uri=_str(obj.get("uri")),
display_name=_str(obj.get("display_name")),
external_urls=ExternalUrls.from_dict(obj.get("external_urls", {})),
)
def to_dict(self) -> Dict[str, Any]:
out = {
"id": self.id,
"type": self.type,
"uri": self.uri,
"display_name": self.display_name,
"external_urls": self.external_urls.to_dict(),
}
return {k: v for k, v in out.items() if v not in (None, {}, [], "")}
@dataclass
class TrackStub:
id: Optional[str] = None
type: str = "track"
uri: Optional[str] = None
external_urls: ExternalUrls = field(default_factory=ExternalUrls)
@staticmethod
def from_dict(obj: Any) -> "TrackStub":
if not isinstance(obj, dict):
return TrackStub()
return TrackStub(
id=_str(obj.get("id")),
type=_str(obj.get("type")) or "track",
uri=_str(obj.get("uri")),
external_urls=ExternalUrls.from_dict(obj.get("external_urls", {})),
)
def to_dict(self) -> Dict[str, Any]:
out = {
"id": self.id,
"type": self.type,
"uri": self.uri,
"external_urls": self.external_urls.to_dict(),
}
return {k: v for k, v in out.items() if v not in (None, {}, [], "")}
@dataclass
class PlaylistItem:
added_at: Optional[str] = None
added_by: UserMini = field(default_factory=UserMini)
is_local: bool = False
track: Optional[Union[TrackModel, TrackStub]] = None
item_id: Optional[str] = None
@staticmethod
def from_dict(obj: Any) -> "PlaylistItem":
if not isinstance(obj, dict):
return PlaylistItem()
track_obj = None
trk = obj.get("track")
if isinstance(trk, dict):
if trk.get("duration_ms") is not None:
track_obj = TrackModel.from_dict(trk)
else:
track_obj = TrackStub.from_dict(trk)
return PlaylistItem(
added_at=_str(obj.get("added_at")),
added_by=UserMini.from_dict(obj.get("added_by", {})),
is_local=bool(obj.get("is_local", False)),
track=track_obj,
item_id=_str(obj.get("item_id")),
)
def to_dict(self) -> Dict[str, Any]:
out = {
"added_at": self.added_at,
"added_by": self.added_by.to_dict(),
"is_local": self.is_local,
"track": self.track.to_dict() if hasattr(self.track, 'to_dict') and self.track else None,
"item_id": self.item_id,
}
return {k: v for k, v in out.items() if v not in (None, {}, [], "")}
@dataclass
class TracksPage:
offset: int = 0
total: int = 0
items: List[PlaylistItem] = field(default_factory=list)
@staticmethod
def from_dict(obj: Any) -> "TracksPage":
if not isinstance(obj, dict):
return TracksPage(items=[])
items: List[PlaylistItem] = []
for it in obj.get("items", []) or []:
items.append(PlaylistItem.from_dict(it))
return TracksPage(
offset=_int(obj.get("offset")) or 0,
total=_int(obj.get("total")) or len(items),
items=items,
)
def to_dict(self) -> Dict[str, Any]:
return {
"offset": self.offset,
"total": self.total,
"items": [it.to_dict() for it in (self.items or [])]
}
@dataclass
class Owner:
id: Optional[str] = None
type: str = "user"
uri: Optional[str] = None
display_name: Optional[str] = None
external_urls: ExternalUrls = field(default_factory=ExternalUrls)
@staticmethod
def from_dict(obj: Any) -> "Owner":
if not isinstance(obj, dict):
return Owner()
return Owner(
id=_str(obj.get("id")),
type=_str(obj.get("type")) or "user",
uri=_str(obj.get("uri")),
display_name=_str(obj.get("display_name")),
external_urls=ExternalUrls.from_dict(obj.get("external_urls", {})),
)
def to_dict(self) -> Dict[str, Any]:
out = {
"id": self.id,
"type": self.type,
"uri": self.uri,
"display_name": self.display_name,
"external_urls": self.external_urls.to_dict(),
}
return {k: v for k, v in out.items() if v not in (None, {}, [], "")}
@dataclass
class Playlist:
name: Optional[str] = None
description: Optional[str] = None
collaborative: Optional[bool] = None
picture: Optional[str] = None
owner: Owner = field(default_factory=Owner)
snapshot_id: Optional[str] = None
length: Optional[int] = None
tracks: TracksPage = field(default_factory=lambda: TracksPage(items=[]))
type: str = "playlist"
@staticmethod
def from_dict(obj: Any) -> "Playlist":
if not isinstance(obj, dict):
return Playlist(tracks=TracksPage(items=[]))
return Playlist(
name=_str(obj.get("name")),
description=_str(obj.get("description")),
collaborative=bool(obj.get("collaborative")) if obj.get("collaborative") is not None else None,
picture=_str(obj.get("picture")),
owner=Owner.from_dict(obj.get("owner", {})),
snapshot_id=_str(obj.get("snapshot_id")),
length=_int(obj.get("length")),
tracks=TracksPage.from_dict(obj.get("tracks", {})),
type=_str(obj.get("type")) or "playlist",
)
def to_dict(self) -> Dict[str, Any]:
out = {
"name": self.name,
"description": self.description,
"collaborative": self.collaborative,
"picture": self.picture,
"owner": self.owner.to_dict(),
"snapshot_id": self.snapshot_id,
"length": self.length,
"tracks": self.tracks.to_dict(),
"type": self.type,
}
return {k: v for k, v in out.items() if v not in (None, {}, [], "")}

View File

@@ -0,0 +1,82 @@
#!/usr/bin/python3
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List
from .types import ExternalUrls, Image, ArtistRef, AlbumRef, _str, _int, _bool
@dataclass
class Track:
id: Optional[str] = None
name: Optional[str] = None
uri: Optional[str] = None
type: str = "track"
duration_ms: Optional[int] = None
explicit: Optional[bool] = None
track_number: Optional[int] = None
disc_number: Optional[int] = None
popularity: Optional[int] = None
preview_url: Optional[str] = None
earliest_live_timestamp: Optional[int] = None
has_lyrics: Optional[bool] = None
licensor_uuid: Optional[str] = None
external_urls: ExternalUrls = field(default_factory=ExternalUrls)
external_ids: Dict[str, str] = field(default_factory=dict)
available_markets: Optional[List[str]] = None
artists: List[ArtistRef] = field(default_factory=list)
album: Optional[AlbumRef] = None
@staticmethod
def from_dict(obj: Any) -> "Track":
if not isinstance(obj, dict):
return Track()
artists: List[ArtistRef] = []
for a in obj.get("artists", []) or []:
artists.append(ArtistRef.from_dict(a))
album_ref = None
if isinstance(obj.get("album"), dict):
album_ref = AlbumRef.from_dict(obj.get("album"))
return Track(
id=_str(obj.get("id")),
name=_str(obj.get("name")),
uri=_str(obj.get("uri")),
type=_str(obj.get("type")) or "track",
duration_ms=_int(obj.get("duration_ms")),
explicit=_bool(obj.get("explicit")),
track_number=_int(obj.get("track_number")),
disc_number=_int(obj.get("disc_number")),
popularity=_int(obj.get("popularity")),
preview_url=_str(obj.get("preview_url")),
earliest_live_timestamp=_int(obj.get("earliest_live_timestamp")),
has_lyrics=_bool(obj.get("has_lyrics")),
licensor_uuid=_str(obj.get("licensor_uuid")),
external_urls=ExternalUrls.from_dict(obj.get("external_urls", {})),
external_ids=dict(obj.get("external_ids", {}) or {}),
available_markets=list(obj.get("available_markets", []) or []),
artists=artists,
album=album_ref,
)
def to_dict(self) -> Dict[str, Any]:
out = {
"id": self.id,
"name": self.name,
"uri": self.uri,
"type": self.type,
"duration_ms": self.duration_ms,
"explicit": self.explicit,
"track_number": self.track_number,
"disc_number": self.disc_number,
"popularity": self.popularity,
"preview_url": self.preview_url,
"earliest_live_timestamp": self.earliest_live_timestamp,
"has_lyrics": self.has_lyrics,
"licensor_uuid": self.licensor_uuid,
"external_urls": self.external_urls.to_dict(),
"external_ids": self.external_ids or {},
"available_markets": self.available_markets or [],
"artists": [a.to_dict() for a in (self.artists or [])],
"album": self.album.to_dict() if self.album else None,
}
return {k: v for k, v in out.items() if v not in (None, {}, [], "")}

View File

@@ -0,0 +1,153 @@
#!/usr/bin/python3
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List
def _str(v: Any) -> Optional[str]:
if v is None:
return None
try:
s = str(v)
return s
except Exception:
return None
def _int(v: Any) -> Optional[int]:
try:
if v is None:
return None
return int(v)
except Exception:
return None
def _bool(v: Any) -> Optional[bool]:
if isinstance(v, bool):
return v
if v in ("true", "True", "1", 1):
return True
if v in ("false", "False", "0", 0):
return False
return None
def _list_str(v: Any) -> Optional[List[str]]:
if v is None:
return []
if isinstance(v, list):
out: List[str] = []
for it in v:
s = _str(it)
if s is not None:
out.append(s)
return out
return []
@dataclass
class ExternalUrls:
spotify: Optional[str] = None
@staticmethod
def from_dict(obj: Any) -> "ExternalUrls":
if not isinstance(obj, dict):
return ExternalUrls()
return ExternalUrls(
spotify=_str(obj.get("spotify"))
)
def to_dict(self) -> Dict[str, Any]:
return {"spotify": self.spotify} if self.spotify else {}
@dataclass
class Image:
url: str
width: int = 0
height: int = 0
@staticmethod
def from_dict(obj: Any) -> Optional["Image"]:
if not isinstance(obj, dict):
return None
url = _str(obj.get("url"))
if not url:
return None
w = _int(obj.get("width")) or 0
h = _int(obj.get("height")) or 0
return Image(url=url, width=w, height=h)
def to_dict(self) -> Dict[str, Any]:
return {"url": self.url, "width": self.width, "height": self.height}
@dataclass
class ArtistRef:
id: Optional[str] = None
name: str = ""
type: str = "artist"
uri: Optional[str] = None
external_urls: ExternalUrls = field(default_factory=ExternalUrls)
@staticmethod
def from_dict(obj: Any) -> "ArtistRef":
if not isinstance(obj, dict):
return ArtistRef()
return ArtistRef(
id=_str(obj.get("id")),
name=_str(obj.get("name")) or "",
type=_str(obj.get("type")) or "artist",
uri=_str(obj.get("uri")),
external_urls=ExternalUrls.from_dict(obj.get("external_urls", {})),
)
def to_dict(self) -> Dict[str, Any]:
out = {
"id": self.id,
"name": self.name,
"type": self.type,
"uri": self.uri,
"external_urls": self.external_urls.to_dict()
}
return {k: v for k, v in out.items() if v not in (None, {}, [], "")}
@dataclass
class AlbumRef:
id: Optional[str] = None
name: Optional[str] = None
type: str = "album"
uri: Optional[str] = None
external_urls: ExternalUrls = field(default_factory=ExternalUrls)
images: Optional[List[Image]] = None
@staticmethod
def from_dict(obj: Any) -> "AlbumRef":
if not isinstance(obj, dict):
return AlbumRef()
imgs: List[Image] = []
for im in obj.get("images", []) or []:
im_obj = Image.from_dict(im)
if im_obj:
imgs.append(im_obj)
return AlbumRef(
id=_str(obj.get("id")),
name=_str(obj.get("name")),
type=_str(obj.get("type")) or "album",
uri=_str(obj.get("uri")),
external_urls=ExternalUrls.from_dict(obj.get("external_urls", {})),
images=imgs or None,
)
def to_dict(self) -> Dict[str, Any]:
out = {
"id": self.id,
"name": self.name,
"type": self.type,
"uri": self.uri,
"external_urls": self.external_urls.to_dict(),
"images": [im.to_dict() for im in (self.images or [])]
}
return {k: v for k, v in out.items() if v not in (None, {}, [], "")}

242
docs/librespot_client.md Normal file
View File

@@ -0,0 +1,242 @@
### LibrespotClient wrapper
A thin, high-level wrapper over the internal librespot API that returns Web API-like dictionaries for albums, tracks, artists, and playlists. Use this to standardize access to Spotify metadata throughout the codebase.
- Import path: `from deezspot.libutils import LibrespotClient`
- Backed by: `librespot` internal API (Session + Metadata/Playlist protos)
- Thread-safe for read operations; uses an internal in-memory cache for track object expansions
## Initialization
```python
from deezspot.libutils import LibrespotClient
# 1) Create with stored credentials file (recommended)
client = LibrespotClient(stored_credentials_path="/absolute/path/to/credentials.json")
# 2) Or reuse an existing librespot Session
# from librespot.core import Session
# session = ...
# client = LibrespotClient(session=session)
```
- **stored_credentials_path**: path to JSON created by librespot credential flow
- **session**: optional existing `librespot.core.Session` (if provided, `stored_credentials_path` is not required)
- **max_workers**: optional concurrency cap for track expansion (default 16, bounded [1, 32])
Always dispose when done:
```python
client.close()
```
## ID/URI inputs
All data-fetching methods accept either:
- A Spotify URI (e.g., `spotify:album:...`, `spotify:track:...`, etc)
- A base62 ID (e.g., `3KuXEGcqLcnEYWnn3OEGy0`)
- A public `open.spotify.com/...` URL (album/track/artist/playlist)
- Or the corresponding `librespot.metadata.*Id` class
You can also use the helper if needed:
```python
# kind in {"track", "album", "artist", "playlist"}
track_id = LibrespotClient.parse_input_id("track", "https://open.spotify.com/track/...")
```
## Public API
### get_album(album, include_tracks=False) -> dict
Fetches album metadata.
- **album**: URI/base62/URL or `AlbumId`
- **include_tracks**: when True, expands the album's tracks to full track objects using concurrent fetches; when False, returns `tracks` as an array of track base62 IDs
Return shape (subset):
```json
{
"album_type": "album",
"total_tracks": 10,
"available_markets": ["US", "GB"],
"external_urls": {"spotify": "https://open.spotify.com/album/{id}"},
"id": "{base62}",
"images": [{"url": "https://...", "width": 640, "height": 640}],
"name": "...",
"release_date": "2020-05-01",
"release_date_precision": "day",
"type": "album",
"uri": "spotify:album:{base62}",
"artists": [{"id": "...", "name": "...", "type": "artist", "uri": "..."}],
"tracks": [
// include_tracks=False -> ["trackBase62", ...]
// include_tracks=True -> [{ track object }, ...]
],
"copyrights": [{"text": "...", "type": "..."}],
"external_ids": {"upc": "..."},
"label": "...",
"popularity": 57
}
```
Usage:
```python
album = client.get_album("spotify:album:...", include_tracks=True)
```
### get_track(track) -> dict
Fetches track metadata.
- **track**: URI/base62/URL or `TrackId`
Return shape (subset):
```json
{
"album": { /* embedded album (no tracks) */ },
"artists": [{"id": "...", "name": "..."}],
"available_markets": ["US", "GB"],
"disc_number": 1,
"duration_ms": 221000,
"explicit": false,
"external_ids": {"isrc": "..."},
"external_urls": {"spotify": "https://open.spotify.com/track/{id}"},
"id": "{base62}",
"name": "...",
"popularity": 65,
"track_number": 1,
"type": "track",
"uri": "spotify:track:{base62}",
"preview_url": "https://p.scdn.co/mp3-preview/{hex}",
"has_lyrics": true,
"earliest_live_timestamp": 0,
"licensor_uuid": "{hex}" // when available
}
```
Usage:
```python
track = client.get_track("3KuXEGcqLcnEYWnn3OEGy0")
```
### get_artist(artist) -> dict
Fetches artist metadata and returns a full JSON-like mapping of the protobuf (pruned of empty fields).
- **artist**: URI/base62/URL or `ArtistId`
Usage:
```python
artist = client.get_artist("https://open.spotify.com/artist/...")
```
### get_playlist(playlist, expand_items=False) -> dict
Fetches playlist contents.
- **playlist**: URI/URL/ID or `PlaylistId` (Spotify uses non-base62 playlist IDs)
- **expand_items**: when True, playlist items containing tracks are expanded to full track objects (concurrent fetch with caching); otherwise, items contain minimal track stubs with `id`, `uri`, `type`, and `external_urls`
Return shape (subset):
```json
{
"name": "My Playlist",
"description": "...",
"collaborative": false,
"images": [{"url": "https://..."}],
"owner": {
"id": "username",
"type": "user",
"uri": "spotify:user:username",
"external_urls": {"spotify": "https://open.spotify.com/user/username"},
"display_name": "username"
},
"snapshot_id": "base64Revision==",
"tracks": {
"offset": 0,
"total": 42,
"items": [
{
"added_at": "2023-01-01T12:34:56Z",
"added_by": {"id": "...", "type": "user", "uri": "...", "external_urls": {"spotify": "..."}, "display_name": "..."},
"is_local": false,
"track": {
// expand_items=False -> {"id": "...", "uri": "spotify:track:...", "type": "track", "external_urls": {"spotify": "..."}}
// expand_items=True -> full track object
},
"item_id": "{hex}" // additional reference, not a Web API field
}
]
},
"type": "playlist"
}
```
Usage:
```python
playlist = client.get_playlist("spotify:playlist:...")
playlist_expanded = client.get_playlist("spotify:playlist:...", expand_items=True)
```
## Concurrency and caching
- When expanding tracks for albums/playlists, the client concurrently fetches missing track objects using a `ThreadPoolExecutor` with up to `max_workers` threads (default 16).
- A per-instance in-memory cache stores fetched track objects keyed by base62 ID to avoid duplicate network calls in the same process.
## Error handling
- Underlying network/protobuf errors are not swallowed; wrap your calls if you need custom handling.
- Empty/missing fields are pruned from output structures where appropriate.
Example:
```python
try:
data = client.get_album("spotify:album:...")
except Exception as exc:
# handle failure (retry/backoff/logging)
raise
```
## Migration guide (from direct librespot usage)
Before (direct protobuf access):
```python
album_id = AlbumId.from_base62(base62)
proto = session.api().get_metadata_4_album(album_id)
# manual traversal over `proto`...
```
After (wrapper):
```python
from deezspot.libutils import LibrespotClient
client = LibrespotClient(stored_credentials_path="/path/to/credentials.json")
try:
album = client.get_album(base62, include_tracks=True)
finally:
client.close()
```
## Notes
- Image URLs are derived from internal `file_id` bytes using the public Spotify image host.
- Playlist IDs are not base62; pass the raw ID, URI, or URL.
- For performance-critical paths, reuse a single `LibrespotClient` instance (and its cache) per worker.

View File

@@ -17,13 +17,12 @@ dependencies = [
"mutagen==1.47.0",
"pycryptodome==3.23.0",
"requests==2.32.3",
"spotipy==2.25.1",
"tqdm==4.67.1",
"fastapi==0.116.1",
"uvicorn[standard]==0.35.0",
"spotipy-anon==1.5.2",
"librespot-spotizerr==0.3.0",
"rapidfuzz==3.13.0"
"rapidfuzz==3.13.0",
"spotipy==2.25.1"
]
[project.urls]

View File

@@ -22,12 +22,11 @@ setup(
"mutagen==1.47.0",
"pycryptodome==3.23.0",
"requests==2.32.3",
"spotipy==2.25.1",
"tqdm==4.67.1",
"fastapi==0.116.1",
"uvicorn[standard]==0.35.0",
"spotipy-anon==1.5.2",
"librespot-spotizerr==0.3.0",
"rapidfuzz"
"rapidfuzz==3.13.0",
"spotipy==2.25.1"
],
)