#!/usr/bin/env python3 """Query clangd for symbol information. Usage: python3 tools/clangd-query.py symbol python3 tools/clangd-query.py diagnostics python3 tools/clangd-query.py definition python3 tools/clangd-query.py references python3 tools/clangd-query.py stop Commands: symbol Search for symbols by name across the project. diagnostics Report clangd diagnostics for a file. definition Find where a symbol is defined. references Find all references to a symbol. stop Stop the background daemon. File paths are relative to the project root. Line and column are 1-based. Examples: python3 tools/clangd-query.py symbol DrivenEngine python3 tools/clangd-query.py definition luprex/cpp/core/drivenengine.cpp 32 19 python3 tools/clangd-query.py references Source/Integration/Tangible.h 37 30 How it works: The first invocation starts a background clangd daemon process. It loads the project through a dedicated .clangd-query compile-commands directory, which gives it a separate clangd cache. This takes ~10 seconds. Subsequent queries hit the warm daemon and return in milliseconds. Clangd configuration (binary path, flags) is read from Integration.code-workspace so it stays in sync with VS Code. The daemon writes its PID to .clangd-query/pid and listens on a Unix socket at .clangd-query/sock. Use 'stop' to shut it down, or just kill the PID. Starting a new daemon automatically kills any existing one. """ import json import os import signal import socket import struct import subprocess import sys import threading import time SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) PROJECT_DIR = os.path.dirname(SCRIPT_DIR) WORKSPACE_FILE = os.path.join(PROJECT_DIR, "Integration.code-workspace") DAEMON_DIR = os.path.join(PROJECT_DIR, ".clangd-query") SOCKET_PATH = os.path.join(DAEMON_DIR, "sock") PID_FILE = os.path.join(DAEMON_DIR, "pid") LOG_FILE = os.path.join(DAEMON_DIR, "log") def read_workspace_config(): """Read clangd path and arguments from Integration.code-workspace.""" with open(WORKSPACE_FILE) as f: ws = json.load(f) settings = ws.get("settings", {}) clangd_path = settings.get("clangd.path", "clangd") clangd_args = list(settings.get("clangd.arguments", [])) return clangd_path, clangd_args def make_lsp_message(obj): """Encode a JSON-RPC message with Content-Length header.""" body = json.dumps(obj).encode("utf-8") header = f"Content-Length: {len(body)}\r\n\r\n".encode("utf-8") return header + body def read_lsp_message(stream): """Read one LSP message from a byte stream. Returns parsed JSON or None.""" headers = {} while True: line = stream.readline() if not line: return None line = line.decode("utf-8").strip() if line == "": break if ": " in line: key, value = line.split(": ", 1) headers[key] = value length = int(headers.get("Content-Length", 0)) if length == 0: return None body = stream.read(length) return json.loads(body.decode("utf-8")) # --- Socket protocol helpers --- # Messages over the Unix socket are length-prefixed JSON: # 4 bytes big-endian length, then UTF-8 JSON body. def sock_send(sock, obj): data = json.dumps(obj).encode("utf-8") sock.sendall(struct.pack("!I", len(data)) + data) def sock_recv(sock): raw_len = _recv_exact(sock, 4) if not raw_len: return None length = struct.unpack("!I", raw_len)[0] data = _recv_exact(sock, length) if not data: return None return json.loads(data.decode("utf-8")) def _recv_exact(sock, n): buf = b"" while len(buf) < n: chunk = sock.recv(n - len(buf)) if not chunk: return None buf += chunk return buf # --- LSP SymbolKind enum --- SYMBOL_KINDS = { 1: "File", 2: "Module", 3: "Namespace", 4: "Package", 5: "Class", 6: "Method", 7: "Property", 8: "Field", 9: "Constructor", 10: "Enum", 11: "Interface", 12: "Function", 13: "Variable", 14: "Constant", 15: "String", 16: "Number", 17: "Boolean", 18: "Array", 19: "Object", 20: "Key", 21: "Null", 22: "EnumMember", 23: "Struct", 24: "Event", 25: "Operator", 26: "TypeParameter", } def format_uri(uri): """Convert file:// URI to a project-relative path.""" path = uri.replace("file://", "") if path.startswith(PROJECT_DIR + "/"): path = path[len(PROJECT_DIR) + 1:] return path # ============================================================ # Daemon process # ============================================================ def truncate_text(obj): """Deep-copy obj, truncating any 'text' string values to 80 chars.""" if isinstance(obj, dict): return {k: (v[:80] + "..." if k == "text" and isinstance(v, str) and len(v) > 80 else truncate_text(v)) for k, v in obj.items()} if isinstance(obj, list): return [truncate_text(v) for v in obj] return obj def lsp_log(direction, msg): """Log an LSP message to the log file.""" with open(LOG_FILE, "a") as f: f.write(f"{direction} {json.dumps(truncate_text(msg))}\n") class ClangdDaemon: """Manages a clangd subprocess and serves queries over a Unix socket.""" def __init__(self): clangd_path, clangd_args = read_workspace_config() # Give the daemon its own compile-commands dir so it gets a separate # clangd index cache (clangd stores .cache/ next to compile_commands.json). daemon_cc_dir = os.path.join(PROJECT_DIR, ".clangd-query") os.makedirs(daemon_cc_dir, exist_ok=True) cc_link = os.path.join(daemon_cc_dir, "compile_commands.json") cc_src = os.path.join(PROJECT_DIR, ".vscode", "compile_commands.json") if os.path.islink(cc_link) or os.path.exists(cc_link): os.unlink(cc_link) os.symlink(cc_src, cc_link) # Replace --compile-commands-dir in args to point to our dir clangd_args = [ f"--compile-commands-dir={daemon_cc_dir}" if a.startswith("--compile-commands-dir") else a for a in clangd_args ] cmd = [clangd_path] + clangd_args + ["-j=4"] self.proc = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=PROJECT_DIR, ) self.request_id = 0 self.pending = {} self.lock = threading.Lock() self.events = {} self.index_loaded = threading.Event() self.opened_files = set() self.file_versions = {} self.cached_diagnostics = {} # uri -> list of diagnostics self.file_status_events = {} # uri -> threading.Event (set when idle) threading.Thread(target=self._stdout_reader, daemon=True).start() threading.Thread(target=self._stderr_reader, daemon=True).start() def _stdout_reader(self): while True: msg = read_lsp_message(self.proc.stdout) if msg is None: break lsp_log("<<<", msg) if msg.get("method") == "textDocument/publishDiagnostics": uri = msg["params"]["uri"] with self.lock: self.cached_diagnostics[uri] = msg["params"]["diagnostics"] continue if msg.get("method") == "textDocument/clangd.fileStatus": uri = msg["params"]["uri"] state = msg["params"]["state"] with self.lock: if state == "idle" and uri in self.file_status_events: self.file_status_events[uri].set() continue msg_id = msg.get("id") if msg_id is not None and msg_id in self.pending: with self.lock: self.pending[msg_id] = msg self.events[msg_id].set() def _stderr_reader(self): for raw_line in self.proc.stderr: line = raw_line.decode("utf-8", errors="replace").rstrip() if "BackgroundIndex: serving" in line: self.index_loaded.set() def send_request(self, method, params): self.request_id += 1 rid = self.request_id event = threading.Event() with self.lock: self.pending[rid] = None self.events[rid] = event msg = {"jsonrpc": "2.0", "id": rid, "method": method, "params": params} lsp_log(">>>", msg) self.proc.stdin.write(make_lsp_message(msg)) self.proc.stdin.flush() event.wait(timeout=30) with self.lock: result = self.pending.pop(rid, None) self.events.pop(rid, None) return result def send_notification(self, method, params): msg = {"jsonrpc": "2.0", "method": method, "params": params} lsp_log(">>>", msg) self.proc.stdin.write(make_lsp_message(msg)) self.proc.stdin.flush() def initialize(self): self.send_request("initialize", { "processId": os.getpid(), "rootUri": f"file://{PROJECT_DIR}", "capabilities": {}, "initializationOptions": { "clangdFileStatus": True, }, }) self.send_notification("initialized", {}) self.send_notification("textDocument/didOpen", { "textDocument": { "uri": f"file://{PROJECT_DIR}/fake.cpp", "languageId": "cpp", "version": 1, "text": "", } }) def open_file(self, filepath): abs_path = os.path.abspath(filepath) with open(abs_path) as f: text = f.read() if abs_path in self.opened_files: self.file_versions[abs_path] = self.file_versions.get(abs_path, 1) + 1 self.send_notification("textDocument/didChange", { "textDocument": { "uri": f"file://{abs_path}", "version": self.file_versions[abs_path], }, "contentChanges": [{"text": text}], }) return self.opened_files.add(abs_path) self.file_versions[abs_path] = 1 self.send_notification("textDocument/didOpen", { "textDocument": { "uri": f"file://{abs_path}", "languageId": "cpp", "version": 1, "text": text, } }) def handle_query(self, query): """Handle a query dict and return a result dict.""" cmd = query["command"] args = query.get("args", []) with open(LOG_FILE, "a") as f: f.write(f"\n\n\n------------------------ QUERY: {cmd} {' '.join(args)} ----------------\n\n\n") if cmd == "symbol": self.index_loaded.wait(timeout=15) resp = self.send_request("workspace/symbol", {"query": args[0]}) return resp if cmd == "diagnostics": filepath = args[0] abs_path = os.path.abspath(filepath) uri = f"file://{abs_path}" with open(abs_path) as f: text = f.read() event = threading.Event() with self.lock: self.cached_diagnostics.pop(uri, None) self.file_status_events[uri] = event self.send_notification("textDocument/didOpen", { "textDocument": {"uri": uri, "languageId": "cpp", "version": 1, "text": text} }) event.wait(timeout=30) with self.lock: self.file_status_events.pop(uri, None) diags = self.cached_diagnostics.pop(uri, None) self.send_notification("textDocument/didClose", { "textDocument": {"uri": uri} }) if diags is None: return {"error": "Timed out waiting for diagnostics"} return {"result": diags} if cmd == "definition": filepath, line, col = args[0], int(args[1]), int(args[2]) abs_path = os.path.abspath(filepath) self.open_file(abs_path) return self.send_request("textDocument/definition", { "textDocument": {"uri": f"file://{abs_path}"}, "position": {"line": line - 1, "character": col - 1}, }) if cmd == "references": filepath, line, col = args[0], int(args[1]), int(args[2]) abs_path = os.path.abspath(filepath) self.open_file(abs_path) return self.send_request("textDocument/references", { "textDocument": {"uri": f"file://{abs_path}"}, "position": {"line": line - 1, "character": col - 1}, "context": {"includeDeclaration": True}, }) if cmd == "stop": return {"stop": True} return {"error": f"Unknown command: {cmd}"} def shutdown(self): try: self.send_request("shutdown", None) self.send_notification("exit", None) self.proc.wait(timeout=5) except Exception: self.proc.kill() def kill_existing_daemon(): """Kill any existing daemon process and clean up.""" if os.path.exists(PID_FILE): try: with open(PID_FILE) as f: old_pid = int(f.read().strip()) os.kill(old_pid, signal.SIGTERM) # Wait briefly for it to die for _ in range(20): try: os.kill(old_pid, 0) time.sleep(0.1) except OSError: break except (ValueError, OSError): pass try: os.unlink(PID_FILE) except OSError: pass if os.path.exists(SOCKET_PATH): try: os.unlink(SOCKET_PATH) except OSError: pass def run_daemon(): """Run the daemon process: start clangd, listen on Unix socket.""" os.makedirs(DAEMON_DIR, exist_ok=True) kill_existing_daemon() # Write our PID with open(PID_FILE, "w") as f: f.write(str(os.getpid())) daemon = ClangdDaemon() daemon.initialize() # Set up Unix socket server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) server.bind(SOCKET_PATH) server.listen(4) def cleanup(*_args): daemon.shutdown() server.close() try: os.unlink(SOCKET_PATH) except OSError: pass try: os.unlink(PID_FILE) except OSError: pass sys.exit(0) signal.signal(signal.SIGTERM, cleanup) signal.signal(signal.SIGINT, cleanup) # Signal to parent that we're ready sys.stdout.write("READY\n") sys.stdout.flush() while True: try: conn, _ = server.accept() except OSError: break try: query = sock_recv(conn) if query is None: conn.close() continue result = daemon.handle_query(query) sock_send(conn, result) conn.close() if isinstance(result, dict) and result.get("stop"): break except Exception: try: conn.close() except Exception: pass cleanup() # ============================================================ # Client side # ============================================================ def daemon_is_alive(): """Check if the daemon is running and responsive.""" if not os.path.exists(SOCKET_PATH): return False if not os.path.exists(PID_FILE): return False # Check if PID is alive try: with open(PID_FILE) as f: pid = int(f.read().strip()) os.kill(pid, 0) except (ValueError, OSError): # Stale PID file try: os.unlink(SOCKET_PATH) except OSError: pass try: os.unlink(PID_FILE) except OSError: pass return False return True def start_daemon(): """Start the daemon as a background process and wait for it to be ready.""" proc = subprocess.Popen( [sys.executable, __file__, "--daemon"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, cwd=PROJECT_DIR, start_new_session=True, ) # Wait for READY signal line = proc.stdout.readline().decode("utf-8").strip() proc.stdout.close() if line != "READY": print("Failed to start daemon.", file=sys.stderr) sys.exit(1) def send_query(query, retries=3): """Send a query to the daemon and return the response.""" for attempt in range(retries): try: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.settimeout(60) sock.connect(SOCKET_PATH) sock_send(sock, query) result = sock_recv(sock) sock.close() return result except (ConnectionResetError, ConnectionRefusedError, BrokenPipeError, OSError): sock.close() if attempt < retries - 1: import time time.sleep(2) # Daemon may have died — restart it if not daemon_is_alive(): start_daemon() else: raise def ensure_daemon(): """Make sure the daemon is running.""" if not daemon_is_alive(): start_daemon() def format_symbol_results(resp): if not resp or "result" not in resp: print("No results (or clangd error).", file=sys.stderr) return results = resp["result"] if not results: print("No symbols found.", file=sys.stderr) return for sym in results: loc = sym.get("location", {}) uri = loc.get("uri", "") line = loc.get("range", {}).get("start", {}).get("line", 0) + 1 kind = SYMBOL_KINDS.get(sym.get("kind", 0), "Unknown") path = format_uri(uri) container = sym.get("containerName", "") qualified = f"{container}::{sym['name']}" if container else sym["name"] print(f"{path}:{line} [{kind}] {qualified}") def format_diagnostics(resp): if not resp or "result" not in resp: print("No diagnostics (or clangd error).", file=sys.stderr) return diags = resp["result"] if not diags: print("No problems found.") return for d in diags: line = d.get("range", {}).get("start", {}).get("line", 0) + 1 col = d.get("range", {}).get("start", {}).get("character", 0) + 1 severity = {1: "error", 2: "warning", 3: "info", 4: "hint"}.get(d.get("severity", 0), "unknown") msg = d.get("message", "") print(f" {line}:{col} [{severity}] {msg}") def format_location_results(resp, label="results"): if not resp or "result" not in resp: print(f"No {label} (or clangd error).", file=sys.stderr) return results = resp["result"] if not results: print(f"No {label} found.", file=sys.stderr) return if isinstance(results, dict): results = [results] for loc in results: uri = loc.get("uri", loc.get("targetUri", "")) rng = loc.get("range", loc.get("targetRange", {})) line = rng.get("start", {}).get("line", 0) + 1 print(f"{format_uri(uri)}:{line}") def main(): if len(sys.argv) >= 2 and sys.argv[1] == "--daemon": run_daemon() return if len(sys.argv) < 2: print(__doc__, file=sys.stderr) sys.exit(1) command = sys.argv[1] args = sys.argv[2:] if command == "stop": if not daemon_is_alive(): print("Daemon is not running.", file=sys.stderr) return send_query({"command": "stop"}) print("Daemon stopped.") return if command == "symbol": if len(args) < 1: print("Usage: clangd-query.py symbol ", file=sys.stderr) sys.exit(1) elif command == "diagnostics": if len(args) < 1: print("Usage: clangd-query.py diagnostics ", file=sys.stderr) sys.exit(1) elif command in ("definition", "references"): if len(args) < 3: print(f"Usage: clangd-query.py {command} ", file=sys.stderr) sys.exit(1) else: print(f"Unknown command: {command}", file=sys.stderr) print(__doc__, file=sys.stderr) sys.exit(1) ensure_daemon() resp = send_query({"command": command, "args": args}) if command == "symbol": format_symbol_results(resp) elif command == "diagnostics": format_diagnostics(resp) elif command == "definition": format_location_results(resp, "definition") elif command == "references": format_location_results(resp, "references") if __name__ == "__main__": main()