vanna-clubpetro/rls_runner.py
leonardosalazar-cp 1d152c0dce Initial commit: Vanna 2.0 deployment for ClubPetro
Wrapper application around upstream Vanna with:
- Tenant-aware ChromaDB memory (per program/store)
- ClickHouse RLS runner with introspection guards
- PT-BR system prompt and chat translations
- Custom Plotly chart generator (ranked bar, datetime coercion)
- Embed bootstrap (theme pierce + i18n + markdown) shared by demo and React app
- Event sink for chat turn observability
2026-04-29 17:22:05 -03:00

156 lines
6.1 KiB
Python

"""ClickHouseRunner com RLS via additional_table_filters."""
from __future__ import annotations
import re
from datetime import date, datetime
from decimal import Decimal
import pandas as pd
from vanna.capabilities.sql_runner import RunSqlToolArgs
from vanna.core.tool import ToolContext
from vanna.integrations.clickhouse import ClickHouseRunner
# Decimal places shown in the UI's rich dataframe component. ClickHouse returns
# Decimal(N, 6) by default and the web component renders via toLocaleString()
# without rounding, so values like `307427.030000` show up in the table.
# Rounding here is simpler than forking the upstream renderer (npm rebuild +
# lost on git pull) and covers every query without relying on `round(..., 2)`
# in the SELECT.
_DISPLAY_DECIMALS = 2
# pt-BR format for the dates shown in the dataframe. ClickHouse returns Date /
# DateTime, pandas keeps them as datetime64 or as date/datetime objects, and
# the upstream `from_records` does not infer `column_types` (it stays empty),
# so the JS renderer falls back to the default `String(value)` and shows ISO
# `2026-01-01`. We format to strings here — dataframe + CSV stay consistent.
# `_coerce_datetime_columns` in viz_tool.py recognizes the BR format so that
# time-series charts keep working.
_DATE_FMT = "%d/%m/%Y"
_DATETIME_FMT = "%d/%m/%Y %H:%M"
# Tables that receive the per-tenant `program_id` / `store_id` row filter in
# RLSClickHouseRunner.run_sql.
RLS_TABLES = ("gold.sales",)
# Tables that receive the constant filter `0` in RLSClickHouseRunner.run_sql.
DENIED_TABLES = ("gold.vw_relatorios_exportaveis_analitico_vendas",)
# Bloqueio app-side de introspecção de schema. ClickHouse Cloud não permite
# REVOKE efetivo em system.* (acesso herdado de role default), então
# rejeitamos a query antes dela chegar no banco. Caso contrário o LLM lê
# system.tables / SHOW TABLES e descobre colunas revogadas no DDL, depois
# propõe queries que falham e dão UX ruim ao usuário.
_FORBIDDEN_SCHEMA_RE = re.compile(
r"\b(?:system|information_schema)\s*\.\s*[a-zA-Z_][a-zA-Z0-9_]*",
re.IGNORECASE,
)
_INTROSPECTION_STMT_RE = re.compile(
r"\b(?:SHOW|DESCRIBE|EXPLAIN)\s+",
re.IGNORECASE,
)
_ID_RE = re.compile(r"^[A-Za-z0-9_-]+$")
def _require_id(name: str, value: object) -> str:
if value is None or not isinstance(value, str) or not _ID_RE.fullmatch(value):
raise PermissionError(
f"RLS context missing/invalid: {name} must match {_ID_RE.pattern}"
)
return value
def _quote(value: str) -> str:
return "'" + value.replace("'", "''") + "'"
def _format_table_filter_map(entries):
# additional_table_filters expects a ClickHouse Map(String, String) literal.
# clickhouse_connect serializes Python dicts as JSON (double quotes) which
# the server rejects, so we build the literal text ourselves.
items = ", ".join(f"{_quote(table)}: {_quote(expr)}" for table, expr in entries)
return "{" + items + "}"
def _round_value(v, ndigits: int):
if v is None:
return v
if isinstance(v, Decimal):
return float(round(v, ndigits))
if isinstance(v, float):
return round(v, ndigits)
return v
def _round_decimal_columns(df: pd.DataFrame, ndigits: int = _DISPLAY_DECIMALS) -> pd.DataFrame:
    """Round float and Decimal columns of *df* to *ndigits* places, in place.

    Float-dtype columns are rounded with ``Series.round``. Object-dtype
    columns are rounded cell by cell, but only when the first (up to five)
    non-null values are all ``Decimal`` or ``float``.

    Args:
        df: Frame to modify; its columns are reassigned.
        ndigits: Number of decimal places to keep.

    Returns:
        The same *df* object that was passed in.
    """
    for name in df.columns:
        column = df[name]
        if pd.api.types.is_float_dtype(column):
            df[name] = column.round(ndigits)
            continue
        if column.dtype != object:
            continue
        # Decide from a small sample whether the column holds numeric objects.
        probe = column.dropna().head(5)
        if probe.empty:
            continue
        if not all(isinstance(item, (Decimal, float)) for item in probe):
            continue
        df[name] = column.map(lambda item: _round_value(item, ndigits))
    return df
