Implemented callback models

This commit is contained in:
Xoconoch
2025-06-10 18:46:53 -06:00
parent c1438ecb47
commit cc947fe374
24 changed files with 633 additions and 434 deletions

View File

@@ -9,7 +9,7 @@ from deezspot.libutils.logging_utils import configure_logger, logger
# Export key functionality
from deezspot.deezloader import DeeLogin
from deezspot.models import Track, Album, Playlist, Smart, Episode
from deezspot.models.download import Track, Album, Playlist, Smart, Episode
__version__ = "1.2.0"

View File

@@ -11,7 +11,7 @@ from mutagen.id3 import (
APIC, COMM, SYLT, TALB, TCOM, TCON, TCOP, TDRC, TEXT, TIT2, TLEN,
TPE1, TPE2, TPOS, TPUB, TRCK, TSRC, TXXX, USLT, TYER
)
from deezspot.models import Track, Episode
from deezspot.models.download import Track, Episode
import requests
import logging
import os

View File

@@ -17,7 +17,7 @@ from deezspot.exceptions import (
NoRightOnMedia,
QualityNotFound,
)
from deezspot.models import (
from deezspot.models.download import (
Track,
Album,
Playlist,

View File

@@ -6,7 +6,7 @@ from deezspot.deezloader.dee_api import API
from deezspot.easy_spoty import Spo
from deezspot.deezloader.deegw_api import API_GW
from deezspot.deezloader.deezer_settings import stock_quality
from deezspot.models import (
from deezspot.models.download import (
Track,
Album,
Playlist,

View File

@@ -5,6 +5,25 @@ import sys
from typing import Optional, Callable, Dict, Any, Union
import json
from deezspot.models.callback.callbacks import (
BaseStatusObject,
initializingObject,
skippedObject,
retryingObject,
realTimeObject,
errorObject,
doneObject,
summaryObject,
failedTrackObject,
trackCallbackObject,
albumCallbackObject,
playlistCallbackObject
)
from deezspot.models.callback.track import trackObject, albumTrackObject, playlistTrackObject, artistTrackObject
from deezspot.models.callback.album import albumObject
from deezspot.models.callback.playlist import playlistObject
from deezspot.models.callback.user import userObject
# Create the main library logger
logger = logging.getLogger('deezspot')
@@ -88,7 +107,7 @@ class ProgressReporter:
#
# [ Type: "track" ]
# - song: (str) The name of the track.
# - artist: (str) The artist of the track.
# - artists: (str) The artist of the track.
# - album: (str, optional) The album of the track.
# - parent: (dict, optional) Information about the container (album/playlist).
# { "type": "album"|"playlist", "name": str, "owner": str, "artist": str, ... }
@@ -132,82 +151,21 @@ class ProgressReporter:
def report_progress(
reporter: Optional["ProgressReporter"],
report_type: str,
status: str,
song: Optional[str] = None,
artist: Optional[str] = None,
album: Optional[str] = None,
url: Optional[str] = None,
convert_to: Optional[str] = None,
bitrate: Optional[str] = None,
parent: Optional[Dict[str, Any]] = None,
current_track: Optional[int] = None,
total_tracks: Optional[Union[int, str]] = None,
reason: Optional[str] = None,
summary: Optional[Dict[str, Any]] = None,
error: Optional[str] = None,
retry_count: Optional[int] = None,
seconds_left: Optional[int] = None,
time_elapsed: Optional[int] = None,
progress: Optional[int] = None,
owner: Optional[str] = None,
name: Optional[str] = None,
title: Optional[str] = None,
callback_obj: Union[trackCallbackObject, albumCallbackObject, playlistCallbackObject]
):
"""Builds and reports a standardized progress dictionary after validating the input."""
"""
Reports progress using a standardized callback object.
# --- Input Validation ---
# Enforce the standardized format to ensure consistent reporting.
if report_type == "track":
if not all([song, artist]):
raise ValueError("For report_type 'track', 'song' and 'artist' parameters are required.")
if status == "skipped" and reason is None:
raise ValueError("For a 'skipped' track, a 'reason' is required.")
if status == "retrying" and not all(p is not None for p in [retry_count, seconds_left, error]):
raise ValueError("For a 'retrying' track, 'retry_count', 'seconds_left', and 'error' are required.")
if status == "real-time" and not all(p is not None for p in [time_elapsed, progress]):
raise ValueError("For a 'real-time' track, 'time_elapsed' and 'progress' are required.")
if status == "error" and error is None:
raise ValueError("For an 'error' track, an 'error' message is required.")
elif report_type == "album":
if not all(p is not None for p in [title, artist, total_tracks]):
raise ValueError("For report_type 'album', 'title', 'artist', and 'total_tracks' are required.")
if status == "done" and summary is None:
raise ValueError("For an 'album' with status 'done', a 'summary' is required.")
elif report_type == "playlist":
if not all(p is not None for p in [name, owner, total_tracks]):
raise ValueError("For report_type 'playlist', 'name', 'owner', and 'total_tracks' are required.")
if status == "done" and summary is None:
raise ValueError("For a 'playlist' with status 'done', a 'summary' is required.")
elif report_type == "episode":
if not all([song, artist]): # song=episode_title, artist=show_name
raise ValueError("For report_type 'episode', 'song' and 'artist' parameters are required.")
if status == "retrying" and not all(p is not None for p in [retry_count, seconds_left, error]):
raise ValueError("For a 'retrying' episode, 'retry_count', 'seconds_left', and 'error' are required.")
if status == "error" and error is None:
raise ValueError("For an 'error' episode, an 'error' message is required.")
# --- Report Building ---
report = {"type": report_type, "status": status}
Args:
reporter: The ProgressReporter to use for reporting
callback_obj: A callback object of type trackCallbackObject, albumCallbackObject, or playlistCallbackObject
"""
# Validate the callback object type
if not isinstance(callback_obj, (trackCallbackObject, albumCallbackObject, playlistCallbackObject)):
raise TypeError(f"callback_obj must be of type trackCallbackObject, albumCallbackObject, or playlistCallbackObject, got {type(callback_obj)}")
data_fields = {
"song": song, "artist": artist, "album": album, "url": url,
"convert_to": convert_to, "bitrate": bitrate, "parent": parent,
"current_track": current_track, "total_tracks": total_tracks,
"reason": reason, "summary": summary, "error": error,
"retry_count": retry_count, "seconds_left": seconds_left,
"time_elapsed": time_elapsed, "progress": progress,
"owner": owner, "name": name, "title": title
}
for key, value in data_fields.items():
if value is not None:
report[key] = value
# Convert the callback object to a dictionary and report it
if reporter:
reporter.report(report)
reporter.report(callback_obj.__dict__)
else:
logger.info(json.dumps(report))
logger.info(json.dumps(callback_obj.__dict__))

View File

@@ -6,7 +6,7 @@ from datetime import datetime
from urllib.parse import urlparse
from requests import get as req_get
from zipfile import ZipFile, ZIP_DEFLATED
from deezspot.models.track import Track
from deezspot.models.download.track import Track
from deezspot.exceptions import InvalidLink
from deezspot.libutils.others_settings import supported_link, header
from deezspot.libutils.logging_utils import ProgressReporter, logger

View File

@@ -1,8 +0,0 @@
#!/usr/bin/python3
from deezspot.models.smart import Smart
from deezspot.models.track import Track
from deezspot.models.album import Album
from deezspot.models.playlist import Playlist
from deezspot.models.preferences import Preferences
from deezspot.models.episode import Episode

View File

@@ -5,7 +5,21 @@ Callback data models for the music metadata schema.
"""
from .common import IDs, ReleaseDate
from .artist import artistObject, artistTrackObject
from .album import albumObject, albumTrackObject, albumArtistObject, trackAlbumObject
from .track import trackObject
from .playlist import playlistObject, trackPlaylistObject, albumTrackPlaylistObject, artistTrackPlaylistObject
from .artist import artistObject, albumArtistObject
from .album import albumObject, trackAlbumObject
from .track import trackObject, artistTrackObject, albumTrackObject, playlistTrackObject
from .playlist import playlistObject, trackPlaylistObject, albumTrackPlaylistObject, artistTrackPlaylistObject
from .callbacks import (
BaseStatusObject,
initializingObject,
skippedObject,
retryingObject,
realTimeObject,
errorObject,
doneObject,
summaryObject,
failedTrackObject,
trackCallbackObject,
albumCallbackObject,
playlistCallbackObject
)

View File

@@ -5,7 +5,22 @@ from typing import List, Optional, Dict, Any
from .common import IDs, ReleaseDate
@dataclass
class artistTrackAlbumObject:
"""Artist representation for a track in an album context."""
type: str = "artistTrackAlbum"
name: str = ""
ids: IDs = field(default_factory=IDs)
@dataclass
class artistAlbumObject:
"""Artist representation for an album."""
type: str = "artistAlbum"
name: str = ""
genres: List[str] = field(default_factory=list)
ids: IDs = field(default_factory=IDs)
@dataclass
class trackAlbumObject:
"""Track when nested inside an album context."""
@@ -16,18 +31,7 @@ class trackAlbumObject:
duration_ms: int = 0
genres: List[str] = field(default_factory=list)
ids: IDs = field(default_factory=IDs)
@dataclass
class albumTrackObject:
"""Album when nested inside a track context."""
type: str = "albumTrack"
album_type: str = "" # "album" | "single" | "compilation"
title: str = ""
release_date: Dict[str, Any] = field(default_factory=dict) # ReleaseDate as dict
total_tracks: int = 0
genres: List[str] = field(default_factory=list)
ids: IDs = field(default_factory=IDs)
artists: List[artistTrackAlbumObject] = field(default_factory=list)
@dataclass
@@ -36,9 +40,9 @@ class albumObject:
type: str = "album"
album_type: str = "" # "album" | "single" | "compilation"
title: str = ""
release_date: Dict[str, Any] = field(default_factory=dict) # ReleaseDate as dict
release_date: Dict[str, Any] = field(default_factory=dict)
total_tracks: int = 0
genres: List[str] = field(default_factory=list)
ids: IDs = field(default_factory=IDs)
# Nested: album's tracks without redundant album info
tracks: List[trackAlbumObject] = field(default_factory=list)
tracks: List[trackAlbumObject] = field(default_factory=list)
artists: List[artistAlbumObject] = field(default_factory=list)

View File

@@ -6,17 +6,6 @@ from typing import List, Optional
from .common import IDs
@dataclass
class artistTrackObject:
"""
An artist when nested inside a track context.
No genres, no albums—just identifying info.
"""
type: str = "artistTrack"
name: str = ""
ids: IDs = field(default_factory=IDs)
@dataclass
class albumArtistObject:
"""Album when nested inside an artist context."""
@@ -35,5 +24,4 @@ class artistObject:
name: str = ""
genres: List[str] = field(default_factory=list)
ids: IDs = field(default_factory=IDs)
# Nested: artist's albums without redundant artist info
albums: List[albumArtistObject] = field(default_factory=list)

View File

@@ -0,0 +1,133 @@
#!/usr/bin/python3
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any, Union
from .common import IDs
from .track import trackObject, albumTrackObject, playlistTrackObject
from .album import albumObject
from .playlist import playlistObject
@dataclass
class BaseStatusObject:
"""Base class for all status objects with common fields."""
ids: Optional[IDs] = None
convert_to: Optional[str] = None
bitrate: Optional[str] = None
@dataclass
class initializingObject(BaseStatusObject):
"""Status object for 'initializing' state."""
status: str = "initializing"
@dataclass
class skippedObject(BaseStatusObject):
"""Status object for 'skipped' state."""
status: str = "skipped"
reason: str = ""
@dataclass
class retryingObject(BaseStatusObject):
"""Status object for 'retrying' state."""
status: str = "retrying"
retry_count: int = 0
seconds_left: int = 0
error: str = ""
@dataclass
class realTimeObject(BaseStatusObject):
"""Status object for 'real-time' state."""
status: str = "real-time"
time_elapsed: int = 0
progress: int = 0
@dataclass
class errorObject(BaseStatusObject):
"""Status object for 'error' state."""
status: str = "error"
error: str = ""
@dataclass
class failedTrackObject:
"""Represents a failed track with a reason."""
track: trackObject = field(default_factory=trackObject)
reason: str = ""
@dataclass
class summaryObject:
"""Summary of a download operation for an album or playlist."""
successful_tracks: List[trackObject] = field(default_factory=list)
skipped_tracks: List[trackObject] = field(default_factory=list)
failed_tracks: List[failedTrackObject] = field(default_factory=list)
total_successful: int = 0
total_skipped: int = 0
total_failed: int = 0
@dataclass
class doneObject(BaseStatusObject):
"""Status object for 'done' state."""
status: str = "done"
summary: Optional[summaryObject] = None
@dataclass
class trackCallbackObject:
"""
Track callback object that combines trackObject with status-specific fields.
Used for progress reporting during track processing.
"""
track: trackObject = field(default_factory=trackObject)
status_info: Union[
initializingObject,
skippedObject,
retryingObject,
realTimeObject,
errorObject,
doneObject
] = field(default_factory=initializingObject)
current_track: Optional[int] = None
total_tracks: Optional[int] = None
parent: Optional[Union[albumTrackObject, playlistTrackObject]] = None
@dataclass
class albumCallbackObject:
"""
Album callback object that combines albumObject with status-specific fields.
Used for progress reporting during album processing.
"""
album: albumObject = field(default_factory=albumObject)
status_info: Union[
initializingObject,
skippedObject,
retryingObject,
realTimeObject,
errorObject,
doneObject
] = field(default_factory=initializingObject)
@dataclass
class playlistCallbackObject:
"""
Playlist callback object that combines playlistObject with status-specific fields.
Used for progress reporting during playlist processing.
"""
playlist: playlistObject = field(default_factory=playlistObject)
status_info: Union[
initializingObject,
skippedObject,
retryingObject,
realTimeObject,
errorObject,
doneObject
] = field(default_factory=initializingObject)

View File

@@ -4,8 +4,14 @@ from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from .common import IDs
from .artist import artistObject
from .user import userObject
@dataclass
class artistAlbumTrackPlaylistObject:
"""Artist when nested inside a track in a playlist context."""
type: str = "artistAlbumTrackPlaylist"
name: str = ""
ids: IDs = field(default_factory=IDs)
@dataclass
class albumTrackPlaylistObject:
@@ -15,6 +21,7 @@ class albumTrackPlaylistObject:
title: str = ""
release_date: Dict[str, Any] = field(default_factory=dict) # ReleaseDate as dict
ids: IDs = field(default_factory=IDs)
artists: List[artistAlbumTrackPlaylistObject] = field(default_factory=list)
@dataclass
@@ -22,7 +29,6 @@ class artistTrackPlaylistObject:
"""Artist when nested inside a track in a playlist context."""
type: str = "artistTrackPlaylist"
name: str = ""
genres: List[str] = field(default_factory=list)
ids: IDs = field(default_factory=IDs)
@@ -33,10 +39,11 @@ class trackPlaylistObject:
title: str = ""
position: int = 0 # Position in the playlist
duration_ms: int = 0 # mandatory
# Nested objects instead of string references
artist: artistTrackPlaylistObject = field(default_factory=artistTrackPlaylistObject)
artists: List[artistTrackPlaylistObject] = field(default_factory=list)
album: albumTrackPlaylistObject = field(default_factory=albumTrackPlaylistObject)
ids: IDs = field(default_factory=IDs)
disc_number: int = 1
track_number: int = 1
@dataclass
@@ -45,7 +52,6 @@ class playlistObject:
type: str = "playlist"
title: str = ""
description: Optional[str] = None
collaborative: bool = False
owner: artistObject = field(default_factory=artistObject)
owner: userObject = field(default_factory=userObject)
tracks: List[trackPlaylistObject] = field(default_factory=list)
ids: IDs = field(default_factory=IDs)

View File

@@ -1,12 +1,48 @@
#!/usr/bin/python3
from dataclasses import dataclass, field
from typing import List, Optional
from typing import List, Optional, Dict, Any
from .common import IDs
from .album import albumTrackObject
from .artist import artistTrackObject
from .user import userObject
@dataclass
class artistAlbumTrackObject:
"""Artist when nested inside a track in an album context."""
type: str = "artistAlbumTrack"
name: str = ""
ids: IDs = field(default_factory=IDs)
@dataclass
class artistTrackObject:
"""
An artist when nested inside a track context.
No genres, no albums—just identifying info.
"""
type: str = "artistTrack"
name: str = ""
ids: IDs = field(default_factory=IDs)
@dataclass
class albumTrackObject:
"""Album when nested inside a track context."""
type: str = "albumTrack"
album_type: str = "" # "album" | "single" | "compilation"
title: str = ""
release_date: Dict[str, Any] = field(default_factory=dict) # ReleaseDate as dict
total_tracks: int = 0
genres: List[str] = field(default_factory=list)
ids: IDs = field(default_factory=IDs)
artists: List[artistAlbumTrackObject] = field(default_factory=list)
@dataclass
class playlistTrackObject:
"""Playlist when nested inside a track context."""
type: str = "playlistTrack"
title: str = ""
description: Optional[str] = None
owner: userObject = field(default_factory=userObject)
ids: IDs = field(default_factory=IDs)
@dataclass
class trackObject:
@@ -17,11 +53,6 @@ class trackObject:
track_number: int = 1
duration_ms: int = 0 # mandatory
genres: List[str] = field(default_factory=list)
# Nested album summary
album: albumTrackObject = field(default_factory=albumTrackObject)
# Nested lean artist summary (no genres/albums)
artist: artistTrackObject = field(default_factory=artistTrackObject)
ids: IDs = field(default_factory=IDs)
artists: List[artistTrackObject] = field(default_factory=list)
ids: IDs = field(default_factory=IDs)

View File

@@ -0,0 +1,14 @@
#!/usr/bin/python3
from dataclasses import dataclass, field
from typing import Optional
from .common import IDs
@dataclass
class userObject:
"""A user object representation."""
name: str = ""
type: str = "user"
ids: IDs = field(default_factory=IDs)

View File

@@ -0,0 +1,8 @@
#!/usr/bin/python3
from deezspot.models.download.smart import Smart
from deezspot.models.download.track import Track
from deezspot.models.download.album import Album
from deezspot.models.download.playlist import Playlist
from deezspot.models.download.preferences import Preferences
from deezspot.models.download.episode import Episode

View File

@@ -1,6 +1,6 @@
#!/usr/bin/python3
from deezspot.models.track import Track
from deezspot.models.download.track import Track
class Album:
def __init__(self, ids: int) -> None:

View File

@@ -1,6 +1,6 @@
#!/usr/bin/python3
from deezspot.models.track import Track
from deezspot.models.download.track import Track
class Playlist:
def __init__(self) -> None:

View File

@@ -1,8 +1,8 @@
#!/usr/bin/python3
from deezspot.models.track import Track
from deezspot.models.album import Album
from deezspot.models.playlist import Playlist
from deezspot.models.download.track import Track
from deezspot.models.download.album import Album
from deezspot.models.download.playlist import Playlist
class Smart:
def __init__(self) -> None:

View File

@@ -17,7 +17,7 @@ from os import (
system,
replace as os_replace,
)
from deezspot.models import (
from deezspot.models.download import (
Track,
Album,
Playlist,
@@ -38,6 +38,27 @@ from deezspot.libutils.cleanup_utils import (
unregister_active_download,
)
from deezspot.libutils.skip_detection import check_track_exists
from deezspot.models.callback import (
trackCallbackObject,
albumCallbackObject,
playlistCallbackObject,
initializingObject,
skippedObject,
retryingObject,
realTimeObject,
errorObject,
doneObject,
summaryObject,
failedTrackObject,
trackObject as cb_trackObject,
albumObject as cb_albumObject,
playlistObject as cb_playlistObject,
artistTrackObject,
albumTrackObject,
playlistTrackObject,
userObject,
IDs
)
# --- Global retry counter variables ---
GLOBAL_RETRY_COUNT = 0
@@ -352,48 +373,90 @@ class EASY_DW:
self.__c_track.was_skipped = True
parent_info, total_tracks_val = self._get_parent_info()
summary_data = {
"successful_tracks": [],
"skipped_tracks": [f"{current_title} - {current_artist}"],
"failed_tracks": [],
"total_successful": 0,
"total_skipped": 1,
"total_failed": 0,
} if self.__parent is None else None
track_for_callback = cb_trackObject(
title=current_title,
artists=[artistTrackObject(name=artist.strip()) for artist in current_artist.split(';')],
ids=IDs(spotify=self.__ids)
)
initializing_status = initializingObject(
ids=IDs(spotify=self.__ids),
convert_to=self.__preferences.convert_to,
bitrate=self.__preferences.bitrate
)
parent_for_callback = None
if parent_info:
if parent_info.get("type") == "album":
parent_for_callback = albumTrackObject(
title=parent_info.get("title"),
album_type=self.__song_metadata.get("album_type", "album"),
total_tracks=parent_info.get("total_tracks"),
ids=IDs(spotify=self.__song_metadata.get("album_id"))
)
elif parent_info.get("type") == "playlist":
parent_for_callback = playlistTrackObject(
title=parent_info.get("name"),
owner=userObject(name=parent_info.get("owner")),
ids=IDs(spotify=parent_info.get("url", "").split("/")[-1])
)
callback_object = trackCallbackObject(
track=track_for_callback,
status_info=initializing_status,
current_track=getattr(self.__preferences, 'track_number', None),
total_tracks=total_tracks_val,
parent=parent_for_callback
)
report_progress(
reporter=Download_JOB.progress_reporter,
report_type="track",
status="skipped",
song=current_title,
artist=current_artist,
url=self.__link,
reason=f"Track already exists in desired format at {existing_file_path}",
convert_to=self.__preferences.convert_to,
bitrate=self.__preferences.bitrate,
current_track=getattr(self.__preferences, 'track_number', None),
total_tracks=total_tracks_val,
parent=parent_info,
summary=summary_data
callback_obj=callback_object
)
return self.__c_track
# Report initializing status for the track download
parent_info, total_tracks_val = self._get_parent_info()
report_progress(
reporter=Download_JOB.progress_reporter,
report_type="track",
status="initializing",
song=current_title,
artist=current_artist,
album=current_album,
url=self.__link,
track_for_callback = cb_trackObject(
title=current_title,
artists=[artistTrackObject(name=artist.strip()) for artist in current_artist.split(';')],
ids=IDs(spotify=self.__ids)
)
initializing_status = initializingObject(
ids=IDs(spotify=self.__ids),
convert_to=self.__preferences.convert_to,
bitrate=self.__preferences.bitrate,
parent=parent_info,
bitrate=self.__preferences.bitrate
)
parent_for_callback = None
if parent_info:
if parent_info.get("type") == "album":
parent_for_callback = albumTrackObject(
title=parent_info.get("title"),
album_type=self.__song_metadata.get("album_type", "album"),
total_tracks=parent_info.get("total_tracks"),
ids=IDs(spotify=self.__song_metadata.get("album_id"))
)
elif parent_info.get("type") == "playlist":
parent_for_callback = playlistTrackObject(
title=parent_info.get("name"),
owner=userObject(name=parent_info.get("owner")),
ids=IDs(spotify=parent_info.get("url", "").split("/")[-1])
)
callback_object = trackCallbackObject(
track=track_for_callback,
status_info=initializing_status,
current_track=getattr(self.__preferences, 'track_number', None),
total_tracks=total_tracks_val,
parent=parent_for_callback
)
report_progress(
reporter=Download_JOB.progress_reporter,
callback_obj=callback_object
)
# If track does not exist in the desired final format, proceed with download/conversion
@@ -449,20 +512,31 @@ class EASY_DW:
if current_percentage > self._last_reported_percentage:
self._last_reported_percentage = current_percentage
report_progress(
reporter=Download_JOB.progress_reporter,
report_type="track",
status="real-time",
song=self.__song_metadata.get("music", ""),
artist=self.__song_metadata.get("artist", ""),
url=self.__link,
time_elapsed=int((current_time - start_time) * 1000),
progress=current_percentage,
real_time_status = realTimeObject(
ids=IDs(spotify=self.__ids),
convert_to=self.__convert_to,
bitrate=self.__bitrate,
time_elapsed=int((current_time - start_time) * 1000),
progress=current_percentage
)
track_for_callback = cb_trackObject(
title=self.__song_metadata.get("music", ""),
artists=[artistTrackObject(name=artist.strip()) for artist in self.__song_metadata.get("artist", "").split(';')],
ids=IDs(spotify=self.__ids)
)
callback_object = trackCallbackObject(
track=track_for_callback,
status_info=real_time_status,
current_track=getattr(self.__preferences, 'track_number', None),
total_tracks=total_tracks_val,
parent=parent_info
parent=parent_for_callback
)
report_progress(
reporter=Download_JOB.progress_reporter,
callback_obj=callback_object
)
# Rate limiting (if needed)
@@ -570,28 +644,34 @@ class EASY_DW:
}
})
report_progress(
reporter=Download_JOB.progress_reporter,
report_type="track",
status="retrying",
retry_count=retries,
seconds_left=retry_delay,
song=self.__song_metadata.get('music', ''),
artist=self.__song_metadata.get('artist', ''),
album=self.__song_metadata.get('album', ''),
error=str(e),
url=self.__link,
retrying_status = retryingObject(
ids=IDs(spotify=self.__ids),
convert_to=self.__convert_to,
bitrate=self.__bitrate,
current_track=getattr(self.__preferences, 'track_number', None),
total_tracks=total_tracks_val,
parent=parent_info
retry_count=retries,
seconds_left=retry_delay,
error=str(e)
)
track_for_callback = cb_trackObject(
title=self.__song_metadata.get('music', 'Unknown Episode'),
artists=[artistTrackObject(name=self.__song_metadata.get('artist', 'Unknown Show'))],
ids=IDs(spotify=self.__ids)
)
callback_object = trackCallbackObject(
track=track_for_callback,
status_info=retrying_status
)
report_progress(
reporter=Download_JOB.progress_reporter,
callback_obj=callback_object
)
if retries >= max_retries or GLOBAL_RETRY_COUNT >= GLOBAL_MAX_RETRIES:
# Final cleanup before giving up
if os.path.exists(self.__song_path):
os.remove(self.__song_path)
os.remove(self.__song_path) # Clean up partial file
# Add track info to exception
track_name = self.__song_metadata.get('music', 'Unknown Track')
artist_name = self.__song_metadata.get('artist', 'Unknown Artist')
@@ -626,20 +706,28 @@ class EASY_DW:
else:
error_msg = f"Audio conversion failed: {original_error_str}"
report_progress(
reporter=Download_JOB.progress_reporter,
report_type="track",
status="error",
song=self.__song_metadata.get('music', ''),
artist=self.__song_metadata.get('artist', ''),
error=error_msg,
url=self.__link,
error_status = errorObject(
ids=IDs(spotify=self.__ids),
convert_to=self.__convert_to,
bitrate=self.__bitrate,
error=error_msg
)
track_for_callback = cb_trackObject(
title=self.__song_metadata.get('music', ''),
artists=[artistTrackObject(name=artist.strip()) for artist in self.__song_metadata.get('artist', '').split(';')],
ids=IDs(spotify=self.__ids)
)
callback_object = trackCallbackObject(
track=track_for_callback,
status_info=error_status,
current_track=getattr(self.__preferences, 'track_number', None),
total_tracks=total_tracks_val,
parent=parent_info
)
report_progress(
reporter=Download_JOB.progress_reporter,
callback_obj=callback_object
)
logger.error(f"Audio conversion error: {error_msg}")
# If conversion fails, clean up the .ogg file
@@ -654,19 +742,27 @@ class EASY_DW:
except Exception as conv_e:
# If conversion fails twice, create a final error report
error_msg_2 = f"Audio conversion failed after retry for '{self.__song_metadata.get('music', 'Unknown Track')}'. Original error: {str(conv_e)}"
report_progress(
reporter=Download_JOB.progress_reporter,
report_type="track",
status="error",
song=self.__song_metadata.get('music', 'Unknown Track'),
artist=self.__song_metadata.get('artist', ''),
error=error_msg_2,
url=self.__link,
error_status_2 = errorObject(
ids=IDs(spotify=self.__ids),
convert_to=self.__convert_to,
bitrate=self.__bitrate,
parent=parent_info,
error=error_msg_2
)
track_for_callback_2 = cb_trackObject(
title=self.__song_metadata.get('music', 'Unknown Track'),
artists=[artistTrackObject(name=artist.strip()) for artist in self.__song_metadata.get('artist', '').split(';')],
ids=IDs(spotify=self.__ids)
)
callback_object_2 = trackCallbackObject(
track=track_for_callback_2,
status_info=error_status_2,
current_track=getattr(self.__preferences, 'track_number', None),
total_tracks=total_tracks_val
total_tracks=total_tracks_val,
parent=parent_info
)
report_progress(
reporter=Download_JOB.progress_reporter,
callback_obj=callback_object_2
)
logger.error(error_msg)
@@ -697,29 +793,38 @@ class EASY_DW:
total_tracks_val = self.__song_metadata.get('nb_tracks', 0)
current_track_val = getattr(self.__preferences, 'track_number', 0)
summary_obj = None
if self.__parent is None:
summary_data = {
"successful_tracks": [f"{song} - {artist}"],
"skipped_tracks": [],
"failed_tracks": [],
"total_successful": 1,
"total_skipped": 0,
"total_failed": 0,
}
track_obj = cb_trackObject(title=f"{song} - {artist}")
summary_obj = summaryObject(
successful_tracks=[track_obj],
total_successful=1
)
done_status = doneObject(
ids=IDs(spotify=self.__ids),
convert_to=self.__convert_to,
bitrate=self.__bitrate,
summary=summary_obj
)
track_for_callback = cb_trackObject(
title=song,
artists=[artistTrackObject(name=a.strip()) for a in artist.split(';')],
ids=IDs(spotify=self.__ids)
)
callback_object = trackCallbackObject(
track=track_for_callback,
status_info=done_status,
current_track=current_track_val,
total_tracks=total_tracks_val,
parent=parent_for_callback
)
report_progress(
reporter=Download_JOB.progress_reporter,
report_type="track",
status="done",
song=song,
artist=artist,
url=self.__link,
convert_to=self.__convert_to,
bitrate=self.__bitrate,
parent=parent_info,
current_track=current_track_val,
total_tracks=total_tracks_val,
summary=summary_data,
callback_obj=callback_object
)
if hasattr(self, '_EASY_DW__c_track') and self.__c_track and self.__c_track.success:
@@ -902,22 +1007,33 @@ class EASY_DW:
# Or self.__song_path might be a partially converted file if convert_audio failed mid-way and didn't cleanup perfectly.
episode_title = self.__song_metadata.get('music', 'Unknown Episode')
error_message = f"Audio conversion for episode '{episode_title}' failed. Original error: {str(conv_e)}"
track_for_callback = cb_trackObject(
title=episode_title,
artists=[artistTrackObject(name=self.__song_metadata.get('artist', 'Unknown Show'))],
ids=IDs(spotify=self.__ids)
)
error_status = errorObject(
ids=IDs(spotify=self.__ids),
convert_to=self.__convert_to,
bitrate=self.__bitrate,
error=error_message
)
callback_object = trackCallbackObject(
track=track_for_callback,
status_info=error_status
)
report_progress(
reporter=Download_JOB.progress_reporter,
report_type="episode",
status="error",
song=episode_title,
artist=self.__song_metadata.get('artist', 'Unknown Show'),
error=error_message,
url=self.__link,
convert_to=self.__convert_to,
bitrate=self.__bitrate
callback_obj=callback_object
)
# Attempt to remove self.__song_path, which is the latest known path for this episode
if os.path.exists(self.__song_path):
os.remove(self.__song_path)
unregister_active_download(self.__song_path) # Unregister it as it failed/was removed
logger.error(error_message)
if hasattr(self, '_EASY_DW__c_episode') and self.__c_episode:
self.__c_episode.success = False
@@ -1005,157 +1121,54 @@ class DW_ALBUM:
total_tracks = self.__song_metadata.get('nb_tracks', 0)
album_id = self.__ids
report_progress(
reporter=Download_JOB.progress_reporter,
report_type="album",
artist=album_artist,
status="initializing",
total_tracks=total_tracks,
title=album_name,
url=f"https://open.spotify.com/album/{album_id}",
)
pic_url = self.__song_metadata['image'] # This is URL for spotify
image_bytes = request(pic_url).content
self.__song_metadata['image'] = image_bytes # Keep bytes for tagging
album = Album(self.__ids)
album.image = image_bytes # Store raw image bytes for cover saving
album.nb_tracks = self.__song_metadata['nb_tracks']
album.album_name = self.__song_metadata['album']
album.upc = self.__song_metadata['upc']
tracks = album.tracks
album.md5_image = self.__ids
album.tags = self.__song_metadata
# Determine album base directory once
album_base_directory = get_album_directory(
self.__song_metadata, # Album level metadata
self.__output_dir,
custom_dir_format=self.__preferences.custom_dir_format,
pad_tracks=self.__preferences.pad_tracks
)
c_song_metadata = {}
for key, item in self.__song_metadata_items:
if key == 'popularity_list':
continue
if not isinstance(item, list): # Changed from type() to isinstance()
c_song_metadata[key] = item # Use item directly (it's self.__song_metadata[key])
total_tracks = album.nb_tracks
for a in range(total_tracks):
for key, metadata_value_for_key in self.__song_metadata_items: # metadata_value_for_key is self.__song_metadata[key]
if isinstance(metadata_value_for_key, list): # Changed from type() is list to isinstance()
if key == 'popularity_list':
continue
if a < len(metadata_value_for_key):
c_song_metadata[key] = metadata_value_for_key[a]
else:
# Log a warning because a per-track list is shorter than expected.
# This was causing the IndexError.
album_name_for_log = c_song_metadata.get('album', self.__song_metadata.get('album', 'Unknown Album'))
logger.warning(
f"In album '{album_name_for_log}', metadata list for key '{key}' is too short. "
f"Expected at least {a + 1} elements for track {a + 1} "
f"(list has {len(metadata_value_for_key)}). Assigning None to '{key}' for this track."
)
c_song_metadata[key] = None
song_name = c_song_metadata['music']
artist_name = c_song_metadata['artist']
album_name = c_song_metadata['album']
current_track = a + 1
c_preferences = deepcopy(self.__preferences)
c_preferences.song_metadata = c_song_metadata.copy()
c_preferences.ids = c_song_metadata['ids']
c_preferences.track_number = current_track # Track number in the album
c_preferences.link = f"https://open.spotify.com/track/{c_preferences.ids}"
# Add album_id to song metadata for consistent parent info
c_preferences.song_metadata['album_id'] = self.__ids
try:
# Use track-level reporting through EASY_DW
track = EASY_DW(c_preferences, parent='album').download_try()
except TrackNotFound as e_track:
track = Track(c_song_metadata, None, None, None, None, None)
track.success = False
track.error_message = str(e_track) # Store the error message from TrackNotFound
logger.warning(f"Track '{song_name}' by '{artist_name}' from album '{album.album_name}' not found or failed to download. Reason: {track.error_message}")
except Exception as e_generic:
track = Track(c_song_metadata, None, None, None, None, None)
track.success = False
track.error_message = f"An unexpected error occurred: {str(e_generic)}"
logger.error(f"Unexpected error downloading track '{song_name}' by '{artist_name}' from album '{album.album_name}'. Reason: {track.error_message}")
tracks.append(track)
successful_tracks_objs = []
failed_tracks_objs = []
skipped_tracks_objs = []
# Save album cover image
if self.__preferences.save_cover and album.image and album_base_directory:
save_cover_image(album.image, album_base_directory, "cover.jpg")
if self.__make_zip:
song_quality = tracks[0].quality
custom_dir_format = getattr(self.__preferences, 'custom_dir_format', None)
zip_name = create_zip(
tracks,
output_dir=self.__output_dir,
song_metadata=self.__song_metadata,
song_quality=song_quality,
custom_dir_format=custom_dir_format
)
album.zip_path = zip_name
# Report album done status
album_name = self.__song_metadata.get('album', 'Unknown Album')
# Process album artist for the done status (use the same logic as initializing)
album_artist = self.__song_metadata.get('artist', 'Unknown Artist')
if isinstance(album_artist, list):
album_artist = most_frequent(album_artist)
elif isinstance(album_artist, str) and ";" in album_artist:
artists_list = [artist.strip() for artist in album_artist.split(";")]
album_artist = most_frequent(artists_list) if artists_list else album_artist
total_tracks = self.__song_metadata.get('nb_tracks', 0)
album_id = self.__ids
successful_tracks = []
failed_tracks = []
skipped_tracks = []
for track in tracks:
track_info = {
"name": track.tags.get('music', 'Unknown Track'),
"artist": track.tags.get('artist', 'Unknown Artist')
}
track_info = cb_trackObject(
title=track.tags.get('music', 'Unknown Track'),
artists=[artistTrackObject(name=artist.strip()) for artist in track.tags.get('artist', 'Unknown Artist').split(';')],
ids=IDs(spotify=track.tags.get('ids'))
)
if getattr(track, 'was_skipped', False):
skipped_tracks.append(track_info)
skipped_tracks_objs.append(track_info)
elif track.success:
successful_tracks.append(track_info)
successful_tracks_objs.append(track_info)
else:
track_info["reason"] = getattr(track, 'error_message', 'Unknown reason')
failed_tracks.append(track_info)
reason = getattr(track, 'error_message', 'Unknown reason')
failed_tracks_objs.append(failedTrackObject(track=track_info, reason=reason))
summary = {
"successful_tracks": [f"{t['name']} - {t['artist']}" for t in successful_tracks],
"skipped_tracks": [f"{t['name']} - {t['artist']}" for t in skipped_tracks],
"failed_tracks": [{
"track": f"{t['name']} - {t['artist']}",
"reason": t['reason']
} for t in failed_tracks],
"total_successful": len(successful_tracks),
"total_skipped": len(skipped_tracks),
"total_failed": len(failed_tracks)
}
summary = summaryObject(
successful_tracks=successful_tracks_objs,
skipped_tracks=skipped_tracks_objs,
failed_tracks=failed_tracks_objs,
total_successful=len(successful_tracks_objs),
total_skipped=len(skipped_tracks_objs),
total_failed=len(failed_tracks_objs)
)
done_status = doneObject(
ids=IDs(spotify=album_id),
summary=summary
)
album_for_callback = cb_albumObject(
title=album_name,
artists=[artistTrackObject(name=album_artist.strip())],
total_tracks=total_tracks,
ids=IDs(spotify=album_id),
album_type=self.__song_metadata.get("album_type", "album")
)
callback_object = albumCallbackObject(
album=album_for_callback,
status_info=done_status
)
report_progress(
reporter=Download_JOB.progress_reporter,
report_type="album",
artist=album_artist,
status="done",
total_tracks=total_tracks,
title=album_name,
url=f"https://open.spotify.com/album/{album_id}",
summary=summary,
callback_obj=callback_object
)
return album
@@ -1179,14 +1192,25 @@ class DW_PLAYLIST:
playlist_id = self.__ids
# Report playlist initializing status
playlist_for_callback = cb_playlistObject(
title=playlist_name,
owner=userObject(name=playlist_owner),
ids=IDs(spotify=playlist_id),
description=self.__json_data.get('description')
)
initializing_status = initializingObject(
ids=IDs(spotify=playlist_id)
)
callback_object = playlistCallbackObject(
playlist=playlist_for_callback,
status_info=initializing_status
)
report_progress(
reporter=Download_JOB.progress_reporter,
report_type="playlist",
owner=playlist_owner,
status="initializing",
total_tracks=total_tracks,
name=playlist_name,
url=f"https://open.spotify.com/playlist/{playlist_id}",
callback_obj=callback_object
)
# --- Prepare the m3u playlist file ---
@@ -1336,42 +1360,52 @@ class DW_PLAYLIST:
total_tracks = self.__json_data.get('tracks', {}).get('total', 0)
playlist_id = self.__ids
successful_tracks = []
failed_tracks = []
skipped_tracks = []
successful_tracks_objs = []
failed_tracks_objs = []
skipped_tracks_objs = []
for track in tracks:
track_info = {
"name": track.tags.get('music') or track.tags.get('name', 'Unknown Track'),
"artist": track.tags.get('artist', 'Unknown Artist')
}
track_info = cb_trackObject(
title=track.tags.get('music') or track.tags.get('name', 'Unknown Track'),
artists=[artistTrackObject(name=artist.strip()) for artist in track.tags.get('artist', 'Unknown Artist').split(';')],
ids=IDs(spotify=track.tags.get('ids'))
)
if getattr(track, 'was_skipped', False):
skipped_tracks.append(track_info)
skipped_tracks_objs.append(track_info)
elif track.success:
successful_tracks.append(track_info)
successful_tracks_objs.append(track_info)
else:
track_info["reason"] = getattr(track, 'error_message', 'Unknown reason')
failed_tracks.append(track_info)
reason = getattr(track, 'error_message', 'Unknown reason')
failed_tracks_objs.append(failedTrackObject(track=track_info, reason=reason))
summary = summaryObject(
successful_tracks=successful_tracks_objs,
skipped_tracks=skipped_tracks_objs,
failed_tracks=failed_tracks_objs,
total_successful=len(successful_tracks_objs),
total_skipped=len(skipped_tracks_objs),
total_failed=len(failed_tracks_objs)
)
done_status = doneObject(
ids=IDs(spotify=playlist_id),
summary=summary
)
playlist_for_callback = cb_playlistObject(
title=playlist_name,
owner=userObject(name=playlist_owner),
ids=IDs(spotify=playlist_id),
description=self.__json_data.get('description')
)
callback_object = playlistCallbackObject(
playlist=playlist_for_callback,
status_info=done_status
)
summary = {
"successful_tracks": [f"{t['name']} - {t['artist']}" for t in successful_tracks],
"skipped_tracks": [f"{t['name']} - {t['artist']}" for t in skipped_tracks],
"failed_tracks": [{
"track": f"{t['name']} - {t['artist']}",
"reason": t['reason']
} for t in failed_tracks],
"total_successful": len(successful_tracks),
"total_skipped": len(skipped_tracks),
"total_failed": len(failed_tracks)
}
report_progress(
reporter=Download_JOB.progress_reporter,
report_type="playlist",
owner=playlist_owner,
status="done",
total_tracks=total_tracks,
name=playlist_name,
url=f"https://open.spotify.com/playlist/{playlist_id}",
summary=summary,
callback_obj=callback_object
)
return playlist
@@ -1385,26 +1419,41 @@ class DW_EPISODE:
def dw(self) -> Episode:
episode_id = self.__preferences.ids
url = f"https://open.spotify.com/episode/{episode_id}" if episode_id else None
track_for_callback = cb_trackObject(
title=self.__preferences.song_metadata.get('name', 'Unknown Episode'),
artists=[artistTrackObject(name=self.__preferences.song_metadata.get('show', 'Unknown Show'))],
ids=IDs(spotify=episode_id)
)
initializing_status = initializingObject(
ids=IDs(spotify=episode_id)
)
callback_object_init = trackCallbackObject(
track=track_for_callback,
status_info=initializing_status
)
report_progress(
reporter=Download_JOB.progress_reporter,
report_type="episode",
song=self.__preferences.song_metadata.get('name', 'Unknown Episode'),
artist=self.__preferences.song_metadata.get('show', 'Unknown Show'),
status="initializing",
url=url,
callback_obj=callback_object_init
)
episode = EASY_DW(self.__preferences).download_eps()
done_status = doneObject(
ids=IDs(spotify=episode_id)
)
callback_object_done = trackCallbackObject(
track=track_for_callback,
status_info=done_status
)
report_progress(
reporter=Download_JOB.progress_reporter,
report_type="episode",
song=self.__preferences.song_metadata.get('name', 'Unknown Episode'),
artist=self.__preferences.song_metadata.get('show', 'Unknown Show'),
status="done",
url=url,
callback_obj=callback_object_done
)
return episode

View File

@@ -11,7 +11,7 @@ from deezspot.libutils.utils import (
link_is_valid,
what_kind,
)
from deezspot.models import (
from deezspot.models.download import (
Track,
Album,
Playlist,