implemented objects standard in deezloader
This commit is contained in:
406
deezspot/deezloader/__dee_api__.py
Normal file
406
deezspot/deezloader/__dee_api__.py
Normal file
@@ -0,0 +1,406 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
from typing import Optional, List, Dict, Any, Union
|
||||
|
||||
from deezspot.models.callback.common import IDs
|
||||
from deezspot.models.callback.track import (
|
||||
trackObject,
|
||||
albumTrackObject,
|
||||
artistTrackObject,
|
||||
artistAlbumTrackObject,
|
||||
)
|
||||
from deezspot.models.callback.album import (
|
||||
albumObject,
|
||||
trackAlbumObject,
|
||||
artistAlbumObject,
|
||||
artistTrackAlbumObject,
|
||||
)
|
||||
from deezspot.models.callback.playlist import (
|
||||
playlistObject,
|
||||
trackPlaylistObject,
|
||||
albumTrackPlaylistObject,
|
||||
artistTrackPlaylistObject,
|
||||
artistAlbumTrackPlaylistObject,
|
||||
)
|
||||
from deezspot.models.callback.user import userObject
|
||||
|
||||
def _parse_release_date(date_str: Optional[str]) -> Dict[str, Any]:
|
||||
if not date_str:
|
||||
return {"year": 0, "month": 0, "day": 0}
|
||||
|
||||
parts = list(map(int, date_str.split('-')))
|
||||
return {
|
||||
"year": parts[0] if len(parts) > 0 else 0,
|
||||
"month": parts[1] if len(parts) > 1 else 0,
|
||||
"day": parts[2] if len(parts) > 2 else 0
|
||||
}
|
||||
|
||||
def _get_images_from_cover(item_json: dict) -> List[Dict[str, Any]]:
|
||||
images = []
|
||||
if item_json.get("cover_small"):
|
||||
images.append({"url": item_json["cover_small"], "height": 56, "width": 56})
|
||||
if item_json.get("cover_medium"):
|
||||
images.append({"url": item_json["cover_medium"], "height": 250, "width": 250})
|
||||
if item_json.get("cover_big"):
|
||||
images.append({"url": item_json["cover_big"], "height": 500, "width": 500})
|
||||
if item_json.get("cover_xl"):
|
||||
images.append({"url": item_json["cover_xl"], "height": 1000, "width": 1000})
|
||||
if item_json.get("picture_small"):
|
||||
images.append({"url": item_json["picture_small"], "height": 56, "width": 56})
|
||||
if item_json.get("picture_medium"):
|
||||
images.append({"url": item_json["picture_medium"], "height": 250, "width": 250})
|
||||
if item_json.get("picture_big"):
|
||||
images.append({"url": item_json["picture_big"], "height": 500, "width": 500})
|
||||
if item_json.get("picture_xl"):
|
||||
images.append({"url": item_json["picture_xl"], "height": 1000, "width": 1000})
|
||||
return images
|
||||
|
||||
|
||||
def _json_to_artist_track_object(artist_json: dict) -> artistTrackObject:
|
||||
return artistTrackObject(
|
||||
name=artist_json.get('name'),
|
||||
ids=IDs(deezer=artist_json.get('id'))
|
||||
)
|
||||
|
||||
def _json_to_album_track_object(album_json: dict) -> albumTrackObject:
|
||||
artists = []
|
||||
|
||||
# Check for contributors first - they're more detailed
|
||||
if "contributors" in album_json:
|
||||
# Look for main artists
|
||||
main_artists = [c for c in album_json['contributors'] if c.get('role') == 'Main']
|
||||
if main_artists:
|
||||
artists = [artistAlbumTrackObject(
|
||||
name=c.get('name'),
|
||||
ids=IDs(deezer=c.get('id'))
|
||||
) for c in main_artists]
|
||||
else:
|
||||
# If no main artists specified, use all contributors
|
||||
artists = [artistAlbumTrackObject(
|
||||
name=c.get('name'),
|
||||
ids=IDs(deezer=c.get('id'))
|
||||
) for c in album_json['contributors']]
|
||||
|
||||
# If no contributors found, use the artist field
|
||||
if not artists and "artist" in album_json:
|
||||
artists.append(artistAlbumTrackObject(
|
||||
name=album_json['artist'].get('name'),
|
||||
ids=IDs(deezer=album_json['artist'].get('id'))
|
||||
))
|
||||
|
||||
return albumTrackObject(
|
||||
album_type=album_json.get('record_type', ''),
|
||||
title=album_json.get('title'),
|
||||
ids=IDs(deezer=album_json.get('id')),
|
||||
images=_get_images_from_cover(album_json),
|
||||
release_date=_parse_release_date(album_json.get('release_date')),
|
||||
artists=artists,
|
||||
total_tracks=album_json.get('nb_tracks', 0),
|
||||
genres=[g['name'] for g in album_json.get('genres', {}).get('data', [])]
|
||||
)
|
||||
|
||||
def tracking(track_json: dict) -> Optional[trackObject]:
|
||||
"""
|
||||
Convert raw Deezer API track response to a standardized trackObject.
|
||||
|
||||
Args:
|
||||
track_json: Raw track data from Deezer API
|
||||
|
||||
Returns:
|
||||
A standardized trackObject or None if input is invalid
|
||||
"""
|
||||
if not track_json or 'id' not in track_json:
|
||||
return None
|
||||
|
||||
return create_standardized_track(track_json)
|
||||
|
||||
def _json_to_track_album_object(track_json: dict) -> trackAlbumObject:
|
||||
artists = []
|
||||
if "artist" in track_json:
|
||||
artists.append(artistTrackAlbumObject(
|
||||
name=track_json['artist'].get('name'),
|
||||
ids=IDs(deezer=track_json['artist'].get('id'))
|
||||
))
|
||||
|
||||
# If 'contributors' exists, add them as artists too
|
||||
if "contributors" in track_json:
|
||||
for contributor in track_json['contributors']:
|
||||
# Skip duplicates - don't add if name already exists
|
||||
if not any(artist.name == contributor.get('name') for artist in artists):
|
||||
artists.append(artistTrackAlbumObject(
|
||||
name=contributor.get('name'),
|
||||
ids=IDs(deezer=contributor.get('id'))
|
||||
))
|
||||
|
||||
# Ensure track position and disc number are properly extracted
|
||||
track_position = track_json.get('track_position')
|
||||
# Default to track_number if track_position isn't available
|
||||
if track_position is None:
|
||||
track_position = track_json.get('track_number')
|
||||
# Ensure we have a non-None value
|
||||
if track_position is None:
|
||||
track_position = 0
|
||||
|
||||
disc_number = track_json.get('disk_number')
|
||||
# Default to disc_number if disk_number isn't available
|
||||
if disc_number is None:
|
||||
disc_number = track_json.get('disc_number')
|
||||
# Ensure we have a non-None value
|
||||
if disc_number is None:
|
||||
disc_number = 1
|
||||
|
||||
return trackAlbumObject(
|
||||
title=track_json.get('title'),
|
||||
duration_ms=track_json.get('duration', 0) * 1000,
|
||||
explicit=track_json.get('explicit_lyrics', False),
|
||||
track_number=track_position,
|
||||
disc_number=disc_number,
|
||||
ids=IDs(deezer=track_json.get('id')),
|
||||
artists=artists
|
||||
)
|
||||
|
||||
|
||||
def tracking_album(album_json: dict) -> Optional[albumObject]:
|
||||
if not album_json or 'id' not in album_json:
|
||||
return None
|
||||
|
||||
# Determine album artists from contributors or artist field
|
||||
album_artists = []
|
||||
if 'contributors' in album_json:
|
||||
main_artists = [c for c in album_json['contributors'] if c.get('role') == 'Main']
|
||||
if main_artists:
|
||||
album_artists = [artistAlbumObject(
|
||||
name=c.get('name', ''),
|
||||
ids=IDs(deezer=c.get('id'))
|
||||
) for c in main_artists]
|
||||
else:
|
||||
# Fallback to all contributors if no main artist is specified
|
||||
album_artists = [artistAlbumObject(
|
||||
name=c.get('name', ''),
|
||||
ids=IDs(deezer=c.get('id'))
|
||||
) for c in album_json['contributors']]
|
||||
elif 'artist' in album_json:
|
||||
album_artists.append(artistAlbumObject(
|
||||
name=album_json['artist'].get('name', ''),
|
||||
ids=IDs(deezer=album_json['artist'].get('id'))
|
||||
))
|
||||
|
||||
# Extract album metadata
|
||||
album_obj = albumObject(
|
||||
album_type=album_json.get('record_type', ''),
|
||||
title=album_json.get('title', ''),
|
||||
ids=IDs(deezer=album_json.get('id'), upc=album_json.get('upc')),
|
||||
images=_get_images_from_cover(album_json),
|
||||
release_date=_parse_release_date(album_json.get('release_date')),
|
||||
total_tracks=album_json.get('nb_tracks', 0),
|
||||
genres=[g['name'] for g in album_json.get('genres', {}).get('data', [])] if album_json.get('genres') else [],
|
||||
artists=album_artists
|
||||
)
|
||||
|
||||
# Process tracks
|
||||
album_tracks = []
|
||||
tracks_data = album_json.get('tracks', {}).get('data', [])
|
||||
|
||||
for track_data in tracks_data:
|
||||
# Ensure we have detailed track information
|
||||
# The /album/{id}/tracks endpoint provides ISRC, explicit flags, etc.
|
||||
|
||||
# Create track artists with main artist
|
||||
track_artists = []
|
||||
if "artist" in track_data:
|
||||
track_artists.append(artistTrackAlbumObject(
|
||||
name=track_data['artist'].get('name'),
|
||||
ids=IDs(deezer=track_data['artist'].get('id'))
|
||||
))
|
||||
|
||||
# Ensure track position and disc number are properly extracted
|
||||
track_position = track_data.get('track_position')
|
||||
if track_position is None:
|
||||
track_position = track_data.get('track_number', 0)
|
||||
|
||||
disc_number = track_data.get('disk_number')
|
||||
if disc_number is None:
|
||||
disc_number = track_data.get('disc_number', 1)
|
||||
|
||||
# Create the track object with enhanced metadata
|
||||
track = trackAlbumObject(
|
||||
title=track_data.get('title'),
|
||||
duration_ms=track_data.get('duration', 0) * 1000,
|
||||
explicit=track_data.get('explicit_lyrics', False),
|
||||
track_number=track_position,
|
||||
disc_number=disc_number,
|
||||
ids=IDs(deezer=track_data.get('id'), isrc=track_data.get('isrc')),
|
||||
artists=track_artists
|
||||
)
|
||||
album_tracks.append(track)
|
||||
|
||||
album_obj.tracks = album_tracks
|
||||
|
||||
return album_obj
|
||||
|
||||
def _json_to_track_playlist_object(track_json: dict) -> Optional[trackPlaylistObject]:
|
||||
if not track_json or not track_json.get('id'):
|
||||
return None
|
||||
|
||||
# Create artists with proper type
|
||||
artists = []
|
||||
if "artist" in track_json:
|
||||
artists.append(artistTrackPlaylistObject(
|
||||
name=track_json['artist'].get('name'),
|
||||
ids=IDs(deezer=track_json['artist'].get('id'))
|
||||
))
|
||||
|
||||
# If 'contributors' exists, add them as artists too
|
||||
if "contributors" in track_json:
|
||||
for contributor in track_json['contributors']:
|
||||
# Skip duplicates - don't add if name already exists
|
||||
if not any(artist.name == contributor.get('name') for artist in artists):
|
||||
artists.append(artistTrackPlaylistObject(
|
||||
name=contributor.get('name'),
|
||||
ids=IDs(deezer=contributor.get('id'))
|
||||
))
|
||||
|
||||
# Process album
|
||||
album_data = track_json.get('album', {})
|
||||
|
||||
# Process album artists
|
||||
album_artists = []
|
||||
if "artist" in album_data:
|
||||
album_artists.append(artistAlbumTrackPlaylistObject(
|
||||
name=album_data['artist'].get('name'),
|
||||
ids=IDs(deezer=album_data['artist'].get('id'))
|
||||
))
|
||||
|
||||
album = albumTrackPlaylistObject(
|
||||
title=album_data.get('title'),
|
||||
ids=IDs(deezer=album_data.get('id')),
|
||||
images=_get_images_from_cover(album_data),
|
||||
artists=album_artists,
|
||||
album_type=album_data.get('record_type', ''),
|
||||
release_date=_parse_release_date(album_data.get('release_date')),
|
||||
total_tracks=album_data.get('nb_tracks', 0)
|
||||
)
|
||||
|
||||
return trackPlaylistObject(
|
||||
title=track_json.get('title'),
|
||||
duration_ms=track_json.get('duration', 0) * 1000,
|
||||
ids=IDs(deezer=track_json.get('id'), isrc=track_json.get('isrc')),
|
||||
artists=artists,
|
||||
album=album,
|
||||
explicit=track_json.get('explicit_lyrics', False),
|
||||
disc_number=track_json.get('disk_number') or track_json.get('disc_number', 1),
|
||||
track_number=track_json.get('track_position') or track_json.get('track_number', 0)
|
||||
)
|
||||
|
||||
def tracking_playlist(playlist_json: dict) -> Optional[playlistObject]:
|
||||
if not playlist_json or 'id' not in playlist_json:
|
||||
return None
|
||||
|
||||
creator = playlist_json.get('creator', {})
|
||||
owner = userObject(
|
||||
name=creator.get('name'),
|
||||
ids=IDs(deezer=creator.get('id'))
|
||||
)
|
||||
|
||||
tracks_data = playlist_json.get('tracks', {}).get('data', [])
|
||||
tracks = []
|
||||
for track_data in tracks_data:
|
||||
track = _json_to_track_playlist_object(track_data)
|
||||
if track:
|
||||
tracks.append(track)
|
||||
|
||||
# Extract playlist images
|
||||
images = _get_images_from_cover(playlist_json)
|
||||
|
||||
# Add picture of the first track as playlist image if no images found
|
||||
if not images and tracks and tracks[0].album and tracks[0].album.images:
|
||||
images = tracks[0].album.images
|
||||
|
||||
description = playlist_json.get('description') or ""
|
||||
|
||||
playlist_obj = playlistObject(
|
||||
title=playlist_json.get('title'),
|
||||
description=description,
|
||||
ids=IDs(deezer=playlist_json.get('id')),
|
||||
images=images,
|
||||
owner=owner,
|
||||
tracks=tracks,
|
||||
)
|
||||
|
||||
return playlist_obj
|
||||
|
||||
def create_standardized_track(track_json: dict) -> trackObject:
|
||||
"""
|
||||
Create a standardized trackObject directly from Deezer API response.
|
||||
This makes metadata handling more consistent with spotloader's approach.
|
||||
|
||||
Args:
|
||||
track_json: Raw track data from Deezer API
|
||||
|
||||
Returns:
|
||||
A standardized trackObject
|
||||
"""
|
||||
# Extract artist information
|
||||
artists = []
|
||||
if "artist" in track_json:
|
||||
artists.append(artistTrackObject(
|
||||
name=track_json['artist'].get('name', ''),
|
||||
ids=IDs(deezer=track_json['artist'].get('id'))
|
||||
))
|
||||
|
||||
# Add additional artists from contributors
|
||||
if "contributors" in track_json:
|
||||
for contributor in track_json['contributors']:
|
||||
# Skip if already added
|
||||
if not any(artist.name == contributor.get('name') for artist in artists):
|
||||
artists.append(artistTrackObject(
|
||||
name=contributor.get('name', ''),
|
||||
ids=IDs(deezer=contributor.get('id'))
|
||||
))
|
||||
|
||||
# Extract album information
|
||||
album_data = None
|
||||
if "album" in track_json:
|
||||
album_artists = []
|
||||
|
||||
# First check for main contributors if available
|
||||
if "contributors" in track_json:
|
||||
main_artists = [c for c in track_json['contributors'] if c.get('role') == 'Main']
|
||||
if main_artists:
|
||||
album_artists = [artistAlbumTrackObject(
|
||||
name=c.get('name', ''),
|
||||
ids=IDs(deezer=c.get('id'))
|
||||
) for c in main_artists]
|
||||
|
||||
# If no main contributors found and album has its own artist field
|
||||
if not album_artists and "artist" in track_json["album"]:
|
||||
album_artists.append(artistAlbumTrackObject(
|
||||
name=track_json["album"]["artist"].get('name', ''),
|
||||
ids=IDs(deezer=track_json["album"]["artist"].get('id'))
|
||||
))
|
||||
|
||||
album_data = albumTrackObject(
|
||||
album_type=track_json["album"].get('record_type', ''),
|
||||
title=track_json["album"].get('title', ''),
|
||||
ids=IDs(deezer=track_json["album"].get('id')),
|
||||
images=_get_images_from_cover(track_json["album"]),
|
||||
release_date=_parse_release_date(track_json["album"].get('release_date')),
|
||||
artists=album_artists,
|
||||
total_tracks=track_json["album"].get('nb_tracks', 0),
|
||||
genres=[g['name'] for g in track_json["album"].get('genres', {}).get('data', [])]
|
||||
)
|
||||
|
||||
# Create track object
|
||||
track_obj = trackObject(
|
||||
title=track_json.get('title', ''),
|
||||
duration_ms=track_json.get('duration', 0) * 1000,
|
||||
explicit=track_json.get('explicit_lyrics', False),
|
||||
track_number=track_json.get('track_position') or track_json.get('track_number', 0),
|
||||
disc_number=track_json.get('disk_number') or track_json.get('disc_number', 1),
|
||||
ids=IDs(deezer=track_json.get('id'), isrc=track_json.get('isrc')),
|
||||
artists=artists,
|
||||
album=album_data,
|
||||
genres=[g['name'] for g in track_json.get('genres', {}).get('data', [])],
|
||||
)
|
||||
|
||||
return track_obj
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -1,155 +1,176 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
from time import sleep
|
||||
from datetime import datetime
|
||||
from typing import Union
|
||||
from deezspot.deezloader.__utils__ import artist_sort
|
||||
from requests import get as req_get
|
||||
from deezspot.libutils.utils import convert_to_date
|
||||
from deezspot.libutils.others_settings import header
|
||||
from deezspot.exceptions import (
|
||||
NoDataApi,
|
||||
QuotaExceeded,
|
||||
TrackNotFound,
|
||||
MarketAvailabilityError,
|
||||
)
|
||||
from deezspot.libutils.logging_utils import logger
|
||||
import requests
|
||||
from requests import get as req_get
|
||||
from deezspot.exceptions import NoDataApi
|
||||
from deezspot.libutils.logging_utils import logger
|
||||
from .__dee_api__ import tracking, tracking_album, tracking_playlist
|
||||
|
||||
class API:
|
||||
__api_link = "https://api.deezer.com/"
|
||||
__cover = "https://e-cdns-images.dzcdn.net/images/cover/%s/{}-000000-80-0-0.jpg"
|
||||
__album_cache = {}
|
||||
headers = {
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def __init__(cls):
|
||||
cls.__api_link = "https://api.deezer.com/"
|
||||
cls.__cover = "https://e-cdns-images.dzcdn.net/images/cover/%s/{}-000000-80-0-0.jpg"
|
||||
cls.headers = {
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def __get_api(cls, url, params=None, quota_exceeded=False):
|
||||
def __get_api(cls, url, params=None):
|
||||
try:
|
||||
response = req_get(url, headers=cls.headers, params=params)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
data = response.json()
|
||||
if data.get("error"):
|
||||
logger.error(f"Deezer API error for url {url}: {data['error']}")
|
||||
return data
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"Failed to get API data from {url}: {str(e)}")
|
||||
raise
|
||||
|
||||
@classmethod
|
||||
def get_chart(cls, index = 0):
|
||||
url = f"{cls.__api_link}chart/{index}"
|
||||
infos = cls.__get_api(url)
|
||||
|
||||
return infos
|
||||
|
||||
@classmethod
|
||||
def get_track(cls, track_id):
|
||||
url = f"{cls.__api_link}track/{track_id}"
|
||||
infos = cls.__get_api(url)
|
||||
|
||||
if infos and infos.get('album') and infos.get('album', {}).get('id'):
|
||||
album_id = infos['album']['id']
|
||||
full_album_json = cls.__album_cache.get(album_id)
|
||||
|
||||
return infos
|
||||
if not full_album_json:
|
||||
try:
|
||||
album_url = f"{cls.__api_link}album/{album_id}"
|
||||
full_album_json = cls.__get_api(album_url)
|
||||
if full_album_json:
|
||||
cls.__album_cache[album_id] = full_album_json
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not fetch full album details for album {album_id}: {e}")
|
||||
full_album_json = None
|
||||
|
||||
if full_album_json:
|
||||
album_data = infos.setdefault('album', {})
|
||||
if 'genres' in full_album_json:
|
||||
album_data['genres'] = full_album_json.get('genres')
|
||||
infos['genres'] = full_album_json.get('genres')
|
||||
if 'nb_tracks' in full_album_json:
|
||||
album_data['nb_tracks'] = full_album_json.get('nb_tracks')
|
||||
if 'record_type' in full_album_json:
|
||||
album_data['record_type'] = full_album_json.get('record_type')
|
||||
if 'contributors' in full_album_json:
|
||||
album_data['contributors'] = full_album_json.get('contributors')
|
||||
# If track doesn't have contributors but album does, use album contributors
|
||||
if 'contributors' not in infos:
|
||||
infos['contributors'] = full_album_json.get('contributors')
|
||||
|
||||
return tracking(infos)
|
||||
|
||||
@classmethod
|
||||
def get_album(cls, album_id):
|
||||
url = f"{cls.__api_link}album/{album_id}"
|
||||
infos = cls.__get_api(url)
|
||||
|
||||
if infos.get("error"):
|
||||
logger.error(f"Deezer API error when fetching album {album_id}: {infos.get('error')}")
|
||||
return tracking_album(infos)
|
||||
|
||||
# After fetching with UPC, we get the numeric album ID in the response.
|
||||
numeric_album_id = infos.get('id')
|
||||
if not numeric_album_id:
|
||||
logger.error(f"Could not get numeric album ID for {album_id}")
|
||||
return tracking_album(infos)
|
||||
|
||||
# Check if we need to handle pagination for tracks
|
||||
if infos.get('nb_tracks', 0) > 25 and 'tracks' in infos and 'data' in infos['tracks']:
|
||||
# Get all tracks with pagination
|
||||
all_tracks = infos['tracks']['data']
|
||||
initial_track_count = len(all_tracks)
|
||||
|
||||
logger.debug(f"Album has {infos['nb_tracks']} tracks, initially retrieved {initial_track_count}. Starting pagination.")
|
||||
|
||||
# Keep fetching next pages using index-based pagination
|
||||
page_count = 1
|
||||
remaining_tracks = infos['nb_tracks'] - initial_track_count
|
||||
|
||||
while remaining_tracks > 0:
|
||||
page_count += 1
|
||||
next_url = f"{cls.__api_link}album/{album_id}/tracks?index={initial_track_count + (page_count-2)*25}"
|
||||
logger.debug(f"Fetching page {page_count} of album tracks from: {next_url}")
|
||||
# Get detailed track information from the dedicated tracks endpoint
|
||||
tracks_url = f"{cls.__api_link}album/{numeric_album_id}/tracks?limit=100"
|
||||
detailed_tracks = []
|
||||
|
||||
try:
|
||||
tracks_response = cls.__get_api(tracks_url)
|
||||
if tracks_response and 'data' in tracks_response:
|
||||
detailed_tracks = tracks_response['data']
|
||||
|
||||
try:
|
||||
next_data = cls.__get_api(next_url)
|
||||
if 'data' in next_data and next_data['data']:
|
||||
all_tracks.extend(next_data['data'])
|
||||
logger.debug(f"Now have {len(all_tracks)}/{infos['nb_tracks']} tracks after page {page_count}")
|
||||
remaining_tracks = infos['nb_tracks'] - len(all_tracks)
|
||||
else:
|
||||
logger.warning(f"No data found in next page response, stopping pagination")
|
||||
# Handle pagination for albums with more than 100 tracks
|
||||
next_url = tracks_response.get('next')
|
||||
while next_url:
|
||||
try:
|
||||
next_data = cls.__get_api(next_url)
|
||||
if 'data' in next_data:
|
||||
detailed_tracks.extend(next_data['data'])
|
||||
next_url = next_data.get('next')
|
||||
else:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching next page for album tracks: {str(e)}")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching next page: {str(e)}")
|
||||
break
|
||||
|
||||
# Replace the simplified track data in album response with detailed track data
|
||||
if 'tracks' in infos:
|
||||
infos['tracks']['data'] = detailed_tracks
|
||||
|
||||
logger.info(f"Fetched {len(detailed_tracks)} detailed tracks for album {numeric_album_id}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch detailed tracks for album {numeric_album_id}: {e}")
|
||||
# Continue with regular album tracks if detailed fetch fails
|
||||
|
||||
# Replace the tracks data with our complete list
|
||||
infos['tracks']['data'] = all_tracks
|
||||
logger.info(f"Album pagination complete: retrieved all {len(all_tracks)} tracks for album {album_id}")
|
||||
# Handle pagination for regular album endpoint if detailed fetch failed
|
||||
if infos.get('nb_tracks', 0) > 25 and 'tracks' in infos and 'next' in infos['tracks']:
|
||||
all_tracks = infos['tracks']['data']
|
||||
next_url = infos['tracks']['next']
|
||||
while next_url:
|
||||
try:
|
||||
next_data = cls.__get_api(next_url)
|
||||
if 'data' in next_data:
|
||||
all_tracks.extend(next_data['data'])
|
||||
next_url = next_data.get('next')
|
||||
else:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching next page for album tracks: {str(e)}")
|
||||
break
|
||||
infos['tracks']['data'] = all_tracks
|
||||
|
||||
return infos
|
||||
return tracking_album(infos)
|
||||
|
||||
@classmethod
|
||||
def get_playlist(cls, playlist_id):
|
||||
url = f"{cls.__api_link}playlist/{playlist_id}"
|
||||
infos = cls.__get_api(url)
|
||||
if 'tracks' in infos and 'next' in infos['tracks']:
|
||||
all_tracks = infos['tracks']['data']
|
||||
next_url = infos['tracks']['next']
|
||||
while next_url:
|
||||
try:
|
||||
next_data = cls.__get_api(next_url)
|
||||
if 'data' in next_data:
|
||||
all_tracks.extend(next_data['data'])
|
||||
next_url = next_data.get('next')
|
||||
else:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching next page for playlist tracks: {str(e)}")
|
||||
break
|
||||
infos['tracks']['data'] = all_tracks
|
||||
return tracking_playlist(infos)
|
||||
|
||||
return infos
|
||||
|
||||
@classmethod
|
||||
def get_episode(cls, episode_id):
|
||||
url = f"{cls.__api_link}episode/{episode_id}"
|
||||
infos = cls.__get_api(url)
|
||||
|
||||
return infos
|
||||
|
||||
@classmethod
|
||||
def get_artist(cls, ids):
|
||||
url = f"{cls.__api_link}artist/{ids}"
|
||||
infos = cls.__get_api(url)
|
||||
|
||||
return infos
|
||||
|
||||
@classmethod
|
||||
def get_artist_top_tracks(cls, ids, limit = 25):
|
||||
url = f"{cls.__api_link}artist/{ids}/top?limit={limit}"
|
||||
infos = cls.__get_api(url)
|
||||
|
||||
return infos
|
||||
|
||||
@classmethod
|
||||
def get_artist_top_albums(cls, ids, limit = 25):
|
||||
url = f"{cls.__api_link}artist/{ids}/albums?limit={limit}"
|
||||
infos = cls.__get_api(url)
|
||||
|
||||
return infos
|
||||
|
||||
@classmethod
|
||||
def get_artist_related(cls, ids):
|
||||
url = f"{cls.__api_link}artist/{ids}/related"
|
||||
infos = cls.__get_api(url)
|
||||
|
||||
return infos
|
||||
|
||||
@classmethod
|
||||
def get_artist_radio(cls, ids):
|
||||
url = f"{cls.__api_link}artist/{ids}/radio"
|
||||
infos = cls.__get_api(url)
|
||||
|
||||
return infos
|
||||
|
||||
@classmethod
|
||||
def get_artist_top_playlists(cls, ids, limit = 25):
|
||||
url = f"{cls.__api_link}artist/{ids}/playlists?limit={limit}"
|
||||
infos = cls.__get_api(url)
|
||||
|
||||
return infos
|
||||
|
||||
@classmethod
|
||||
def search(cls, query, limit=25):
|
||||
url = f"{cls.__api_link}search"
|
||||
def search(cls, query, limit=25, search_type="track"):
|
||||
url = f"{cls.__api_link}search/{search_type}"
|
||||
params = {
|
||||
"q": query,
|
||||
"limit": limit
|
||||
@@ -158,356 +179,27 @@ class API:
|
||||
|
||||
if infos['total'] == 0:
|
||||
raise NoDataApi(query)
|
||||
|
||||
return infos
|
||||
|
||||
@classmethod
|
||||
def search_track(cls, query, limit=None):
|
||||
url = f"{cls.__api_link}search/track/?q={query}"
|
||||
|
||||
# Add the limit parameter to the URL if it is provided
|
||||
if limit is not None:
|
||||
url += f"&limit={limit}"
|
||||
if search_type == "track":
|
||||
return [tracking(t) for t in infos.get('data', []) if t]
|
||||
elif search_type == "album":
|
||||
return [tracking_album(a) for a in infos.get('data', []) if a]
|
||||
elif search_type == "playlist":
|
||||
return [tracking_playlist(p) for p in infos.get('data', []) if p]
|
||||
|
||||
infos = cls.__get_api(url)
|
||||
|
||||
if infos['total'] == 0:
|
||||
raise NoDataApi(query)
|
||||
|
||||
return infos
|
||||
|
||||
@classmethod
|
||||
def search_album(cls, query, limit=None):
|
||||
url = f"{cls.__api_link}search/album/?q={query}"
|
||||
|
||||
# Add the limit parameter to the URL if it is provided
|
||||
if limit is not None:
|
||||
url += f"&limit={limit}"
|
||||
|
||||
infos = cls.__get_api(url)
|
||||
|
||||
if infos['total'] == 0:
|
||||
raise NoDataApi(query)
|
||||
|
||||
return infos
|
||||
|
||||
@classmethod
|
||||
def search_playlist(cls, query, limit=None):
|
||||
url = f"{cls.__api_link}search/playlist/?q={query}"
|
||||
|
||||
# Add the limit parameter to the URL if it is provided
|
||||
if limit is not None:
|
||||
url += f"&limit={limit}"
|
||||
|
||||
infos = cls.__get_api(url)
|
||||
|
||||
if infos['total'] == 0:
|
||||
raise NoDataApi(query)
|
||||
|
||||
return infos
|
||||
|
||||
@classmethod
|
||||
def search_artist(cls, query, limit=None):
|
||||
url = f"{cls.__api_link}search/artist/?q={query}"
|
||||
|
||||
# Add the limit parameter to the URL if it is provided
|
||||
if limit is not None:
|
||||
url += f"&limit={limit}"
|
||||
|
||||
infos = cls.__get_api(url)
|
||||
|
||||
if infos['total'] == 0:
|
||||
raise NoDataApi(query)
|
||||
|
||||
return infos
|
||||
|
||||
@classmethod
|
||||
def not_found(cls, song, title):
|
||||
try:
|
||||
data = cls.search_track(song)['data']
|
||||
except NoDataApi:
|
||||
raise TrackNotFound(song)
|
||||
|
||||
ids = None
|
||||
|
||||
for track in data:
|
||||
if (
|
||||
track['title'] == title
|
||||
) or (
|
||||
title in track['title_short']
|
||||
):
|
||||
ids = track['id']
|
||||
break
|
||||
|
||||
if not ids:
|
||||
raise TrackNotFound(song)
|
||||
|
||||
return str(ids)
|
||||
return infos.get('data', [])
|
||||
|
||||
@classmethod
|
||||
def get_img_url(cls, md5_image, size = "1200x1200"):
|
||||
cover = cls.__cover.format(size)
|
||||
image_url = cover % md5_image
|
||||
|
||||
return image_url
|
||||
|
||||
@classmethod
|
||||
def choose_img(cls, md5_image, size = "1200x1200"):
|
||||
image_url = cls.get_img_url(md5_image, size)
|
||||
image = req_get(image_url).content
|
||||
|
||||
if len(image) == 13:
|
||||
logger.debug(f"Received 13-byte image for md5_image: {md5_image}. Attempting fallback image.")
|
||||
image_url = cls.get_img_url("", size)
|
||||
image = req_get(image_url).content
|
||||
if len(image) == 13:
|
||||
logger.warning(f"Fallback image for md5_image {md5_image} (using empty md5) also resulted in a 13-byte response.")
|
||||
|
||||
return image
|
||||
|
||||
@classmethod
|
||||
def tracking(cls, ids, album = False, market = None) -> dict:
|
||||
song_metadata = {}
|
||||
json_track = cls.get_track(ids)
|
||||
|
||||
# Market availability check
|
||||
if market:
|
||||
available_countries = json_track.get("available_countries")
|
||||
track_available_in_specified_markets = False
|
||||
markets_checked_str = ""
|
||||
|
||||
if isinstance(market, list):
|
||||
markets_checked_str = ", ".join([m.upper() for m in market])
|
||||
if available_countries:
|
||||
for m_code in market:
|
||||
if m_code.upper() in available_countries:
|
||||
track_available_in_specified_markets = True
|
||||
break # Found in one market, no need to check further
|
||||
else: # available_countries is None or empty
|
||||
track_available_in_specified_markets = False # Cannot be available if API lists no countries
|
||||
elif isinstance(market, str):
|
||||
markets_checked_str = market.upper()
|
||||
if available_countries and market.upper() in available_countries:
|
||||
track_available_in_specified_markets = True
|
||||
else: # available_countries is None or empty, or market not in list
|
||||
track_available_in_specified_markets = False
|
||||
else:
|
||||
logger.warning(f"Market parameter has an unexpected type: {type(market)}. Skipping market check.")
|
||||
track_available_in_specified_markets = True # Default to available if market param is malformed
|
||||
|
||||
if not track_available_in_specified_markets:
|
||||
track_title = json_track.get('title', 'Unknown Title')
|
||||
artist_name = json_track.get('artist', {}).get('name', 'Unknown Artist')
|
||||
error_msg = f"Track '{track_title}' by '{artist_name}' (ID: {ids}) is not available in market(s): '{markets_checked_str}'."
|
||||
logger.warning(error_msg)
|
||||
raise MarketAvailabilityError(message=error_msg)
|
||||
|
||||
song_metadata['isrc'] = json_track.get('isrc', '')
|
||||
|
||||
if not album:
|
||||
album_ids = json_track['album']['id']
|
||||
album_json = cls.get_album(album_ids)
|
||||
genres = []
|
||||
|
||||
if "genres" in album_json:
|
||||
for genre in album_json['genres']['data']:
|
||||
genres.append(genre['name'])
|
||||
|
||||
song_metadata['genre'] = "; ".join(genres)
|
||||
ar_album = []
|
||||
|
||||
for contributor in album_json['contributors']:
|
||||
if contributor['role'] == "Main":
|
||||
ar_album.append(contributor['name'])
|
||||
|
||||
song_metadata['ar_album'] = "; ".join(ar_album)
|
||||
song_metadata['album'] = album_json['title']
|
||||
song_metadata['label'] = album_json['label']
|
||||
song_metadata['upc'] = album_json.get('upc', '')
|
||||
song_metadata['nb_tracks'] = album_json['nb_tracks']
|
||||
|
||||
song_metadata['music'] = json_track['title']
|
||||
array = []
|
||||
|
||||
for contributor in json_track['contributors']:
|
||||
if contributor['name'] != "":
|
||||
array.append(contributor['name'])
|
||||
|
||||
array.append(
|
||||
json_track['artist']['name']
|
||||
)
|
||||
|
||||
song_metadata['artist'] = artist_sort(array)
|
||||
song_metadata['tracknum'] = json_track['track_position']
|
||||
song_metadata['discnum'] = json_track['disk_number']
|
||||
song_metadata['year'] = convert_to_date(json_track['release_date'])
|
||||
song_metadata['bpm'] = json_track['bpm']
|
||||
song_metadata['duration'] = json_track['duration']
|
||||
song_metadata['gain'] = json_track['gain']
|
||||
|
||||
return song_metadata
|
||||
|
||||
@classmethod
|
||||
def tracking_album(cls, album_json, market = None):
|
||||
song_metadata: dict[
|
||||
str,
|
||||
Union[list, str, int, datetime]
|
||||
] = {
|
||||
"music": [],
|
||||
"artist": [],
|
||||
"tracknum": [],
|
||||
"discnum": [],
|
||||
"bpm": [],
|
||||
"duration": [],
|
||||
"isrc": [],
|
||||
"gain": [],
|
||||
"album": album_json['title'],
|
||||
"label": album_json['label'],
|
||||
"year": convert_to_date(album_json['release_date']),
|
||||
"upc": album_json.get('upc', ''),
|
||||
"nb_tracks": album_json['nb_tracks']
|
||||
}
|
||||
|
||||
genres = []
|
||||
|
||||
if "genres" in album_json:
|
||||
for a in album_json['genres']['data']:
|
||||
genres.append(a['name'])
|
||||
|
||||
song_metadata['genre'] = "; ".join(genres)
|
||||
ar_album = []
|
||||
|
||||
for a in album_json['contributors']:
|
||||
if a['role'] == "Main":
|
||||
ar_album.append(a['name'])
|
||||
|
||||
song_metadata['ar_album'] = "; ".join(ar_album)
|
||||
sm_items = song_metadata.items()
|
||||
|
||||
# Ensure we have all tracks (including paginated ones)
|
||||
album_tracks = album_json['tracks']['data']
|
||||
logger.debug(f"Processing metadata for {len(album_tracks)} tracks in album '{album_json['title']}'")
|
||||
|
||||
for track_idx, track_info_from_album_json in enumerate(album_tracks, 1):
|
||||
c_ids = track_info_from_album_json['id']
|
||||
track_title = track_info_from_album_json.get('title', 'Unknown Track')
|
||||
track_artist = track_info_from_album_json.get('artist', {}).get('name', 'Unknown Artist')
|
||||
|
||||
logger.debug(f"Processing track {track_idx}/{len(album_tracks)}: '{track_title}' by '{track_artist}' (ID: {c_ids})")
|
||||
|
||||
current_track_metadata_or_error = None
|
||||
track_failed = False
|
||||
|
||||
try:
|
||||
# Get detailed metadata for the current track
|
||||
current_track_metadata_or_error = cls.tracking(c_ids, album=True, market=market)
|
||||
except MarketAvailabilityError as e:
|
||||
market_str = market
|
||||
if isinstance(market, list):
|
||||
market_str = ", ".join([m.upper() for m in market])
|
||||
elif isinstance(market, str):
|
||||
market_str = market.upper()
|
||||
logger.warning(f"Track '{track_title}' (ID: {c_ids}) in album '{album_json.get('title','Unknown Album')}' not available in market(s) '{market_str}': {e.message}")
|
||||
current_track_metadata_or_error = {
|
||||
'error_type': 'MarketAvailabilityError',
|
||||
'message': e.message,
|
||||
'name': track_title,
|
||||
'artist': track_artist,
|
||||
'ids': c_ids,
|
||||
'checked_markets': market_str # Store the markets that were checked
|
||||
}
|
||||
track_failed = True
|
||||
except NoDataApi as e_nd:
|
||||
logger.warning(f"Track '{track_title}' (ID: {c_ids}) in album '{album_json.get('title','Unknown Album')}' data not found: {str(e_nd)}")
|
||||
|
||||
# Even when the track API fails, use data from the album response
|
||||
if not track_failed:
|
||||
current_track_metadata_or_error = {
|
||||
'music': track_title,
|
||||
'artist': track_artist,
|
||||
'tracknum': str(track_info_from_album_json.get('track_position', track_idx)),
|
||||
'discnum': str(track_info_from_album_json.get('disk_number', 1)),
|
||||
'duration': track_info_from_album_json.get('duration', 0)
|
||||
}
|
||||
else:
|
||||
current_track_metadata_or_error = {
|
||||
'error_type': 'NoDataApi',
|
||||
'message': str(e_nd),
|
||||
'name': track_title,
|
||||
'artist': track_artist,
|
||||
'ids': c_ids
|
||||
}
|
||||
track_failed = True
|
||||
except requests.exceptions.ConnectionError as e_conn:
|
||||
logger.warning(f"Connection error fetching metadata for track '{track_title}' (ID: {c_ids}) in album '{album_json.get('title','Unknown Album')}': {str(e_conn)}")
|
||||
|
||||
# Use album track data as fallback
|
||||
if not track_failed:
|
||||
current_track_metadata_or_error = {
|
||||
'music': track_title,
|
||||
'artist': track_artist,
|
||||
'tracknum': str(track_info_from_album_json.get('track_position', track_idx)),
|
||||
'discnum': str(track_info_from_album_json.get('disk_number', 1)),
|
||||
'duration': track_info_from_album_json.get('duration', 0)
|
||||
}
|
||||
else:
|
||||
current_track_metadata_or_error = {
|
||||
'error_type': 'ConnectionError',
|
||||
'message': f"Connection error: {str(e_conn)}",
|
||||
'name': track_title,
|
||||
'artist': track_artist,
|
||||
'ids': c_ids
|
||||
}
|
||||
track_failed = True
|
||||
except Exception as e_other_track_meta:
|
||||
logger.warning(f"Unexpected error fetching metadata for track '{track_title}' (ID: {c_ids}) in album '{album_json.get('title','Unknown Album')}': {str(e_other_track_meta)}")
|
||||
|
||||
# Use album track data as fallback
|
||||
if not track_failed:
|
||||
current_track_metadata_or_error = {
|
||||
'music': track_title,
|
||||
'artist': track_artist,
|
||||
'tracknum': str(track_info_from_album_json.get('track_position', track_idx)),
|
||||
'discnum': str(track_info_from_album_json.get('disk_number', 1)),
|
||||
'duration': track_info_from_album_json.get('duration', 0)
|
||||
}
|
||||
else:
|
||||
current_track_metadata_or_error = {
|
||||
'error_type': 'TrackMetadataError',
|
||||
'message': str(e_other_track_meta),
|
||||
'name': track_title,
|
||||
'artist': track_artist,
|
||||
'ids': c_ids
|
||||
}
|
||||
track_failed = True
|
||||
|
||||
for key, list_template in sm_items:
|
||||
if isinstance(list_template, list):
|
||||
if track_failed:
|
||||
if key == 'music':
|
||||
song_metadata[key].append(current_track_metadata_or_error)
|
||||
elif key == 'artist' and isinstance(current_track_metadata_or_error, dict):
|
||||
song_metadata[key].append(current_track_metadata_or_error.get('artist'))
|
||||
elif key == 'ids' and isinstance(current_track_metadata_or_error, dict):
|
||||
pass
|
||||
else:
|
||||
song_metadata[key].append(None)
|
||||
else:
|
||||
if key in current_track_metadata_or_error:
|
||||
song_metadata[key].append(current_track_metadata_or_error.get(key))
|
||||
elif key == 'music':
|
||||
# Fallback to track title from album data
|
||||
song_metadata[key].append(track_title)
|
||||
elif key == 'artist':
|
||||
# Fallback to artist from album data
|
||||
song_metadata[key].append(track_artist)
|
||||
elif key == 'tracknum':
|
||||
# Fallback to position in album
|
||||
song_metadata[key].append(str(track_info_from_album_json.get('track_position', track_idx)))
|
||||
elif key == 'discnum':
|
||||
# Fallback to disk number from album data
|
||||
song_metadata[key].append(str(track_info_from_album_json.get('disk_number', 1)))
|
||||
else:
|
||||
song_metadata[key].append(None)
|
||||
|
||||
return song_metadata
|
||||
|
||||
@@ -150,6 +150,22 @@ class ProgressReporter:
|
||||
# }
|
||||
#
|
||||
|
||||
def _remove_nulls(data):
|
||||
"""
|
||||
Recursively remove null values from dictionaries and lists.
|
||||
|
||||
Args:
|
||||
data: Any Python data structure (dict, list, etc.)
|
||||
|
||||
Returns:
|
||||
The same structure with null values removed
|
||||
"""
|
||||
if isinstance(data, dict):
|
||||
return {k: _remove_nulls(v) for k, v in data.items() if v is not None}
|
||||
elif isinstance(data, list):
|
||||
return [_remove_nulls(item) for item in data if item is not None]
|
||||
return data
|
||||
|
||||
def report_progress(
|
||||
reporter: Optional["ProgressReporter"],
|
||||
callback_obj: Union[trackCallbackObject, albumCallbackObject, playlistCallbackObject]
|
||||
@@ -165,8 +181,9 @@ def report_progress(
|
||||
if not isinstance(callback_obj, (trackCallbackObject, albumCallbackObject, playlistCallbackObject)):
|
||||
raise TypeError(f"callback_obj must be of type trackCallbackObject, albumCallbackObject, or playlistCallbackObject, got {type(callback_obj)}")
|
||||
|
||||
# Convert the callback object to a dictionary and report it
|
||||
report_dict = asdict(callback_obj)
|
||||
# Convert the callback object to a dictionary and filter out null values
|
||||
report_dict = _remove_nulls(asdict(callback_obj))
|
||||
|
||||
if reporter:
|
||||
reporter.report(report_dict)
|
||||
else:
|
||||
|
||||
@@ -164,6 +164,13 @@ def apply_custom_format(format_str, metadata: dict, pad_tracks=True) -> str:
|
||||
# Original non-indexed placeholder logic (for %album%, %title%, %artist%, %ar_album%, etc.)
|
||||
value = metadata.get(full_key, '')
|
||||
|
||||
# Handle None values safely
|
||||
if value is None:
|
||||
if full_key in ['tracknum', 'discnum']:
|
||||
value = '1' if full_key == 'discnum' else '0'
|
||||
else:
|
||||
value = ''
|
||||
|
||||
if full_key == 'year' and value:
|
||||
if isinstance(value, datetime):
|
||||
return str(value.year)
|
||||
|
||||
@@ -33,6 +33,7 @@ class trackAlbumObject:
|
||||
genres: List[str] = field(default_factory=list)
|
||||
ids: IDs = field(default_factory=IDs)
|
||||
artists: List[artistTrackAlbumObject] = field(default_factory=list)
|
||||
explicit: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -46,6 +46,7 @@ class trackPlaylistObject:
|
||||
ids: IDs = field(default_factory=IDs)
|
||||
disc_number: int = 1
|
||||
track_number: int = 1
|
||||
explicit: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
Reference in New Issue
Block a user