277 lines
12 KiB
Python
277 lines
12 KiB
Python
#!/usr/bin/python3
|
|
|
|
from deezspot.easy_spoty import Spo
|
|
import traceback
|
|
from deezspot.libutils.logging_utils import logger
|
|
from deezspot.exceptions import MarketAvailabilityError
|
|
from typing import List, Optional, Dict, Any
|
|
|
|
from deezspot.models.callback.album import albumObject, artistAlbumObject, trackAlbumObject as CbTrackAlbumObject, artistTrackAlbumObject
|
|
from deezspot.models.callback.artist import artistObject
|
|
from deezspot.models.callback.common import IDs
|
|
from deezspot.models.callback.playlist import playlistObject, trackPlaylistObject, albumTrackPlaylistObject, artistTrackPlaylistObject, artistAlbumTrackPlaylistObject
|
|
from deezspot.models.callback.track import trackObject, artistTrackObject, albumTrackObject, artistAlbumTrackObject
|
|
from deezspot.models.callback.user import userObject
|
|
|
|
|
|
def _check_market_availability(item_name: str, item_type: str, api_available_markets: list[str] | None, user_markets: list[str] | None):
|
|
"""Checks if an item is available in any of the user-specified markets."""
|
|
if user_markets and api_available_markets is not None:
|
|
is_available_in_any_user_market = any(m in api_available_markets for m in user_markets)
|
|
if not is_available_in_any_user_market:
|
|
markets_str = ", ".join(user_markets)
|
|
raise MarketAvailabilityError(f"{item_type} '{item_name}' not available in provided market(s): {markets_str}")
|
|
elif user_markets and api_available_markets is None:
|
|
logger.warning(
|
|
f"Market availability check for {item_type} '{item_name}' skipped: "
|
|
"API response did not include 'available_markets' field. Assuming availability."
|
|
)
|
|
|
|
def _parse_release_date(date_str: Optional[str], precision: Optional[str]) -> Dict[str, Any]:
|
|
if not date_str:
|
|
return {}
|
|
|
|
parts = date_str.split('-')
|
|
data = {}
|
|
|
|
if len(parts) >= 1 and parts[0]:
|
|
data['year'] = int(parts[0])
|
|
if precision in ['month', 'day'] and len(parts) >= 2 and parts[1]:
|
|
data['month'] = int(parts[1])
|
|
if precision == 'day' and len(parts) >= 3 and parts[2]:
|
|
data['day'] = int(parts[2])
|
|
|
|
return data
|
|
|
|
def _json_to_ids(item_json: dict) -> IDs:
|
|
external_ids = item_json.get('external_ids', {})
|
|
return IDs(
|
|
spotify=item_json.get('id'),
|
|
isrc=external_ids.get('isrc'),
|
|
upc=external_ids.get('upc')
|
|
)
|
|
|
|
def _json_to_artist_track_object(artist_json: dict) -> artistTrackObject:
|
|
return artistTrackObject(
|
|
name=artist_json.get('name', ''),
|
|
ids=_json_to_ids(artist_json)
|
|
)
|
|
|
|
def _json_to_artist_album_track_object(artist_json: dict) -> artistAlbumTrackObject:
|
|
return artistAlbumTrackObject(
|
|
name=artist_json.get('name', ''),
|
|
ids=_json_to_ids(artist_json)
|
|
)
|
|
|
|
def _json_to_album_track_object(album_json: dict) -> albumTrackObject:
|
|
return albumTrackObject(
|
|
album_type=album_json.get('album_type', 'album'),
|
|
title=album_json.get('name', ''),
|
|
release_date=_parse_release_date(album_json.get('release_date'), album_json.get('release_date_precision')),
|
|
total_tracks=album_json.get('total_tracks', 0),
|
|
genres=album_json.get('genres', []),
|
|
images=album_json.get('images', []),
|
|
ids=_json_to_ids(album_json),
|
|
artists=[_json_to_artist_album_track_object(artist) for artist in album_json.get('artists', [])]
|
|
)
|
|
|
|
def tracking(ids, album_data_for_track=None, market: list[str] | None = None) -> Optional[trackObject]:
|
|
try:
|
|
json_track = Spo.get_track(ids)
|
|
if not json_track:
|
|
logger.error(f"Failed to get track details for ID: {ids} from Spotify API.")
|
|
return None
|
|
|
|
track_name_for_check = json_track.get('name', f'Track ID {ids}')
|
|
api_track_markets = json_track.get('available_markets')
|
|
_check_market_availability(track_name_for_check, "Track", api_track_markets, market)
|
|
|
|
album_to_process = None
|
|
if album_data_for_track:
|
|
album_to_process = album_data_for_track
|
|
elif json_track.get('album'):
|
|
album_id = json_track.get('album', {}).get('id')
|
|
if album_id:
|
|
album_to_process = Spo.get_album(album_id)
|
|
if not album_to_process:
|
|
album_to_process = json_track.get('album')
|
|
|
|
album_for_track = _json_to_album_track_object(album_to_process) if album_to_process else albumTrackObject()
|
|
|
|
track_obj = trackObject(
|
|
title=json_track.get('name', ''),
|
|
disc_number=json_track.get('disc_number', 1),
|
|
track_number=json_track.get('track_number', 1),
|
|
duration_ms=json_track.get('duration_ms', 0),
|
|
explicit=json_track.get('explicit', False),
|
|
genres=album_for_track.genres,
|
|
album=album_for_track,
|
|
artists=[_json_to_artist_track_object(artist) for artist in json_track.get('artists', [])],
|
|
ids=_json_to_ids(json_track)
|
|
)
|
|
logger.debug(f"Successfully tracked metadata for track {ids}")
|
|
return track_obj
|
|
|
|
except MarketAvailabilityError:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Failed to track metadata for track {ids}: {str(e)}")
|
|
logger.debug(traceback.format_exc())
|
|
return None
|
|
|
|
def _json_to_artist_album_object(artist_json: dict) -> artistAlbumObject:
|
|
return artistAlbumObject(
|
|
name=artist_json.get('name', ''),
|
|
ids=_json_to_ids(artist_json)
|
|
)
|
|
|
|
def _json_to_track_album_object(track_json: dict) -> CbTrackAlbumObject:
|
|
return CbTrackAlbumObject(
|
|
title=track_json.get('name', ''),
|
|
disc_number=track_json.get('disc_number', 1),
|
|
track_number=track_json.get('track_number', 1),
|
|
duration_ms=track_json.get('duration_ms', 0),
|
|
explicit=track_json.get('explicit', False),
|
|
ids=_json_to_ids(track_json),
|
|
artists=[artistTrackAlbumObject(name=a.get('name'), ids=_json_to_ids(a)) for a in track_json.get('artists', [])]
|
|
)
|
|
|
|
def tracking_album(album_json, market: list[str] | None = None) -> Optional[albumObject]:
|
|
if not album_json:
|
|
logger.error("tracking_album received None or empty album_json.")
|
|
return None
|
|
|
|
try:
|
|
album_name_for_check = album_json.get('name', f"Album ID {album_json.get('id', 'Unknown')}")
|
|
api_album_markets = album_json.get('available_markets')
|
|
_check_market_availability(album_name_for_check, "Album", api_album_markets, market)
|
|
|
|
album_artists = [_json_to_artist_album_object(a) for a in album_json.get('artists', [])]
|
|
|
|
album_tracks = []
|
|
simplified_tracks = album_json.get('tracks', {}).get('items', [])
|
|
track_ids = [t['id'] for t in simplified_tracks if t and t.get('id')]
|
|
|
|
full_tracks_data = []
|
|
if track_ids:
|
|
# Batch fetch full track objects. The get_tracks method should handle chunking if necessary.
|
|
full_tracks_data = Spo.get_tracks(track_ids, market=','.join(market) if market else None)
|
|
|
|
track_items_to_process = []
|
|
if full_tracks_data and full_tracks_data.get('tracks'):
|
|
track_items_to_process = full_tracks_data['tracks']
|
|
else: # Fallback to simplified if batch fetch fails
|
|
track_items_to_process = simplified_tracks
|
|
|
|
for track_item in track_items_to_process:
|
|
if not track_item or not track_item.get('id'):
|
|
continue
|
|
|
|
# Simplified track object from album endpoint is enough for trackAlbumObject
|
|
album_tracks.append(_json_to_track_album_object(track_item))
|
|
|
|
album_obj = albumObject(
|
|
album_type=album_json.get('album_type'),
|
|
title=album_json.get('name'),
|
|
release_date=_parse_release_date(album_json.get('release_date'), album_json.get('release_date_precision')),
|
|
total_tracks=album_json.get('total_tracks'),
|
|
genres=album_json.get('genres', []),
|
|
images=album_json.get('images', []),
|
|
copyrights=album_json.get('copyrights', []),
|
|
ids=_json_to_ids(album_json),
|
|
tracks=album_tracks,
|
|
artists=album_artists
|
|
)
|
|
|
|
logger.debug(f"Successfully tracked metadata for album {album_json.get('id', 'N/A')}")
|
|
return album_obj
|
|
|
|
except MarketAvailabilityError:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Failed to track album metadata for album ID {album_json.get('id', 'N/A') if album_json else 'N/A'}: {str(e)}")
|
|
logger.debug(traceback.format_exc())
|
|
return None
|
|
|
|
def tracking_episode(ids, market: list[str] | None = None) -> Optional[trackObject]:
|
|
try:
|
|
json_episode = Spo.get_episode(ids)
|
|
if not json_episode:
|
|
logger.error(f"Failed to get episode details for ID: {ids} from Spotify API.")
|
|
return None
|
|
|
|
episode_name_for_check = json_episode.get('name', f'Episode ID {ids}')
|
|
api_episode_markets = json_episode.get('available_markets')
|
|
_check_market_availability(episode_name_for_check, "Episode", api_episode_markets, market)
|
|
|
|
show_data = json_episode.get('show', {})
|
|
|
|
album_for_episode = albumTrackObject(
|
|
album_type='show',
|
|
title=show_data.get('name', 'Unknown Show'),
|
|
total_tracks=show_data.get('total_episodes', 0),
|
|
genres=show_data.get('genres', []),
|
|
images=json_episode.get('images', []),
|
|
ids=IDs(spotify=show_data.get('id')),
|
|
artists=[artistTrackAlbumObject(name=show_data.get('publisher', ''))]
|
|
)
|
|
|
|
episode_as_track = trackObject(
|
|
title=json_episode.get('name', 'Unknown Episode'),
|
|
duration_ms=json_episode.get('duration_ms', 0),
|
|
explicit=json_episode.get('explicit', False),
|
|
album=album_for_episode,
|
|
artists=[artistTrackObject(name=show_data.get('publisher', ''))],
|
|
ids=_json_to_ids(json_episode)
|
|
)
|
|
|
|
logger.debug(f"Successfully tracked metadata for episode {ids}")
|
|
return episode_as_track
|
|
|
|
except MarketAvailabilityError:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Failed to track episode metadata for ID {ids}: {str(e)}")
|
|
logger.debug(traceback.format_exc())
|
|
return None
|
|
|
|
def json_to_artist_album_track_playlist_object(artist_json: dict) -> artistAlbumTrackPlaylistObject:
|
|
"""Converts a JSON dict to an artistAlbumTrackPlaylistObject."""
|
|
return artistAlbumTrackPlaylistObject(
|
|
name=artist_json.get('name', ''),
|
|
ids=_json_to_ids(artist_json)
|
|
)
|
|
|
|
def json_to_artist_track_playlist_object(artist_json: dict) -> artistTrackPlaylistObject:
|
|
"""Converts a JSON dict to an artistTrackPlaylistObject."""
|
|
return artistTrackPlaylistObject(
|
|
name=artist_json.get('name', ''),
|
|
ids=_json_to_ids(artist_json)
|
|
)
|
|
|
|
def json_to_album_track_playlist_object(album_json: dict) -> albumTrackPlaylistObject:
|
|
"""Converts a JSON dict to an albumTrackPlaylistObject."""
|
|
return albumTrackPlaylistObject(
|
|
album_type=album_json.get('album_type', ''),
|
|
title=album_json.get('name', ''),
|
|
total_tracks=album_json.get('total_tracks', 0),
|
|
release_date=_parse_release_date(album_json.get('release_date'), album_json.get('release_date_precision')),
|
|
images=album_json.get('images', []),
|
|
ids=_json_to_ids(album_json),
|
|
artists=[json_to_artist_album_track_playlist_object(a) for a in album_json.get('artists', [])]
|
|
)
|
|
|
|
def json_to_track_playlist_object(track_json: dict) -> Optional[trackPlaylistObject]:
|
|
"""Converts a JSON dict from a playlist item to a trackPlaylistObject."""
|
|
if not track_json:
|
|
return None
|
|
album_data = track_json.get('album', {})
|
|
return trackPlaylistObject(
|
|
title=track_json.get('name', ''),
|
|
disc_number=track_json.get('disc_number', 1),
|
|
track_number=track_json.get('track_number', 1),
|
|
duration_ms=track_json.get('duration_ms', 0),
|
|
ids=_json_to_ids(track_json),
|
|
album=json_to_album_track_playlist_object(album_data),
|
|
artists=[json_to_artist_track_playlist_object(a) for a in track_json.get('artists', [])]
|
|
) |