Wrapper application around upstream Vanna with: - Tenant-aware ChromaDB memory (per program/store) - ClickHouse RLS runner with introspection guards - PT-BR system prompt and chat translations - Custom Plotly chart generator (ranked bar, datetime coercion) - Embed bootstrap (theme pierce + i18n + markdown) shared by demo and React app - Event sink for chat turn observability
156 lines
6.1 KiB
Python
156 lines
6.1 KiB
Python
"""ClickHouseRunner com RLS via additional_table_filters."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from datetime import date, datetime
|
|
from decimal import Decimal
|
|
|
|
import pandas as pd
|
|
from vanna.capabilities.sql_runner import RunSqlToolArgs
|
|
from vanna.core.tool import ToolContext
|
|
from vanna.integrations.clickhouse import ClickHouseRunner
|
|
|
|
# Decimal places shown in the UI's dataframe rich component. ClickHouse returns
# Decimal(N, 6) by default and the web component renders via toLocaleString()
# without rounding, so values like `307427.030000` show up in the table. Rounding
# here is simpler than forking the upstream renderer (npm rebuild + lost on
# git pull) and covers every query without relying on `round(..., 2)` in the SELECT.
_DISPLAY_DECIMALS = 2
|
|
|
|
# pt-BR format for the dates shown in the dataframe. ClickHouse returns Date /
# DateTime, pandas keeps them as datetime64 or as date/datetime objects, and the
# upstream `from_records` does not infer `column_types` (it stays empty), so the
# JS renderer falls back to the default `String(value)` and shows ISO `2026-01-01`.
# We format to strings here — dataframe + CSV stay consistent.
# `_coerce_datetime_columns` in viz_tool.py recognizes the BR format so that
# time-series charts keep working.
_DATE_FMT = "%d/%m/%Y"
_DATETIME_FMT = "%d/%m/%Y %H:%M"
|
|
|
|
# Tables that receive the per-tenant `program_id` / `store_id` filter when
# RLSClickHouseRunner.run_sql builds the additional_table_filters map.
RLS_TABLES = ("gold.sales",)

# Tables that run_sql maps to the constant filter `0`
# (presumably so that no rows are ever returned from them — confirm).
DENIED_TABLES = ("gold.vw_relatorios_exportaveis_analitico_vendas",)
|
|
|
|
# App-side block on schema introspection. ClickHouse Cloud does not allow an
# effective REVOKE on system.* (access is inherited from the default role), so
# we reject the query before it reaches the database. Otherwise the LLM reads
# system.tables / SHOW TABLES and discovers revoked columns in the DDL, then
# proposes queries that fail and give the user a poor experience.
_FORBIDDEN_SCHEMA_RE = re.compile(
    r"\b(?:system|information_schema)\s*\.\s*[a-zA-Z_][a-zA-Z0-9_]*",
    re.IGNORECASE,
)
_INTROSPECTION_STMT_RE = re.compile(
    r"\b(?:SHOW|DESCRIBE|EXPLAIN)\s+",
    re.IGNORECASE,
)
|
|
|
|
_ID_RE = re.compile(r"^[A-Za-z0-9_-]+$")
|
|
|
|
|
|
def _require_id(name: str, value: object) -> str:
|
|
if value is None or not isinstance(value, str) or not _ID_RE.fullmatch(value):
|
|
raise PermissionError(
|
|
f"RLS context missing/invalid: {name} must match {_ID_RE.pattern}"
|
|
)
|
|
return value
|
|
|
|
|
|
def _quote(value: str) -> str:
|
|
return "'" + value.replace("'", "''") + "'"
|
|
|
|
|
|
def _format_table_filter_map(entries):
|
|
# additional_table_filters expects a ClickHouse Map(String, String) literal.
|
|
# clickhouse_connect serializes Python dicts as JSON (double quotes) which
|
|
# the server rejects, so we build the literal text ourselves.
|
|
items = ", ".join(f"{_quote(table)}: {_quote(expr)}" for table, expr in entries)
|
|
return "{" + items + "}"
|
|
|
|
|
|
def _round_value(v, ndigits: int):
|
|
if v is None:
|
|
return v
|
|
if isinstance(v, Decimal):
|
|
return float(round(v, ndigits))
|
|
if isinstance(v, float):
|
|
return round(v, ndigits)
|
|
return v
|
|
|
|
|
|
def _round_decimal_columns(df: pd.DataFrame, ndigits: int = _DISPLAY_DECIMALS) -> pd.DataFrame:
    """Round numeric-looking columns of ``df`` to ``ndigits`` places, in place.

    Float-dtype columns are rounded with ``Series.round``. Object-dtype
    columns are rounded cell by cell, but only when their first (up to five)
    non-null values are all ``Decimal`` or ``float``.

    Args:
        df: Frame to modify; its columns are reassigned in place.
        ndigits: Number of decimal places to keep.

    Returns:
        The same ``df`` object that was passed in.
    """
    for name in df.columns:
        column = df[name]

        if pd.api.types.is_float_dtype(column):
            df[name] = column.round(ndigits)
            continue

        if column.dtype != object:
            continue

        # Peek at a few non-null values to decide whether the column holds
        # Decimal/float objects; empty or mixed samples are left untouched.
        peek = column.dropna().head(5)
        if len(peek) == 0:
            continue
        if not all(isinstance(item, (Decimal, float)) for item in peek):
            continue

        df[name] = column.map(lambda item: _round_value(item, ndigits))

    return df
|
|
|
|
|
|
def _format_date_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Format date/datetime columns of ``df`` as pt-BR strings, in place.

    datetime64 columns are formatted with ``Series.dt.strftime``. Object-dtype
    columns are formatted cell by cell when their first (up to five) non-null
    values are all ``date``/``datetime`` instances. In both cases the
    date+time format is used if any value carries a time component, and the
    date-only format otherwise.

    Args:
        df: Frame to modify; its columns are reassigned in place.

    Returns:
        The same ``df`` object that was passed in.
    """
    for col in df.columns:
        s = df[col]
        if pd.api.types.is_datetime64_any_dtype(s):
            # datetime64 covers timestamps with a time of day: use HH:MM when
            # any value has a non-zero hour or minute, otherwise date only.
            # NOTE(review): seconds are not considered here, while the
            # object-dtype branch below does consider them — confirm intent.
            has_time = bool(
                ((s.dt.hour.fillna(0) != 0) | (s.dt.minute.fillna(0) != 0)).any()
            )
            df[col] = s.dt.strftime(_DATETIME_FMT if has_time else _DATE_FMT)
        elif s.dtype == object:
            present = s.dropna()
            sample = present.head(5)
            if len(sample) and all(isinstance(v, (date, datetime)) for v in sample):
                # Scan every non-null value, not just the sample: a time
                # component that first appears after row five must still
                # switch the column to the date+time format.
                has_time = any(
                    isinstance(v, datetime) and (v.hour or v.minute or v.second)
                    for v in present
                )
                fmt = _DATETIME_FMT if has_time else _DATE_FMT

                def _format_cell(v, fmt=fmt):
                    # Only real date/datetime cells are formatted. None, NaN,
                    # NaT and any stray non-date value pass through unchanged
                    # instead of raising on a missing/unsupported strftime.
                    if isinstance(v, (date, datetime)) and v is not pd.NaT:
                        return v.strftime(fmt)
                    return v

                df[col] = s.map(_format_cell)
    return df
