# UAV-CO2/src/gasflux/janitor.py
# (Python source, 209 lines, 8.1 KiB)

"""
Janitor Module
Handles background cleanup of expired tasks and their output directories.
"""
import threading
import time
import shutil
from pathlib import Path
import sqlite3
from flask import current_app
def start_janitor(app, interval_seconds=30):
    """Start the background janitor thread that cleans up expired tasks.

    The thread enters ``app.app_context()`` once and then calls
    ``cleanup_expired_tasks()`` in an endless loop, pausing between cycles.
    An exception raised by a cycle is logged through ``app.logger`` and the
    loop carries on with the next cycle.

    Args:
        app: Application object; must provide ``app_context()`` and ``logger``.
        interval_seconds: Pause between two cleanup cycles, in seconds.
            Defaults to 30, the value that was previously hard-coded.

    Returns:
        The started ``threading.Thread`` (daemon, named ``"janitor"``).
        Callers that ignore the return value behave exactly as before.
    """

    def worker():
        with app.app_context():
            while True:
                try:
                    cleanup_expired_tasks()
                except Exception as e:
                    # Top-level boundary of the thread: log and keep looping so
                    # one failed cycle does not stop future cleanups.
                    app.logger.error(f"Janitor cleanup error: {str(e)}", exc_info=True)
                finally:
                    # Sleep after every cycle, successful or not.
                    time.sleep(interval_seconds)

    # Daemon thread so it does not prevent interpreter/app shutdown.
    thread = threading.Thread(target=worker, daemon=True, name="janitor")
    thread.start()
    app.logger.info("Janitor thread started for background cleanup")
    return thread
def cleanup_expired_tasks():
    """Hard-delete tasks whose ``delete_after_at`` has passed.

    For every row in ``tasks`` with ``delete_after_at`` set, ``deleted_at``
    NULL and ``delete_after_at <= datetime('now', '+8 hours')`` this:

    1. collects the task's directories (the stored ``output_dir`` plus
       ``<OUTPUT_FOLDER>/<task_id>`` and ``<UPLOAD_FOLDER>/<task_id>`` when
       those folders are configured),
    2. removes them from disk (best effort; failures are logged), and
    3. deletes the row from the database and commits.

    When the ``JANITOR_DRY_RUN`` config value is true (bool, or the string
    ``"true"`` in any case) the paths are only logged and neither files nor
    rows are touched.

    Must run inside a Flask application context (uses ``current_app``).
    Database errors are logged, never raised.
    """
    db_path = _get_db_path()
    logger = current_app.logger

    dry_run_config = current_app.config.get('JANITOR_DRY_RUN', 'false')
    if isinstance(dry_run_config, str):
        dry_run = dry_run_config.lower() == 'true'
    else:
        dry_run = bool(dry_run_config)

    conn = None
    try:
        conn = sqlite3.connect(str(db_path), check_same_thread=False)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA foreign_keys=ON")

        # Log the comparison time for debugging.
        # NOTE(review): the '+8 hours' offset presumably mirrors the offset used
        # when delete_after_at is written (UTC+8) -- confirm against the writer.
        current_time = conn.execute("SELECT datetime('now', '+8 hours')").fetchone()[0]
        logger.info(f"Janitor cleanup check at: {current_time}")

        # Debug aid: list every task that is scheduled for deletion at all.
        debug_rows = conn.execute("""
            SELECT task_id, delete_after_at, downloaded_at
            FROM tasks
            WHERE delete_after_at IS NOT NULL
              AND deleted_at IS NULL
        """).fetchall()
        if debug_rows:
            logger.info(f"Found {len(debug_rows)} tasks with delete_after_at set:")
            for row in debug_rows:
                logger.info(f" Task {row['task_id']}: delete_at={row['delete_after_at']}, downloaded_at={row['downloaded_at']}")

        # Tasks whose deletion time has been reached.
        rows = conn.execute("""
            SELECT task_id, output_dir
            FROM tasks
            WHERE delete_after_at IS NOT NULL
              AND deleted_at IS NULL
              AND delete_after_at <= datetime('now', '+8 hours')
        """).fetchall()
        if not rows:
            return  # No tasks to clean up

        logger.info(f"Janitor found {len(rows)} expired tasks to clean up")

        upload_base = _configured_dir('UPLOAD_FOLDER')
        output_base = _configured_dir('OUTPUT_FOLDER')

        for row in rows:
            task_id = row['task_id']
            try:
                delete_targets = _collect_delete_targets(
                    task_id, row['output_dir'], output_base, upload_base
                )

                if dry_run:
                    for tgt in delete_targets:
                        logger.info(f"[DRY RUN] Would delete task {task_id} path: {tgt}")
                    continue

                # Actual deletion (best effort per path).
                for tgt in delete_targets:
                    try:
                        if not tgt.exists():
                            logger.warning(f"Path not found for task {task_id}: {tgt}")
                            continue
                        shutil.rmtree(tgt, ignore_errors=True)
                        # rmtree(ignore_errors=True) never raises, so verify the
                        # outcome instead of reporting success unconditionally.
                        if tgt.exists():
                            logger.error(f"Failed to delete path {tgt} for task {task_id}: path still exists after removal")
                        else:
                            logger.info(f"Deleted path for task {task_id}: {tgt}")
                    except Exception as e:
                        logger.error(f"Failed to delete path {tgt} for task {task_id}: {e}")

                # Hard delete from database
                conn.execute(
                    "DELETE FROM tasks WHERE task_id = ?",
                    (task_id,)
                )
                conn.commit()
                logger.info(f"Hard deleted task {task_id} from database")
            except Exception as e:
                # Continue with other tasks even if one fails
                logger.error(f"Failed to delete task {task_id}: {str(e)}", exc_info=True)
    except Exception as e:
        logger.error(f"Database error during cleanup: {str(e)}", exc_info=True)
    finally:
        if conn is not None:
            conn.close()


