Add migration scripts for 3.0.6

This commit is contained in:
Xoconoch
2025-08-17 11:27:31 -06:00
parent d8635400fa
commit 65d26fabac
8 changed files with 863 additions and 203 deletions

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,215 @@
import sqlite3
from pathlib import Path
import pytest
# Override the autouse credentials fixture from conftest for this module
@pytest.fixture(scope="session", autouse=True)
def setup_credentials_for_tests():
# No-op to avoid external API calls; this shadows the session autouse fixture in conftest.py
yield
def _create_306_history_db(db_path: Path) -> None:
db_path.parent.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(str(db_path)) as conn:
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS download_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
download_type TEXT NOT NULL,
title TEXT NOT NULL,
artists TEXT,
timestamp REAL NOT NULL,
status TEXT NOT NULL,
service TEXT,
quality_format TEXT,
quality_bitrate TEXT,
total_tracks INTEGER,
successful_tracks INTEGER,
failed_tracks INTEGER,
skipped_tracks INTEGER,
children_table TEXT,
task_id TEXT,
external_ids TEXT,
metadata TEXT,
release_date TEXT,
genres TEXT,
images TEXT,
owner TEXT,
album_type TEXT,
duration_total_ms INTEGER,
explicit BOOLEAN
);
CREATE INDEX IF NOT EXISTS idx_download_history_timestamp ON download_history(timestamp);
CREATE INDEX IF NOT EXISTS idx_download_history_type_status ON download_history(download_type, status);
CREATE INDEX IF NOT EXISTS idx_download_history_task_id ON download_history(task_id);
CREATE UNIQUE INDEX IF NOT EXISTS uq_download_history_task_type_ids ON download_history(task_id, download_type, external_ids);
"""
)
# Insert rows that reference non-existent children tables
conn.execute(
"""
INSERT INTO download_history (
download_type, title, artists, timestamp, status, service,
quality_format, quality_bitrate, total_tracks, successful_tracks,
failed_tracks, skipped_tracks, children_table, task_id,
external_ids, metadata, release_date, genres, images, owner,
album_type, duration_total_ms, explicit
) VALUES (?, ?, ?, strftime('%s','now'), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"album",
"Test Album",
"[]",
"completed",
"spotify",
"FLAC",
"1411kbps",
10,
8,
1,
1,
"album_test1",
"task-album-1",
"{}",
"{}",
"{}",
"[]",
"[]",
"{}",
"album",
123456,
0,
),
)
conn.execute(
"""
INSERT INTO download_history (
download_type, title, artists, timestamp, status, service,
quality_format, quality_bitrate, total_tracks, successful_tracks,
failed_tracks, skipped_tracks, children_table, task_id,
external_ids, metadata, release_date, genres, images, owner,
album_type, duration_total_ms, explicit
) VALUES (?, ?, ?, strftime('%s','now'), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"playlist",
"Test Playlist",
"[]",
"partial",
"spotify",
"MP3",
"320kbps",
20,
15,
3,
2,
"playlist_test2",
"task-playlist-1",
"{}",
"{}",
"{}",
"[]",
"[]",
"{}",
"",
654321,
0,
),
)
# Create a legacy children table with too-few columns to test schema upgrade
conn.execute(
"CREATE TABLE IF NOT EXISTS album_legacy (id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL)"
)
def _create_306_watch_dbs(playlists_db: Path, artists_db: Path) -> None:
playlists_db.parent.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(str(playlists_db)) as pconn:
pconn.executescript(
"""
CREATE TABLE IF NOT EXISTS watched_playlists (
spotify_id TEXT PRIMARY KEY,
name TEXT,
owner_id TEXT,
owner_name TEXT,
total_tracks INTEGER,
link TEXT,
snapshot_id TEXT,
last_checked INTEGER,
added_at INTEGER,
is_active INTEGER DEFAULT 1
);
"""
)
with sqlite3.connect(str(artists_db)) as aconn:
aconn.executescript(
"""
CREATE TABLE IF NOT EXISTS watched_artists (
spotify_id TEXT PRIMARY KEY,
name TEXT,
link TEXT,
total_albums_on_spotify INTEGER,
last_checked INTEGER,
added_at INTEGER,
is_active INTEGER DEFAULT 1,
genres TEXT,
popularity INTEGER,
image_url TEXT
);
"""
)
def _get_columns(db_path: Path, table: str) -> set[str]:
with sqlite3.connect(str(db_path)) as conn:
cur = conn.execute(f"PRAGMA table_info({table})")
return {row[1] for row in cur.fetchall()}
def test_migration_children_tables_created_and_upgraded(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
# Arrange temp paths
data_dir = tmp_path / "data"
history_db = data_dir / "history" / "download_history.db"
playlists_db = data_dir / "watch" / "playlists.db"
artists_db = data_dir / "watch" / "artists.db"
# Create 3.0.6 base schemas and sample data
_create_306_history_db(history_db)
_create_306_watch_dbs(playlists_db, artists_db)
# Point the migration runner to our temp DBs
from routes.migrations import runner
monkeypatch.setattr(runner, "DATA_DIR", data_dir)
monkeypatch.setattr(runner, "HISTORY_DB", history_db)
monkeypatch.setattr(runner, "WATCH_DIR", data_dir / "watch")
monkeypatch.setattr(runner, "PLAYLISTS_DB", playlists_db)
monkeypatch.setattr(runner, "ARTISTS_DB", artists_db)
# Act: run migrations
runner.run_migrations_if_needed()
# Run twice to ensure idempotency
runner.run_migrations_if_needed()
# Assert: referenced children tables exist with expected columns
expected_children_cols = {
"id",
"title",
"artists",
"album_title",
"duration_ms",
"track_number",
"disc_number",
"explicit",
"status",
"external_ids",
"genres",
"isrc",
"timestamp",
"position",
"metadata",
}
assert _get_columns(history_db, "album_test1").issuperset(expected_children_cols)
assert _get_columns(history_db, "playlist_test2").issuperset(expected_children_cols)
# Legacy table upgraded
assert _get_columns(history_db, "album_legacy").issuperset(expected_children_cols)