#!/usr/bin/env python3
"""Query clangd for symbol information.

Usage:
    python3 tools/clangd-query.py symbol <query>
    python3 tools/clangd-query.py definition <file> <line> <col>
    python3 tools/clangd-query.py references <file> <line> <col>
    python3 tools/clangd-query.py stop

Commands:
    symbol      Search for symbols by name across the project.
    definition  Find where a symbol is defined.
    references  Find all references to a symbol.
    stop        Stop the background daemon.

File paths are relative to the project root. Line and column are 1-based.

Examples:
    python3 tools/clangd-query.py symbol DrivenEngine
    python3 tools/clangd-query.py definition luprex/cpp/core/drivenengine.cpp 32 19
    python3 tools/clangd-query.py references Source/Integration/Tangible.h 37 30

How it works:
    The first invocation starts a background clangd daemon process. It loads
    the project index from .vscode/.cache/clangd/index/ (shared with VS Code's
    clangd). This takes ~10 seconds. Subsequent queries hit the warm daemon
    and return in milliseconds.

    Clangd configuration (binary path, flags) is read from
    Integration.code-workspace so it stays in sync with VS Code.

    The daemon writes its PID to .clangd-query.pid and listens on a Unix
    socket at .clangd-query.sock. Use 'stop' to shut it down, or just kill
    the PID. Starting a new daemon automatically kills any existing one.
"""

import json
import os
import signal
import socket
import struct
import subprocess
import sys
import threading
import time

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_DIR = os.path.dirname(SCRIPT_DIR)
WORKSPACE_FILE = os.path.join(PROJECT_DIR, "Integration.code-workspace")
SOCKET_PATH = os.path.join(PROJECT_DIR, ".clangd-query.sock")
PID_FILE = os.path.join(PROJECT_DIR, ".clangd-query.pid")

# A small file to open on startup to trigger background index loading
TRIGGER_FILE = os.path.join(PROJECT_DIR, "luprex", "cpp", "core", "util.hpp")


def read_workspace_config():
    """Read the clangd binary path and arguments from the workspace file.

    Returns:
        A ``(clangd_path, clangd_args)`` tuple. ``clangd_path`` falls back to
        ``"clangd"`` and ``clangd_args`` to an empty list when the
        corresponding setting is absent or null.

    Raises:
        OSError: if the workspace file cannot be opened.
        json.JSONDecodeError: if the workspace file is not strict JSON.
    """
    # JSON is UTF-8 by specification; do not depend on the locale encoding.
    with open(WORKSPACE_FILE, encoding="utf-8") as f:
        workspace = json.load(f)
    # "or" also covers keys that are present but explicitly null.
    settings = workspace.get("settings") or {}
    clangd_path = settings.get("clangd.path") or "clangd"
    clangd_args = list(settings.get("clangd.arguments") or [])
    return clangd_path, clangd_args


def make_lsp_message(obj):
    """Encode *obj* as a JSON-RPC message framed with a Content-Length header.

    The header counts the bytes of the UTF-8 encoded body, not characters.
    """
    body = json.dumps(obj).encode("utf-8")
    header = f"Content-Length: {len(body)}\r\n\r\n".encode("utf-8")
    return header + body


def read_lsp_message(stream):
    """Read one LSP message from a binary stream.

    Args:
        stream: a binary file-like object providing ``readline()`` and
            ``read(n)``.

    Returns:
        The parsed JSON body, or None when no complete message is available:
        end of stream, a missing/zero/non-numeric Content-Length, or a body
        cut short by end of stream.

    Raises:
        json.JSONDecodeError: if a complete body is not valid JSON.
    """
    headers = {}
    while True:
        line = stream.readline()
        if not line:
            return None  # EOF before the blank line that ends the headers
        line = line.decode("utf-8", errors="replace").strip()
        if not line:
            break
        name, sep, value = line.partition(":")
        if sep:
            # Header field names are matched case-insensitively.
            headers[name.strip().lower()] = value.strip()

    try:
        length = int(headers.get("content-length", 0))
    except ValueError:
        return None
    if length <= 0:
        return None

    body = stream.read(length)
    if body is None or len(body) < length:
        return None  # stream ended in the middle of the body
    return json.loads(body.decode("utf-8"))


