Files
integration/tools/clangd-query.py

645 lines
21 KiB
Python
Raw Normal View History

2026-03-03 21:15:48 -05:00
#!/usr/bin/env python3
"""Query clangd for symbol information.

Usage:
    python3 tools/clangd-query.py symbol <name>
    python3 tools/clangd-query.py diagnostics <file>
    python3 tools/clangd-query.py definition <file> <line> <col>
    python3 tools/clangd-query.py references <file> <line> <col>
    python3 tools/clangd-query.py stop

Commands:
    symbol <name>                    Search for symbols by name across the project.
    diagnostics <file>               Report clangd diagnostics for a file.
    definition <file> <line> <col>   Find where a symbol is defined.
    references <file> <line> <col>   Find all references to a symbol.
    stop                             Stop the background daemon.

File paths are relative to the project root. Line and column are 1-based.

Examples:
    python3 tools/clangd-query.py symbol DrivenEngine
    python3 tools/clangd-query.py definition luprex/cpp/core/drivenengine.cpp 32 19
    python3 tools/clangd-query.py references Source/Integration/Tangible.h 37 30

How it works:
    The first invocation starts a background clangd daemon process. It loads
    the project through a dedicated .clangd-query compile-commands directory,
    which gives it a separate clangd cache. This takes ~10 seconds.
    Subsequent queries hit the warm daemon and return in milliseconds.

    Clangd configuration (binary path, flags) is read from
    Integration.code-workspace so it stays in sync with VS Code.

    The daemon writes its PID to .clangd-query/pid and listens on a Unix
    socket at .clangd-query/sock. Use 'stop' to shut it down, or just kill
    the PID. Starting a new daemon automatically kills any existing one.
"""
import json
import os
import signal
import socket
import struct
import subprocess
import sys
import threading
import time
# Project layout: PROJECT_DIR is the parent of the directory holding this
# script; the VS Code workspace file sits directly inside it.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_DIR = os.path.dirname(SCRIPT_DIR)
WORKSPACE_FILE = os.path.join(PROJECT_DIR, "Integration.code-workspace")

# Daemon state directory and the files kept inside it.
DAEMON_DIR = os.path.join(PROJECT_DIR, ".clangd-query")
SOCKET_PATH = os.path.join(DAEMON_DIR, "sock")
PID_FILE = os.path.join(DAEMON_DIR, "pid")
LOG_FILE = os.path.join(DAEMON_DIR, "log")
2026-03-03 21:15:48 -05:00
def read_workspace_config():
    """Load the clangd binary path and arguments from the workspace file.

    Returns:
        A ``(clangd_path, clangd_args)`` tuple read from the ``settings``
        object of ``WORKSPACE_FILE``. A missing ``clangd.path`` falls back to
        ``"clangd"``; a missing ``clangd.arguments`` falls back to an empty
        list. The argument list is always a fresh copy.
    """
    with open(WORKSPACE_FILE) as handle:
        workspace = json.load(handle)
    settings = workspace.get("settings", {})
    return (
        settings.get("clangd.path", "clangd"),
        list(settings.get("clangd.arguments", [])),
    )
def make_lsp_message(obj):
    """Serialize *obj* as JSON and frame it with a Content-Length header.

    Returns:
        The header, a blank line, and the UTF-8 JSON body as one ``bytes``
        value ready to be written to clangd's stdin.
    """
    payload = json.dumps(obj).encode("utf-8")
    # The length counts encoded bytes, not characters.
    return b"Content-Length: %d\r\n\r\n" % len(payload) + payload
def read_lsp_message(stream):
    """Read one LSP message from a binary stream.

    Args:
        stream: A binary file-like object providing ``readline()`` and
            ``read(n)``.

    Returns:
        The decoded JSON payload, or ``None`` when the stream ends (before the
        headers are complete, or in the middle of the body) or when no
        positive Content-Length was announced.

    Raises:
        ValueError: If the Content-Length value is not an integer or the body
            is not valid JSON.
    """
    headers = {}
    while True:
        raw = stream.readline()
        if not raw:
            return None  # EOF before the header block finished
        line = raw.decode("utf-8").strip()
        if not line:
            break  # blank line terminates the header block
        name, sep, value = line.partition(":")
        if sep:
            # Header names are matched case-insensitively and the space after
            # the colon is optional.
            headers[name.strip().lower()] = value.strip()

    remaining = int(headers.get("content-length", 0))
    if remaining <= 0:
        return None

    # A single read() may legitimately return fewer bytes than requested on
    # an unbuffered stream, so keep reading until the body is complete.
    chunks = []
    while remaining > 0:
        chunk = stream.read(remaining)
        if not chunk:
            return None  # stream ended mid-body
        chunks.append(chunk)
        remaining -= len(chunk)
    return json.loads(b"".join(chunks).decode("utf-8"))
