Files
deezspot-spotizerr-dev/deezspot/spotloader/__download__.py

1290 lines
63 KiB
Python

import traceback
import json
import os
import time
from copy import deepcopy
from os.path import isfile, dirname
from librespot.core import Session
from deezspot.exceptions import TrackNotFound
from librespot.metadata import TrackId, EpisodeId
from deezspot.spotloader.spotify_settings import qualities
from deezspot.libutils.others_settings import answers
from deezspot.libutils.write_tags import check_track
from librespot.audio.decoders import AudioQuality, VorbisOnlyAudioQuality
from deezspot.libutils.audio_converter import convert_audio, AUDIO_FORMATS, get_output_path
from os import (
remove,
system,
replace as os_replace,
)
from deezspot.models.download import (
Track,
Album,
Playlist,
Preferences,
Episode,
)
from deezspot.libutils.utils import (
set_path,
create_zip,
request,
sanitize_name,
save_cover_image,
__get_dir as get_album_directory,
)
from deezspot.libutils.write_m3u import create_m3u_file, append_track_to_m3u
from deezspot.libutils.metadata_converter import track_object_to_dict, album_object_to_dict
from deezspot.libutils.progress_reporter import (
report_track_initializing, report_track_skipped, report_track_retrying,
report_track_realtime_progress, report_track_error, report_track_done,
report_album_initializing, report_album_done, report_playlist_initializing, report_playlist_done
)
from deezspot.libutils.taggers import (
enhance_metadata_with_image, process_and_tag_track, process_and_tag_episode,
save_cover_image_for_track
)
from deezspot.libutils.logging_utils import logger, report_progress
from deezspot.libutils.cleanup_utils import (
register_active_download,
unregister_active_download,
)
from deezspot.libutils.skip_detection import check_track_exists
from deezspot.models.callback import (
trackObject, albumTrackObject, playlistTrackObject, artistTrackObject,
trackCallbackObject, albumCallbackObject, playlistCallbackObject,
initializingObject, skippedObject, retryingObject, realTimeObject, errorObject, doneObject,
failedTrackObject, summaryObject,
albumObject, artistAlbumObject,
playlistObject,
userObject,
IDs
)
from deezspot.spotloader.__spo_api__ import tracking, json_to_track_playlist_object
# --- Global retry counter variables ---
GLOBAL_RETRY_COUNT = 0
GLOBAL_MAX_RETRIES = 100 # Adjust this value as needed
# --- Global tracking of active downloads ---
# Moved to deezspot.libutils.cleanup_utils
# Use unified metadata converter
def _track_object_to_dict(track_obj: trackObject) -> dict:
"""Converts a trackObject into a dictionary for legacy functions like taggers."""
return track_object_to_dict(track_obj, source_type='spotify')
# Use unified metadata converter
def _album_object_to_dict(album_obj: albumObject) -> dict:
"""Converts an albumObject into a dictionary for legacy functions."""
return album_object_to_dict(album_obj, source_type='spotify')
class Download_JOB:
session = None
progress_reporter = None
@classmethod
def __init__(cls, session: Session) -> None:
cls.session = session
@classmethod
def set_progress_reporter(cls, reporter):
cls.progress_reporter = reporter
class EASY_DW:
def __init__(
self,
preferences: Preferences,
parent: str = None # Can be 'album', 'playlist', or None for individual track
) -> None:
self.__preferences = preferences
self.__parent = parent # Store the parent type
self.__ids = preferences.ids
self.__link = preferences.link
self.__output_dir = preferences.output_dir
self.__song_metadata = preferences.song_metadata
# Convert song metadata to dict with configured artist separator
artist_separator = getattr(preferences, 'artist_separator', '; ')
if parent == 'album' and hasattr(self.__song_metadata, 'album'):
# When iterating album tracks later we will still need the separator, but for initial dict, use track conversion
self.__song_metadata_dict = track_object_to_dict(self.__song_metadata, source_type='spotify', artist_separator=artist_separator)
else:
self.__song_metadata_dict = track_object_to_dict(self.__song_metadata, source_type='spotify', artist_separator=artist_separator)
self.__not_interface = preferences.not_interface
self.__quality_download = preferences.quality_download or "NORMAL"
self.__recursive_download = preferences.recursive_download
self.__type = "episode" if preferences.is_episode else "track" # New type parameter
self.__real_time_dl = preferences.real_time_dl
self.__convert_to = getattr(preferences, 'convert_to', None)
self.__bitrate = getattr(preferences, 'bitrate', None) # New bitrate attribute
# Ensure if convert_to is None, bitrate is also None
if self.__convert_to is None:
self.__bitrate = None
self.__c_quality = qualities[self.__quality_download]
self.__fallback_ids = self.__ids
self.__set_quality()
if preferences.is_episode:
self.__write_episode()
else:
self.__write_track()
def __set_quality(self) -> None:
self.__dw_quality = self.__c_quality['n_quality']
self.__file_format = self.__c_quality['f_format']
self.__song_quality = self.__c_quality['s_quality']
def __set_song_path(self) -> None:
# Retrieve custom formatting strings from preferences, if any.
custom_dir_format = getattr(self.__preferences, 'custom_dir_format', None)
custom_track_format = getattr(self.__preferences, 'custom_track_format', None)
pad_tracks = getattr(self.__preferences, 'pad_tracks', True)
# Ensure the separator is available to formatting utils for indexed placeholders
self.__song_metadata_dict['artist_separator'] = getattr(self.__preferences, 'artist_separator', '; ')
# Inject playlist placeholders if in playlist context
try:
if self.__parent == 'playlist' and hasattr(self.__preferences, 'json_data') and self.__preferences.json_data:
playlist_data = self.__preferences.json_data
playlist_name = None
if isinstance(playlist_data, dict):
playlist_name = playlist_data.get('name') or playlist_data.get('title')
if not playlist_name and hasattr(playlist_data, 'title'):
playlist_name = getattr(playlist_data, 'title')
self.__song_metadata_dict['playlist'] = playlist_name or 'unknown'
self.__song_metadata_dict['playlistnum'] = getattr(self.__preferences, 'track_number', None) or 0
except Exception:
# If playlist info missing, skip silently
pass
self.__song_path = set_path(
self.__song_metadata_dict,
self.__output_dir,
self.__song_quality,
self.__file_format,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks
)
def __set_episode_path(self) -> None:
custom_dir_format = getattr(self.__preferences, 'custom_dir_format', None)
custom_track_format = getattr(self.__preferences, 'custom_track_format', None)
pad_tracks = getattr(self.__preferences, 'pad_tracks', True)
self.__song_metadata_dict['artist_separator'] = getattr(self.__preferences, 'artist_separator', '; ')
self.__song_path = set_path(
self.__song_metadata_dict,
self.__output_dir,
self.__song_quality,
self.__file_format,
is_episode=True,
custom_dir_format=custom_dir_format,
custom_track_format=custom_track_format,
pad_tracks=pad_tracks
)
def __write_track(self) -> None:
self.__set_song_path()
self.__c_track = Track(
self.__song_metadata_dict, self.__song_path,
self.__file_format, self.__song_quality,
self.__link, self.__ids
)
self.__c_track.md5_image = self.__ids
self.__c_track.set_fallback_ids(self.__fallback_ids)
def __write_episode(self) -> None:
self.__set_episode_path()
self.__c_episode = Episode(
self.__song_metadata_dict, self.__song_path,
self.__file_format, self.__song_quality,
self.__link, self.__ids
)
self.__c_episode.md5_image = self.__ids
self.__c_episode.set_fallback_ids(self.__fallback_ids)
def _get_parent_info(self):
parent_info = None
total_tracks_val = None
if self.__parent == "playlist" and hasattr(self.__preferences, "json_data"):
playlist_data = self.__preferences.json_data
total_tracks_val = playlist_data.get('tracks', {}).get('total', 'unknown')
parent_info = {
"type": "playlist",
"name": playlist_data.get('name', 'unknown'),
"owner": playlist_data.get('owner', {}).get('display_name', 'unknown'),
"total_tracks": total_tracks_val,
"url": f"https://open.spotify.com/playlist/{playlist_data.get('id', '')}"
}
elif self.__parent == "album":
album_meta = self.__song_metadata.album
total_tracks_val = album_meta.total_tracks
parent_info = {
"type": "album",
"title": album_meta.title,
"artist": getattr(self.__preferences, 'artist_separator', '; ').join([a.name for a in album_meta.artists]),
"total_tracks": total_tracks_val,
"url": f"https://open.spotify.com/album/{album_meta.ids.spotify if album_meta.ids else ''}"
}
return parent_info, total_tracks_val
def __convert_audio(self) -> None:
# First, handle Spotify's OGG to standard format conversion (always needed)
# self.__song_path is initially the path for the .ogg file (e.g., song.ogg)
og_song_path_for_ogg_output = self.__song_path
temp_filename = og_song_path_for_ogg_output.replace(".ogg", ".tmp")
# Move original .ogg to .tmp
os_replace(og_song_path_for_ogg_output, temp_filename)
register_active_download(temp_filename) # CURRENT_DOWNLOAD = temp_filename
try:
# Step 1: First convert the OGG file to standard format (copy operation)
# Output is og_song_path_for_ogg_output
ffmpeg_cmd = f'ffmpeg -y -hide_banner -loglevel error -i "{temp_filename}" -c:a copy "{og_song_path_for_ogg_output}"'
system(ffmpeg_cmd) # Creates/overwrites og_song_path_for_ogg_output
# temp_filename has been processed. Unregister and remove it.
# CURRENT_DOWNLOAD was temp_filename.
unregister_active_download(temp_filename) # CURRENT_DOWNLOAD should become None.
if os.path.exists(temp_filename):
remove(temp_filename)
# The primary file is now og_song_path_for_ogg_output. Register it.
# Ensure self.__song_path reflects this, as it might be used by other parts of the class or returned.
self.__song_path = og_song_path_for_ogg_output
register_active_download(self.__song_path) # CURRENT_DOWNLOAD = self.__song_path (the .ogg)
# Step 2: Convert to requested format if specified (e.g., MP3, FLAC)
conversion_to_another_format_occurred_and_cleared_state = False
if self.__convert_to:
format_name = self.__convert_to
bitrate = self.__bitrate
if format_name:
try:
path_before_final_conversion = self.__song_path # Current path, e.g., .ogg
converted_path = convert_audio(
path_before_final_conversion,
format_name,
bitrate,
register_active_download,
unregister_active_download
)
if converted_path != path_before_final_conversion:
# Conversion to a new format happened and path changed
self.__song_path = converted_path # Update EASY_DW's current song path
current_object_path_attr_name = 'song_path' if self.__type == "track" else 'episode_path'
current_media_object = self.__c_track if self.__type == "track" else self.__c_episode
if current_media_object:
setattr(current_media_object, current_object_path_attr_name, converted_path)
_, new_ext = os.path.splitext(converted_path)
if new_ext:
current_media_object.file_format = new_ext.lower()
# Also update EASY_DW's internal __file_format
self.__file_format = new_ext.lower()
conversion_to_another_format_occurred_and_cleared_state = True
except Exception as conv_error:
# Conversion to a different format failed.
# self.__song_path (the .ogg) is still the latest valid file and is registered.
# We want to keep it, so CURRENT_DOWNLOAD should remain set to this .ogg path.
logger.error(f"Audio conversion to {format_name} error: {str(conv_error)}")
# conversion_to_another_format_occurred_and_cleared_state remains False.
# If no conversion to another format was requested, or if it was requested but didn't effectively run
# (e.g. format_name was None), or if convert_audio failed to clear state (which would be its bug),
# then self.__song_path (the .ogg from Step 1) is the final successfully processed file for this method's scope.
# It is currently registered. Unregister it as its processing is complete.
if not conversion_to_another_format_occurred_and_cleared_state:
unregister_active_download(self.__song_path) # Clears CURRENT_DOWNLOAD if it was self.__song_path
except Exception as e:
# This outer try/except handles errors primarily from Step 1 (OGG copy)
# or issues during the setup for Step 2 before convert_audio is deeply involved.
# In case of failure, try to restore the original file from temp if Step 1 didn't complete.
if os.path.exists(temp_filename) and not os.path.exists(og_song_path_for_ogg_output):
os_replace(temp_filename, og_song_path_for_ogg_output)
# Clean up temp_filename. unregister_active_download is safe:
# it only clears CURRENT_DOWNLOAD if CURRENT_DOWNLOAD == temp_filename.
if os.path.exists(temp_filename):
unregister_active_download(temp_filename)
remove(temp_filename)
# Re-throw the exception. If a file (like og_song_path_for_ogg_output) was registered
# and an error occurred, it remains registered for atexit cleanup, which is intended.
raise e
def get_no_dw_track(self) -> Track:
return self.__c_track
def easy_dw(self) -> Track:
# Process image data using unified utility
self.__song_metadata_dict = enhance_metadata_with_image(self.__song_metadata_dict)
try:
# Initialize success to False, it will be set to True if download_try is successful
if hasattr(self, '_EASY_DW__c_track') and self.__c_track:
self.__c_track.success = False
elif hasattr(self, '_EASY_DW__c_episode') and self.__c_episode: # For episodes
self.__c_episode.success = False
self.download_try() # This should set self.__c_track.success = True if successful
except Exception as e:
song_title = self.__song_metadata.title
artist_name = getattr(self.__preferences, 'artist_separator', '; ').join([a.name for a in self.__song_metadata.artists])
error_message = f"Download failed for '{song_title}' by '{artist_name}' (URL: {self.__link}). Original error: {str(e)}"
logger.error(error_message)
traceback.print_exc()
# Store the error message on the track object if it exists
if hasattr(self, '_EASY_DW__c_track') and self.__c_track:
self.__c_track.success = False
self.__c_track.error_message = error_message # Store the more detailed error message
# Removed problematic elif for __c_episode here as easy_dw in spotloader is focused on tracks.
# Episode-specific error handling should be within download_eps or its callers.
raise TrackNotFound(message=error_message, url=self.__link) from e
# If the track was skipped (e.g. file already exists), return it immediately.
# download_try sets success=False and was_skipped=True in this case.
if hasattr(self, '_EASY_DW__c_track') and self.__c_track and getattr(self.__c_track, 'was_skipped', False):
return self.__c_track
# Final check for non-skipped tracks that might have failed after download_try returned.
# This handles cases where download_try didn't raise an exception but self.__c_track.success is still False.
if hasattr(self, '_EASY_DW__c_track') and self.__c_track and not self.__c_track.success:
song_title = self.__song_metadata.title
artist_name = getattr(self.__preferences, 'artist_separator', '; ').join([a.name for a in self.__song_metadata.artists])
original_error_msg = getattr(self.__c_track, 'error_message', "Download failed for an unspecified reason after attempt.")
error_msg_template = "Cannot download '{title}' by '{artist}'. Reason: {reason}"
final_error_msg = error_msg_template.format(title=song_title, artist=artist_name, reason=original_error_msg)
current_link = self.__c_track.link if hasattr(self.__c_track, 'link') and self.__c_track.link else self.__link
logger.error(f"{final_error_msg} (URL: {current_link})")
self.__c_track.error_message = final_error_msg # Ensure the most specific error is on the object
raise TrackNotFound(message=final_error_msg, url=current_link)
# If we reach here, the track should be successful and not skipped.
if hasattr(self, '_EASY_DW__c_track') and self.__c_track and self.__c_track.success:
# Apply tags using unified utility
process_and_tag_track(
track=self.__c_track,
metadata_dict=self.__song_metadata_dict,
source_type='spotify',
save_cover=getattr(self.__preferences, 'save_cover', False)
)
# Unregister the final successful file path after all operations are done.
# self.__c_track.song_path would have been updated by __convert_audio__ if conversion occurred.
unregister_active_download(self.__c_track.song_path)
return self.__c_track
def download_try(self) -> Track:
current_title = self.__song_metadata.title
current_album = self.__song_metadata.album.title if self.__song_metadata.album else ''
current_artist = getattr(self.__preferences, 'artist_separator', '; ').join([a.name for a in self.__song_metadata.artists])
# Call the new check_track_exists function from skip_detection.py
# It needs: original_song_path, title, album, convert_to, logger
# self.__song_path is the original_song_path before any conversion attempts by this specific download operation.
# self.__preferences.convert_to is the convert_to parameter.
# logger is available as a global import in this module.
exists, existing_file_path = check_track_exists(
original_song_path=self.__song_path,
title=current_title,
album=current_album,
convert_to=self.__preferences.convert_to,
logger=logger # Pass the logger instance
)
if exists and existing_file_path:
logger.info(f"Track '{current_title}' by '{current_artist}' already exists at '{existing_file_path}'. Skipping download and conversion.")
# Update the track object to point to the existing file
self.__c_track.song_path = existing_file_path
_, new_ext = os.path.splitext(existing_file_path)
self.__c_track.file_format = new_ext.lower() # Ensure it's just the extension like '.mp3'
# self.__c_track.song_quality might need re-evaluation if we could determine quality of existing file
# For now, assume if it exists in target format, its quality is acceptable.
self.__c_track.success = True # Mark as success because the desired file is available
self.__c_track.was_skipped = True
parent_info, total_tracks_val = self._get_parent_info()
# Build track object
track_obj = self.__song_metadata
# Build parent object
parent_obj = None
if self.__parent == "album":
parent_obj = self.__song_metadata.album
elif self.__parent == "playlist" and parent_info:
parent_obj = playlistTrackObject(
title=parent_info.get("name"),
owner=userObject(name=parent_info.get("owner"))
)
# Report track skipped status
report_track_skipped(
track_obj=track_obj,
reason=f"Track already exists at '{existing_file_path}'",
preferences=self.__preferences,
parent_obj=parent_obj,
total_tracks=total_tracks_val
)
return self.__c_track
# Report initializing status for the track download
parent_info, total_tracks_val = self._get_parent_info()
# Build track object
track_obj = self.__song_metadata
# Build parent object
parent_obj = None
if self.__parent == "album":
parent_obj = self.__song_metadata.album
elif self.__parent == "playlist" and parent_info:
parent_obj = playlistTrackObject(
title=parent_info.get("name"),
owner=userObject(name=parent_info.get("owner"))
)
# Report track initialization status
report_track_initializing(
track_obj=track_obj,
preferences=self.__preferences,
parent_obj=parent_obj,
total_tracks=total_tracks_val
)
# If track does not exist in the desired final format, proceed with download/conversion
retries = 0
# Use the customizable retry parameters
retry_delay = getattr(self.__preferences, 'initial_retry_delay', 30) # Default to 30 seconds
retry_delay_increase = getattr(self.__preferences, 'retry_delay_increase', 30) # Default to 30 seconds
max_retries = getattr(self.__preferences, 'max_retries', 5) # Default to 5 retries
while True:
try:
track_id_obj = TrackId.from_base62(self.__ids)
stream = Download_JOB.session.content_feeder().load_track(
track_id_obj,
VorbisOnlyAudioQuality(self.__dw_quality),
False,
None
)
c_stream = stream.input_stream.stream()
total_size = stream.input_stream.size
os.makedirs(dirname(self.__song_path), exist_ok=True)
# Register this file as being actively downloaded
register_active_download(self.__song_path)
try:
with open(self.__song_path, "wb") as f:
if self.__real_time_dl and self.__song_metadata_dict.get("duration") and self.__song_metadata_dict.get("duration") > 0:
# Real-time download path
duration = self.__song_metadata_dict["duration"]
if duration > 0:
# Base rate limit in bytes per second to match real-time
base_rate_limit = total_size / duration
# Multiplier handling (0 disables pacing, 1=real-time, up to 10x)
m = getattr(self.__preferences, 'real_time_multiplier', 1)
try:
m = int(m)
except Exception:
m = 1
if m < 0:
m = 0
if m > 10:
m = 10
pacing_enabled = m > 0
rate_limit = base_rate_limit * m if pacing_enabled else None
chunk_size = 4096
bytes_written = 0
start_time = time.time()
# Initialize tracking variable for percentage reporting
self._last_reported_percentage = -1
while True:
chunk = c_stream.read(chunk_size)
if not chunk:
break
f.write(chunk)
bytes_written += len(chunk)
# Calculate current percentage (as integer)
current_time = time.time()
current_percentage = int((bytes_written / total_size) * 100)
# Only report when percentage increases by at least 1 point
if current_percentage > self._last_reported_percentage:
self._last_reported_percentage = current_percentage
# Report real-time progress
report_track_realtime_progress(
track_obj=track_obj,
time_elapsed=int((current_time - start_time) * 1000),
progress=current_percentage,
preferences=self.__preferences,
parent_obj=parent_obj,
total_tracks=total_tracks_val
)
# Rate limiting (if pacing is enabled)
if pacing_enabled and rate_limit:
expected_time = bytes_written / rate_limit
elapsed = time.time() - start_time
if expected_time > elapsed:
time.sleep(expected_time - elapsed)
else:
# Non real-time download path
data = c_stream.read(total_size)
f.write(data)
# Close the stream after successful write
c_stream.close()
# After successful download, unregister the file
unregister_active_download(self.__song_path)
break
except Exception as e:
# Handle any exceptions that might occur during download
error_msg = f"Error during download process: {str(e)}"
logger.error(error_msg)
# Clean up resources
if 'c_stream' in locals():
try:
c_stream.close()
except Exception:
pass
# Remove partial download if it exists
if os.path.exists(self.__song_path):
try:
os.remove(self.__song_path)
except Exception:
pass
# Unregister the download
unregister_active_download(self.__song_path)
# After successful download, unregister the file (moved here from below)
unregister_active_download(self.__song_path)
break
except Exception as e:
# Handle retry logic
global GLOBAL_RETRY_COUNT
GLOBAL_RETRY_COUNT += 1
retries += 1
# Clean up any incomplete file
if os.path.exists(self.__song_path):
os.remove(self.__song_path)
unregister_active_download(self.__song_path)
# Report retry status
report_track_retrying(
track_obj=track_obj,
retry_count=retries,
seconds_left=retry_delay,
error=str(e),
preferences=self.__preferences,
parent_obj=parent_obj,
total_tracks=total_tracks_val
)
if retries >= max_retries or GLOBAL_RETRY_COUNT >= GLOBAL_MAX_RETRIES:
# Final cleanup before giving up
if os.path.exists(self.__song_path):
os.remove(self.__song_path)
# Add track info to exception
track_name = self.__song_metadata.title
artist_name = getattr(self.__preferences, 'artist_separator', '; ').join([a.name for a in self.__song_metadata.artists])
final_error_msg = f"Maximum retry limit reached for '{track_name}' by '{artist_name}' (local: {max_retries}, global: {GLOBAL_MAX_RETRIES}). Last error: {str(e)}"
# Store error on track object
if hasattr(self, '_EASY_DW__c_track') and self.__c_track:
self.__c_track.success = False
self.__c_track.error_message = final_error_msg
raise Exception(final_error_msg) from e
time.sleep(retry_delay)
retry_delay += retry_delay_increase # Use the custom retry delay increase
# Save cover image if requested, after successful download and before conversion
if self.__preferences.save_cover and hasattr(self, '_EASY_DW__song_path') and self.__song_path:
save_cover_image_for_track(self.__song_metadata_dict, self.__song_path, self.__preferences.save_cover)
try:
self.__convert_audio()
except Exception as e:
# Improve error message formatting
original_error_str = str(e)
if "codec" in original_error_str.lower():
error_msg = "Audio conversion error - Missing codec or unsupported format"
elif "ffmpeg" in original_error_str.lower():
error_msg = "FFmpeg error - Audio conversion failed"
else:
error_msg = f"Audio conversion failed: {original_error_str}"
# Report error status
report_track_error(
track_obj=track_obj,
error=error_msg,
preferences=self.__preferences,
parent_obj=parent_obj,
total_tracks=total_tracks_val
)
logger.error(f"Audio conversion error: {error_msg}")
# If conversion fails, clean up the .ogg file
if os.path.exists(self.__song_path):
os.remove(self.__song_path)
# Try one more time
time.sleep(retry_delay)
retry_delay += retry_delay_increase
try:
self.__convert_audio()
except Exception as conv_e:
# If conversion fails twice, create a final error report
error_msg_2 = f"Audio conversion failed after retry for '{self.__song_metadata.title}'. Original error: {str(conv_e)}"
# Report error status
report_track_error(
track_obj=track_obj,
error=error_msg_2,
preferences=self.__preferences,
parent_obj=parent_obj,
total_tracks=total_tracks_val
)
logger.error(error_msg)
if os.path.exists(self.__song_path):
os.remove(self.__song_path)
# Store error on track object
if hasattr(self, '_EASY_DW__c_track') and self.__c_track:
self.__c_track.success = False
self.__c_track.error_message = error_msg_2
raise TrackNotFound(message=error_msg, url=self.__link) from conv_e
if hasattr(self, '_EASY_DW__c_track') and self.__c_track:
self.__c_track.success = True
# Apply tags using unified utility
process_and_tag_track(
track=self.__c_track,
metadata_dict=self.__song_metadata_dict,
source_type='spotify'
)
# Create done status report
parent_info, total_tracks_val = self._get_parent_info()
current_track_val = getattr(self.__preferences, 'track_number', None)
summary_obj = None
if self.__parent is None:
# Create a summary object for single-track downloads
successful_track_list = [track_obj] if self.__c_track.success and not getattr(self.__c_track, 'was_skipped', False) else []
skipped_track_list = [track_obj] if getattr(self.__c_track, 'was_skipped', False) else []
summary_obj = summaryObject(
successful_tracks=successful_track_list,
skipped_tracks=skipped_track_list,
failed_tracks=[],
total_successful=len(successful_track_list),
total_skipped=len(skipped_track_list),
total_failed=0
)
# Enrich summary with final path and quality
try:
final_path_val = getattr(self.__c_track, 'song_path', None)
except Exception:
final_path_val = None
quality_key_single = self.__quality_download
sp_quality_map_single = {
'NORMAL': 'OGG_96',
'HIGH': 'OGG_160',
'VERY_HIGH': 'OGG_320'
}
summary_obj.final_path = final_path_val
summary_obj.download_quality = sp_quality_map_single.get(quality_key_single, 'OGG')
# Report track done status
# Compute final path and quality label
final_path_val = getattr(self.__c_track, 'song_path', None)
# Map Spotify quality to OGG bitrate label
quality_key = self.__quality_download if hasattr(self, '_EASY_DW__quality_download') else getattr(self, '_EASY_DW__quality_download', None)
quality_key = self.__quality_download if quality_key is None else quality_key
sp_quality_map = {
'NORMAL': 'OGG_96',
'HIGH': 'OGG_160',
'VERY_HIGH': 'OGG_320'
}
download_quality_val = sp_quality_map.get(quality_key, 'OGG')
report_track_done(
track_obj=track_obj,
preferences=self.__preferences,
summary=summary_obj,
parent_obj=parent_obj,
current_track=current_track_val,
total_tracks=total_tracks_val,
final_path=final_path_val,
download_quality=download_quality_val
)
if hasattr(self, '_EASY_DW__c_track') and self.__c_track and self.__c_track.success:
# Unregister the final successful file path after all operations are done.
unregister_active_download(self.__c_track.song_path)
return self.__c_track
def download_eps(self) -> Episode:
# Use the customizable retry parameters
retry_delay = getattr(self.__preferences, 'initial_retry_delay', 30) # Default to 30 seconds
retry_delay_increase = getattr(self.__preferences, 'retry_delay_increase', 30) # Default to 30 seconds
max_retries = getattr(self.__preferences, 'max_retries', 5) # Default to 5 retries
retries = 0
# Initialize success to False for the episode, to be set True on completion
if hasattr(self, '_EASY_DW__c_episode') and self.__c_episode:
self.__c_episode.success = False
if isfile(self.__song_path) and check_track(self.__c_episode):
ans = input(
f"Episode \"{self.__song_path}\" already exists, do you want to redownload it?(y or n):"
)
if not ans in answers:
# If user chooses not to redownload, and file exists, consider it 'successful' for cleanup purposes if needed.
# However, the main .success might be for actual download processing.
# For now, just return. The file isn't in ACTIVE_DOWNLOADS from *this* run.
return self.__c_episode
episode_id = EpisodeId.from_base62(self.__ids)
while True:
try:
stream = Download_JOB.session.content_feeder().load_episode(
episode_id,
AudioQuality(self.__dw_quality),
False,
None
)
# If load_episode is successful, break from retry loop
break
except Exception as e:
global GLOBAL_RETRY_COUNT
GLOBAL_RETRY_COUNT += 1
track_obj = self.__song_metadata
status_obj = retryingObject(
ids=track_obj.ids,
retry_count=retries,
seconds_left=retry_delay,
error=str(e),
convert_to=self.__convert_to,
bitrate=self.__bitrate
)
callback_obj = trackCallbackObject(track=track_obj, status_info=status_obj)
# Log retry attempt with structured data
report_progress(
reporter=Download_JOB.progress_reporter,
callback_obj=callback_obj
)
if retries >= max_retries or GLOBAL_RETRY_COUNT >= GLOBAL_MAX_RETRIES:
if os.path.exists(self.__song_path):
os.remove(self.__song_path) # Clean up partial file
track_name = self.__song_metadata.title
artist_name = getattr(self.__preferences, 'artist_separator', '; ').join([a.name for a in self.__song_metadata.artists])
final_error_msg = f"Maximum retry limit reached for '{track_name}' by '{artist_name}' (local: {max_retries}, global: {GLOBAL_MAX_RETRIES}). Last error: {str(e)}"
if hasattr(self, '_EASY_DW__c_episode') and self.__c_episode:
self.__c_episode.success = False
self.__c_episode.error_message = final_error_msg
raise Exception(final_error_msg) from e
time.sleep(retry_delay)
retry_delay += retry_delay_increase
total_size = stream.input_stream.size
os.makedirs(dirname(self.__song_path), exist_ok=True)
register_active_download(self.__song_path) # Register before writing
try:
with open(self.__song_path, "wb") as f:
c_stream = stream.input_stream.stream()
if self.__real_time_dl and self.__song_metadata_dict.get("duration") and self.__song_metadata_dict["duration"] > 0:
# Restored Real-time download logic for episodes
duration = self.__song_metadata_dict["duration"]
# Base rate to match real-time
base_rate_limit = total_size / duration if duration > 0 else None
# Multiplier handling
m = getattr(self.__preferences, 'real_time_multiplier', 1)
try:
m = int(m)
except Exception:
m = 1
if m < 0:
m = 0
if m > 10:
m = 10
pacing_enabled = (base_rate_limit is not None) and m > 0
rate_limit = (base_rate_limit * m) if pacing_enabled else None
chunk_size = 4096
bytes_written = 0
start_time = time.time()
try:
while True:
chunk = c_stream.read(chunk_size)
if not chunk:
break
f.write(chunk)
bytes_written += len(chunk)
# Optional: Real-time progress reporting for episodes (can be added here if desired)
# Matching the style of download_try, no specific progress report inside this loop for episodes by default.
if pacing_enabled and rate_limit:
expected_time = bytes_written / rate_limit
elapsed_time = time.time() - start_time
if expected_time > elapsed_time:
time.sleep(expected_time - elapsed_time)
except Exception as e_realtime:
# If any error occurs during real-time download, clean up
if not c_stream.closed:
try:
c_stream.close()
except:
pass
# f.close() is handled by with statement, but an explicit one might be here if not using with.
if os.path.exists(self.__song_path):
try:
os.remove(self.__song_path)
except:
pass
unregister_active_download(self.__song_path)
episode_title = self.__song_metadata.title
artist_name = getattr(self.__preferences, 'artist_separator', '; ').join([a.name for a in self.__song_metadata.artists])
final_error_msg = f"Error during real-time download for episode '{episode_title}' by '{artist_name}' (URL: {self.__link}). Error: {str(e_realtime)}"
logger.error(final_error_msg)
if hasattr(self, '_EASY_DW__c_episode') and self.__c_episode:
self.__c_episode.success = False
self.__c_episode.error_message = final_error_msg
raise TrackNotFound(message=final_error_msg, url=self.__link) from e_realtime
else:
# Restored Non real-time download logic for episodes
try:
data = c_stream.read(total_size)
f.write(data)
except Exception as e_standard:
# If any error occurs during standard download, clean up
if not c_stream.closed:
try:
c_stream.close()
except:
pass
if os.path.exists(self.__song_path):
try:
os.remove(self.__song_path)
except:
pass
unregister_active_download(self.__song_path)
episode_title = self.__song_metadata.title
artist_name = getattr(self.__preferences, 'artist_separator', '; ').join([a.name for a in self.__song_metadata.artists])
final_error_msg = f"Error during standard download for episode '{episode_title}' by '{artist_name}' (URL: {self.__link}). Error: {str(e_standard)}"
logger.error(final_error_msg)
if hasattr(self, '_EASY_DW__c_episode') and self.__c_episode:
self.__c_episode.success = False
self.__c_episode.error_message = final_error_msg
raise TrackNotFound(message=final_error_msg, url=self.__link) from e_standard
# If all went well with writing to file and reading stream:
if not c_stream.closed: c_stream.close()
# If with open completes without internal exceptions leading to TrackNotFound:
unregister_active_download(self.__song_path) # Unregister after successful write of original file
except TrackNotFound: # Re-raise if it was an internally handled download error
raise
except Exception as e_outer: # Catch other potential errors around file handling or unexpected issues
# Cleanup for download part if an unexpected error occurs outside the inner try-excepts
if 'c_stream' in locals() and hasattr(c_stream, 'closed') and not c_stream.closed:
try: c_stream.close()
except: pass
if os.path.exists(self.__song_path):
try: os.remove(self.__song_path)
except: pass
unregister_active_download(self.__song_path)
episode_title = self.__song_metadata.title
error_message = f"Failed to download episode '{episode_title}' (URL: {self.__link}) during file operations. Error: {str(e_outer)}"
logger.error(error_message)
if hasattr(self, '_EASY_DW__c_episode') and self.__c_episode:
self.__c_episode.success = False
self.__c_episode.error_message = error_message
raise TrackNotFound(message=error_message, url=self.__link) from e_outer
# If download was successful, proceed to conversion and tagging
try:
self.__convert_audio() # This will update self.__c_episode.file_format and path if conversion occurs
# It also handles registration/unregistration of intermediate/final files during conversion.
except Exception as conv_e:
# Conversion failed. __convert_audio or underlying convert_audio should have cleaned up its own temps.
# The original downloaded file (if __convert_audio started from it) might still exist or be the self.__song_path.
# Or self.__song_path might be a partially converted file if convert_audio failed mid-way and didn't cleanup perfectly.
episode_title = self.__song_metadata.title
error_message = f"Audio conversion for episode '{episode_title}' failed. Original error: {str(conv_e)}"
track_obj = self.__song_metadata
status_obj = errorObject(
ids=track_obj.ids,
error=error_message,
convert_to=self.__convert_to,
bitrate=self.__bitrate
)
callback_obj = trackCallbackObject(track=track_obj, status_info=status_obj)
report_progress(
reporter=Download_JOB.progress_reporter,
callback_obj=callback_obj
)
# Attempt to remove self.__song_path, which is the latest known path for this episode
if os.path.exists(self.__song_path):
os.remove(self.__song_path)
unregister_active_download(self.__song_path) # Unregister it as it failed/was removed
logger.error(error_message)
if hasattr(self, '_EASY_DW__c_episode') and self.__c_episode:
self.__c_episode.success = False
self.__c_episode.error_message = error_message
raise TrackNotFound(message=error_message, url=self.__link) from conv_e
# If we reach here, download and any conversion were successful.
if hasattr(self, '_EASY_DW__c_episode') and self.__c_episode:
self.__c_episode.success = True
# Apply tags using unified utility
process_and_tag_episode(
episode=self.__c_episode,
metadata_dict=self.__song_metadata_dict,
source_type='spotify'
)
# Unregister the final successful file path for episodes, as it's now complete.
# self.__c_episode.episode_path would have been updated by __convert_audio__ if conversion occurred.
unregister_active_download(self.__c_episode.episode_path)
return self.__c_episode
def download_cli(preferences: Preferences) -> None:
__link = preferences.link
__output_dir = preferences.output_dir
__not_interface = preferences.not_interface
__quality_download = preferences.quality_download
__recursive_download = preferences.recursive_download
cmd = f"deez-dw.py -so spo -l \"{__link}\" "
if __output_dir:
cmd += f"-o {__output_dir} "
if __not_interface:
cmd += f"-g "
if __quality_download:
cmd += f"-q {__quality_download} "
if __recursive_download:
cmd += f"-rd "
system(cmd)
class DW_TRACK:
def __init__(
self,
preferences: Preferences
) -> None:
self.__preferences = preferences
def dw(self) -> Track:
track = EASY_DW(self.__preferences).easy_dw()
# No error handling needed here - if track.success is False but was_skipped is True,
# it's an intentional skip, not an error
return track
class DW_ALBUM:
def __init__(
self,
preferences: Preferences
) -> None:
self.__preferences = preferences
self.__ids = self.__preferences.ids
self.__make_zip = self.__preferences.make_zip
self.__output_dir = self.__preferences.output_dir
self.__album_metadata = self.__preferences.song_metadata
self.__not_interface = self.__preferences.not_interface
def dw(self) -> Album:
# Report album initializing status
album_obj = self.__album_metadata
report_album_initializing(album_obj)
pic_url = max(album_obj.images, key=lambda i: i.get('height', 0) * i.get('width', 0)).get('url') if album_obj.images else None
image_bytes = request(pic_url).content if pic_url else None
album = Album(self.__ids)
album.image = image_bytes
album.nb_tracks = album_obj.total_tracks
album.album_name = album_obj.title
album.upc = album_obj.ids.upc
tracks = album.tracks
album.md5_image = self.__ids
album.tags = album_object_to_dict(self.__album_metadata, source_type='spotify', artist_separator=getattr(self.__preferences, 'artist_separator', '; ')) # For top-level album tags if needed
album.tags['artist_separator'] = getattr(self.__preferences, 'artist_separator', '; ')
album_base_directory = get_album_directory(
album.tags,
self.__output_dir,
custom_dir_format=self.__preferences.custom_dir_format,
pad_tracks=self.__preferences.pad_tracks
)
# Calculate total number of discs for proper metadata tagging
total_discs = max((track.disc_number for track in album_obj.tracks), default=1)
for a, track_in_album in enumerate(album_obj.tracks):
c_preferences = deepcopy(self.__preferences)
try:
# Fetch full track object as album endpoint only provides simplified track objects
full_track_obj = tracking(
track_in_album.ids.spotify,
album_data_for_track=self.__preferences.json_data, # pass raw album json
market=self.__preferences.market
)
if not full_track_obj:
raise TrackNotFound(f"Could not fetch metadata for track ID {track_in_album.ids.spotify}")
c_preferences.song_metadata = full_track_obj
c_preferences.ids = full_track_obj.ids.spotify
c_preferences.link = f"https://open.spotify.com/track/{c_preferences.ids}"
# Set album position for progress reporting (not for metadata - that comes from API)
c_preferences.track_number = a + 1
track = EASY_DW(c_preferences, parent='album').easy_dw()
except (TrackNotFound, Exception) as e:
# Create a failed track object for the summary
song_tags = _track_object_to_dict(track_in_album) if isinstance(track_in_album, trackObject) else {'music': 'Unknown Track'}
track = Track(tags=song_tags, song_path=None, file_format=None, quality=None, link=None, ids=track_in_album.ids)
track.success = False
track.error_message = str(e)
logger.warning(f"Track '{song_tags.get('music')}' from album '{album.album_name}' failed to download. Reason: {track.error_message}")
tracks.append(track)
# Save album cover image
if self.__preferences.save_cover and album.image and album_base_directory:
save_cover_image(album.image, album_base_directory, "cover.jpg")
if self.__make_zip:
song_quality = tracks[0].quality if tracks and tracks[0].quality else 'HIGH' # Fallback quality
zip_name = create_zip(
tracks,
output_dir=self.__output_dir,
song_metadata=album.tags,
song_quality=song_quality,
custom_dir_format=self.__preferences.custom_dir_format
)
album.zip_path = zip_name
successful_tracks = []
failed_tracks = []
skipped_tracks = []
for track in tracks:
# track.tags is a dict.
track_obj_for_cb = trackObject(
title=track.tags.get('music', 'Unknown Track'),
artists=[artistTrackObject(name=artist.strip()) for artist in track.tags.get('artist', '').split(';')]
)
if getattr(track, 'was_skipped', False):
skipped_tracks.append(track_obj_for_cb)
elif track.success:
successful_tracks.append(track_obj_for_cb)
else:
failed_tracks.append(failedTrackObject(
track=track_obj_for_cb,
reason=getattr(track, 'error_message', 'Unknown reason')
))
summary_obj = summaryObject(
successful_tracks=successful_tracks,
skipped_tracks=skipped_tracks,
failed_tracks=failed_tracks,
total_successful=len(successful_tracks),
total_skipped=len(skipped_tracks),
total_failed=len(failed_tracks)
)
report_album_done(album_obj, summary_obj)
return album
class DW_PLAYLIST:
def __init__(
self,
preferences: Preferences
) -> None:
self.__preferences = preferences
self.__ids = self.__preferences.ids
self.__json_data = preferences.json_data
self.__make_zip = self.__preferences.make_zip
self.__output_dir = self.__preferences.output_dir
self.__song_metadata_list = self.__preferences.song_metadata
self.__playlist_tracks_json = getattr(self.__preferences, 'playlist_tracks_json', None)
def dw(self) -> Playlist:
playlist_name = self.__json_data.get('name', 'unknown')
playlist_owner_name = self.__json_data.get('owner', {}).get('display_name', 'Unknown Owner')
playlist_id = self.__ids
# --- Build playlistObject for callbacks ---
playlist_tracks_for_cb = []
if self.__playlist_tracks_json:
for item in self.__playlist_tracks_json:
track_data = item.get('track')
if track_data:
track_playlist_obj = json_to_track_playlist_object(track_data)
if track_playlist_obj:
playlist_tracks_for_cb.append(track_playlist_obj)
playlist_obj_for_cb = playlistObject(
title=playlist_name,
owner=userObject(name=playlist_owner_name, ids=IDs(spotify=self.__json_data.get('owner', {}).get('id'))),
ids=IDs(spotify=playlist_id),
images=self.__json_data.get('images', []),
tracks=playlist_tracks_for_cb,
description=self.__json_data.get('description')
)
# --- End build playlistObject ---
# Report playlist initializing status
report_playlist_initializing(playlist_obj_for_cb)
# --- Prepare the m3u playlist file ---
m3u_path = create_m3u_file(self.__output_dir, playlist_name)
# -------------------------------------
playlist = Playlist()
tracks = playlist.tracks
for idx, c_song_metadata in enumerate(self.__song_metadata_list):
track = None
if isinstance(c_song_metadata, dict) and 'error_type' in c_song_metadata:
track_title = c_song_metadata.get('name', 'Unknown Track')
track_ids = c_song_metadata.get('ids')
error_message = c_song_metadata.get('error_message', 'Unknown error during metadata retrieval.')
logger.warning(f"Skipping download for track '{track_title}' (ID: {track_ids}) from playlist '{playlist_name}' due to error: {error_message}")
track_tags = {'music': track_title, 'ids': track_ids}
track = Track(tags=track_tags, song_path=None, file_format=None, quality=None, link=None, ids=track_ids)
track.success = False
track.error_message = error_message
tracks.append(track)
continue
# c_song_metadata is a trackObject
c_preferences = deepcopy(self.__preferences)
c_preferences.ids = c_song_metadata.ids.spotify
c_preferences.song_metadata = c_song_metadata
c_preferences.json_data = self.__json_data
c_preferences.track_number = idx + 1
c_preferences.link = f"https://open.spotify.com/track/{c_preferences.ids}" if c_preferences.ids else None
easy_dw_instance = EASY_DW(c_preferences, parent='playlist')
try:
track = easy_dw_instance.easy_dw()
except (TrackNotFound, Exception) as e:
track = easy_dw_instance.get_no_dw_track()
if not isinstance(track, Track):
track = Track(_track_object_to_dict(c_song_metadata), None, None, None, c_preferences.link, c_preferences.ids)
track.success = False
track.error_message = str(e)
logger.warning(f"Failed to download track '{c_song_metadata.title}' from playlist '{playlist_name}'. Reason: {track.error_message}")
if track:
tracks.append(track)
# --- Append the final track to the m3u file with extended format ---
if track and track.success and hasattr(track, 'song_path') and track.song_path:
append_track_to_m3u(m3u_path, track)
# ---------------------------------------------------------------------
if self.__make_zip:
playlist_title = self.__json_data['name']
zip_name = f"{self.__output_dir}/{playlist_title} [playlist {self.__ids}]"
create_zip(tracks, zip_name=zip_name)
playlist.zip_path = zip_name
# Report playlist done status
successful_tracks_cb = []
failed_tracks_cb = []
skipped_tracks_cb = []
for track in tracks:
# Create a trackObject for the callback from the internal Track object's tags
track_tags = track.tags
track_obj_for_cb = trackObject(
title=track_tags.get('music', 'Unknown Track'),
artists=[artistTrackObject(name=artist.strip()) for artist in track_tags.get('artist', '').split(';')]
)
if getattr(track, 'was_skipped', False):
skipped_tracks_cb.append(track_obj_for_cb)
elif track.success:
successful_tracks_cb.append(track_obj_for_cb)
else:
failed_tracks_cb.append(failedTrackObject(
track=track_obj_for_cb,
reason=getattr(track, 'error_message', 'Unknown reason')
))
summary_obj = summaryObject(
successful_tracks=successful_tracks_cb,
skipped_tracks=skipped_tracks_cb,
failed_tracks=failed_tracks_cb,
total_successful=len(successful_tracks_cb),
total_skipped=len(skipped_tracks_cb),
total_failed=len(failed_tracks_cb)
)
# Include m3u path in summary and callback
report_playlist_done(playlist_obj_for_cb, summary_obj, m3u_path=m3u_path)
return playlist
class DW_EPISODE:
def __init__(
self,
preferences: Preferences
) -> None:
self.__preferences = preferences
def dw(self) -> Episode:
episode_obj = self.__preferences.song_metadata # This is a trackObject
# Build status object
status_obj_init = initializingObject(ids=episode_obj.ids)
callback_obj_init = trackCallbackObject(track=episode_obj, status_info=status_obj_init)
report_progress(
reporter=Download_JOB.progress_reporter,
callback_obj=callback_obj_init
)
episode = EASY_DW(self.__preferences).download_eps()
# Build status object
status_obj_done = doneObject(ids=episode_obj.ids)
callback_obj_done = trackCallbackObject(track=episode_obj, status_info=status_obj_done)
report_progress(
reporter=Download_JOB.progress_reporter,
callback_obj=callback_obj_done
)
return episode