vanna-clubpetro/events_sink.py
leonardosalazar-cp 1d152c0dce Initial commit: Vanna 2.0 deployment for ClubPetro
Wrapper application around upstream Vanna with:
- Tenant-aware ChromaDB memory (per program/store)
- ClickHouse RLS runner with introspection guards
- PT-BR system prompt and chat translations
- Custom Plotly chart generator (ranked bar, datetime coercion)
- Embed bootstrap (theme pierce + i18n + markdown) shared by demo and React app
- Event sink for chat turn observability
2026-04-29 17:22:05 -03:00

155 lines
4.7 KiB
Python

"""Sink that writes one row per chat turn to `events.vanna_ai` (ClickHouse).

Flow:
- `chat_filter.py` creates a `TurnRecord` at the start of `handle_stream` and
  sets the `_current_turn` ContextVar.
- Hooks in `EventCapturingToolRegistry.transform_args` (tools/SQL) and
  `VisualizeDataToolPT.execute` (charts) append to the active TurnRecord via
  `record_tool` / `record_sql` / `record_chart`.
- `chat_filter.py` accumulates response chunks (rich.type == 'text') and calls
  `EventSink.flush(rec)` at the end of the stream.

The ContextVar approach works because Vanna's agent loop runs entirely within
a single asyncio task, so every downstream hook inherits the context
established by `set_turn`.

Insert failures NEVER break the response to the user (try/except + log).
"""
from __future__ import annotations

import asyncio
import contextvars
import logging
import os
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional, Tuple

import clickhouse_connect
# Module-scoped logger, named after this module.
logger = logging.getLogger(name=__name__)
@dataclass
class TurnRecord:
    """Mutable accumulator for what is observed during one chat turn.

    Fields are filled in by the `record_*` helpers and the stream handler,
    then written out as a single event row by `EventSink`.
    """

    # Wall-clock epoch seconds captured when the record is constructed.
    started_at: float = field(default_factory=time.time)
    # Identity / routing columns, copied verbatim into the event row.
    program_id: str = ""
    store_id: str = ""
    user_id: str = ""
    conversation_id: str = ""
    request_id: str = ""
    # The user's question for this turn.
    question: str = ""
    # Streamed text chunks; concatenated into the `response` column on insert.
    response_parts: list[str] = field(default_factory=list)
    # Tool names, in the order they were recorded.
    tools_called: list[str] = field(default_factory=list)
    # SQL statements, in the order they were recorded.
    sqls_executed: list[str] = field(default_factory=list)
    # (chart_type, title) pairs, in the order they were recorded.
    charts_emitted: list[tuple[str, str]] = field(default_factory=list)
    status: str = "ok"
    error_message: str = ""
# Per-context slot for the TurnRecord of the chat turn in progress.
# Reads return None until a record has been bound via `set_turn`.
_current_turn: contextvars.ContextVar[Optional[TurnRecord]] = contextvars.ContextVar(
    "vanna_current_turn",
    default=None,
)
def get_turn() -> Optional[TurnRecord]:
    """Return the TurnRecord bound in the current context, or None if none is."""
    active = _current_turn.get()
    return active
def set_turn(rec: TurnRecord) -> contextvars.Token:
    """Bind `rec` as the active turn for the current context.

    Returns:
        The token to hand to `reset_turn` to restore the previous binding.
    """
    token = _current_turn.set(rec)
    return token
def reset_turn(token: contextvars.Token) -> None:
    """Undo the `set_turn` call that produced `token`.

    The active-turn slot goes back to whatever it held before that call.
    """
    slot = _current_turn
    slot.reset(token)
def record_tool(name: str) -> None:
    """Append a tool name to the active turn; does nothing outside a turn."""
    active = _current_turn.get()
    if active is None:
        return
    active.tools_called.append(name)
def record_sql(sql: str) -> None:
    """Append an executed SQL statement to the active turn; no-op outside a turn."""
    active = _current_turn.get()
    if active is None:
        return
    active.sqls_executed.append(sql)
def record_chart(chart_type: str, title: str) -> None:
    """Append a (chart_type, title) pair to the active turn; no-op outside a turn."""
    active = _current_turn.get()
    if active is None:
        return
    entry = (chart_type, title)
    active.charts_emitted.append(entry)