# --- Socket protocol helpers ---
# Messages over the Unix socket are length-prefixed JSON:
#   4 bytes big-endian length, then UTF-8 JSON body.

def sock_send(sock, obj):
    """Send *obj* over *sock* as one length-prefixed JSON frame.

    A frame is a 4-byte big-endian unsigned length followed by the UTF-8
    encoded JSON body.
    """
    payload = json.dumps(obj).encode("utf-8")
    sock.sendall(struct.pack("!I", len(payload)) + payload)


def sock_recv(sock):
    """Receive one length-prefixed JSON frame from *sock*.

    Returns:
        The decoded JSON value (which may itself be None for JSON ``null``),
        or None when the peer closed the connection before a complete,
        non-empty frame arrived.
    """
    raw_len = _recv_exact(sock, 4)
    if raw_len is None:
        return None
    (length,) = struct.unpack("!I", raw_len)
    data = _recv_exact(sock, length)
    if not data:
        return None
    return json.loads(data.decode("utf-8"))


def _recv_exact(sock, n):
    """Read exactly *n* bytes from *sock*; None if the peer closes early."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            return None
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


# --- LSP SymbolKind enum ---
SYMBOL_KINDS = {
    1: "File", 2: "Module", 3: "Namespace", 4: "Package", 5: "Class",
    6: "Method", 7: "Property", 8: "Field", 9: "Constructor", 10: "Enum",
    11: "Interface", 12: "Function", 13: "Variable", 14: "Constant",
    15: "String", 16: "Number", 17: "Boolean", 18: "Array", 19: "Object",
    20: "Key", 21: "Null", 22: "EnumMember", 23: "Struct", 24: "Event",
    25: "Operator", 26: "TypeParameter",
}

# Seconds a connected client gets to send its query / accept the reply.
CLIENT_IO_TIMEOUT = 10


def format_uri(uri):
    """Convert a file:// URI to a project-relative path.

    Percent-escapes in the URI are decoded. Paths outside PROJECT_DIR are
    returned absolute; non-file URIs are returned unchanged.
    """
    from urllib.parse import unquote, urlparse

    parsed = urlparse(uri)
    path = unquote(parsed.path) if parsed.scheme == "file" else uri
    prefix = PROJECT_DIR + "/"
    if path.startswith(prefix):
        path = path[len(prefix):]
    return path


def _path_to_uri(path):
    """Return a percent-encoded file:// URI for the absolute form of *path*."""
    from urllib.parse import quote

    return "file://" + quote(os.path.abspath(path))


def _unlink_quietly(path):
    """Remove *path*, ignoring the case where it is missing or unremovable."""
    try:
        os.unlink(path)
    except OSError:
        pass


def _read_pid_file():
    """Return the integer stored in PID_FILE, or None if unreadable/invalid."""
    try:
        with open(PID_FILE) as f:
            return int(f.read().strip())
    except (OSError, ValueError):
        return None


def _pid_exists(pid):
    """Return True if signal 0 can be delivered to *pid*."""
    try:
        os.kill(pid, 0)
    except OSError:
        return False
    return True


# ============================================================
# Daemon process
# ============================================================

