From 2057c9c7e8d88e148ab91bf60e4b1c6386568640 Mon Sep 17 00:00:00 2001 From: Xoconoch Date: Fri, 1 Aug 2025 15:57:51 -0600 Subject: [PATCH] Unified a bunch of download logic, fixed m3u not registering file conversion --- deezspot/deezloader/__download__.py | 337 +++++++-------------- deezspot/deezloader/__init__.py | 12 +- deezspot/libutils/metadata_converter.py | 278 +++++++++++++++++ deezspot/libutils/progress_reporter.py | 369 +++++++++++++++++++++++ deezspot/libutils/skip_detection.py | 4 + deezspot/libutils/taggers.py | 321 ++++++++++++++++++++ deezspot/libutils/write_m3u.py | 101 +++++++ deezspot/spotloader/__download__.py | 377 ++++++------------------ 8 files changed, 1272 insertions(+), 527 deletions(-) create mode 100644 deezspot/libutils/metadata_converter.py create mode 100644 deezspot/libutils/progress_reporter.py create mode 100644 deezspot/libutils/taggers.py create mode 100644 deezspot/libutils/write_m3u.py diff --git a/deezspot/deezloader/__download__.py b/deezspot/deezloader/__download__.py index fc18619..cc8294c 100644 --- a/deezspot/deezloader/__download__.py +++ b/deezspot/deezloader/__download__.py @@ -37,6 +37,17 @@ from deezspot.libutils.utils import ( save_cover_image, __get_dir as get_album_directory, ) +from deezspot.libutils.write_m3u import create_m3u_file, append_track_to_m3u +from deezspot.libutils.metadata_converter import track_object_to_dict, album_object_to_dict +from deezspot.libutils.progress_reporter import ( + report_track_initializing, report_track_skipped, report_track_retrying, + report_track_realtime_progress, report_track_error, report_track_done, + report_album_initializing, report_album_done, report_playlist_initializing, report_playlist_done +) +from deezspot.libutils.taggers import ( + enhance_metadata_with_image, add_deezer_enhanced_metadata, process_and_tag_track, + save_cover_image_for_track +) from mutagen.flac import FLAC from mutagen.mp3 import MP3 from mutagen.id3 import ID3 @@ -65,76 +76,21 @@ from deezspot.models.callback.playlist import playlistObject as playlistCbObject from deezspot.models.callback.common import IDs from deezspot.models.callback.user import userObject +# Use unified metadata converter def _track_object_to_dict(track_obj: trackCbObject) -> dict: """ Convert a track object to a dictionary format for tagging. Similar to spotloader's approach for consistent metadata handling. """ - if not track_obj: - return {} - - tags = {} - - # Track details - tags['music'] = track_obj.title - tags['tracknum'] = track_obj.track_number if track_obj.track_number is not None else 0 - tags['discnum'] = track_obj.disc_number if track_obj.disc_number is not None else 1 - tags['duration'] = track_obj.duration_ms // 1000 if track_obj.duration_ms else 0 - tags['explicit'] = track_obj.explicit - if track_obj.ids: - tags['ids'] = track_obj.ids.deezer - tags['isrc'] = track_obj.ids.isrc - - tags['artist'] = "; ".join([artist.name for artist in track_obj.artists]) - - # Album details - if track_obj.album: - album = track_obj.album - tags['album'] = album.title - tags['ar_album'] = "; ".join([artist.name for artist in album.artists]) - tags['nb_tracks'] = album.total_tracks - if album.release_date: - tags['year'] = album.release_date.get('year', 0) - - if album.ids: - tags['upc'] = album.ids.upc - tags['album_id'] = album.ids.deezer - - tags['genre'] = "; ".join(album.genres) if album.genres else "" - - # Extract image information if available - if hasattr(album, 'images') and album.images: - tags['image'] = album.images - - return tags + return track_object_to_dict(track_obj, source_type='deezer') +# Use unified metadata converter def _album_object_to_dict(album_obj: albumCbObject) -> dict: """ Convert an album object to a dictionary format for tagging. Similar to spotloader's approach for consistent metadata handling. """ - if not album_obj: - return {} - - tags = {} - - # Album details - tags['album'] = album_obj.title - tags['ar_album'] = "; ".join([artist.name for artist in album_obj.artists]) - tags['nb_tracks'] = album_obj.total_tracks - if album_obj.release_date: - tags['year'] = album_obj.release_date.get('year', 0) - - if album_obj.ids: - tags['upc'] = album_obj.ids.upc - tags['album_id'] = album_obj.ids.deezer - - if hasattr(album_obj, 'images') and album_obj.images: - tags['image'] = album_obj.images - - tags['genre'] = "; ".join(album_obj.genres) if album_obj.genres else "" - - return tags + return album_object_to_dict(album_obj, source_type='deezer') class Download_JOB: progress_reporter = None @@ -367,18 +323,9 @@ class EASY_DW: expected by legacy tagging and path functions. It intelligently finds the album information based on the download context. """ - # Use the new global helper function for basic conversion - metadata_dict = _track_object_to_dict(track_obj) + # Use the unified metadata converter + metadata_dict = track_object_to_dict(track_obj, source_type='deezer') - # Add Deezer-specific metadata handling that isn't in the basic helper - - # For full album downloads, use complete album data from preferences if available - if self.__parent == 'album' and hasattr(self.__preferences, 'json_data'): - album_data_source = self.__preferences.json_data - # Update album-related fields with more complete information - if hasattr(album_data_source, 'label'): - metadata_dict['label'] = album_data_source.label - # Check for track_position and disk_number in the original API data # These might be directly available in the infos_dw dictionary for Deezer tracks if self.__infos_dw: @@ -386,13 +333,6 @@ class EASY_DW: metadata_dict['tracknum'] = self.__infos_dw['track_position'] if 'disk_number' in self.__infos_dw: metadata_dict['discnum'] = self.__infos_dw['disk_number'] - - # Check for contributors from original API data - if 'contributors' in self.__infos_dw: - # Filter for main artists only - main_artists = [c['name'] for c in self.__infos_dw['contributors'] if c.get('role') == 'Main'] - if main_artists: - metadata_dict['album_artist'] = "; ".join(main_artists) return metadata_dict @@ -454,12 +394,16 @@ class EASY_DW: self.__c_episode.set_fallback_ids(self.__fallback_ids) def easy_dw(self) -> Track: + # Get image URL and enhance metadata if self.__infos_dw.get('__TYPE__') == 'episode': pic = self.__infos_dw.get('EPISODE_IMAGE_MD5', '') else: pic = self.__infos_dw['ALB_PICTURE'] image = API.choose_img(pic) self.__song_metadata['image'] = image + + # Process image data using unified utility + self.__song_metadata = enhance_metadata_with_image(self.__song_metadata) song = f"{self.__song_metadata['music']} - {self.__song_metadata['artist']}" # Check if track already exists based on metadata @@ -489,32 +433,15 @@ class EASY_DW: parent_obj, current_track_val, total_tracks_val = self._get_parent_context() - status_obj = skippedObject( - ids=self.__track_obj.ids, + # Report track skipped status + report_track_skipped( + track_obj=self.__track_obj, reason=f"Track already exists in desired format at {existing_file_path}", - convert_to=self.__convert_to, - bitrate=self.__bitrate - ) - - callback_obj = trackCallbackObject( - track=self.__track_obj, - status_info=status_obj, - parent=parent_obj, + preferences=self.__preferences, + parent_obj=parent_obj, current_track=current_track_val, total_tracks=total_tracks_val ) - - if self.__parent is None: - summary = summaryObject( - skipped_tracks=[self.__track_obj], - total_skipped=1 - ) - status_obj.summary = summary - - report_progress( - reporter=Download_JOB.progress_reporter, - callback_obj=callback_obj - ) skipped_item = Track( self.__song_metadata, @@ -620,11 +547,23 @@ class EASY_DW: current_item.error_message = final_error_msg raise TrackNotFound(message=final_error_msg, url=current_link_attr) - # If we reach here, the item should be successful and not skipped. + # If we reach here, the item should be successful and not skipped. if current_item.success: if self.__infos_dw.get('__TYPE__') != 'episode': # Assuming pic is for tracks - current_item.md5_image = pic # Set md5_image for tracks - write_tags(current_item) + current_item.md5_image = pic # Set md5_image for tracks + # Apply tags using unified utility with Deezer enhancements + from deezspot.deezloader.dee_api import API_GW + enhanced_metadata = add_deezer_enhanced_metadata( + self.__song_metadata, + self.__infos_dw, + self.__ids, + API_GW + ) + process_and_tag_track( + track=current_item, + metadata_dict=enhanced_metadata, + source_type='deezer' + ) return current_item @@ -710,20 +649,14 @@ class EASY_DW: try: self.__write_track() - status_obj = initializingObject(ids=self.__track_obj.ids) - - callback_obj = trackCallbackObject( - track=self.__track_obj, - status_info=status_obj, - parent=parent_obj, + # Report track initialization status + report_track_initializing( + track_obj=self.__track_obj, + preferences=self.__preferences, + parent_obj=parent_obj, current_track=current_track_val, total_tracks=total_tracks_val ) - - report_progress( - reporter=Download_JOB.progress_reporter, - callback_obj=callback_obj - ) register_active_download(self.__song_path) try: @@ -761,16 +694,22 @@ class EASY_DW: raise TrackNotFound(f"Failed to process {self.__song_path}. Error: {str(e_decrypt)}") from e_decrypt - self.__add_more_tags() - self.__c_track.tags = self.__song_metadata - - if self.__preferences.save_cover and self.__song_metadata.get('image'): - try: - track_directory = os.path.dirname(self.__song_path) - save_cover_image(self.__song_metadata['image'], track_directory, "cover.jpg") - logger.info(f"Saved cover image for track in {track_directory}") - except Exception as e_img_save: - logger.warning(f"Failed to save cover image for track: {e_img_save}") + # Add Deezer-specific enhanced metadata and apply tags + from deezspot.deezloader.dee_api import API_GW + enhanced_metadata = add_deezer_enhanced_metadata( + self.__song_metadata, + self.__infos_dw, + self.__ids, + API_GW + ) + + # Apply tags using unified utility + process_and_tag_track( + track=self.__c_track, + metadata_dict=enhanced_metadata, + source_type='deezer', + save_cover=getattr(self.__preferences, 'save_cover', False) + ) if self.__convert_to: format_name, bitrate = self._parse_format_string(self.__convert_to) @@ -794,8 +733,19 @@ class EASY_DW: logger.error(f"Audio conversion error: {str(conv_error)}. Proceeding with original format.") register_active_download(path_before_conversion) - - write_tags(self.__c_track) + # Apply tags using unified utility with Deezer enhancements + from deezspot.deezloader.dee_api import API_GW + enhanced_metadata = add_deezer_enhanced_metadata( + self.__song_metadata, + self.__infos_dw, + self.__ids, + API_GW + ) + process_and_tag_track( + track=self.__c_track, + metadata_dict=enhanced_metadata, + source_type='deezer' + ) self.__c_track.success = True unregister_active_download(self.__song_path) @@ -896,7 +846,19 @@ class EASY_DW: self.__c_track.success = True self.__write_episode() - write_tags(self.__c_track) + # Apply tags using unified utility with Deezer enhancements + from deezspot.deezloader.dee_api import API_GW + enhanced_metadata = add_deezer_enhanced_metadata( + self.__song_metadata, + self.__infos_dw, + self.__ids, + API_GW + ) + process_and_tag_track( + track=self.__c_track, + metadata_dict=enhanced_metadata, + source_type='deezer' + ) return self.__c_track @@ -954,49 +916,7 @@ class EASY_DW: return format_name, bitrate - def __add_more_tags(self) -> None: - """ - Add Deezer-specific metadata to the song metadata dictionary. - This preserves Deezer's unique features like lyrics and contributors. - """ - contributors = self.__infos_dw.get('SNG_CONTRIBUTORS', {}) - - # Add contributor information - self.__song_metadata['author'] = "; ".join(contributors.get('author', [])) - self.__song_metadata['composer'] = "; ".join(contributors.get('composer', [])) - self.__song_metadata['lyricist'] = "; ".join(contributors.get('lyricist', [])) - - if contributors.get('composerlyricist'): - self.__song_metadata['composer'] = "; ".join(contributors.get('composerlyricist', [])) - - # Add version information if available - self.__song_metadata['version'] = self.__infos_dw.get('VERSION', '') - - # Initialize lyric fields - self.__song_metadata['lyric'] = "" - self.__song_metadata['copyright'] = "" - self.__song_metadata['lyric_sync'] = [] - - # Add lyrics if available - if self.__infos_dw.get('LYRICS_ID', 0) != 0: - try: - lyrics_data = API_GW.get_lyric(self.__ids) - - if lyrics_data and "LYRICS_TEXT" in lyrics_data: - self.__song_metadata['lyric'] = lyrics_data["LYRICS_TEXT"] - - if lyrics_data and "LYRICS_SYNC_JSON" in lyrics_data: - self.__song_metadata['lyric_sync'] = trasform_sync_lyric( - lyrics_data['LYRICS_SYNC_JSON'] - ) - except Exception as e: - logger.warning(f"Failed to retrieve lyrics: {str(e)}") - - # Extract album artist from contributors with 'Main' role - if 'contributors' in self.__infos_dw: - main_artists = [c['name'] for c in self.__infos_dw['contributors'] if c.get('role') == 'Main'] - if main_artists: - self.__song_metadata['album_artist'] = "; ".join(main_artists) + # Removed __add_more_tags() - now handled by unified libutils/taggers.py class DW_TRACK: def __init__( @@ -1030,8 +950,8 @@ class DW_TRACK: class DW_ALBUM: def _album_object_to_dict(self, album_obj: albumCbObject) -> dict: """Converts an albumObject to a dictionary for tagging and path generation.""" - # Use the new global helper function - return _album_object_to_dict(album_obj) + # Use the unified metadata converter + return album_object_to_dict(album_obj, source_type='deezer') def _track_object_to_dict(self, track_obj: any, album_obj: albumCbObject) -> dict: """Converts a track object to a dictionary with album context.""" @@ -1050,24 +970,11 @@ class DW_ALBUM: album=album_obj, # Use the parent album genres=getattr(track_obj, 'genres', []) ) - # Now use the new track object with the global helper - track_dict = _track_object_to_dict(full_track) + # Use the unified metadata converter + return track_object_to_dict(full_track, source_type='deezer') else: - # Use the global helper function with additional album context - track_dict = _track_object_to_dict(track_obj) - - # Add album-specific context that might not be in the track object - if album_obj: - track_dict['album'] = album_obj.title - track_dict['album_artist'] = "; ".join([artist.name for artist in album_obj.artists]) - track_dict['upc'] = album_obj.ids.upc if album_obj.ids else None - track_dict['genre'] = "; ".join(album_obj.genres) - - # Preserve ISRC if available in the track object - if track_obj.ids and track_obj.ids.isrc and not track_dict.get('isrc'): - track_dict['isrc'] = track_obj.ids.isrc - - return track_dict + # Use the unified metadata converter + return track_object_to_dict(track_obj, source_type='deezer') def __init__( self, @@ -1087,13 +994,8 @@ class DW_ALBUM: def dw(self) -> Album: # Report album initializing status album_obj = self.__preferences.json_data - status_obj = initializingObject(ids=album_obj.ids) - callback_obj = albumCallbackObject(album=album_obj, status_info=status_obj) - - report_progress( - reporter=Download_JOB.progress_reporter, - callback_obj=callback_obj - ) + # Report album initialization status + report_album_initializing(album_obj) infos_dw = API_GW.get_album_data(self.__ids)['data'] md5_image = infos_dw[0]['ALB_PICTURE'] @@ -1127,7 +1029,6 @@ class DW_ALBUM: total_tracks = len(infos_dw) for a, album_track_obj in enumerate(album_obj.tracks): - track_number = a + 1 c_infos_dw_item = infos_dw[a] # Update track object with proper track position and disc number from API @@ -1138,7 +1039,7 @@ class DW_ALBUM: # Ensure we have valid values, not None if album_track_obj.track_number is None: - album_track_obj.track_number = track_number + album_track_obj.track_number = a + 1 # Fallback to sequential if API doesn't provide if album_track_obj.disc_number is None: album_track_obj.disc_number = 1 @@ -1162,7 +1063,7 @@ class DW_ALBUM: c_preferences.song_metadata = full_track_obj c_preferences.ids = full_track_obj.ids.deezer - c_preferences.track_number = track_number + c_preferences.track_number = a + 1 # For progress reporting only c_preferences.total_tracks = total_tracks c_preferences.link = f"https://deezer.com/track/{c_preferences.ids}" @@ -1214,12 +1115,8 @@ class DW_ALBUM: total_failed=len(failed_tracks_cb) ) - status_obj_done = doneObject(ids=album_obj.ids, summary=summary_obj) - callback_obj_done = albumCallbackObject(album=album_obj, status_info=status_obj_done) - report_progress( - reporter=Download_JOB.progress_reporter, - callback_obj=callback_obj_done - ) + # Report album completion status + report_album_done(album_obj, summary_obj) return album @@ -1238,22 +1135,8 @@ class DW_PLAYLIST: self.__quality_download = self.__preferences.quality_download def _track_object_to_dict(self, track_obj: any) -> dict: - return { - "music": track_obj.title, - "artist": "; ".join([artist.name for artist in track_obj.artists]), - "album": track_obj.album.title, - "tracknum": 0, - "discnum": 0, - "duration": track_obj.duration_ms // 1000 if track_obj.duration_ms else 0, - "year": 0, - "explicit": False, - "isrc": track_obj.ids.isrc if hasattr(track_obj.ids, 'isrc') else None, - "album_artist": "", - "upc": None, - "label": "", - "genre": "", - "image": None, - } + # Use the unified metadata converter + return track_object_to_dict(track_obj, source_type='deezer') def dw(self) -> Playlist: playlist_obj: playlistCbObject = self.__preferences.json_data @@ -1271,12 +1154,7 @@ class DW_PLAYLIST: playlist = Playlist() tracks = playlist.tracks - playlist_m3u_dir = os.path.join(self.__output_dir, "playlists") - os.makedirs(playlist_m3u_dir, exist_ok=True) - m3u_path = os.path.join(playlist_m3u_dir, f"{playlist_name_sanitized}.m3u") - if not os.path.exists(m3u_path): - with open(m3u_path, "w", encoding="utf-8") as m3u_file: - m3u_file.write("#EXTM3U\n") + m3u_path = create_m3u_file(self.__output_dir, playlist_obj.title) medias = Download_JOB.check_sources(infos_dw, self.__quality_download) @@ -1341,12 +1219,7 @@ class DW_PLAYLIST: if current_track_object: tracks.append(current_track_object) if current_track_object.success and hasattr(current_track_object, 'song_path') and current_track_object.song_path: - relative_song_path = os.path.relpath( - current_track_object.song_path, - start=playlist_m3u_dir - ) - with open(m3u_path, "a", encoding="utf-8") as m3u_file: - m3u_file.write(f"{relative_song_path}\n") + append_track_to_m3u(m3u_path, current_track_object.song_path) if self.__make_zip: zip_name = f"{self.__output_dir}/{playlist_obj.title} [playlist {self.__ids}]" diff --git a/deezspot/deezloader/__init__.py b/deezspot/deezloader/__init__.py index 8338b78..256409b 100644 --- a/deezspot/deezloader/__init__.py +++ b/deezspot/deezloader/__init__.py @@ -796,16 +796,8 @@ class DeeLogin: callback_obj_done = playlistCallbackObject(playlist=playlist_obj, status_info=status_obj_done) report_progress(reporter=self.progress_reporter, callback_obj=callback_obj_done) - playlist_m3u_dir = os.path.join(output_dir, "playlists") - os.makedirs(playlist_m3u_dir, exist_ok=True) - m3u_path = os.path.join(playlist_m3u_dir, f"{sanitize_name(playlist_obj.title)}.m3u") - with open(m3u_path, "w", encoding="utf-8") as m3u_file: - m3u_file.write("#EXTM3U\n") - for track in tracks: - if isinstance(track, Track) and track.success and hasattr(track, 'song_path') and track.song_path: - relative_song_path = os.path.relpath(track.song_path, start=playlist_m3u_dir) - m3u_file.write(f"{relative_song_path}\n") - logger.info(f"Created m3u playlist file at: {m3u_path}") + from deezspot.libutils.write_m3u import write_tracks_to_m3u + m3u_path = write_tracks_to_m3u(output_dir, playlist_obj.title, tracks) if make_zip: zip_name = f"{output_dir}/playlist_{sanitize_name(playlist_obj.title)}.zip" diff --git a/deezspot/libutils/metadata_converter.py b/deezspot/libutils/metadata_converter.py new file mode 100644 index 0000000..5a2ebaf --- /dev/null +++ b/deezspot/libutils/metadata_converter.py @@ -0,0 +1,278 @@ +#!/usr/bin/python3 + +from datetime import datetime +from typing import Dict, Any, Optional, Union + + +def _detect_source_type(obj: Any) -> str: + """ + Auto-detect whether an object is from Spotify or Deezer based on its IDs. + + Args: + obj: Track or album object with ids attribute + + Returns: + str: 'spotify' or 'deezer' + """ + if hasattr(obj, 'ids') and obj.ids: + if hasattr(obj.ids, 'spotify') and obj.ids.spotify: + return 'spotify' + elif hasattr(obj.ids, 'deezer') and obj.ids.deezer: + return 'deezer' + return 'spotify' # Default fallback + + +def _get_platform_id(ids_obj: Any, source_type: str) -> Optional[str]: + """ + Get the appropriate platform ID based on source type. + + Args: + ids_obj: IDs object containing platform-specific IDs + source_type: 'spotify' or 'deezer' + + Returns: + Platform-specific ID or None + """ + if not ids_obj: + return None + + if source_type == 'spotify': + return getattr(ids_obj, 'spotify', None) + else: # deezer + return getattr(ids_obj, 'deezer', None) + + +def _format_release_date(release_date: Any, source_type: str) -> Optional[datetime]: + """ + Format release date based on source type and data structure. + + Args: + release_date: Release date object/dict from API + source_type: 'spotify' or 'deezer' + + Returns: + datetime object or None + """ + if not release_date: + return None + + try: + if source_type == 'spotify' and isinstance(release_date, dict): + # Spotify format: create datetime object + if 'year' in release_date: + return datetime( + year=release_date['year'], + month=release_date.get('month', 1), + day=release_date.get('day', 1) + ) + elif source_type == 'deezer': + # Deezer format: extract year only + if isinstance(release_date, dict) and 'year' in release_date: + return datetime(year=release_date['year'], month=1, day=1) + elif hasattr(release_date, 'year'): + return datetime(year=release_date.year, month=1, day=1) + + except (ValueError, TypeError, AttributeError): + pass + + return None + + +def _get_best_image_url(images: Any, source_type: str) -> Optional[str]: + """ + Get the best image URL based on source type and format. + + Args: + images: Images data from API + source_type: 'spotify' or 'deezer' + + Returns: + Best image URL or None + """ + if not images: + return None + + if source_type == 'spotify' and isinstance(images, list): + # Spotify: find highest resolution image + best_image = max(images, key=lambda i: i.get('height', 0) * i.get('width', 0), default=None) + return best_image.get('url') if best_image else None + elif source_type == 'deezer': + # Deezer: images might be direct URL or object + if isinstance(images, str): + return images + elif isinstance(images, list) and images: + return images[0] if isinstance(images[0], str) else None + elif hasattr(images, 'url'): + return images.url + + return None + + +def track_object_to_dict(track_obj: Any, source_type: Optional[str] = None) -> Dict[str, Any]: + """ + Convert a track object to a dictionary format for tagging. + Supports both Spotify and Deezer track objects. + + Args: + track_obj: Track object from Spotify or Deezer API + source_type: Optional source type ('spotify' or 'deezer'). If None, auto-detected. + + Returns: + Dictionary with standardized metadata tags + """ + if not track_obj: + return {} + + # Auto-detect source if not specified + if source_type is None: + source_type = _detect_source_type(track_obj) + + tags = {} + + # Basic track details + tags['music'] = getattr(track_obj, 'title', '') + tags['tracknum'] = getattr(track_obj, 'track_number', 0) if getattr(track_obj, 'track_number', None) is not None else 0 + tags['discnum'] = getattr(track_obj, 'disc_number', 1) if getattr(track_obj, 'disc_number', None) is not None else 1 + tags['duration'] = (getattr(track_obj, 'duration_ms', 0) // 1000) if getattr(track_obj, 'duration_ms', None) else 0 + + # Platform-specific IDs + if hasattr(track_obj, 'ids') and track_obj.ids: + tags['ids'] = _get_platform_id(track_obj.ids, source_type) + tags['isrc'] = getattr(track_obj.ids, 'isrc', None) + + # Artist information + if hasattr(track_obj, 'artists') and track_obj.artists: + tags['artist'] = "; ".join([getattr(artist, 'name', '') for artist in track_obj.artists]) + else: + tags['artist'] = '' + + # Explicit flag (mainly for Deezer) + if hasattr(track_obj, 'explicit'): + tags['explicit'] = track_obj.explicit + + # Album details + if hasattr(track_obj, 'album') and track_obj.album: + album = track_obj.album + tags['album'] = getattr(album, 'title', '') + + # Album artists + if hasattr(album, 'artists') and album.artists: + tags['ar_album'] = "; ".join([getattr(artist, 'name', '') for artist in album.artists]) + else: + tags['ar_album'] = '' + + tags['nb_tracks'] = getattr(album, 'total_tracks', 0) + + # Calculate total discs from all tracks in album for proper metadata + if hasattr(album, 'tracks') and album.tracks: + disc_numbers = [getattr(track, 'disc_number', 1) for track in album.tracks if hasattr(track, 'disc_number')] + tags['nb_discs'] = max(disc_numbers, default=1) + else: + tags['nb_discs'] = 1 + + # Release date handling + release_date = getattr(album, 'release_date', None) + tags['year'] = _format_release_date(release_date, source_type) + + # Platform-specific album IDs + if hasattr(album, 'ids') and album.ids: + tags['upc'] = getattr(album.ids, 'upc', None) + tags['album_id'] = _get_platform_id(album.ids, source_type) + + # Image handling + if hasattr(album, 'images'): + tags['image'] = _get_best_image_url(album.images, source_type) + else: + tags['image'] = None + + # Additional album metadata + tags['label'] = getattr(album, 'label', '') + tags['copyright'] = getattr(album, 'copyright', '') + + # Genre handling (more common in Deezer) + if hasattr(album, 'genres') and album.genres: + tags['genre'] = "; ".join(album.genres) if isinstance(album.genres, list) else str(album.genres) + else: + tags['genre'] = "" + + # Default/Placeholder values for compatibility + tags['bpm'] = tags.get('bpm', 'Unknown') + tags['gain'] = tags.get('gain', 'Unknown') + tags['lyric'] = tags.get('lyric', '') + tags['author'] = tags.get('author', '') + tags['composer'] = tags.get('composer', '') + tags['lyricist'] = tags.get('lyricist', '') + tags['version'] = tags.get('version', '') + + return tags + + +def album_object_to_dict(album_obj: Any, source_type: Optional[str] = None) -> Dict[str, Any]: + """ + Convert an album object to a dictionary format for tagging. + Supports both Spotify and Deezer album objects. + + Args: + album_obj: Album object from Spotify or Deezer API + source_type: Optional source type ('spotify' or 'deezer'). If None, auto-detected. + + Returns: + Dictionary with standardized metadata tags + """ + if not album_obj: + return {} + + # Auto-detect source if not specified + if source_type is None: + source_type = _detect_source_type(album_obj) + + tags = {} + + # Basic album details + tags['album'] = getattr(album_obj, 'title', '') + + # Album artists + if hasattr(album_obj, 'artists') and album_obj.artists: + tags['ar_album'] = "; ".join([getattr(artist, 'name', '') for artist in album_obj.artists]) + else: + tags['ar_album'] = '' + + tags['nb_tracks'] = getattr(album_obj, 'total_tracks', 0) + + # Release date handling + release_date = getattr(album_obj, 'release_date', None) + tags['year'] = _format_release_date(release_date, source_type) + + # Platform-specific album IDs + if hasattr(album_obj, 'ids') and album_obj.ids: + tags['upc'] = getattr(album_obj.ids, 'upc', None) + tags['album_id'] = _get_platform_id(album_obj.ids, source_type) + + # Image handling + if hasattr(album_obj, 'images'): + tags['image'] = _get_best_image_url(album_obj.images, source_type) + else: + tags['image'] = None + + # Additional metadata + tags['label'] = getattr(album_obj, 'label', '') + tags['copyright'] = getattr(album_obj, 'copyright', '') + + # Genre handling (more common in Deezer) + if hasattr(album_obj, 'genres') and album_obj.genres: + tags['genre'] = "; ".join(album_obj.genres) if isinstance(album_obj.genres, list) else str(album_obj.genres) + else: + tags['genre'] = "" + + return tags + + +# Backward compatibility aliases for easy migration +def _track_object_to_dict(track_obj: Any, source_type: Optional[str] = None) -> Dict[str, Any]: + """Legacy alias for track_object_to_dict""" + return track_object_to_dict(track_obj, source_type) + + +def _album_object_to_dict(album_obj: Any, source_type: Optional[str] = None) -> Dict[str, Any]: + """Legacy alias for album_object_to_dict""" + return album_object_to_dict(album_obj, source_type) \ No newline at end of file diff --git a/deezspot/libutils/progress_reporter.py b/deezspot/libutils/progress_reporter.py new file mode 100644 index 0000000..a10ecea --- /dev/null +++ b/deezspot/libutils/progress_reporter.py @@ -0,0 +1,369 @@ +#!/usr/bin/python3 + +from typing import Optional, Any, Dict, List +from deezspot.libutils.logging_utils import report_progress +from deezspot.models.callback import ( + trackCallbackObject, albumCallbackObject, playlistCallbackObject, + initializingObject, skippedObject, retryingObject, realTimeObject, + errorObject, doneObject, summaryObject +) + + +def _get_reporter(): + """Get the active progress reporter from Download_JOB""" + # Import here to avoid circular imports + try: + from deezspot.spotloader.__download__ import Download_JOB as SpotifyDJ + if hasattr(SpotifyDJ, 'progress_reporter') and SpotifyDJ.progress_reporter: + return SpotifyDJ.progress_reporter + except ImportError: + pass + + try: + from deezspot.deezloader.__download__ import Download_JOB as DeezerDJ + if hasattr(DeezerDJ, 'progress_reporter') and DeezerDJ.progress_reporter: + return DeezerDJ.progress_reporter + except ImportError: + pass + + return None + + +def report_track_initializing( + track_obj: Any, + preferences: Any, + parent_obj: Optional[Any] = None, + current_track: Optional[int] = None, + total_tracks: Optional[int] = None +) -> None: + """ + Report track initialization status. + + Args: + track_obj: Track object being initialized + preferences: Preferences object with convert_to/bitrate info + parent_obj: Parent object (album/playlist) if applicable + current_track: Current track number for progress + total_tracks: Total tracks for progress + """ + status_obj = initializingObject( + ids=getattr(track_obj, 'ids', None), + convert_to=getattr(preferences, 'convert_to', None), + bitrate=getattr(preferences, 'bitrate', None) + ) + + callback_obj = trackCallbackObject( + track=track_obj, + status_info=status_obj, + current_track=current_track or getattr(preferences, 'track_number', None), + total_tracks=total_tracks, + parent=parent_obj + ) + + reporter = _get_reporter() + if reporter: + report_progress(reporter=reporter, callback_obj=callback_obj) + + +def report_track_skipped( + track_obj: Any, + reason: str, + preferences: Any, + parent_obj: Optional[Any] = None, + current_track: Optional[int] = None, + total_tracks: Optional[int] = None +) -> None: + """ + Report track skipped status. + + Args: + track_obj: Track object being skipped + reason: Reason for skipping + preferences: Preferences object with convert_to/bitrate info + parent_obj: Parent object (album/playlist) if applicable + current_track: Current track number for progress + total_tracks: Total tracks for progress + """ + status_obj = skippedObject( + ids=getattr(track_obj, 'ids', None), + reason=reason, + convert_to=getattr(preferences, 'convert_to', None), + bitrate=getattr(preferences, 'bitrate', None) + ) + + callback_obj = trackCallbackObject( + track=track_obj, + status_info=status_obj, + current_track=current_track or getattr(preferences, 'track_number', None), + total_tracks=total_tracks, + parent=parent_obj + ) + + reporter = _get_reporter() + if reporter: + report_progress(reporter=reporter, callback_obj=callback_obj) + + +def report_track_retrying( + track_obj: Any, + retry_count: int, + seconds_left: int, + error: str, + preferences: Any, + parent_obj: Optional[Any] = None, + current_track: Optional[int] = None, + total_tracks: Optional[int] = None +) -> None: + """ + Report track retry status. + + Args: + track_obj: Track object being retried + retry_count: Current retry attempt number + seconds_left: Seconds until next retry + error: Error that caused the retry + preferences: Preferences object with convert_to/bitrate info + parent_obj: Parent object (album/playlist) if applicable + current_track: Current track number for progress + total_tracks: Total tracks for progress + """ + status_obj = retryingObject( + ids=getattr(track_obj, 'ids', None), + retry_count=retry_count, + seconds_left=seconds_left, + error=error, + convert_to=getattr(preferences, 'convert_to', None), + bitrate=getattr(preferences, 'bitrate', None) + ) + + callback_obj = trackCallbackObject( + track=track_obj, + status_info=status_obj, + current_track=current_track or getattr(preferences, 'track_number', None), + total_tracks=total_tracks, + parent=parent_obj + ) + + reporter = _get_reporter() + if reporter: + report_progress(reporter=reporter, callback_obj=callback_obj) + + +def report_track_realtime_progress( + track_obj: Any, + time_elapsed: int, + progress: int, + preferences: Any, + parent_obj: Optional[Any] = None, + current_track: Optional[int] = None, + total_tracks: Optional[int] = None +) -> None: + """ + Report real-time track download progress. + + Args: + track_obj: Track object being downloaded + time_elapsed: Milliseconds elapsed + progress: Progress percentage (0-100) + preferences: Preferences object with convert_to/bitrate info + parent_obj: Parent object (album/playlist) if applicable + current_track: Current track number for progress + total_tracks: Total tracks for progress + """ + status_obj = realTimeObject( + ids=getattr(track_obj, 'ids', None), + time_elapsed=time_elapsed, + progress=progress, + convert_to=getattr(preferences, 'convert_to', None), + bitrate=getattr(preferences, 'bitrate', None) + ) + + callback_obj = trackCallbackObject( + track=track_obj, + status_info=status_obj, + current_track=current_track or getattr(preferences, 'track_number', None), + total_tracks=total_tracks, + parent=parent_obj + ) + + reporter = _get_reporter() + if reporter: + report_progress(reporter=reporter, callback_obj=callback_obj) + + +def report_track_error( + track_obj: Any, + error: str, + preferences: Any, + parent_obj: Optional[Any] = None, + current_track: Optional[int] = None, + total_tracks: Optional[int] = None +) -> None: + """ + Report track error status. + + Args: + track_obj: Track object that errored + error: Error message + preferences: Preferences object with convert_to/bitrate info + parent_obj: Parent object (album/playlist) if applicable + current_track: Current track number for progress + total_tracks: Total tracks for progress + """ + status_obj = errorObject( + ids=getattr(track_obj, 'ids', None), + error=error, + convert_to=getattr(preferences, 'convert_to', None), + bitrate=getattr(preferences, 'bitrate', None) + ) + + callback_obj = trackCallbackObject( + track=track_obj, + status_info=status_obj, + current_track=current_track or getattr(preferences, 'track_number', None), + total_tracks=total_tracks, + parent=parent_obj + ) + + reporter = _get_reporter() + if reporter: + report_progress(reporter=reporter, callback_obj=callback_obj) + + +def report_track_done( + track_obj: Any, + preferences: Any, + summary: Optional[summaryObject] = None, + parent_obj: Optional[Any] = None, + current_track: Optional[int] = None, + total_tracks: Optional[int] = None +) -> None: + """ + Report track completion status. + + Args: + track_obj: Track object that completed + preferences: Preferences object with convert_to/bitrate info + summary: Optional summary object for single track downloads + parent_obj: Parent object (album/playlist) if applicable + current_track: Current track number for progress + total_tracks: Total tracks for progress + """ + status_obj = doneObject( + ids=getattr(track_obj, 'ids', None), + summary=summary, + convert_to=getattr(preferences, 'convert_to', None), + bitrate=getattr(preferences, 'bitrate', None) + ) + + callback_obj = trackCallbackObject( + track=track_obj, + status_info=status_obj, + current_track=current_track or getattr(preferences, 'track_number', None), + total_tracks=total_tracks, + parent=parent_obj + ) + + reporter = _get_reporter() + if reporter: + report_progress(reporter=reporter, callback_obj=callback_obj) + + +def report_album_initializing(album_obj: Any) -> None: + """ + Report album initialization status. + + Args: + album_obj: Album object being initialized + """ + status_obj = initializingObject(ids=getattr(album_obj, 'ids', None)) + callback_obj = albumCallbackObject(album=album_obj, status_info=status_obj) + + reporter = _get_reporter() + if reporter: + report_progress(reporter=reporter, callback_obj=callback_obj) + + +def report_album_done(album_obj: Any, summary: summaryObject) -> None: + """ + Report album completion status. + + Args: + album_obj: Album object that completed + summary: Summary of track download results + """ + status_obj = doneObject(ids=getattr(album_obj, 'ids', None), summary=summary) + callback_obj = albumCallbackObject(album=album_obj, status_info=status_obj) + + reporter = _get_reporter() + if reporter: + report_progress(reporter=reporter, callback_obj=callback_obj) + + +def report_playlist_initializing(playlist_obj: Any) -> None: + """ + Report playlist initialization status. + + Args: + playlist_obj: Playlist object being initialized + """ + status_obj = initializingObject(ids=getattr(playlist_obj, 'ids', None)) + callback_obj = playlistCallbackObject(playlist=playlist_obj, status_info=status_obj) + + reporter = _get_reporter() + if reporter: + report_progress(reporter=reporter, callback_obj=callback_obj) + + +def report_playlist_done(playlist_obj: Any, summary: summaryObject) -> None: + """ + Report playlist completion status. + + Args: + playlist_obj: Playlist object that completed + summary: Summary of track download results + """ + status_obj = doneObject(ids=getattr(playlist_obj, 'ids', None), summary=summary) + callback_obj = playlistCallbackObject(playlist=playlist_obj, status_info=status_obj) + + reporter = _get_reporter() + if reporter: + report_progress(reporter=reporter, callback_obj=callback_obj) + + +# Convenience function for generic track reporting +def report_track_status( + status_type: str, + track_obj: Any, + preferences: Any, + parent_obj: Optional[Any] = None, + current_track: Optional[int] = None, + total_tracks: Optional[int] = None, + **kwargs +) -> None: + """ + Generic track status reporting function. + + Args: + status_type: Type of status ('initializing', 'skipped', 'retrying', 'error', 'done', 'realtime') + track_obj: Track object + preferences: Preferences object + parent_obj: Parent object if applicable + current_track: Current track number + total_tracks: Total tracks + **kwargs: Additional parameters specific to status type + """ + if status_type == 'initializing': + report_track_initializing(track_obj, preferences, parent_obj, current_track, total_tracks) + elif status_type == 'skipped': + report_track_skipped(track_obj, kwargs.get('reason', 'Unknown'), preferences, parent_obj, current_track, total_tracks) + elif status_type == 'retrying': + report_track_retrying(track_obj, kwargs.get('retry_count', 0), kwargs.get('seconds_left', 0), + kwargs.get('error', 'Unknown'), preferences, parent_obj, current_track, total_tracks) + elif status_type == 'error': + report_track_error(track_obj, kwargs.get('error', 'Unknown'), preferences, parent_obj, current_track, total_tracks) + elif status_type == 'done': + report_track_done(track_obj, preferences, kwargs.get('summary'), parent_obj, current_track, total_tracks) + elif status_type == 'realtime': + report_track_realtime_progress(track_obj, kwargs.get('time_elapsed', 0), kwargs.get('progress', 0), + preferences, parent_obj, current_track, total_tracks) \ No newline at end of file diff --git a/deezspot/libutils/skip_detection.py b/deezspot/libutils/skip_detection.py index a4afc9b..3fc0ab3 100644 --- a/deezspot/libutils/skip_detection.py +++ b/deezspot/libutils/skip_detection.py @@ -4,6 +4,7 @@ import os from mutagen import File from mutagen.easyid3 import EasyID3 from mutagen.oggvorbis import OggVorbis +from mutagen.oggopus import OggOpus from mutagen.flac import FLAC from mutagen.mp3 import MP3 # Added for explicit MP3 type checking # from mutagen.mp4 import MP4 # MP4 is usually handled by File for .m4a @@ -52,6 +53,9 @@ def read_metadata_from_file(file_path, logger): elif isinstance(audio, OggVorbis): # OGG title = audio.get('TITLE', [None])[0] # Vorbis tags are case-insensitive but typically uppercase album = audio.get('ALBUM', [None])[0] + elif isinstance(audio, OggOpus): # OPUS + title = audio.get('TITLE', [None])[0] # Opus files use Vorbis comments, similar to OGG + album = audio.get('ALBUM', [None])[0] elif isinstance(audio, FLAC): # FLAC title = audio.get('TITLE', [None])[0] album = audio.get('ALBUM', [None])[0] diff --git a/deezspot/libutils/taggers.py b/deezspot/libutils/taggers.py new file mode 100644 index 0000000..a9cb8ab --- /dev/null +++ b/deezspot/libutils/taggers.py @@ -0,0 +1,321 @@ +#!/usr/bin/python3 + +import os +from typing import Dict, Any, Optional, Union +from deezspot.libutils.utils import request +from deezspot.libutils.logging_utils import logger +from deezspot.__taggers__ import write_tags +from deezspot.models.download import Track, Episode + + +def fetch_and_process_image(image_url_or_bytes: Union[str, bytes, None]) -> Optional[bytes]: + """ + Fetch and process image data from URL or return bytes directly. + + Args: + image_url_or_bytes: Image URL string, bytes, or None + + Returns: + Image bytes or None if failed/not available + """ + if not image_url_or_bytes: + return None + + if isinstance(image_url_or_bytes, bytes): + return image_url_or_bytes + + if isinstance(image_url_or_bytes, str): + try: + response = request(image_url_or_bytes) + return response.content + except Exception as e: + logger.warning(f"Failed to fetch image from URL {image_url_or_bytes}: {e}") + return None + + return None + + +def enhance_metadata_with_image(metadata_dict: Dict[str, Any]) -> Dict[str, Any]: + """ + Enhance metadata dictionary by fetching image data if image URL is present. + + Args: + metadata_dict: Metadata dictionary potentially containing image URL + + Returns: + Enhanced metadata dictionary with image bytes + """ + image_url = metadata_dict.get('image') + if image_url: + image_bytes = fetch_and_process_image(image_url) + if image_bytes: + metadata_dict['image'] = image_bytes + + return metadata_dict + + +def add_deezer_enhanced_metadata( + metadata_dict: Dict[str, Any], + infos_dw: Dict[str, Any], + track_ids: str, + api_gw_instance: Any = None +) -> Dict[str, Any]: + """ + Add Deezer-specific enhanced metadata including contributors, lyrics, and version info. + + Args: + metadata_dict: Base metadata dictionary + infos_dw: Deezer track information dictionary + track_ids: Track IDs for lyrics fetching + api_gw_instance: API gateway instance for lyrics retrieval + + Returns: + Enhanced metadata dictionary with Deezer-specific data + """ + # Add contributor information + contributors = infos_dw.get('SNG_CONTRIBUTORS', {}) + metadata_dict['author'] = "; ".join(contributors.get('author', [])) + metadata_dict['composer'] = "; ".join(contributors.get('composer', [])) + metadata_dict['lyricist'] = "; ".join(contributors.get('lyricist', [])) + + # Handle composer+lyricist combination + if contributors.get('composerlyricist'): + metadata_dict['composer'] = "; ".join(contributors.get('composerlyricist', [])) + + # Add version information + metadata_dict['version'] = infos_dw.get('VERSION', '') + + # Initialize lyric fields + metadata_dict['lyric'] = "" + metadata_dict['copyright'] = "" + metadata_dict['lyric_sync'] = [] + + # Add lyrics if available and API instance provided + if api_gw_instance and infos_dw.get('LYRICS_ID', 0) != 0: + try: + lyrics_data = api_gw_instance.get_lyric(track_ids) + + if lyrics_data and "LYRICS_TEXT" in lyrics_data: + metadata_dict['lyric'] = lyrics_data["LYRICS_TEXT"] + + if lyrics_data and "LYRICS_SYNC_JSON" in lyrics_data: + # Import here to avoid circular imports + from deezspot.libutils.utils import trasform_sync_lyric + metadata_dict['lyric_sync'] = trasform_sync_lyric( + lyrics_data['LYRICS_SYNC_JSON'] + ) + except Exception as e: + logger.warning(f"Failed to retrieve lyrics: {str(e)}") + + # Extract album artist from contributors with 'Main' role + if 'contributors' in infos_dw: + main_artists = [c['name'] for c in infos_dw['contributors'] if c.get('role') == 'Main'] + if main_artists: + metadata_dict['album_artist'] = "; ".join(main_artists) + + return metadata_dict + + +def add_spotify_enhanced_metadata(metadata_dict: Dict[str, Any], track_obj: Any) -> Dict[str, Any]: + """ + Add Spotify-specific enhanced metadata. + + Args: + metadata_dict: Base metadata dictionary + track_obj: Spotify track object + + Returns: + Enhanced metadata dictionary with Spotify-specific data + """ + # Spotify tracks already have most metadata from the unified converter + # Add any Spotify-specific enhancements here if needed in the future + + # Ensure image is processed + return enhance_metadata_with_image(metadata_dict) + + +def prepare_track_metadata( + metadata_dict: Dict[str, Any], + source_type: str = 'unknown', + enhanced_data: Optional[Dict[str, Any]] = None, + api_instance: Any = None, + track_ids: Optional[str] = None +) -> Dict[str, Any]: + """ + Prepare and enhance track metadata for tagging based on source type. + + Args: + metadata_dict: Base metadata dictionary + source_type: Source type ('spotify', 'deezer', or 'unknown') + enhanced_data: Additional source-specific data (infos_dw for Deezer) + api_instance: API instance for additional data fetching + track_ids: Track IDs for API calls + + Returns: + Fully prepared metadata dictionary + """ + # Always process images first + metadata_dict = enhance_metadata_with_image(metadata_dict) + + if source_type == 'deezer' and enhanced_data: + metadata_dict = add_deezer_enhanced_metadata( + metadata_dict, + enhanced_data, + track_ids or '', + api_instance + ) + elif source_type == 'spotify': + metadata_dict = add_spotify_enhanced_metadata(metadata_dict, enhanced_data) + + return metadata_dict + + +def apply_tags_to_track(track: Track, metadata_dict: Dict[str, Any]) -> None: + """ + Apply metadata tags to a track object and write them to the file. + + Args: + track: Track object to tag + metadata_dict: Metadata dictionary containing tag information + """ + if not track or not metadata_dict: + return + + try: + track.tags = metadata_dict + write_tags(track) + logger.debug(f"Successfully applied tags to track: {metadata_dict.get('music', 'Unknown')}") + except Exception as e: + logger.error(f"Failed to apply tags to track: {e}") + + +def apply_tags_to_episode(episode: Episode, metadata_dict: Dict[str, Any]) -> None: + """ + Apply metadata tags to an episode object and write them to the file. + + Args: + episode: Episode object to tag + metadata_dict: Metadata dictionary containing tag information + """ + if not episode or not metadata_dict: + return + + try: + episode.tags = metadata_dict + write_tags(episode) + logger.debug(f"Successfully applied tags to episode: {metadata_dict.get('music', 'Unknown')}") + except Exception as e: + logger.error(f"Failed to apply tags to episode: {e}") + + +def save_cover_image_for_track( + metadata_dict: Dict[str, Any], + track_path: str, + save_cover: bool = False, + cover_filename: str = "cover.jpg" +) -> None: + """ + Save cover image for a track if requested and image data is available. + + Args: + metadata_dict: Metadata dictionary potentially containing image data + track_path: Path to the track file + save_cover: Whether to save cover image + cover_filename: Filename for the cover image + """ + if not save_cover or not metadata_dict.get('image'): + return + + try: + from deezspot.libutils.utils import save_cover_image + track_directory = os.path.dirname(track_path) + + # Handle both URL and bytes + image_data = metadata_dict['image'] + if isinstance(image_data, str): + image_bytes = fetch_and_process_image(image_data) + else: + image_bytes = image_data + + if image_bytes: + save_cover_image(image_bytes, track_directory, cover_filename) + logger.info(f"Saved cover image for track in {track_directory}") + except Exception as e: + logger.warning(f"Failed to save cover image for track: {e}") + + +# Convenience function that combines metadata preparation and tagging +def process_and_tag_track( + track: Track, + metadata_dict: Dict[str, Any], + source_type: str = 'unknown', + enhanced_data: Optional[Dict[str, Any]] = None, + api_instance: Any = None, + track_ids: Optional[str] = None, + save_cover: bool = False +) -> None: + """ + Complete metadata processing and tagging workflow for a track. + + Args: + track: Track object to process + metadata_dict: Base metadata dictionary + source_type: Source type ('spotify', 'deezer', or 'unknown') + enhanced_data: Additional source-specific data + api_instance: API instance for additional data fetching + track_ids: Track IDs for API calls + save_cover: Whether to save cover image + """ + # Prepare enhanced metadata + prepared_metadata = prepare_track_metadata( + metadata_dict, + source_type, + enhanced_data, + api_instance, + track_ids + ) + + # Apply tags to track + apply_tags_to_track(track, prepared_metadata) + + # Save cover image if requested + if hasattr(track, 'song_path') and track.song_path: + save_cover_image_for_track(prepared_metadata, track.song_path, save_cover) + + +def process_and_tag_episode( + episode: Episode, + metadata_dict: Dict[str, Any], + source_type: str = 'unknown', + enhanced_data: Optional[Dict[str, Any]] = None, + api_instance: Any = None, + track_ids: Optional[str] = None, + save_cover: bool = False +) -> None: + """ + Complete metadata processing and tagging workflow for an episode. + + Args: + episode: Episode object to process + metadata_dict: Base metadata dictionary + source_type: Source type ('spotify', 'deezer', or 'unknown') + enhanced_data: Additional source-specific data + api_instance: API instance for additional data fetching + track_ids: Track IDs for API calls + save_cover: Whether to save cover image + """ + # Prepare enhanced metadata + prepared_metadata = prepare_track_metadata( + metadata_dict, + source_type, + enhanced_data, + api_instance, + track_ids + ) + + # Apply tags to episode + apply_tags_to_episode(episode, prepared_metadata) + + # Save cover image if requested + if hasattr(episode, 'episode_path') and episode.episode_path: + save_cover_image_for_track(prepared_metadata, episode.episode_path, save_cover) \ No newline at end of file diff --git a/deezspot/libutils/write_m3u.py b/deezspot/libutils/write_m3u.py new file mode 100644 index 0000000..850d3ef --- /dev/null +++ b/deezspot/libutils/write_m3u.py @@ -0,0 +1,101 @@ +#!/usr/bin/python3 + +import os +from typing import List, Union +from deezspot.libutils.utils import sanitize_name +from deezspot.libutils.logging_utils import logger +from deezspot.models.download import Track + + +def create_m3u_file(output_dir: str, playlist_name: str) -> str: + """ + Creates an m3u playlist file with the proper header. + + Args: + output_dir: Base output directory + playlist_name: Name of the playlist (will be sanitized) + + Returns: + str: Full path to the created m3u file + """ + playlist_m3u_dir = os.path.join(output_dir, "playlists") + os.makedirs(playlist_m3u_dir, exist_ok=True) + + playlist_name_sanitized = sanitize_name(playlist_name) + m3u_path = os.path.join(playlist_m3u_dir, f"{playlist_name_sanitized}.m3u") + + if not os.path.exists(m3u_path): + with open(m3u_path, "w", encoding="utf-8") as m3u_file: + m3u_file.write("#EXTM3U\n") + logger.debug(f"Created m3u playlist file: {m3u_path}") + + return m3u_path + + +def append_track_to_m3u(m3u_path: str, track_path: str) -> None: + """ + Appends a single track path to an existing m3u file. + + Args: + m3u_path: Full path to the m3u file + track_path: Full path to the track file + """ + if not track_path or not os.path.exists(track_path): + return + + playlist_m3u_dir = os.path.dirname(m3u_path) + relative_path = os.path.relpath(track_path, start=playlist_m3u_dir) + + with open(m3u_path, "a", encoding="utf-8") as m3u_file: + m3u_file.write(f"{relative_path}\n") + + +def write_tracks_to_m3u(output_dir: str, playlist_name: str, tracks: List[Track]) -> str: + """ + Creates an m3u file and writes all successful tracks to it at once. + + Args: + output_dir: Base output directory + playlist_name: Name of the playlist (will be sanitized) + tracks: List of Track objects + + Returns: + str: Full path to the created m3u file + """ + playlist_m3u_dir = os.path.join(output_dir, "playlists") + os.makedirs(playlist_m3u_dir, exist_ok=True) + + playlist_name_sanitized = sanitize_name(playlist_name) + m3u_path = os.path.join(playlist_m3u_dir, f"{playlist_name_sanitized}.m3u") + + with open(m3u_path, "w", encoding="utf-8") as m3u_file: + m3u_file.write("#EXTM3U\n") + + for track in tracks: + if (isinstance(track, Track) and + track.success and + hasattr(track, 'song_path') and + track.song_path and + os.path.exists(track.song_path)): + + relative_song_path = os.path.relpath(track.song_path, start=playlist_m3u_dir) + m3u_file.write(f"{relative_song_path}\n") + + logger.info(f"Created m3u playlist file at: {m3u_path}") + return m3u_path + + +def get_m3u_path(output_dir: str, playlist_name: str) -> str: + """ + Get the expected path for an m3u file without creating it. + + Args: + output_dir: Base output directory + playlist_name: Name of the playlist (will be sanitized) + + Returns: + str: Full path where the m3u file would be located + """ + playlist_m3u_dir = os.path.join(output_dir, "playlists") + playlist_name_sanitized = sanitize_name(playlist_name) + return os.path.join(playlist_m3u_dir, f"{playlist_name_sanitized}.m3u") \ No newline at end of file diff --git a/deezspot/spotloader/__download__.py b/deezspot/spotloader/__download__.py index 5b45a8d..3aaa93b 100644 --- a/deezspot/spotloader/__download__.py +++ b/deezspot/spotloader/__download__.py @@ -32,6 +32,17 @@ from deezspot.libutils.utils import ( save_cover_image, __get_dir as get_album_directory, ) +from deezspot.libutils.write_m3u import create_m3u_file, append_track_to_m3u +from deezspot.libutils.metadata_converter import track_object_to_dict, album_object_to_dict +from deezspot.libutils.progress_reporter import ( + report_track_initializing, report_track_skipped, report_track_retrying, + report_track_realtime_progress, report_track_error, report_track_done, + report_album_initializing, report_album_done, report_playlist_initializing, report_playlist_done +) +from deezspot.libutils.taggers import ( + enhance_metadata_with_image, process_and_tag_track, process_and_tag_episode, + save_cover_image_for_track +) from deezspot.libutils.logging_utils import logger, report_progress from deezspot.libutils.cleanup_utils import ( register_active_download, @@ -49,7 +60,6 @@ from deezspot.models.callback import ( IDs ) from deezspot.spotloader.__spo_api__ import tracking, json_to_track_playlist_object -from datetime import datetime # --- Global retry counter variables --- GLOBAL_RETRY_COUNT = 0 @@ -58,103 +68,15 @@ GLOBAL_MAX_RETRIES = 100 # Adjust this value as needed # --- Global tracking of active downloads --- # Moved to deezspot.libutils.cleanup_utils +# Use unified metadata converter def _track_object_to_dict(track_obj: trackObject) -> dict: """Converts a trackObject into a dictionary for legacy functions like taggers.""" - if not track_obj: - return {} - - tags = {} - - # Track details - tags['music'] = track_obj.title - tags['tracknum'] = track_obj.track_number - tags['discnum'] = track_obj.disc_number - tags['duration'] = track_obj.duration_ms // 1000 if track_obj.duration_ms else 0 - if track_obj.ids: - tags['ids'] = track_obj.ids.spotify - tags['isrc'] = track_obj.ids.isrc - - tags['artist'] = "; ".join([artist.name for artist in track_obj.artists]) - - # Album details - if track_obj.album: - album = track_obj.album - tags['album'] = album.title - tags['ar_album'] = "; ".join([artist.name for artist in album.artists]) - tags['nb_tracks'] = album.total_tracks - if album.release_date and 'year' in album.release_date: - try: - # Create a datetime object for the tagger - tags['year'] = datetime( - year=album.release_date['year'], - month=album.release_date.get('month', 1), - day=album.release_date.get('day', 1) - ) - except (ValueError, TypeError): - tags['year'] = None # Handle invalid date parts - else: - tags['year'] = None - - if album.ids: - tags['upc'] = album.ids.upc - tags['album_id'] = album.ids.spotify - - if album.images: - tags['image'] = max(album.images, key=lambda i: i.get('height', 0) * i.get('width', 0)).get('url') if album.images else None - else: - tags['image'] = None - - # These are not in the model, add them from album if they exist - tags['label'] = getattr(album, 'label', '') - tags['copyright'] = getattr(album, 'copyright', '') - - # Default/Placeholder values - tags['bpm'] = tags.get('bpm', 'Unknown') - tags['gain'] = tags.get('gain', 'Unknown') - tags['lyric'] = tags.get('lyric', '') - tags['author'] = tags.get('author', '') - tags['composer'] = tags.get('composer', '') - tags['lyricist'] = tags.get('lyricist', '') - tags['version'] = tags.get('version', '') - - return tags + return track_object_to_dict(track_obj, source_type='spotify') +# Use unified metadata converter def _album_object_to_dict(album_obj: albumObject) -> dict: """Converts an albumObject into a dictionary for legacy functions.""" - if not album_obj: - return {} - - tags = {} - - # Album details - tags['album'] = album_obj.title - tags['ar_album'] = "; ".join([artist.name for artist in album_obj.artists]) - tags['nb_tracks'] = album_obj.total_tracks - if album_obj.release_date and 'year' in album_obj.release_date: - try: - tags['year'] = datetime( - year=album_obj.release_date['year'], - month=album_obj.release_date.get('month', 1), - day=album_obj.release_date.get('day', 1) - ) - except (ValueError, TypeError): - tags['year'] = None - else: - tags['year'] = None - - if album_obj.ids: - tags['upc'] = album_obj.ids.upc - tags['album_id'] = album_obj.ids.spotify - - if album_obj.images: - tags['image'] = max(album_obj.images, key=lambda i: i.get('height', 0) * i.get('width', 0)).get('url') if album_obj.images else None - else: - tags['image'] = None - - tags['label'] = getattr(album_obj, 'label', '') - tags['copyright'] = getattr(album_obj, 'copyright', '') - - return tags + return album_object_to_dict(album_obj, source_type='spotify') class Download_JOB: session = None @@ -377,11 +299,8 @@ class EASY_DW: return self.__c_track def easy_dw(self) -> Track: - # Request the image data - pic = self.__song_metadata_dict.get('image') - image = request(pic).content if pic else None - if image: - self.__song_metadata_dict['image'] = image + # Process image data using unified utility + self.__song_metadata_dict = enhance_metadata_with_image(self.__song_metadata_dict) try: # Initialize success to False, it will be set to True if download_try is successful @@ -426,8 +345,13 @@ class EASY_DW: # If we reach here, the track should be successful and not skipped. if hasattr(self, '_EASY_DW__c_track') and self.__c_track and self.__c_track.success: - self.__c_track.tags = self.__song_metadata_dict - write_tags(self.__c_track) + # Apply tags using unified utility + process_and_tag_track( + track=self.__c_track, + metadata_dict=self.__song_metadata_dict, + source_type='spotify', + save_cover=getattr(self.__preferences, 'save_cover', False) + ) # Unregister the final successful file path after all operations are done. # self.__c_track.song_path would have been updated by __convert_audio__ if conversion occurred. @@ -478,28 +402,15 @@ class EASY_DW: parent_obj = playlistTrackObject( title=parent_info.get("name"), owner=userObject(name=parent_info.get("owner")) - ) + ) - # Build status object - status_obj = skippedObject( - ids=self.__song_metadata.ids, + # Report track skipped status + report_track_skipped( + track_obj=track_obj, reason=f"Track already exists at '{existing_file_path}'", - convert_to=self.__preferences.convert_to, - bitrate=self.__preferences.bitrate - ) - - # Build callback object - callback_obj = trackCallbackObject( - track=track_obj, - status_info=status_obj, - current_track=getattr(self.__preferences, 'track_number', None), - total_tracks=total_tracks_val, - parent=parent_obj - ) - - report_progress( - reporter=Download_JOB.progress_reporter, - callback_obj=callback_obj + preferences=self.__preferences, + parent_obj=parent_obj, + total_tracks=total_tracks_val ) return self.__c_track @@ -519,25 +430,12 @@ class EASY_DW: owner=userObject(name=parent_info.get("owner")) ) - # Build status object - status_obj = initializingObject( - ids=self.__song_metadata.ids, - convert_to=self.__preferences.convert_to, - bitrate=self.__preferences.bitrate - ) - - # Build callback object - callback_obj = trackCallbackObject( - track=track_obj, - status_info=status_obj, - current_track=getattr(self.__preferences, 'track_number', None), - total_tracks=total_tracks_val, - parent=parent_obj - ) - - report_progress( - reporter=Download_JOB.progress_reporter, - callback_obj=callback_obj + # Report track initialization status + report_track_initializing( + track_obj=track_obj, + preferences=self.__preferences, + parent_obj=parent_obj, + total_tracks=total_tracks_val ) # If track does not exist in the desired final format, proceed with download/conversion @@ -593,27 +491,14 @@ class EASY_DW: if current_percentage > self._last_reported_percentage: self._last_reported_percentage = current_percentage - # Build status object - status_obj = realTimeObject( - ids=self.__song_metadata.ids, + # Report real-time progress + report_track_realtime_progress( + track_obj=track_obj, time_elapsed=int((current_time - start_time) * 1000), progress=current_percentage, - convert_to=self.__convert_to, - bitrate=self.__bitrate - ) - - # Build callback object - callback_obj = trackCallbackObject( - track=track_obj, - status_info=status_obj, - current_track=getattr(self.__preferences, 'track_number', None), - total_tracks=total_tracks_val, - parent=parent_obj - ) - - report_progress( - reporter=Download_JOB.progress_reporter, - callback_obj=callback_obj + preferences=self.__preferences, + parent_obj=parent_obj, + total_tracks=total_tracks_val ) # Rate limiting (if needed) @@ -669,28 +554,15 @@ class EASY_DW: os.remove(self.__song_path) unregister_active_download(self.__song_path) - # Build status object - status_obj = retryingObject( - ids=self.__song_metadata.ids, + # Report retry status + report_track_retrying( + track_obj=track_obj, retry_count=retries, seconds_left=retry_delay, error=str(e), - convert_to=self.__convert_to, - bitrate=self.__bitrate - ) - - # Build callback object - callback_obj = trackCallbackObject( - track=track_obj, - status_info=status_obj, - current_track=getattr(self.__preferences, 'track_number', None), - total_tracks=total_tracks_val, - parent=parent_obj - ) - - report_progress( - reporter=Download_JOB.progress_reporter, - callback_obj=callback_obj + preferences=self.__preferences, + parent_obj=parent_obj, + total_tracks=total_tracks_val ) if retries >= max_retries or GLOBAL_RETRY_COUNT >= GLOBAL_MAX_RETRIES: @@ -710,15 +582,8 @@ class EASY_DW: retry_delay += retry_delay_increase # Use the custom retry delay increase # Save cover image if requested, after successful download and before conversion - if self.__preferences.save_cover and hasattr(self, '_EASY_DW__song_path') and self.__song_path and self.__song_metadata_dict.get('image'): - try: - track_directory = dirname(self.__song_path) - # Ensure the directory exists (it should, from os.makedirs earlier) - image_bytes = request(self.__song_metadata_dict['image']).content - save_cover_image(image_bytes, track_directory, "cover.jpg") - logger.info(f"Saved cover image for track in {track_directory}") - except Exception as e_img_save: - logger.warning(f"Failed to save cover image for track: {e_img_save}") + if self.__preferences.save_cover and hasattr(self, '_EASY_DW__song_path') and self.__song_path: + save_cover_image_for_track(self.__song_metadata_dict, self.__song_path, self.__preferences.save_cover) try: self.__convert_audio() @@ -732,26 +597,13 @@ class EASY_DW: else: error_msg = f"Audio conversion failed: {original_error_str}" - # Build status object - status_obj = errorObject( - ids=self.__song_metadata.ids, + # Report error status + report_track_error( + track_obj=track_obj, error=error_msg, - convert_to=self.__convert_to, - bitrate=self.__bitrate - ) - - # Build callback object - callback_obj = trackCallbackObject( - track=track_obj, - status_info=status_obj, - current_track=getattr(self.__preferences, 'track_number', None), - total_tracks=total_tracks_val, - parent=parent_obj - ) - - report_progress( - reporter=Download_JOB.progress_reporter, - callback_obj=callback_obj + preferences=self.__preferences, + parent_obj=parent_obj, + total_tracks=total_tracks_val ) logger.error(f"Audio conversion error: {error_msg}") @@ -768,26 +620,13 @@ class EASY_DW: # If conversion fails twice, create a final error report error_msg_2 = f"Audio conversion failed after retry for '{self.__song_metadata.title}'. Original error: {str(conv_e)}" - # Build status object - status_obj = errorObject( - ids=self.__song_metadata.ids, + # Report error status + report_track_error( + track_obj=track_obj, error=error_msg_2, - convert_to=self.__convert_to, - bitrate=self.__bitrate - ) - - # Build callback object - callback_obj = trackCallbackObject( - track=track_obj, - status_info=status_obj, - current_track=getattr(self.__preferences, 'track_number', None), - total_tracks=total_tracks_val, - parent=parent_obj - ) - - report_progress( - reporter=Download_JOB.progress_reporter, - callback_obj=callback_obj + preferences=self.__preferences, + parent_obj=parent_obj, + total_tracks=total_tracks_val ) logger.error(error_msg) @@ -801,8 +640,12 @@ class EASY_DW: if hasattr(self, '_EASY_DW__c_track') and self.__c_track: self.__c_track.success = True - self.__c_track.tags = self.__song_metadata_dict - write_tags(self.__c_track) + # Apply tags using unified utility + process_and_tag_track( + track=self.__c_track, + metadata_dict=self.__song_metadata_dict, + source_type='spotify' + ) # Create done status report parent_info, total_tracks_val = self._get_parent_info() @@ -823,26 +666,14 @@ class EASY_DW: total_failed=0 ) - # Build status object - status_obj = doneObject( - ids=self.__song_metadata.ids, + # Report track done status + report_track_done( + track_obj=track_obj, + preferences=self.__preferences, summary=summary_obj, - convert_to=self.__convert_to, - bitrate=self.__bitrate - ) - - # Build callback object - callback_obj = trackCallbackObject( - track=track_obj, - status_info=status_obj, + parent_obj=parent_obj, current_track=current_track_val, - total_tracks=total_tracks_val, - parent=parent_obj - ) - - report_progress( - reporter=Download_JOB.progress_reporter, - callback_obj=callback_obj + total_tracks=total_tracks_val ) if hasattr(self, '_EASY_DW__c_track') and self.__c_track and self.__c_track.success: @@ -1053,8 +884,12 @@ class EASY_DW: # If we reach here, download and any conversion were successful. if hasattr(self, '_EASY_DW__c_episode') and self.__c_episode: self.__c_episode.success = True - self.__c_episode.tags = self.__song_metadata_dict - write_tags(self.__c_episode) + # Apply tags using unified utility + process_and_tag_episode( + episode=self.__c_episode, + metadata_dict=self.__song_metadata_dict, + source_type='spotify' + ) # Unregister the final successful file path for episodes, as it's now complete. # self.__c_episode.episode_path would have been updated by __convert_audio__ if conversion occurred. unregister_active_download(self.__c_episode.episode_path) @@ -1106,13 +941,7 @@ class DW_ALBUM: def dw(self) -> Album: # Report album initializing status album_obj = self.__album_metadata - status_obj = initializingObject(ids=album_obj.ids) - callback_obj = albumCallbackObject(album=album_obj, status_info=status_obj) - - report_progress( - reporter=Download_JOB.progress_reporter, - callback_obj=callback_obj - ) + report_album_initializing(album_obj) pic_url = max(album_obj.images, key=lambda i: i.get('height', 0) * i.get('width', 0)).get('url') if album_obj.images else None image_bytes = request(pic_url).content if pic_url else None @@ -1133,11 +962,14 @@ class DW_ALBUM: pad_tracks=self.__preferences.pad_tracks ) + # Calculate total number of discs for proper metadata tagging + total_discs = max((track.disc_number for track in album_obj.tracks), default=1) + for a, track_in_album in enumerate(album_obj.tracks): - current_track = a + 1 c_preferences = deepcopy(self.__preferences) - c_preferences.track_number = current_track + # Use actual track position for progress tracking, not for metadata + c_preferences.track_number = a + 1 # For progress reporting only try: # Fetch full track object as album endpoint only provides simplified track objects @@ -1210,13 +1042,7 @@ class DW_ALBUM: total_failed=len(failed_tracks) ) - status_obj = doneObject(ids=album_obj.ids, summary=summary_obj) - callback_obj = albumCallbackObject(album=album_obj, status_info=status_obj) - - report_progress( - reporter=Download_JOB.progress_reporter, - callback_obj=callback_obj - ) + report_album_done(album_obj, summary_obj) return album @@ -1259,21 +1085,10 @@ class DW_PLAYLIST: # --- End build playlistObject --- # Report playlist initializing status - status_obj = initializingObject(ids=IDs(spotify=playlist_id)) - callback_obj = playlistCallbackObject(playlist=playlist_obj_for_cb, status_info=status_obj) - report_progress( - reporter=Download_JOB.progress_reporter, - callback_obj=callback_obj - ) + report_playlist_initializing(playlist_obj_for_cb) # --- Prepare the m3u playlist file --- - playlist_m3u_dir = os.path.join(self.__output_dir, "playlists") - os.makedirs(playlist_m3u_dir, exist_ok=True) - playlist_name_sanitized = sanitize_name(playlist_name) - m3u_path = os.path.join(playlist_m3u_dir, f"{playlist_name_sanitized}.m3u") - if not os.path.exists(m3u_path): - with open(m3u_path, "w", encoding="utf-8") as m3u_file: - m3u_file.write("#EXTM3U\n") + m3u_path = create_m3u_file(self.__output_dir, playlist_name) # ------------------------------------- playlist = Playlist() @@ -1319,9 +1134,7 @@ class DW_PLAYLIST: # --- Append the final track path to the m3u file using a relative path --- if track and track.success and hasattr(track, 'song_path') and track.song_path: - relative_path = os.path.relpath(track.song_path, start=playlist_m3u_dir) - with open(m3u_path, "a", encoding="utf-8") as m3u_file: - m3u_file.write(f"{relative_path}\n") + append_track_to_m3u(m3u_path, track.song_path) # --------------------------------------------------------------------- if self.__make_zip: @@ -1361,13 +1174,7 @@ class DW_PLAYLIST: total_failed=len(failed_tracks_cb) ) - status_obj = doneObject(ids=playlist_obj_for_cb.ids, summary=summary_obj) - callback_obj = playlistCallbackObject(playlist=playlist_obj_for_cb, status_info=status_obj) - - report_progress( - reporter=Download_JOB.progress_reporter, - callback_obj=callback_obj, - ) + report_playlist_done(playlist_obj_for_cb, summary_obj) return playlist