fixed audio convertion fr this time

This commit is contained in:
Xoconoch
2025-06-03 22:25:12 -06:00
parent d38c64c643
commit 93d626bbc4
10 changed files with 1263 additions and 982 deletions

View File

@@ -15,6 +15,7 @@ AUDIO_FORMATS = {
"extension": ".mp3",
"mime": "audio/mpeg",
"ffmpeg_codec": "libmp3lame",
"ffmpeg_format_flag": "mp3",
"default_bitrate": "320k",
"bitrates": ["32k", "64k", "96k", "128k", "192k", "256k", "320k"],
},
@@ -22,6 +23,7 @@ AUDIO_FORMATS = {
"extension": ".m4a",
"mime": "audio/mp4",
"ffmpeg_codec": "aac",
"ffmpeg_format_flag": "ipod",
"default_bitrate": "256k",
"bitrates": ["32k", "64k", "96k", "128k", "192k", "256k"],
},
@@ -29,6 +31,7 @@ AUDIO_FORMATS = {
"extension": ".ogg",
"mime": "audio/ogg",
"ffmpeg_codec": "libvorbis",
"ffmpeg_format_flag": "ogg",
"default_bitrate": "256k",
"bitrates": ["64k", "96k", "128k", "192k", "256k", "320k"],
},
@@ -36,6 +39,7 @@ AUDIO_FORMATS = {
"extension": ".opus",
"mime": "audio/opus",
"ffmpeg_codec": "libopus",
"ffmpeg_format_flag": "opus",
"default_bitrate": "128k",
"bitrates": ["32k", "64k", "96k", "128k", "192k", "256k"],
},
@@ -43,6 +47,7 @@ AUDIO_FORMATS = {
"extension": ".flac",
"mime": "audio/flac",
"ffmpeg_codec": "flac",
"ffmpeg_format_flag": "flac",
"default_bitrate": None, # Lossless, no bitrate needed
"bitrates": [],
},
@@ -50,6 +55,7 @@ AUDIO_FORMATS = {
"extension": ".wav",
"mime": "audio/wav",
"ffmpeg_codec": "pcm_s16le",
"ffmpeg_format_flag": "wav",
"default_bitrate": None, # Lossless, no bitrate needed
"bitrates": [],
},
@@ -57,6 +63,7 @@ AUDIO_FORMATS = {
"extension": ".m4a",
"mime": "audio/mp4",
"ffmpeg_codec": "alac",
"ffmpeg_format_flag": "ipod",
"default_bitrate": None, # Lossless, no bitrate needed
"bitrates": [],
}
@@ -69,50 +76,6 @@ def check_ffmpeg_available():
return False
return True
def parse_format_string(format_string):
"""
Parse a format string like "MP3_320" into (format, bitrate).
Returns (format_name, bitrate) or (None, None) if invalid.
"""
if not format_string or format_string.lower() == "false":
return None, None
# Check for format with bitrate specification
format_match = re.match(r"^([A-Za-z]+)(?:_(\d+[kK]))?$", format_string)
if format_match:
format_name = format_match.group(1).upper()
bitrate = format_match.group(2)
# Validate format name
if format_name not in AUDIO_FORMATS:
logger.warning(f"Unknown audio format: {format_name}. Using original format.")
return None, None
# If format is lossless but bitrate was specified, log a warning
if bitrate and AUDIO_FORMATS[format_name]["default_bitrate"] is None:
logger.warning(f"Bitrate specified for lossless format {format_name}. Ignoring bitrate.")
bitrate = None
# If bitrate wasn't specified, use default
if not bitrate and AUDIO_FORMATS[format_name]["default_bitrate"]:
bitrate = AUDIO_FORMATS[format_name]["default_bitrate"]
# Validate bitrate if specified
if bitrate and AUDIO_FORMATS[format_name]["bitrates"] and bitrate.lower() not in [b.lower() for b in AUDIO_FORMATS[format_name]["bitrates"]]:
logger.warning(f"Invalid bitrate {bitrate} for {format_name}. Using default {AUDIO_FORMATS[format_name]['default_bitrate']}.")
bitrate = AUDIO_FORMATS[format_name]["default_bitrate"]
return format_name, bitrate
# Simple format name without bitrate
if format_string.upper() in AUDIO_FORMATS:
format_name = format_string.upper()
bitrate = AUDIO_FORMATS[format_name]["default_bitrate"]
return format_name, bitrate
logger.warning(f"Invalid format specification: {format_string}. Using original format.")
return None, None
def get_output_path(input_path, format_name):
"""Get the output path with the new extension based on the format."""
if not format_name or format_name not in AUDIO_FORMATS:
@@ -155,7 +118,7 @@ def convert_audio(input_path, format_name=None, bitrate=None, register_func=None
Args:
input_path: Path to the input audio file
format_name: Target format name (e.g., 'MP3', 'OGG', 'FLAC')
bitrate: Target bitrate (e.g., '320k', '128k')
bitrate: Target bitrate (e.g., '320k', '128k'). If None, uses default for lossy formats.
register_func: Function to register a file as being actively downloaded
unregister_func: Function to unregister a file from the active downloads list
@@ -176,21 +139,45 @@ def convert_audio(input_path, format_name=None, bitrate=None, register_func=None
return input_path
# Validate format and get format details
if format_name not in AUDIO_FORMATS:
format_name_upper = format_name.upper()
if format_name_upper not in AUDIO_FORMATS:
logger.warning(f"Unknown format: {format_name}. Using original format.")
return input_path
format_details = AUDIO_FORMATS[format_name]
format_details = AUDIO_FORMATS[format_name_upper]
# Skip conversion if the file is already in the target format
# Determine effective bitrate
effective_bitrate = bitrate
if format_details["default_bitrate"] is not None: # Lossy format
if effective_bitrate:
# Validate provided bitrate
if effective_bitrate.lower() not in [b.lower() for b in format_details["bitrates"]]:
logger.warning(f"Invalid bitrate {effective_bitrate} for {format_name_upper}. Using default {format_details['default_bitrate']}.")
effective_bitrate = format_details["default_bitrate"]
else: # No bitrate provided for lossy format, use default
effective_bitrate = format_details["default_bitrate"]
elif effective_bitrate: # Lossless format but bitrate was specified
logger.warning(f"Bitrate specified for lossless format {format_name_upper}. Ignoring bitrate.")
effective_bitrate = None
# Skip conversion if the file is already in the target format and bitrate matches (or not applicable)
if input_path.lower().endswith(format_details["extension"].lower()):
# Only do conversion if a specific bitrate is requested
if not bitrate or format_details["default_bitrate"] is None:
logger.info(f"File {input_path} is already in {format_name} format. Skipping conversion.")
return input_path
# For lossless, or if effective_bitrate matches (or no specific bitrate needed for format)
if format_details["default_bitrate"] is None: # Lossless
logger.info(f"File {input_path} is already in {format_name_upper} (lossless) format. Skipping conversion.")
return input_path
# For lossy, if no specific bitrate was relevant (already handled by effective_bitrate logic)
# This condition might be redundant if we always convert to ensure bitrate.
# Let's assume for now, if it's already the right extension, we don't re-encode unless bitrate implies so.
# However, the original logic converted if bitrate was specified even for same extension.
# To maintain similar behavior: if a bitrate is effectively set for a lossy format, we proceed.
# If effective_bitrate is None (e.g. for FLAC, WAV), and extension matches, skip.
if not effective_bitrate and format_details["default_bitrate"] is not None:
logger.info(f"File {input_path} is already in {format_name_upper} format with a suitable bitrate. Skipping conversion.")
return input_path
# Get the output path
output_path = get_output_path(input_path, format_name)
output_path = get_output_path(input_path, format_name_upper)
# Use a temporary file for the conversion to avoid conflicts
temp_output = output_path + ".tmp"
@@ -201,24 +188,28 @@ def convert_audio(input_path, format_name=None, bitrate=None, register_func=None
try:
cmd = ["ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-i", input_path]
# Add bitrate parameter for lossy formats
if bitrate and format_details["bitrates"]:
cmd.extend(["-b:a", bitrate])
# Add bitrate parameter for lossy formats if an effective_bitrate is set
if effective_bitrate and format_details["bitrates"]: # format_details["bitrates"] implies lossy
cmd.extend(["-b:a", effective_bitrate])
# Add codec parameter
cmd.extend(["-c:a", format_details["ffmpeg_codec"]])
# Add format flag
if "ffmpeg_format_flag" in format_details:
cmd.extend(["-f", format_details["ffmpeg_format_flag"]])
# For some formats, add additional parameters
if format_name == "MP3":
if format_name_upper == "MP3":
# Use high quality settings for MP3
if not bitrate or int(bitrate.replace('k', '')) >= 256:
if not effective_bitrate or int(effective_bitrate.replace('k', '')) >= 256:
cmd.extend(["-q:a", "0"])
# Add output file
cmd.append(temp_output)
# Run the conversion
logger.info(f"Converting {input_path} to {format_name}" + (f" at {bitrate}" if bitrate else ""))
logger.info(f"Converting {input_path} to {format_name_upper}" + (f" at {effective_bitrate}" if effective_bitrate else ""))
process = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if process.returncode != 0:
@@ -240,7 +231,7 @@ def convert_audio(input_path, format_name=None, bitrate=None, register_func=None
os.remove(input_path)
unregister_active_download(input_path)
logger.info(f"Successfully converted to {format_name}" + (f" at {bitrate}" if bitrate else ""))
logger.info(f"Successfully converted to {format_name_upper}" + (f" at {effective_bitrate}" if effective_bitrate else ""))
return output_path
except Exception as e:

View File

@@ -0,0 +1,64 @@
import os
import sys
import signal
import atexit
from deezspot.libutils.logging_utils import logger
# --- Global tracking of active downloads ---
ACTIVE_DOWNLOADS = set()
CLEANUP_LOCK = False
CURRENT_DOWNLOAD = None
def register_active_download(file_path):
"""Register a file as being actively downloaded"""
global CURRENT_DOWNLOAD
ACTIVE_DOWNLOADS.add(file_path)
CURRENT_DOWNLOAD = file_path
def unregister_active_download(file_path):
"""Remove a file from the active downloads list"""
global CURRENT_DOWNLOAD
if file_path in ACTIVE_DOWNLOADS:
ACTIVE_DOWNLOADS.remove(file_path)
if CURRENT_DOWNLOAD == file_path:
CURRENT_DOWNLOAD = None
def cleanup_active_downloads():
"""Clean up any incomplete downloads during process termination"""
global CLEANUP_LOCK, CURRENT_DOWNLOAD
if CLEANUP_LOCK:
return
CLEANUP_LOCK = True
# Only remove the file that was in progress when stopped
if CURRENT_DOWNLOAD:
try:
if os.path.exists(CURRENT_DOWNLOAD):
logger.info(f"Removing incomplete download: {CURRENT_DOWNLOAD}")
os.remove(CURRENT_DOWNLOAD)
# No need to call unregister_active_download here,
# as the process is terminating.
except Exception as e:
logger.error(f"Error cleaning up file {CURRENT_DOWNLOAD}: {str(e)}")
CLEANUP_LOCK = False
# Register the cleanup function to run on exit
atexit.register(cleanup_active_downloads)
# Set up signal handlers
def signal_handler(sig, frame):
logger.info(f"Received termination signal {sig}. Cleaning up...")
cleanup_active_downloads()
if sig == signal.SIGINT:
logger.info("CTRL+C received. Exiting...")
sys.exit(0)
# Register signal handlers for common termination signals
signal.signal(signal.SIGINT, signal_handler) # CTRL+C
signal.signal(signal.SIGTERM, signal_handler) # Normal termination
try:
# These may not be available on all platforms
signal.signal(signal.SIGHUP, signal_handler) # Terminal closed
signal.signal(signal.SIGQUIT, signal_handler) # CTRL+\
except AttributeError:
pass

View File

@@ -0,0 +1,159 @@
#!/usr/bin/python3
import os
from mutagen import File
from mutagen.easyid3 import EasyID3
from mutagen.oggvorbis import OggVorbis
from mutagen.flac import FLAC
# from mutagen.mp4 import MP4 # MP4 is usually handled by File for .m4a
# AUDIO_FORMATS and get_output_path will be imported from audio_converter
# We need to ensure this doesn't create circular dependencies.
# If audio_converter also imports something from libutils that might import this,
# it could be an issue. For now, proceeding with direct import.
from deezspot.libutils.audio_converter import AUDIO_FORMATS, get_output_path
# Logger instance will be passed as an argument to functions that need it.
def read_metadata_from_file(file_path, logger):
"""Reads title and album metadata from an audio file."""
try:
if not os.path.isfile(file_path):
logger.debug(f"File not found for metadata reading: {file_path}")
return None, None
audio = File(file_path, easy=False) # easy=False to access format-specific tags better
if audio is None:
logger.warning(f"Could not load audio file with mutagen: {file_path}")
return None, None
title = None
album = None
if isinstance(audio, EasyID3): # MP3
title = audio.get('title', [None])[0]
album = audio.get('album', [None])[0]
elif isinstance(audio, OggVorbis): # OGG
title = audio.get('TITLE', [None])[0] # Vorbis tags are case-insensitive but typically uppercase
album = audio.get('ALBUM', [None])[0]
elif isinstance(audio, FLAC): # FLAC
title = audio.get('TITLE', [None])[0]
album = audio.get('ALBUM', [None])[0]
elif file_path.lower().endswith('.m4a'): # M4A (AAC/ALAC)
# Mutagen's File(filepath) for .m4a returns an MP4 object
title = audio.get('\xa9nam', [None])[0] # iTunes title tag
album = audio.get('\xa9alb', [None])[0] # iTunes album tag
else:
logger.warning(f"Unsupported file type for metadata extraction by read_metadata_from_file: {file_path} (type: {type(audio)})")
return None, None
return title, album
except Exception as e:
logger.error(f"Error reading metadata from {file_path}: {str(e)}")
return None, None
def check_track_exists(original_song_path, title, album, convert_to, logger):
"""Checks if a track exists, considering original and target converted formats.
Args:
original_song_path (str): The expected path for the song in its original download format.
title (str): The title of the track to check.
album (str): The album of the track to check.
convert_to (str | None): The target format for conversion (e.g., 'MP3', 'FLAC'), or None.
logger (logging.Logger): Logger instance.
Returns:
tuple[bool, str | None]: (True, path_to_existing_file) if exists, else (False, None).
"""
scan_dir = os.path.dirname(original_song_path)
if not os.path.exists(scan_dir):
logger.debug(f"Scan directory {scan_dir} does not exist. Track cannot exist.")
return False, None
# Priority 1: Check if the file exists in the target converted format
if convert_to:
target_format_upper = convert_to.upper()
if target_format_upper in AUDIO_FORMATS:
final_expected_converted_path = get_output_path(original_song_path, target_format_upper)
final_target_ext = AUDIO_FORMATS[target_format_upper]["extension"].lower()
# Check exact predicted path for converted file
if os.path.exists(final_expected_converted_path):
existing_title, existing_album = read_metadata_from_file(final_expected_converted_path, logger)
if existing_title == title and existing_album == album:
logger.info(f"Found existing track (exact converted path match): {title} - {album} at {final_expected_converted_path}")
return True, final_expected_converted_path
# Scan directory for other files with the target extension
for file_in_dir in os.listdir(scan_dir):
if file_in_dir.lower().endswith(final_target_ext):
file_path_to_check = os.path.join(scan_dir, file_in_dir)
# Skip if it's the same as the one we just checked (and it matched or didn't exist)
if file_path_to_check == final_expected_converted_path and os.path.exists(final_expected_converted_path):
continue
existing_title, existing_album = read_metadata_from_file(file_path_to_check, logger)
if existing_title == title and existing_album == album:
logger.info(f"Found existing track (converted extension scan): {title} - {album} at {file_path_to_check}")
return True, file_path_to_check
# If conversion is specified, and we didn't find the converted file, we should not report other formats as existing.
# The intention is to get the file in the `convert_to` format.
return False, None
else:
logger.warning(f"Invalid convert_to format: '{convert_to}'. Checking for original/general format.")
# Fall through to check original/general if convert_to was invalid
# Priority 2: Check if the file exists in its original download format
original_ext_lower = os.path.splitext(original_song_path)[1].lower()
if os.path.exists(original_song_path):
existing_title, existing_album = read_metadata_from_file(original_song_path, logger)
if existing_title == title and existing_album == album:
logger.info(f"Found existing track (exact original path match): {title} - {album} at {original_song_path}")
return True, original_song_path
# Scan directory for other files with the original extension (if no conversion target)
for file_in_dir in os.listdir(scan_dir):
if file_in_dir.lower().endswith(original_ext_lower):
file_path_to_check = os.path.join(scan_dir, file_in_dir)
if file_path_to_check == original_song_path: # Already checked this one
continue
existing_title, existing_album = read_metadata_from_file(file_path_to_check, logger)
if existing_title == title and existing_album == album:
logger.info(f"Found existing track (original extension scan): {title} - {album} at {file_path_to_check}")
return True, file_path_to_check
# Priority 3: General scan for any known audio format if no conversion was specified OR if convert_to was invalid
# This part only runs if convert_to is None or was an invalid format string.
if not convert_to or (convert_to and convert_to.upper() not in AUDIO_FORMATS):
for file_in_dir in os.listdir(scan_dir):
file_lower = file_in_dir.lower()
# Check against all known audio format extensions
is_known_audio_format = False
for fmt_details in AUDIO_FORMATS.values():
if file_lower.endswith(fmt_details["extension"].lower()):
is_known_audio_format = True
break
if is_known_audio_format:
# Skip if it's the original extension and we've already scanned for those
if file_lower.endswith(original_ext_lower):
# We've already checked exact original_song_path and scanned for original_ext_lower
# so this specific file would have been caught unless it's the original_song_path itself,
# or another file with original_ext_lower that didn't match metadata.
# This avoids re-checking files already covered by Priority 2 logic more explicitly.
pass # Let it proceed to metadata check if it wasn't an exact match path-wise
file_path_to_check = os.path.join(scan_dir, file_in_dir)
# Avoid re-checking original_song_path if it exists, it was covered by Priority 2's exact match.
if os.path.exists(original_song_path) and file_path_to_check == original_song_path:
continue
existing_title, existing_album = read_metadata_from_file(file_path_to_check, logger)
if existing_title == title and existing_album == album:
logger.info(f"Found existing track (general audio format scan): {title} - {album} at {file_path_to_check}")
return True, file_path_to_check
return False, None