vanna-clubpetro/chat_filter.py
leonardosalazar-cp 1d152c0dce Initial commit: Vanna 2.0 deployment for ClubPetro
Wrapper application around upstream Vanna with:
- Tenant-aware ChromaDB memory (per program/store)
- ClickHouse RLS runner with introspection guards
- PT-BR system prompt and chat translations
- Custom Plotly chart generator (ranked bar, datetime coercion)
- Embed bootstrap (theme pierce + i18n + markdown) shared by demo and React app
- Event sink for chat turn observability
2026-04-29 17:22:05 -03:00

138 lines
5.3 KiB
Python

"""ChatHandler subclass que filtra rich types, traduz strings e grava
1 row por turno em `events.vanna_ai` (via `EventSink`).
- Whitelist de rich.type: tudo que não esteja em ALLOWED_RICH_TYPES é dropado
antes de virar chunk SSE. Reduz ruído na UI (sem status_card, task_tracker,
notification, log_viewer, etc.).
- Tradução: strings hardcoded em inglês emitidas por
vanna/src/vanna/core/agent/agent.py (Response complete, Ready for next
message, etc.) viram PT via tabela exata. Não regex — quem cair fora da
tabela passa intocado.
- Captura de turno: cria um TurnRecord no início do stream, acumula
resposta (rich.type=='text'), e flush via EventSink no `finally`. Hooks
de tools/SQL/charts vivem em `agent.py` e `viz_tool.py` e escrevem no
mesmo TurnRecord via ContextVar.
"""
from __future__ import annotations
from typing import AsyncGenerator, Optional
from vanna.core import Agent
from vanna.servers.base import ChatHandler, ChatRequest, ChatStreamChunk
from events_sink import EventSink, TurnRecord, reset_turn, set_turn
ALLOWED_RICH_TYPES = frozenset(
{
"text",
"dataframe",
"chart",
"status_bar_update",
"chat_input_update",
}
)
TRANSLATIONS = {
# status_bar_update — message
"Ready": "Pronto",
"Workflow complete": "Fluxo concluído",
"Response complete": "Resposta concluída",
"Processing your request...": "Processando sua solicitação...",
"Executing tools...": "Executando ferramentas...",
"Tool limit reached": "Limite de ferramentas atingido",
"Error occurred": "Ocorreu um erro",
# status_bar_update — detail
"Choose an option or type a message": "Escolha uma opção ou digite uma mensagem",
"Analyzing query": "Analisando consulta",
"Ready for next message": "Pronto para a próxima mensagem",
"An unexpected error occurred while processing your message": "Ocorreu um erro inesperado ao processar sua mensagem",
# chat_input_update — placeholder
"Try again...": "Tente novamente...",
"Ask a question...": "Faça uma pergunta...",
"Ask a follow-up question...": "Faça uma pergunta complementar...",
"Continue the task or ask me something else...": "Continue a tarefa ou pergunte outra coisa...",
# dataframe — title
"Query Results": "Resultados",
}
def _translate(value):
if isinstance(value, str):
return TRANSLATIONS.get(value, value)
return value
def _rewrite_dataframe(data: dict) -> None:
"""Reescreve title/description do DataFrameComponent em pt-BR.
title "Query Results""Resultados" (via TRANSLATIONS).
description "SQL query returned N rows with M columns" — string dinâmica
do upstream (run_sql.py:112). Substituímos por contagem PT sem mencionar
"query"/SQL, usando os campos row_count/column_count que já vêm no chunk.
"""
if "description" in data and isinstance(data["description"], str):
if data["description"].startswith("SQL query returned"):
rows = data.get("row_count", 0)
cols = data.get("column_count", 0)
row_word = "linha" if rows == 1 else "linhas"
col_word = "coluna" if cols == 1 else "colunas"
data["description"] = f"{rows} {row_word} em {cols} {col_word}"
class FilteringChatHandler(ChatHandler):
"""Filtra rich.type, traduz UI strings, e grava 1 row por turno."""
def __init__(self, agent: Agent, event_sink: Optional[EventSink] = None):
super().__init__(agent)
self._sink = event_sink
async def handle_stream(
self, request: ChatRequest
) -> AsyncGenerator[ChatStreamChunk, None]:
qp = request.request_context.query_params or {}
rec = TurnRecord(
conversation_id=request.conversation_id or "",
request_id=request.request_id or "",
question=request.message or "",
program_id=str(qp.get("program_id", "")),
store_id=str(qp.get("store_id", "")),
user_id=str(qp.get("user_id", "")),
)
token = set_turn(rec)
try:
async for chunk in super().handle_stream(request):
if not rec.conversation_id:
rec.conversation_id = chunk.conversation_id
if not rec.request_id:
rec.request_id = chunk.request_id
rich = chunk.rich or {}
rich_type = rich.get("type")
if rich_type not in ALLOWED_RICH_TYPES:
continue
data = rich.get("data")
if isinstance(data, dict):
for field_name in ("message", "detail", "placeholder", "title"):
if field_name in data:
data[field_name] = _translate(data[field_name])
if rich_type == "dataframe":
_rewrite_dataframe(data)
elif rich_type == "text":
content = data.get("content")
if isinstance(content, str) and content:
rec.response_parts.append(content)
yield chunk
except Exception as e:
rec.status = "error"
rec.error_message = str(e)
raise
finally:
reset_turn(token)
if self._sink is not None:
await self._sink.flush(rec)