class ClangdDaemon:
    """Manages a clangd subprocess and serves queries over a Unix socket."""

    REQUEST_TIMEOUT = 30      # seconds to wait for a clangd reply
    INDEX_WAIT_TIMEOUT = 15   # seconds to wait for the background index

    def __init__(self):
        """Start clangd with the workspace configuration plus ``-j=4``."""
        clangd_path, clangd_args = read_workspace_config()
        cmd = [clangd_path] + clangd_args + ["-j=4"]

        self.proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=PROJECT_DIR,
        )
        self.request_id = 0
        self.pending = {}       # request id -> reply message (None until it arrives)
        self.lock = threading.Lock()
        self.events = {}        # request id -> Event set when the reply arrives
        self.index_loaded = threading.Event()
        self.opened_files = set()
        self.file_state = {}    # abs path -> (document version, mtime_ns) last sent

        threading.Thread(target=self._stdout_reader, daemon=True).start()
        threading.Thread(target=self._stderr_reader, daemon=True).start()

    def _stdout_reader(self):
        """Read clangd's stdout until EOF, routing replies to their waiters."""
        try:
            while True:
                try:
                    msg = read_lsp_message(self.proc.stdout)
                except (json.JSONDecodeError, UnicodeDecodeError):
                    continue  # skip one malformed message, keep the thread alive
                if msg is None:
                    break
                self._dispatch(msg)
        finally:
            # clangd's output ended: wake current waiters instead of letting
            # them sit out the full request timeout.
            with self.lock:
                for event in self.events.values():
                    event.set()

    def _dispatch(self, msg):
        """Store *msg* as the reply to its pending request, if there is one."""
        # Messages with a "method" are server-initiated requests or
        # notifications; their ids live in a separate id space from ours.
        if not isinstance(msg, dict) or "method" in msg:
            return
        msg_id = msg.get("id")
        if msg_id is None:
            return
        with self.lock:
            # Look up under the lock: the waiter may have timed out and
            # removed its entries in the meantime.
            event = self.events.get(msg_id)
            if event is None or msg_id not in self.pending:
                return
            self.pending[msg_id] = msg
        event.set()

    def _stderr_reader(self):
        """Watch clangd's stderr for the line announcing the index is served."""
        for raw_line in self.proc.stderr:
            line = raw_line.decode("utf-8", errors="replace").rstrip()
            if "BackgroundIndex: serving" in line:
                self.index_loaded.set()

    def send_request(self, method, params):
        """Send a JSON-RPC request and wait for its reply.

        Returns:
            The full reply message dict, or None if no reply arrived within
            REQUEST_TIMEOUT seconds (or clangd's output ended first).
        """
        event = threading.Event()
        with self.lock:
            self.request_id += 1
            rid = self.request_id
            self.pending[rid] = None
            self.events[rid] = event
        msg = {"jsonrpc": "2.0", "id": rid, "method": method, "params": params}
        try:
            self.proc.stdin.write(make_lsp_message(msg))
            self.proc.stdin.flush()
            event.wait(timeout=self.REQUEST_TIMEOUT)
        finally:
            # Always drop the bookkeeping, even if the write failed.
            with self.lock:
                result = self.pending.pop(rid, None)
                self.events.pop(rid, None)
        return result

    def send_notification(self, method, params):
        """Send a JSON-RPC notification (no id, no reply expected)."""
        msg = {"jsonrpc": "2.0", "method": method, "params": params}
        self.proc.stdin.write(make_lsp_message(msg))
        self.proc.stdin.flush()

    def initialize(self):
        """Perform the LSP handshake and open TRIGGER_FILE."""
        self.send_request("initialize", {
            "processId": os.getpid(),
            "rootUri": _path_to_uri(PROJECT_DIR),
            "capabilities": {},
        })
        self.send_notification("initialized", {})
        try:
            self.open_file(TRIGGER_FILE)
        except OSError as exc:
            # The trigger file is only a convenience; a missing file must not
            # prevent the daemon from starting.
            print(f"clangd-query: cannot open trigger file: {exc}",
                  file=sys.stderr)

    def open_file(self, filepath):
        """Make sure clangd has the current on-disk content of *filepath*.

        Sends ``textDocument/didOpen`` the first time a file is seen and a
        full-text ``textDocument/didChange`` when its modification time has
        changed since it was last sent.

        Raises:
            OSError: if a file not yet opened cannot be read.
        """
        abs_path = os.path.abspath(filepath)
        known = self.file_state.get(abs_path)
        try:
            mtime = os.stat(abs_path).st_mtime_ns
        except OSError:
            if known is not None:
                return  # file vanished; keep the buffer clangd already has
            raise
        if known is not None and known[1] == mtime:
            return

        with open(abs_path, encoding="utf-8", errors="replace") as f:
            text = f.read()
        uri = _path_to_uri(abs_path)
        if known is None:
            version = 1
            self.send_notification("textDocument/didOpen", {
                "textDocument": {
                    "uri": uri,
                    "languageId": "cpp",
                    "version": version,
                    "text": text,
                }
            })
        else:
            version = known[0] + 1
            self.send_notification("textDocument/didChange", {
                "textDocument": {"uri": uri, "version": version},
                # A change without a range replaces the whole document.
                "contentChanges": [{"text": text}],
            })
        # Record the file only after the notification was sent, so a failed
        # attempt is retried on the next query.
        self.opened_files.add(abs_path)
        self.file_state[abs_path] = (version, mtime)

    def handle_query(self, query):
        """Handle a query dict and return a result dict.

        Args:
            query: ``{"command": str, "args": list}`` as sent by the client.

        Returns:
            clangd's reply message for ``symbol``/``definition``/
            ``references`` (None on timeout), ``{"stop": True}`` for
            ``stop``, or ``{"error": message}`` for invalid input.
        """
        if not isinstance(query, dict):
            return {"error": "Malformed query"}
        cmd = query.get("command")
        args = query.get("args") or []

        if cmd == "stop":
            return {"stop": True}

        if cmd == "symbol":
            if not args:
                return {"error": "symbol requires a query string"}
            self.index_loaded.wait(timeout=self.INDEX_WAIT_TIMEOUT)
            return self.send_request("workspace/symbol", {"query": args[0]})

        if cmd in ("definition", "references"):
            if len(args) < 3:
                return {"error": f"{cmd} requires <file> <line> <col>"}
            try:
                line, col = int(args[1]), int(args[2])
            except (TypeError, ValueError):
                return {"error": "line and column must be integers"}
            if line < 1 or col < 1:
                return {"error": "line and column are 1-based"}
            abs_path = os.path.abspath(args[0])
            try:
                self.open_file(abs_path)
            except OSError as exc:
                return {"error": f"Cannot open {args[0]}: {exc}"}
            params = {
                "textDocument": {"uri": _path_to_uri(abs_path)},
                # LSP positions are 0-based.
                "position": {"line": line - 1, "character": col - 1},
            }
            if cmd == "references":
                params["context"] = {"includeDeclaration": True}
            return self.send_request(f"textDocument/{cmd}", params)

        return {"error": f"Unknown command: {cmd}"}

    def shutdown(self):
        """Ask clangd to exit; kill it if the orderly shutdown fails."""
        try:
            self.send_request("shutdown", None)
            self.send_notification("exit", None)
            self.proc.wait(timeout=5)
        except Exception:
            # Best-effort boundary: whatever went wrong, do not leave clangd
            # running (or unreaped) behind us.
            self.proc.kill()
            self.proc.wait()