def _configured_dir(config_key):
    """Return the folder stored under ``config_key`` as a Path, or None if unset.

    ``Path('')`` is ``Path('.')`` and Path objects are always truthy, so an
    unset/empty setting has to be mapped to None explicitly; otherwise per-task
    directories would be derived relative to the current working directory.
    """
    raw = current_app.config.get(config_key)
    return Path(raw) if raw else None


def _collect_delete_targets(task_id, output_dir, output_base, upload_base):
    """List the unique paths belonging to one task, in deletion order.

    Order: the ``output_dir`` recorded in the database, then the conventional
    ``<output_base>/<task_id>`` fallback, then ``<upload_base>/<task_id>``.
    ``output_base`` / ``upload_base`` are Paths or None (not configured).
    """
    candidates = []
    # output_dir as recorded in the database
    if output_dir:
        candidates.append(Path(output_dir))
    # An empty task_id would make "<base>/<task_id>" collapse to the base folder
    # itself, so derived directories are only built for a non-empty id.
    if task_id:
        for base in (output_base, upload_base):
            if base is not None:
                candidates.append(base / task_id)

    targets = []
    for candidate in candidates:
        if candidate not in targets:
            targets.append(candidate)
    return targets
def reconcile_tasks_on_startup():
    """Reconcile task rows with the filesystem on application startup.

    Performs three steps on one connection and commits once at the end (an
    error is logged and nothing is committed):

    1. Downloaded tasks without ``delete_after_at`` get it set to
       ``downloaded_at + 10 minutes``.
    2. Tasks whose recorded ``output_dir`` no longer exists on disk are
       removed from the ``tasks`` table.
    3. Tasks with a NULL/empty ``output_dir`` get it backfilled with
       ``<OUTPUT_FOLDER>/<task_id>`` when ``OUTPUT_FOLDER`` is configured.

    Must run inside a Flask application context (uses ``current_app``).
    """
    db_path = _get_db_path()
    logger = current_app.logger
    conn = None
    try:
        conn = sqlite3.connect(str(db_path), check_same_thread=False)
        conn.execute("PRAGMA foreign_keys=ON")

        # Fix tasks that were downloaded but never got a deletion time, e.g.
        # because the app crashed between the two writes.
        cursor = conn.execute("""
            UPDATE tasks
            SET delete_after_at = datetime(downloaded_at, '+10 minutes')
            WHERE downloaded_at IS NOT NULL
              AND deleted_at IS NULL
              AND delete_after_at IS NULL
        """)
        # rowcount is the number of rows touched by this statement only.
        updated_count = cursor.rowcount
        if updated_count > 0:
            logger.info(f"Startup reconciliation: Fixed delete_after_at for {updated_count} downloaded tasks")

        # Drop database entries whose output directory was removed manually.
        # NOTE(review): this also removes tasks whose directory has not been
        # created yet -- confirm that cannot happen for live tasks.
        rows = conn.execute("""
            SELECT task_id, output_dir
            FROM tasks
            WHERE output_dir IS NOT NULL
              AND deleted_at IS NULL
        """).fetchall()
        orphaned_count = 0
        for task_id_from_db, output_dir_from_db in rows:
            if not Path(output_dir_from_db).exists():
                conn.execute(
                    "DELETE FROM tasks WHERE task_id = ?",
                    (task_id_from_db,)
                )
                orphaned_count += 1
        if orphaned_count > 0:
            # The rows are hard-deleted, not flagged, so say so in the log.
            logger.info(f"Startup reconciliation: Removed {orphaned_count} tasks from database (directories not found)")

        # Backfill output_dir for rows with NULL/empty, using OUTPUT_FOLDER/<task_id>.
        # Path('') is Path('.') and always truthy, so test the raw setting:
        # an unset folder must not produce paths relative to the working dir.
        output_folder = current_app.config.get('OUTPUT_FOLDER')
        if output_folder:
            output_base = Path(output_folder)
            rows2 = conn.execute("""
                SELECT task_id FROM tasks
                WHERE (output_dir IS NULL OR output_dir = '')
                  AND deleted_at IS NULL
            """).fetchall()
            for (task_id_from_db,) in rows2:
                if not task_id_from_db:
                    # "<base>/" + empty id would be the base folder itself.
                    continue
                guess = output_base / task_id_from_db
                conn.execute("UPDATE tasks SET output_dir = ? WHERE task_id = ?", (str(guess), task_id_from_db))
                logger.info(f"Backfilled output_dir for task {task_id_from_db}: {guess}")

        conn.commit()
    except Exception as e:
        logger.error(f"Startup reconciliation error: {str(e)}", exc_info=True)
    finally:
        if conn is not None:
            conn.close()
def _get_db_path():
    """Return the database file path for the current Flask application.

    Delegates to ``get_db_path`` from the sibling ``db`` module, passing it
    ``current_app``; requires an active application context.
    """
    # Imported at call time rather than at module import time
    # (presumably to avoid a circular import -- verify).
    from .db import get_db_path as resolve_db_path

    active_app = current_app
    return resolve_db_path(active_app)