Merge pull request #1 from Xoconoch/dev

1.4.0
This commit is contained in:
Xoconoch
2025-06-04 00:58:03 +02:00
committed by GitHub
11 changed files with 497 additions and 644 deletions

12
.gitignore vendored Normal file
View File

@@ -0,0 +1,12 @@
.venv
deezspot/libutils/__pycache__
deezspot/models/__pycache__
deezspot/spotloader/__pycache__
.cache
credentials.json
test.py
downloads/
deezspot/__pycache__
deezspot/deezloader/__pycache__
deezspot/libutils/__pycache__
Test.py

View File

@@ -1,7 +1,8 @@
#!/usr/bin/python3
import os
import json
import re
import requests
import time
from os.path import isfile
from copy import deepcopy
from deezspot.libutils.audio_converter import convert_audio, parse_format_string
@@ -32,6 +33,9 @@ from deezspot.libutils.utils import (
set_path,
trasform_sync_lyric,
create_zip,
sanitize_name,
save_cover_image,
__get_dir as get_album_directory,
)
from mutagen.flac import FLAC
from mutagen.mp3 import MP3
@@ -191,7 +195,6 @@ class EASY_DW:
self.__ids = preferences.ids
self.__link = preferences.link
self.__output_dir = preferences.output_dir
self.__method_save = preferences.method_save
self.__not_interface = preferences.not_interface
self.__quality_download = preferences.quality_download
self.__recursive_quality = preferences.recursive_quality
@@ -276,7 +279,6 @@ class EASY_DW:
self.__output_dir,
self.__song_quality,
self.__file_format,
self.__method_save,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks
@@ -291,7 +293,6 @@ class EASY_DW:
self.__output_dir,
self.__song_quality,
self.__file_format,
self.__method_save,
is_episode=True,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
@@ -588,18 +589,13 @@ class EASY_DW:
"artist": self.__song_metadata.get("artist", ""),
"status": "progress"
}
# Use Spotify URL if available, otherwise use Deezer link
spotify_url = getattr(self.__preferences, 'spotify_url', None)
progress_data["url"] = spotify_url if spotify_url else self.__link
# Add parent info if present
if self.__parent == "playlist" and hasattr(self.__preferences, "json_data"):
playlist_data = self.__preferences.json_data
playlist_name = playlist_data.get('title', 'unknown')
total_tracks = getattr(self.__preferences, 'total_tracks', 0)
current_track = getattr(self.__preferences, 'track_number', 0)
progress_data.update({
"current_track": current_track,
"total_tracks": total_tracks,
@@ -616,7 +612,6 @@ class EASY_DW:
album_artist = self.__song_metadata.get('album_artist', self.__song_metadata.get('album_artist', ''))
total_tracks = getattr(self.__preferences, 'total_tracks', 0)
current_track = getattr(self.__preferences, 'track_number', 0)
progress_data.update({
"current_track": current_track,
"total_tracks": total_tracks,
@@ -628,30 +623,31 @@ class EASY_DW:
"url": f"https://deezer.com/album/{self.__preferences.song_metadata.get('album_id', '')}"
}
})
Download_JOB.report_progress(progress_data)
try:
# Decrypt the file using the utility function
decryptfile(c_crypted_audio, self.__fallback_ids, self.__song_path)
logger.debug(f"Successfully decrypted track using {encryption_type} encryption")
except Exception as decrypt_error:
# Detailed error logging for debugging
logger.error(f"Decryption error ({encryption_type}): {str(decrypt_error)}")
if "Data must be padded" in str(decrypt_error):
logger.error("This appears to be a padding issue with Blowfish decryption")
raise
self.__add_more_tags()
# Start of processing block (decryption, tagging, cover, conversion)
# Decrypt the file using the utility function
decryptfile(c_crypted_audio, self.__fallback_ids, self.__song_path)
logger.debug(f"Successfully decrypted track using {encryption_type} encryption")
self.__add_more_tags() # self.__song_metadata is updated here
self.__c_track.tags = self.__song_metadata # IMPORTANT: Update track object's tags
# Save cover image if requested
if self.__preferences.save_cover and self.__song_metadata.get('image'):
try:
track_directory = os.path.dirname(self.__song_path)
save_cover_image(self.__song_metadata['image'], track_directory, "cover.jpg")
logger.info(f"Saved cover image for track in {track_directory}")
except Exception as e_img_save:
logger.warning(f"Failed to save cover image for track: {e_img_save}")
# Apply audio conversion if requested
if self.__convert_to:
format_name, bitrate = parse_format_string(self.__convert_to)
if format_name:
# Register and unregister functions for tracking downloads
from deezspot.deezloader.__download__ import register_active_download, unregister_active_download
from deezspot.deezloader.__download__ import register_active_download, unregister_active_download # Ensure these are available or handle differently
try:
# Update the path with the converted file path
converted_path = convert_audio(
self.__song_path,
format_name,
@@ -660,97 +656,52 @@ class EASY_DW:
unregister_active_download
)
if converted_path != self.__song_path:
# Update path in track object if conversion happened
self.__song_path = converted_path
self.__c_track.song_path = converted_path
except Exception as conv_error:
# Log conversion error but continue with original file
logger.error(f"Audio conversion error: {str(conv_error)}")
# Decide if this is a fatal error for the track or if we proceed with original
# Write tags to the final file (original or converted)
write_tags(self.__c_track)
except Exception as e:
self.__c_track.success = True # Mark as successful only after all steps including tags
except Exception as e: # Handles errors from __write_track, decrypt, add_tags, save_cover, convert, write_tags
if isfile(self.__song_path):
os.remove(self.__song_path)
# Improve error message formatting
error_msg = str(e)
if "Data must be padded" in error_msg:
error_msg = "Decryption error (padding issue) - Try a different quality setting or download format"
elif isinstance(e, ConnectionError) or "Connection" in error_msg:
error_msg = "Connection error - Check your internet connection"
elif "timeout" in error_msg.lower():
error_msg = "Request timed out - Server may be busy"
elif "403" in error_msg or "Forbidden" in error_msg:
error_msg = "Access denied - Track might be region-restricted or premium-only"
elif "404" in error_msg or "Not Found" in error_msg:
error_msg = "Track not found - It might have been removed"
if "Data must be padded" in error_msg: error_msg = "Decryption error (padding issue) - Try a different quality setting or download format"
elif isinstance(e, ConnectionError) or "Connection" in error_msg: error_msg = "Connection error - Check your internet connection"
elif "timeout" in error_msg.lower(): error_msg = "Request timed out - Server may be busy"
elif "403" in error_msg or "Forbidden" in error_msg: error_msg = "Access denied - Track might be region-restricted or premium-only"
elif "404" in error_msg or "Not Found" in error_msg: error_msg = "Track not found - It might have been removed"
# Create formatted error report
progress_data = {
"type": "track",
"status": "error",
"song": self.__song_metadata.get('music', ''),
"artist": self.__song_metadata.get('artist', ''),
"error": error_msg,
"url": getattr(self.__preferences, 'spotify_url', None) or self.__link,
# (Error reporting code as it exists)
error_progress_data = {
"type": "track", "status": "error",
"song": self.__song_metadata.get('music', ''), "artist": self.__song_metadata.get('artist', ''),
"error": error_msg, "url": getattr(self.__preferences, 'spotify_url', None) or self.__link,
"convert_to": self.__convert_to
}
# Add parent info based on parent type
if self.__parent == "playlist" and hasattr(self.__preferences, "json_data"):
playlist_data = self.__preferences.json_data
playlist_name = playlist_data.get('title', 'unknown')
total_tracks = getattr(self.__preferences, 'total_tracks', 0)
current_track = getattr(self.__preferences, 'track_number', 0)
progress_data.update({
"current_track": current_track,
"total_tracks": total_tracks,
"parent": {
"type": "playlist",
"name": playlist_name,
"owner": playlist_data.get('creator', {}).get('name', 'unknown'),
"total_tracks": total_tracks,
"url": f"https://deezer.com/playlist/{playlist_data.get('id', '')}"
}
})
playlist_data = self.__preferences.json_data; playlist_name = playlist_data.get('title', 'unknown')
total_tracks = getattr(self.__preferences, 'total_tracks', 0); current_track = getattr(self.__preferences, 'track_number', 0)
error_progress_data.update({"current_track": current_track, "total_tracks": total_tracks, "parent": {"type": "playlist", "name": playlist_name, "owner": playlist_data.get('creator', {}).get('name', 'unknown'), "total_tracks": total_tracks, "url": f"https://deezer.com/playlist/{playlist_data.get('id', '')}"}})
elif self.__parent == "album":
album_name = self.__song_metadata.get('album', '')
album_artist = self.__song_metadata.get('album_artist', self.__song_metadata.get('album_artist', ''))
total_tracks = getattr(self.__preferences, 'total_tracks', 0)
current_track = getattr(self.__preferences, 'track_number', 0)
progress_data.update({
"current_track": current_track,
"total_tracks": total_tracks,
"parent": {
"type": "album",
"title": album_name,
"artist": album_artist,
"total_tracks": total_tracks,
"url": f"https://deezer.com/album/{self.__preferences.song_metadata.get('album_id', '')}"
}
})
# Report the error
Download_JOB.report_progress(progress_data)
album_name = self.__song_metadata.get('album', ''); album_artist = self.__song_metadata.get('album_artist', self.__song_metadata.get('album_artist', ''))
total_tracks = getattr(self.__preferences, 'total_tracks', 0); current_track = getattr(self.__preferences, 'track_number', 0)
error_progress_data.update({"current_track": current_track, "total_tracks": total_tracks, "parent": {"type": "album", "title": album_name, "artist": album_artist, "total_tracks": total_tracks, "url": f"https://deezer.com/album/{self.__preferences.song_metadata.get('album_id', '')}"}})
Download_JOB.report_progress(error_progress_data)
logger.error(f"Failed to process track: {error_msg}")
# Still raise the exception to maintain original flow
# Add the original exception e to the message for more context
self.__c_track.success = False # Mark as failed
self.__c_track.error_message = error_msg # Store the refined error message
self.__c_track.success = False
self.__c_track.error_message = error_msg
raise TrackNotFound(f"Failed to process {self.__song_path}. Error: {error_msg}. Original Exception: {str(e)}")
# If download and processing (like decryption, tagging) were successful before conversion
if not self.__convert_to: # Or if conversion was successful
self.__c_track.success = True
return self.__c_track
except Exception as e:
# Add more context to this exception
except Exception as e: # Outer exception for initial media checks, etc.
song_title = self.__song_metadata.get('music', 'Unknown Song')
artist_name = self.__song_metadata.get('artist', 'Unknown Artist')
error_message = f"Download failed for '{song_title}' by '{artist_name}' (Link: {self.__link}). Error: {str(e)}"
@@ -859,6 +810,9 @@ class EASY_DW:
if self.__infos_dw.get('LYRICS_ID', 0) != 0:
need = API_GW.get_lyric(self.__ids)
if "LYRICS_TEXT" in need:
self.__song_metadata['lyric'] = need["LYRICS_TEXT"]
if "LYRICS_SYNC_JSON" in need:
self.__song_metadata['lyric_sync'] = trasform_sync_lyric(
need['LYRICS_SYNC_JSON']
@@ -914,10 +868,9 @@ class DW_ALBUM:
self.__ids = self.__preferences.ids
self.__make_zip = self.__preferences.make_zip
self.__output_dir = self.__preferences.output_dir
self.__method_save = self.__preferences.method_save
self.__song_metadata = self.__preferences.song_metadata
self.__not_interface = self.__preferences.not_interface
self.__quality_download = self.__preferences.quality_download
self.__recursive_quality = self.__preferences.recursive_quality
self.__song_metadata_items = self.__song_metadata.items()
@@ -965,11 +918,11 @@ class DW_ALBUM:
infos_dw = API_GW.get_album_data(self.__ids)['data']
md5_image = infos_dw[0]['ALB_PICTURE']
image = API.choose_img(md5_image)
self.__song_metadata['image'] = image
image_bytes = API.choose_img(md5_image, size="1400x1400") # Fetch highest quality
self.__song_metadata['image'] = image_bytes # Store for tagging if needed, already bytes
album = Album(self.__ids)
album.image = image
album.image = image_bytes # Store raw image bytes
album.md5_image = md5_image
album.nb_tracks = self.__song_metadata['nb_tracks']
album.album_name = self.__song_metadata['album']
@@ -982,7 +935,13 @@ class DW_ALBUM:
infos_dw, self.__quality_download
)
# The album_artist for tagging individual tracks will be derived_album_artist_from_contributors
# Determine album base directory once
album_base_directory = get_album_directory(
self.__song_metadata, # Album level metadata
self.__output_dir,
custom_dir_format=self.__preferences.custom_dir_format,
pad_tracks=self.__preferences.pad_tracks
)
total_tracks = len(infos_dw)
for a in range(total_tracks):
@@ -1073,6 +1032,10 @@ class DW_ALBUM:
logger.warning(f"Track not found: {song} :( Details: {track.error_message}. URL: {c_preferences.link if c_preferences else 'N/A'}")
tracks.append(track)
# Save album cover image
if self.__preferences.save_cover and album.image and album_base_directory:
save_cover_image(album.image, album_base_directory, "cover.jpg")
if self.__make_zip:
song_quality = tracks[0].quality if tracks else 'Unknown'
# Pass along custom directory format if set
@@ -1082,7 +1045,6 @@ class DW_ALBUM:
output_dir=self.__output_dir,
song_metadata=self.__song_metadata,
song_quality=song_quality,
method_save=self.__method_save,
custom_dir_format=custom_dir_format
)
album.zip_path = zip_name
@@ -1136,7 +1098,7 @@ class DW_PLAYLIST:
infos_dw = API_GW.get_playlist_data(self.__ids)['data']
# Extract playlist metadata - we'll use this in the track-level reporting
playlist_name = self.__json_data['title']
playlist_name_sanitized = sanitize_name(self.__json_data['title'])
total_tracks = len(infos_dw)
playlist = Playlist()
@@ -1146,7 +1108,7 @@ class DW_PLAYLIST:
# m3u file will be placed in output_dir/playlists
playlist_m3u_dir = os.path.join(self.__output_dir, "playlists")
os.makedirs(playlist_m3u_dir, exist_ok=True)
m3u_path = os.path.join(playlist_m3u_dir, f"{playlist_name}.m3u")
m3u_path = os.path.join(playlist_m3u_dir, f"{playlist_name_sanitized}.m3u")
if not os.path.exists(m3u_path):
with open(m3u_path, "w", encoding="utf-8") as m3u_file:
m3u_file.write("#EXTM3U\n")
@@ -1225,7 +1187,6 @@ class DW_EPISODE:
self.__preferences = preferences
self.__ids = preferences.ids
self.__output_dir = preferences.output_dir
self.__method_save = preferences.method_save
self.__not_interface = preferences.not_interface
self.__quality_download = preferences.quality_download
@@ -1334,6 +1295,17 @@ class DW_EPISODE:
}
Download_JOB.report_progress(progress_data)
# Save cover image for the episode
if self.__preferences.save_cover:
episode_image_md5 = infos_dw.get('EPISODE_IMAGE_MD5', '')
episode_image_data = None
if episode_image_md5:
episode_image_data = API.choose_img(episode_image_md5, size="1200x1200")
if episode_image_data:
episode_directory = os.path.dirname(output_path)
save_cover_image(episode_image_data, episode_directory, "cover.jpg")
return episode
except Exception as e:

