vanna-clubpetro/train.py
leonardosalazar-cp 1d152c0dce Initial commit: Vanna 2.0 deployment for ClubPetro
Wrapper application around upstream Vanna with:
- Tenant-aware ChromaDB memory (per program/store)
- ClickHouse RLS runner with introspection guards
- PT-BR system prompt and chat translations
- Custom Plotly chart generator (ranked bar, datetime coercion)
- Embed bootstrap (theme pierce + i18n + markdown) shared by demo and React app
- Event sink for chat turn observability
2026-04-29 17:22:05 -03:00

108 lines
3.5 KiB
Python

"""Fetch ClickHouse schema and store it as text memories for the agent."""
import asyncio
import os
import clickhouse_connect
from dotenv import load_dotenv
from vanna.core.tool import ToolContext
from vanna.integrations.chromadb import ChromaAgentMemory
from vanna import User
from rls_runner import RLS_TABLES
# Columns used internally by RLS (additional_table_filters in rls_runner.py).
# They are granted to wren_ia so the filter works, but are hidden from the
# LLM context: fetch_schema_docs() drops them from the generated documents.
RLS_INTERNAL_COLS = {"program_id", "store_id"}
# Load variables from a local .env file (if any) into the process environment
# at import time, before fetch_schema_docs() reads the CLICKHOUSE_* settings.
load_dotenv()
def _quote_ident(name: str) -> str:
    """Return *name* as a backtick-quoted ClickHouse identifier."""
    escaped = name.replace("\\", "\\\\").replace("`", "\\`")
    return f"`{escaped}`"


def fetch_schema_docs() -> list[str]:
    """Build one plain-text schema document per RLS table of the configured database.

    Connection settings are read from the ``CLICKHOUSE_HOST``,
    ``CLICKHOUSE_PORT``, ``CLICKHOUSE_USER``, ``CLICKHOUSE_PASSWORD`` and
    ``CLICKHOUSE_DATABASE`` environment variables. Only entries of
    ``RLS_TABLES`` of the form ``"<database>.<table>"`` whose database matches
    ``CLICKHOUSE_DATABASE`` are documented. Columns listed in
    ``RLS_INTERNAL_COLS`` are left out of both the column list and the sample
    rows.

    Returns:
        One document per table (tables in sorted order) containing the table
        name, its visible columns with type and optional comment, and up to
        three sample rows.

    Raises:
        KeyError: A required environment variable is missing.
        RuntimeError: No entry of ``RLS_TABLES`` matches the database, a table
            exposes no accessible columns, or every accessible column of a
            table is RLS-internal.
    """
    client = clickhouse_connect.get_client(
        host=os.environ["CLICKHOUSE_HOST"],
        port=int(os.environ["CLICKHOUSE_PORT"]),
        username=os.environ["CLICKHOUSE_USER"],
        password=os.environ["CLICKHOUSE_PASSWORD"],
        database=os.environ["CLICKHOUSE_DATABASE"],
        secure=True,
    )
    # try/finally so the connection is released even when a query fails or one
    # of the RuntimeErrors below is raised.
    try:
        db = os.environ["CLICKHOUSE_DATABASE"]
        allowed = sorted(
            t.split(".", 1)[1] for t in RLS_TABLES if t.startswith(f"{db}.")
        )
        if not allowed:
            raise RuntimeError(
                f"No RLS_TABLES match database={db!r}; nothing to train on."
            )

        docs: list[str] = []
        for table in allowed:
            # system.columns is already filtered by ClickHouse: it returns
            # only the columns the current user (wren_ia) has GRANT SELECT on,
            # which makes it the source of truth for what is accessible.
            # SELECT *, DESCRIBE TABLE and system.tables are not used here —
            # they require extra privileges the runtime user does not have.
            cols_raw = client.query(
                "SELECT name, type, comment FROM system.columns "
                "WHERE database = %(db)s AND table = %(t)s ORDER BY position",
                parameters={"db": db, "t": table},
            ).result_rows
            if not cols_raw:
                raise RuntimeError(
                    f"No accessible columns for {db}.{table} — verify the table "
                    f"exists and {os.environ['CLICKHOUSE_USER']!r} has SELECT "
                    f"grants on it."
                )

            # Hide the RLS filter columns from everything the LLM will see.
            cols = [
                (n, t, c) for n, t, c in cols_raw if n not in RLS_INTERNAL_COLS
            ]
            if not cols:
                # Without this guard the sample query below would be the
                # invalid "SELECT  FROM ..." and fail with an opaque error.
                raise RuntimeError(
                    f"No LLM-visible columns for {db}.{table} — every "
                    f"accessible column is RLS-internal "
                    f"({sorted(RLS_INTERNAL_COLS)})."
                )

            col_lines = "\n".join(
                f"  - {n} {t}" + (f"  -- {c}" if c else "") for n, t, c in cols
            )

            # Identifiers cannot be bound as query parameters, so they are
            # quoted instead; this keeps reserved words and unusual column
            # names from breaking the statement.
            col_list = ", ".join(_quote_ident(n) for n, _, _ in cols)
            sample_rows = client.query(
                f"SELECT {col_list} "
                f"FROM {_quote_ident(db)}.{_quote_ident(table)} LIMIT 3"
            )
            sample_cols = ", ".join(sample_rows.column_names)
            sample_preview = "\n".join(
                "  " + " | ".join(str(v) for v in row)
                for row in sample_rows.result_rows
            )

            docs.append(
                f"Table `{db}.{table}` (ClickHouse).\n"
                f"Columns:\n{col_lines}\n\n"
                f"Sample rows ({sample_cols}):\n{sample_preview}"
            )
    finally:
        client.close()
    return docs
async def main() -> None:
    """Store every document from fetch_schema_docs() as an agent text memory.

    A ChromaAgentMemory persisted under ``./chroma_db`` (collection
    ``vanna_clickhouse_gold``) receives one ``save_text_memory`` call per
    schema document, issued under a fixed "trainer" user and "train"
    conversation/request ids. Progress is printed per document, followed by
    a final count.
    """
    store = ChromaAgentMemory(
        collection_name="vanna_clickhouse_gold",
        persist_directory="./chroma_db",
    )
    trainer = User(id="trainer", username="trainer")
    tool_ctx = ToolContext(
        user=trainer,
        conversation_id="train",
        request_id="train",
        agent_memory=store,
    )

    schema_docs = fetch_schema_docs()
    total = len(schema_docs)

    position = 0
    for text in schema_docs:
        position += 1
        await store.save_text_memory(content=text, context=tool_ctx)
        # The first line of each document names the table it describes.
        heading = text.splitlines()[0]
        print(f"[{position}/{total}] saved: {heading}")

    print(f"\nTrained on {total} table(s).")
if __name__ == "__main__":
    # Run the training coroutine only when executed as a script, so importing
    # this module does not start a training run.
    asyncio.run(main())