def _terminate_process(pid, grace_seconds=2.0):
    """Send SIGTERM to *pid*, then SIGKILL if it outlives the grace period."""
    try:
        os.kill(pid, signal.SIGTERM)
    except OSError:
        return
    deadline = time.monotonic() + grace_seconds
    while time.monotonic() < deadline:
        if not _pid_exists(pid):
            return
        time.sleep(0.1)
    try:
        os.kill(pid, signal.SIGKILL)
    except OSError:
        pass


def kill_existing_daemon():
    """Kill any existing daemon process and clean up."""
    old_pid = _read_pid_file()
    # pid <= 0 would address a process group or every process; never signal
    # those, and never signal ourselves.
    if old_pid is not None and old_pid > 0 and old_pid != os.getpid():
        _terminate_process(old_pid)
    _unlink_quietly(PID_FILE)
    _unlink_quietly(SOCKET_PATH)


def _serve(server, daemon):
    """Answer one query per connection until a ``stop`` query or accept error."""
    while True:
        try:
            conn, _ = server.accept()
        except OSError:
            return
        stop_requested = False
        with conn:
            # A silent or stalled client must not block the daemon forever.
            conn.settimeout(CLIENT_IO_TIMEOUT)
            try:
                query = sock_recv(conn)
                if query is None:
                    continue
                try:
                    result = daemon.handle_query(query)
                except Exception as exc:
                    # Report the failure instead of dropping the connection.
                    result = {"error": f"{type(exc).__name__}: {exc}"}
                sock_send(conn, result)
                stop_requested = (
                    isinstance(result, dict) and bool(result.get("stop"))
                )
            except (OSError, ValueError):
                continue  # client vanished or sent garbage
        if stop_requested:
            return


