212 lines
8.5 KiB
Python
212 lines
8.5 KiB
Python
import requests
|
|
import pytest
|
|
import os
|
|
import shutil
|
|
|
|
# URLs for testing
|
|
SPOTIFY_TRACK_URL = "https://open.spotify.com/track/1Cts4YV9aOXVAP3bm3Ro6r"
|
|
SPOTIFY_ALBUM_URL = "https://open.spotify.com/album/4K0JVP5veNYTVI6IMamlla"
|
|
SPOTIFY_PLAYLIST_URL = "https://open.spotify.com/playlist/26CiMxIxdn5WhXyccMCPOB"
|
|
SPOTIFY_ARTIST_URL = "https://open.spotify.com/artist/7l6cdPhOLYO7lehz5xfzLV"
|
|
|
|
# Corresponding IDs extracted from URLs
|
|
TRACK_ID = SPOTIFY_TRACK_URL.split('/')[-1].split('?')[0]
|
|
ALBUM_ID = SPOTIFY_ALBUM_URL.split('/')[-1].split('?')[0]
|
|
PLAYLIST_ID = SPOTIFY_PLAYLIST_URL.split('/')[-1].split('?')[0]
|
|
ARTIST_ID = SPOTIFY_ARTIST_URL.split('/')[-1].split('?')[0]
|
|
|
|
DOWNLOAD_DIR = "downloads/"
|
|
|
|
|
|
def get_downloaded_files(directory=DOWNLOAD_DIR):
|
|
"""Walks a directory and returns a list of all file paths."""
|
|
file_paths = []
|
|
if not os.path.isdir(directory):
|
|
return file_paths
|
|
for root, _, files in os.walk(directory):
|
|
for file in files:
|
|
# Ignore hidden files like .DS_Store
|
|
if not file.startswith('.'):
|
|
file_paths.append(os.path.join(root, file))
|
|
return file_paths
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def cleanup_downloads_dir():
|
|
"""
|
|
Ensures the download directory is removed and recreated, providing a clean
|
|
slate before and after each test.
|
|
"""
|
|
if os.path.exists(DOWNLOAD_DIR):
|
|
shutil.rmtree(DOWNLOAD_DIR)
|
|
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
|
|
yield
|
|
if os.path.exists(DOWNLOAD_DIR):
|
|
shutil.rmtree(DOWNLOAD_DIR)
|
|
|
|
|
|
@pytest.fixture
|
|
def reset_config(base_url):
|
|
"""
|
|
Fixture to get original config, set single concurrent download for test
|
|
isolation, and restore the original config after the test.
|
|
"""
|
|
response = requests.get(f"{base_url}/config")
|
|
original_config = response.json()
|
|
|
|
# Set max concurrent downloads to 1 for all tests using this fixture.
|
|
requests.post(f"{base_url}/config", json={"maxConcurrentDownloads": 1})
|
|
|
|
yield
|
|
|
|
# Restore original config
|
|
requests.post(f"{base_url}/config", json=original_config)
|
|
|
|
|
|
@pytest.mark.parametrize("download_type, item_id, timeout, expected_files_min", [
|
|
("track", TRACK_ID, 600, 1),
|
|
("album", ALBUM_ID, 900, 14), # "After Hours" has 14 tracks
|
|
("playlist", PLAYLIST_ID, 1200, 4), # Test playlist has 4 tracks
|
|
])
|
|
def test_spotify_download_and_verify_files(base_url, task_waiter, reset_config, download_type, item_id, timeout, expected_files_min):
|
|
"""
|
|
Tests downloading a track, album, or playlist and verifies that the
|
|
expected number of files are created on disk.
|
|
"""
|
|
print(f"\n--- Testing Spotify-only '{download_type}' download and verifying files ---")
|
|
config_payload = {
|
|
"service": "spotify",
|
|
"fallback": False,
|
|
"realTime": True,
|
|
"spotifyQuality": "NORMAL"
|
|
}
|
|
requests.post(f"{base_url}/config", json=config_payload)
|
|
|
|
response = requests.get(f"{base_url}/{download_type}/download/{item_id}")
|
|
assert response.status_code == 202
|
|
task_id = response.json()["task_id"]
|
|
|
|
final_status = task_waiter(task_id, timeout=timeout)
|
|
assert final_status["status"] == "complete", f"Task failed for {download_type} {item_id}: {final_status.get('error')}"
|
|
|
|
# Verify that the correct number of files were downloaded
|
|
downloaded_files = get_downloaded_files()
|
|
assert len(downloaded_files) >= expected_files_min, (
|
|
f"Expected at least {expected_files_min} file(s) for {download_type} {item_id}, "
|
|
f"but found {len(downloaded_files)}."
|
|
)
|
|
|
|
|
|
def test_artist_download_and_verify_files(base_url, task_waiter, reset_config):
|
|
"""
|
|
Tests queuing an artist download and verifies that files are created.
|
|
Does not check for exact file count due to the variability of artist discographies.
|
|
"""
|
|
print("\n--- Testing Spotify-only artist download and verifying files ---")
|
|
config_payload = {"service": "spotify", "fallback": False, "realTime": True, "spotifyQuality": "NORMAL"}
|
|
requests.post(f"{base_url}/config", json=config_payload)
|
|
|
|
response = requests.get(f"{base_url}/artist/download/{ARTIST_ID}?album_type=album,single")
|
|
assert response.status_code == 202
|
|
response_data = response.json()
|
|
queued_albums = response_data.get("queued_albums", [])
|
|
assert len(queued_albums) > 0, "No albums were queued for the artist."
|
|
|
|
for album in queued_albums:
|
|
task_id = album["task_id"]
|
|
print(f"--- Waiting for artist album: {album['name']} ({task_id}) ---")
|
|
final_status = task_waiter(task_id, timeout=900)
|
|
assert final_status["status"] == "complete", f"Artist album task {album['name']} failed: {final_status.get('error')}"
|
|
|
|
# After all tasks complete, verify that at least some files were downloaded.
|
|
downloaded_files = get_downloaded_files()
|
|
assert len(downloaded_files) > 0, "Artist download ran but no files were found in the download directory."
|
|
|
|
|
|
def test_download_with_deezer_fallback_and_verify_files(base_url, task_waiter, reset_config):
|
|
"""Tests downloading with Deezer fallback and verifies the file exists."""
|
|
print("\n--- Testing track download with Deezer fallback and verifying files ---")
|
|
config_payload = {
|
|
"service": "spotify",
|
|
"fallback": True,
|
|
"deezerQuality": "FLAC" # Test with high quality fallback
|
|
}
|
|
requests.post(f"{base_url}/config", json=config_payload)
|
|
|
|
response = requests.get(f"{base_url}/track/download/{TRACK_ID}")
|
|
assert response.status_code == 202
|
|
task_id = response.json()["task_id"]
|
|
|
|
final_status = task_waiter(task_id)
|
|
assert final_status["status"] == "complete", f"Task failed with fallback: {final_status.get('error')}"
|
|
|
|
# Verify that at least one file was downloaded.
|
|
downloaded_files = get_downloaded_files()
|
|
assert len(downloaded_files) >= 1, "Fallback download completed but no file was found."
|
|
|
|
|
|
def test_download_without_realtime_and_verify_files(base_url, task_waiter, reset_config):
|
|
"""Tests a non-realtime download and verifies the file exists."""
|
|
print("\n--- Testing download with realTime: False and verifying files ---")
|
|
config_payload = {
|
|
"service": "spotify",
|
|
"fallback": False,
|
|
"realTime": False,
|
|
"spotifyQuality": "NORMAL"
|
|
}
|
|
requests.post(f"{base_url}/config", json=config_payload)
|
|
|
|
response = requests.get(f"{base_url}/track/download/{TRACK_ID}")
|
|
assert response.status_code == 202
|
|
task_id = response.json()["task_id"]
|
|
|
|
final_status = task_waiter(task_id)
|
|
assert final_status["status"] == "complete", f"Task failed with realTime=False: {final_status.get('error')}"
|
|
|
|
# Verify that at least one file was downloaded.
|
|
downloaded_files = get_downloaded_files()
|
|
assert len(downloaded_files) >= 1, "Non-realtime download completed but no file was found."
|
|
|
|
|
|
# Aligned with formats in src/js/config.ts's CONVERSION_FORMATS
|
|
@pytest.mark.parametrize("format_name,bitrate,expected_ext", [
|
|
("mp3", "320k", ".mp3"),
|
|
("aac", "256k", ".m4a"), # AAC is typically in an M4A container
|
|
("ogg", "320k", ".ogg"),
|
|
("opus", "256k", ".opus"),
|
|
("flac", None, ".flac"),
|
|
("wav", None, ".wav"),
|
|
("alac", None, ".m4a"), # ALAC is also in an M4A container
|
|
])
|
|
def test_download_with_conversion_and_verify_format(base_url, task_waiter, reset_config, format_name, bitrate, expected_ext):
|
|
"""
|
|
Tests downloading a track with various conversion formats and verifies
|
|
that the created file has the correct extension.
|
|
"""
|
|
print(f"\n--- Testing conversion: {format_name.upper()} @ {bitrate or 'default'} ---")
|
|
config_payload = {
|
|
"service": "spotify",
|
|
"fallback": False,
|
|
"realTime": True,
|
|
"spotifyQuality": "NORMAL",
|
|
"convertTo": format_name.upper(),
|
|
"bitrate": bitrate
|
|
}
|
|
requests.post(f"{base_url}/config", json=config_payload)
|
|
|
|
response = requests.get(f"{base_url}/track/download/{TRACK_ID}")
|
|
assert response.status_code == 202
|
|
task_id = response.json()["task_id"]
|
|
|
|
final_status = task_waiter(task_id)
|
|
assert final_status["status"] == "complete", f"Download failed for format {format_name} bitrate {bitrate}: {final_status.get('error')}"
|
|
|
|
# Verify that a file with the correct extension was created.
|
|
downloaded_files = get_downloaded_files()
|
|
assert len(downloaded_files) >= 1, "Conversion download completed but no file was found."
|
|
|
|
found_correct_format = any(f.lower().endswith(expected_ext) for f in downloaded_files)
|
|
assert found_correct_format, (
|
|
f"No file with expected extension '{expected_ext}' found for format '{format_name}'. "
|
|
f"Found files: {downloaded_files}"
|
|
) |