"""Sink que grava 1 row por turno de chat em `events.vanna_ai` (ClickHouse). Fluxo: - `chat_filter.py` cria um `TurnRecord` no início de `handle_stream` e seta o ContextVar `_current_turn`. - Hooks em `EventCapturingToolRegistry.transform_args` (tools/SQL) e `VisualizeDataToolPT.execute` (charts) appendam no TurnRecord ativo via `record_tool` / `record_sql` / `record_chart`. - `chat_filter.py` acumula response chunks (rich.type=='text') e chama `EventSink.flush(rec)` no fim do stream. ContextVar funciona porque o agent loop do Vanna roda inteiro num único asyncio task — todos os hooks downstream herdam o context do `set_turn`. Falhas no insert NUNCA quebram a resposta ao usuário (try/except + log). """ from __future__ import annotations import asyncio import contextvars import logging import os import time from dataclasses import dataclass, field from datetime import datetime, timezone from typing import List, Optional, Tuple import clickhouse_connect logger = logging.getLogger(__name__) @dataclass class TurnRecord: started_at: float = field(default_factory=time.time) program_id: str = "" store_id: str = "" user_id: str = "" conversation_id: str = "" request_id: str = "" question: str = "" response_parts: List[str] = field(default_factory=list) tools_called: List[str] = field(default_factory=list) sqls_executed: List[str] = field(default_factory=list) charts_emitted: List[Tuple[str, str]] = field(default_factory=list) status: str = "ok" error_message: str = "" _current_turn: contextvars.ContextVar[Optional[TurnRecord]] = contextvars.ContextVar( "vanna_current_turn", default=None ) def get_turn() -> Optional[TurnRecord]: return _current_turn.get() def set_turn(rec: TurnRecord) -> contextvars.Token: return _current_turn.set(rec) def reset_turn(token: contextvars.Token) -> None: _current_turn.reset(token) def record_tool(name: str) -> None: rec = _current_turn.get() if rec is not None: rec.tools_called.append(name) def record_sql(sql: str) -> None: rec = _current_turn.get() if rec is not None: rec.sqls_executed.append(sql) def record_chart(chart_type: str, title: str) -> None: rec = _current_turn.get() if rec is not None: rec.charts_emitted.append((chart_type, title)) class EventSink: """Insere TurnRecord em `events.vanna_ai`. Cliente CH lazy.""" def __init__(self, database: str = "events", table: str = "vanna_ai"): self._database = database self._table = table self._client = None def _get_client(self): if self._client is None: self._client = clickhouse_connect.get_client( host=os.environ["CLICKHOUSE_HOST"], port=int(os.environ["CLICKHOUSE_PORT"]), username=os.environ["CLICKHOUSE_USER"], password=os.environ["CLICKHOUSE_PASSWORD"], secure=os.environ.get("CLICKHOUSE_SECURE", "true").lower() == "true", ) return self._client async def flush(self, rec: TurnRecord) -> None: try: await asyncio.to_thread(self._insert, rec) except Exception as e: logger.warning("event sink flush failed: %s", e) def _insert(self, rec: TurnRecord) -> None: ended_at = time.time() duration_ms = int((ended_at - rec.started_at) * 1000) response = "".join(rec.response_parts) client = self._get_client() client.insert( table=self._table, database=self._database, data=[ [ datetime.fromtimestamp(rec.started_at, tz=timezone.utc), datetime.fromtimestamp(ended_at, tz=timezone.utc), duration_ms, rec.conversation_id, rec.request_id, rec.program_id, rec.store_id, rec.user_id, rec.question, response, rec.tools_called, rec.sqls_executed, rec.charts_emitted, rec.status, rec.error_message, ] ], column_names=[ "started_at", "ended_at", "duration_ms", "conversation_id", "request_id", "program_id", "store_id", "user_id", "question", "response", "tools_called", "sqls_executed", "charts_emitted", "status", "error_message", ], )