From ed8f41d45fb0dfa92657abcbaeeea54f09a1f06a Mon Sep 17 00:00:00 2001 From: Xoconoch Date: Tue, 10 Jun 2025 21:10:13 -0600 Subject: [PATCH] implemented typechecking in spotloader --- deezspot/easy_spoty.py | 25 + deezspot/libutils/logging_utils.py | 6 +- deezspot/models/callback/__init__.py | 5 +- deezspot/models/callback/album.py | 3 + deezspot/models/callback/artist.py | 3 +- deezspot/models/callback/common.py | 1 + deezspot/models/callback/playlist.py | 3 + deezspot/models/callback/track.py | 2 + deezspot/models/download/preferences.py | 1 + deezspot/spotloader/__download__.py | 1027 +++++++++++------------ deezspot/spotloader/__init__.py | 168 +--- deezspot/spotloader/__spo_api__.py | 443 ++++------ 12 files changed, 759 insertions(+), 928 deletions(-) diff --git a/deezspot/easy_spoty.py b/deezspot/easy_spoty.py index 64f15d3..dd87807 100644 --- a/deezspot/easy_spoty.py +++ b/deezspot/easy_spoty.py @@ -104,6 +104,31 @@ class Spo: return track_json + @classmethod + def get_tracks(cls, ids: list, market: str = None, client_id=None, client_secret=None): + """ + Get information for multiple tracks by a list of IDs. + + Args: + ids (list): A list of Spotify track IDs. + market (str, optional): An ISO 3166-1 alpha-2 country code. + client_id (str, optional): Optional custom Spotify client ID. + client_secret (str, optional): Optional custom Spotify client secret. + + Returns: + dict: A dictionary containing a list of track information. + """ + api = cls.__get_api(client_id, client_secret) + try: + tracks_json = api.tracks(ids, market=market) + except SpotifyException as error: + if error.http_status in cls.__error_codes: + # Create a string of the first few IDs for the error message + ids_preview = ', '.join(ids[:3]) + ('...' if len(ids) > 3 else '') + raise InvalidLink(f"one or more IDs in the list: [{ids_preview}]") + + return tracks_json + @classmethod def get_album(cls, ids, client_id=None, client_secret=None): """ diff --git a/deezspot/libutils/logging_utils.py b/deezspot/libutils/logging_utils.py index b88b57c..6dfb428 100644 --- a/deezspot/libutils/logging_utils.py +++ b/deezspot/libutils/logging_utils.py @@ -4,6 +4,7 @@ import logging import sys from typing import Optional, Callable, Dict, Any, Union import json +from dataclasses import asdict from deezspot.models.callback.callbacks import ( BaseStatusObject, @@ -165,7 +166,8 @@ def report_progress( raise TypeError(f"callback_obj must be of type trackCallbackObject, albumCallbackObject, or playlistCallbackObject, got {type(callback_obj)}") # Convert the callback object to a dictionary and report it + report_dict = asdict(callback_obj) if reporter: - reporter.report(callback_obj.__dict__) + reporter.report(report_dict) else: - logger.info(json.dumps(callback_obj.__dict__)) \ No newline at end of file + logger.info(json.dumps(report_dict)) \ No newline at end of file diff --git a/deezspot/models/callback/__init__.py b/deezspot/models/callback/__init__.py index ccaf979..6421dcf 100644 --- a/deezspot/models/callback/__init__.py +++ b/deezspot/models/callback/__init__.py @@ -6,7 +6,7 @@ Callback data models for the music metadata schema. from .common import IDs, ReleaseDate from .artist import artistObject, albumArtistObject -from .album import albumObject, trackAlbumObject +from .album import albumObject, trackAlbumObject, artistAlbumObject from .track import trackObject, artistTrackObject, albumTrackObject, playlistTrackObject from .playlist import playlistObject, trackPlaylistObject, albumTrackPlaylistObject, artistTrackPlaylistObject from .callbacks import ( @@ -22,4 +22,5 @@ from .callbacks import ( trackCallbackObject, albumCallbackObject, playlistCallbackObject -) \ No newline at end of file +) +from .user import userObject \ No newline at end of file diff --git a/deezspot/models/callback/album.py b/deezspot/models/callback/album.py index 617f760..5ae4d31 100644 --- a/deezspot/models/callback/album.py +++ b/deezspot/models/callback/album.py @@ -29,6 +29,7 @@ class trackAlbumObject: disc_number: int = 1 track_number: int = 1 duration_ms: int = 0 + explicit: bool = False genres: List[str] = field(default_factory=list) ids: IDs = field(default_factory=IDs) artists: List[artistTrackAlbumObject] = field(default_factory=list) @@ -43,6 +44,8 @@ class albumObject: release_date: Dict[str, Any] = field(default_factory=dict) total_tracks: int = 0 genres: List[str] = field(default_factory=list) + images: List[Dict[str, Any]] = field(default_factory=list) + copyrights: List[Dict[str, str]] = field(default_factory=list) ids: IDs = field(default_factory=IDs) tracks: List[trackAlbumObject] = field(default_factory=list) artists: List[artistAlbumObject] = field(default_factory=list) \ No newline at end of file diff --git a/deezspot/models/callback/artist.py b/deezspot/models/callback/artist.py index 691693f..8d7903a 100644 --- a/deezspot/models/callback/artist.py +++ b/deezspot/models/callback/artist.py @@ -1,7 +1,7 @@ #!/usr/bin/python3 from dataclasses import dataclass, field -from typing import List, Optional +from typing import List, Optional, Dict, Any from .common import IDs @@ -23,5 +23,6 @@ class artistObject: type: str = "artist" name: str = "" genres: List[str] = field(default_factory=list) + images: List[Dict[str, Any]] = field(default_factory=list) ids: IDs = field(default_factory=IDs) albums: List[albumArtistObject] = field(default_factory=list) \ No newline at end of file diff --git a/deezspot/models/callback/common.py b/deezspot/models/callback/common.py index d05d567..d390c11 100644 --- a/deezspot/models/callback/common.py +++ b/deezspot/models/callback/common.py @@ -10,6 +10,7 @@ class IDs: spotify: Optional[str] = None deezer: Optional[str] = None isrc: Optional[str] = None + upc: Optional[str] = None @dataclass diff --git a/deezspot/models/callback/playlist.py b/deezspot/models/callback/playlist.py index b297e24..f40f734 100644 --- a/deezspot/models/callback/playlist.py +++ b/deezspot/models/callback/playlist.py @@ -20,6 +20,8 @@ class albumTrackPlaylistObject: album_type: str = "" # "album" | "single" | "compilation" title: str = "" release_date: Dict[str, Any] = field(default_factory=dict) # ReleaseDate as dict + total_tracks: int = 0 + images: List[Dict[str, Any]] = field(default_factory=list) ids: IDs = field(default_factory=IDs) artists: List[artistAlbumTrackPlaylistObject] = field(default_factory=list) @@ -54,4 +56,5 @@ class playlistObject: description: Optional[str] = None owner: userObject = field(default_factory=userObject) tracks: List[trackPlaylistObject] = field(default_factory=list) + images: List[Dict[str, Any]] = field(default_factory=list) ids: IDs = field(default_factory=IDs) \ No newline at end of file diff --git a/deezspot/models/callback/track.py b/deezspot/models/callback/track.py index cf0ba09..7a40cf3 100644 --- a/deezspot/models/callback/track.py +++ b/deezspot/models/callback/track.py @@ -32,6 +32,7 @@ class albumTrackObject: release_date: Dict[str, Any] = field(default_factory=dict) # ReleaseDate as dict total_tracks: int = 0 genres: List[str] = field(default_factory=list) + images: List[Dict[str, Any]] = field(default_factory=list) ids: IDs = field(default_factory=IDs) artists: List[artistAlbumTrackObject] = field(default_factory=list) @@ -52,6 +53,7 @@ class trackObject: disc_number: int = 1 track_number: int = 1 duration_ms: int = 0 # mandatory + explicit: bool = False genres: List[str] = field(default_factory=list) album: albumTrackObject = field(default_factory=albumTrackObject) artists: List[artistTrackObject] = field(default_factory=list) diff --git a/deezspot/models/download/preferences.py b/deezspot/models/download/preferences.py index 8b288a5..c72e08b 100644 --- a/deezspot/models/download/preferences.py +++ b/deezspot/models/download/preferences.py @@ -8,6 +8,7 @@ class Preferences: self.output_dir = None self.ids = None self.json_data = None + self.playlist_tracks_json = None self.recursive_quality = None self.recursive_download = None self.not_interface = None diff --git a/deezspot/spotloader/__download__.py b/deezspot/spotloader/__download__.py index b2b22a4..5b45a8d 100644 --- a/deezspot/spotloader/__download__.py +++ b/deezspot/spotloader/__download__.py @@ -39,26 +39,17 @@ from deezspot.libutils.cleanup_utils import ( ) from deezspot.libutils.skip_detection import check_track_exists from deezspot.models.callback import ( - trackCallbackObject, - albumCallbackObject, - playlistCallbackObject, - initializingObject, - skippedObject, - retryingObject, - realTimeObject, - errorObject, - doneObject, - summaryObject, - failedTrackObject, - trackObject as cb_trackObject, - albumObject as cb_albumObject, - playlistObject as cb_playlistObject, - artistTrackObject, - albumTrackObject, - playlistTrackObject, + trackObject, albumTrackObject, playlistTrackObject, artistTrackObject, + trackCallbackObject, albumCallbackObject, playlistCallbackObject, + initializingObject, skippedObject, retryingObject, realTimeObject, errorObject, doneObject, + failedTrackObject, summaryObject, + albumObject, artistAlbumObject, + playlistObject, userObject, IDs ) +from deezspot.spotloader.__spo_api__ import tracking, json_to_track_playlist_object +from datetime import datetime # --- Global retry counter variables --- GLOBAL_RETRY_COUNT = 0 @@ -67,6 +58,104 @@ GLOBAL_MAX_RETRIES = 100 # Adjust this value as needed # --- Global tracking of active downloads --- # Moved to deezspot.libutils.cleanup_utils +def _track_object_to_dict(track_obj: trackObject) -> dict: + """Converts a trackObject into a dictionary for legacy functions like taggers.""" + if not track_obj: + return {} + + tags = {} + + # Track details + tags['music'] = track_obj.title + tags['tracknum'] = track_obj.track_number + tags['discnum'] = track_obj.disc_number + tags['duration'] = track_obj.duration_ms // 1000 if track_obj.duration_ms else 0 + if track_obj.ids: + tags['ids'] = track_obj.ids.spotify + tags['isrc'] = track_obj.ids.isrc + + tags['artist'] = "; ".join([artist.name for artist in track_obj.artists]) + + # Album details + if track_obj.album: + album = track_obj.album + tags['album'] = album.title + tags['ar_album'] = "; ".join([artist.name for artist in album.artists]) + tags['nb_tracks'] = album.total_tracks + if album.release_date and 'year' in album.release_date: + try: + # Create a datetime object for the tagger + tags['year'] = datetime( + year=album.release_date['year'], + month=album.release_date.get('month', 1), + day=album.release_date.get('day', 1) + ) + except (ValueError, TypeError): + tags['year'] = None # Handle invalid date parts + else: + tags['year'] = None + + if album.ids: + tags['upc'] = album.ids.upc + tags['album_id'] = album.ids.spotify + + if album.images: + tags['image'] = max(album.images, key=lambda i: i.get('height', 0) * i.get('width', 0)).get('url') if album.images else None + else: + tags['image'] = None + + # These are not in the model, add them from album if they exist + tags['label'] = getattr(album, 'label', '') + tags['copyright'] = getattr(album, 'copyright', '') + + # Default/Placeholder values + tags['bpm'] = tags.get('bpm', 'Unknown') + tags['gain'] = tags.get('gain', 'Unknown') + tags['lyric'] = tags.get('lyric', '') + tags['author'] = tags.get('author', '') + tags['composer'] = tags.get('composer', '') + tags['lyricist'] = tags.get('lyricist', '') + tags['version'] = tags.get('version', '') + + return tags + +def _album_object_to_dict(album_obj: albumObject) -> dict: + """Converts an albumObject into a dictionary for legacy functions.""" + if not album_obj: + return {} + + tags = {} + + # Album details + tags['album'] = album_obj.title + tags['ar_album'] = "; ".join([artist.name for artist in album_obj.artists]) + tags['nb_tracks'] = album_obj.total_tracks + if album_obj.release_date and 'year' in album_obj.release_date: + try: + tags['year'] = datetime( + year=album_obj.release_date['year'], + month=album_obj.release_date.get('month', 1), + day=album_obj.release_date.get('day', 1) + ) + except (ValueError, TypeError): + tags['year'] = None + else: + tags['year'] = None + + if album_obj.ids: + tags['upc'] = album_obj.ids.upc + tags['album_id'] = album_obj.ids.spotify + + if album_obj.images: + tags['image'] = max(album_obj.images, key=lambda i: i.get('height', 0) * i.get('width', 0)).get('url') if album_obj.images else None + else: + tags['image'] = None + + tags['label'] = getattr(album_obj, 'label', '') + tags['copyright'] = getattr(album_obj, 'copyright', '') + + return tags + class Download_JOB: session = None progress_reporter = None @@ -93,6 +182,7 @@ class EASY_DW: self.__link = preferences.link self.__output_dir = preferences.output_dir self.__song_metadata = preferences.song_metadata + self.__song_metadata_dict = _track_object_to_dict(self.__song_metadata) self.__not_interface = preferences.not_interface self.__quality_download = preferences.quality_download or "NORMAL" self.__recursive_download = preferences.recursive_download @@ -125,7 +215,7 @@ class EASY_DW: custom_track_format = getattr(self.__preferences, 'custom_track_format', None) pad_tracks = getattr(self.__preferences, 'pad_tracks', True) self.__song_path = set_path( - self.__song_metadata, + self.__song_metadata_dict, self.__output_dir, self.__song_quality, self.__file_format, @@ -139,7 +229,7 @@ class EASY_DW: custom_track_format = getattr(self.__preferences, 'custom_track_format', None) pad_tracks = getattr(self.__preferences, 'pad_tracks', True) self.__song_path = set_path( - self.__song_metadata, + self.__song_metadata_dict, self.__output_dir, self.__song_quality, self.__file_format, @@ -152,7 +242,7 @@ class EASY_DW: def __write_track(self) -> None: self.__set_song_path() self.__c_track = Track( - self.__song_metadata, self.__song_path, + self.__song_metadata_dict, self.__song_path, self.__file_format, self.__song_quality, self.__link, self.__ids ) @@ -162,7 +252,7 @@ class EASY_DW: def __write_episode(self) -> None: self.__set_episode_path() self.__c_episode = Episode( - self.__song_metadata, self.__song_path, + self.__song_metadata_dict, self.__song_path, self.__file_format, self.__song_quality, self.__link, self.__ids ) @@ -183,13 +273,14 @@ class EASY_DW: "url": f"https://open.spotify.com/playlist/{playlist_data.get('id', '')}" } elif self.__parent == "album": - total_tracks_val = self.__song_metadata.get('nb_tracks', 0) + album_meta = self.__song_metadata.album + total_tracks_val = album_meta.total_tracks parent_info = { "type": "album", - "title": self.__song_metadata.get('album', ''), - "artist": self.__song_metadata.get('album_artist', self.__song_metadata.get('ar_album', '')), + "title": album_meta.title, + "artist": "; ".join([a.name for a in album_meta.artists]), "total_tracks": total_tracks_val, - "url": f"https://open.spotify.com/album/{self.__song_metadata.get('album_id', '')}" + "url": f"https://open.spotify.com/album/{album_meta.ids.spotify if album_meta.ids else ''}" } return parent_info, total_tracks_val @@ -287,9 +378,10 @@ class EASY_DW: def easy_dw(self) -> Track: # Request the image data - pic = self.__song_metadata['image'] - image = request(pic).content - self.__song_metadata['image'] = image + pic = self.__song_metadata_dict.get('image') + image = request(pic).content if pic else None + if image: + self.__song_metadata_dict['image'] = image try: # Initialize success to False, it will be set to True if download_try is successful @@ -301,8 +393,8 @@ class EASY_DW: self.download_try() # This should set self.__c_track.success = True if successful except Exception as e: - song_title = self.__song_metadata.get('music', 'Unknown Song') - artist_name = self.__song_metadata.get('artist', 'Unknown Artist') + song_title = self.__song_metadata.title + artist_name = "; ".join([a.name for a in self.__song_metadata.artists]) error_message = f"Download failed for '{song_title}' by '{artist_name}' (URL: {self.__link}). Original error: {str(e)}" logger.error(error_message) traceback.print_exc() @@ -322,8 +414,8 @@ class EASY_DW: # Final check for non-skipped tracks that might have failed after download_try returned. # This handles cases where download_try didn't raise an exception but self.__c_track.success is still False. if hasattr(self, '_EASY_DW__c_track') and self.__c_track and not self.__c_track.success: - song_title = self.__song_metadata.get('music', 'Unknown Song') - artist_name = self.__song_metadata.get('artist', 'Unknown Artist') + song_title = self.__song_metadata.title + artist_name = "; ".join([a.name for a in self.__song_metadata.artists]) original_error_msg = getattr(self.__c_track, 'error_message', "Download failed for an unspecified reason after attempt.") error_msg_template = "Cannot download '{title}' by '{artist}'. Reason: {reason}" final_error_msg = error_msg_template.format(title=song_title, artist=artist_name, reason=original_error_msg) @@ -334,6 +426,7 @@ class EASY_DW: # If we reach here, the track should be successful and not skipped. if hasattr(self, '_EASY_DW__c_track') and self.__c_track and self.__c_track.success: + self.__c_track.tags = self.__song_metadata_dict write_tags(self.__c_track) # Unregister the final successful file path after all operations are done. @@ -343,9 +436,9 @@ class EASY_DW: return self.__c_track def download_try(self) -> Track: - current_title = self.__song_metadata.get('music') - current_album = self.__song_metadata.get('album') - current_artist = self.__song_metadata.get('artist') + current_title = self.__song_metadata.title + current_album = self.__song_metadata.album.title if self.__song_metadata.album else '' + current_artist = "; ".join([a.name for a in self.__song_metadata.artists]) # Call the new check_track_exists function from skip_detection.py # It needs: original_song_path, title, album, convert_to, logger @@ -373,90 +466,78 @@ class EASY_DW: self.__c_track.was_skipped = True parent_info, total_tracks_val = self._get_parent_info() + + # Build track object + track_obj = self.__song_metadata - track_for_callback = cb_trackObject( - title=current_title, - artists=[artistTrackObject(name=artist.strip()) for artist in current_artist.split(';')], - ids=IDs(spotify=self.__ids) - ) + # Build parent object + parent_obj = None + if self.__parent == "album": + parent_obj = self.__song_metadata.album + elif self.__parent == "playlist" and parent_info: + parent_obj = playlistTrackObject( + title=parent_info.get("name"), + owner=userObject(name=parent_info.get("owner")) + ) - initializing_status = initializingObject( - ids=IDs(spotify=self.__ids), + # Build status object + status_obj = skippedObject( + ids=self.__song_metadata.ids, + reason=f"Track already exists at '{existing_file_path}'", convert_to=self.__preferences.convert_to, bitrate=self.__preferences.bitrate ) - parent_for_callback = None - if parent_info: - if parent_info.get("type") == "album": - parent_for_callback = albumTrackObject( - title=parent_info.get("title"), - album_type=self.__song_metadata.get("album_type", "album"), - total_tracks=parent_info.get("total_tracks"), - ids=IDs(spotify=self.__song_metadata.get("album_id")) - ) - elif parent_info.get("type") == "playlist": - parent_for_callback = playlistTrackObject( - title=parent_info.get("name"), - owner=userObject(name=parent_info.get("owner")), - ids=IDs(spotify=parent_info.get("url", "").split("/")[-1]) - ) - - callback_object = trackCallbackObject( - track=track_for_callback, - status_info=initializing_status, + # Build callback object + callback_obj = trackCallbackObject( + track=track_obj, + status_info=status_obj, current_track=getattr(self.__preferences, 'track_number', None), total_tracks=total_tracks_val, - parent=parent_for_callback + parent=parent_obj ) report_progress( reporter=Download_JOB.progress_reporter, - callback_obj=callback_object + callback_obj=callback_obj ) return self.__c_track # Report initializing status for the track download parent_info, total_tracks_val = self._get_parent_info() - track_for_callback = cb_trackObject( - title=current_title, - artists=[artistTrackObject(name=artist.strip()) for artist in current_artist.split(';')], - ids=IDs(spotify=self.__ids) - ) + + # Build track object + track_obj = self.__song_metadata - initializing_status = initializingObject( - ids=IDs(spotify=self.__ids), + # Build parent object + parent_obj = None + if self.__parent == "album": + parent_obj = self.__song_metadata.album + elif self.__parent == "playlist" and parent_info: + parent_obj = playlistTrackObject( + title=parent_info.get("name"), + owner=userObject(name=parent_info.get("owner")) + ) + + # Build status object + status_obj = initializingObject( + ids=self.__song_metadata.ids, convert_to=self.__preferences.convert_to, bitrate=self.__preferences.bitrate ) - parent_for_callback = None - if parent_info: - if parent_info.get("type") == "album": - parent_for_callback = albumTrackObject( - title=parent_info.get("title"), - album_type=self.__song_metadata.get("album_type", "album"), - total_tracks=parent_info.get("total_tracks"), - ids=IDs(spotify=self.__song_metadata.get("album_id")) - ) - elif parent_info.get("type") == "playlist": - parent_for_callback = playlistTrackObject( - title=parent_info.get("name"), - owner=userObject(name=parent_info.get("owner")), - ids=IDs(spotify=parent_info.get("url", "").split("/")[-1]) - ) - - callback_object = trackCallbackObject( - track=track_for_callback, - status_info=initializing_status, + # Build callback object + callback_obj = trackCallbackObject( + track=track_obj, + status_info=status_obj, current_track=getattr(self.__preferences, 'track_number', None), total_tracks=total_tracks_val, - parent=parent_for_callback + parent=parent_obj ) report_progress( reporter=Download_JOB.progress_reporter, - callback_obj=callback_object + callback_obj=callback_obj ) # If track does not exist in the desired final format, proceed with download/conversion @@ -485,9 +566,9 @@ class EASY_DW: try: with open(self.__song_path, "wb") as f: - if self.__real_time_dl and self.__song_metadata.get("duration"): + if self.__real_time_dl and self.__song_metadata_dict.get("duration") and self.__song_metadata_dict.get("duration") > 0: # Real-time download path - duration = self.__song_metadata["duration"] + duration = self.__song_metadata_dict["duration"] if duration > 0: rate_limit = total_size / duration chunk_size = 4096 @@ -512,31 +593,27 @@ class EASY_DW: if current_percentage > self._last_reported_percentage: self._last_reported_percentage = current_percentage - real_time_status = realTimeObject( - ids=IDs(spotify=self.__ids), - convert_to=self.__convert_to, - bitrate=self.__bitrate, + # Build status object + status_obj = realTimeObject( + ids=self.__song_metadata.ids, time_elapsed=int((current_time - start_time) * 1000), - progress=current_percentage + progress=current_percentage, + convert_to=self.__convert_to, + bitrate=self.__bitrate ) - track_for_callback = cb_trackObject( - title=self.__song_metadata.get("music", ""), - artists=[artistTrackObject(name=artist.strip()) for artist in self.__song_metadata.get("artist", "").split(';')], - ids=IDs(spotify=self.__ids) - ) - - callback_object = trackCallbackObject( - track=track_for_callback, - status_info=real_time_status, + # Build callback object + callback_obj = trackCallbackObject( + track=track_obj, + status_info=status_obj, current_track=getattr(self.__preferences, 'track_number', None), total_tracks=total_tracks_val, - parent=parent_for_callback + parent=parent_obj ) report_progress( reporter=Download_JOB.progress_reporter, - callback_obj=callback_object + callback_obj=callback_obj ) # Rate limiting (if needed) @@ -591,90 +668,38 @@ class EASY_DW: if os.path.exists(self.__song_path): os.remove(self.__song_path) unregister_active_download(self.__song_path) - progress_data = { - "type": "track", - "status": "retrying", - "retry_count": retries, - "seconds_left": retry_delay, - "song": self.__song_metadata.get('music', ''), - "artist": self.__song_metadata.get('artist', ''), - "album": self.__song_metadata.get('album', ''), - "error": str(e), - "url": self.__link, - "convert_to": self.__convert_to, - "bitrate": self.__bitrate - } - # Add parent info based on parent type - if self.__parent == "playlist" and hasattr(self.__preferences, "json_data"): - playlist_data = self.__preferences.json_data - playlist_name = playlist_data.get('name', 'unknown') - total_tracks = playlist_data.get('tracks', {}).get('total', 'unknown') - current_track = getattr(self.__preferences, 'track_number', 0) - playlist_owner = playlist_data.get('owner', {}).get('display_name', 'unknown') - playlist_id = playlist_data.get('id', '') - - progress_data.update({ - "current_track": current_track, - "total_tracks": total_tracks, - "parent": { - "type": "playlist", - "name": playlist_name, - "owner": playlist_owner, - "total_tracks": total_tracks, - "url": f"https://open.spotify.com/playlist/{playlist_id}" - } - }) - elif self.__parent == "album": - album_name = self.__song_metadata.get('album', '') - album_artist = self.__song_metadata.get('album_artist', self.__song_metadata.get('ar_album', '')) - total_tracks = self.__song_metadata.get('nb_tracks', 0) - current_track = getattr(self.__preferences, 'track_number', 0) - album_id = self.__song_metadata.get('album_id', '') - - progress_data.update({ - "current_track": current_track, - "total_tracks": total_tracks, - "parent": { - "type": "album", - "title": album_name, - "artist": album_artist, - "total_tracks": total_tracks, - "url": f"https://open.spotify.com/album/{album_id}" - } - }) - - retrying_status = retryingObject( - ids=IDs(spotify=self.__ids), - convert_to=self.__convert_to, - bitrate=self.__bitrate, + # Build status object + status_obj = retryingObject( + ids=self.__song_metadata.ids, retry_count=retries, seconds_left=retry_delay, - error=str(e) + error=str(e), + convert_to=self.__convert_to, + bitrate=self.__bitrate ) - track_for_callback = cb_trackObject( - title=self.__song_metadata.get('music', 'Unknown Episode'), - artists=[artistTrackObject(name=self.__song_metadata.get('artist', 'Unknown Show'))], - ids=IDs(spotify=self.__ids) + # Build callback object + callback_obj = trackCallbackObject( + track=track_obj, + status_info=status_obj, + current_track=getattr(self.__preferences, 'track_number', None), + total_tracks=total_tracks_val, + parent=parent_obj ) - - callback_object = trackCallbackObject( - track=track_for_callback, - status_info=retrying_status - ) - + report_progress( reporter=Download_JOB.progress_reporter, - callback_obj=callback_object + callback_obj=callback_obj ) if retries >= max_retries or GLOBAL_RETRY_COUNT >= GLOBAL_MAX_RETRIES: + # Final cleanup before giving up if os.path.exists(self.__song_path): - os.remove(self.__song_path) # Clean up partial file + os.remove(self.__song_path) # Add track info to exception - track_name = self.__song_metadata.get('music', 'Unknown Track') - artist_name = self.__song_metadata.get('artist', 'Unknown Artist') + track_name = self.__song_metadata.title + artist_name = "; ".join([a.name for a in self.__song_metadata.artists]) final_error_msg = f"Maximum retry limit reached for '{track_name}' by '{artist_name}' (local: {max_retries}, global: {GLOBAL_MAX_RETRIES}). Last error: {str(e)}" # Store error on track object if hasattr(self, '_EASY_DW__c_track') and self.__c_track: @@ -685,11 +710,12 @@ class EASY_DW: retry_delay += retry_delay_increase # Use the custom retry delay increase # Save cover image if requested, after successful download and before conversion - if self.__preferences.save_cover and hasattr(self, '_EASY_DW__song_path') and self.__song_path and self.__song_metadata.get('image'): + if self.__preferences.save_cover and hasattr(self, '_EASY_DW__song_path') and self.__song_path and self.__song_metadata_dict.get('image'): try: track_directory = dirname(self.__song_path) # Ensure the directory exists (it should, from os.makedirs earlier) - save_cover_image(self.__song_metadata['image'], track_directory, "cover.jpg") + image_bytes = request(self.__song_metadata_dict['image']).content + save_cover_image(image_bytes, track_directory, "cover.jpg") logger.info(f"Saved cover image for track in {track_directory}") except Exception as e_img_save: logger.warning(f"Failed to save cover image for track: {e_img_save}") @@ -706,27 +732,26 @@ class EASY_DW: else: error_msg = f"Audio conversion failed: {original_error_str}" - error_status = errorObject( - ids=IDs(spotify=self.__ids), + # Build status object + status_obj = errorObject( + ids=self.__song_metadata.ids, + error=error_msg, convert_to=self.__convert_to, - bitrate=self.__bitrate, - error=error_msg + bitrate=self.__bitrate ) - track_for_callback = cb_trackObject( - title=self.__song_metadata.get('music', ''), - artists=[artistTrackObject(name=artist.strip()) for artist in self.__song_metadata.get('artist', '').split(';')], - ids=IDs(spotify=self.__ids) - ) - callback_object = trackCallbackObject( - track=track_for_callback, - status_info=error_status, + + # Build callback object + callback_obj = trackCallbackObject( + track=track_obj, + status_info=status_obj, current_track=getattr(self.__preferences, 'track_number', None), total_tracks=total_tracks_val, - parent=parent_info + parent=parent_obj ) + report_progress( reporter=Download_JOB.progress_reporter, - callback_obj=callback_object + callback_obj=callback_obj ) logger.error(f"Audio conversion error: {error_msg}") @@ -741,28 +766,28 @@ class EASY_DW: self.__convert_audio() except Exception as conv_e: # If conversion fails twice, create a final error report - error_msg_2 = f"Audio conversion failed after retry for '{self.__song_metadata.get('music', 'Unknown Track')}'. Original error: {str(conv_e)}" - error_status_2 = errorObject( - ids=IDs(spotify=self.__ids), + error_msg_2 = f"Audio conversion failed after retry for '{self.__song_metadata.title}'. Original error: {str(conv_e)}" + + # Build status object + status_obj = errorObject( + ids=self.__song_metadata.ids, + error=error_msg_2, convert_to=self.__convert_to, - bitrate=self.__bitrate, - error=error_msg_2 + bitrate=self.__bitrate ) - track_for_callback_2 = cb_trackObject( - title=self.__song_metadata.get('music', 'Unknown Track'), - artists=[artistTrackObject(name=artist.strip()) for artist in self.__song_metadata.get('artist', '').split(';')], - ids=IDs(spotify=self.__ids) - ) - callback_object_2 = trackCallbackObject( - track=track_for_callback_2, - status_info=error_status_2, + + # Build callback object + callback_obj = trackCallbackObject( + track=track_obj, + status_info=status_obj, current_track=getattr(self.__preferences, 'track_number', None), total_tracks=total_tracks_val, - parent=parent_info + parent=parent_obj ) + report_progress( reporter=Download_JOB.progress_reporter, - callback_obj=callback_object_2 + callback_obj=callback_obj ) logger.error(error_msg) @@ -771,60 +796,53 @@ class EASY_DW: # Store error on track object if hasattr(self, '_EASY_DW__c_track') and self.__c_track: self.__c_track.success = False - self.__c_track.error_message = error_msg + self.__c_track.error_message = error_msg_2 raise TrackNotFound(message=error_msg, url=self.__link) from conv_e if hasattr(self, '_EASY_DW__c_track') and self.__c_track: self.__c_track.success = True + self.__c_track.tags = self.__song_metadata_dict write_tags(self.__c_track) # Create done status report - song = self.__song_metadata.get("music", "") - artist = self.__song_metadata.get("artist", "") parent_info, total_tracks_val = self._get_parent_info() - current_track_val = None - summary_data = None - - if self.__parent == "playlist" and hasattr(self.__preferences, "json_data"): - playlist_data = self.__preferences.json_data - total_tracks_val = playlist_data.get('tracks', {}).get('total', 'unknown') - current_track_val = getattr(self.__preferences, 'track_number', 0) - elif self.__parent == "album": - total_tracks_val = self.__song_metadata.get('nb_tracks', 0) - current_track_val = getattr(self.__preferences, 'track_number', 0) - + current_track_val = getattr(self.__preferences, 'track_number', None) + summary_obj = None if self.__parent is None: - track_obj = cb_trackObject(title=f"{song} - {artist}") + # Create a summary object for single-track downloads + successful_track_list = [track_obj] if self.__c_track.success and not getattr(self.__c_track, 'was_skipped', False) else [] + skipped_track_list = [track_obj] if getattr(self.__c_track, 'was_skipped', False) else [] + summary_obj = summaryObject( - successful_tracks=[track_obj], - total_successful=1 + successful_tracks=successful_track_list, + skipped_tracks=skipped_track_list, + failed_tracks=[], + total_successful=len(successful_track_list), + total_skipped=len(skipped_track_list), + total_failed=0 ) - done_status = doneObject( - ids=IDs(spotify=self.__ids), + # Build status object + status_obj = doneObject( + ids=self.__song_metadata.ids, + summary=summary_obj, convert_to=self.__convert_to, - bitrate=self.__bitrate, - summary=summary_obj + bitrate=self.__bitrate ) - track_for_callback = cb_trackObject( - title=song, - artists=[artistTrackObject(name=a.strip()) for a in artist.split(';')], - ids=IDs(spotify=self.__ids) - ) - - callback_object = trackCallbackObject( - track=track_for_callback, - status_info=done_status, + # Build callback object + callback_obj = trackCallbackObject( + track=track_obj, + status_info=status_obj, current_track=current_track_val, total_tracks=total_tracks_val, - parent=parent_for_callback + parent=parent_obj ) report_progress( reporter=Download_JOB.progress_reporter, - callback_obj=callback_object + callback_obj=callback_obj ) if hasattr(self, '_EASY_DW__c_track') and self.__c_track and self.__c_track.success: @@ -867,26 +885,27 @@ class EASY_DW: except Exception as e: global GLOBAL_RETRY_COUNT GLOBAL_RETRY_COUNT += 1 - retries += 1 + + track_obj = self.__song_metadata + status_obj = retryingObject( + ids=track_obj.ids, + retry_count=retries, + seconds_left=retry_delay, + error=str(e), + convert_to=self.__convert_to, + bitrate=self.__bitrate + ) + callback_obj = trackCallbackObject(track=track_obj, status_info=status_obj) # Log retry attempt with structured data report_progress( reporter=Download_JOB.progress_reporter, - report_type="episode", - status="retrying", - retry_count=retries, - seconds_left=retry_delay, - song=self.__song_metadata.get('music', 'Unknown Episode'), - artist=self.__song_metadata.get('artist', 'Unknown Show'), - error=str(e), - url=self.__link, - convert_to=self.__convert_to, - bitrate=self.__bitrate + callback_obj=callback_obj ) if retries >= max_retries or GLOBAL_RETRY_COUNT >= GLOBAL_MAX_RETRIES: if os.path.exists(self.__song_path): os.remove(self.__song_path) # Clean up partial file - track_name = self.__song_metadata.get('music', 'Unknown Episode') - artist_name = self.__song_metadata.get('artist', 'Unknown Show') + track_name = self.__song_metadata.title + artist_name = "; ".join([a.name for a in self.__song_metadata.artists]) final_error_msg = f"Maximum retry limit reached for '{track_name}' by '{artist_name}' (local: {max_retries}, global: {GLOBAL_MAX_RETRIES}). Last error: {str(e)}" if hasattr(self, '_EASY_DW__c_episode') and self.__c_episode: self.__c_episode.success = False @@ -903,9 +922,9 @@ class EASY_DW: try: with open(self.__song_path, "wb") as f: c_stream = stream.input_stream.stream() - if self.__real_time_dl and self.__song_metadata.get("duration") and self.__song_metadata["duration"] > 0: + if self.__real_time_dl and self.__song_metadata_dict.get("duration") and self.__song_metadata_dict["duration"] > 0: # Restored Real-time download logic for episodes - duration = self.__song_metadata["duration"] + duration = self.__song_metadata_dict["duration"] rate_limit = total_size / duration chunk_size = 4096 bytes_written = 0 @@ -937,8 +956,8 @@ class EASY_DW: except: pass unregister_active_download(self.__song_path) - episode_title = self.__song_metadata.get('music', 'Unknown Episode') - artist_name = self.__song_metadata.get('artist', 'Unknown Show') + episode_title = self.__song_metadata.title + artist_name = "; ".join([a.name for a in self.__song_metadata.artists]) final_error_msg = f"Error during real-time download for episode '{episode_title}' by '{artist_name}' (URL: {self.__link}). Error: {str(e_realtime)}" logger.error(final_error_msg) if hasattr(self, '_EASY_DW__c_episode') and self.__c_episode: @@ -963,8 +982,8 @@ class EASY_DW: except: pass unregister_active_download(self.__song_path) - episode_title = self.__song_metadata.get('music', 'Unknown Episode') - artist_name = self.__song_metadata.get('artist', 'Unknown Show') + episode_title = self.__song_metadata.title + artist_name = "; ".join([a.name for a in self.__song_metadata.artists]) final_error_msg = f"Error during standard download for episode '{episode_title}' by '{artist_name}' (URL: {self.__link}). Error: {str(e_standard)}" logger.error(final_error_msg) if hasattr(self, '_EASY_DW__c_episode') and self.__c_episode: @@ -989,7 +1008,7 @@ class EASY_DW: try: os.remove(self.__song_path) except: pass unregister_active_download(self.__song_path) - episode_title = self.__song_metadata.get('music', 'Unknown Episode') + episode_title = self.__song_metadata.title error_message = f"Failed to download episode '{episode_title}' (URL: {self.__link}) during file operations. Error: {str(e_outer)}" logger.error(error_message) if hasattr(self, '_EASY_DW__c_episode') and self.__c_episode: @@ -1005,35 +1024,26 @@ class EASY_DW: # Conversion failed. __convert_audio or underlying convert_audio should have cleaned up its own temps. # The original downloaded file (if __convert_audio started from it) might still exist or be the self.__song_path. # Or self.__song_path might be a partially converted file if convert_audio failed mid-way and didn't cleanup perfectly. - episode_title = self.__song_metadata.get('music', 'Unknown Episode') + episode_title = self.__song_metadata.title error_message = f"Audio conversion for episode '{episode_title}' failed. Original error: {str(conv_e)}" - - track_for_callback = cb_trackObject( - title=episode_title, - artists=[artistTrackObject(name=self.__song_metadata.get('artist', 'Unknown Show'))], - ids=IDs(spotify=self.__ids) - ) - - error_status = errorObject( - ids=IDs(spotify=self.__ids), + + track_obj = self.__song_metadata + status_obj = errorObject( + ids=track_obj.ids, + error=error_message, convert_to=self.__convert_to, - bitrate=self.__bitrate, - error=error_message + bitrate=self.__bitrate ) - - callback_object = trackCallbackObject( - track=track_for_callback, - status_info=error_status - ) - + callback_obj = trackCallbackObject(track=track_obj, status_info=status_obj) report_progress( reporter=Download_JOB.progress_reporter, - callback_obj=callback_object + callback_obj=callback_obj ) # Attempt to remove self.__song_path, which is the latest known path for this episode if os.path.exists(self.__song_path): os.remove(self.__song_path) - + unregister_active_download(self.__song_path) # Unregister it as it failed/was removed + logger.error(error_message) if hasattr(self, '_EASY_DW__c_episode') and self.__c_episode: self.__c_episode.success = False @@ -1043,6 +1053,7 @@ class EASY_DW: # If we reach here, download and any conversion were successful. if hasattr(self, '_EASY_DW__c_episode') and self.__c_episode: self.__c_episode.success = True + self.__c_episode.tags = self.__song_metadata_dict write_tags(self.__c_episode) # Unregister the final successful file path for episodes, as it's now complete. # self.__c_episode.episode_path would have been updated by __convert_audio__ if conversion occurred. @@ -1089,86 +1100,122 @@ class DW_ALBUM: self.__ids = self.__preferences.ids self.__make_zip = self.__preferences.make_zip self.__output_dir = self.__preferences.output_dir - self.__song_metadata = self.__preferences.song_metadata + self.__album_metadata = self.__preferences.song_metadata self.__not_interface = self.__preferences.not_interface - self.__song_metadata_items = self.__song_metadata.items() def dw(self) -> Album: - # Helper function to find most frequent item in a list - def most_frequent(items): - if not items: - return None - # If items is a string with semicolons, split it - if isinstance(items, str) and ";" in items: - items = [item.strip() for item in items.split(";")] - # If it's still a string, return it directly - if isinstance(items, str): - return items - # Otherwise, find the most frequent item - return max(set(items), key=items.count) - # Report album initializing status - album_name = self.__song_metadata.get('album', 'Unknown Album') - - # Process album artist to get the most representative one - album_artist = self.__song_metadata.get('artist', 'Unknown Artist') - if isinstance(album_artist, list): - album_artist = most_frequent(album_artist) - elif isinstance(album_artist, str) and ";" in album_artist: - artists_list = [artist.strip() for artist in album_artist.split(";")] - album_artist = most_frequent(artists_list) if artists_list else album_artist - - total_tracks = self.__song_metadata.get('nb_tracks', 0) - album_id = self.__ids - - successful_tracks_objs = [] - failed_tracks_objs = [] - skipped_tracks_objs = [] + album_obj = self.__album_metadata + status_obj = initializingObject(ids=album_obj.ids) + callback_obj = albumCallbackObject(album=album_obj, status_info=status_obj) - for track in tracks: - track_info = cb_trackObject( - title=track.tags.get('music', 'Unknown Track'), - artists=[artistTrackObject(name=artist.strip()) for artist in track.tags.get('artist', 'Unknown Artist').split(';')], - ids=IDs(spotify=track.tags.get('ids')) - ) - if getattr(track, 'was_skipped', False): - skipped_tracks_objs.append(track_info) - elif track.success: - successful_tracks_objs.append(track_info) - else: - reason = getattr(track, 'error_message', 'Unknown reason') - failed_tracks_objs.append(failedTrackObject(track=track_info, reason=reason)) - - summary = summaryObject( - successful_tracks=successful_tracks_objs, - skipped_tracks=skipped_tracks_objs, - failed_tracks=failed_tracks_objs, - total_successful=len(successful_tracks_objs), - total_skipped=len(skipped_tracks_objs), - total_failed=len(failed_tracks_objs) - ) - - done_status = doneObject( - ids=IDs(spotify=album_id), - summary=summary - ) - - album_for_callback = cb_albumObject( - title=album_name, - artists=[artistTrackObject(name=album_artist.strip())], - total_tracks=total_tracks, - ids=IDs(spotify=album_id), - album_type=self.__song_metadata.get("album_type", "album") - ) - - callback_object = albumCallbackObject( - album=album_for_callback, - status_info=done_status - ) - report_progress( reporter=Download_JOB.progress_reporter, - callback_obj=callback_object + callback_obj=callback_obj + ) + + pic_url = max(album_obj.images, key=lambda i: i.get('height', 0) * i.get('width', 0)).get('url') if album_obj.images else None + image_bytes = request(pic_url).content if pic_url else None + + album = Album(self.__ids) + album.image = image_bytes + album.nb_tracks = album_obj.total_tracks + album.album_name = album_obj.title + album.upc = album_obj.ids.upc + tracks = album.tracks + album.md5_image = self.__ids + album.tags = _album_object_to_dict(self.__album_metadata) # For top-level album tags if needed + + album_base_directory = get_album_directory( + album.tags, + self.__output_dir, + custom_dir_format=self.__preferences.custom_dir_format, + pad_tracks=self.__preferences.pad_tracks + ) + + for a, track_in_album in enumerate(album_obj.tracks): + current_track = a + 1 + + c_preferences = deepcopy(self.__preferences) + c_preferences.track_number = current_track + + try: + # Fetch full track object as album endpoint only provides simplified track objects + full_track_obj = tracking( + track_in_album.ids.spotify, + album_data_for_track=self.__preferences.json_data, # pass raw album json + market=self.__preferences.market + ) + + if not full_track_obj: + raise TrackNotFound(f"Could not fetch metadata for track ID {track_in_album.ids.spotify}") + + c_preferences.song_metadata = full_track_obj + c_preferences.ids = full_track_obj.ids.spotify + c_preferences.link = f"https://open.spotify.com/track/{c_preferences.ids}" + + track = EASY_DW(c_preferences, parent='album').easy_dw() + + except (TrackNotFound, Exception) as e: + # Create a failed track object for the summary + song_tags = _track_object_to_dict(track_in_album) if isinstance(track_in_album, trackObject) else {'music': 'Unknown Track'} + track = Track(tags=song_tags, song_path=None, file_format=None, quality=None, link=None, ids=track_in_album.ids) + track.success = False + track.error_message = str(e) + logger.warning(f"Track '{song_tags.get('music')}' from album '{album.album_name}' failed to download. Reason: {track.error_message}") + + tracks.append(track) + + # Save album cover image + if self.__preferences.save_cover and album.image and album_base_directory: + save_cover_image(album.image, album_base_directory, "cover.jpg") + + if self.__make_zip: + song_quality = tracks[0].quality if tracks and tracks[0].quality else 'HIGH' # Fallback quality + zip_name = create_zip( + tracks, + output_dir=self.__output_dir, + song_metadata=album.tags, + song_quality=song_quality, + custom_dir_format=self.__preferences.custom_dir_format + ) + album.zip_path = zip_name + + successful_tracks = [] + failed_tracks = [] + skipped_tracks = [] + for track in tracks: + # track.tags is a dict. + track_obj_for_cb = trackObject( + title=track.tags.get('music', 'Unknown Track'), + artists=[artistTrackObject(name=artist.strip()) for artist in track.tags.get('artist', '').split(';')] + ) + + if getattr(track, 'was_skipped', False): + skipped_tracks.append(track_obj_for_cb) + elif track.success: + successful_tracks.append(track_obj_for_cb) + else: + failed_tracks.append(failedTrackObject( + track=track_obj_for_cb, + reason=getattr(track, 'error_message', 'Unknown reason') + )) + + summary_obj = summaryObject( + successful_tracks=successful_tracks, + skipped_tracks=skipped_tracks, + failed_tracks=failed_tracks, + total_successful=len(successful_tracks), + total_skipped=len(skipped_tracks), + total_failed=len(failed_tracks) + ) + + status_obj = doneObject(ids=album_obj.ids, summary=summary_obj) + callback_obj = albumCallbackObject(album=album_obj, status_info=status_obj) + + report_progress( + reporter=Download_JOB.progress_reporter, + callback_obj=callback_obj ) return album @@ -1183,34 +1230,40 @@ class DW_PLAYLIST: self.__json_data = preferences.json_data self.__make_zip = self.__preferences.make_zip self.__output_dir = self.__preferences.output_dir - self.__song_metadata = self.__preferences.song_metadata + self.__song_metadata_list = self.__preferences.song_metadata + self.__playlist_tracks_json = getattr(self.__preferences, 'playlist_tracks_json', None) def dw(self) -> Playlist: playlist_name = self.__json_data.get('name', 'unknown') - total_tracks = self.__json_data.get('tracks', {}).get('total', 'unknown') - playlist_owner = self.__json_data.get('owner', {}).get('display_name', 'Unknown Owner') + playlist_owner_name = self.__json_data.get('owner', {}).get('display_name', 'Unknown Owner') playlist_id = self.__ids - # Report playlist initializing status - playlist_for_callback = cb_playlistObject( + # --- Build playlistObject for callbacks --- + playlist_tracks_for_cb = [] + if self.__playlist_tracks_json: + for item in self.__playlist_tracks_json: + track_data = item.get('track') + if track_data: + track_playlist_obj = json_to_track_playlist_object(track_data) + if track_playlist_obj: + playlist_tracks_for_cb.append(track_playlist_obj) + + playlist_obj_for_cb = playlistObject( title=playlist_name, - owner=userObject(name=playlist_owner), + owner=userObject(name=playlist_owner_name, ids=IDs(spotify=self.__json_data.get('owner', {}).get('id'))), ids=IDs(spotify=playlist_id), + images=self.__json_data.get('images', []), + tracks=playlist_tracks_for_cb, description=self.__json_data.get('description') ) + # --- End build playlistObject --- - initializing_status = initializingObject( - ids=IDs(spotify=playlist_id) - ) - - callback_object = playlistCallbackObject( - playlist=playlist_for_callback, - status_info=initializing_status - ) - + # Report playlist initializing status + status_obj = initializingObject(ids=IDs(spotify=playlist_id)) + callback_obj = playlistCallbackObject(playlist=playlist_obj_for_cb, status_info=status_obj) report_progress( reporter=Download_JOB.progress_reporter, - callback_obj=callback_object + callback_obj=callback_obj ) # --- Prepare the m3u playlist file --- @@ -1225,125 +1278,48 @@ class DW_PLAYLIST: playlist = Playlist() tracks = playlist.tracks - for idx, c_song_metadata in enumerate(self.__song_metadata): - # Check if c_song_metadata indicates a pre-identified error from metadata fetching stage - if isinstance(c_song_metadata, dict) and 'error_type' in c_song_metadata: - track_name = c_song_metadata.get('name', 'Unknown Track') - track_ids = c_song_metadata.get('ids', None) - error_message = c_song_metadata.get('error_message', 'Unknown error during metadata retrieval.') - error_type = c_song_metadata.get('error_type', 'UnknownError') - - logger.warning(f"Skipping download for track '{track_name}' (ID: {track_ids}) from playlist '{playlist_name}' due to {error_type}: {error_message}") - - # Create a placeholder Track object to represent this failure - # The link might not be available or relevant if IDs itself was the issue - failed_track_link = f"https://open.spotify.com/track/{track_ids}" if track_ids else None - - # Basic metadata for the Track object constructor - # We use c_song_metadata itself as it contains name, ids, etc. - # Ensure it's a dict for Track constructor - track_obj_metadata = c_song_metadata if isinstance(c_song_metadata, dict) else {'name': track_name, 'ids': track_ids} + for idx, c_song_metadata in enumerate(self.__song_metadata_list): + track = None - track = Track( - tags=track_obj_metadata, - song_path=None, - file_format=None, - quality=None, - link=failed_track_link, - ids=track_ids - ) + if isinstance(c_song_metadata, dict) and 'error_type' in c_song_metadata: + track_title = c_song_metadata.get('name', 'Unknown Track') + track_ids = c_song_metadata.get('ids') + error_message = c_song_metadata.get('error_message', 'Unknown error during metadata retrieval.') + logger.warning(f"Skipping download for track '{track_title}' (ID: {track_ids}) from playlist '{playlist_name}' due to error: {error_message}") + + track_tags = {'music': track_title, 'ids': track_ids} + track = Track(tags=track_tags, song_path=None, file_format=None, quality=None, link=None, ids=track_ids) track.success = False track.error_message = error_message tracks.append(track) - continue # Move to the next track in the playlist - - # Original handling for string type (though this should be less common with new error dicts) - if type(c_song_metadata) is str: - logger.warning(f"Encountered string as song metadata for a track in playlist '{playlist_name}': {c_song_metadata}. Treating as error.") - # Attempt to create a basic Track object with this string as an error message. - # This is a fallback for older error reporting styles. - error_track_name = "Unknown Track (error)" - error_track_ids = None - # Try to parse some info if the string is very specific, otherwise use generic. - if "Track not found" in c_song_metadata: - # This was an old message format, may not contain structured info. - pass # Keep generic error_track_name for now. - - track = Track( - tags={'name': error_track_name, 'ids': error_track_ids, 'artist': 'Unknown Artist'}, # Minimal metadata - song_path=None, - file_format=None, - quality=None, - link=None, # No reliable link from just an error string - ids=error_track_ids - ) - track.success = False - track.error_message = c_song_metadata # The string itself is the error - tracks.append(track) continue - - # If c_song_metadata is a valid metadata dictionary (no 'error_type') + + # c_song_metadata is a trackObject c_preferences = deepcopy(self.__preferences) - c_preferences.ids = c_song_metadata.get('ids') # Use .get for safety, though it should exist + c_preferences.ids = c_song_metadata.ids.spotify c_preferences.song_metadata = c_song_metadata - c_preferences.json_data = self.__json_data # Pass playlist data for reporting - c_preferences.track_number = idx + 1 # Track number in the playlist + c_preferences.json_data = self.__json_data + c_preferences.track_number = idx + 1 c_preferences.link = f"https://open.spotify.com/track/{c_preferences.ids}" if c_preferences.ids else None easy_dw_instance = EASY_DW(c_preferences, parent='playlist') - track = None # Initialize track for this iteration try: track = easy_dw_instance.easy_dw() - except TrackNotFound as e_track_nf: - track = easy_dw_instance.get_no_dw_track() # Retrieve the track instance from EASY_DW - # Ensure track object is a valid Track instance and has error info - if not isinstance(track, Track): # Fallback if get_no_dw_track didn't return a Track - track = Track(c_song_metadata, None, None, None, c_preferences.link, c_preferences.ids) - track.success = False # Explicitly set success to False - # Ensure error message is set, preferring the one from the exception if track doesn't have one - if not getattr(track, 'error_message', None) or str(e_track_nf): # Prioritize exception message if available - track.error_message = str(e_track_nf) - - song_name_log = c_song_metadata.get('music', 'Unknown Song') - artist_name_log = c_song_metadata.get('artist', 'Unknown Artist') - playlist_name_log = self.__json_data.get('name', 'Unknown Playlist') - logger.warning( - f"Failed to download track '{song_name_log}' by '{artist_name_log}' from playlist '{playlist_name_log}'. " - f"Reason: {track.error_message} (URL: {track.link or c_preferences.link})" - ) - except Exception as e_generic: - # Catch any other unexpected exceptions during the track download process - track = Track(c_song_metadata, None, None, None, c_preferences.link, c_preferences.ids) + except (TrackNotFound, Exception) as e: + track = easy_dw_instance.get_no_dw_track() + if not isinstance(track, Track): + track = Track(_track_object_to_dict(c_song_metadata), None, None, None, c_preferences.link, c_preferences.ids) track.success = False - track.error_message = f"An unexpected error occurred while processing track: {str(e_generic)}" + track.error_message = str(e) + logger.warning(f"Failed to download track '{c_song_metadata.title}' from playlist '{playlist_name}'. Reason: {track.error_message}") - song_name_log = c_song_metadata.get('music', 'Unknown Song') - artist_name_log = c_song_metadata.get('artist', 'Unknown Artist') - playlist_name_log = self.__json_data.get('name', 'Unknown Playlist') - logger.error( - f"Unexpected error downloading track '{song_name_log}' by '{artist_name_log}' from playlist '{playlist_name_log}'. " - f"Reason: {track.error_message} (URL: {track.link or c_preferences.link})" - ) - - # Ensure track is not None before appending (should be assigned in try/except) - if track is None: - # This is a fallback, should ideally not be reached. - track = Track(c_song_metadata, None, None, None, c_preferences.link, c_preferences.ids) - track.success = False - track.error_message = "Track processing resulted in an unhandled null track object." - logger.error(f"Track '{c_song_metadata.get('music', 'Unknown Track')}' from playlist '{self.__json_data.get('name', 'Unknown Playlist')}' " - f"was not properly processed.") - - tracks.append(track) + if track: + tracks.append(track) # --- Append the final track path to the m3u file using a relative path --- - if track.success and hasattr(track, 'song_path') and track.song_path: - # Build the relative path from the playlists directory - relative_path = os.path.relpath( - track.song_path, - start=os.path.join(self.__output_dir, "playlists") - ) + if track and track.success and hasattr(track, 'song_path') and track.song_path: + relative_path = os.path.relpath(track.song_path, start=playlist_m3u_dir) with open(m3u_path, "a", encoding="utf-8") as m3u_file: m3u_file.write(f"{relative_path}\n") # --------------------------------------------------------------------- @@ -1355,57 +1331,42 @@ class DW_PLAYLIST: playlist.zip_path = zip_name # Report playlist done status - playlist_name = self.__json_data.get('name', 'Unknown Playlist') - playlist_owner = self.__json_data.get('owner', {}).get('display_name', 'Unknown Owner') - total_tracks = self.__json_data.get('tracks', {}).get('total', 0) - playlist_id = self.__ids - - successful_tracks_objs = [] - failed_tracks_objs = [] - skipped_tracks_objs = [] + successful_tracks_cb = [] + failed_tracks_cb = [] + skipped_tracks_cb = [] for track in tracks: - track_info = cb_trackObject( - title=track.tags.get('music') or track.tags.get('name', 'Unknown Track'), - artists=[artistTrackObject(name=artist.strip()) for artist in track.tags.get('artist', 'Unknown Artist').split(';')], - ids=IDs(spotify=track.tags.get('ids')) + # Create a trackObject for the callback from the internal Track object's tags + track_tags = track.tags + track_obj_for_cb = trackObject( + title=track_tags.get('music', 'Unknown Track'), + artists=[artistTrackObject(name=artist.strip()) for artist in track_tags.get('artist', '').split(';')] ) + if getattr(track, 'was_skipped', False): - skipped_tracks_objs.append(track_info) + skipped_tracks_cb.append(track_obj_for_cb) elif track.success: - successful_tracks_objs.append(track_info) + successful_tracks_cb.append(track_obj_for_cb) else: - reason = getattr(track, 'error_message', 'Unknown reason') - failed_tracks_objs.append(failedTrackObject(track=track_info, reason=reason)) + failed_tracks_cb.append(failedTrackObject( + track=track_obj_for_cb, + reason=getattr(track, 'error_message', 'Unknown reason') + )) - summary = summaryObject( - successful_tracks=successful_tracks_objs, - skipped_tracks=skipped_tracks_objs, - failed_tracks=failed_tracks_objs, - total_successful=len(successful_tracks_objs), - total_skipped=len(skipped_tracks_objs), - total_failed=len(failed_tracks_objs) - ) - - done_status = doneObject( - ids=IDs(spotify=playlist_id), - summary=summary - ) - - playlist_for_callback = cb_playlistObject( - title=playlist_name, - owner=userObject(name=playlist_owner), - ids=IDs(spotify=playlist_id), - description=self.__json_data.get('description') - ) - - callback_object = playlistCallbackObject( - playlist=playlist_for_callback, - status_info=done_status + summary_obj = summaryObject( + successful_tracks=successful_tracks_cb, + skipped_tracks=skipped_tracks_cb, + failed_tracks=failed_tracks_cb, + total_successful=len(successful_tracks_cb), + total_skipped=len(skipped_tracks_cb), + total_failed=len(failed_tracks_cb) ) + + status_obj = doneObject(ids=playlist_obj_for_cb.ids, summary=summary_obj) + callback_obj = playlistCallbackObject(playlist=playlist_obj_for_cb, status_info=status_obj) report_progress( reporter=Download_JOB.progress_reporter, - callback_obj=callback_object + callback_obj=callback_obj, ) return playlist @@ -1418,42 +1379,24 @@ class DW_EPISODE: self.__preferences = preferences def dw(self) -> Episode: - episode_id = self.__preferences.ids - - track_for_callback = cb_trackObject( - title=self.__preferences.song_metadata.get('name', 'Unknown Episode'), - artists=[artistTrackObject(name=self.__preferences.song_metadata.get('show', 'Unknown Show'))], - ids=IDs(spotify=episode_id) - ) - - initializing_status = initializingObject( - ids=IDs(spotify=episode_id) - ) - - callback_object_init = trackCallbackObject( - track=track_for_callback, - status_info=initializing_status - ) + episode_obj = self.__preferences.song_metadata # This is a trackObject + # Build status object + status_obj_init = initializingObject(ids=episode_obj.ids) + callback_obj_init = trackCallbackObject(track=episode_obj, status_info=status_obj_init) report_progress( reporter=Download_JOB.progress_reporter, - callback_obj=callback_object_init + callback_obj=callback_obj_init ) episode = EASY_DW(self.__preferences).download_eps() - done_status = doneObject( - ids=IDs(spotify=episode_id) - ) - - callback_object_done = trackCallbackObject( - track=track_for_callback, - status_info=done_status - ) - + # Build status object + status_obj_done = doneObject(ids=episode_obj.ids) + callback_obj_done = trackCallbackObject(track=episode_obj, status_info=status_obj_done) report_progress( reporter=Download_JOB.progress_reporter, - callback_obj=callback_object_done + callback_obj=callback_obj_done ) return episode diff --git a/deezspot/spotloader/__init__.py b/deezspot/spotloader/__init__.py index 2776bcd..90309e9 100644 --- a/deezspot/spotloader/__init__.py +++ b/deezspot/spotloader/__init__.py @@ -19,6 +19,7 @@ from deezspot.models.download import ( Smart, Episode ) +from deezspot.models.callback import trackCallbackObject, errorObject from deezspot.spotloader.__download__ import ( DW_TRACK, DW_ALBUM, @@ -98,6 +99,7 @@ class SpoLogin: save_cover=stock_save_cover, market: list[str] | None = stock_market ) -> Track: + song_metadata = None try: link_is_valid(link_track) ids = get_ids(link_track) @@ -106,7 +108,7 @@ class SpoLogin: if song_metadata is None: raise Exception(f"Could not retrieve metadata for track {link_track}. It might not be available or an API error occurred.") - logger.info(f"Starting download for track: {song_metadata.get('music', 'Unknown')} - {song_metadata.get('artist', 'Unknown')}") + logger.info(f"Starting download for track: {song_metadata.title} - {'; '.join([a.name for a in song_metadata.artists])}") preferences = Preferences() preferences.real_time_dl = real_time_dl @@ -140,60 +142,22 @@ class SpoLogin: except MarketAvailabilityError as e: logger.error(f"Track download failed due to market availability: {str(e)}") if song_metadata: - track_info = { - "name": song_metadata.get("music", "Unknown Track"), - "artist": song_metadata.get("artist", "Unknown Artist"), - } - summary = { - "successful_tracks": [], - "skipped_tracks": [], - "failed_tracks": [{ - "track": f"{track_info['name']} - {track_info['artist']}", - "reason": str(e) - }], - "total_successful": 0, - "total_skipped": 0, - "total_failed": 1 - } + status_obj = errorObject(ids=song_metadata.ids, error=str(e)) + callback_obj = trackCallbackObject(track=song_metadata, status_info=status_obj) report_progress( reporter=self.progress_reporter, - report_type="track", - song=track_info['name'], - artist=track_info['artist'], - status="error", - url=link_track, - error=str(e), - summary=summary + callback_obj=callback_obj ) raise except Exception as e: logger.error(f"Failed to download track: {str(e)}") traceback.print_exc() if song_metadata: - track_info = { - "name": song_metadata.get("music", "Unknown Track"), - "artist": song_metadata.get("artist", "Unknown Artist"), - } - summary = { - "successful_tracks": [], - "skipped_tracks": [], - "failed_tracks": [{ - "track": f"{track_info['name']} - {track_info['artist']}", - "reason": str(e) - }], - "total_successful": 0, - "total_skipped": 0, - "total_failed": 1 - } + status_obj = errorObject(ids=song_metadata.ids, error=str(e)) + callback_obj = trackCallbackObject(track=song_metadata, status_info=status_obj) report_progress( reporter=self.progress_reporter, - report_type="track", - song=track_info['name'], - artist=track_info['artist'], - status="error", - url=link_track, - error=str(e), - summary=summary + callback_obj=callback_obj ) raise e @@ -228,7 +192,7 @@ class SpoLogin: if song_metadata is None: raise Exception(f"Could not process album metadata for {link_album}. It might not be available in the specified market(s) or an API error occurred.") - logger.info(f"Starting download for album: {song_metadata.get('album', 'Unknown')} - {song_metadata.get('ar_album', 'Unknown')}") + logger.info(f"Starting download for album: {song_metadata.title} - {'; '.join([a.name for a in song_metadata.artists])}") preferences = Preferences() preferences.real_time_dl = real_time_dl @@ -300,90 +264,50 @@ class SpoLogin: logger.info(f"Starting download for playlist: {playlist_json.get('name', 'Unknown')}") - for track_item_wrapper in playlist_json['tracks']['items']: - track_info = track_item_wrapper.get('track') - c_song_metadata = None # Initialize for each item - - if not track_info: - logger.warning(f"Skipping an item in playlist {playlist_json.get('name', 'Unknown Playlist')} as it does not appear to be a valid track object.") - # Create a placeholder for this unidentifiable item - c_song_metadata = { - 'name': 'Unknown Skipped Item', - 'ids': None, - 'error_type': 'InvalidItemStructure', - 'error_message': 'Playlist item was not a valid track object.' - } - song_metadata.append(c_song_metadata) + playlist_tracks_data = playlist_json.get('tracks', {}).get('items', []) + if not playlist_tracks_data: + logger.warning(f"Playlist {link_playlist} has no tracks or could not be fetched.") + # We can still proceed to create an empty playlist object for consistency + + song_metadata_list = [] + for item in playlist_tracks_data: + if not item or 'track' not in item or not item['track']: + # Log a warning for items that are not valid tracks (e.g., local files, etc.) + logger.warning(f"Skipping an item in playlist {link_playlist} as it does not appear to be a valid track object.") + song_metadata_list.append({'error_type': 'invalid_track_object', 'error_message': 'Playlist item was not a valid track object.', 'name': 'Unknown Skipped Item', 'ids': None}) + continue + + track_data = item['track'] + track_id = track_data.get('id') + + if not track_id: + logger.warning(f"Skipping an item in playlist {link_playlist} because it has no track ID.") + song_metadata_list.append({'error_type': 'missing_track_id', 'error_message': 'Playlist item is missing a track ID.', 'name': track_data.get('name', 'Unknown Track without ID'), 'ids': None}) continue - track_name_for_logs = track_info.get('name', 'Unknown Track') - track_id_for_logs = track_info.get('id', 'Unknown ID') # Track's own ID if available - external_urls = track_info.get('external_urls') - - if not external_urls or not external_urls.get('spotify'): - logger.warning(f"Track \"{track_name_for_logs}\" (ID: {track_id_for_logs}) in playlist {playlist_json.get('name', 'Unknown Playlist')} is not available on Spotify or has no URL.") - c_song_metadata = { - 'name': track_name_for_logs, - 'ids': track_id_for_logs, # Use track's own ID if available, otherwise will be None - 'error_type': 'MissingTrackURL', - 'error_message': f"Track \"{track_name_for_logs}\" is not available on Spotify or has no URL." - } - else: - track_spotify_url = external_urls['spotify'] - track_ids_from_url = get_ids(track_spotify_url) # This is the ID used for fetching with 'tracking' - try: - # Market check for each track is done within tracking() - # Pass market. tracking() will raise MarketAvailabilityError if unavailable. - fetched_metadata = tracking(track_ids_from_url, market=market) - if fetched_metadata: - c_song_metadata = fetched_metadata - else: - # tracking() returned None, but didn't raise MarketAvailabilityError. General fetch error. - logger.warning(f"Could not retrieve full metadata for track {track_name_for_logs} (ID: {track_ids_from_url}, URL: {track_spotify_url}) in playlist {playlist_json.get('name', 'Unknown Playlist')}. API error or other issue.") - c_song_metadata = { - 'name': track_name_for_logs, - 'ids': track_ids_from_url, - 'error_type': 'MetadataFetchError', - 'error_message': f"Failed to fetch full metadata for track {track_name_for_logs}." - } - except MarketAvailabilityError as e: - logger.warning(f"Track {track_name_for_logs} (ID: {track_ids_from_url}, URL: {track_spotify_url}) in playlist {playlist_json.get('name', 'Unknown Playlist')} is not available in the specified market(s). Skipping. Error: {str(e)}") - c_song_metadata = { - 'name': track_name_for_logs, - 'ids': track_ids_from_url, - 'error_type': 'MarketAvailabilityError', - 'error_message': str(e) - } - except Exception as e_tracking: # Catch any other unexpected error from tracking() - logger.error(f"Unexpected error fetching metadata for track {track_name_for_logs} (ID: {track_ids_from_url}, URL: {track_spotify_url}): {str(e_tracking)}") - c_song_metadata = { - 'name': track_name_for_logs, - 'ids': track_ids_from_url, - 'error_type': 'UnexpectedTrackingError', - 'error_message': f"Unexpected error fetching metadata: {str(e_tracking)}" - } - - if c_song_metadata: # Ensure something is appended - song_metadata.append(c_song_metadata) - else: - # This case should ideally not be reached if logic above is complete - logger.error(f"Logic error: c_song_metadata remained None for track {track_name_for_logs} in playlist {playlist_json.get('name', 'Unknown Playlist')}") - song_metadata.append({ - 'name': track_name_for_logs, - 'ids': track_id_for_logs or track_ids_from_url, - 'error_type': 'InternalLogicError', - 'error_message': 'Internal error processing playlist track metadata.' - }) - + try: + song_metadata = tracking(track_id, market=market) + if song_metadata: + song_metadata_list.append(song_metadata) + else: + # Create a placeholder for tracks that fail metadata fetching + failed_track_info = {'error_type': 'metadata_fetch_failed', 'error_message': f"Failed to fetch metadata for track ID: {track_id}", 'name': track_data.get('name', f'Track ID {track_id}'), 'ids': track_id} + song_metadata_list.append(failed_track_info) + logger.warning(f"Could not retrieve metadata for track {track_id} in playlist {link_playlist}.") + except MarketAvailabilityError as e: + failed_track_info = {'error_type': 'market_availability_error', 'error_message': str(e), 'name': track_data.get('name', f'Track ID {track_id}'), 'ids': track_id} + song_metadata_list.append(failed_track_info) + logger.warning(str(e)) preferences = Preferences() preferences.real_time_dl = real_time_dl preferences.link = link_playlist - preferences.song_metadata = song_metadata + preferences.song_metadata = song_metadata_list preferences.quality_download = quality_download preferences.output_dir = output_dir preferences.ids = ids preferences.json_data = playlist_json + preferences.playlist_tracks_json = playlist_tracks_data preferences.recursive_quality = recursive_quality preferences.recursive_download = recursive_download preferences.not_interface = not_interface @@ -403,7 +327,7 @@ class SpoLogin: preferences.bitrate = bitrate preferences.save_cover = save_cover preferences.market = market - + playlist = DW_PLAYLIST(preferences).dw() return playlist @@ -445,7 +369,7 @@ class SpoLogin: if episode_metadata is None: raise Exception(f"Could not process episode metadata for {link_episode}. It might not be available in the specified market(s) or an API error occurred.") - logger.info(f"Starting download for episode: {episode_metadata.get('name', 'Unknown')} - {episode_metadata.get('show', 'Unknown')}") + logger.info(f"Starting download for episode: {episode_metadata.title} - {episode_metadata.album.title}") preferences = Preferences() preferences.real_time_dl = real_time_dl diff --git a/deezspot/spotloader/__spo_api__.py b/deezspot/spotloader/__spo_api__.py index 5a88a86..0bfa86c 100644 --- a/deezspot/spotloader/__spo_api__.py +++ b/deezspot/spotloader/__spo_api__.py @@ -1,11 +1,18 @@ #!/usr/bin/python3 from deezspot.easy_spoty import Spo -from datetime import datetime -from deezspot.libutils.utils import convert_to_date import traceback from deezspot.libutils.logging_utils import logger from deezspot.exceptions import MarketAvailabilityError +from typing import List, Optional, Dict, Any + +from deezspot.models.callback.album import albumObject, artistAlbumObject, trackAlbumObject as CbTrackAlbumObject, artistTrackAlbumObject +from deezspot.models.callback.artist import artistObject +from deezspot.models.callback.common import IDs +from deezspot.models.callback.playlist import playlistObject, trackPlaylistObject, albumTrackPlaylistObject, artistTrackPlaylistObject, artistAlbumTrackPlaylistObject +from deezspot.models.callback.track import trackObject, artistTrackObject, albumTrackObject, artistAlbumTrackObject +from deezspot.models.callback.user import userObject + def _check_market_availability(item_name: str, item_type: str, api_available_markets: list[str] | None, user_markets: list[str] | None): """Checks if an item is available in any of the user-specified markets.""" @@ -15,338 +22,256 @@ def _check_market_availability(item_name: str, item_type: str, api_available_mar markets_str = ", ".join(user_markets) raise MarketAvailabilityError(f"{item_type} '{item_name}' not available in provided market(s): {markets_str}") elif user_markets and api_available_markets is None: - # Log a warning if user specified markets, but API response doesn't include 'available_markets' - # This might indicate the item is available in all markets or API doesn't provide this info for this item type. - # For now, we proceed without raising an error, as we cannot confirm it's "not available". logger.warning( f"Market availability check for {item_type} '{item_name}' skipped: " "API response did not include 'available_markets' field. Assuming availability." ) -def _get_best_image_urls(images_list): - urls = {'image': '', 'image2': '', 'image3': ''} - if not images_list or not isinstance(images_list, list): - return urls +def _parse_release_date(date_str: Optional[str], precision: Optional[str]) -> Dict[str, Any]: + if not date_str: + return {} + + parts = date_str.split('-') + data = {} + + if len(parts) >= 1 and parts[0]: + data['year'] = int(parts[0]) + if precision in ['month', 'day'] and len(parts) >= 2 and parts[1]: + data['month'] = int(parts[1]) + if precision == 'day' and len(parts) >= 3 and parts[2]: + data['day'] = int(parts[2]) + + return data - # Sort images by area (height * width) in descending order - # Handle cases where height or width might be missing - sorted_images = sorted( - images_list, - key=lambda img: img.get('height', 0) * img.get('width', 0), - reverse=True +def _json_to_ids(item_json: dict) -> IDs: + external_ids = item_json.get('external_ids', {}) + return IDs( + spotify=item_json.get('id'), + isrc=external_ids.get('isrc'), + upc=external_ids.get('upc') ) - if len(sorted_images) > 0: - urls['image'] = sorted_images[0].get('url', '') - if len(sorted_images) > 1: - urls['image2'] = sorted_images[1].get('url', '') # Second largest or same if only one size - if len(sorted_images) > 2: - urls['image3'] = sorted_images[2].get('url', '') # Third largest - - return urls +def _json_to_artist_track_object(artist_json: dict) -> artistTrackObject: + return artistTrackObject( + name=artist_json.get('name', ''), + ids=_json_to_ids(artist_json) + ) -def tracking(ids, album_data_for_track=None, market: list[str] | None = None): - datas = {} +def _json_to_artist_album_track_object(artist_json: dict) -> artistAlbumTrackObject: + return artistAlbumTrackObject( + name=artist_json.get('name', ''), + ids=_json_to_ids(artist_json) + ) + +def _json_to_album_track_object(album_json: dict) -> albumTrackObject: + return albumTrackObject( + album_type=album_json.get('album_type', 'album'), + title=album_json.get('name', ''), + release_date=_parse_release_date(album_json.get('release_date'), album_json.get('release_date_precision')), + total_tracks=album_json.get('total_tracks', 0), + genres=album_json.get('genres', []), + images=album_json.get('images', []), + ids=_json_to_ids(album_json), + artists=[_json_to_artist_album_track_object(artist) for artist in album_json.get('artists', [])] + ) + +def tracking(ids, album_data_for_track=None, market: list[str] | None = None) -> Optional[trackObject]: try: json_track = Spo.get_track(ids) if not json_track: logger.error(f"Failed to get track details for ID: {ids} from Spotify API.") return None - # Perform market availability check for the track track_name_for_check = json_track.get('name', f'Track ID {ids}') api_track_markets = json_track.get('available_markets') _check_market_availability(track_name_for_check, "Track", api_track_markets, market) - # Album details section - # Use provided album_data_for_track if available (from tracking_album context) - # Otherwise, fetch from track's album info or make a new API call for more details album_to_process = None - fetch_full_album_details = False - if album_data_for_track: album_to_process = album_data_for_track elif json_track.get('album'): - album_to_process = json_track.get('album') - # We might want fuller album details (like label, genres, upc, copyrights) - # not present in track's nested album object. - fetch_full_album_details = True + album_id = json_track.get('album', {}).get('id') + if album_id: + album_to_process = Spo.get_album(album_id) + if not album_to_process: + album_to_process = json_track.get('album') + + album_for_track = _json_to_album_track_object(album_to_process) if album_to_process else albumTrackObject() - if fetch_full_album_details and album_to_process and album_to_process.get('id'): - full_album_json = Spo.get_album(album_to_process.get('id')) - if full_album_json: - album_to_process = full_album_json # Prioritize full album details - - if album_to_process: - image_urls = _get_best_image_urls(album_to_process.get('images', [])) - datas.update(image_urls) - - datas['genre'] = "; ".join(album_to_process.get('genres', [])) - - album_artists_data = album_to_process.get('artists', []) - ar_album_names = [artist.get('name', '') for artist in album_artists_data if artist.get('name')] - datas['ar_album'] = "; ".join(filter(None, ar_album_names)) or 'Unknown Artist' - - datas['album'] = album_to_process.get('name', 'Unknown Album') - datas['label'] = album_to_process.get('label', '') # Often in full album, not track's album obj - datas['album_type'] = album_to_process.get('album_type', 'unknown') - - copyrights_data = album_to_process.get('copyrights', []) - datas['copyright'] = copyrights_data[0].get('text', '') if copyrights_data else '' - - album_external_ids = album_to_process.get('external_ids', {}) - datas['upc'] = album_external_ids.get('upc', '') - - datas['nb_tracks'] = album_to_process.get('total_tracks', 0) - # Release date from album_to_process is likely more definitive - datas['year'] = convert_to_date(album_to_process.get('release_date', '')) - datas['release_date_precision'] = album_to_process.get('release_date_precision', 'unknown') - else: # Fallback if no album_to_process - datas.update(_get_best_image_urls([])) - datas['genre'] = '' - datas['ar_album'] = 'Unknown Artist' - datas['album'] = json_track.get('album', {}).get('name', 'Unknown Album') # Basic fallback - datas['label'] = '' - datas['album_type'] = json_track.get('album', {}).get('album_type', 'unknown') - datas['copyright'] = '' - datas['upc'] = '' - datas['nb_tracks'] = json_track.get('album', {}).get('total_tracks', 0) - datas['year'] = convert_to_date(json_track.get('album', {}).get('release_date', '')) - datas['release_date_precision'] = json_track.get('album', {}).get('release_date_precision', 'unknown') - - - # Track specific details - datas['music'] = json_track.get('name', 'Unknown Track') - - track_artists_data = json_track.get('artists', []) - track_artist_names = [artist.get('name', '') for artist in track_artists_data if artist.get('name')] - datas['artist'] = "; ".join(filter(None, track_artist_names)) or 'Unknown Artist' - - datas['tracknum'] = json_track.get('track_number', 0) - datas['discnum'] = json_track.get('disc_number', 0) - - # If year details were not set from a more complete album object, use track's album info - if not datas.get('year') and json_track.get('album'): - datas['year'] = convert_to_date(json_track.get('album', {}).get('release_date', '')) - datas['release_date_precision'] = json_track.get('album', {}).get('release_date_precision', 'unknown') - - datas['duration'] = json_track.get('duration_ms', 0) // 1000 - - track_external_ids = json_track.get('external_ids', {}) - datas['isrc'] = track_external_ids.get('isrc', '') - - datas['explicit'] = json_track.get('explicit', False) - datas['popularity'] = json_track.get('popularity', 0) - - # Placeholder for tags not directly from this API response but might be expected by tagger - datas['bpm'] = datas.get('bpm', 'Unknown') # Not available here - datas['gain'] = datas.get('gain', 'Unknown') # Not available here - datas['lyric'] = datas.get('lyric', '') # Not available here - datas['author'] = datas.get('author', '') # Not available here (lyricist) - datas['composer'] = datas.get('composer', '') # Not available here - # copyright is handled by album section - datas['lyricist'] = datas.get('lyricist', '') # Same as author, not here - datas['version'] = datas.get('version', '') # Not typically here - - datas['ids'] = ids + track_obj = trackObject( + title=json_track.get('name', ''), + disc_number=json_track.get('disc_number', 1), + track_number=json_track.get('track_number', 1), + duration_ms=json_track.get('duration_ms', 0), + explicit=json_track.get('explicit', False), + genres=album_for_track.genres, + album=album_for_track, + artists=[_json_to_artist_track_object(artist) for artist in json_track.get('artists', [])], + ids=_json_to_ids(json_track) + ) logger.debug(f"Successfully tracked metadata for track {ids}") + return track_obj - except MarketAvailabilityError: # Re-raise to be caught by the calling download method + except MarketAvailabilityError: raise except Exception as e: logger.error(f"Failed to track metadata for track {ids}: {str(e)}") logger.debug(traceback.format_exc()) return None - return datas +def _json_to_artist_album_object(artist_json: dict) -> artistAlbumObject: + return artistAlbumObject( + name=artist_json.get('name', ''), + ids=_json_to_ids(artist_json) + ) -def tracking_album(album_json, market: list[str] | None = None): +def _json_to_track_album_object(track_json: dict) -> CbTrackAlbumObject: + return CbTrackAlbumObject( + title=track_json.get('name', ''), + disc_number=track_json.get('disc_number', 1), + track_number=track_json.get('track_number', 1), + duration_ms=track_json.get('duration_ms', 0), + explicit=track_json.get('explicit', False), + ids=_json_to_ids(track_json), + artists=[artistTrackAlbumObject(name=a.get('name'), ids=_json_to_ids(a)) for a in track_json.get('artists', [])] + ) + +def tracking_album(album_json, market: list[str] | None = None) -> Optional[albumObject]: if not album_json: logger.error("tracking_album received None or empty album_json.") return None - song_metadata = {} try: - # Perform market availability check for the album itself album_name_for_check = album_json.get('name', f"Album ID {album_json.get('id', 'Unknown')}") api_album_markets = album_json.get('available_markets') _check_market_availability(album_name_for_check, "Album", api_album_markets, market) - initial_list_fields = { - "music": [], "artist": [], "tracknum": [], "discnum": [], - "duration": [], "isrc": [], "ids": [], "explicit_list": [], "popularity_list": [] - # "bpm": [], "gain": [] are usually unknown from this endpoint for tracks - } - song_metadata.update(initial_list_fields) - - image_urls = _get_best_image_urls(album_json.get('images', [])) - song_metadata.update(image_urls) - - song_metadata['album'] = album_json.get('name', 'Unknown Album') - song_metadata['label'] = album_json.get('label', '') - song_metadata['year'] = convert_to_date(album_json.get('release_date', '')) - song_metadata['release_date_precision'] = album_json.get('release_date_precision', 'unknown') - song_metadata['nb_tracks'] = album_json.get('total_tracks', 0) - song_metadata['genre'] = "; ".join(album_json.get('genres', [])) - song_metadata['album_type'] = album_json.get('album_type', 'unknown') - song_metadata['popularity'] = album_json.get('popularity', 0) - - album_artists_data = album_json.get('artists', []) - ar_album_names = [artist.get('name', '') for artist in album_artists_data if artist.get('name')] - song_metadata['ar_album'] = "; ".join(filter(None, ar_album_names)) or 'Unknown Artist' - - album_external_ids = album_json.get('external_ids', {}) - song_metadata['upc'] = album_external_ids.get('upc', '') - - copyrights_data = album_json.get('copyrights', []) - song_metadata['copyright'] = copyrights_data[0].get('text', '') if copyrights_data else '' + album_artists = [_json_to_artist_album_object(a) for a in album_json.get('artists', [])] - # Add other common flat metadata keys with defaults if not directly from album_json - song_metadata['bpm'] = 'Unknown' - song_metadata['gain'] = 'Unknown' - song_metadata['lyric'] = '' - song_metadata['author'] = '' - song_metadata['composer'] = '' - song_metadata['lyricist'] = '' - song_metadata['version'] = '' + album_tracks = [] + simplified_tracks = album_json.get('tracks', {}).get('items', []) + track_ids = [t['id'] for t in simplified_tracks if t and t.get('id')] + + full_tracks_data = [] + if track_ids: + # Batch fetch full track objects. The get_tracks method should handle chunking if necessary. + full_tracks_data = Spo.get_tracks(track_ids, market=','.join(market) if market else None) + track_items_to_process = [] + if full_tracks_data and full_tracks_data.get('tracks'): + track_items_to_process = full_tracks_data['tracks'] + else: # Fallback to simplified if batch fetch fails + track_items_to_process = simplified_tracks - tracks_data = album_json.get('tracks', {}).get('items', []) - for track_item in tracks_data: - if not track_item: continue # Skip if track_item is None - c_ids = track_item.get('id') - if not c_ids: # If track has no ID, try to get some basic info directly - song_metadata['music'].append(track_item.get('name', 'Unknown Track')) - track_artists_data = track_item.get('artists', []) - track_artist_names = [artist.get('name', '') for artist in track_artists_data if artist.get('name')] - song_metadata['artist'].append("; ".join(filter(None, track_artist_names)) or 'Unknown Artist') - song_metadata['tracknum'].append(track_item.get('track_number', 0)) - song_metadata['discnum'].append(track_item.get('disc_number', 0)) - song_metadata['duration'].append(track_item.get('duration_ms', 0) // 1000) - song_metadata['isrc'].append(track_item.get('external_ids', {}).get('isrc', '')) - song_metadata['ids'].append('N/A') - song_metadata['explicit_list'].append(track_item.get('explicit', False)) - song_metadata['popularity_list'].append(track_item.get('popularity', 0)) + for track_item in track_items_to_process: + if not track_item or not track_item.get('id'): continue - # Pass the main album_json as album_data_for_track to avoid refetching it in tracking() - # Also pass the market parameter - track_details = tracking(c_ids, album_data_for_track=album_json, market=market) - - if track_details: - song_metadata['music'].append(track_details.get('music', 'Unknown Track')) - song_metadata['artist'].append(track_details.get('artist', 'Unknown Artist')) - song_metadata['tracknum'].append(track_details.get('tracknum', 0)) - song_metadata['discnum'].append(track_details.get('discnum', 0)) - # BPM and Gain are generally not per-track from this endpoint - # song_metadata['bpm'].append(track_details.get('bpm', 'Unknown')) - song_metadata['duration'].append(track_details.get('duration', 0)) - song_metadata['isrc'].append(track_details.get('isrc', '')) - song_metadata['ids'].append(c_ids) - song_metadata['explicit_list'].append(track_details.get('explicit', False)) - # popularity_list for track specific popularity if needed, or use album popularity - # song_metadata['popularity_list'].append(track_details.get('popularity',0)) - - else: # Fallback if tracking(c_ids) failed - logger.warning(f"Could not retrieve full metadata for track ID {c_ids} in album {album_json.get('id', 'N/A')}. Using minimal data.") - song_metadata['music'].append(track_item.get('name', 'Unknown Track')) - track_artists_data = track_item.get('artists', []) - track_artist_names = [artist.get('name', '') for artist in track_artists_data if artist.get('name')] - song_metadata['artist'].append("; ".join(filter(None, track_artist_names)) or 'Unknown Artist') - song_metadata['tracknum'].append(track_item.get('track_number', 0)) - song_metadata['discnum'].append(track_item.get('disc_number', 0)) - song_metadata['duration'].append(track_item.get('duration_ms', 0) // 1000) - song_metadata['isrc'].append(track_item.get('external_ids', {}).get('isrc', '')) - song_metadata['ids'].append(c_ids) - song_metadata['explicit_list'].append(track_item.get('explicit', False)) - # song_metadata['popularity_list'].append(track_item.get('popularity',0)) + # Simplified track object from album endpoint is enough for trackAlbumObject + album_tracks.append(_json_to_track_album_object(track_item)) + album_obj = albumObject( + album_type=album_json.get('album_type'), + title=album_json.get('name'), + release_date=_parse_release_date(album_json.get('release_date'), album_json.get('release_date_precision')), + total_tracks=album_json.get('total_tracks'), + genres=album_json.get('genres', []), + images=album_json.get('images', []), + copyrights=album_json.get('copyrights', []), + ids=_json_to_ids(album_json), + tracks=album_tracks, + artists=album_artists + ) logger.debug(f"Successfully tracked metadata for album {album_json.get('id', 'N/A')}") + return album_obj - except MarketAvailabilityError: # Re-raise + except MarketAvailabilityError: raise except Exception as e: logger.error(f"Failed to track album metadata for album ID {album_json.get('id', 'N/A') if album_json else 'N/A'}: {str(e)}") logger.debug(traceback.format_exc()) return None - return song_metadata - -def tracking_episode(ids, market: list[str] | None = None): - datas = {} +def tracking_episode(ids, market: list[str] | None = None) -> Optional[trackObject]: try: json_episode = Spo.get_episode(ids) if not json_episode: logger.error(f"Failed to get episode details for ID: {ids} from Spotify API.") return None - # Perform market availability check for the episode episode_name_for_check = json_episode.get('name', f'Episode ID {ids}') api_episode_markets = json_episode.get('available_markets') _check_market_availability(episode_name_for_check, "Episode", api_episode_markets, market) - - image_urls = _get_best_image_urls(json_episode.get('images', [])) - datas.update(image_urls) - - datas['audio_preview_url'] = json_episode.get('audio_preview_url', '') - datas['description'] = json_episode.get('description', '') - datas['duration'] = json_episode.get('duration_ms', 0) // 1000 - datas['explicit'] = json_episode.get('explicit', False) - datas['external_urls_spotify'] = json_episode.get('external_urls', {}).get('spotify', '') - datas['href'] = json_episode.get('href', '') - datas['html_description'] = json_episode.get('html_description', '') - datas['id'] = json_episode.get('id', '') # Episode's own ID - - datas['is_externally_hosted'] = json_episode.get('is_externally_hosted', False) - datas['is_playable'] = json_episode.get('is_playable', False) - datas['language'] = json_episode.get('language', '') # Deprecated, use languages - datas['languages'] = "; ".join(json_episode.get('languages', [])) - datas['music'] = json_episode.get('name', 'Unknown Episode') # Use 'music' for consistency with track naming - datas['name'] = json_episode.get('name', 'Unknown Episode') # Keep 'name' as well if needed by other parts - - datas['release_date'] = convert_to_date(json_episode.get('release_date', '')) - datas['release_date_precision'] = json_episode.get('release_date_precision', 'unknown') show_data = json_episode.get('show', {}) - datas['show_name'] = show_data.get('name', 'Unknown Show') - datas['publisher'] = show_data.get('publisher', 'Unknown Publisher') - datas['show_description'] = show_data.get('description', '') - datas['show_explicit'] = show_data.get('explicit', False) - datas['show_total_episodes'] = show_data.get('total_episodes', 0) - datas['show_media_type'] = show_data.get('media_type', 'unknown') # e.g. 'audio' - - # For tagger compatibility, map some show data to common track/album fields - datas['artist'] = datas['publisher'] # Publisher as artist for episodes - datas['album'] = datas['show_name'] # Show name as album for episodes - datas['genre'] = "; ".join(show_data.get('genres', [])) # If shows have genres - datas['copyright'] = copyrights_data[0].get('text', '') if (copyrights_data := show_data.get('copyrights', [])) else '' - - - # Placeholder for tags not directly from this API response but might be expected by tagger - datas['tracknum'] = 1 # Default for single episode - datas['discnum'] = 1 # Default for single episode - datas['ar_album'] = datas['publisher'] - datas['label'] = datas['publisher'] - datas['bpm'] = 'Unknown' - datas['gain'] = 'Unknown' - datas['isrc'] = '' - datas['upc'] = '' - datas['lyric'] = '' - datas['author'] = '' - datas['composer'] = '' - datas['lyricist'] = '' - datas['version'] = '' - datas['ids'] = ids # The episode's own ID passed to the function + album_for_episode = albumTrackObject( + album_type='show', + title=show_data.get('name', 'Unknown Show'), + total_tracks=show_data.get('total_episodes', 0), + genres=show_data.get('genres', []), + images=json_episode.get('images', []), + ids=IDs(spotify=show_data.get('id')), + artists=[artistTrackAlbumObject(name=show_data.get('publisher', ''))] + ) + + episode_as_track = trackObject( + title=json_episode.get('name', 'Unknown Episode'), + duration_ms=json_episode.get('duration_ms', 0), + explicit=json_episode.get('explicit', False), + album=album_for_episode, + artists=[artistTrackObject(name=show_data.get('publisher', ''))], + ids=_json_to_ids(json_episode) + ) logger.debug(f"Successfully tracked metadata for episode {ids}") + return episode_as_track - except MarketAvailabilityError: # Re-raise + except MarketAvailabilityError: raise except Exception as e: logger.error(f"Failed to track episode metadata for ID {ids}: {str(e)}") logger.debug(traceback.format_exc()) return None - return datas \ No newline at end of file +def json_to_artist_album_track_playlist_object(artist_json: dict) -> artistAlbumTrackPlaylistObject: + """Converts a JSON dict to an artistAlbumTrackPlaylistObject.""" + return artistAlbumTrackPlaylistObject( + name=artist_json.get('name', ''), + ids=_json_to_ids(artist_json) + ) + +def json_to_artist_track_playlist_object(artist_json: dict) -> artistTrackPlaylistObject: + """Converts a JSON dict to an artistTrackPlaylistObject.""" + return artistTrackPlaylistObject( + name=artist_json.get('name', ''), + ids=_json_to_ids(artist_json) + ) + +def json_to_album_track_playlist_object(album_json: dict) -> albumTrackPlaylistObject: + """Converts a JSON dict to an albumTrackPlaylistObject.""" + return albumTrackPlaylistObject( + album_type=album_json.get('album_type', ''), + title=album_json.get('name', ''), + total_tracks=album_json.get('total_tracks', 0), + release_date=_parse_release_date(album_json.get('release_date'), album_json.get('release_date_precision')), + images=album_json.get('images', []), + ids=_json_to_ids(album_json), + artists=[json_to_artist_album_track_playlist_object(a) for a in album_json.get('artists', [])] + ) + +def json_to_track_playlist_object(track_json: dict) -> Optional[trackPlaylistObject]: + """Converts a JSON dict from a playlist item to a trackPlaylistObject.""" + if not track_json: + return None + album_data = track_json.get('album', {}) + return trackPlaylistObject( + title=track_json.get('name', ''), + disc_number=track_json.get('disc_number', 1), + track_number=track_json.get('track_number', 1), + duration_ms=track_json.get('duration_ms', 0), + ids=_json_to_ids(track_json), + album=json_to_album_track_playlist_object(album_data), + artists=[json_to_artist_track_playlist_object(a) for a in track_json.get('artists', [])] + ) \ No newline at end of file