def _format_date_columns(df: pd.DataFrame) -> pd.DataFrame:
for col in df.columns:
s = df[col]
if pd.api.types.is_datetime64_any_dtype(s):
# datetime64 inclui timestamps com hora; usar HH:MM se houver
# algum valor com componente temporal não-zero, senão só data.
has_time = bool(
((s.dt.hour.fillna(0) != 0) | (s.dt.minute.fillna(0) != 0)).any()
)
df[col] = s.dt.strftime(_DATETIME_FMT if has_time else _DATE_FMT)
elif s.dtype == object:
sample = s.dropna().head(5)
if len(sample) and all(isinstance(v, (date, datetime)) for v in sample):
has_time = any(
isinstance(v, datetime) and (v.hour or v.minute or v.second)
for v in sample
)
fmt = _DATETIME_FMT if has_time else _DATE_FMT
df[col] = s.map(lambda v: v.strftime(fmt) if v is not None else v)
return df
class RLSClickHouseRunner(ClickHouseRunner):
    """ClickHouseRunner that injects additional_table_filters into every query."""

    # A SETTINGS clause written inside the SQL text applies to that query and
    # takes precedence over the settings sent alongside the request, so
    # `... SETTINGS additional_table_filters = {}` would switch the row filter
    # off. Any mention of a SETTINGS clause or of the filter setting itself is
    # therefore rejected before the query is sent.
    _QUERY_SETTINGS_RE = re.compile(
        r"\bSETTINGS\b|\badditional_table_filters\b",
        re.IGNORECASE,
    )

    async def run_sql(
        self, args: RunSqlToolArgs, context: ToolContext
    ) -> pd.DataFrame:
        """Run ``args.sql`` with the tenant row filter applied.

        Every table in ``RLS_TABLES`` is filtered by the caller's
        ``program_id`` and ``store_id``; every table in ``DENIED_TABLES`` gets
        the constant filter ``0``.

        Args:
            args: Tool arguments; only ``args.sql`` is read.
            context: Tool context; ``context.user.program_id`` and
                ``context.user.store_id`` are read.

        Returns:
            The result as a DataFrame, with decimals rounded and date columns
            formatted as strings.

        Raises:
            PermissionError: If the SQL touches system.* /
                information_schema.*, is a SHOW / DESCRIBE / EXPLAIN
                statement, carries a SETTINGS clause, or if either tenant
                identifier is missing or malformed.
        """
        sql = args.sql
        if _FORBIDDEN_SCHEMA_RE.search(sql) or _INTROSPECTION_STMT_RE.search(sql):
            raise PermissionError(
                "Query rejected: schema introspection (SHOW / DESCRIBE / "
                "EXPLAIN) and access to system.* / information_schema.* are "
                "not allowed. The only table available is `gold.sales` — "
                "use the columns listed in your context to query it directly."
            )
        if self._QUERY_SETTINGS_RE.search(sql):
            raise PermissionError(
                "Query rejected: SETTINGS clauses are not allowed. Remove the "
                "SETTINGS clause and query `gold.sales` directly."
            )

        # Both identifiers are validated before being interpolated into the
        # filter expression below.
        program_id = _require_id("program_id", getattr(context.user, "program_id", None))
        store_id = _require_id("store_id", getattr(context.user, "store_id", None))
        filter_expr = f"program_id = '{program_id}' AND store_id = '{store_id}'"

        entries = [(table, filter_expr) for table in RLS_TABLES]
        entries += [(table, "0") for table in DENIED_TABLES]
        additional = _format_table_filter_map(entries)

        # NOTE(review): get_client / query look like synchronous calls inside
        # an async method — confirm whether this is acceptable for the caller.
        client = self.clickhouse_connect.get_client(
            host=self.host,
            port=self.port,
            username=self.user,
            password=self.password,
            database=self.database,
            **self.kwargs,
        )
        try:
            result = client.query(
                sql,
                settings={"additional_table_filters": additional},
            )
            df = pd.DataFrame(result.result_rows, columns=result.column_names)
            df = _round_decimal_columns(df)
            return _format_date_columns(df)
        finally:
            # Close the per-call client whether or not the query succeeded.
            client.close()