# --- Socket protocol helpers ---
# Messages over the Unix socket are length-prefixed JSON:
# 4 bytes big-endian length, then UTF-8 JSON body.
def sock_send(sock, obj):
    """Send *obj* over *sock* as one length-prefixed JSON frame.

    The frame is a 4-byte big-endian unsigned length followed by the UTF-8
    encoded JSON body.
    """
    payload = json.dumps(obj).encode("utf-8")
    prefix = struct.pack("!I", len(payload))
    sock.sendall(prefix + payload)
def sock_recv(sock):
    """Receive one length-prefixed JSON frame from *sock*.

    Returns:
        The decoded JSON value, or ``None`` if the peer closed the connection
        before a complete frame arrived or the frame body was empty.
    """
    header = _recv_exact(sock, 4)
    if not header:
        return None
    (body_len,) = struct.unpack("!I", header)
    payload = _recv_exact(sock, body_len)
    return json.loads(payload.decode("utf-8")) if payload else None
def _recv_exact(sock, n):
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
return None
buf += chunk
return buf
# --- LSP SymbolKind enum ---
# Display names for the numeric "kind" field of a symbol result; kinds that
# are not listed here are reported as "Unknown" by the symbol formatter.
SYMBOL_KINDS = {
    1: "File", 2: "Module", 3: "Namespace", 4: "Package", 5: "Class",
    6: "Method", 7: "Property", 8: "Field", 9: "Constructor", 10: "Enum",
    11: "Interface", 12: "Function", 13: "Variable", 14: "Constant",
    15: "String", 16: "Number", 17: "Boolean", 18: "Array", 19: "Object",
    20: "Key", 21: "Null", 22: "EnumMember", 23: "Struct", 24: "Event",
    25: "Operator", 26: "TypeParameter",
}
def format_uri(uri):
    """Convert a ``file://`` URI to a project-relative path.

    The scheme prefix is stripped, percent-escapes (such as ``%20``) are
    decoded, and a leading ``PROJECT_DIR/`` is removed. Paths outside the
    project are returned as decoded absolute paths.

    Args:
        uri: URI string as reported by clangd.

    Returns:
        The decoded path, relative to ``PROJECT_DIR`` when it lies inside it.
    """
    # Function-scope import so this block does not depend on a file-level
    # import being added.
    from urllib.parse import unquote

    scheme = "file://"
    # Strip the scheme only where it is a prefix; a later "file://" substring
    # inside the path must be left alone.
    raw_path = uri[len(scheme):] if uri.startswith(scheme) else uri
    path = unquote(raw_path)

    project_prefix = PROJECT_DIR + "/"
    if path.startswith(project_prefix):
        path = path[len(project_prefix):]
    return path
# ============================================================
# Daemon process
# ============================================================
2026-03-09 06:47:43 -04:00
def truncate_text(obj):
    """Return a copy of *obj* with long ``"text"`` strings shortened.

    Dicts and lists are rebuilt recursively. Any string stored under a
    ``"text"`` key that is longer than 80 characters is cut to its first 80
    characters followed by ``"..."``. All other values are returned as-is.
    """
    if isinstance(obj, list):
        return [truncate_text(item) for item in obj]
    if not isinstance(obj, dict):
        return obj

    clipped = {}
    for key, value in obj.items():
        if key == "text" and isinstance(value, str) and len(value) > 80:
            clipped[key] = value[:80] + "..."
        else:
            clipped[key] = truncate_text(value)
    return clipped
def lsp_log(direction, msg):
    """Append one line describing an LSP message to ``LOG_FILE``.

    Args:
        direction: Marker written at the start of the line (callers pass
            ``">>>"`` for outgoing and ``"<<<"`` for incoming messages).
        msg: The message object; long ``"text"`` values are truncated before
            it is serialized.
    """
    with open(LOG_FILE, "a") as log:
        serialized = json.dumps(truncate_text(msg))
        log.write(f"{direction} {serialized}\n")
