"""ClickHouseRunner com RLS via additional_table_filters.""" from __future__ import annotations import re from datetime import date, datetime from decimal import Decimal import pandas as pd from vanna.capabilities.sql_runner import RunSqlToolArgs from vanna.core.tool import ToolContext from vanna.integrations.clickhouse import ClickHouseRunner # Casas decimais exibidas no dataframe rich component da UI. ClickHouse retorna # Decimal(N, 6) por padrão e o web component renderiza via toLocaleString() sem # rounding, então valores tipo `307427.030000` aparecem na tabela. Arredondar # aqui é mais simples que forkar o renderer upstream (rebuild npm + perde no # git pull) e cobre todas as queries sem depender de `round(..., 2)` no SELECT. _DISPLAY_DECIMALS = 2 # Formato pt-BR pras datas exibidas no dataframe. ClickHouse devolve Date / # DateTime, pandas mantém como datetime64 ou objects de date/datetime, e o # upstream `from_records` não infere `column_types` (fica vazio), então o # renderer JS cai no default `String(value)` e mostra ISO `2026-01-01`. # Formatamos pra string aqui — dataframe + CSV ficam consistentes. # `_coerce_datetime_columns` em viz_tool.py reconhece o formato BR pra # que charts de série temporal continuem funcionando. _DATE_FMT = "%d/%m/%Y" _DATETIME_FMT = "%d/%m/%Y %H:%M" RLS_TABLES = ("gold.sales",) DENIED_TABLES = ("gold.vw_relatorios_exportaveis_analitico_vendas",) # Bloqueio app-side de introspecção de schema. ClickHouse Cloud não permite # REVOKE efetivo em system.* (acesso herdado de role default), então # rejeitamos a query antes dela chegar no banco. Caso contrário o LLM lê # system.tables / SHOW TABLES e descobre colunas revogadas no DDL, depois # propõe queries que falham e dão UX ruim ao usuário. 
# Matches `system.<name>` / `information_schema.<name>`, with or without
# backtick / double-quote identifier quoting around either part, so that
# `system`.`tables` is caught as well as the bare spelling.
_FORBIDDEN_SCHEMA_RE = re.compile(
    r"\b(?:system|information_schema)[`\"]?\s*\.\s*[`\"]?[a-zA-Z_][a-zA-Z0-9_]*",
    re.IGNORECASE,
)

# Matches SHOW / DESCRIBE / EXPLAIN anywhere, plus the `DESC` synonym when it
# opens a statement or a parenthesised sub-statement (so `ORDER BY x DESC`
# is not affected).
_INTROSPECTION_STMT_RE = re.compile(
    r"(?:\b(?:SHOW|DESCRIBE|EXPLAIN)|(?:^|[;(])\s*DESC)\s+",
    re.IGNORECASE,
)

# Quoted spans are matched first so that comment markers inside a string or a
# quoted identifier are not mistaken for a comment; real comments are the
# remaining two alternatives.
_SQL_COMMENT_RE = re.compile(
    r"(?P<quoted>"
    r"'(?:[^'\\]|\\.|'')*'"
    r'|"(?:[^"\\]|\\.|"")*"'
    r"|`(?:[^`\\]|\\.|``)*`"
    r")"
    r"|--[^\n]*"
    r"|/\*.*?\*/",
    re.DOTALL,
)

# Name of the setting this runner owns; user SQL must never mention it.
_RLS_SETTING = "additional_table_filters"

_ID_RE = re.compile(r"^[A-Za-z0-9_-]+$")


def _require_id(name: str, value: object) -> str:
    """Return *value* if it is a string made only of ``[A-Za-z0-9_-]``.

    Raises:
        PermissionError: if *value* is missing, not a string, or contains any
            other character. *name* is used only in the error message.
    """
    if not isinstance(value, str) or not _ID_RE.fullmatch(value):
        raise PermissionError(
            f"RLS context missing/invalid: {name} must match {_ID_RE.pattern}"
        )
    return value


def _quote(value: str) -> str:
    """Wrap *value* in single quotes, doubling any embedded single quote."""
    return "'" + value.replace("'", "''") + "'"


def _format_table_filter_map(entries):
    """Build the ``{'table': 'expr', ...}`` literal for additional_table_filters.

    *entries* is an iterable of ``(table, expression)`` string pairs.
    """
    # additional_table_filters expects a ClickHouse Map(String, String) literal.
    # clickhouse_connect serializes Python dicts as JSON (double quotes) which
    # the server rejects, so we build the literal text ourselves.
    items = ", ".join(f"{_quote(table)}: {_quote(expr)}" for table, expr in entries)
    return "{" + items + "}"


def _strip_sql_comments(sql: str) -> str:
    """Return *sql* with ``--`` and ``/* */`` comments replaced by one space.

    Quoted strings and quoted identifiers are copied through unchanged.
    """
    return _SQL_COMMENT_RE.sub(lambda m: m.group("quoted") or " ", sql)


def _is_schema_introspection(sql: str) -> bool:
    """Tell whether *sql* trips the schema-introspection guard.

    Both the raw text and the comment-stripped text are checked: the raw pass
    keeps every rejection the plain regex search already produced, and the
    stripped pass catches comments used as token separators
    (``SHOW/**/TABLES``, ``system/**/.tables``).
    """
    return any(
        _FORBIDDEN_SCHEMA_RE.search(text) or _INTROSPECTION_STMT_RE.search(text)
        for text in (sql, _strip_sql_comments(sql))
    )


