341 lines
13 KiB
Python
341 lines
13 KiB
Python
#!/usr/bin/python3
|
|
|
|
import re
|
|
from os import makedirs
|
|
from datetime import datetime
|
|
from urllib.parse import urlparse
|
|
from requests import get as req_get
|
|
from zipfile import ZipFile, ZIP_DEFLATED
|
|
from deezspot.models.track import Track
|
|
from deezspot.exceptions import InvalidLink
|
|
from deezspot.libutils.others_settings import supported_link, header
|
|
from deezspot.libutils.logging_utils import ProgressReporter, logger
|
|
|
|
from os.path import (
|
|
isdir, basename,
|
|
join, isfile, dirname
|
|
)
|
|
|
|
def link_is_valid(link):
    """Check that ``link`` points at one of the supported hosts.

    The network location of ``link`` is compared for equality with every
    entry of ``supported_link``.

    Raises:
        InvalidLink: if no entry equals the link's network location.
    """
    host = urlparse(link).netloc

    for candidate in supported_link:
        if candidate == host:
            return

    raise InvalidLink(link)
|
|
|
|
def get_ids(link):
    """Return the last ``/``-separated component of the URL path of ``link``.

    Only the path is inspected, so a query string or fragment is never part
    of the result. A path that ends in ``/`` yields an empty string.
    """
    _, _, last_segment = urlparse(link).path.rpartition("/")
    return last_segment
|
|
|
|
def request(url):
    """Send a GET request for ``url`` and return the response object.

    The imported ``header`` value is passed as the request headers.
    """
    return req_get(url, headers=header)
|
|
|
|
def __check_dir(directory):
    """Make sure ``directory`` exists, creating missing parents as needed.

    An empty ``directory`` (``""`` or ``None``) is left alone: it stands for
    "no sub-directory", and ``makedirs("")`` would raise
    ``FileNotFoundError``.

    Raises:
        OSError: if the directory cannot be created, e.g. the path exists
            but is not a directory.
    """
    if not directory:
        return

    # exist_ok=True avoids the check-then-create race of a separate isdir()
    # test: another thread or process may create the directory in between.
    makedirs(directory, exist_ok=True)
|
|
|
|
def sanitize_name(string, max_length=200):
    """Turn an arbitrary value into a name usable as a file or directory name.

    Args:
        string: Value to clean up. ``None`` yields ``"Unknown"``; any other
            value is converted with ``str()`` first.
        max_length: Maximum number of characters kept in the result.

    Returns:
        The cleaned name, or ``"Unknown"`` when nothing is left after cleaning.
    """
    if string is None:
        return "Unknown"

    # Every key is a single character and no replacement text contains a key,
    # so one translate() pass gives the same result as replacing the
    # characters one after another.
    substitutions = str.maketrans({
        "\\": "-",   # Backslash to hyphen
        "/": "-",    # Forward slash to hyphen
        ":": "-",    # Colon to hyphen
        "*": "+",    # Asterisk to plus
        "?": "",     # Question mark removed
        "\"": "'",   # Double quote to single quote
        "<": "[",    # Less than to open bracket
        ">": "]",    # Greater than to close bracket
        "|": "-",    # Pipe to hyphen
        "&": "and",  # Ampersand to 'and'
        "$": "s",    # Dollar to 's'
        ";": ",",    # Semicolon to comma
        "\t": " ",   # Tab to space
        "\n": " ",   # Newline to space
        "\r": " ",   # Carriage return to space
        "\0": "",    # Null byte removed
    })
    cleaned = str(string).translate(substitutions)

    # Drop whatever non-printable characters are still present.
    cleaned = "".join(filter(str.isprintable, cleaned))

    # Trim the ends, then squeeze inner whitespace runs to one space.
    cleaned = re.sub(r'\s+', ' ', cleaned.strip())

    if len(cleaned) > max_length:
        cleaned = cleaned[:max_length]

    # A trailing dot or space causes trouble on some filesystems.
    cleaned = cleaned.rstrip('. ')

    return cleaned or "Unknown"