View File

@@ -39,7 +39,7 @@ from deezspot.libutils.others_settings import (
stock_recursive_download,
stock_not_interface,
stock_zip,
method_save,
stock_save_cover,
)
from deezspot.libutils.logging_utils import ProgressReporter, logger
@@ -97,14 +97,14 @@ class DeeLogin:
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
method_save=method_save,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None
convert_to=None,
save_cover=stock_save_cover
) -> Track:
link_is_valid(link_track)
@@ -130,7 +130,6 @@ class DeeLogin:
preferences.recursive_quality = recursive_quality
preferences.recursive_download = recursive_download
preferences.not_interface = not_interface
preferences.method_save = method_save
# New custom formatting preferences:
preferences.custom_dir_format = custom_dir_format
preferences.custom_track_format = custom_track_format
@@ -142,6 +141,7 @@ class DeeLogin:
preferences.max_retries = max_retries
# Audio conversion parameter
preferences.convert_to = convert_to
preferences.save_cover = save_cover
track = DW_TRACK(preferences).dw()
@@ -155,14 +155,14 @@ class DeeLogin:
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
method_save=method_save,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None
convert_to=None,
save_cover=stock_save_cover
) -> Album:
link_is_valid(link_album)
@@ -185,7 +185,6 @@ class DeeLogin:
preferences.recursive_quality = recursive_quality
preferences.recursive_download = recursive_download
preferences.not_interface = not_interface
preferences.method_save = method_save
preferences.make_zip = make_zip
# New custom formatting preferences:
preferences.custom_dir_format = custom_dir_format
@@ -198,6 +197,7 @@ class DeeLogin:
preferences.max_retries = max_retries
# Audio conversion parameter
preferences.convert_to = convert_to
preferences.save_cover = save_cover
album = DW_ALBUM(preferences).dw()
@@ -211,14 +211,14 @@ class DeeLogin:
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
method_save=method_save,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None
convert_to=None,
save_cover=stock_save_cover
) -> Playlist:
link_is_valid(link_playlist)
@@ -251,7 +251,6 @@ class DeeLogin:
preferences.recursive_quality = recursive_quality
preferences.recursive_download = recursive_download
preferences.not_interface = not_interface
preferences.method_save = method_save
preferences.make_zip = make_zip
# New custom formatting preferences:
preferences.custom_dir_format = custom_dir_format
@@ -264,6 +263,7 @@ class DeeLogin:
preferences.max_retries = max_retries
# Audio conversion parameter
preferences.convert_to = convert_to
preferences.save_cover = save_cover
playlist = DW_PLAYLIST(preferences).dw()
@@ -276,11 +276,11 @@ class DeeLogin:
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
method_save=method_save,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
convert_to=None
convert_to=None,
save_cover=stock_save_cover
) -> list[Track]:
link_is_valid(link_artist)
@@ -293,11 +293,11 @@ class DeeLogin:
track['link'], output_dir,
quality_download, recursive_quality,
recursive_download, not_interface,
method_save=method_save,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
convert_to=convert_to
convert_to=convert_to,
save_cover=save_cover
)
for track in playlist_json
]
@@ -332,14 +332,14 @@ class DeeLogin:
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
method_save=method_save,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None
convert_to=None,
save_cover=stock_save_cover
) -> Track:
track_link_dee = self.convert_spoty_to_dee_link_track(link_track)
@@ -351,14 +351,14 @@ class DeeLogin:
recursive_quality=recursive_quality,
recursive_download=recursive_download,
not_interface=not_interface,
method_save=method_save,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries,
convert_to=convert_to
convert_to=convert_to,
save_cover=save_cover
)
return track
@@ -450,14 +450,14 @@ class DeeLogin:
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
method_save=method_save,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None
convert_to=None,
save_cover=stock_save_cover
) -> Album:
link_dee = self.convert_spoty_to_dee_link_album(link_album)
@@ -466,14 +466,15 @@ class DeeLogin:
link_dee, output_dir,
quality_download, recursive_quality,
recursive_download, not_interface,
make_zip, method_save,
make_zip,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries,
convert_to=convert_to
convert_to=convert_to,
save_cover=save_cover
)
return album
@@ -486,14 +487,14 @@ class DeeLogin:
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
method_save=method_save,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None
convert_to=None,
save_cover=stock_save_cover
) -> Playlist:
link_is_valid(link_playlist)
@@ -563,14 +564,14 @@ class DeeLogin:
recursive_quality=recursive_quality,
recursive_download=recursive_download,
not_interface=not_interface,
method_save=method_save,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries,
convert_to=convert_to
convert_to=convert_to,
save_cover=save_cover
)
tracks.append(downloaded_track)
except (TrackNotFound, NoDataApi) as e:
@@ -618,11 +619,14 @@ class DeeLogin:
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
method_save=method_save,
custom_dir_format=None,
custom_track_format=None,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
pad_tracks=True,
convert_to=None
convert_to=None,
save_cover=stock_save_cover
) -> Track:
query = f"track:{song} artist:{artist}"
@@ -648,11 +652,14 @@ class DeeLogin:
recursive_quality=recursive_quality,
recursive_download=recursive_download,
not_interface=not_interface,
method_save=method_save,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
convert_to=convert_to
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries,
convert_to=convert_to,
save_cover=save_cover
)
return track
@@ -665,14 +672,13 @@ class DeeLogin:
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
method_save=method_save,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None
save_cover=stock_save_cover
) -> Episode:
link_is_valid(link_episode)
@@ -707,16 +713,8 @@ class DeeLogin:
preferences.recursive_quality = recursive_quality
preferences.recursive_download = recursive_download
preferences.not_interface = not_interface
preferences.method_save = method_save
# New custom formatting preferences:
preferences.custom_dir_format = custom_dir_format
preferences.custom_track_format = custom_track_format
# Track number padding option
preferences.pad_tracks = pad_tracks
# Retry parameters
preferences.initial_retry_delay = initial_retry_delay
preferences.retry_delay_increase = retry_delay_increase
preferences.max_retries = max_retries
# No convert_to for episode download (and preferences.convert_to is not set here)
preferences.save_cover = save_cover
episode = DW_EPISODE(preferences).dw()
@@ -730,13 +728,14 @@ class DeeLogin:
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
method_save=method_save,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True,
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5
max_retries=5,
convert_to=None,
save_cover=stock_save_cover
) -> Smart:
link_is_valid(link)
@@ -773,13 +772,14 @@ class DeeLogin:
recursive_quality=recursive_quality,
recursive_download=recursive_download,
not_interface=not_interface,
method_save=method_save,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries
max_retries=max_retries,
convert_to=convert_to,
save_cover=save_cover
)
smart.type = "track"
smart.track = track
@@ -800,13 +800,14 @@ class DeeLogin:
recursive_download=recursive_download,
not_interface=not_interface,
make_zip=make_zip,
method_save=method_save,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries
max_retries=max_retries,
convert_to=convert_to,
save_cover=save_cover
)
smart.type = "album"
smart.album = album
@@ -827,13 +828,14 @@ class DeeLogin:
recursive_download=recursive_download,
not_interface=not_interface,
make_zip=make_zip,
method_save=method_save,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries
max_retries=max_retries,
convert_to=convert_to,
save_cover=save_cover
)
smart.type = "playlist"
smart.playlist = playlist