def run_daemon():
    """Run the daemon process: start clangd, listen on Unix socket."""
    kill_existing_daemon()

    # Write our PID
    with open(PID_FILE, "w") as f:
        f.write(str(os.getpid()))

    daemon = None
    server = None

    def _exit_on_signal(*_args):
        # Only raise SystemExit here; the real cleanup runs in the finally
        # block below, after the interrupted code has released its locks.
        sys.exit(0)

    signal.signal(signal.SIGTERM, _exit_on_signal)
    signal.signal(signal.SIGINT, _exit_on_signal)

    try:
        daemon = ClangdDaemon()
        daemon.initialize()

        # Set up Unix socket
        server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        server.bind(SOCKET_PATH)
        server.listen(4)

        # Signal to parent that we're ready
        sys.stdout.write("READY\n")
        sys.stdout.flush()

        _serve(server, daemon)
    finally:
        # A second signal must not abort the cleanup half-way.
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        if daemon is not None:
            daemon.shutdown()
        if server is not None:
            server.close()
        _unlink_quietly(SOCKET_PATH)
        _unlink_quietly(PID_FILE)


# ============================================================
# Client side
# ============================================================

def _socket_accepts():
    """Return False only if nothing is listening on SOCKET_PATH."""
    probe = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        probe.settimeout(2)
        probe.connect(SOCKET_PATH)
    except (ConnectionRefusedError, FileNotFoundError):
        return False
    except OSError:
        # e.g. a timeout while the daemon is busy: not proof that it is dead.
        return True
    finally:
        probe.close()
    return True


def daemon_is_alive():
    """Check if the daemon is running and responsive.

    Stale socket/PID files (dead PID, or a PID that is alive but with nothing
    listening on the socket) are removed as a side effect.
    """
    if not (os.path.exists(SOCKET_PATH) and os.path.exists(PID_FILE)):
        return False
    pid = _read_pid_file()
    if pid is not None and pid > 0 and _pid_exists(pid) and _socket_accepts():
        return True
    # Stale PID file
    _unlink_quietly(SOCKET_PATH)
    _unlink_quietly(PID_FILE)
    return False


