Wrapper application around upstream Vanna with: - Tenant-aware ChromaDB memory (per program/store) - ClickHouse RLS runner with introspection guards - PT-BR system prompt and chat translations - Custom Plotly chart generator (ranked bar, datetime coercion) - Embed bootstrap (theme pierce + i18n + markdown) shared by demo and React app - Event sink for chat turn observability
138 lines
5.3 KiB
Python
138 lines
5.3 KiB
Python
"""ChatHandler subclass que filtra rich types, traduz strings e grava
|
|
1 row por turno em `events.vanna_ai` (via `EventSink`).
|
|
|
|
- Whitelist de rich.type: tudo que não esteja em ALLOWED_RICH_TYPES é dropado
|
|
antes de virar chunk SSE. Reduz ruído na UI (sem status_card, task_tracker,
|
|
notification, log_viewer, etc.).
|
|
- Tradução: strings hardcoded em inglês emitidas por
|
|
vanna/src/vanna/core/agent/agent.py (Response complete, Ready for next
|
|
message, etc.) viram PT via tabela exata. Não regex — quem cair fora da
|
|
tabela passa intocado.
|
|
- Captura de turno: cria um TurnRecord no início do stream, acumula
|
|
resposta (rich.type=='text'), e flush via EventSink no `finally`. Hooks
|
|
de tools/SQL/charts vivem em `agent.py` e `viz_tool.py` e escrevem no
|
|
mesmo TurnRecord via ContextVar.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import AsyncGenerator, Optional
|
|
|
|
from vanna.core import Agent
|
|
from vanna.servers.base import ChatHandler, ChatRequest, ChatStreamChunk
|
|
|
|
from events_sink import EventSink, TurnRecord, reset_turn, set_turn
|
|
|
|
ALLOWED_RICH_TYPES = frozenset(
|
|
{
|
|
"text",
|
|
"dataframe",
|
|
"chart",
|
|
"status_bar_update",
|
|
"chat_input_update",
|
|
}
|
|
)
|
|
|
|
|
|
TRANSLATIONS = {
|
|
# status_bar_update — message
|
|
"Ready": "Pronto",
|
|
"Workflow complete": "Fluxo concluído",
|
|
"Response complete": "Resposta concluída",
|
|
"Processing your request...": "Processando sua solicitação...",
|
|
"Executing tools...": "Executando ferramentas...",
|
|
"Tool limit reached": "Limite de ferramentas atingido",
|
|
"Error occurred": "Ocorreu um erro",
|
|
# status_bar_update — detail
|
|
"Choose an option or type a message": "Escolha uma opção ou digite uma mensagem",
|
|
"Analyzing query": "Analisando consulta",
|
|
"Ready for next message": "Pronto para a próxima mensagem",
|
|
"An unexpected error occurred while processing your message": "Ocorreu um erro inesperado ao processar sua mensagem",
|
|
# chat_input_update — placeholder
|
|
"Try again...": "Tente novamente...",
|
|
"Ask a question...": "Faça uma pergunta...",
|
|
"Ask a follow-up question...": "Faça uma pergunta complementar...",
|
|
"Continue the task or ask me something else...": "Continue a tarefa ou pergunte outra coisa...",
|
|
# dataframe — title
|
|
"Query Results": "Resultados",
|
|
}
|
|
|
|
|
|
def _translate(value):
|
|
if isinstance(value, str):
|
|
return TRANSLATIONS.get(value, value)
|
|
return value
|
|
|
|
|
|
def _rewrite_dataframe(data: dict) -> None:
|
|
"""Reescreve title/description do DataFrameComponent em pt-BR.
|
|
|
|
title "Query Results" → "Resultados" (via TRANSLATIONS).
|
|
description "SQL query returned N rows with M columns" — string dinâmica
|
|
do upstream (run_sql.py:112). Substituímos por contagem PT sem mencionar
|
|
"query"/SQL, usando os campos row_count/column_count que já vêm no chunk.
|
|
"""
|
|
if "description" in data and isinstance(data["description"], str):
|
|
if data["description"].startswith("SQL query returned"):
|
|
rows = data.get("row_count", 0)
|
|
cols = data.get("column_count", 0)
|
|
row_word = "linha" if rows == 1 else "linhas"
|
|
col_word = "coluna" if cols == 1 else "colunas"
|
|
data["description"] = f"{rows} {row_word} em {cols} {col_word}"
|
|
|
|
|
|
class FilteringChatHandler(ChatHandler):
|
|
"""Filtra rich.type, traduz UI strings, e grava 1 row por turno."""
|
|
|
|
def __init__(self, agent: Agent, event_sink: Optional[EventSink] = None):
|
|
super().__init__(agent)
|
|
self._sink = event_sink
|
|
|
|
async def handle_stream(
|
|
self, request: ChatRequest
|
|
) -> AsyncGenerator[ChatStreamChunk, None]:
|
|
qp = request.request_context.query_params or {}
|
|
rec = TurnRecord(
|
|
conversation_id=request.conversation_id or "",
|
|
request_id=request.request_id or "",
|
|
question=request.message or "",
|
|
program_id=str(qp.get("program_id", "")),
|
|
store_id=str(qp.get("store_id", "")),
|
|
user_id=str(qp.get("user_id", "")),
|
|
)
|
|
token = set_turn(rec)
|
|
try:
|
|
async for chunk in super().handle_stream(request):
|
|
if not rec.conversation_id:
|
|
rec.conversation_id = chunk.conversation_id
|
|
if not rec.request_id:
|
|
rec.request_id = chunk.request_id
|
|
|
|
rich = chunk.rich or {}
|
|
rich_type = rich.get("type")
|
|
if rich_type not in ALLOWED_RICH_TYPES:
|
|
continue
|
|
|
|
data = rich.get("data")
|
|
if isinstance(data, dict):
|
|
for field_name in ("message", "detail", "placeholder", "title"):
|
|
if field_name in data:
|
|
data[field_name] = _translate(data[field_name])
|
|
|
|
if rich_type == "dataframe":
|
|
_rewrite_dataframe(data)
|
|
elif rich_type == "text":
|
|
content = data.get("content")
|
|
if isinstance(content, str) and content:
|
|
rec.response_parts.append(content)
|
|
|
|
yield chunk
|
|
except Exception as e:
|
|
rec.status = "error"
|
|
rec.error_message = str(e)
|
|
raise
|
|
finally:
|
|
reset_turn(token)
|
|
if self._sink is not None:
|
|
await self._sink.flush(rec)
|