improve test scripts, bump deezspot and fix playlist issues

Xoconoch
2025-06-08 18:08:13 -06:00
parent 1cdb6dc915
commit ca77c0e9f3
9 changed files with 194 additions and 183 deletions

View File

@@ -11,7 +11,7 @@ load_dotenv()
# --- Environment-based secrets for testing ---
SPOTIFY_API_CLIENT_ID = os.environ.get("SPOTIFY_API_CLIENT_ID", "your_spotify_client_id")
SPOTIFY_API_CLIENT_SECRET = os.environ.get("SPOTIFY_API_CLIENT_SECRET", "your_spotify_client_secret")
SPOTIFY_BLOB_CONTENT_STR = os.environ.get("SPOTIFY_BLOB_CONTENT_STR", '{}')
SPOTIFY_BLOB_CONTENT_STR = os.environ.get("SPOTIFY_BLOB_CONTENT", '{}')
try:
SPOTIFY_BLOB_CONTENT = json.loads(SPOTIFY_BLOB_CONTENT_STR)
except json.JSONDecodeError:
@@ -46,12 +46,12 @@ def wait_for_task(base_url, task_id, timeout=600):
response.raise_for_status() # Raise an exception for bad status codes
statuses = response.json()
if not statuses:
data = response.json()
if not data or not data.get("last_line"):
time.sleep(1)
continue
last_status = statuses[-1]
last_status = data["last_line"]
status = last_status.get("status")
# More verbose logging for debugging during tests
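For context, a minimal sketch of how the updated polling helper reads end to end, assuming the /prgs/<task_id> endpoint used by the other tests and treating "complete" and "cancelled" as terminal statuses ("error" here is an added assumption, not confirmed by the diff):

import time
import requests

def wait_for_task(base_url, task_id, timeout=600):
    """Polls the task status endpoint until a terminal status is reported."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        response = requests.get(f"{base_url}/prgs/{task_id}")
        response.raise_for_status()  # Raise an exception for bad status codes
        data = response.json()
        # The endpoint now returns an object whose "last_line" field carries the
        # most recent status entry, rather than a list of status entries.
        if not data or not data.get("last_line"):
            time.sleep(1)
            continue
        last_status = data["last_line"]
        if last_status.get("status") in ("complete", "cancelled", "error"):  # "error" is assumed
            return last_status
        time.sleep(1)
    raise TimeoutError(f"Task {task_id} did not reach a terminal status within {timeout}s")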

View File

@@ -20,17 +20,25 @@ def test_get_main_config(base_url):
assert "maxConcurrentDownloads" in config
assert "spotify" in config # Should be set by conftest
assert "deezer" in config # Should be set by conftest
assert "fallback" in config
assert "realTime" in config
assert "maxRetries" in config
def test_update_main_config(base_url, reset_config):
"""Tests updating various fields in the main configuration."""
"""Tests updating various fields in the main configuration based on frontend capabilities."""
new_settings = {
"maxConcurrentDownloads": 5,
"spotifyQuality": "HIGH",
"deezerQuality": "MP3_128",
"deezerQuality": "FLAC",
"customDirFormat": "%artist%/%album%",
"customTrackFormat": "%tracknum% %title%",
"save_cover": False,
"fallback": True,
"realTime": False,
"maxRetries": 5,
"retryDelaySeconds": 10,
"retry_delay_increase": 10,
"tracknum_padding": False,
}
response = requests.post(f"{base_url}/config", json=new_settings)
@@ -45,8 +53,9 @@ def test_get_watch_config(base_url):
response = requests.get(f"{base_url}/config/watch")
assert response.status_code == 200
config = response.json()
assert "delay_between_playlists_seconds" in config
assert "delay_between_artists_seconds" in config
assert "enabled" in config
assert "watchPollIntervalSeconds" in config
assert "watchedArtistAlbumGroup" in config
def test_update_watch_config(base_url):
"""Tests updating the watch-specific configuration."""
@@ -54,14 +63,19 @@ def test_update_watch_config(base_url):
original_config = response.json()
new_settings = {
"delay_between_playlists_seconds": 120,
"delay_between_artists_seconds": 240,
"auto_add_new_releases_to_queue": False,
"enabled": False,
"watchPollIntervalSeconds": 7200,
"watchedArtistAlbumGroup": ["album", "single"],
}
response = requests.post(f"{base_url}/config/watch", json=new_settings)
assert response.status_code == 200
updated_config = response.json()
# The response for updating watch config is just a success message,
# so we need to GET the config again to verify.
verify_response = requests.get(f"{base_url}/config/watch")
assert verify_response.status_code == 200
updated_config = verify_response.json()
for key, value in new_settings.items():
assert updated_config[key] == value
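Because the POST response only acknowledges the update, the POST-then-GET verification above could be factored into a small helper; a sketch (the helper name is hypothetical and not part of this commit):

import requests

def update_and_fetch_watch_config(base_url, new_settings):
    """Updates the watch config, then re-fetches it so callers can assert on persisted values."""
    post_resp = requests.post(f"{base_url}/config/watch", json=new_settings)
    assert post_resp.status_code == 200  # the POST only returns a success message
    get_resp = requests.get(f"{base_url}/config/watch")
    assert get_resp.status_code == 200
    return get_resp.json()

The loop above would then assert against the dict returned by update_and_fetch_watch_config(base_url, new_settings).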
@@ -71,24 +85,29 @@ def test_update_watch_config(base_url):
def test_update_conversion_config(base_url, reset_config):
"""
Iterates through all supported conversion formats and bitrates,
updating the config and verifying the changes for each combination.
Iterates through supported conversion formats and bitrates from the frontend,
updating the config and verifying the changes.
"""
conversion_formats = ["mp3", "flac", "ogg", "opus", "m4a"]
# Formats and bitrates aligned with src/js/config.ts
conversion_formats = ["MP3", "AAC", "OGG", "OPUS", "FLAC", "WAV", "ALAC"]
bitrates = {
"mp3": ["320", "256", "192", "128"],
"ogg": ["500", "320", "192", "160"],
"opus": ["256", "192", "128", "96"],
"m4a": ["320k", "256k", "192k", "128k"],
"flac": [None] # Bitrate is not applicable for FLAC
"MP3": ["128k", "320k"],
"AAC": ["128k", "256k"],
"OGG": ["128k", "320k"],
"OPUS": ["96k", "256k"],
"FLAC": [None],
"WAV": [None],
"ALAC": [None],
}
for format in conversion_formats:
for br in bitrates.get(format, [None]):
print(f"Testing conversion config: format={format}, bitrate={br}")
new_settings = {"convertTo": format, "bitrate": br}
for format_val in conversion_formats:
for br in bitrates.get(format_val, [None]):
print(f"Testing conversion config: format={format_val}, bitrate={br}")
new_settings = {"convertTo": format_val, "bitrate": br}
response = requests.post(f"{base_url}/config", json=new_settings)
assert response.status_code == 200
updated_config = response.json()
assert updated_config["convertTo"] == format
assert updated_config["convertTo"] == format_val
# The backend might return null for empty bitrate, which is fine
assert updated_config["bitrate"] == br

View File

@@ -1,7 +1,9 @@
import requests
import pytest
import os
import shutil
# URLs provided by the user for testing
# URLs for testing
SPOTIFY_TRACK_URL = "https://open.spotify.com/track/1Cts4YV9aOXVAP3bm3Ro6r"
SPOTIFY_ALBUM_URL = "https://open.spotify.com/album/4K0JVP5veNYTVI6IMamlla"
SPOTIFY_PLAYLIST_URL = "https://open.spotify.com/playlist/26CiMxIxdn5WhXyccMCPOB"
@@ -13,68 +15,101 @@ ALBUM_ID = SPOTIFY_ALBUM_URL.split('/')[-1].split('?')[0]
PLAYLIST_ID = SPOTIFY_PLAYLIST_URL.split('/')[-1].split('?')[0]
ARTIST_ID = SPOTIFY_ARTIST_URL.split('/')[-1].split('?')[0]
DOWNLOAD_DIR = "downloads/"
def get_downloaded_files(directory=DOWNLOAD_DIR):
    """Walks a directory and returns a list of all file paths."""
    file_paths = []
    if not os.path.isdir(directory):
        return file_paths
    for root, _, files in os.walk(directory):
        for file in files:
            # Ignore hidden files like .DS_Store
            if not file.startswith('.'):
                file_paths.append(os.path.join(root, file))
    return file_paths
@pytest.fixture(autouse=True)
def cleanup_downloads_dir():
    """
    Ensures the download directory is removed and recreated, providing a clean
    slate before and after each test.
    """
    if os.path.exists(DOWNLOAD_DIR):
        shutil.rmtree(DOWNLOAD_DIR)
    os.makedirs(DOWNLOAD_DIR, exist_ok=True)
    yield
    if os.path.exists(DOWNLOAD_DIR):
        shutil.rmtree(DOWNLOAD_DIR)
@pytest.fixture
def reset_config(base_url):
"""Fixture to reset the main config after a test to avoid side effects."""
"""
Fixture to get original config, set single concurrent download for test
isolation, and restore the original config after the test.
"""
response = requests.get(f"{base_url}/config")
original_config = response.json()
# Set max concurrent downloads to 1 for all tests using this fixture.
requests.post(f"{base_url}/config", json={"maxConcurrentDownloads": 1})
yield
# Restore original config
requests.post(f"{base_url}/config", json=original_config)
def test_download_track_spotify_only(base_url, task_waiter, reset_config):
"""Tests downloading a single track from Spotify with real-time download enabled."""
print("\n--- Testing Spotify-only track download ---")
@pytest.mark.parametrize("download_type, item_id, timeout, expected_files_min", [
("track", TRACK_ID, 600, 1),
("album", ALBUM_ID, 900, 14), # "After Hours" has 14 tracks
("playlist", PLAYLIST_ID, 1200, 4), # Test playlist has 4 tracks
])
def test_spotify_download_and_verify_files(base_url, task_waiter, reset_config, download_type, item_id, timeout, expected_files_min):
"""
Tests downloading a track, album, or playlist and verifies that the
expected number of files are created on disk.
"""
print(f"\n--- Testing Spotify-only '{download_type}' download and verifying files ---")
config_payload = {
"service": "spotify",
"fallback": False,
"realTime": True,
"spotifyQuality": "NORMAL" # Simulating free account quality
"spotifyQuality": "NORMAL"
}
requests.post(f"{base_url}/config", json=config_payload)
response = requests.get(f"{base_url}/track/download/{TRACK_ID}")
response = requests.get(f"{base_url}/{download_type}/download/{item_id}")
assert response.status_code == 202
task_id = response.json()["task_id"]
final_status = task_waiter(task_id)
assert final_status["status"] == "complete", f"Task failed: {final_status.get('error')}"
final_status = task_waiter(task_id, timeout=timeout)
assert final_status["status"] == "complete", f"Task failed for {download_type} {item_id}: {final_status.get('error')}"
def test_download_album_spotify_only(base_url, task_waiter, reset_config):
"""Tests downloading a full album from Spotify with real-time download enabled."""
print("\n--- Testing Spotify-only album download ---")
config_payload = {"service": "spotify", "fallback": False, "realTime": True, "spotifyQuality": "NORMAL"}
requests.post(f"{base_url}/config", json=config_payload)
# Verify that the correct number of files were downloaded
downloaded_files = get_downloaded_files()
assert len(downloaded_files) >= expected_files_min, (
f"Expected at least {expected_files_min} file(s) for {download_type} {item_id}, "
f"but found {len(downloaded_files)}."
)
response = requests.get(f"{base_url}/album/download/{ALBUM_ID}")
assert response.status_code == 202
task_id = response.json()["task_id"]
final_status = task_waiter(task_id, timeout=900)
assert final_status["status"] == "complete", f"Task failed: {final_status.get('error')}"
def test_download_playlist_spotify_only(base_url, task_waiter, reset_config):
"""Tests downloading a full playlist from Spotify with real-time download enabled."""
print("\n--- Testing Spotify-only playlist download ---")
config_payload = {"service": "spotify", "fallback": False, "realTime": True, "spotifyQuality": "NORMAL"}
requests.post(f"{base_url}/config", json=config_payload)
response = requests.get(f"{base_url}/playlist/download/{PLAYLIST_ID}")
assert response.status_code == 202
task_id = response.json()["task_id"]
final_status = task_waiter(task_id, timeout=1200)
assert final_status["status"] == "complete", f"Task failed: {final_status.get('error')}"
def test_download_artist_spotify_only(base_url, task_waiter, reset_config):
"""Tests queuing downloads for an artist's entire discography from Spotify."""
print("\n--- Testing Spotify-only artist download ---")
def test_artist_download_and_verify_files(base_url, task_waiter, reset_config):
"""
Tests queuing an artist download and verifies that files are created.
Does not check for exact file count due to the variability of artist discographies.
"""
print("\n--- Testing Spotify-only artist download and verifying files ---")
config_payload = {"service": "spotify", "fallback": False, "realTime": True, "spotifyQuality": "NORMAL"}
requests.post(f"{base_url}/config", json=config_payload)
response = requests.get(f"{base_url}/artist/download/{ARTIST_ID}?album_type=album,single")
assert response.status_code == 202
response_data = response.json()
queued_albums = response_data.get("successfully_queued_albums", [])
queued_albums = response_data.get("queued_albums", [])
assert len(queued_albums) > 0, "No albums were queued for the artist."
for album in queued_albums:
@@ -83,13 +118,18 @@ def test_download_artist_spotify_only(base_url, task_waiter, reset_config):
final_status = task_waiter(task_id, timeout=900)
assert final_status["status"] == "complete", f"Artist album task {album['name']} failed: {final_status.get('error')}"
def test_download_track_with_fallback(base_url, task_waiter, reset_config):
"""Tests downloading a Spotify track with Deezer fallback enabled."""
print("\n--- Testing track download with Deezer fallback ---")
# After all tasks complete, verify that at least some files were downloaded.
downloaded_files = get_downloaded_files()
assert len(downloaded_files) > 0, "Artist download ran but no files were found in the download directory."
def test_download_with_deezer_fallback_and_verify_files(base_url, task_waiter, reset_config):
"""Tests downloading with Deezer fallback and verifies the file exists."""
print("\n--- Testing track download with Deezer fallback and verifying files ---")
config_payload = {
"service": "spotify",
"fallback": True,
"deezerQuality": "MP3_320" # Simulating higher quality from Deezer free
"deezerQuality": "FLAC" # Test with high quality fallback
}
requests.post(f"{base_url}/config", json=config_payload)
@@ -98,24 +138,58 @@ def test_download_track_with_fallback(base_url, task_waiter, reset_config):
task_id = response.json()["task_id"]
final_status = task_waiter(task_id)
assert final_status["status"] == "complete", f"Task failed: {final_status.get('error')}"
assert final_status["status"] == "complete", f"Task failed with fallback: {final_status.get('error')}"
@pytest.mark.parametrize("format,bitrate", [
("mp3", "320"), ("mp3", "128"),
("flac", None),
("ogg", "160"),
("opus", "128"),
("m4a", "128k")
# Verify that at least one file was downloaded.
downloaded_files = get_downloaded_files()
assert len(downloaded_files) >= 1, "Fallback download completed but no file was found."
def test_download_without_realtime_and_verify_files(base_url, task_waiter, reset_config):
    """Tests a non-realtime download and verifies the file exists."""
    print("\n--- Testing download with realTime: False and verifying files ---")
    config_payload = {
        "service": "spotify",
        "fallback": False,
        "realTime": False,
        "spotifyQuality": "NORMAL"
    }
    requests.post(f"{base_url}/config", json=config_payload)
    response = requests.get(f"{base_url}/track/download/{TRACK_ID}")
    assert response.status_code == 202
    task_id = response.json()["task_id"]
    final_status = task_waiter(task_id)
    assert final_status["status"] == "complete", f"Task failed with realTime=False: {final_status.get('error')}"
    # Verify that at least one file was downloaded.
    downloaded_files = get_downloaded_files()
    assert len(downloaded_files) >= 1, "Non-realtime download completed but no file was found."
# Aligned with formats in src/js/config.ts's CONVERSION_FORMATS
@pytest.mark.parametrize("format_name,bitrate,expected_ext", [
    ("mp3", "320k", ".mp3"),
    ("aac", "256k", ".m4a"),  # AAC is typically in an M4A container
    ("ogg", "320k", ".ogg"),
    ("opus", "256k", ".opus"),
    ("flac", None, ".flac"),
    ("wav", None, ".wav"),
    ("alac", None, ".m4a"),  # ALAC is also in an M4A container
])
def test_download_with_conversion(base_url, task_waiter, reset_config, format, bitrate):
"""Tests downloading a track with various conversion formats and bitrates."""
print(f"\n--- Testing conversion: {format} @ {bitrate or 'default'} ---")
def test_download_with_conversion_and_verify_format(base_url, task_waiter, reset_config, format_name, bitrate, expected_ext):
"""
Tests downloading a track with various conversion formats and verifies
that the created file has the correct extension.
"""
print(f"\n--- Testing conversion: {format_name.upper()} @ {bitrate or 'default'} ---")
config_payload = {
"service": "spotify",
"fallback": False,
"realTime": True,
"spotifyQuality": "NORMAL",
"convertTo": format,
"convertTo": format_name.upper(),
"bitrate": bitrate
}
requests.post(f"{base_url}/config", json=config_payload)
@@ -125,4 +199,14 @@ def test_download_with_conversion(base_url, task_waiter, reset_config, format, b
task_id = response.json()["task_id"]
final_status = task_waiter(task_id)
assert final_status["status"] == "complete", f"Download failed for format {format} bitrate {bitrate}: {final_status.get('error')}"
assert final_status["status"] == "complete", f"Download failed for format {format_name} bitrate {bitrate}: {final_status.get('error')}"
# Verify that a file with the correct extension was created.
downloaded_files = get_downloaded_files()
assert len(downloaded_files) >= 1, "Conversion download completed but no file was found."
found_correct_format = any(f.lower().endswith(expected_ext) for f in downloaded_files)
assert found_correct_format, (
f"No file with expected extension '{expected_ext}' found for format '{format_name}'. "
f"Found files: {downloaded_files}"
)
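Note that the extension check only verifies the container, not the codec (AAC and ALAC both land in .m4a). If a stricter check were ever wanted, the codec could be inspected with ffprobe, assuming ffmpeg/ffprobe is available in the test environment; a sketch, not part of this commit:

import subprocess

def audio_codec(path):
    """Returns the codec name of the first audio stream (e.g. 'aac', 'alac', 'flac')."""
    result = subprocess.run(
        [
            "ffprobe", "-v", "error",
            "-select_streams", "a:0",
            "-show_entries", "stream=codec_name",
            "-of", "default=noprint_wrappers=1:nokey=1",
            path,
        ],
        capture_output=True, text=True, check=True,
    )
    return result.stdout.strip()

# Hypothetical usage: assert audio_codec(downloaded_files[0]) in ("aac", "alac")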

View File

@@ -21,7 +21,7 @@ def test_history_logging_and_filtering(base_url, task_waiter, reset_config):
config_payload = {"service": "spotify", "fallback": False, "realTime": True}
requests.post(f"{base_url}/config", json=config_payload)
response = requests.get(f"{base_url}/track/download/{TRACK_ID}")
assert response.status_code == 202
assert response.status_code == 200
task_id = response.json()["task_id"]
task_waiter(task_id) # Wait for the download to complete

View File

@@ -1,93 +0,0 @@
import requests
import pytest
import time
# Use a known, short track for quick tests
TRACK_ID = "1Cts4YV9aOXVAP3bm3Ro6r"
# Use a long playlist to ensure there's time to cancel it
LONG_PLAYLIST_ID = "6WsyUEITURbQXZsqtEewb1" # Today's Top Hits on Spotify
@pytest.fixture
def reset_config(base_url):
"""Fixture to reset the main config after a test."""
response = requests.get(f"{base_url}/config")
original_config = response.json()
yield
requests.post(f"{base_url}/config", json=original_config)
def test_list_tasks(base_url, reset_config):
"""Tests listing all active tasks."""
config_payload = {"service": "spotify", "fallback": False, "realTime": True}
requests.post(f"{base_url}/config", json=config_payload)
# Start a task
response = requests.get(f"{base_url}/track/download/{TRACK_ID}")
assert response.status_code == 202
task_id = response.json()["task_id"]
# Check the list to see if our task appears
response = requests.get(f"{base_url}/prgs/list")
assert response.status_code == 200
tasks = response.json()
assert isinstance(tasks, list)
assert any(t['task_id'] == task_id for t in tasks)
# Clean up by cancelling the task
requests.post(f"{base_url}/prgs/cancel/{task_id}")
def test_get_task_progress_and_log(base_url, task_waiter, reset_config):
"""Tests getting progress for a running task and retrieving its log after completion."""
config_payload = {"service": "spotify", "fallback": False, "realTime": True}
requests.post(f"{base_url}/config", json=config_payload)
response = requests.get(f"{base_url}/track/download/{TRACK_ID}")
assert response.status_code == 202
task_id = response.json()["task_id"]
# Poll progress a few times while it's running to check the endpoint
for _ in range(3):
time.sleep(1)
res = requests.get(f"{base_url}/prgs/{task_id}")
if res.status_code == 200 and res.json():
statuses = res.json()
assert isinstance(statuses, list)
assert "status" in statuses[-1]
break
else:
pytest.fail("Could not get a valid task status in time.")
# Wait for completion
final_status = task_waiter(task_id)
assert final_status["status"] == "complete"
# After completion, check the task log endpoint
res = requests.get(f"{base_url}/prgs/{task_id}?log=true")
assert res.status_code == 200
log_data = res.json()
assert "task_log" in log_data
assert len(log_data["task_log"]) > 0
assert "status" in log_data["task_log"][0]
def test_cancel_task(base_url, reset_config):
"""Tests cancelling a task shortly after it has started."""
config_payload = {"service": "spotify", "fallback": False, "realTime": True}
requests.post(f"{base_url}/config", json=config_payload)
response = requests.get(f"{base_url}/playlist/download/{LONG_PLAYLIST_ID}")
assert response.status_code == 202
task_id = response.json()["task_id"]
# Give it a moment to ensure it has started processing
time.sleep(3)
# Cancel the task
response = requests.post(f"{base_url}/prgs/cancel/{task_id}")
assert response.status_code == 200
assert response.json()["status"] == "cancelled"
# Check the final status to confirm it's marked as cancelled
time.sleep(2) # Allow time for the final status to propagate
res = requests.get(f"{base_url}/prgs/{task_id}")
assert res.status_code == 200
last_status = res.json()[-1]
assert last_status["status"] == "cancelled"