|
|
|
|
# Keep the original function name for backward compatibility
|
|
def var_excape(string):
    """Backward-compatible alias: forwards ``string`` to ``sanitize_name``."""
    sanitized = sanitize_name(string)
    return sanitized
|
|
|
|
def convert_to_date(date: str):
    """Parse a release-date string into a ``datetime``.

    Accepted inputs:

    * ``"0000-00-00"`` -- placeholder for an unknown date, mapped to
      ``0001-01-01`` (the smallest date ``datetime`` can represent).
    * ``"YYYY"`` -- year precision, completed to January 1st.
    * ``"YYYY-MM"`` -- month precision, completed to the 1st of the month.
    * ``"YYYY-MM-DD"`` -- parsed as is.

    Args:
        date: The date string to parse.

    Returns:
        The parsed ``datetime`` (time part is midnight).

    Raises:
        ValueError: if the completed string does not match ``%Y-%m-%d``.
    """
    if date == "0000-00-00":
        date = "0001-01-01"
    elif date.isdigit():
        date = f"{date}-01-01"
    elif date.count("-") == 1:
        # Month-precision dates would otherwise fail in strptime below.
        date = f"{date}-01"

    return datetime.strptime(date, "%Y-%m-%d")
|
|
|
|
def what_kind(link):
    """Fetch ``link`` and return the URL reported by the response.

    A single trailing ``/`` is dropped from the returned URL.
    """
    resolved = request(link).url
    return resolved[:-1] if resolved.endswith("/") else resolved
|
|
|
|
def __get_tronc(string):
    """Return ``string`` without its last element (empty input stays empty)."""
    return string[:-1]
|
|
|
|
def apply_custom_format(format_str, metadata: dict, pad_tracks=True) -> str:
    """Expand ``%key%`` placeholders in ``format_str`` using ``metadata``.

    Placeholder forms:

    * ``%artist_N%`` / ``%ar_album_N%`` -- the N-th (1-based) entry of the
      ``artist`` / ``ar_album`` value. A string value is split on ``;``, a
      list is used element by element; blank entries are ignored. An index
      outside the list falls back to the first entry; with no entries the
      placeholder expands to an empty string.
    * ``%year%`` -- the ``year`` attribute of a ``datetime`` value, otherwise
      the text before the first ``-`` of the value.
    * ``%tracknum%`` / ``%discnum%`` -- single-digit values are zero-padded
      to two digits when ``pad_tracks`` is true.
    * any other ``%key%`` -- ``str(metadata[key])``.

    Missing keys and ``None`` values expand to an empty string.

    Args:
        format_str: Template containing ``%key%`` placeholders.
        metadata: Mapping that supplies the placeholder values.
        pad_tracks: Whether to zero-pad single-digit track/disc numbers.

    Returns:
        ``format_str`` with every placeholder replaced.
    """
    def replacer(match):
        full_key = match.group(1)  # e.g., "artist", "ar_album_1"

        # Indexed placeholders: %artist_1%, %ar_album_2%, ...
        indexed = re.fullmatch(r'(artist|ar_album)_(\d+)', full_key)
        if indexed:
            raw_value = metadata.get(indexed.group(1))
            # \d+ guarantees that int() succeeds here.
            index = int(indexed.group(2))

            if isinstance(raw_value, str):
                items = [item.strip() for item in raw_value.split(';') if item.strip()]
            elif isinstance(raw_value, list):
                items = [str(item).strip() for item in raw_value if str(item).strip()]
            else:
                items = []

            if not items:
                return ""
            if 1 <= index <= len(items):
                return items[index - 1]
            # Index out of range: fall back to the first entry.
            return items[0]

        # Plain placeholders (%album%, %title%, %artist%, %ar_album%, ...)
        value = metadata.get(full_key)
        if value is None:
            # str(None) would put the literal text "None" into the result.
            return ""

        if full_key == 'year' and value:
            if isinstance(value, datetime):
                return str(value.year)
            # String dates such as "YYYY-MM-DD" or just "YYYY"
            return str(value).split('-')[0]

        str_value = str(value)
        if pad_tracks and full_key in ('tracknum', 'discnum'):
            if str_value.isdigit() and len(str_value) == 1:
                return str_value.zfill(2)
        return str_value

    return re.sub(r'%([^%]+)%', replacer, format_str)