2026-03-03 21:15:48 -05:00
class ClangdDaemon:
    """Manages a clangd subprocess and serves queries over a Unix socket.

    Construction spawns clangd (binary and flags come from
    ``read_workspace_config()``) with piped stdio and starts two background
    threads: one parses LSP messages from clangd's stdout, the other scans
    clangd's stderr for the line announcing that the background index is
    being served.

    State shared with the stdout reader thread (``pending``, ``events``,
    ``cached_diagnostics``, ``file_status_events``, ``request_id``) is guarded
    by ``lock``.
    """

    # Timeouts, in seconds. Kept as class attributes so they can be tuned.
    REQUEST_TIMEOUT = 30      # waiting for the response to an LSP request
    INDEX_TIMEOUT = 15        # waiting for the background index before a symbol search
    DIAGNOSTICS_TIMEOUT = 30  # waiting for clangd to go idle on a file

    def __init__(self):
        """Prepare the private compile-commands directory and launch clangd."""
        clangd_path, clangd_args = read_workspace_config()

        # Give the daemon its own compile-commands dir so it gets a separate
        # clangd index cache (clangd stores .cache/ next to compile_commands.json).
        daemon_cc_dir = DAEMON_DIR
        os.makedirs(daemon_cc_dir, exist_ok=True)
        cc_link = os.path.join(daemon_cc_dir, "compile_commands.json")
        cc_src = os.path.join(PROJECT_DIR, ".vscode", "compile_commands.json")
        # islink() also catches a dangling symlink, which exists() reports as absent.
        if os.path.islink(cc_link) or os.path.exists(cc_link):
            os.unlink(cc_link)
        os.symlink(cc_src, cc_link)

        # Point --compile-commands-dir at our dir: replace the workspace's
        # value if present, otherwise add the flag so the private directory
        # is used either way.
        cc_flag = f"--compile-commands-dir={daemon_cc_dir}"
        had_cc_flag = any(a.startswith("--compile-commands-dir") for a in clangd_args)
        clangd_args = [
            cc_flag if a.startswith("--compile-commands-dir") else a
            for a in clangd_args
        ]
        if not had_cc_flag:
            clangd_args.append(cc_flag)

        cmd = [clangd_path] + clangd_args + ["-j=4"]
        self.proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=PROJECT_DIR,
        )
        self.request_id = 0                  # last JSON-RPC request id handed out
        self.pending = {}                    # request id -> response message (None until it arrives)
        self.lock = threading.Lock()
        self.events = {}                     # request id -> threading.Event set when the response arrives
        self.index_loaded = threading.Event()
        self.opened_files = set()            # absolute paths currently open in clangd via open_file()
        self.file_versions = {}              # absolute path -> last document version sent
        self.cached_diagnostics = {}         # uri -> list of diagnostics
        self.file_status_events = {}         # uri -> threading.Event (set when idle)

        threading.Thread(target=self._stdout_reader, daemon=True).start()
        threading.Thread(target=self._stderr_reader, daemon=True).start()

    def _stdout_reader(self):
        """Thread body: read LSP messages from clangd and route each one.

        Diagnostics notifications are cached per URI, "idle" file-status
        notifications release a waiting diagnostics query, and responses are
        delivered to the matching ``send_request`` caller. The loop ends when
        ``read_lsp_message`` returns ``None``.
        """
        while True:
            msg = read_lsp_message(self.proc.stdout)
            if msg is None:
                break
            lsp_log("<<<", msg)

            method = msg.get("method")
            if method == "textDocument/publishDiagnostics":
                params = msg["params"]
                with self.lock:
                    self.cached_diagnostics[params["uri"]] = params["diagnostics"]
                continue
            if method == "textDocument/clangd.fileStatus":
                uri = msg["params"]["uri"]
                state = msg["params"]["state"]
                with self.lock:
                    if state == "idle" and uri in self.file_status_events:
                        self.file_status_events[uri].set()
                continue
            if method is not None:
                # Any other message carrying a method is a notification or a
                # server-originated request, never a response to one of ours.
                continue

            msg_id = msg.get("id")
            if msg_id is None:
                continue
            with self.lock:
                # Look up and deliver under one lock so a caller that has
                # just timed out cannot remove the entry in between.
                if msg_id in self.pending:
                    self.pending[msg_id] = msg
                    self.events[msg_id].set()

    def _stderr_reader(self):
        """Thread body: watch clangd's stderr for the index-ready line."""
        for raw_line in self.proc.stderr:
            line = raw_line.decode("utf-8", errors="replace").rstrip()
            if "BackgroundIndex: serving" in line:
                self.index_loaded.set()

    def send_request(self, method, params):
        """Send a JSON-RPC request to clangd and wait for its response.

        Returns:
            The full response message, or ``None`` if no response arrived
            within ``REQUEST_TIMEOUT`` seconds.
        """
        event = threading.Event()
        with self.lock:
            self.request_id += 1
            rid = self.request_id
            self.pending[rid] = None
            self.events[rid] = event
        msg = {"jsonrpc": "2.0", "id": rid, "method": method, "params": params}
        lsp_log(">>>", msg)
        try:
            self.proc.stdin.write(make_lsp_message(msg))
            self.proc.stdin.flush()
            event.wait(timeout=self.REQUEST_TIMEOUT)
        finally:
            # Always drop the bookkeeping, even if the write failed.
            with self.lock:
                result = self.pending.pop(rid, None)
                self.events.pop(rid, None)
        return result

    def send_notification(self, method, params):
        """Send a JSON-RPC notification (no id, no response) to clangd."""
        msg = {"jsonrpc": "2.0", "method": method, "params": params}
        lsp_log(">>>", msg)
        self.proc.stdin.write(make_lsp_message(msg))
        self.proc.stdin.flush()

    def initialize(self):
        """Perform the LSP initialize handshake and open an empty document."""
        self.send_request("initialize", {
            "processId": os.getpid(),
            "rootUri": f"file://{PROJECT_DIR}",
            "capabilities": {},
            "initializationOptions": {
                # Ask clangd for textDocument/clangd.fileStatus notifications,
                # which the diagnostics query relies on.
                "clangdFileStatus": True,
            },
        })
        self.send_notification("initialized", {})
        # NOTE(review): an empty placeholder document is opened right after
        # the handshake, presumably to make clangd start loading the project;
        # confirm the intent with the author.
        self.send_notification("textDocument/didOpen", {
            "textDocument": {
                "uri": f"file://{PROJECT_DIR}/fake.cpp",
                "languageId": "cpp",
                "version": 1,
                "text": "",
            }
        })

    def open_file(self, filepath):
        """Make sure clangd has the current on-disk contents of *filepath*.

        The first call for a path sends ``didOpen``; later calls send a
        full-text ``didChange`` with an incremented version.
        """
        abs_path = os.path.abspath(filepath)
        with open(abs_path) as f:
            text = f.read()
        if abs_path in self.opened_files:
            self.file_versions[abs_path] = self.file_versions.get(abs_path, 1) + 1
            self.send_notification("textDocument/didChange", {
                "textDocument": {
                    "uri": f"file://{abs_path}",
                    "version": self.file_versions[abs_path],
                },
                "contentChanges": [{"text": text}],
            })
            return
        self.opened_files.add(abs_path)
        self.file_versions[abs_path] = 1
        self.send_notification("textDocument/didOpen", {
            "textDocument": {
                "uri": f"file://{abs_path}",
                "languageId": "cpp",
                "version": 1,
                "text": text,
            }
        })

    def handle_query(self, query):
        """Handle a query dict and return a result dict.

        Args:
            query: ``{"command": str, "args": [str, ...]}`` as sent by the
                client side of this script.

        Returns:
            clangd's response message for symbol/definition/references
            (``None`` on timeout), ``{"result": [...]}`` or ``{"error": ...}``
            for diagnostics, ``{"stop": True}`` for stop, and
            ``{"error": ...}`` for an unknown command.
        """
        cmd = query["command"]
        args = query.get("args", [])
        with open(LOG_FILE, "a") as f:
            f.write(f"\n\n\n------------------------ QUERY: {cmd} {' '.join(map(str, args))} ----------------\n\n\n")

        if cmd == "symbol":
            self.index_loaded.wait(timeout=self.INDEX_TIMEOUT)
            return self.send_request("workspace/symbol", {"query": args[0]})
        if cmd == "diagnostics":
            return self._query_diagnostics(args[0])
        if cmd == "definition":
            return self._position_request("textDocument/definition", args)
        if cmd == "references":
            return self._position_request(
                "textDocument/references", args,
                {"context": {"includeDeclaration": True}},
            )
        if cmd == "stop":
            return {"stop": True}
        return {"error": f"Unknown command: {cmd}"}

    def _query_diagnostics(self, filepath):
        """Open *filepath* in clangd, wait until it is idle, return its diagnostics."""
        abs_path = os.path.abspath(filepath)
        uri = f"file://{abs_path}"
        with open(abs_path) as f:
            text = f.read()
        event = threading.Event()
        with self.lock:
            # Drop stale diagnostics so only ones published for this open count.
            self.cached_diagnostics.pop(uri, None)
            self.file_status_events[uri] = event
        self.send_notification("textDocument/didOpen", {
            "textDocument": {"uri": uri, "languageId": "cpp",
                             "version": 1, "text": text}
        })
        event.wait(timeout=self.DIAGNOSTICS_TIMEOUT)
        with self.lock:
            self.file_status_events.pop(uri, None)
            diags = self.cached_diagnostics.pop(uri, None)
        self.send_notification("textDocument/didClose", {
            "textDocument": {"uri": uri}
        })
        # The document is now closed in clangd; forget it here too so the
        # next open_file() sends didOpen instead of a didChange for a
        # document clangd no longer tracks.
        self.opened_files.discard(abs_path)
        self.file_versions.pop(abs_path, None)
        if diags is None:
            return {"error": "Timed out waiting for diagnostics"}
        return {"result": diags}

    def _position_request(self, method, args, extra=None):
        """Send a request addressed at ``<file> <line> <col>`` (1-based) in *args*."""
        filepath, line, col = args[0], int(args[1]), int(args[2])
        abs_path = os.path.abspath(filepath)
        self.open_file(abs_path)
        params = {
            "textDocument": {"uri": f"file://{abs_path}"},
            # LSP positions are 0-based.
            "position": {"line": line - 1, "character": col - 1},
        }
        if extra:
            params.update(extra)
        return self.send_request(method, params)

    def shutdown(self):
        """Ask clangd to exit; kill it if that fails or takes over 5 seconds."""
        try:
            self.send_request("shutdown", None)
            self.send_notification("exit", None)
            self.proc.wait(timeout=5)
        except Exception:
            # Best effort: whatever went wrong, make sure clangd does not linger.
            self.proc.kill()