def _round_value(v, ndigits: int):
    """Round a Decimal (returned as float) or float; pass anything else through."""
    if v is None:
        return v
    if isinstance(v, Decimal):
        return float(round(v, ndigits))
    if isinstance(v, float):
        return round(v, ndigits)
    return v


def _round_decimal_columns(df: pd.DataFrame, ndigits: int = _DISPLAY_DECIMALS) -> pd.DataFrame:
    """Round float and Decimal columns of *df* in place and return *df*.

    Object columns are treated as numeric when their first five non-null
    values are all Decimal or float.
    """
    for col in df.columns:
        s = df[col]
        if pd.api.types.is_float_dtype(s):
            df[col] = s.round(ndigits)
        elif s.dtype == object:
            sample = s.dropna().head(5)
            if len(sample) and all(isinstance(v, (Decimal, float)) for v in sample):
                df[col] = s.map(lambda v: _round_value(v, ndigits))
    return df


def _format_date_value(v, fmt: str):
    """Format a real date/datetime with *fmt*; return anything else unchanged."""
    # pd.isna guards NaT, which passes the isinstance check but cannot be
    # formatted with strftime.
    if isinstance(v, (date, datetime)) and not pd.isna(v):
        return v.strftime(fmt)
    return v


def _format_date_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Render date-like columns of *df* as strings, in place, and return *df*.

    A column is rendered with ``_DATETIME_FMT`` when any of its values has a
    non-zero hour, minute or second, and with ``_DATE_FMT`` otherwise. Object
    columns are treated as dates when their first five non-null values are all
    date/datetime instances.
    """
    for col in df.columns:
        s = df[col]
        if pd.api.types.is_datetime64_any_dtype(s):
            # datetime64 covers timestamps with a time of day; missing values
            # count as midnight so they never force the datetime format.
            has_time = bool(
                (
                    (s.dt.hour.fillna(0) != 0)
                    | (s.dt.minute.fillna(0) != 0)
                    | (s.dt.second.fillna(0) != 0)
                ).any()
            )
            df[col] = s.dt.strftime(_DATETIME_FMT if has_time else _DATE_FMT)
        elif s.dtype == object:
            non_null = s.dropna()
            sample = non_null.head(5)
            if len(sample) and all(isinstance(v, (date, datetime)) for v in sample):
                # Scan every value, not just the sample: a column whose first
                # rows are at midnight must still show times present later on.
                has_time = any(
                    isinstance(v, datetime) and (v.hour or v.minute or v.second)
                    for v in non_null
                )
                fmt = _DATETIME_FMT if has_time else _DATE_FMT
                df[col] = s.map(lambda v: _format_date_value(v, fmt))
    return df


class RLSClickHouseRunner(ClickHouseRunner):
    """ClickHouseRunner that injects additional_table_filters into every query."""

    async def run_sql(
        self, args: RunSqlToolArgs, context: ToolContext
    ) -> pd.DataFrame:
        """Run ``args.sql`` with per-user row filters and return a display-ready frame.

        The statement is rejected before reaching the database when it looks
        like schema introspection or when it mentions the
        ``additional_table_filters`` setting. Otherwise it is executed with a
        filter on ``program_id`` / ``store_id`` (taken from ``context.user``)
        for every table in ``RLS_TABLES`` and the constant filter ``0`` for
        every table in ``DENIED_TABLES``. Numeric and date columns of the
        result are then rounded / formatted for display.

        Raises:
            PermissionError: if the statement is rejected, or if the user
                context lacks a valid ``program_id`` or ``store_id``.
        """
        if _is_schema_introspection(args.sql):
            raise PermissionError(
                "Query rejected: schema introspection (SHOW / DESCRIBE / "
                "EXPLAIN) and access to system.* / information_schema.* are "
                "not allowed. The only table available is `gold.sales` — "
                "use the columns listed in your context to query it directly."
            )
        # A SETTINGS clause inside the statement is applied on top of the
        # settings sent with the request, so user SQL naming this setting
        # could replace the row filter injected below.
        if _RLS_SETTING in args.sql.lower():
            raise PermissionError(
                "Query rejected: the `additional_table_filters` setting is "
                "managed by the server and must not appear in the query."
            )

        program_id = _require_id("program_id", getattr(context.user, "program_id", None))
        store_id = _require_id("store_id", getattr(context.user, "store_id", None))

        # The ids are already restricted to [A-Za-z0-9_-]; quoting through
        # _quote keeps the literal well-formed even if that rule is relaxed.
        filter_expr = (
            f"program_id = {_quote(program_id)} AND store_id = {_quote(store_id)}"
        )
        entries = [(table, filter_expr) for table in RLS_TABLES]
        entries += [(table, "0") for table in DENIED_TABLES]
        additional = _format_table_filter_map(entries)

        client = self.clickhouse_connect.get_client(
            host=self.host,
            port=self.port,
            username=self.user,
            password=self.password,
            database=self.database,
            **self.kwargs,
        )
        try:
            result = client.query(
                args.sql,
                settings={"additional_table_filters": additional},
            )
            df = pd.DataFrame(result.result_rows, columns=result.column_names)
            df = _round_decimal_columns(df)
            return _format_date_columns(df)
        finally:
            client.close()