View File

@@ -1,6 +1,7 @@
import os
from mutagen.flac import FLAC
from mutagen.mp3 import MP3
from mutagen.id3 import ID3
from mutagen.id3 import ID3, TIT2, TPE1, TALB, TDRC, TCON, TRCK, TPOS, TSRC, TPE2, TCOM, USLT, TPUB, COMM, TCOP, TENC, TLAN, TYER, TXXX
from mutagen.mp4 import MP4
from mutagen import File
from deezspot.libutils.logging_utils import logger
@@ -83,6 +84,34 @@ def write_tags(track):
id3.add(TCOM(encoding=3, text=value))
elif key == 'lyrics':
id3.add(USLT(encoding=3, lang='eng', desc='', text=value))
elif key == 'publisher':
id3.add(TPUB(encoding=3, text=value))
elif key == 'comment':
id3.add(COMM(encoding=3, lang='eng', desc='', text=value))
elif key == 'copyright':
id3.add(TCOP(encoding=3, text=value))
elif key == 'encodedby':
id3.add(TENC(encoding=3, text=value))
elif key == 'language':
id3.add(TLAN(encoding=3, text=value))
elif key == 'year' and not metadata.get('date'):
id3.add(TYER(encoding=3, text=value))
elif key == 'mood':
id3.add(TXXX(encoding=3, desc='MOOD', text=value))
elif key == 'explicit':
id3.add(TXXX(encoding=3, desc='EXPLICIT', text=value))
elif key == 'rating':
id3.add(TXXX(encoding=3, desc='RATING', text=value))
elif key == 'website':
id3.add(TXXX(encoding=3, desc='WEBSITE', text=value))
elif key == 'replaygain_track_gain':
id3.add(TXXX(encoding=3, desc='REPLAYGAIN_TRACK_GAIN', text=value))
elif key == 'replaygain_track_peak':
id3.add(TXXX(encoding=3, desc='REPLAYGAIN_TRACK_PEAK', text=value))
elif key == 'replaygain_album_gain':
id3.add(TXXX(encoding=3, desc='REPLAYGAIN_ALBUM_GAIN', text=value))
elif key == 'replaygain_album_peak':
id3.add(TXXX(encoding=3, desc='REPLAYGAIN_ALBUM_PEAK', text=value))
audio.tags = id3
@@ -101,9 +130,15 @@ def write_tags(track):
elif key == 'genre':
audio['\xa9gen'] = value
elif key == 'tracknumber':
audio['trkn'] = [(int(value.split('/')[0]), int(value.split('/')[1]))]
parts = str(value).split('/')
track_num = int(parts[0])
total_tracks = int(parts[1]) if len(parts) > 1 else 0
audio['trkn'] = [(track_num, total_tracks)]
elif key == 'discnumber':
audio['disk'] = [(int(value.split('/')[0]), int(value.split('/')[1]))]
parts = str(value).split('/')
disc_num = int(parts[0])
total_discs = int(parts[1]) if len(parts) > 1 else 0
audio['disk'] = [(disc_num, total_discs)]
elif key == 'isrc':
audio['isrc'] = value
elif key == 'albumartist':
@@ -112,6 +147,20 @@ def write_tags(track):
audio['\xa9wrt'] = value
elif key == 'lyrics':
audio['\xa9lyr'] = value
elif key == 'publisher':
audio['\xa9pub'] = value
elif key == 'comment':
audio['\xa9cmt'] = value
elif key == 'copyright':
audio['\xa9cpy'] = value
elif key == 'encodedby':
audio['\xa9too'] = value
elif key == 'explicit':
if value == '1': # True for explicit
audio['rtng'] = [4] # Explicit for iTunes
elif value == '0': # False for explicit
audio['rtng'] = [0] # None
# else: do not set rtng if value is not '0' or '1'
# Save the changes
audio.save()