def kill_existing_daemon():
    """Terminate a previously started daemon, if any, and remove its files.

    If ``PID_FILE`` exists, the recorded process is sent SIGTERM and polled
    (up to 20 times, 0.1 s apart) until it is gone; the PID file is then
    removed. A leftover socket file is removed as well. Every failure along
    the way is ignored.
    """

    def _unlink_quietly(path):
        # Removal is best effort; a missing or unremovable file is not an error here.
        try:
            os.unlink(path)
        except OSError:
            pass

    if os.path.exists(PID_FILE):
        try:
            with open(PID_FILE) as pid_file:
                stale_pid = int(pid_file.read().strip())
            os.kill(stale_pid, signal.SIGTERM)
            # Wait briefly for it to die: signal 0 only probes for existence.
            polls_left = 20
            while polls_left > 0:
                polls_left -= 1
                try:
                    os.kill(stale_pid, 0)
                except OSError:
                    break
                time.sleep(0.1)
        except (ValueError, OSError):
            pass
        _unlink_quietly(PID_FILE)

    if os.path.exists(SOCKET_PATH):
        _unlink_quietly(SOCKET_PATH)
def run_daemon():
    """Run the daemon process: start clangd, listen on Unix socket.

    Replaces any previous daemon, records this process's PID, starts and
    initializes clangd, then serves one query per client connection until a
    ``stop`` query arrives, the listening socket fails, or SIGTERM/SIGINT is
    received. ``READY`` is written to stdout once the socket is listening so
    the launching process knows it can connect. This function does not
    return; it ends by exiting the process.
    """
    os.makedirs(DAEMON_DIR, exist_ok=True)
    kill_existing_daemon()

    # Write our PID
    with open(PID_FILE, "w") as f:
        f.write(str(os.getpid()))

    daemon = ClangdDaemon()
    daemon.initialize()

    # Set up Unix socket
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(SOCKET_PATH)
    server.listen(4)

    def cleanup(*_args):
        """Shut clangd down, remove the socket and PID files, and exit.

        Used both as the signal handler and as the normal exit path.
        """
        daemon.shutdown()
        server.close()
        for path in (SOCKET_PATH, PID_FILE):
            try:
                os.unlink(path)
            except OSError:
                pass
        sys.exit(0)

    signal.signal(signal.SIGTERM, cleanup)
    signal.signal(signal.SIGINT, cleanup)

    # Signal to parent that we're ready
    sys.stdout.write("READY\n")
    sys.stdout.flush()

    while True:
        try:
            conn, _ = server.accept()
        except OSError:
            break

        stop_requested = False
        try:
            query = sock_recv(conn)
            if query is None:
                continue  # client disconnected without sending; finally closes conn
            try:
                result = daemon.handle_query(query)
            except Exception as exc:
                # Report the failure to the client instead of dropping the
                # connection without a reply.
                result = {"error": f"{type(exc).__name__}: {exc}"}
            sock_send(conn, result)
            stop_requested = isinstance(result, dict) and bool(result.get("stop"))
        except Exception:
            # A broken or misbehaving client connection must not take the
            # daemon down; move on to the next client.
            pass
        finally:
            try:
                conn.close()
            except Exception:
                pass

        if stop_requested:
            break

    cleanup()
