#!/usr/bin/python3 from deezspot.easy_spoty import Spo import traceback from deezspot.libutils.logging_utils import logger from deezspot.exceptions import MarketAvailabilityError from typing import List, Optional, Dict, Any from deezspot.models.callback.album import albumObject, artistAlbumObject, trackAlbumObject as CbTrackAlbumObject, artistTrackAlbumObject from deezspot.models.callback.artist import artistObject from deezspot.models.callback.common import IDs from deezspot.models.callback.playlist import playlistObject, trackPlaylistObject, albumTrackPlaylistObject, artistTrackPlaylistObject, artistAlbumTrackPlaylistObject from deezspot.models.callback.track import trackObject, artistTrackObject, albumTrackObject, artistAlbumTrackObject from deezspot.models.callback.user import userObject def _check_market_availability(item_name: str, item_type: str, api_available_markets: list[str] | None, user_markets: list[str] | None): """Checks if an item is available in any of the user-specified markets.""" if user_markets and api_available_markets is not None: is_available_in_any_user_market = any(m in api_available_markets for m in user_markets) if not is_available_in_any_user_market: markets_str = ", ".join(user_markets) raise MarketAvailabilityError(f"{item_type} '{item_name}' not available in provided market(s): {markets_str}") elif user_markets and api_available_markets is None: logger.warning( f"Market availability check for {item_type} '{item_name}' skipped: " "API response did not include 'available_markets' field. Assuming availability." ) def _parse_release_date(date_str: Optional[str], precision: Optional[str]) -> Dict[str, Any]: if not date_str: return {} parts = date_str.split('-') data = {} if len(parts) >= 1 and parts[0]: data['year'] = int(parts[0]) if precision in ['month', 'day'] and len(parts) >= 2 and parts[1]: data['month'] = int(parts[1]) if precision == 'day' and len(parts) >= 3 and parts[2]: data['day'] = int(parts[2]) return data def _json_to_ids(item_json: dict) -> IDs: external_ids = item_json.get('external_ids', {}) return IDs( spotify=item_json.get('id'), isrc=external_ids.get('isrc'), upc=external_ids.get('upc') ) def _json_to_artist_track_object(artist_json: dict) -> artistTrackObject: return artistTrackObject( name=artist_json.get('name', ''), ids=_json_to_ids(artist_json) ) def _json_to_artist_album_track_object(artist_json: dict) -> artistAlbumTrackObject: return artistAlbumTrackObject( name=artist_json.get('name', ''), ids=_json_to_ids(artist_json) ) def _json_to_album_track_object(album_json: dict) -> albumTrackObject: return albumTrackObject( album_type=album_json.get('album_type', 'album'), title=album_json.get('name', ''), release_date=_parse_release_date(album_json.get('release_date'), album_json.get('release_date_precision')), total_tracks=album_json.get('total_tracks', 0), genres=album_json.get('genres', []), images=album_json.get('images', []), ids=_json_to_ids(album_json), artists=[_json_to_artist_album_track_object(artist) for artist in album_json.get('artists', [])] ) def tracking(ids, album_data_for_track=None, market: list[str] | None = None) -> Optional[trackObject]: try: json_track = Spo.get_track(ids) if not json_track: logger.error(f"Failed to get track details for ID: {ids} from Spotify API.") return None track_name_for_check = json_track.get('name', f'Track ID {ids}') api_track_markets = json_track.get('available_markets') _check_market_availability(track_name_for_check, "Track", api_track_markets, market) album_to_process = None full_album_obj = None if album_data_for_track: album_to_process = album_data_for_track elif json_track.get('album'): album_id = json_track.get('album', {}).get('id') if album_id: # Try to get full album data with all tracks for proper disc counting full_album_data = Spo.get_album(album_id) if full_album_data: album_to_process = full_album_data # Also create a full album object to get total_discs full_album_obj = tracking_album(full_album_data, market) if not album_to_process: album_to_process = json_track.get('album') album_for_track = _json_to_album_track_object(album_to_process) if album_to_process else albumTrackObject() # If we have a full album object with total_discs, use that information if full_album_obj and hasattr(full_album_obj, 'total_discs'): album_for_track.total_discs = full_album_obj.total_discs track_obj = trackObject( title=json_track.get('name', ''), disc_number=json_track.get('disc_number', 1), track_number=json_track.get('track_number', 1), duration_ms=json_track.get('duration_ms', 0), explicit=json_track.get('explicit', False), genres=album_for_track.genres, album=album_for_track, artists=[_json_to_artist_track_object(artist) for artist in json_track.get('artists', [])], ids=_json_to_ids(json_track) ) logger.debug(f"Successfully tracked metadata for track {ids}") return track_obj except MarketAvailabilityError: raise except Exception as e: logger.error(f"Failed to track metadata for track {ids}: {str(e)}") logger.debug(traceback.format_exc()) return None def _json_to_artist_album_object(artist_json: dict) -> artistAlbumObject: return artistAlbumObject( name=artist_json.get('name', ''), ids=_json_to_ids(artist_json) ) def _json_to_track_album_object(track_json: dict) -> CbTrackAlbumObject: return CbTrackAlbumObject( title=track_json.get('name', ''), disc_number=track_json.get('disc_number', 1), track_number=track_json.get('track_number', 1), duration_ms=track_json.get('duration_ms', 0), explicit=track_json.get('explicit', False), ids=_json_to_ids(track_json), artists=[artistTrackAlbumObject(name=a.get('name'), ids=_json_to_ids(a)) for a in track_json.get('artists', [])] ) def tracking_album(album_json, market: list[str] | None = None) -> Optional[albumObject]: if not album_json: logger.error("tracking_album received None or empty album_json.") return None try: album_name_for_check = album_json.get('name', f"Album ID {album_json.get('id', 'Unknown')}") api_album_markets = album_json.get('available_markets') _check_market_availability(album_name_for_check, "Album", api_album_markets, market) album_artists = [_json_to_artist_album_object(a) for a in album_json.get('artists', [])] album_tracks = [] simplified_tracks = album_json.get('tracks', {}).get('items', []) track_ids = [t['id'] for t in simplified_tracks if t and t.get('id')] full_tracks_data = [] if track_ids: # Batch fetch full track objects. The get_tracks method should handle chunking if necessary. full_tracks_data = Spo.get_tracks(track_ids, market=','.join(market) if market else None) track_items_to_process = [] if full_tracks_data and full_tracks_data.get('tracks'): track_items_to_process = full_tracks_data['tracks'] else: # Fallback to simplified if batch fetch fails track_items_to_process = simplified_tracks for track_item in track_items_to_process: if not track_item or not track_item.get('id'): continue # Simplified track object from album endpoint is enough for trackAlbumObject album_tracks.append(_json_to_track_album_object(track_item)) # Calculate total discs by finding the maximum disc number total_discs = 1 if album_tracks: disc_numbers = [track.disc_number for track in album_tracks if hasattr(track, 'disc_number') and track.disc_number] total_discs = max(disc_numbers, default=1) album_obj = albumObject( album_type=album_json.get('album_type'), title=album_json.get('name'), release_date=_parse_release_date(album_json.get('release_date'), album_json.get('release_date_precision')), total_tracks=album_json.get('total_tracks'), total_discs=total_discs, # Set the calculated total discs genres=album_json.get('genres', []), images=album_json.get('images', []), copyrights=album_json.get('copyrights', []), ids=_json_to_ids(album_json), tracks=album_tracks, artists=album_artists ) logger.debug(f"Successfully tracked metadata for album {album_json.get('id', 'N/A')}") return album_obj except MarketAvailabilityError: raise except Exception as e: logger.error(f"Failed to track album metadata for album ID {album_json.get('id', 'N/A') if album_json else 'N/A'}: {str(e)}") logger.debug(traceback.format_exc()) return None def tracking_episode(ids, market: list[str] | None = None) -> Optional[trackObject]: try: json_episode = Spo.get_episode(ids) if not json_episode: logger.error(f"Failed to get episode details for ID: {ids} from Spotify API.") return None episode_name_for_check = json_episode.get('name', f'Episode ID {ids}') api_episode_markets = json_episode.get('available_markets') _check_market_availability(episode_name_for_check, "Episode", api_episode_markets, market) show_data = json_episode.get('show', {}) album_for_episode = albumTrackObject( album_type='show', title=show_data.get('name', 'Unknown Show'), total_tracks=show_data.get('total_episodes', 0), genres=show_data.get('genres', []), images=json_episode.get('images', []), ids=IDs(spotify=show_data.get('id')), artists=[artistTrackAlbumObject(name=show_data.get('publisher', ''))] ) episode_as_track = trackObject( title=json_episode.get('name', 'Unknown Episode'), duration_ms=json_episode.get('duration_ms', 0), explicit=json_episode.get('explicit', False), album=album_for_episode, artists=[artistTrackObject(name=show_data.get('publisher', ''))], ids=_json_to_ids(json_episode) ) logger.debug(f"Successfully tracked metadata for episode {ids}") return episode_as_track except MarketAvailabilityError: raise except Exception as e: logger.error(f"Failed to track episode metadata for ID {ids}: {str(e)}") logger.debug(traceback.format_exc()) return None def json_to_artist_album_track_playlist_object(artist_json: dict) -> artistAlbumTrackPlaylistObject: """Converts a JSON dict to an artistAlbumTrackPlaylistObject.""" return artistAlbumTrackPlaylistObject( name=artist_json.get('name', ''), ids=_json_to_ids(artist_json) ) def json_to_artist_track_playlist_object(artist_json: dict) -> artistTrackPlaylistObject: """Converts a JSON dict to an artistTrackPlaylistObject.""" return artistTrackPlaylistObject( name=artist_json.get('name', ''), ids=_json_to_ids(artist_json) ) def json_to_album_track_playlist_object(album_json: dict) -> albumTrackPlaylistObject: """Converts a JSON dict to an albumTrackPlaylistObject.""" return albumTrackPlaylistObject( album_type=album_json.get('album_type', ''), title=album_json.get('name', ''), total_tracks=album_json.get('total_tracks', 0), release_date=_parse_release_date(album_json.get('release_date'), album_json.get('release_date_precision')), images=album_json.get('images', []), ids=_json_to_ids(album_json), artists=[json_to_artist_album_track_playlist_object(a) for a in album_json.get('artists', [])] ) def json_to_track_playlist_object(track_json: dict) -> Optional[trackPlaylistObject]: """Converts a JSON dict from a playlist item to a trackPlaylistObject.""" if not track_json: return None album_data = track_json.get('album', {}) return trackPlaylistObject( title=track_json.get('name', ''), disc_number=track_json.get('disc_number', 1), track_number=track_json.get('track_number', 1), duration_ms=track_json.get('duration_ms', 0), ids=_json_to_ids(track_json), album=json_to_album_track_playlist_object(album_data), artists=[json_to_artist_track_playlist_object(a) for a in track_json.get('artists', [])] )