class EventSink:
    """Write one row per chat turn (`TurnRecord`) into a ClickHouse table.

    The target defaults to `events.vanna_ai`. The ClickHouse client is created
    lazily on the first insert from the CLICKHOUSE_HOST / CLICKHOUSE_PORT /
    CLICKHOUSE_USER / CLICKHOUSE_PASSWORD / CLICKHOUSE_SECURE environment
    variables, so constructing a sink opens no connection.
    """

    # Insert column order; must match the value order produced by `_build_row`.
    _COLUMNS = (
        "started_at",
        "ended_at",
        "duration_ms",
        "conversation_id",
        "request_id",
        "program_id",
        "store_id",
        "user_id",
        "question",
        "response",
        "tools_called",
        "sqls_executed",
        "charts_emitted",
        "status",
        "error_message",
    )

    def __init__(self, database: str = "events", table: str = "vanna_ai"):
        """Configure the target `database`.`table`; no I/O happens here."""
        self._database = database
        self._table = table
        self._client = None
        # `_insert` runs on worker threads (see `flush`). Without this lock,
        # two concurrent first flushes could each build a client, and one
        # would leak.
        self._client_lock = threading.Lock()

    def _get_client(self):
        """Return the shared ClickHouse client, creating it on first use.

        Raises:
            KeyError: a required CLICKHOUSE_* environment variable is unset.
            ValueError: CLICKHOUSE_PORT is not an integer.
        """
        # Lock-free fast path once the client exists. Re-check under the lock
        # so only one thread ever constructs it.
        client = self._client
        if client is not None:
            return client
        with self._client_lock:
            if self._client is None:
                # NOTE(review): clickhouse-connect assigns a session id per
                # client by default, and the server may reject concurrent
                # queries within one session. If concurrent flushes start
                # failing, consider disabling `autogenerate_session_id`.
                # Confirm against the installed clickhouse-connect version.
                self._client = clickhouse_connect.get_client(
                    host=os.environ["CLICKHOUSE_HOST"],
                    port=int(os.environ["CLICKHOUSE_PORT"]),
                    username=os.environ["CLICKHOUSE_USER"],
                    password=os.environ["CLICKHOUSE_PASSWORD"],
                    # Unset -> secure=True; any value other than "true"
                    # (case-insensitive) -> secure=False.
                    secure=os.environ.get("CLICKHOUSE_SECURE", "true").lower() == "true",
                )
            return self._client

    async def flush(self, rec: TurnRecord) -> None:
        """Persist `rec` without blocking the event loop; never raises.

        The blocking insert runs in a worker thread via `asyncio.to_thread`.
        Any failure is logged as a warning and swallowed, so event logging
        can never break the chat response.
        """
        try:
            # Stamp the end time before the thread hand-off. Otherwise
            # `duration_ms` would also count time spent waiting for a free
            # worker thread.
            ended_at = time.time()
            await asyncio.to_thread(self._insert, rec, ended_at)
        except Exception as e:
            logger.warning("event sink flush failed: %s", e)

    def _insert(self, rec: TurnRecord, ended_at: Optional[float] = None) -> None:
        """Synchronously insert one row for `rec` (blocking network I/O).

        Args:
            rec: the finished turn.
            ended_at: epoch seconds at which the turn ended; defaults to now.
        """
        if ended_at is None:
            ended_at = time.time()
        row = self._build_row(rec, ended_at)
        self._get_client().insert(
            table=self._table,
            database=self._database,
            data=[row],
            column_names=list(self._COLUMNS),
        )

    @staticmethod
    def _build_row(rec: TurnRecord, ended_at: float) -> list:
        """Return the row values for `rec`, in `_COLUMNS` order."""
        # time.time() is wall-clock and can step backwards (e.g. NTP
        # adjustments). A negative duration is never meaningful, so clamp at 0.
        # It might also be rejected if the column is unsigned (TODO confirm
        # the column type).
        duration_ms = max(0, int((ended_at - rec.started_at) * 1000))
        return [
            datetime.fromtimestamp(rec.started_at, tz=timezone.utc),
            datetime.fromtimestamp(ended_at, tz=timezone.utc),
            duration_ms,
            rec.conversation_id,
            rec.request_id,
            rec.program_id,
            rec.store_id,
            rec.user_id,
            rec.question,
            "".join(rec.response_parts),
            rec.tools_called,
            rec.sqls_executed,
            rec.charts_emitted,
            rec.status,
            rec.error_message,
        ]