def start_daemon():
    """Start the daemon as a background process and wait for it to be ready."""
    proc = subprocess.Popen(
        # The child runs with cwd=PROJECT_DIR, so a relative __file__ would
        # resolve against the wrong directory.
        [sys.executable, os.path.abspath(__file__), "--daemon"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        stdin=subprocess.DEVNULL,
        cwd=PROJECT_DIR,
        start_new_session=True,
    )
    # Wait for READY signal
    line = proc.stdout.readline().decode("utf-8").strip()
    proc.stdout.close()
    if line != "READY":
        print("Failed to start daemon.", file=sys.stderr)
        sys.exit(1)


def send_query(query):
    """Send a query to the daemon and return the response.

    Raises:
        OSError: if the daemon socket cannot be reached or times out.
    """
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.settimeout(60)
        sock.connect(SOCKET_PATH)
        sock_send(sock, query)
        return sock_recv(sock)


def ensure_daemon():
    """Make sure the daemon is running."""
    if not daemon_is_alive():
        start_daemon()


def _error_detail(resp):
    """Return the error message carried by *resp*, or "" if there is none.

    Handles both shapes an error can take here: a JSON-RPC error object
    (``{"error": {"message": ...}}``) and a plain string
    (``{"error": "..."}``).
    """
    if not isinstance(resp, dict) or "error" not in resp:
        return ""
    err = resp["error"]
    if isinstance(err, dict):
        err = err.get("message", "")
    return str(err) if err else ""


def _report_no_response(resp, headline):
    """Print *headline* to stderr, followed by any error detail in *resp*."""
    print(headline, file=sys.stderr)
    detail = _error_detail(resp)
    if detail:
        print(f"  {detail}", file=sys.stderr)


def format_symbol_results(resp):
    """Print ``path:line [Kind] qualified::name`` for each symbol in *resp*.

    Args:
        resp: the reply to a ``workspace/symbol`` request, or None/an error
            dict when no usable reply arrived. Diagnostics go to stderr.
    """
    if not isinstance(resp, dict) or "result" not in resp:
        _report_no_response(resp, "No results (or clangd error).")
        return
    results = resp["result"]
    if not results:
        print("No symbols found.", file=sys.stderr)
        return
    for sym in results:
        loc = sym.get("location", {})
        uri = loc.get("uri", "")
        # LSP lines are 0-based; the tool reports 1-based lines.
        line = loc.get("range", {}).get("start", {}).get("line", 0) + 1
        kind = SYMBOL_KINDS.get(sym.get("kind", 0), "Unknown")
        path = format_uri(uri)
        name = sym.get("name", "")
        container = sym.get("containerName", "")
        qualified = f"{container}::{name}" if container else name
        print(f"{path}:{line} [{kind}] {qualified}")


def format_location_results(resp, label="results"):
    """Print ``path:line`` for each location in *resp*.

    Args:
        resp: the reply to a definition/references request, or None/an error
            dict when no usable reply arrived. The result may be a single
            location, a list of locations, or a list of location links
            (``targetUri``/``targetRange``).
        label: noun used in the stderr diagnostics.
    """
    if not isinstance(resp, dict) or "result" not in resp:
        _report_no_response(resp, f"No {label} (or clangd error).")
        return
    results = resp["result"]
    if not results:
        print(f"No {label} found.", file=sys.stderr)
        return
    if isinstance(results, dict):
        results = [results]
    for loc in results:
        uri = loc.get("uri", loc.get("targetUri", ""))
        rng = loc.get("range", loc.get("targetRange", {}))
        line = rng.get("start", {}).get("line", 0) + 1
        print(f"{format_uri(uri)}:{line}")


def _is_positive_int(text):
    """Return True if *text* parses as an integer >= 1."""
    try:
        return int(text) >= 1
    except ValueError:
        return False


def main():
    """Command-line entry point: validate arguments and run one command.

    Exits with status 1 on usage errors. With ``--daemon`` as the first
    argument the process becomes the background daemon instead.
    """
    if len(sys.argv) >= 2 and sys.argv[1] == "--daemon":
        run_daemon()
        return

    if len(sys.argv) < 2:
        print(__doc__, file=sys.stderr)
        sys.exit(1)

    command = sys.argv[1]
    args = sys.argv[2:]

    if command == "stop":
        if not daemon_is_alive():
            print("Daemon is not running.", file=sys.stderr)
            return
        send_query({"command": "stop"})
        print("Daemon stopped.")
        return

    if command == "symbol":
        if len(args) < 1:
            print("Usage: clangd-query.py symbol <query>", file=sys.stderr)
            sys.exit(1)
    elif command in ("definition", "references"):
        usage = f"Usage: clangd-query.py {command} <file> <line> <col>"
        if len(args) < 3:
            print(usage, file=sys.stderr)
            sys.exit(1)
        # Reject bad positions here rather than starting the daemon only to
        # have the query fail there.
        if not (_is_positive_int(args[1]) and _is_positive_int(args[2])):
            print("Line and column must be positive integers (1-based).",
                  file=sys.stderr)
            print(usage, file=sys.stderr)
            sys.exit(1)
    else:
        print(f"Unknown command: {command}", file=sys.stderr)
        print(__doc__, file=sys.stderr)
        sys.exit(1)

    ensure_daemon()
    resp = send_query({"command": command, "args": args})

    if command == "symbol":
        format_symbol_results(resp)
    elif command == "definition":
        format_location_results(resp, "definition")
    elif command == "references":
        format_location_results(resp, "references")


if __name__ == "__main__":
    main()