15
.github/FUNDING.yml
vendored
Normal file
15
.github/FUNDING.yml
vendored
Normal file
@@ -0,0 +1,15 @@
|
||||
# These are supported funding model platforms
|
||||
|
||||
github: # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2]
|
||||
patreon: # Replace with a single Patreon username
|
||||
open_collective: # Replace with a single Open Collective username
|
||||
ko_fi: spotizerr
|
||||
tidelift: # Replace with a single Tidelift platform-name/package-name e.g., npm/babel
|
||||
community_bridge: # Replace with a single Community Bridge project-name e.g., cloud-foundry
|
||||
liberapay: # Replace with a single Liberapay username
|
||||
issuehunt: # Replace with a single IssueHunt username
|
||||
lfx_crowdfunding: # Replace with a single LFX Crowdfunding project-name e.g., cloud-foundry
|
||||
polar: # Replace with a single Polar username
|
||||
buy_me_a_coffee: # Replace with a single Buy Me a Coffee username
|
||||
thanks_dev: # Replace with a single thanks.dev username
|
||||
custom: # Replace with up to 4 custom sponsorship URLs e.g., ['link1', 'link2']
|
||||
@@ -224,8 +224,9 @@ class CeleryDownloadQueueManager:
|
||||
if not existing_task_id:
|
||||
continue
|
||||
|
||||
existing_task_info = get_task_info(existing_task_id)
|
||||
existing_last_status_obj = get_last_task_status(existing_task_id)
|
||||
# Use the pre-fetched full task info
|
||||
existing_task_info = task_summary.get("task_info")
|
||||
existing_last_status_obj = task_summary.get("last_status")
|
||||
|
||||
if not existing_task_info or not existing_last_status_obj:
|
||||
continue
|
||||
|
||||
@@ -181,68 +181,41 @@ def get_task_info(task_id):
|
||||
return {}
|
||||
|
||||
|
||||
def delete_task_data(task_id):
|
||||
"""Deletes all Redis data associated with a task_id."""
|
||||
def get_all_tasks():
|
||||
"""Get all active task IDs and their full info"""
|
||||
try:
|
||||
redis_client.delete(f"task:{task_id}:info")
|
||||
redis_client.delete(f"task:{task_id}:status")
|
||||
redis_client.delete(f"task:{task_id}:status:next_id")
|
||||
logger.info(f"Deleted data for task {task_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting data for task {task_id}: {e}")
|
||||
# Use SCAN for better performance than KEYS in production
|
||||
task_ids = [
|
||||
key.decode("utf-8").split(":")[1]
|
||||
for key in redis_client.scan_iter("task:*:info")
|
||||
]
|
||||
|
||||
|
||||
CLEANUP_THRESHOLD_SECONDS = 3600 # 1 hour
|
||||
|
||||
|
||||
@celery_app.task(name="routes.utils.celery_tasks.cleanup_old_tasks")
|
||||
def cleanup_old_tasks():
|
||||
"""
|
||||
Periodically cleans up old, finished tasks from Redis to prevent data buildup.
|
||||
"""
|
||||
logger.info("Starting cleanup of old finished tasks...")
|
||||
|
||||
# Define terminal states that are safe to clean up
|
||||
TERMINAL_STATES = {
|
||||
ProgressState.COMPLETE,
|
||||
ProgressState.DONE,
|
||||
ProgressState.CANCELLED,
|
||||
ProgressState.ERROR,
|
||||
ProgressState.ERROR_RETRIED,
|
||||
ProgressState.ERROR_AUTO_CLEANED,
|
||||
}
|
||||
|
||||
cleaned_count = 0
|
||||
# Scan for all task info keys, which serve as the master record for a task's existence
|
||||
task_info_keys = redis_client.keys("task:*:info")
|
||||
|
||||
for key in task_info_keys:
|
||||
try:
|
||||
task_id = key.decode("utf-8").split(":")[1]
|
||||
tasks = []
|
||||
for task_id in task_ids:
|
||||
task_info = get_task_info(task_id)
|
||||
last_status = get_last_task_status(task_id)
|
||||
|
||||
if not last_status:
|
||||
# If there's no status, we can't determine age or state.
|
||||
# Could be an orphaned task info key. Consider a separate cleanup for these.
|
||||
continue
|
||||
if task_info and last_status:
|
||||
tasks.append(
|
||||
{
|
||||
"task_id": task_id,
|
||||
"task_info": task_info, # Pass full info
|
||||
"last_status": last_status, # Pass last status
|
||||
# Keep original fields for backward compatibility
|
||||
"type": task_info.get("type", "unknown"),
|
||||
"name": task_info.get("name", "Unknown"),
|
||||
"artist": task_info.get("artist", ""),
|
||||
"download_type": task_info.get("download_type", "unknown"),
|
||||
"status": last_status.get("status", "unknown"),
|
||||
"timestamp": last_status.get("timestamp", 0),
|
||||
}
|
||||
)
|
||||
|
||||
status = last_status.get("status")
|
||||
timestamp = last_status.get("timestamp", 0)
|
||||
return tasks
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting all tasks: {e}")
|
||||
return []
|
||||
|
||||
# Check if the task is in a terminal state and has expired
|
||||
if status in TERMINAL_STATES:
|
||||
if (time.time() - timestamp) > CLEANUP_THRESHOLD_SECONDS:
|
||||
logger.info(
|
||||
f"Cleaning up expired task {task_id} (status: {status}, age: {time.time() - timestamp}s)"
|
||||
)
|
||||
delete_task_data(task_id)
|
||||
cleaned_count += 1
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error processing task key {key} for cleanup: {e}", exc_info=True
|
||||
)
|
||||
|
||||
logger.info(f"Finished cleanup of old tasks. Removed {cleaned_count} tasks.")
|
||||
|
||||
|
||||
# --- History Logging Helper ---
|
||||
@@ -576,63 +549,6 @@ def retry_task(task_id):
|
||||
logger.error(f"Error retrying task {task_id}: {e}", exc_info=True)
|
||||
return {"status": "error", "error": str(e)}
|
||||
|
||||
|
||||
def get_all_tasks(include_finished=False):
|
||||
"""Get all active task IDs, with an option to include finished tasks."""
|
||||
try:
|
||||
task_keys = redis_client.scan_iter("task:*:info")
|
||||
tasks = []
|
||||
|
||||
TERMINAL_STATES = {
|
||||
ProgressState.COMPLETE,
|
||||
ProgressState.DONE,
|
||||
ProgressState.CANCELLED,
|
||||
ProgressState.ERROR,
|
||||
ProgressState.ERROR_AUTO_CLEANED,
|
||||
ProgressState.ERROR_RETRIED,
|
||||
}
|
||||
|
||||
for key in task_keys:
|
||||
task_id = key.decode("utf-8").split(":")[1]
|
||||
last_status = get_last_task_status(task_id)
|
||||
current_status = None
|
||||
|
||||
if last_status:
|
||||
# Accommodate for status being nested inside 'status_info' or at the top level
|
||||
if "status" in last_status:
|
||||
current_status = last_status.get("status")
|
||||
elif isinstance(last_status.get("status_info"), dict):
|
||||
current_status = last_status.get("status_info", {}).get("status")
|
||||
|
||||
is_terminal = current_status in TERMINAL_STATES
|
||||
if not include_finished and is_terminal:
|
||||
continue # Skip terminal tasks if not requested
|
||||
|
||||
task_info = get_task_info(task_id)
|
||||
if task_info:
|
||||
task_summary = {
|
||||
"task_id": task_id,
|
||||
"type": task_info.get("type", "unknown"),
|
||||
"name": task_info.get("name", "Unknown"),
|
||||
"artist": task_info.get("artist", ""),
|
||||
"download_type": task_info.get("download_type", "unknown"),
|
||||
"created_at": task_info.get("created_at", 0),
|
||||
}
|
||||
if last_status:
|
||||
task_summary["status"] = current_status if current_status else "unknown"
|
||||
task_summary["summary"] = last_status.get("summary")
|
||||
else:
|
||||
task_summary["status"] = "unknown"
|
||||
task_summary["summary"] = None
|
||||
|
||||
tasks.append(task_summary)
|
||||
|
||||
return tasks
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting all tasks: {e}", exc_info=True)
|
||||
return []
|
||||
|
||||
|
||||
class ProgressTrackingTask(Task):
|
||||
"""Base task class that tracks progress through callbacks"""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user