Add clangd-query

This commit is contained in:
2026-03-03 21:15:48 -05:00
parent 2dd07f1a9c
commit 528a7b14df

510
tools/clangd-query.py Executable file
View File

@@ -0,0 +1,510 @@
#!/usr/bin/env python3
"""Query clangd for symbol information.
Usage:
python3 tools/clangd-query.py symbol <name>
python3 tools/clangd-query.py definition <file> <line> <col>
python3 tools/clangd-query.py references <file> <line> <col>
python3 tools/clangd-query.py stop
Commands:
symbol <name> Search for symbols by name across the project.
definition <file> <line> <col> Find where a symbol is defined.
references <file> <line> <col> Find all references to a symbol.
stop Stop the background daemon.
File paths are relative to the project root. Line and column are 1-based.
Examples:
python3 tools/clangd-query.py symbol DrivenEngine
python3 tools/clangd-query.py definition luprex/cpp/core/drivenengine.cpp 32 19
python3 tools/clangd-query.py references Source/Integration/Tangible.h 37 30
How it works:
The first invocation starts a background clangd daemon process. It loads
the project index from .vscode/.cache/clangd/index/ (shared with VS Code's
clangd). This takes ~10 seconds. Subsequent queries hit the warm daemon
and return in milliseconds.
Clangd configuration (binary path, flags) is read from
Integration.code-workspace so it stays in sync with VS Code.
The daemon writes its PID to .clangd-query.pid and listens on a Unix
socket at .clangd-query.sock. Use 'stop' to shut it down, or just kill
the PID. Starting a new daemon automatically kills any existing one.
"""
import json
import os
import signal
import socket
import struct
import subprocess
import sys
import threading
import time
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_DIR = os.path.dirname(SCRIPT_DIR)
WORKSPACE_FILE = os.path.join(PROJECT_DIR, "Integration.code-workspace")
SOCKET_PATH = os.path.join(PROJECT_DIR, ".clangd-query.sock")
PID_FILE = os.path.join(PROJECT_DIR, ".clangd-query.pid")
# A small file to open on startup to trigger background index loading
TRIGGER_FILE = os.path.join(PROJECT_DIR, "luprex", "cpp", "core", "util.hpp")
def read_workspace_config():
"""Read clangd path and arguments from Integration.code-workspace."""
with open(WORKSPACE_FILE) as f:
ws = json.load(f)
settings = ws.get("settings", {})
clangd_path = settings.get("clangd.path", "clangd")
clangd_args = list(settings.get("clangd.arguments", []))
return clangd_path, clangd_args
def make_lsp_message(obj):
"""Encode a JSON-RPC message with Content-Length header."""
body = json.dumps(obj).encode("utf-8")
header = f"Content-Length: {len(body)}\r\n\r\n".encode("utf-8")
return header + body
def read_lsp_message(stream):
"""Read one LSP message from a byte stream. Returns parsed JSON or None."""
headers = {}
while True:
line = stream.readline()
if not line:
return None
line = line.decode("utf-8").strip()
if line == "":
break
if ": " in line:
key, value = line.split(": ", 1)
headers[key] = value
length = int(headers.get("Content-Length", 0))
if length == 0:
return None
body = stream.read(length)
return json.loads(body.decode("utf-8"))
# --- Socket protocol helpers ---
# Messages over the Unix socket are length-prefixed JSON:
# 4 bytes big-endian length, then UTF-8 JSON body.
def sock_send(sock, obj):
data = json.dumps(obj).encode("utf-8")
sock.sendall(struct.pack("!I", len(data)) + data)
def sock_recv(sock):
raw_len = _recv_exact(sock, 4)
if not raw_len:
return None
length = struct.unpack("!I", raw_len)[0]
data = _recv_exact(sock, length)
if not data:
return None
return json.loads(data.decode("utf-8"))
def _recv_exact(sock, n):
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
return None
buf += chunk
return buf
# --- LSP SymbolKind enum ---
SYMBOL_KINDS = {
1: "File", 2: "Module", 3: "Namespace", 4: "Package", 5: "Class",
6: "Method", 7: "Property", 8: "Field", 9: "Constructor", 10: "Enum",
11: "Interface", 12: "Function", 13: "Variable", 14: "Constant",
15: "String", 16: "Number", 17: "Boolean", 18: "Array", 19: "Object",
20: "Key", 21: "Null", 22: "EnumMember", 23: "Struct", 24: "Event",
25: "Operator", 26: "TypeParameter",
}
def format_uri(uri):
"""Convert file:// URI to a project-relative path."""
path = uri.replace("file://", "")
if path.startswith(PROJECT_DIR + "/"):
path = path[len(PROJECT_DIR) + 1:]
return path
# ============================================================
# Daemon process
# ============================================================
class ClangdDaemon:
"""Manages a clangd subprocess and serves queries over a Unix socket."""
def __init__(self):
clangd_path, clangd_args = read_workspace_config()
cmd = [clangd_path] + clangd_args + ["-j=4"]
self.proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=PROJECT_DIR,
)
self.request_id = 0
self.pending = {}
self.lock = threading.Lock()
self.events = {}
self.index_loaded = threading.Event()
self.opened_files = set()
threading.Thread(target=self._stdout_reader, daemon=True).start()
threading.Thread(target=self._stderr_reader, daemon=True).start()
def _stdout_reader(self):
while True:
msg = read_lsp_message(self.proc.stdout)
if msg is None:
break
msg_id = msg.get("id")
if msg_id is not None and msg_id in self.pending:
with self.lock:
self.pending[msg_id] = msg
self.events[msg_id].set()
def _stderr_reader(self):
for raw_line in self.proc.stderr:
line = raw_line.decode("utf-8", errors="replace").rstrip()
if "BackgroundIndex: serving" in line:
self.index_loaded.set()
def send_request(self, method, params):
self.request_id += 1
rid = self.request_id
event = threading.Event()
with self.lock:
self.pending[rid] = None
self.events[rid] = event
msg = {"jsonrpc": "2.0", "id": rid, "method": method, "params": params}
self.proc.stdin.write(make_lsp_message(msg))
self.proc.stdin.flush()
event.wait(timeout=30)
with self.lock:
result = self.pending.pop(rid, None)
self.events.pop(rid, None)
return result
def send_notification(self, method, params):
msg = {"jsonrpc": "2.0", "method": method, "params": params}
self.proc.stdin.write(make_lsp_message(msg))
self.proc.stdin.flush()
def initialize(self):
self.send_request("initialize", {
"processId": os.getpid(),
"rootUri": f"file://{PROJECT_DIR}",
"capabilities": {},
})
self.send_notification("initialized", {})
self.open_file(TRIGGER_FILE)
def open_file(self, filepath):
abs_path = os.path.abspath(filepath)
if abs_path in self.opened_files:
return
self.opened_files.add(abs_path)
with open(abs_path) as f:
text = f.read()
self.send_notification("textDocument/didOpen", {
"textDocument": {
"uri": f"file://{abs_path}",
"languageId": "cpp",
"version": 1,
"text": text,
}
})
def handle_query(self, query):
"""Handle a query dict and return a result dict."""
cmd = query["command"]
args = query.get("args", [])
if cmd == "symbol":
self.index_loaded.wait(timeout=15)
resp = self.send_request("workspace/symbol", {"query": args[0]})
return resp
if cmd == "definition":
filepath, line, col = args[0], int(args[1]), int(args[2])
abs_path = os.path.abspath(filepath)
self.open_file(abs_path)
return self.send_request("textDocument/definition", {
"textDocument": {"uri": f"file://{abs_path}"},
"position": {"line": line - 1, "character": col - 1},
})
if cmd == "references":
filepath, line, col = args[0], int(args[1]), int(args[2])
abs_path = os.path.abspath(filepath)
self.open_file(abs_path)
return self.send_request("textDocument/references", {
"textDocument": {"uri": f"file://{abs_path}"},
"position": {"line": line - 1, "character": col - 1},
"context": {"includeDeclaration": True},
})
if cmd == "stop":
return {"stop": True}
return {"error": f"Unknown command: {cmd}"}
def shutdown(self):
try:
self.send_request("shutdown", None)
self.send_notification("exit", None)
self.proc.wait(timeout=5)
except Exception:
self.proc.kill()
def kill_existing_daemon():
"""Kill any existing daemon process and clean up."""
if os.path.exists(PID_FILE):
try:
with open(PID_FILE) as f:
old_pid = int(f.read().strip())
os.kill(old_pid, signal.SIGTERM)
# Wait briefly for it to die
for _ in range(20):
try:
os.kill(old_pid, 0)
time.sleep(0.1)
except OSError:
break
except (ValueError, OSError):
pass
try:
os.unlink(PID_FILE)
except OSError:
pass
if os.path.exists(SOCKET_PATH):
try:
os.unlink(SOCKET_PATH)
except OSError:
pass
def run_daemon():
"""Run the daemon process: start clangd, listen on Unix socket."""
kill_existing_daemon()
# Write our PID
with open(PID_FILE, "w") as f:
f.write(str(os.getpid()))
daemon = ClangdDaemon()
daemon.initialize()
# Set up Unix socket
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(SOCKET_PATH)
server.listen(4)
def cleanup(*_args):
daemon.shutdown()
server.close()
try:
os.unlink(SOCKET_PATH)
except OSError:
pass
try:
os.unlink(PID_FILE)
except OSError:
pass
sys.exit(0)
signal.signal(signal.SIGTERM, cleanup)
signal.signal(signal.SIGINT, cleanup)
# Signal to parent that we're ready
sys.stdout.write("READY\n")
sys.stdout.flush()
while True:
try:
conn, _ = server.accept()
except OSError:
break
try:
query = sock_recv(conn)
if query is None:
conn.close()
continue
result = daemon.handle_query(query)
sock_send(conn, result)
conn.close()
if isinstance(result, dict) and result.get("stop"):
break
except Exception:
try:
conn.close()
except Exception:
pass
cleanup()
# ============================================================
# Client side
# ============================================================
def daemon_is_alive():
"""Check if the daemon is running and responsive."""
if not os.path.exists(SOCKET_PATH):
return False
if not os.path.exists(PID_FILE):
return False
# Check if PID is alive
try:
with open(PID_FILE) as f:
pid = int(f.read().strip())
os.kill(pid, 0)
except (ValueError, OSError):
# Stale PID file
try:
os.unlink(SOCKET_PATH)
except OSError:
pass
try:
os.unlink(PID_FILE)
except OSError:
pass
return False
return True
def start_daemon():
"""Start the daemon as a background process and wait for it to be ready."""
proc = subprocess.Popen(
[sys.executable, __file__, "--daemon"],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
cwd=PROJECT_DIR,
start_new_session=True,
)
# Wait for READY signal
line = proc.stdout.readline().decode("utf-8").strip()
proc.stdout.close()
if line != "READY":
print("Failed to start daemon.", file=sys.stderr)
sys.exit(1)
def send_query(query):
"""Send a query to the daemon and return the response."""
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(60)
sock.connect(SOCKET_PATH)
sock_send(sock, query)
result = sock_recv(sock)
sock.close()
return result
def ensure_daemon():
"""Make sure the daemon is running."""
if not daemon_is_alive():
start_daemon()
def format_symbol_results(resp):
if not resp or "result" not in resp:
print("No results (or clangd error).", file=sys.stderr)
return
results = resp["result"]
if not results:
print("No symbols found.", file=sys.stderr)
return
for sym in results:
loc = sym.get("location", {})
uri = loc.get("uri", "")
line = loc.get("range", {}).get("start", {}).get("line", 0) + 1
kind = SYMBOL_KINDS.get(sym.get("kind", 0), "Unknown")
path = format_uri(uri)
container = sym.get("containerName", "")
qualified = f"{container}::{sym['name']}" if container else sym["name"]
print(f"{path}:{line} [{kind}] {qualified}")
def format_location_results(resp, label="results"):
if not resp or "result" not in resp:
print(f"No {label} (or clangd error).", file=sys.stderr)
return
results = resp["result"]
if not results:
print(f"No {label} found.", file=sys.stderr)
return
if isinstance(results, dict):
results = [results]
for loc in results:
uri = loc.get("uri", loc.get("targetUri", ""))
rng = loc.get("range", loc.get("targetRange", {}))
line = rng.get("start", {}).get("line", 0) + 1
print(f"{format_uri(uri)}:{line}")
def main():
if len(sys.argv) >= 2 and sys.argv[1] == "--daemon":
run_daemon()
return
if len(sys.argv) < 2:
print(__doc__, file=sys.stderr)
sys.exit(1)
command = sys.argv[1]
args = sys.argv[2:]
if command == "stop":
if not daemon_is_alive():
print("Daemon is not running.", file=sys.stderr)
return
send_query({"command": "stop"})
print("Daemon stopped.")
return
if command == "symbol":
if len(args) < 1:
print("Usage: clangd-query.py symbol <name>", file=sys.stderr)
sys.exit(1)
elif command in ("definition", "references"):
if len(args) < 3:
print(f"Usage: clangd-query.py {command} <file> <line> <col>",
file=sys.stderr)
sys.exit(1)
else:
print(f"Unknown command: {command}", file=sys.stderr)
print(__doc__, file=sys.stderr)
sys.exit(1)
ensure_daemon()
resp = send_query({"command": command, "args": args})
if command == "symbol":
format_symbol_results(resp)
elif command == "definition":
format_location_results(resp, "definition")
elif command == "references":
format_location_results(resp, "references")
if __name__ == "__main__":
main()