|
|
|
|
def __get_dir(song_metadata, output_dir, custom_dir_format=None, pad_tracks=True):
    """Return the directory a song should be stored in, creating it if needed.

    Args:
        song_metadata: Mapping used to fill the placeholders of
            ``custom_dir_format``.
        output_dir: Base output directory.
        custom_dir_format: ``/``-separated template of ``%key%``
            placeholders. When empty or ``None``, ``output_dir`` itself is
            used.
        pad_tracks: Forwarded to ``apply_custom_format`` in case
            ``tracknum`` / ``discnum`` appear in the template.

    Returns:
        ``output_dir`` or ``output_dir`` joined with the expanded template.
    """
    if not custom_dir_format:
        # No sub-path is built, so make sure output_dir itself exists.
        __check_dir(output_dir)
        return output_dir

    # Split the *template* into components before expanding placeholders.
    # Expanding first and splitting afterwards would turn a "/" that comes
    # from a metadata value (e.g. "AC/DC") into an extra directory level;
    # this way it reaches sanitize_name and is replaced like any other
    # unsafe character.
    sanitized_path_segment = "/".join(
        sanitize_name(apply_custom_format(part, song_metadata, pad_tracks))
        for part in custom_dir_format.split("/")
    )

    path = join(output_dir, sanitized_path_segment)
    __check_dir(path)
    return path
|
|
|
|
def set_path(
    song_metadata, output_dir,
    song_quality, file_format,
    is_episode=False,
    custom_dir_format=None,
    custom_track_format=None,
    pad_tracks=True
):
    """Build the full path (directory plus file name) for a song or episode.

    Args:
        song_metadata: Mapping used to fill the format placeholders.
        output_dir: Base output directory.
        song_quality: When truthy, exposed to the track format as
            ``%quality%`` in the form ``[<quality>]``.
        file_format: File extension, with or without the leading dot.
            When falsy, no extension is appended.
        is_episode: Selects the default track format (``%music%`` for
            episodes, ``%artist% - %music%`` otherwise).
        custom_dir_format: Directory template passed to ``__get_dir``.
        custom_track_format: File-name template; ``None`` selects the default.
        pad_tracks: Zero-pad single-digit track/disc numbers.

    Returns:
        The directory from ``__get_dir`` joined with the sanitized file name.
    """
    target_dir = __get_dir(
        song_metadata, output_dir,
        custom_dir_format=custom_dir_format,
        pad_tracks=pad_tracks
    )

    if custom_track_format is None:
        custom_track_format = "%music%" if is_episode else "%artist% - %music%"

    # Work on a copy so the caller's mapping is not modified. Without a
    # quality the 'quality' key stays absent and %quality% expands to "".
    fields = dict(song_metadata)
    if song_quality:
        fields['quality'] = f"[{song_quality}]"

    stem = sanitize_name(
        apply_custom_format(custom_track_format, fields, pad_tracks)
    )

    if not file_format:
        # No extension given: use the bare name.
        return join(target_dir, stem)

    suffix = file_format if file_format.startswith('.') else f".{file_format}"
    return join(target_dir, f"{stem}{suffix}")