|
|
|
|
|
|
class RLSClickHouseRunner(ClickHouseRunner):
    """ClickHouseRunner that injects additional_table_filters into every query."""

    async def run_sql(
        self, args: RunSqlToolArgs, context: ToolContext
    ) -> pd.DataFrame:
        """Run ``args.sql`` with per-tenant row filters and return a display frame.

        The SQL text is screened first, then the tenant identifiers are read
        from ``context.user`` and validated, and the query is executed with an
        ``additional_table_filters`` setting built from ``RLS_TABLES`` and
        ``DENIED_TABLES``. The result is post-processed for display (decimal
        rounding, pt-BR date strings).

        Args:
            args: Tool arguments; only ``args.sql`` is read.
            context: Tool context; ``context.user.program_id`` and
                ``context.user.store_id`` are read.

        Returns:
            A DataFrame built from the result rows and column names.

        Raises:
            PermissionError: If the SQL touches system.* / information_schema.*,
                contains SHOW / DESCRIBE / EXPLAIN, mentions the
                ``additional_table_filters`` setting, or if either tenant
                identifier is missing or invalid.
        """
        if _FORBIDDEN_SCHEMA_RE.search(args.sql) or _INTROSPECTION_STMT_RE.search(args.sql):
            raise PermissionError(
                "Query rejected: schema introspection (SHOW / DESCRIBE / "
                "EXPLAIN) and access to system.* / information_schema.* are "
                "not allowed. The only table available is `gold.sales` — "
                "use the columns listed in your context to query it directly."
            )

        # The SQL text is untrusted. A query-level `SETTINGS` clause can
        # override the settings sent with the request, so SQL that names
        # additional_table_filters could replace the RLS filter map below.
        # Reject any mention of it.
        # NOTE(review): a server-side settings constraint on this setting
        # would be a stronger guarantee — confirm whether one is in place.
        if re.search(r"\badditional_table_filters\b", args.sql, re.IGNORECASE):
            raise PermissionError(
                "Query rejected: the SQL text may not reference the "
                "additional_table_filters setting."
            )

        program_id = _require_id("program_id", getattr(context.user, "program_id", None))
        store_id = _require_id("store_id", getattr(context.user, "store_id", None))

        # Both ids were validated against _ID_RE above, so they cannot contain
        # quotes or other characters that would break out of the literal.
        filter_expr = f"program_id = '{program_id}' AND store_id = '{store_id}'"
        entries = [(table, filter_expr) for table in RLS_TABLES]
        # Denied tables get the constant filter `0`.
        entries += [(table, "0") for table in DENIED_TABLES]
        additional = _format_table_filter_map(entries)

        client = self.clickhouse_connect.get_client(
            host=self.host,
            port=self.port,
            username=self.user,
            password=self.password,
            database=self.database,
            **self.kwargs,
        )
        try:
            # NOTE(review): this call is made synchronously inside the
            # coroutine (there is no await).
            result = client.query(
                args.sql,
                settings={"additional_table_filters": additional},
            )
            df = pd.DataFrame(result.result_rows, columns=result.column_names)
            df = _round_decimal_columns(df)
            return _format_date_columns(df)
        finally:
            # Always release the per-call client, even when the query fails.
            client.close()
|