"""Sweep periódico dos CSVs efêmeros gerados pelo RunSqlTool em data_storage/.""" from __future__ import annotations import asyncio import os import time from pathlib import Path from typing import Optional CSV_DIR = Path(__file__).parent / "data_storage" TTL_SECONDS = int(os.environ.get("CSV_TTL_SECONDS", "1800")) SWEEP_INTERVAL_SECONDS = int(os.environ.get("CSV_SWEEP_INTERVAL_SECONDS", "600")) _task: Optional[asyncio.Task] = None def sweep_once() -> int: """Apaga CSVs com mtime > TTL_SECONDS. Idempotente.""" if not CSV_DIR.exists(): return 0 cutoff = time.time() - TTL_SECONDS removed = 0 for csv in CSV_DIR.rglob("query_results_*.csv"): try: if csv.stat().st_mtime < cutoff: csv.unlink() removed += 1 except FileNotFoundError: pass return removed async def _periodic() -> None: while True: await asyncio.sleep(SWEEP_INTERVAL_SECONDS) try: n = sweep_once() if n: print(f"[csv_cleanup] periodic sweep removed {n} CSVs") except Exception as e: print(f"[csv_cleanup] sweep failed: {e!r}") async def startup() -> None: global _task n = sweep_once() if n: print(f"[csv_cleanup] startup sweep removed {n} CSVs") _task = asyncio.create_task(_periodic()) async def shutdown() -> None: global _task if _task is not None: _task.cancel() try: await _task except asyncio.CancelledError: pass _task = None