"""Fetch ClickHouse schema and store it as text memories for the agent.""" import asyncio import os import clickhouse_connect from dotenv import load_dotenv from vanna.core.tool import ToolContext from vanna.integrations.chromadb import ChromaAgentMemory from vanna import User from rls_runner import RLS_TABLES # Colunas usadas internamente pelo RLS (additional_table_filters em rls_runner.py). # Granted ao wren_ia para o filtro funcionar, mas escondidas do contexto do LLM. RLS_INTERNAL_COLS = {"program_id", "store_id"} load_dotenv() def fetch_schema_docs() -> list[str]: client = clickhouse_connect.get_client( host=os.environ["CLICKHOUSE_HOST"], port=int(os.environ["CLICKHOUSE_PORT"]), username=os.environ["CLICKHOUSE_USER"], password=os.environ["CLICKHOUSE_PASSWORD"], database=os.environ["CLICKHOUSE_DATABASE"], secure=True, ) db = os.environ["CLICKHOUSE_DATABASE"] allowed = sorted( t.split(".", 1)[1] for t in RLS_TABLES if t.startswith(f"{db}.") ) if not allowed: raise RuntimeError( f"No RLS_TABLES match database={db!r}; nothing to train on." ) docs: list[str] = [] for table in allowed: # system.columns já é filtrado pelo ClickHouse: retorna apenas colunas # com GRANT SELECT para o user atual (wren_ia). Fonte de verdade do # que é acessível. SELECT *, DESCRIBE TABLE e system.tables não são # usados aqui — exigem privilégios extras que o user de runtime não tem. cols_raw = client.query( "SELECT name, type, comment FROM system.columns " "WHERE database = %(db)s AND table = %(t)s ORDER BY position", parameters={"db": db, "t": table}, ).result_rows if not cols_raw: raise RuntimeError( f"No accessible columns for {db}.{table} — verify the table " f"exists and {os.environ['CLICKHOUSE_USER']!r} has SELECT " f"grants on it." ) visible = [n for n, _, _ in cols_raw if n not in RLS_INTERNAL_COLS] cols = [(n, t, c) for n, t, c in cols_raw if n in visible] col_lines = "\n".join( f" - {n} {t}" + (f" -- {c}" if c else "") for n, t, c in cols ) col_list = ", ".join(visible) sample_rows = client.query( f"SELECT {col_list} FROM {db}.{table} LIMIT 3" ) sample_cols = ", ".join(sample_rows.column_names) sample_preview = "\n".join( " " + " | ".join(str(v) for v in row) for row in sample_rows.result_rows ) doc = ( f"Table `{db}.{table}` (ClickHouse).\n" f"Columns:\n{col_lines}\n\n" f"Sample rows ({sample_cols}):\n{sample_preview}" ) docs.append(doc) client.close() return docs async def main() -> None: memory = ChromaAgentMemory( persist_directory="./chroma_db", collection_name="vanna_clickhouse_gold", ) user = User(id="trainer", username="trainer") context = ToolContext( user=user, conversation_id="train", request_id="train", agent_memory=memory, ) docs = fetch_schema_docs() for i, doc in enumerate(docs, 1): await memory.save_text_memory(content=doc, context=context) first_line = doc.splitlines()[0] print(f"[{i}/{len(docs)}] saved: {first_line}") print(f"\nTrained on {len(docs)} table(s).") if __name__ == "__main__": asyncio.run(main())