#!/usr/bin/env python3
"""Multi-agent deep research: plan -> local/web search -> critique loop -> synthesis."""

# SQLite workaround for ChromaDB on older systems
__import__('pysqlite3')
import sys
sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')

import os
import re
from dataclasses import dataclass, field
from pathlib import Path

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from sentence_transformers import SentenceTransformer
import chromadb
import httpx
from bs4 import BeautifulSoup
from ddgs import DDGS


# ==================== CONFIG ====================

@dataclass
class Config:
    model_name: str = "Qwen/Qwen2.5-1.5B-Instruct"
    embedding_model: str = "all-MiniLM-L6-v2"
    device: str = field(default_factory=lambda: "cuda" if torch.cuda.is_available() else "cpu")
    docs_dir: str = field(default_factory=lambda: os.getenv("DOCS_DIR", "./documents"))
    max_critique_rounds: int = 3


CFG = Config()
log = lambda tag, msg: print(f"[{tag}] {msg}")


# ==================== MODEL INIT ====================

class _Models:
    """Lazy-loading container for the embedder, LLM, and vector store."""

    def __init__(self):
        self._ready = False
        self.embedder = self.tokenizer = self.llm = self.collection = None

    def _init(self):
        if self._ready:
            return
        log("init", f"Device: {CFG.device}")
        log("init", f"Loading embedder ({CFG.embedding_model})...")
        self.embedder = SentenceTransformer(CFG.embedding_model, device=CFG.device)
        log("init", f"Loading LLM ({CFG.model_name})...")
        self.tokenizer = AutoTokenizer.from_pretrained(CFG.model_name)
        self.llm = AutoModelForCausalLM.from_pretrained(
            CFG.model_name, dtype=torch.float16, device_map="auto"
        )
        client = chromadb.Client()
        self.collection = client.get_or_create_collection(
            "research_docs", metadata={"hnsw:space": "cosine"}
        )
        log("init", "Ready.")
        self._ready = True

    def embed(self, text: str) -> list[float]:
        self._init()
        return self.embedder.encode(text).tolist()

    def generate(self, task: str, instructions: str = "", max_tokens: int = 512) -> str:
        self._init()
        msgs = ([{"role": "system", "content": instructions}] if instructions else []) + [
            {"role": "user", "content": task}
        ]
        text = self.tokenizer.apply_chat_template(msgs, tokenize=False, add_generation_prompt=True)
        inputs = self.tokenizer(text, return_tensors="pt").to(self.llm.device)
        with torch.no_grad():
            out = self.llm.generate(**inputs, max_new_tokens=max_tokens, temperature=0.7,
                                    do_sample=True, pad_token_id=self.tokenizer.eos_token_id)
        # Decode only the newly generated tokens, skipping the prompt.
        return self.tokenizer.decode(out[0][inputs["input_ids"].shape[1]:],
                                     skip_special_tokens=True).strip()


M = _Models()


# ==================== MEMORY ====================

@dataclass
class Memory:
    findings: list = field(default_factory=list)

    def save(self, source: str, query: str, content: str) -> str:
        summary = (content[:300].replace("\n", " ").strip() + "...") if len(content) > 300 else content
        self.findings.append({"source": source, "query": query, "summary": summary})
        return summary

    def by_source(self, src: str) -> list[dict]:
        return [f for f in self.findings if f["source"] == src]

    def all_summaries(self) -> str:
        return "\n".join(f"- [{f['source']}] {f['query']}: {f['summary']}" for f in self.findings)


# ==================== TOOLS ====================

def web_search(query: str, max_results: int = 5) -> list[dict]:
    try:
        log("duck", f"Searching: {query}")
        with DDGS() as ddgs:
            raw = list(ddgs.text(query, max_results=max_results))
        results = [{"title": r.get("title", ""), "snippet": r.get("body", ""),
                    "url": r.get("href", "")} for r in raw]
        # Fetch full page text for the top two hits only.
        for item in results[:2]:
            item["content"] = fetch_url(item["url"])
        return results
    except Exception as e:
        log("duck", f"Error: {e}")
        return []
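# Illustrative shape of web_search() output (keys reflect the ddgs fields
# mapped above; only the first two hits carry fetched page "content"):
#   [{"title": "...", "snippet": "...", "url": "https://...", "content": "<page text>"},
#    {"title": "...", "snippet": "...", "url": "https://..."}]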
def fetch_url(url: str, max_chars: int = 3000) -> str:
    try:
        # Short timeout keeps the pipeline moving past slow pages.
        r = httpx.get(url, timeout=2, follow_redirects=True,
                      headers={"User-Agent": "Mozilla/5.0 (compatible; ResearchBot/1.0)"})
        soup = BeautifulSoup(r.text, "html.parser")
        # Drop non-content tags before extracting text.
        for tag in soup(["script", "style", "nav", "header", "footer"]):
            tag.decompose()
        return soup.get_text(separator="\n", strip=True)[:max_chars]
    except Exception as e:
        log("fetch", f"Failed: {e}")
        return ""


def doc_search(query: str, n_results: int = 5) -> list[dict]:
    M._init()  # ensure the collection exists even if nothing was indexed
    if M.collection.count() == 0:
        return []
    results = M.collection.query(query_embeddings=[M.embed(query)], n_results=n_results,
                                 include=["documents", "distances"])
    if not results["documents"] or not results["documents"][0]:
        return []
    docs = results["documents"][0]
    dists = results.get("distances", [[1.0] * len(docs)])[0]
    # Cosine distance -> similarity score.
    return [{"content": d, "score": 1 - dist} for d, dist in zip(docs, dists)]


def index_documents(docs_dir: str | None = None):
    path = Path(docs_dir or CFG.docs_dir)
    if not path.exists():
        log("docs", f"Directory not found: {path}")
        return
    docs, ids = [], []
    for f in path.rglob("*"):
        if f.suffix not in {".txt"}:
            continue
        try:
            content = f.read_text()
            log("docs", f"Loading: {f.name} ({len(content)} chars)")
            docs.append(content)
            ids.append(str(f))
        except Exception as e:
            log("docs", f"Failed to load {f}: {e}")
    if docs:
        embeddings = [M.embed(d) for d in docs]  # triggers _init()
        M.collection.add(documents=docs, embeddings=embeddings, ids=ids)
        log("docs", f"Indexed {len(docs)} chunks")


# ==================== AGENTS ====================

def parse_action(text: str) -> tuple[str | None, str]:
    if m := re.search(r'\[\[(\w+):(.+?)\]\]', text, re.DOTALL):
        return m.group(1).upper(), m.group(2).strip()
    if m := re.search(r'\[\[(\w+)\]\]', text):
        return m.group(1).upper(), ""
    return None, ""


def extract_findings(resp: str) -> str:
    """Extract content from [[FINDINGS:...]] or return the raw response."""
    if m := re.search(r'\[\[FINDINGS:(.*?)\]\]', resp, re.DOTALL):
        return m.group(1).strip()
    return resp


def agent(name: str, instructions: str, task: str, max_tokens: int = 512) -> str:
    resp = M.generate(task, instructions=instructions, max_tokens=max_tokens)
    log(name, resp[:1000] + ("..." if len(resp) > 1000 else ""))
    return resp


INSTRUCTIONS = {
    "planner": """You are a research planner. Break the query into 3 subtopics MAX.
Output EXACTLY: [[PLAN:\n- subtopic 1\n- subtopic 2\n]]
Keep subtopics short (3-5 words). No explanations.""",
    "researcher": """You are a research agent. Be CONCISE - max 2-3 sentences.
Extract ANY facts from the documents that relate to the query.
Output format: [[FINDINGS:\nThe relevant facts found.\n]]""",
    "critic": """You are a research critic. Review findings for completeness and accuracy.
If sufficient: [[SATISFIED]]
If gaps exist: [[ISSUES:what specific information is missing]]
Be concise and specific.""",
    "writer": "You are a research writer. Be CONCISE and DIRECT. No fluff, no hedging. Just state the facts.",
}
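# The agents communicate through [[ACTION:payload]] markers, which
# parse_action() recovers. Illustrative doctest-style examples:
#   >>> parse_action("[[ISSUES:missing dates]]")
#   ('ISSUES', 'missing dates')
#   >>> parse_action("ok [[SATISFIED]]")
#   ('SATISFIED', '')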
def plan(mem: Memory, query: str) -> list[str]:
    resp = agent("planner", INSTRUCTIONS["planner"], f"Research query: {query}")
    if m := re.search(r'\[\[PLAN:(.*?)\]\]', resp, re.DOTALL):
        subtopics = [line.strip().lstrip("-").strip() for line in m.group(1).strip().split("\n")]
        subtopics = [s for s in subtopics if len(s) > 3]
        if subtopics:
            mem.save("planner", query, "\n".join(subtopics))
            return subtopics
    return [query]


def do_research(mem: Memory, query: str, source: str = "web"):
    log("research", f"Searching {source.upper()} for: {query}")
    results = doc_search(query) if source == "local" else web_search(query)
    if not results:
        log("research", f"No {source} results")
        return
    log("research", f"Found {len(results)} {source} results")
    if source == "local":
        content = "\n".join(f"[{i}] (sim: {r['score']:.2f})\n{r['content'][:1000]}"
                            for i, r in enumerate(results, 1))
    else:
        content = "\n".join(f"[{i}] {r['title']}\n{r['url']}\n{r.get('content', r.get('snippet', ''))[:1000]}"
                            for i, r in enumerate(results, 1))
    prompt = f"Research query: {query}\n\nResults:\n{content[:3000]}\n\nExtract key findings."
    findings = extract_findings(agent("research", INSTRUCTIONS["researcher"], prompt))
    mem.save(source, query, findings)


def critique(mem: Memory, query: str) -> tuple[bool, str]:
    prompt = f"Original query: {query}\n\nResearch so far:\n{mem.all_summaries()}\n\nIs this sufficient?"
    resp = agent("critic", INSTRUCTIONS["critic"], prompt, max_tokens=200)
    action, arg = parse_action(resp)
    if action == "SATISFIED":
        return True, "Research approved"
    if action == "ISSUES":
        mem.save("critic", "gap identified", arg)
        return False, arg
    return True, "Assumed complete"


def write(mem: Memory, query: str) -> str:
    fmt = lambda t, s, e: f"## {t}\n" + ("\n".join(f"- {f['summary']}" for f in mem.by_source(s)) or e)
    sections = [
        fmt("LOCAL DOCUMENTS", "local", "No relevant local documents."),
        fmt("WEB SEARCH", "web", "No relevant web results."),
    ]
    prompt = f"Query: {query}\n\nFindings:\n{mem.all_summaries()}\n\nWrite a 2-3 sentence answer."
    sections.append(f"## ANSWER\n{agent('writer', INSTRUCTIONS['writer'], prompt, 150)}")
    return "\n\n".join(sections)


# ==================== ORCHESTRATOR ====================

def research(query: str, verbose: bool = True) -> dict:
    vlog = (lambda phase, msg: print(f"\n[Phase {phase}] {msg}")) if verbose else (lambda *_: None)
    if verbose:
        print(f"\n{'='*60}\nRESEARCH: {query}\n{'='*60}\n")
    mem = Memory()
    index_documents()

    vlog(1, "Searching local documents...")
    do_research(mem, query, "local")

    vlog(2, "Planning web research...")
    subtopics = plan(mem, query)
    if verbose:
        print(f"Subtopics: {subtopics}\n")

    vlog(3, "Web research...")
    for topic in subtopics:
        if verbose:
            print(f"\n--- Web: {topic} ---")
        do_research(mem, topic, "web")

    vlog(4, "Critique loop...")
    for rnd in range(CFG.max_critique_rounds):
        if verbose:
            print(f"\n--- Critique round {rnd+1} ---")
        ok, feedback = critique(mem, query)
        if ok:
            if verbose:
                print("Critic satisfied")
            break
        if verbose:
            print(f"Gap: {feedback[:100]}...")
        do_research(mem, feedback, "web")

    vlog(5, "Writing synthesis...")
    return {"query": query, "subtopics": subtopics, "answer": write(mem, query)}
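# A plausible requirements.txt for the imports above (versions unpinned;
# package names inferred from the import statements, adjust as needed):
#   pysqlite3-binary
#   torch
#   transformers
#   sentence-transformers
#   chromadb
#   httpx
#   beautifulsoup4
#   ddgs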
# ==================== CLI ====================

def test_model():
    log("test", "Loading model and asking: 'What is an apple?'")
    resp = M.generate("What is an apple? Answer in 2-3 sentences.", max_tokens=100)
    print(f"[response] {resp}\n[test] Done.")


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("""
Installation:
  (a newer PyTorch build should work too; cu121 just matches my older GPU)
  pip install -r requirements.txt --extra-index-url https://download.pytorch.org/whl/cu121

Usage:
  ./search.py "your research question"
  ./search.py --test    # sanity check: load the LLM and generate once

Examples:
  ./search.py "what does mcdonalds serve?"
  ./search.py "is new york one of the locations of the sept 11 attacks?"
  ./search.py "strawberries. what colour are they?"
""")
        sys.exit(1)
    if sys.argv[1] == "--test":
        test_model()
    else:
        result = research(sys.argv[1])
        print(f"\n{'='*60}\nFINAL ANSWER\n{'='*60}\n{result['answer']}")
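# Example: point the local index at your own notes via the DOCS_DIR env var
# (read in Config above); the path and query here are illustrative:
#   DOCS_DIR=~/notes ./search.py "what did my notes say about chromadb?"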