Wrapper application around upstream Vanna with:
- Tenant-aware ChromaDB memory (per program/store)
- ClickHouse RLS runner with introspection guards
- PT-BR system prompt and chat translations
- Custom Plotly chart generator (ranked bar, datetime coercion)
- Embed bootstrap (theme pierce + i18n + markdown) shared by demo and React app
- Event sink for chat turn observability
155 lines
4.7 KiB
Python
155 lines
4.7 KiB
Python
"""Sink that writes one row per chat turn to `events.vanna_ai` (ClickHouse).

Flow:

- `chat_filter.py` creates a `TurnRecord` at the start of `handle_stream` and
  sets the `_current_turn` ContextVar.
- Hooks in `EventCapturingToolRegistry.transform_args` (tools/SQL) and
  `VisualizeDataToolPT.execute` (charts) append to the active TurnRecord via
  `record_tool` / `record_sql` / `record_chart`.
- `chat_filter.py` accumulates response chunks (rich.type == 'text') and calls
  `EventSink.flush(rec)` at the end of the stream.

The ContextVar works because Vanna's agent loop runs entirely inside a single
asyncio task, so every downstream hook inherits the context set by `set_turn`.

Insert failures NEVER break the response to the user (try/except + log).
"""
|
|
|
|
from __future__ import annotations

import asyncio
import contextvars
import logging
import os
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional, Tuple

import clickhouse_connect
|
|
|
|
# Module-level logger; EventSink.flush reports swallowed insert failures here.
logger: logging.Logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class TurnRecord:
    """Mutable accumulator for a single chat turn.

    Hooks append to the list fields while the turn is running; the whole
    record is then written out as one event row.
    """

    # Wall-clock start of the turn, as returned by time.time().
    started_at: float = field(default_factory=time.time)

    # Identifiers for the program, store, user, conversation and request.
    program_id: str = ""
    store_id: str = ""
    user_id: str = ""
    conversation_id: str = ""
    request_id: str = ""

    # The user's question and the response text, collected chunk by chunk.
    question: str = ""
    response_parts: List[str] = field(default_factory=list)

    # Tool names, SQL statements, and (chart_type, title) pairs seen in the turn.
    tools_called: List[str] = field(default_factory=list)
    sqls_executed: List[str] = field(default_factory=list)
    charts_emitted: List[Tuple[str, str]] = field(default_factory=list)

    # Outcome of the turn; "ok" unless a caller records otherwise.
    status: str = "ok"
    error_message: str = ""
|
|
|
|
|
|
# The TurnRecord of the chat turn currently being processed, or None when no
# turn is active. Written by set_turn()/reset_turn(); read by get_turn() and the
# record_* helpers, which do nothing while it is None.
_current_turn: contextvars.ContextVar[Optional[TurnRecord]] = contextvars.ContextVar(
    "vanna_current_turn",
    default=None,
)
|
|
|
|
|
|
def get_turn() -> Optional[TurnRecord]:
    """Return the active TurnRecord for this context, or None outside a turn."""
    # Passing None explicitly matches the ContextVar's own default.
    return _current_turn.get(None)
|
|
|
|
|
|
def set_turn(rec: TurnRecord) -> contextvars.Token:
    """Make ``rec`` the active turn for the current context.

    Returns the token to hand to ``reset_turn`` to restore the previous value.
    """
    token = _current_turn.set(rec)
    return token
|
|
|
|
|
|
def reset_turn(token: contextvars.Token) -> None:
    """Undo the ``set_turn`` call that produced ``token``.

    The active turn goes back to whatever it was before that call.
    """
    _current_turn.reset(token)
|
|
|
|
|
|
def record_tool(name: str) -> None:
    """Append a tool name to the active turn; no-op when no turn is active."""
    active = _current_turn.get()
    if active is None:
        return
    active.tools_called.append(name)
|
|
|
|
|
|
def record_sql(sql: str) -> None:
    """Append an executed SQL statement to the active turn; no-op outside a turn."""
    active = _current_turn.get()
    if active is None:
        return
    active.sqls_executed.append(sql)
|
|
|
|
|
|
def record_chart(chart_type: str, title: str) -> None:
    """Append a ``(chart_type, title)`` pair to the active turn.

    Does nothing when no turn is active.
    """
    active = _current_turn.get()
    if active is None:
        return
    active.charts_emitted.append((chart_type, title))