|
|
|
|
def create_zip(
    tracks: list[Track],
    output_dir=None,
    song_metadata=None,     # Album/Playlist level metadata
    song_quality=None,      # Overall quality for the zip, if applicable
    zip_name=None,          # Specific name for the zip file
    custom_dir_format=None  # Directory template for the paths inside the zip
):
    """Pack the successfully downloaded tracks into a zip archive.

    Archive location:

    * ``zip_name`` with a directory part is used as given.
    * A bare ``zip_name`` is placed in ``output_dir`` (``"."`` when unset).
    * Without ``zip_name`` but with ``song_metadata`` and ``output_dir``, the
      name is built from the ``album`` (or ``name``) entry plus the quality.
    * Otherwise ``archive.zip`` in ``output_dir`` (or ``"."``).

    Only tracks whose ``success`` is truthy and whose ``song_path`` is an
    existing file are added.

    Returns:
        The path of the zip file that was written.
    """
    if zip_name:
        if basename(zip_name) != zip_name:
            # zip_name carries a directory part: use it unchanged.
            actual_zip_path = zip_name
        else:
            # Bare file name: place it in output_dir (default ".").
            output_dir = output_dir or "."
            __check_dir(output_dir)
            actual_zip_path = join(output_dir, zip_name)
    elif song_metadata and output_dir:
        title = song_metadata.get('album', song_metadata.get('name', 'archive'))
        name_part = sanitize_name(title)
        quality_part = f" [{song_quality}]" if song_quality else ""
        actual_zip_path = join(output_dir, f"{name_part}{quality_part}.zip")
    else:
        # Not enough information for a descriptive name.
        actual_zip_path = join(output_dir or ".", "archive.zip")

    # The archive's parent directory must exist before it is opened.
    __check_dir(dirname(actual_zip_path))

    with ZipFile(actual_zip_path, 'w', ZIP_DEFLATED) as archive:
        for track in tracks:
            if not (track.success and isfile(track.song_path)):
                continue

            # The in-archive path follows the same naming rules as files on
            # disk; the empty base directory keeps it relative.
            # NOTE(review): set_path() goes through __get_dir(), which also
            # runs its directory-creation step for this relative path --
            # confirm that touching the filesystem here is intended.
            path_in_zip = set_path(
                track.tags,
                "",
                track.quality,
                track.file_format,
                custom_dir_format=custom_dir_format,
                custom_track_format=track.tags.get('custom_track_format'),
                pad_tracks=track.tags.get('pad_tracks', True)
            )
            # Strip leading separators so the entry stays relative.
            path_in_zip = path_in_zip.lstrip('/').lstrip('\\')

            archive.write(track.song_path, arcname=path_in_zip)

    return actual_zip_path
|
|
|
|
def trasform_sync_lyric(lyric):
    """Convert lyric entries into ``(line, milliseconds)`` tuples.

    Entries that have no ``"milliseconds"`` key are skipped; the
    millisecond value is converted with ``int()``.
    """
    return [
        (entry['line'], int(entry['milliseconds']))
        for entry in lyric
        if "milliseconds" in entry
    ]
|
|
|
|
def save_cover_image(image_data: bytes, directory_path: str, cover_filename: str = "cover.jpg"):
    """Write ``image_data`` to ``cover_filename`` inside ``directory_path``.

    Nothing is written when ``image_data`` is empty. A missing directory is
    created first. Failures to create the directory or to write the file are
    logged and not raised.

    Args:
        image_data: Raw bytes of the cover image.
        directory_path: Directory that receives the cover file.
        cover_filename: Name of the file to write.
    """
    if not image_data:
        logger.warning(f"No image data provided to save cover in {directory_path}.")
        return

    if not isdir(directory_path):
        # Fallback only: the directory is normally expected to exist already.
        logger.warning(f"Directory {directory_path} does not exist. Attempting to create it for cover image.")
        try:
            makedirs(directory_path, exist_ok=True)
            logger.info(f"Created directory {directory_path} for cover image.")
        except OSError as exc:
            logger.error(f"Failed to create directory {directory_path} for cover: {exc}")
            return

    destination = join(directory_path, cover_filename)
    try:
        with open(destination, "wb") as handle:
            handle.write(image_data)
        logger.info(f"Successfully saved cover image to {destination}")
    except OSError as exc:
        logger.error(f"Failed to save cover image to {destination}: {exc}")
|