# ============================================================
# Client side
# ============================================================
def daemon_is_alive():
    """Report whether a daemon process appears to be running.

    Returns:
        ``True`` when both the socket file and the PID file exist and the
        recorded PID can be signalled with signal 0. Otherwise ``False``; if
        the PID file is unreadable or names a process that cannot be
        signalled, the stale socket and PID files are removed first.
    """
    if not (os.path.exists(SOCKET_PATH) and os.path.exists(PID_FILE)):
        return False

    try:
        with open(PID_FILE) as pid_file:
            recorded_pid = int(pid_file.read().strip())
        os.kill(recorded_pid, 0)  # signal 0 only checks that the PID can be signalled
    except (ValueError, OSError):
        # Stale PID file: clean up what the dead daemon left behind.
        for leftover in (SOCKET_PATH, PID_FILE):
            try:
                os.unlink(leftover)
            except OSError:
                pass
        return False
    else:
        return True
def start_daemon():
    """Start the daemon as a background process and wait for it to be ready.

    Re-runs this script with ``--daemon`` in a new session, with the project
    root as its working directory, and blocks until the child prints its
    first line on stdout. Exits the current process with status 1 if that
    line is not ``READY``.
    """
    # The child runs with a different working directory, so the script path
    # must be absolute: __file__ may be relative on Python versions before 3.9.
    script_path = os.path.abspath(__file__)
    proc = subprocess.Popen(
        [sys.executable, script_path, "--daemon"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        stdin=subprocess.DEVNULL,
        cwd=PROJECT_DIR,
        start_new_session=True,
    )
    # Wait for READY signal
    line = proc.stdout.readline().decode("utf-8").strip()
    proc.stdout.close()
    if line != "READY":
        print("Failed to start daemon.", file=sys.stderr)
        sys.exit(1)
2026-03-09 00:21:39 -04:00
def send_query(query, retries=3):
    """Send a query to the daemon and return the response.

    Args:
        query: JSON-serializable query dict.
        retries: Total number of connection attempts.

    Returns:
        The daemon's decoded reply, or ``None`` if it closed the connection
        without replying (or if *retries* is not positive).

    Raises:
        OSError: If the final attempt fails at the socket level (connection
            refused or reset, timeout, ...).
    """
    for attempt in range(retries):
        try:
            # The context manager closes the socket on every exit path.
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
                sock.settimeout(60)
                sock.connect(SOCKET_PATH)
                sock_send(sock, query)
                return sock_recv(sock)
        except OSError:
            # Covers connection refused/reset, broken pipe and timeouts.
            if attempt >= retries - 1:
                raise
            time.sleep(2)
            # Daemon may have died — restart it
            if not daemon_is_alive():
                start_daemon()
2026-03-03 21:15:48 -05:00
def ensure_daemon():
    """Start the daemon unless one is already running."""
    if daemon_is_alive():
        return
    start_daemon()
def format_symbol_results(resp):
    """Print one ``path:line [Kind] qualified::name`` line per symbol found.

    Args:
        resp: Response to a symbol query. A missing response, or one without
            a ``"result"`` key, is reported on stderr; so is an empty result.
    """
    if not resp or "result" not in resp:
        print("No results (or clangd error).", file=sys.stderr)
        return
    symbols = resp["result"]
    if not symbols:
        print("No symbols found.", file=sys.stderr)
        return

    for symbol in symbols:
        location = symbol.get("location", {})
        start = location.get("range", {}).get("start", {})
        line_no = start.get("line", 0) + 1  # reported lines are 0-based
        kind_name = SYMBOL_KINDS.get(symbol.get("kind", 0), "Unknown")
        rel_path = format_uri(location.get("uri", ""))
        scope = symbol.get("containerName", "")
        name = symbol["name"]
        if scope:
            name = f"{scope}::{name}"
        print(f"{rel_path}:{line_no} [{kind_name}] {name}")
2026-03-09 00:21:39 -04:00
def format_diagnostics(resp):
    """Print one ``line:col [severity] message`` line per diagnostic.

    Args:
        resp: Response to a diagnostics query. A missing response, or one
            without a ``"result"`` key, is reported on stderr. An empty
            result prints ``No problems found.`` on stdout.
    """
    if not resp or "result" not in resp:
        print("No diagnostics (or clangd error).", file=sys.stderr)
        return
    diagnostics = resp["result"]
    if not diagnostics:
        print("No problems found.")
        return

    severity_names = {1: "error", 2: "warning", 3: "info", 4: "hint"}
    for diag in diagnostics:
        start = diag.get("range", {}).get("start", {})
        # Positions arrive 0-based; print them 1-based.
        line_no = start.get("line", 0) + 1
        col_no = start.get("character", 0) + 1
        level = severity_names.get(diag.get("severity", 0), "unknown")
        text = diag.get("message", "")
        print(f" {line_no}:{col_no} [{level}] {text}")
2026-03-03 21:15:48 -05:00
def format_location_results(resp, label="results"):
    """Print one ``path:line`` line per location in a response.

    Args:
        resp: Response to a definition or references query. Its result may be
            a single location dict or a list of them; each entry is read
            through ``uri``/``range`` with ``targetUri``/``targetRange`` as
            fallbacks.
        label: Noun used in the stderr messages when there is nothing to show.
    """
    if not resp or "result" not in resp:
        print(f"No {label} (or clangd error).", file=sys.stderr)
        return
    locations = resp["result"]
    if not locations:
        print(f"No {label} found.", file=sys.stderr)
        return
    if isinstance(locations, dict):
        locations = [locations]

    for entry in locations:
        uri = entry.get("uri", entry.get("targetUri", ""))
        span = entry.get("range", entry.get("targetRange", {}))
        line_no = span.get("start", {}).get("line", 0) + 1  # 0-based -> 1-based
        print(f"{format_uri(uri)}:{line_no}")
def main():
    """Command-line entry point.

    ``--daemon`` runs the background daemon in this process. Otherwise the
    first argument is a command: ``stop`` asks a running daemon to exit, and
    ``symbol``, ``diagnostics``, ``definition`` and ``references`` are
    forwarded to the daemon (starting it if needed) and their results
    printed. Missing arguments or an unknown command print usage to stderr
    and exit with status 1.
    """
    argv = sys.argv
    if len(argv) >= 2 and argv[1] == "--daemon":
        run_daemon()
        return
    if len(argv) < 2:
        print(__doc__, file=sys.stderr)
        sys.exit(1)

    command, args = argv[1], argv[2:]

    if command == "stop":
        if not daemon_is_alive():
            print("Daemon is not running.", file=sys.stderr)
            return
        send_query({"command": "stop"})
        print("Daemon stopped.")
        return

    # command -> (minimum number of arguments, usage line shown when too few)
    requirements = {
        "symbol": (1, "Usage: clangd-query.py symbol <name>"),
        "diagnostics": (1, "Usage: clangd-query.py diagnostics <file>"),
        "definition": (3, "Usage: clangd-query.py definition <file> <line> <col>"),
        "references": (3, "Usage: clangd-query.py references <file> <line> <col>"),
    }
    if command not in requirements:
        print(f"Unknown command: {command}", file=sys.stderr)
        print(__doc__, file=sys.stderr)
        sys.exit(1)
    min_args, usage = requirements[command]
    if len(args) < min_args:
        print(usage, file=sys.stderr)
        sys.exit(1)

    ensure_daemon()
    resp = send_query({"command": command, "args": args})

    if command == "symbol":
        format_symbol_results(resp)
    elif command == "diagnostics":
        format_diagnostics(resp)
    else:
        # "definition" or "references": the command name doubles as the label.
        format_location_results(resp, command)


if __name__ == "__main__":
    main()