View File

@@ -115,45 +115,6 @@ def check_track_md5(infos_dw):
logger.error(f"Failed to check track MD5: {str(e)}")
raise
def set_path(song_metadata, output_dir, method_save):
"""
Set the output path for a track based on metadata and save method.
Args:
song_metadata: Track metadata
output_dir: Base output directory
method_save: Save method (e.g., 'artist/album/track')
Returns:
str: Full output path
"""
try:
# Create base directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
# Build path based on method
if method_save == 'artist/album/track':
path = os.path.join(
output_dir,
song_metadata['artist'],
song_metadata['album'],
f"{song_metadata['music']}.mp3"
)
else:
path = os.path.join(
output_dir,
f"{song_metadata['artist']} - {song_metadata['music']}.mp3"
)
# Create parent directories
os.makedirs(os.path.dirname(path), exist_ok=True)
return path
except Exception as e:
logger.error(f"Failed to set path: {str(e)}")
raise
def trasform_sync_lyric(lyrics):
"""
Transform synchronized lyrics into a standard format.

View File

@@ -1,7 +1,6 @@
#!/usr/bin/python3
stock_quality = "MP3_320"
method_saves = ["0", "1", "2"]
qualities = {
"MP3_320": {

View File

@@ -1,7 +1,5 @@
#!/usr/bin/python3
method_saves = ["0", "1", "2", "3"]
sources = [
"dee", "spo"
]
@@ -23,6 +21,5 @@ stock_recursive_quality = False
stock_recursive_download = False
stock_not_interface = False
stock_zip = False
method_save = 3
is_thread = False # WARNING FOR TRUE, LOOP ON DEFAULT
stock_real_time_dl = True
stock_real_time_dl = False
stock_save_cover = False # Default for saving cover image

View File

@@ -9,10 +9,11 @@ from zipfile import ZipFile, ZIP_DEFLATED
from deezspot.models.track import Track
from deezspot.exceptions import InvalidLink
from deezspot.libutils.others_settings import supported_link, header
from deezspot.libutils.logging_utils import ProgressReporter, logger
from os.path import (
isdir, basename,
join, isfile
join, isfile, dirname
)
def link_is_valid(link):
@@ -120,247 +121,183 @@ def what_kind(link):
return url
def __get_tronc(string):
l_encoded = len(string.encode())
if l_encoded > 242:
n_tronc = len(string) - l_encoded - 242
else:
n_tronc = 242
return n_tronc
return string[:len(string) - 1]
def apply_custom_format(format_str, metadata: dict, pad_tracks=True) -> str:
"""
Replaces placeholders in the format string with values from metadata.
Placeholders are denoted by %key%, for example: "%ar_album%/%album%".
The pad_tracks parameter controls whether track numbers are padded with leading zeros.
"""
def replacer(match):
key = match.group(1)
# Alias and special keys
if key == 'album_artist':
raw_value = metadata.get('ar_album', metadata.get('album_artist'))
elif key == 'year':
raw_value = metadata.get('release_date', metadata.get('year'))
elif key == 'date':
raw_value = metadata.get('release_date', metadata.get('date'))
elif key == 'discnum':
raw_value = metadata.get('disc_number', metadata.get('discnum'))
else:
# All other placeholders map directly
raw_value = metadata.get(key)
# Friendly names for missing metadata
key_mappings = {
'ar_album': 'album artist',
'album_artist': 'album artist',
'artist': 'artist',
'album': 'album',
'tracknum': 'track number',
'discnum': 'disc number',
'date': 'release date',
'year': 'year',
'genre': 'genre',
'isrc': 'ISRC',
'explicit': 'explicit flag',
'duration': 'duration',
'publisher': 'publisher',
'composer': 'composer',
'copyright': 'copyright',
'author': 'author',
'lyricist': 'lyricist',
'version': 'version',
'comment': 'comment',
'encodedby': 'encoded by',
'language': 'language',
'lyrics': 'lyrics',
'mood': 'mood',
'rating': 'rating',
'website': 'website',
'replaygain_album_gain': 'replaygain album gain',
'replaygain_album_peak': 'replaygain album peak',
'replaygain_track_gain': 'replaygain track gain',
'replaygain_track_peak': 'replaygain track peak',
}
# Custom formatting for specific keys
if key == 'tracknum' and pad_tracks and raw_value not in (None, ''):
try:
return sanitize_name(f"{int(raw_value):02d}")
except (ValueError, TypeError):
pass
if key == 'discnum' and raw_value not in (None, ''):
try:
return sanitize_name(f"{int(raw_value):02d}")
except (ValueError, TypeError):
pass
if key == 'year' and raw_value not in (None, ''):
m = re.match(r"^(\d{4})", str(raw_value))
if m:
return sanitize_name(m.group(1))
# Handle missing metadata with descriptive default
if raw_value in (None, ''):
friendly = key_mappings.get(key, key.replace('_', ' '))
return sanitize_name(f"Unknown {friendly}")
# Default handling
return sanitize_name(str(raw_value))
return re.sub(r'%(\w+)%', replacer, format_str)
full_key = match.group(1) # e.g., "artist", "ar_album_1"
# Check for specific indexed placeholders: artist_INDEX or ar_album_INDEX
# Allows %artist_1%, %ar_album_1%, etc.
indexed_artist_match = re.fullmatch(r'(artist|ar_album)_(\d+)', full_key)
if indexed_artist_match:
base_key = indexed_artist_match.group(1) # "artist" or "ar_album"
try:
index = int(indexed_artist_match.group(2))
except ValueError: # Should not happen with \d+ but good practice
return ""
raw_value = metadata.get(base_key) # Get the value of "artist" or "ar_album"
items = []
if isinstance(raw_value, str):
# Split semicolon-separated strings and strip whitespace
items = [item.strip() for item in raw_value.split(';') if item.strip()]
elif isinstance(raw_value, list):
# Convert all items to string, strip whitespace
items = [str(item).strip() for item in raw_value if str(item).strip()]
# If raw_value is not string or list, items remains []
if items: # If we have a list of artists/ar_album
if 1 <= index <= len(items):
return items[index - 1]
elif items: # Index out of bounds, but list is not empty
return items[0] # Fallback to the first item
# If items is empty after processing, fall through
# Fallback if no items or base_key was not found or not list/string
return ""
def __get_dir(song_metadata, output_dir, method_save, custom_dir_format=None, pad_tracks=True):
"""
Returns the final directory based either on a custom directory format string
or the legacy method_save logic.
"""
if song_metadata is None:
raise ValueError("song_metadata cannot be None")
if custom_dir_format is not None:
# Use the custom format string
dir_name = apply_custom_format(custom_dir_format, song_metadata, pad_tracks)
else:
# Legacy logic based on method_save (for episodes or albums)
if 'show' in song_metadata and 'name' in song_metadata:
show = var_excape(song_metadata.get('show', ''))
episode = var_excape(song_metadata.get('name', ''))
if show and episode:
dir_name = f"{show} - {episode}"
elif show:
dir_name = show
elif episode:
dir_name = episode
else:
dir_name = "Unknown Episode"
else:
album = var_excape(song_metadata.get('album', ''))
ar_album = var_excape(song_metadata.get('ar_album', ''))
if method_save == 0:
dir_name = f"{album} - {ar_album}"
elif method_save == 1:
dir_name = f"{ar_album}/{album}"
elif method_save == 2:
dir_name = f"{album} - {ar_album}"
elif method_save == 3:
dir_name = f"{album} - {ar_album}"
else:
dir_name = "Unknown"
# Original non-indexed placeholder logic (for %album%, %title%, %artist%, %ar_album%, etc.)
value = metadata.get(full_key, '')
if pad_tracks and full_key in ['tracknum', 'discnum']:
str_value = str(value)
# Pad with leading zero if it's a single digit
if str_value.isdigit() and len(str_value) == 1:
return str_value.zfill(2)
return str(value)
return re.sub(r'%([^%]+)%', replacer, format_str)
def __get_dir(song_metadata, output_dir, custom_dir_format=None, pad_tracks=True):
# If custom_dir_format is explicitly empty or None, use output_dir directly
if not custom_dir_format:
# Ensure output_dir itself exists, as __check_dir won't be called on a subpath
__check_dir(output_dir)
return output_dir
# Apply the custom format string.
# pad_tracks is passed along in case 'tracknum' or 'discnum' are used in dir format.
formatted_path_segment = apply_custom_format(custom_dir_format, song_metadata, pad_tracks)
# Prevent absolute paths and sanitize each directory segment
dir_name = dir_name.strip('/')
dir_name = '/'.join(sanitize_name(seg) for seg in dir_name.split('/') if seg)
final_dir = join(output_dir, dir_name)
if not isdir(final_dir):
makedirs(final_dir)
return final_dir
# Sanitize each component of the formatted path segment
sanitized_path_segment = "/".join(
sanitize_name(part) for part in formatted_path_segment.split("/")
)
# Join with the base output directory
path = join(output_dir, sanitized_path_segment)
# __check_dir will create the directory if it doesn't exist.
__check_dir(path)
return path
def set_path(
song_metadata, output_dir,
song_quality, file_format, method_save,
song_quality, file_format,
is_episode=False,
custom_dir_format=None,
custom_track_format=None,
pad_tracks=True
):
if song_metadata is None:
raise ValueError("song_metadata cannot be None")
if is_episode:
if custom_track_format is not None:
song_name = apply_custom_format(custom_track_format, song_metadata, pad_tracks)
else:
show = var_excape(song_metadata.get('show', ''))
episode = var_excape(song_metadata.get('name', ''))
if show and episode:
song_name = f"{show} - {episode}"
elif show:
song_name = show
elif episode:
song_name = episode
else:
song_name = "Unknown Episode"
else:
if custom_track_format is not None:
song_name = apply_custom_format(custom_track_format, song_metadata, pad_tracks)
else:
album = var_excape(song_metadata.get('album', ''))
artist = var_excape(song_metadata.get('artist', ''))
music = var_excape(song_metadata.get('music', '')) # Track title
discnum = song_metadata.get('discnum', '')
tracknum = song_metadata.get('tracknum', '')
# Determine the directory for the song
directory = __get_dir(
song_metadata,
output_dir,
custom_dir_format=custom_dir_format,
pad_tracks=pad_tracks
)
if method_save == 0:
song_name = f"{album} CD {discnum} TRACK {tracknum}"
elif method_save == 1:
try:
if pad_tracks:
tracknum = f"{int(tracknum):02d}" # Format as two digits with padding
else:
tracknum = f"{int(tracknum)}" # Format without padding
except (ValueError, TypeError):
pass # Fallback to raw value
tracknum_clean = var_excape(str(tracknum))
tracktitle_clean = var_excape(music)
song_name = f"{tracknum_clean}. {tracktitle_clean}"
elif method_save == 2:
isrc = song_metadata.get('isrc', '')
song_name = f"{music} - {artist} [{isrc}]"
elif method_save == 3:
song_name = f"{discnum}|{tracknum} - {music} - {artist}"
# Determine the filename for the song
# Default track format if no custom one is provided
if custom_track_format is None:
if is_episode:
# Default for episodes: %title%
# Episodes usually don't have artist/album context in the same way tracks do.
# Their 'album' is the show name, and 'artist' is the publisher.
custom_track_format = "%music%"
else:
# Default for tracks: %artist% - %title%
custom_track_format = "%artist% - %music%"
# Sanitize song_name to remove invalid chars and prevent '/'
song_name = sanitize_name(song_name)
# Truncate to avoid filesystem limits
max_length = 255 - len(output_dir) - len(file_format)
song_name = song_name[:max_length]
# Apply the custom format string for the track filename.
# pad_tracks is passed along for track/disc numbers in filename.
track_filename_base = apply_custom_format(custom_track_format, song_metadata, pad_tracks)
track_filename_base = sanitize_name(track_filename_base)
# Build final path
song_dir = __get_dir(song_metadata, output_dir, method_save, custom_dir_format, pad_tracks)
__check_dir(song_dir)
n_tronc = __get_tronc(song_name)
song_path = f"{song_dir}/{song_name[:n_tronc]}{file_format}"
return song_path
# Add quality and file format to the filename
if song_quality and file_format:
# Ensure file_format starts with a dot
ext = file_format if file_format.startswith('.') else f".{file_format}"
filename = f"{track_filename_base} [{song_quality}]{ext}"
elif file_format: # Only file_format provided
ext = file_format if file_format.startswith('.') else f".{file_format}"
filename = f"{track_filename_base}{ext}"
else: # Neither provided (should not happen for standard audio)
filename = track_filename_base
return join(directory, filename)
def create_zip(
tracks: list[Track],
output_dir=None,
song_metadata=None,
song_quality=None,
method_save=0,
zip_name=None
song_metadata=None, # Album/Playlist level metadata
song_quality=None, # Overall quality for the zip, if applicable
zip_name=None, # Specific name for the zip file
custom_dir_format=None # To determine zip name if not provided, and for paths inside zip
):
if not zip_name:
album = var_excape(song_metadata.get('album', ''))
song_dir = __get_dir(song_metadata, output_dir, method_save)
if method_save == 0:
zip_name = f"{album}"
elif method_save == 1:
artist = var_excape(song_metadata.get('ar_album', ''))
zip_name = f"{album} - {artist}"
elif method_save == 2:
artist = var_excape(song_metadata.get('ar_album', ''))
upc = song_metadata.get('upc', '')
zip_name = f"{album} - {artist} {upc}"
elif method_save == 3:
artist = var_excape(song_metadata.get('ar_album', ''))
upc = song_metadata.get('upc', '')
zip_name = f"{album} - {artist} {upc}"
n_tronc = __get_tronc(zip_name)
zip_name = zip_name[:n_tronc]
zip_name += ".zip"
zip_path = f"{song_dir}/{zip_name}"
# Determine the zip file name and path
if zip_name:
# If zip_name is a full path, use it as is.
# Otherwise, prepend output_dir.
if not basename(zip_name) == zip_name: # Checks if it's just a filename
actual_zip_path = zip_name
else:
# Ensure output_dir exists for placing the zip file
if not output_dir:
# Fallback to a default if output_dir is not provided with a relative zip_name
output_dir = "."
__check_dir(output_dir)
actual_zip_path = join(output_dir, zip_name)
elif song_metadata and output_dir: # Construct default name if song_metadata and output_dir exist
# Use album/playlist name and quality for default zip name
# Sanitize the album/playlist name part of the zip file
name_part = sanitize_name(song_metadata.get('album', song_metadata.get('name', 'archive')))
quality_part = f" [{song_quality}]" if song_quality else ""
actual_zip_path = join(output_dir, f"{name_part}{quality_part}.zip")
else:
zip_path = zip_name
# Fallback zip name if not enough info
actual_zip_path = join(output_dir if output_dir else ".", "archive.zip")
z = ZipFile(zip_path, "w", ZIP_DEFLATED)
for track in tracks:
if not track.success:
continue
c_song_path = track.song_path
song_path = basename(c_song_path)
if not isfile(c_song_path):
continue
z.write(c_song_path, song_path)
z.close()
return zip_path
# Ensure the directory for the zip file exists
zip_dir = dirname(actual_zip_path)
__check_dir(zip_dir)
with ZipFile(actual_zip_path, 'w', ZIP_DEFLATED) as zf:
for track in tracks:
if track.success and isfile(track.song_path):
# Determine path inside the zip
# This uses the same logic as saving individual files,
# but relative to the zip root.
# We pass an empty string as base_output_dir to set_path essentially,
# so it generates a relative path structure.
path_in_zip = set_path(
track.tags, # Use individual track metadata for path inside zip
"", # Base output dir (empty for relative paths in zip)
track.quality,
track.file_format,
custom_dir_format=custom_dir_format, # Use album/playlist custom dir format
custom_track_format=track.tags.get('custom_track_format'), # Use track specific if available
pad_tracks=track.tags.get('pad_tracks', True)
)
# Remove leading slash if any, to ensure it's relative inside zip
path_in_zip = path_in_zip.lstrip('/').lstrip('\\')
zf.write(track.song_path, arcname=path_in_zip)
return actual_zip_path
def trasform_sync_lyric(lyric):
sync_array = []
@@ -369,3 +306,27 @@ def trasform_sync_lyric(lyric):
arr = (a['line'], int(a['milliseconds']))
sync_array.append(arr)
return sync_array
def save_cover_image(image_data: bytes, directory_path: str, cover_filename: str = "cover.jpg"):
if not image_data:
logger.warning(f"No image data provided to save cover in {directory_path}.")
return
if not isdir(directory_path):
# This case should ideally be handled by prior directory creation (e.g., __get_dir)
# but as a fallback, we can try to create it or log a warning.
logger.warning(f"Directory {directory_path} does not exist. Attempting to create it for cover image.")
try:
makedirs(directory_path, exist_ok=True)
logger.info(f"Created directory {directory_path} for cover image.")
except OSError as e:
logger.error(f"Failed to create directory {directory_path} for cover: {e}")
return
cover_path = join(directory_path, cover_filename)
try:
with open(cover_path, "wb") as f:
f.write(image_data)
logger.info(f"Successfully saved cover image to {cover_path}")
except OSError as e:
logger.error(f"Failed to save cover image to {cover_path}: {e}")

View File

@@ -11,7 +11,6 @@ class Preferences:
self.recursive_quality = None
self.recursive_download = None
self.not_interface = None
self.method_save = None
self.make_zip = None
self.real_time_dl = None ,
self.custom_dir_format = None,
@@ -19,4 +18,5 @@ class Preferences:
self.pad_tracks = True # Default to padded track numbers (01, 02, etc.)
self.initial_retry_delay = 30 # Default initial retry delay in seconds
self.retry_delay_increase = 30 # Default increase in delay between retries in seconds
self.max_retries = 5 # Default maximum number of retries per track
self.max_retries = 5 # Default maximum number of retries per track
self.save_cover: bool = False # Option to save a cover.jpg image

View File

@@ -31,6 +31,9 @@ from deezspot.libutils.utils import (
set_path,
create_zip,
request,
sanitize_name,
save_cover_image,
__get_dir as get_album_directory,
)
from mutagen import File
from mutagen.easyid3 import EasyID3
@@ -135,7 +138,6 @@ class EASY_DW:
self.__ids = preferences.ids
self.__link = preferences.link
self.__output_dir = preferences.output_dir
self.__method_save = preferences.method_save
self.__song_metadata = preferences.song_metadata
self.__not_interface = preferences.not_interface
self.__quality_download = preferences.quality_download or "NORMAL"
@@ -168,7 +170,6 @@ class EASY_DW:
self.__output_dir,
self.__song_quality,
self.__file_format,
self.__method_save,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks
@@ -183,7 +184,6 @@ class EASY_DW:
self.__output_dir,
self.__song_quality,
self.__file_format,
self.__method_save,
is_episode=True,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
@@ -212,33 +212,43 @@ class EASY_DW:
def __convert_audio(self) -> None:
# First, handle Spotify's OGG to standard format conversion (always needed)
temp_filename = self.__song_path.replace(".ogg", ".tmp")
os_replace(self.__song_path, temp_filename)
# Register the temporary file
register_active_download(temp_filename)
# self.__song_path is initially the path for the .ogg file (e.g., song.ogg)
og_song_path_for_ogg_output = self.__song_path
temp_filename = og_song_path_for_ogg_output.replace(".ogg", ".tmp")
# Move original .ogg to .tmp
os_replace(og_song_path_for_ogg_output, temp_filename)
register_active_download(temp_filename) # CURRENT_DOWNLOAD = temp_filename
try:
# Step 1: First convert the OGG file to standard format
ffmpeg_cmd = f"ffmpeg -y -hide_banner -loglevel error -i \"{temp_filename}\" -c:a copy \"{self.__song_path}\""
system(ffmpeg_cmd)
# Step 1: First convert the OGG file to standard format (copy operation)
# Output is og_song_path_for_ogg_output
ffmpeg_cmd = f'ffmpeg -y -hide_banner -loglevel error -i "{temp_filename}" -c:a copy "{og_song_path_for_ogg_output}"'
system(ffmpeg_cmd) # Creates/overwrites og_song_path_for_ogg_output
# Register the new output file and unregister the temp file
register_active_download(self.__song_path)
# Remove the temporary file
# temp_filename has been processed. Unregister and remove it.
# CURRENT_DOWNLOAD was temp_filename.
unregister_active_download(temp_filename) # CURRENT_DOWNLOAD should become None.
if os.path.exists(temp_filename):
remove(temp_filename)
unregister_active_download(temp_filename)
# Step 2: Convert to requested format if specified
# The primary file is now og_song_path_for_ogg_output. Register it.
# Ensure self.__song_path reflects this, as it might be used by other parts of the class or returned.
self.__song_path = og_song_path_for_ogg_output
register_active_download(self.__song_path) # CURRENT_DOWNLOAD = self.__song_path (the .ogg)
# Step 2: Convert to requested format if specified (e.g., MP3, FLAC)
conversion_to_another_format_occurred_and_cleared_state = False
if self.__convert_to:
format_name, bitrate = parse_format_string(self.__convert_to)
if format_name:
try:
# Convert to the requested format using our standardized converter
# convert_audio is expected to handle its own input/output registration/unregistration.
# Input to convert_audio is self.__song_path (the .ogg path).
# On success, convert_audio should unregister its input and its output,
# leaving CURRENT_DOWNLOAD as None.
converted_path = convert_audio(
self.__song_path,
self.__song_path, # Current .ogg path
format_name,
bitrate,
register_active_download,
@@ -247,24 +257,41 @@ class EASY_DW:
if converted_path != self.__song_path:
# Update the path to the converted file
self.__song_path = converted_path
self.__c_track.song_path = converted_path
self.__c_track.song_path = converted_path # Ensure track object has the final path
conversion_to_another_format_occurred_and_cleared_state = True
except Exception as conv_error:
# Log conversion error but continue with original file
logger.error(f"Audio conversion error: {str(conv_error)}")
# Conversion to a different format failed.
# self.__song_path (the .ogg) is still the latest valid file and is registered.
# We want to keep it, so CURRENT_DOWNLOAD should remain set to this .ogg path.
logger.error(f"Audio conversion to {format_name} error: {str(conv_error)}")
# conversion_to_another_format_occurred_and_cleared_state remains False.
# else: format_name was None after parsing __convert_to. No specific conversion attempt.
# conversion_to_another_format_occurred_and_cleared_state remains False.
# If no conversion to another format was requested, or if it was requested but didn't effectively run
# (e.g. format_name was None), or if convert_audio failed to clear state (which would be its bug),
# then self.__song_path (the .ogg from Step 1) is the final successfully processed file for this method's scope.
# It is currently registered. Unregister it as its processing is complete.
if not conversion_to_another_format_occurred_and_cleared_state:
unregister_active_download(self.__song_path) # Clears CURRENT_DOWNLOAD if it was self.__song_path
except Exception as e:
# In case of failure, try to restore the original file
if os.path.exists(temp_filename) and not os.path.exists(self.__song_path):
os_replace(temp_filename, self.__song_path)
# This outer try/except handles errors primarily from Step 1 (OGG copy)
# or issues during the setup for Step 2 before convert_audio is deeply involved.
# In case of failure, try to restore the original file from temp if Step 1 didn't complete.
if os.path.exists(temp_filename) and not os.path.exists(og_song_path_for_ogg_output):
os_replace(temp_filename, og_song_path_for_ogg_output)
# Clean up temp files
# Clean up temp_filename. unregister_active_download is safe:
# it only clears CURRENT_DOWNLOAD if CURRENT_DOWNLOAD == temp_filename.
if os.path.exists(temp_filename):
remove(temp_filename)
unregister_active_download(temp_filename)
remove(temp_filename)
# Re-throw the exception
# Re-throw the exception. If a file (like og_song_path_for_ogg_output) was registered
# and an error occurred, it remains registered for atexit cleanup, which is intended.
raise e
def get_no_dw_track(self) -> Track:
return self.__c_track
@@ -319,7 +346,7 @@ class EASY_DW:
if hasattr(self, '_EASY_DW__c_track') and self.__c_track and self.__c_track.success:
write_tags(self.__c_track)
return self.__c_track # Return the track object
return self.__c_track
def track_exists(self, title, album):
try:
@@ -664,6 +691,16 @@ class EASY_DW:
time.sleep(retry_delay)
retry_delay += retry_delay_increase # Use the custom retry delay increase
# Save cover image if requested, after successful download and before conversion
if self.__preferences.save_cover and hasattr(self, '_EASY_DW__song_path') and self.__song_path and self.__song_metadata.get('image'):
try:
track_directory = dirname(self.__song_path)
# Ensure the directory exists (it should, from os.makedirs earlier)
save_cover_image(self.__song_metadata['image'], track_directory, "cover.jpg")
logger.info(f"Saved cover image for track in {track_directory}")
except Exception as e_img_save:
logger.warning(f"Failed to save cover image for track: {e_img_save}")
try:
self.__convert_audio()
except Exception as e:
@@ -1000,32 +1037,26 @@ class EASY_DW:
self.__c_episode.error_message = error_message
raise TrackNotFound(message=error_message, url=self.__link) from conv_e
self.__write_episode()
# Write metadata tags so subsequent skips work
write_tags(self.__c_episode)
return self.__c_episode
def download_cli(preferences: Preferences) -> None:
__link = preferences.link
__output_dir = preferences.output_dir
__method_save = preferences.method_save
__not_interface = preferences.not_interface
__quality_download = preferences.quality_download
__recursive_download = preferences.recursive_download
__recursive_quality = preferences.recursive_quality
cmd = f"deez-dw.py -so spo -l \"{__link}\" "
if __output_dir:
cmd += f"-o {__output_dir} "
if __method_save:
cmd += f"-sa {__method_save} "
if __not_interface:
cmd += f"-g "
if __quality_download:
cmd += f"-q {__quality_download} "
if __recursive_download:
cmd += f"-rd "
if __recursive_quality:
cmd += f"-rq"
system(cmd)
class DW_TRACK:
@@ -1041,11 +1072,6 @@ class DW_TRACK:
# it's an intentional skip, not an error
return track
def dw2(self) -> Track:
track = EASY_DW(self.__preferences).get_no_dw_track()
download_cli(self.__preferences)
return track
class DW_ALBUM:
def __init__(
self,
@@ -1055,7 +1081,6 @@ class DW_ALBUM:
self.__ids = self.__preferences.ids
self.__make_zip = self.__preferences.make_zip
self.__output_dir = self.__preferences.output_dir
self.__method_save = self.__preferences.method_save
self.__song_metadata = self.__preferences.song_metadata
self.__not_interface = self.__preferences.not_interface
self.__song_metadata_items = self.__song_metadata.items()
@@ -1097,11 +1122,12 @@ class DW_ALBUM:
"url": f"https://open.spotify.com/album/{album_id}"
})
pic = self.__song_metadata['image']
image = request(pic).content
self.__song_metadata['image'] = image
pic_url = self.__song_metadata['image'] # This is URL for spotify
image_bytes = request(pic_url).content
self.__song_metadata['image'] = image_bytes # Keep bytes for tagging
album = Album(self.__ids)
album.image = image
album.image = image_bytes # Store raw image bytes for cover saving
album.nb_tracks = self.__song_metadata['nb_tracks']
album.album_name = self.__song_metadata['album']
album.upc = self.__song_metadata['upc']
@@ -1109,6 +1135,14 @@ class DW_ALBUM:
album.md5_image = self.__ids
album.tags = self.__song_metadata
# Determine album base directory once
album_base_directory = get_album_directory(
self.__song_metadata, # Album level metadata
self.__output_dir,
custom_dir_format=self.__preferences.custom_dir_format,
pad_tracks=self.__preferences.pad_tracks
)
c_song_metadata = {}
for key, item in self.__song_metadata_items:
if type(item) is not list:
@@ -1146,6 +1180,11 @@ class DW_ALBUM:
track.error_message = f"An unexpected error occurred: {str(e_generic)}"
logger.error(f"Unexpected error downloading track '{song_name}' by '{artist_name}' from album '{album.album_name}'. Reason: {track.error_message}")
tracks.append(track)
# Save album cover image
if self.__preferences.save_cover and album.image and album_base_directory:
save_cover_image(album.image, album_base_directory, "cover.jpg")
if self.__make_zip:
song_quality = tracks[0].quality
custom_dir_format = getattr(self.__preferences, 'custom_dir_format', None)
@@ -1154,7 +1193,6 @@ class DW_ALBUM:
output_dir=self.__output_dir,
song_metadata=self.__song_metadata,
song_quality=song_quality,
method_save=self.__method_save,
custom_dir_format=custom_dir_format
)
album.zip_path = zip_name
@@ -1184,11 +1222,6 @@ class DW_ALBUM:
return album
def dw2(self) -> Album:
track = EASY_DW(self.__preferences).get_no_dw_track()
download_cli(self.__preferences)
return track
class DW_PLAYLIST:
def __init__(
self,
@@ -1220,10 +1253,11 @@ class DW_PLAYLIST:
# --- Prepare the m3u playlist file ---
playlist_m3u_dir = os.path.join(self.__output_dir, "playlists")
os.makedirs(playlist_m3u_dir, exist_ok=True)
m3u_path = os.path.join(playlist_m3u_dir, f"{playlist_name}.m3u")
playlist_name_sanitized = sanitize_name(playlist_name)
m3u_path = os.path.join(playlist_m3u_dir, f"{playlist_name_sanitized}.m3u")
if not os.path.exists(m3u_path):
with open(m3u_path, "w", encoding="utf-8") as m3u_file:
m3u_file.write("#EXTM3U\n")
m3u_file.write("#EXTM3U\\n")
# -------------------------------------
playlist = Playlist()
@@ -1282,86 +1316,6 @@ class DW_PLAYLIST:
return playlist
def dw2(self) -> Playlist:
# Extract playlist metadata for reporting
playlist_name = self.__json_data.get('name', 'Unknown Playlist')
playlist_owner = self.__json_data.get('owner', {}).get('display_name', 'Unknown Owner')
total_tracks = self.__json_data.get('tracks', {}).get('total', 'unknown')
playlist_id = self.__ids
# Report playlist initializing status
Download_JOB.report_progress({
"type": "playlist",
"owner": playlist_owner,
"status": "initializing",
"total_tracks": total_tracks,
"name": playlist_name,
"url": f"https://open.spotify.com/playlist/{playlist_id}"
})
playlist = Playlist()
tracks = playlist.tracks
for i, c_song_metadata in enumerate(self.__song_metadata):
if type(c_song_metadata) is str:
logger.warning(f"Track not found {c_song_metadata}")
continue
c_preferences = deepcopy(self.__preferences)
c_preferences.ids = c_song_metadata['ids']
c_preferences.song_metadata = c_song_metadata
c_preferences.json_data = self.__json_data # Pass playlist data for reporting
c_preferences.track_number = i + 1 # Track number in the playlist
# Even though we're not downloading directly, we still need to set up the track object
track = EASY_DW(c_preferences, parent='playlist').get_no_dw_track()
if not track.success:
song = f"{c_song_metadata['music']} - {c_song_metadata['artist']}"
error_detail = getattr(track, 'error_message', 'Download failed for unspecified reason.')
logger.warning(f"Cannot download '{song}' (CLI mode). Reason: {error_detail} (Link: {track.link or c_preferences.link})")
tracks.append(track)
# Track-level progress reporting using the standardized format
progress_data = {
"type": "track",
"song": c_song_metadata.get("music", ""),
"artist": c_song_metadata.get("artist", ""),
"status": "progress",
"current_track": i + 1,
"total_tracks": total_tracks,
"parent": {
"type": "playlist",
"name": playlist_name,
"owner": self.__json_data.get('owner', {}).get('display_name', 'unknown'),
"total_tracks": total_tracks,
"url": f"https://open.spotify.com/playlist/{self.__json_data.get('id', '')}"
},
"url": f"https://open.spotify.com/track/{c_song_metadata['ids']}"
}
Download_JOB.report_progress(progress_data)
download_cli(self.__preferences)
if self.__make_zip:
playlist_title = self.__json_data['name']
zip_name = f"{self.__output_dir}/{playlist_title} [playlist {self.__ids}]"
create_zip(tracks, zip_name=zip_name)
playlist.zip_path = zip_name
# Report playlist done status
playlist_name = self.__json_data.get('name', 'Unknown Playlist')
playlist_owner = self.__json_data.get('owner', {}).get('display_name', 'Unknown Owner')
total_tracks = self.__json_data.get('tracks', {}).get('total', 0)
playlist_id = self.__ids
Download_JOB.report_progress({
"type": "playlist",
"owner": playlist_owner,
"status": "done",
"total_tracks": total_tracks,
"name": playlist_name,
"url": f"https://open.spotify.com/playlist/{playlist_id}"
})
return playlist
class DW_EPISODE:
def __init__(
self,
@@ -1403,39 +1357,3 @@ class DW_EPISODE:
Download_JOB.report_progress(progress_data)
return episode
def dw2(self) -> Episode:
# Using standardized episode progress format
progress_data = {
"type": "episode",
"song": self.__preferences.song_metadata.get('name', 'Unknown Episode'),
"artist": self.__preferences.song_metadata.get('show', 'Unknown Show'),
"status": "initializing"
}
# Set URL if available
episode_id = self.__preferences.ids
if episode_id:
progress_data["url"] = f"https://open.spotify.com/episode/{episode_id}"
Download_JOB.report_progress(progress_data)
episode = EASY_DW(self.__preferences).get_no_dw_track()
download_cli(self.__preferences)
# Using standardized episode progress format
progress_data = {
"type": "episode",
"song": self.__preferences.song_metadata.get('name', 'Unknown Episode'),
"artist": self.__preferences.song_metadata.get('show', 'Unknown Show'),
"status": "done"
}
# Set URL if available
episode_id = self.__preferences.ids
if episode_id:
progress_data["url"] = f"https://open.spotify.com/episode/{episode_id}"
Download_JOB.report_progress(progress_data)
return episode

View File

@@ -32,8 +32,7 @@ from deezspot.libutils.others_settings import (
stock_recursive_download,
stock_not_interface,
stock_zip,
method_save,
is_thread,
stock_save_cover,
stock_real_time_dl
)
from deezspot.libutils.logging_utils import logger, ProgressReporter
@@ -90,8 +89,6 @@ class SpoLogin:
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
method_save=method_save,
is_thread=is_thread,
real_time_dl=stock_real_time_dl,
custom_dir_format=None,
custom_track_format=None,
@@ -99,7 +96,8 @@ class SpoLogin:
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None
convert_to=None,
save_cover=stock_save_cover
) -> Track:
try:
link_is_valid(link_track)
@@ -118,7 +116,6 @@ class SpoLogin:
preferences.recursive_quality = recursive_quality
preferences.recursive_download = recursive_download
preferences.not_interface = not_interface
preferences.method_save = method_save
preferences.is_episode = False
preferences.custom_dir_format = custom_dir_format
preferences.custom_track_format = custom_track_format
@@ -127,11 +124,9 @@ class SpoLogin:
preferences.retry_delay_increase = retry_delay_increase
preferences.max_retries = max_retries
preferences.convert_to = convert_to
preferences.save_cover = save_cover
if not is_thread:
track = DW_TRACK(preferences).dw()
else:
track = DW_TRACK(preferences).dw2()
track = DW_TRACK(preferences).dw()
return track
except Exception as e:
@@ -147,8 +142,6 @@ class SpoLogin:
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
method_save=method_save,
is_thread=is_thread,
real_time_dl=stock_real_time_dl,
custom_dir_format=None,
custom_track_format=None,
@@ -156,7 +149,8 @@ class SpoLogin:
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None
convert_to=None,
save_cover=stock_save_cover
) -> Album:
try:
link_is_valid(link_album)
@@ -178,7 +172,6 @@ class SpoLogin:
preferences.recursive_quality = recursive_quality
preferences.recursive_download = recursive_download
preferences.not_interface = not_interface
preferences.method_save = method_save
preferences.make_zip = make_zip
preferences.is_episode = False
preferences.custom_dir_format = custom_dir_format
@@ -188,11 +181,9 @@ class SpoLogin:
preferences.retry_delay_increase = retry_delay_increase
preferences.max_retries = max_retries
preferences.convert_to = convert_to
preferences.save_cover = save_cover
if not is_thread:
album = DW_ALBUM(preferences).dw()
else:
album = DW_ALBUM(preferences).dw2()
album = DW_ALBUM(preferences).dw()
return album
except Exception as e:
@@ -208,8 +199,6 @@ class SpoLogin:
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
method_save=method_save,
is_thread=is_thread,
real_time_dl=stock_real_time_dl,
custom_dir_format=None,
custom_track_format=None,
@@ -217,7 +206,8 @@ class SpoLogin:
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None
convert_to=None,
save_cover=stock_save_cover
) -> Playlist:
try:
link_is_valid(link_playlist)
@@ -253,7 +243,6 @@ class SpoLogin:
preferences.recursive_quality = recursive_quality
preferences.recursive_download = recursive_download
preferences.not_interface = not_interface
preferences.method_save = method_save
preferences.make_zip = make_zip
preferences.is_episode = False
preferences.custom_dir_format = custom_dir_format
@@ -263,11 +252,9 @@ class SpoLogin:
preferences.retry_delay_increase = retry_delay_increase
preferences.max_retries = max_retries
preferences.convert_to = convert_to
preferences.save_cover = save_cover
if not is_thread:
playlist = DW_PLAYLIST(preferences).dw()
else:
playlist = DW_PLAYLIST(preferences).dw2()
playlist = DW_PLAYLIST(preferences).dw()
return playlist
except Exception as e:
@@ -282,8 +269,6 @@ class SpoLogin:
recursive_quality=stock_recursive_quality,
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
method_save=method_save,
is_thread=is_thread,
real_time_dl=stock_real_time_dl,
custom_dir_format=None,
custom_track_format=None,
@@ -291,7 +276,8 @@ class SpoLogin:
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None
convert_to=None,
save_cover=stock_save_cover
) -> Episode:
try:
link_is_valid(link_episode)
@@ -312,7 +298,6 @@ class SpoLogin:
preferences.recursive_quality = recursive_quality
preferences.recursive_download = recursive_download
preferences.not_interface = not_interface
preferences.method_save = method_save
preferences.is_episode = True
preferences.custom_dir_format = custom_dir_format
preferences.custom_track_format = custom_track_format
@@ -321,11 +306,9 @@ class SpoLogin:
preferences.retry_delay_increase = retry_delay_increase
preferences.max_retries = max_retries
preferences.convert_to = convert_to
preferences.save_cover = save_cover
if not is_thread:
episode = DW_EPISODE(preferences).dw()
else:
episode = DW_EPISODE(preferences).dw2()
episode = DW_EPISODE(preferences).dw()
return episode
except Exception as e:
@@ -343,8 +326,6 @@ class SpoLogin:
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
method_save=method_save,
is_thread=is_thread,
real_time_dl=stock_real_time_dl,
custom_dir_format=None,
custom_track_format=None,
@@ -382,8 +363,6 @@ class SpoLogin:
recursive_download=recursive_download,
not_interface=not_interface,
make_zip=make_zip,
method_save=method_save,
is_thread=is_thread,
real_time_dl=real_time_dl,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
@@ -408,7 +387,6 @@ class SpoLogin:
recursive_download=stock_recursive_download,
not_interface=stock_not_interface,
make_zip=stock_zip,
method_save=method_save,
real_time_dl=stock_real_time_dl,
custom_dir_format=None,
custom_track_format=None,
@@ -416,7 +394,8 @@ class SpoLogin:
initial_retry_delay=30,
retry_delay_increase=30,
max_retries=5,
convert_to=None
convert_to=None,
save_cover=stock_save_cover
) -> Smart:
try:
link_is_valid(link)
@@ -439,14 +418,14 @@ class SpoLogin:
recursive_quality=recursive_quality,
recursive_download=recursive_download,
not_interface=not_interface,
method_save=method_save,
real_time_dl=real_time_dl,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries
max_retries=max_retries,
save_cover=save_cover
)
smart.type = "track"
smart.track = track
@@ -462,14 +441,15 @@ class SpoLogin:
recursive_download=recursive_download,
not_interface=not_interface,
make_zip=make_zip,
method_save=method_save,
real_time_dl=real_time_dl,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries
max_retries=max_retries,
convert_to=convert_to,
save_cover=save_cover
)
smart.type = "album"
smart.album = album
@@ -485,14 +465,15 @@ class SpoLogin:
recursive_download=recursive_download,
not_interface=not_interface,
make_zip=make_zip,
method_save=method_save,
real_time_dl=real_time_dl,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries
max_retries=max_retries,
convert_to=convert_to,
save_cover=save_cover
)
smart.type = "playlist"
smart.playlist = playlist
@@ -507,14 +488,15 @@ class SpoLogin:
recursive_quality=recursive_quality,
recursive_download=recursive_download,
not_interface=not_interface,
method_save=method_save,
real_time_dl=real_time_dl,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks,
initial_retry_delay=initial_retry_delay,
retry_delay_increase=retry_delay_increase,
max_retries=max_retries
max_retries=max_retries,
convert_to=convert_to,
save_cover=save_cover
)
smart.type = "episode"
smart.episode = episode