Wrapper application around upstream Vanna with:
- Tenant-aware ChromaDB memory (per program/store)
- ClickHouse RLS runner with introspection guards
- PT-BR system prompt and chat translations
- Custom Plotly chart generator (ranked bar, datetime coercion)
- Embed bootstrap (theme pierce + i18n + markdown) shared by the demo and the React app
- Event sink for chat-turn observability
108 lines
3.5 KiB
Python
108 lines
3.5 KiB
Python
"""Fetch the ClickHouse schema (columns plus a few sample rows) for RLS-allowed tables and store it as text memories for the agent."""
|
|
|
|
import asyncio
|
|
import os
|
|
import clickhouse_connect
|
|
from dotenv import load_dotenv
|
|
|
|
from vanna.core.tool import ToolContext
|
|
from vanna.integrations.chromadb import ChromaAgentMemory
|
|
from vanna import User
|
|
|
|
from rls_runner import RLS_TABLES
|
|
|
|
# Columns consumed only by the RLS layer (additional_table_filters in
# rls_runner.py). They are granted to wren_ia so the filter can work, but
# they are hidden from the schema context handed to the LLM.
RLS_INTERNAL_COLS: set[str] = {"store_id", "program_id"}
|
|
|
|
# Load a local .env (python-dotenv) at import time so the CLICKHOUSE_* variables
# read by fetch_schema_docs() are available in os.environ.
load_dotenv()
|
|
|
|
|
|
def _quote_ident(name: str) -> str:
    """Return *name* as a backtick-quoted ClickHouse identifier.

    Backslashes and backticks are escaped so reserved words or unusual
    names cannot break the generated query text.
    """
    escaped = name.replace("\\", "\\\\").replace("`", "\\`")
    return f"`{escaped}`"


def _allowed_tables(db: str) -> list[str]:
    """Return the sorted, de-duplicated RLS_TABLES entries that live in *db*.

    Entries in RLS_TABLES are expected as "<database>.<table>"; only the
    table part is returned.

    Raises:
        RuntimeError: if no RLS_TABLES entry belongs to *db*.
    """
    prefix = f"{db}."
    allowed = sorted(
        {t.split(".", 1)[1] for t in RLS_TABLES if t.startswith(prefix)}
    )
    if not allowed:
        raise RuntimeError(
            f"No RLS_TABLES match database={db!r}; nothing to train on."
        )
    return allowed


def _table_doc(client, db: str, table: str) -> str:
    """Build the text memory for one table: visible columns plus sample rows.

    Raises:
        RuntimeError: if the current user sees no columns for the table, or
            only RLS-internal columns (which would yield an empty SELECT).
    """
    # system.columns is already filtered by ClickHouse: it only returns
    # columns the current user (wren_ia) has SELECT on, so it is the source
    # of truth for what is accessible. SELECT *, DESCRIBE TABLE and
    # system.tables are avoided because they need privileges the runtime
    # user does not have.
    cols_raw = client.query(
        "SELECT name, type, comment FROM system.columns "
        "WHERE database = %(db)s AND table = %(t)s ORDER BY position",
        parameters={"db": db, "t": table},
    ).result_rows
    if not cols_raw:
        raise RuntimeError(
            f"No accessible columns for {db}.{table} — verify the table "
            f"exists and {os.environ['CLICKHOUSE_USER']!r} has SELECT "
            f"grants on it."
        )

    # Hide RLS plumbing columns from the LLM context.
    cols = [
        (name, col_type, comment)
        for name, col_type, comment in cols_raw
        if name not in RLS_INTERNAL_COLS
    ]
    if not cols:
        # Without this guard the sample query below would be "SELECT  FROM ...".
        raise RuntimeError(
            f"{db}.{table} exposes only RLS-internal columns "
            f"{sorted(RLS_INTERNAL_COLS)}; nothing to describe."
        )

    col_lines = "\n".join(
        f" - {name} {col_type}" + (f" -- {comment}" if comment else "")
        for name, col_type, comment in cols
    )

    # Identifiers are quoted so reserved words / odd names stay valid SQL.
    col_list = ", ".join(_quote_ident(name) for name, _, _ in cols)
    # NOTE(review): this query runs with the trainer's credentials and does not
    # apply the additional_table_filters used by rls_runner.py, so the sample
    # rows may come from any tenant the user can see and are then stored in a
    # shared memory collection — confirm this is acceptable.
    sample_rows = client.query(
        f"SELECT {col_list} FROM {_quote_ident(db)}.{_quote_ident(table)} LIMIT 3"
    )
    sample_cols = ", ".join(sample_rows.column_names)
    sample_preview = "\n".join(
        " " + " | ".join(str(v) for v in row) for row in sample_rows.result_rows
    ) or " (no rows)"

    return (
        f"Table `{db}.{table}` (ClickHouse).\n"
        f"Columns:\n{col_lines}\n\n"
        f"Sample rows ({sample_cols}):\n{sample_preview}"
    )


def fetch_schema_docs() -> list[str]:
    """Describe every RLS-allowed table of CLICKHOUSE_DATABASE as a text doc.

    Connection settings come from the CLICKHOUSE_HOST, CLICKHOUSE_PORT,
    CLICKHOUSE_USER, CLICKHOUSE_PASSWORD and CLICKHOUSE_DATABASE environment
    variables (TLS is always on). Each doc lists the visible columns (name,
    type, optional comment) and up to 3 sample rows.

    Returns:
        One doc string per table, ordered by table name.

    Raises:
        KeyError: if a required environment variable is missing.
        RuntimeError: if no RLS table matches the database, or a table has
            no describable columns.
    """
    db = os.environ["CLICKHOUSE_DATABASE"]
    # Resolve the table list first: fail fast without opening a connection.
    allowed = _allowed_tables(db)

    client = clickhouse_connect.get_client(
        host=os.environ["CLICKHOUSE_HOST"],
        port=int(os.environ["CLICKHOUSE_PORT"]),
        username=os.environ["CLICKHOUSE_USER"],
        password=os.environ["CLICKHOUSE_PASSWORD"],
        database=db,
        secure=True,
    )
    try:
        return [_table_doc(client, db, table) for table in allowed]
    finally:
        # Close even when a table check or query raises.
        client.close()
|
|
|
|
|
|
async def main() -> None:
    """Persist each ClickHouse schema doc as a text memory in ChromaDB.

    Memories go to the "vanna_clickhouse_gold" collection under "./chroma_db"
    (relative to the current working directory), saved under a fixed
    "trainer" identity. One progress line is printed per table, followed by
    a summary line.
    """
    agent_memory = ChromaAgentMemory(
        persist_directory="./chroma_db",
        collection_name="vanna_clickhouse_gold",
    )
    trainer_ctx = ToolContext(
        user=User(id="trainer", username="trainer"),
        conversation_id="train",
        request_id="train",
        agent_memory=agent_memory,
    )

    schema_docs = fetch_schema_docs()
    total = len(schema_docs)
    # NOTE(review): nothing here de-duplicates, so re-running the script
    # presumably stores the same docs again — confirm ChromaAgentMemory's
    # behavior for repeated content.
    for position, text in enumerate(schema_docs, start=1):
        await agent_memory.save_text_memory(content=text, context=trainer_ctx)
        headline = text.splitlines()[0]
        print(f"[{position}/{total}] saved: {headline}")

    print(f"\nTrained on {total} table(s).")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the async training routine via asyncio.run.
    asyncio.run(main())
|