From 2f9c33c2691e7689307a33bbc792db2db1e33f8d Mon Sep 17 00:00:00 2001 From: Xoconoch Date: Mon, 11 Aug 2025 20:44:00 -0600 Subject: [PATCH] First steps to spotify-tagging when using spo-dee flow --- deezspot/deezloader/__download__.py | 132 ++++++++----- deezspot/deezloader/__init__.py | 238 +++++++++++------------- deezspot/models/download/preferences.py | 6 +- 3 files changed, 196 insertions(+), 180 deletions(-) diff --git a/deezspot/deezloader/__download__.py b/deezspot/deezloader/__download__.py index f05708f..a1d0c30 100644 --- a/deezspot/deezloader/__download__.py +++ b/deezspot/deezloader/__download__.py @@ -44,7 +44,7 @@ from deezspot.libutils.progress_reporter import ( report_album_initializing, report_album_done, report_playlist_initializing, report_playlist_done ) from deezspot.libutils.taggers import ( - enhance_metadata_with_image, add_deezer_enhanced_metadata, process_and_tag_track, + enhance_metadata_with_image, add_deezer_enhanced_metadata, add_spotify_enhanced_metadata, process_and_tag_track, save_cover_image_for_track ) from mutagen.flac import FLAC @@ -264,10 +264,16 @@ class EASY_DW: else: # Get the track object from preferences self.__track_obj: trackCbObject = preferences.song_metadata + # If spotify metadata flag is set and a spotify track object is provided, prefer it for tagging + if getattr(preferences, 'spotify_metadata', False) and getattr(preferences, 'spotify_track_obj', None): + self.__track_obj = preferences.spotify_track_obj # Convert it to the dictionary format needed for legacy functions artist_separator = getattr(preferences, 'artist_separator', '; ') - self.__song_metadata_dict = track_object_to_dict(self.__track_obj, source_type='deezer', artist_separator=artist_separator) + # Auto-select source type based on preference + use_spotify = getattr(preferences, 'spotify_metadata', False) + source_type = 'spotify' if use_spotify else 'deezer' + self.__song_metadata_dict = track_object_to_dict(self.__track_obj, source_type=source_type, artist_separator=artist_separator) # Maintain legacy attribute expected elsewhere self.__song_metadata = self.__song_metadata_dict self.__download_type = "track" @@ -283,7 +289,7 @@ class EASY_DW: current_track_val = None total_tracks_val = None - if self.__parent == "playlist" and hasattr(self.__preferences, "json_data"): + if self.__parent == "playlist" and hasattr(self.__preferences, "json_data") and self.__preferences.json_data: playlist_data = self.__preferences.json_data if isinstance(playlist_data, dict): @@ -307,7 +313,7 @@ class EASY_DW: total_tracks_val = getattr(self.__preferences, 'total_tracks', 0) current_track_val = getattr(self.__preferences, 'track_number', 0) - elif self.__parent == "album" and hasattr(self.__preferences, "json_data"): + elif self.__parent == "album" and hasattr(self.__preferences, "json_data") and self.__preferences.json_data: album_data = self.__preferences.json_data album_id = album_data.ids.deezer parent_obj = albumCbObject( @@ -400,11 +406,18 @@ class EASY_DW: def easy_dw(self) -> Track: # Get image URL and enhance metadata + pic = None if self.__infos_dw.get('__TYPE__') == 'episode': pic = self.__infos_dw.get('EPISODE_IMAGE_MD5', '') + image = API.choose_img(pic) else: - pic = self.__infos_dw['ALB_PICTURE'] - image = API.choose_img(pic) + # If using Spotify metadata, prefer the best Spotify image URL from the track object + if getattr(self.__preferences, 'spotify_metadata', False) and hasattr(self.__track_obj, 'album') and getattr(self.__track_obj.album, 'images', None): + from deezspot.libutils.metadata_converter import _get_best_image_url + image = _get_best_image_url(self.__track_obj.album.images, 'spotify') + else: + pic = self.__infos_dw['ALB_PICTURE'] + image = API.choose_img(pic) self.__song_metadata['image'] = image # Process image data using unified utility @@ -567,21 +580,30 @@ class EASY_DW: # If we reach here, the item should be successful and not skipped. if current_item.success: - if self.__infos_dw.get('__TYPE__') != 'episode': # Assuming pic is for tracks + if self.__infos_dw.get('__TYPE__') != 'episode' and pic: # Assuming pic is for tracks current_item.md5_image = pic # Set md5_image for tracks - # Apply tags using unified utility with Deezer enhancements + # Apply tags using unified utility with Deezer or Spotify enhancements from deezspot.deezloader.deegw_api import API_GW - enhanced_metadata = add_deezer_enhanced_metadata( - self.__song_metadata, - self.__infos_dw, - self.__ids, - API_GW - ) - process_and_tag_track( - track=current_item, - metadata_dict=enhanced_metadata, - source_type='deezer' - ) + use_spotify = getattr(self.__preferences, 'spotify_metadata', False) + if use_spotify: + enhanced_metadata = add_spotify_enhanced_metadata(self.__song_metadata, self.__track_obj) + process_and_tag_track( + track=current_item, + metadata_dict=enhanced_metadata, + source_type='spotify' + ) + else: + enhanced_metadata = add_deezer_enhanced_metadata( + self.__song_metadata, + self.__infos_dw, + self.__ids, + API_GW + ) + process_and_tag_track( + track=current_item, + metadata_dict=enhanced_metadata, + source_type='deezer' + ) return current_item @@ -763,20 +785,31 @@ class EASY_DW: # Add Deezer-specific enhanced metadata and apply tags from deezspot.deezloader.deegw_api import API_GW - enhanced_metadata = add_deezer_enhanced_metadata( - self.__song_metadata, - self.__infos_dw, - self.__ids, - API_GW - ) - - # Apply tags using unified utility - process_and_tag_track( - track=self.__c_track, - metadata_dict=enhanced_metadata, - source_type='deezer', - save_cover=getattr(self.__preferences, 'save_cover', False) - ) + # Build metadata: if using Spotify metadata, enhance for Spotify; else Deezer + use_spotify = getattr(self.__preferences, 'spotify_metadata', False) + if use_spotify: + enhanced_metadata = add_spotify_enhanced_metadata(self.__song_metadata, self.__track_obj) + process_and_tag_track( + track=self.__c_track, + metadata_dict=enhanced_metadata, + source_type='spotify', + save_cover=getattr(self.__preferences, 'save_cover', False) + ) + else: + enhanced_metadata = add_deezer_enhanced_metadata( + self.__song_metadata, + self.__infos_dw, + self.__ids, + API_GW + ) + + # Apply tags using unified utility + process_and_tag_track( + track=self.__c_track, + metadata_dict=enhanced_metadata, + source_type='deezer', + save_cover=getattr(self.__preferences, 'save_cover', False) + ) if self.__convert_to: format_name, bitrate = self._parse_format_string(self.__convert_to) @@ -800,19 +833,28 @@ class EASY_DW: logger.error(f"Audio conversion error: {str(conv_error)}. Proceeding with original format.") register_active_download(path_before_conversion) - # Apply tags using unified utility with Deezer enhancements + # Apply tags using unified utility with Deezer or Spotify enhancements from deezspot.deezloader.deegw_api import API_GW - enhanced_metadata = add_deezer_enhanced_metadata( - self.__song_metadata, - self.__infos_dw, - self.__ids, - API_GW - ) - process_and_tag_track( - track=self.__c_track, - metadata_dict=enhanced_metadata, - source_type='deezer' - ) + use_spotify = getattr(self.__preferences, 'spotify_metadata', False) + if use_spotify: + enhanced_metadata = add_spotify_enhanced_metadata(self.__song_metadata, self.__track_obj) + process_and_tag_track( + track=self.__c_track, + metadata_dict=enhanced_metadata, + source_type='spotify' + ) + else: + enhanced_metadata = add_deezer_enhanced_metadata( + self.__song_metadata, + self.__infos_dw, + self.__ids, + API_GW + ) + process_and_tag_track( + track=self.__c_track, + metadata_dict=enhanced_metadata, + source_type='deezer' + ) self.__c_track.success = True unregister_active_download(self.__song_path) diff --git a/deezspot/deezloader/__init__.py b/deezspot/deezloader/__init__.py index cad1831..058a3b7 100644 --- a/deezspot/deezloader/__init__.py +++ b/deezspot/deezloader/__init__.py @@ -65,6 +65,13 @@ from deezspot.models.callback.common import IDs from deezspot.models.callback.user import userObject +def _sim(a: str, b: str) -> float: + a = (a or '').strip().lower() + b = (b or '').strip().lower() + if not a or not b: + return 0.0 + return SequenceMatcher(None, a, b).ratio() + API() # Create a logger for the deezspot library @@ -126,7 +133,8 @@ class DeeLogin: save_cover=stock_save_cover, market=stock_market, playlist_context=None, - artist_separator: str = "; " + artist_separator: str = "; ", + spotify_metadata: bool = False ) -> Track: link_is_valid(link_track) @@ -147,7 +155,7 @@ class DeeLogin: report_progress(reporter=self.progress_reporter, callback_obj=callback_obj) try: - # Get standardized track object using our enhanced API module + # Default: Get standardized Deezer track object for tagging track_obj = API.get_track(ids) except (NoDataApi, MarketAvailabilityError) as e: # Try to get fallback track information @@ -173,10 +181,16 @@ class DeeLogin: report_error(e, ids, link_track) raise e + # If requested and provided via context, override with Spotify metadata for tagging + if spotify_metadata and playlist_context and playlist_context.get('spotify_track_obj'): + track_obj_for_tagging = playlist_context.get('spotify_track_obj') + else: + track_obj_for_tagging = track_obj + # Set up download preferences preferences = Preferences() preferences.link = link_track - preferences.song_metadata = track_obj # Use our standardized track object + preferences.song_metadata = track_obj_for_tagging # Use selected track object (Spotify or Deezer) for tagging preferences.quality_download = quality_download preferences.output_dir = output_dir preferences.ids = ids @@ -194,15 +208,17 @@ class DeeLogin: preferences.save_cover = save_cover preferences.market = market preferences.artist_separator = artist_separator + preferences.spotify_metadata = bool(spotify_metadata) + preferences.spotify_track_obj = playlist_context.get('spotify_track_obj') if (playlist_context and playlist_context.get('spotify_track_obj')) else None if playlist_context: - preferences.json_data = playlist_context['json_data'] - preferences.track_number = playlist_context['track_number'] - preferences.total_tracks = playlist_context['total_tracks'] - preferences.spotify_url = playlist_context['spotify_url'] + preferences.json_data = playlist_context.get('json_data') + preferences.track_number = playlist_context.get('track_number') + preferences.total_tracks = playlist_context.get('total_tracks') + preferences.spotify_url = playlist_context.get('spotify_url') try: - parent = 'playlist' if playlist_context else None + parent = 'playlist' if (playlist_context and playlist_context.get('json_data')) else None track = DW_TRACK(preferences, parent=parent).dw() return track except Exception as e: @@ -393,111 +409,58 @@ class DeeLogin: def convert_spoty_to_dee_link_track(self, link_track): link_is_valid(link_track) ids = get_ids(link_track) - + + # Attempt via ISRC first track_json = Spo.get_track(ids) - external_ids = track_json.get('external_ids') - - if not external_ids or 'isrc' not in external_ids: - msg = f"⚠ The track '{track_json.get('name', 'Unknown Track')}' has no ISRC and can't be converted to Deezer link :( ⚠" - logger.warning(msg) - raise TrackNotFound(url=link_track, message=msg) - - def _sim(a: str, b: str) -> float: - a = (a or '').strip().lower() - b = (b or '').strip().lower() - if not a or not b: - return 0.0 - return SequenceMatcher(None, a, b).ratio() - + if not track_json: + raise TrackNotFound(url=link_track, message="Spotify track metadata fetch failed.") + external_ids = track_json.get('external_ids') or {} + spo_isrc = (external_ids.get('isrc') or '').upper() spo_title = track_json.get('name', '') spo_album_title = (track_json.get('album') or {}).get('name', '') spo_tracknum = int(track_json.get('track_number') or 0) - spo_isrc = (external_ids.get('isrc') or '').upper() - - # 1) Primary attempt: /track/isrc:CODE then validate with strict checks + try: dz = API.get_track_json(f"isrc:{spo_isrc}") + if dz and dz.get('id'): + dz_json = dz + tn = (dz_json.get('track_position') or dz_json.get('track_number') or 0) + title_match = max( + _sim(spo_title, dz_json.get('title', '')), + _sim(spo_title, dz_json.get('title_short', '')), + ) + album_match = _sim(spo_album_title, (dz_json.get('album') or {}).get('title', '')) + if title_match >= 0.90 and album_match >= 0.90 and tn == spo_tracknum: + return f"https://www.deezer.com/track/{dz_json.get('id')}" except Exception: - dz = {} - - def _track_ok(dz_json: dict) -> bool: - if not dz_json or not dz_json.get('id'): - return False - title_match = max( - _sim(spo_title, dz_json.get('title', '')), - _sim(spo_title, dz_json.get('title_short', '')), - ) - album_match = _sim(spo_album_title, (dz_json.get('album') or {}).get('title', '')) - tn = int(dz_json.get('track_position') or dz_json.get('track_number') or 0) - return title_match >= 0.90 and album_match >= 0.90 and tn == spo_tracknum - - if _track_ok(dz): - deezer_id = dz['id'] - return f"https://www.deezer.com/track/{deezer_id}" - - # 2) Fallback: search tracks by "title album" and validate, minimizing extra calls + pass + + # Fallback: search by title + album query = f'"{spo_title} {spo_album_title}"' try: candidates = API.search_tracks_raw(query, limit=5) except Exception: candidates = [] - + for cand in candidates: - title_match = max( - _sim(spo_title, cand.get('title', '')), - _sim(spo_title, cand.get('title_short', '')), - ) - album_match = _sim(spo_album_title, (cand.get('album') or {}).get('title', '')) - if title_match < 0.90 or album_match < 0.90: + if max(_sim(spo_title, cand.get('title', '')), _sim(spo_title, cand.get('title_short', ''))) < 0.90: continue c_id = cand.get('id') if not c_id: continue - # Fetch details only for promising candidates to check track number and ISRC try: dzc = API.get_track_json(str(c_id)) except Exception: continue - tn = int(dzc.get('track_position') or dzc.get('track_number') or 0) + # Validate using track number and ISRC to be safe + tn = (dzc.get('track_position') or dzc.get('track_number') or 0) if tn != spo_tracknum: continue - if (dzc.get('isrc') or '').upper() != spo_isrc: + t_isrc = (dzc.get('isrc') or '').upper() + if spo_isrc and t_isrc and t_isrc != spo_isrc: continue - return f"https://www.deezer.com/track/{dzc['id']}" - - # 3) Last resort: search albums by album title; inspect tracks to find exact track number - try: - album_candidates = API.search_albums_raw(f'"{spo_album_title}"', limit=5) - except Exception: - album_candidates = [] - - for alb in album_candidates: - if _sim(spo_album_title, alb.get('title', '')) < 0.90: - continue - alb_id = alb.get('id') - if not alb_id: - continue - try: - # Use standardized album object to get detailed tracks (includes ISRC in IDs) - full_album = API.get_album(alb_id) - except Exception: - continue - # full_album.tracks is a list of trackAlbumObject with ids.isrc and track_number - for t in getattr(full_album, 'tracks', []) or []: - try: - t_title = getattr(t, 'title', '') - t_num = int(getattr(t, 'track_number', 0)) - t_isrc = (getattr(getattr(t, 'ids', None), 'isrc', '') or '').upper() - except Exception: - continue - if t_num != spo_tracknum: - continue - if _sim(spo_title, t_title) < 0.90: - continue - if t_isrc != spo_isrc: - continue - return f"https://www.deezer.com/track/{getattr(getattr(t, 'ids', None), 'deezer', '')}" - + return f"https://www.deezer.com/track/{c_id}" + raise TrackNotFound(url=link_track, message=f"Failed to find Deezer equivalent for ISRC {spo_isrc} from Spotify track {link_track}") def convert_isrc_to_dee_link_track(self, isrc_code: str) -> str: @@ -526,36 +489,27 @@ class DeeLogin: def convert_spoty_to_dee_link_album(self, link_album): link_is_valid(link_album) ids = get_ids(link_album) - link_dee = None - + spotify_album_data = Spo.get_album(ids) - - def _sim(a: str, b: str) -> float: - a = (a or '').strip().lower() - b = (b or '').strip().lower() - if not a or not b: - return 0.0 - return SequenceMatcher(None, a, b).ratio() - + if not spotify_album_data: + raise AlbumNotFound(f"Failed to fetch Spotify album metadata for {link_album}") + spo_album_title = spotify_album_data.get('name', '') - # Prefer main artist name for better search, if available spo_artists = spotify_album_data.get('artists') or [] spo_main_artist = (spo_artists[0].get('name') if spo_artists else '') or '' external_ids = spotify_album_data.get('external_ids') or {} spo_upc = str(external_ids.get('upc') or '').strip() - - # 1) Primary attempt: /album/upc:CODE then validate title similarity - dz_album = {} + + # Try UPC first if spo_upc: try: dz_album = API.get_album_json(f"upc:{spo_upc}") + if dz_album.get('id') and _sim(spo_album_title, dz_album.get('title', '')) >= 0.90: + return f"https://www.deezer.com/album/{dz_album.get('id')}" except Exception: - dz_album = {} - if dz_album.get('id') and _sim(spo_album_title, dz_album.get('title', '')) >= 0.90: - link_dee = f"https://www.deezer.com/album/{dz_album['id']}" - return link_dee - - # 2) Fallback: search albums by album title (+ main artist) and confirm UPC + pass + + # Fallback: title search q = f'"{spo_album_title}" {spo_main_artist}'.strip() try: candidates = API.search_albums_raw(q, limit=5) @@ -598,11 +552,26 @@ class DeeLogin: save_cover=stock_save_cover, market=stock_market, playlist_context=None, - artist_separator: str = "; " + artist_separator: str = "; ", + spotify_metadata: bool = False ) -> Track: link_dee = self.convert_spoty_to_dee_link_track(link_track) + # If requested, prepare Spotify track object for tagging in preferences via playlist_context + if spotify_metadata: + try: + from deezspot.spotloader.__spo_api__ import tracking as spo_tracking + spo_ids = get_ids(link_track) + spo_track_obj = spo_tracking(spo_ids) + if spo_track_obj: + if playlist_context is None: + playlist_context = {} + playlist_context = dict(playlist_context) + playlist_context['spotify_track_obj'] = spo_track_obj + except Exception: + pass + track = self.download_trackdee( link_dee, output_dir=output_dir, @@ -621,7 +590,8 @@ class DeeLogin: save_cover=save_cover, market=market, playlist_context=playlist_context, - artist_separator=artist_separator + artist_separator=artist_separator, + spotify_metadata=spotify_metadata ) return track @@ -689,7 +659,8 @@ class DeeLogin: bitrate=None, save_cover=stock_save_cover, market=stock_market, - artist_separator: str = "; " + artist_separator: str = "; ", + spotify_metadata: bool = False ) -> Playlist: link_is_valid(link_playlist) @@ -818,12 +789,19 @@ class DeeLogin: continue try: - playlist_context = { + playlist_ctx = { 'json_data': playlist_json, 'track_number': index, 'total_tracks': total_tracks, 'spotify_url': link_track } + # Attach Spotify track object for tagging if requested + if spotify_metadata: + try: + from deezspot.spotloader.__spo_api__ import json_to_track_playlist_object + playlist_ctx['spotify_track_obj'] = json_to_track_playlist_object(track_info) + except Exception: + pass downloaded_track = self.download_trackspo( link_track, output_dir=output_dir, quality_download=quality_download, @@ -832,35 +810,27 @@ class DeeLogin: custom_track_format=custom_track_format, pad_tracks=pad_tracks, initial_retry_delay=initial_retry_delay, retry_delay_increase=retry_delay_increase, max_retries=max_retries, convert_to=convert_to, bitrate=bitrate, - save_cover=save_cover, market=market, playlist_context=playlist_context, - artist_separator=artist_separator + save_cover=save_cover, market=market, playlist_context=playlist_ctx, + artist_separator=artist_separator, spotify_metadata=spotify_metadata ) tracks.append(downloaded_track) # After download, check status for summary - track_obj_for_cb = trackCbObject(title=track_name, artists=[artistTrackObject(name=artist_name)]) if getattr(downloaded_track, 'was_skipped', False): - skipped_tracks_cb.append(track_obj_for_cb) + skipped_tracks_cb.append(playlist_obj.tracks[index-1]) elif downloaded_track.success: - successful_tracks_cb.append(track_obj_for_cb) + successful_tracks_cb.append(playlist_obj.tracks[index-1]) else: - failed_tracks_cb.append(failedTrackObject( - track=track_obj_for_cb, - reason=getattr(downloaded_track, 'error_message', 'Unknown reason') - )) + failed_tracks_cb.append(failedTrackObject(track=playlist_obj.tracks[index-1], reason=getattr(downloaded_track, 'error_message', 'Unknown reason'))) + except Exception as e: + logger.error(f"Track '{track_name}' in playlist '{playlist_obj.title}' failed: {e}") + failed_tracks_cb.append(failedTrackObject(track=playlist_obj.tracks[index-1], reason=str(e))) + current_track_object = Track({'music': track_name, 'artist': artist_name}, None, None, None, link_track, None) + current_track_object.success = False + current_track_object.error_message = str(e) + tracks.append(current_track_object) - except (TrackNotFound, NoDataApi) as e: - logger.error(f"Failed to download track: {track_name} - {artist_name}: {e}") - failed_track_obj = trackCbObject(title=track_name, artists=[artistTrackObject(name=artist_name)]) - failed_tracks_cb.append(failedTrackObject(track=failed_track_obj, reason=str(e))) - # Create a placeholder for the failed item - failed_track = Track( - tags={'name': track_name, 'artist': artist_name}, - song_path=None, file_format=None, quality=None, link=link_track, ids=None - ) - failed_track.success = False - failed_track.error_message = str(e) - tracks.append(failed_track) + # Finalize summary and callbacks (existing logic continues below in file)... total_from_spotify = playlist_json['tracks']['total'] processed_count = len(successful_tracks_cb) + len(skipped_tracks_cb) + len(failed_tracks_cb) diff --git a/deezspot/models/download/preferences.py b/deezspot/models/download/preferences.py index de13682..3e7145a 100644 --- a/deezspot/models/download/preferences.py +++ b/deezspot/models/download/preferences.py @@ -22,4 +22,8 @@ class Preferences: self.max_retries = 5 # Default maximum number of retries per track self.save_cover: bool = False # Option to save a cover.jpg image # New: artist separator for joining multiple artists or album artists - self.artist_separator: str = "; " \ No newline at end of file + self.artist_separator: str = "; " + # New: when True, use Spotify metadata for tagging in spo flows + self.spotify_metadata: bool = False + # New: optional Spotify trackObject to use when spotify_metadata is True + self.spotify_track_obj = None \ No newline at end of file