#!/usr/bin/env python3 """ Debug script to test failed task cleanup logic """ import sqlite3 import os from pathlib import Path def test_cleanup_logic(): """Test the cleanup logic with a simple SQLite database""" # Create a temporary database for testing db_path = Path("test_cleanup.db") if db_path.exists(): db_path.unlink() conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row # Create tasks table conn.execute(""" CREATE TABLE tasks ( task_id TEXT PRIMARY KEY, status TEXT, created_at TEXT, started_at TEXT, finished_at TEXT, deleted_at TEXT, output_dir TEXT ) """) # Insert test data - a failed task from 2 minutes ago conn.execute(""" INSERT INTO tasks (task_id, status, created_at, started_at, finished_at) VALUES (?, 'failed', datetime('now', '-2 minutes'), datetime('now', '-2 minutes'), datetime('now', '-2 minutes')) """, ('test_task_123',)) # Insert test data - a failed task from 30 seconds ago (should not be cleaned) conn.execute(""" INSERT INTO tasks (task_id, status, created_at, started_at, finished_at) VALUES (?, 'failed', datetime('now', '-30 seconds'), datetime('now', '-30 seconds'), datetime('now', '-30 seconds')) """, ('test_task_456',)) conn.commit() # Test the cleanup query with 60 seconds threshold failed_task_cleanup_age = 60 print("Testing cleanup logic...") print(f"Cleanup age: {failed_task_cleanup_age} seconds") # Show all failed tasks all_failed = conn.execute(""" SELECT task_id, status, created_at, finished_at FROM tasks WHERE status = 'failed' AND deleted_at IS NULL """).fetchall() print(f"\nTotal failed tasks: {len(all_failed)}") for row in all_failed: print(f" Task {row['task_id']}: finished_at={row['finished_at']}") # Test cleanup query cleanup_rows = conn.execute(""" SELECT task_id, output_dir FROM tasks WHERE status = 'failed' AND deleted_at IS NULL AND COALESCE(finished_at, started_at, created_at) <= datetime('now', '-' || ? || ' seconds') """, (failed_task_cleanup_age,)).fetchall() print(f"\nTasks to be cleaned up: {len(cleanup_rows)}") for row in cleanup_rows: print(f" Task {row['task_id']} will be cleaned up") # Test with timezone adjustment (+8 hours) print(" Testing with timezone adjustment (+8 hours):") cleanup_rows_tz = conn.execute(""" SELECT task_id, output_dir FROM tasks WHERE status = 'failed' AND deleted_at IS NULL AND COALESCE(finished_at, started_at, created_at) <= datetime('now', '+8 hours', '-' || ? || ' seconds') """, (failed_task_cleanup_age,)).fetchall() print(f"Tasks to be cleaned up (with TZ): {len(cleanup_rows_tz)}") for row in cleanup_rows_tz: print(f" Task {row['task_id']} will be cleaned up") conn.close() db_path.unlink() if __name__ == "__main__": test_cleanup_logic()