|
|
|
|
|
|
class EventSink:
    """Insert ``TurnRecord`` rows into ClickHouse (``events.vanna_ai`` by default).

    The ClickHouse client is created lazily, on the first insert, from the
    ``CLICKHOUSE_HOST`` / ``CLICKHOUSE_PORT`` / ``CLICKHOUSE_USER`` /
    ``CLICKHOUSE_PASSWORD`` / ``CLICKHOUSE_SECURE`` environment variables. It
    is then reused for every later insert.

    ``flush`` runs each insert in a worker thread via ``asyncio.to_thread``, so
    several inserts can be in flight at once. A re-entrant lock serializes
    client creation and the insert call, for two reasons:
    - Without it, two first flushes could each build a client.
    - Queries could overlap on the single shared client. ClickHouse rejects
      concurrent queries within one session, and clickhouse-connect clients
      use a session id by default.
    """

    # Insert column order; ``_build_row`` emits values in exactly this order.
    _COLUMNS = (
        "started_at",
        "ended_at",
        "duration_ms",
        "conversation_id",
        "request_id",
        "program_id",
        "store_id",
        "user_id",
        "question",
        "response",
        "tools_called",
        "sqls_executed",
        "charts_emitted",
        "status",
        "error_message",
    )

    def __init__(self, database: str = "events", table: str = "vanna_ai"):
        self._database = database
        self._table = table
        self._client = None
        # Re-entrant so ``_get_client`` can take it on its own and also be
        # called from ``_insert`` while the lock is already held.
        self._lock = threading.RLock()

    def _get_client(self):
        """Return the shared ClickHouse client, creating it on first use.

        Raises ``KeyError`` if a required ``CLICKHOUSE_*`` variable is unset.
        Raises ``ValueError`` if ``CLICKHOUSE_PORT`` is not an integer. In
        either case nothing is cached, so the next call retries.
        """
        with self._lock:
            if self._client is None:
                self._client = clickhouse_connect.get_client(
                    host=os.environ["CLICKHOUSE_HOST"],
                    port=int(os.environ["CLICKHOUSE_PORT"]),
                    username=os.environ["CLICKHOUSE_USER"],
                    password=os.environ["CLICKHOUSE_PASSWORD"],
                    # Anything other than "true" (case-insensitive) disables TLS.
                    secure=os.environ.get("CLICKHOUSE_SECURE", "true").lower() == "true",
                )
            return self._client

    async def flush(self, rec: TurnRecord) -> None:
        """Write ``rec`` as one row without ever raising into the caller.

        The end time of the turn is captured here, before the blocking insert
        is handed to a worker thread. This keeps ``duration_ms`` free of
        thread-pool scheduling delay and of time spent waiting on the lock.

        Any ``Exception`` from the insert is logged as a warning and swallowed,
        so a telemetry failure cannot break the chat response.
        """
        ended_at = time.time()
        try:
            await asyncio.to_thread(self._insert, rec, ended_at)
        except Exception as e:
            logger.warning("event sink flush failed: %s", e)

    def _insert(self, rec: TurnRecord, ended_at: Optional[float] = None) -> None:
        """Insert one row for ``rec`` (blocking).

        ``ended_at`` is a ``time.time()`` timestamp for the end of the turn.
        When it is omitted, the current time is used.
        """
        if ended_at is None:
            ended_at = time.time()
        row = self._build_row(rec, ended_at)
        with self._lock:
            self._get_client().insert(
                table=self._table,
                database=self._database,
                data=[row],
                column_names=list(self._COLUMNS),
            )

    @staticmethod
    def _build_row(rec: TurnRecord, ended_at: float) -> list:
        """Return the insert values for ``rec`` in ``_COLUMNS`` order."""
        return [
            # Timezone-aware UTC datetimes rather than host-local naive ones.
            datetime.fromtimestamp(rec.started_at, tz=timezone.utc),
            datetime.fromtimestamp(ended_at, tz=timezone.utc),
            int((ended_at - rec.started_at) * 1000),
            rec.conversation_id,
            rec.request_id,
            rec.program_id,
            rec.store_id,
            rec.user_id,
            rec.question,
            "".join(rec.response_parts),
            rec.tools_called,
            rec.sqls_executed,
            rec.charts_emitted,
            rec.status,
            rec.error_message,
        ]
|