Coverage for app/backend/src/couchers/perf.py: 91%
37 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-31 14:08 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-31 14:08 +0000
1import threading
2from dataclasses import dataclass
3from time import perf_counter_ns, thread_time_ns
5from sqlalchemy import Engine, event
7# Per-request resource accounting. The gRPC server is thread-per-request with synchronous psycopg, so the SQLAlchemy
8# cursor-execute listeners fire on the same thread that runs the handler. We keep a thread-local accumulator that the
9# interceptor arms (start_perf) right before invoking the handler and reads back (read_perf) right after, so the
10# captured numbers cover the handler span only (not auth lookup or the _store_log insert that runs afterwards).
# Thread-local storage: its `acc` attribute holds the current thread's accumulator while accounting is armed, and is
# reset to None once the numbers have been read back.
_local = threading.local()
15@dataclass(slots=True)
16class _PerfAccumulator:
17 cpu_start_ns: int
18 db_query_count: int = 0
19 db_write_query_count: int = 0
20 db_time_ms: float = 0.0
23@dataclass(frozen=True, slots=True)
24class PerfResult:
25 db_query_count: int
26 db_write_query_count: int
27 db_time_ms: float
28 cpu_ms: float
def start_perf() -> None:
    """Begin per-request resource accounting for the calling thread.

    Installs a fresh accumulator in thread-local storage, replacing any previous one, with the thread's current CPU
    clock reading as the baseline.
    """
    baseline_cpu_ns = thread_time_ns()
    _local.acc = _PerfAccumulator(cpu_start_ns=baseline_cpu_ns)
def read_perf() -> PerfResult | None:
    """Return the calling thread's accumulated numbers and disarm accounting.

    Returns None when no accumulator is armed on this thread. Otherwise the accumulator is cleared before the result
    is built, so later queries on this thread (e.g. the _store_log insert, or background work reusing the thread)
    are not attributed to the request that just finished.
    """
    pending: _PerfAccumulator | None = getattr(_local, "acc", None)
    if pending is None:
        return None

    # Disarm first: the cursor listeners skip attribution while no accumulator is set.
    _local.acc = None

    cpu_elapsed_ns = thread_time_ns() - pending.cpu_start_ns
    return PerfResult(
        db_query_count=pending.db_query_count,
        db_write_query_count=pending.db_write_query_count,
        db_time_ms=pending.db_time_ms,
        cpu_ms=cpu_elapsed_ns / 1e6,
    )
54def _before_cursor_execute(conn, cursor, statement, parameters, context, executemany): # type: ignore[no-untyped-def]
55 # A stack handles re-entrant/nested executes on the same connection.
56 conn.info.setdefault("_perf_query_starts", []).append(perf_counter_ns())
59def _after_cursor_execute(conn, cursor, statement, parameters, context, executemany): # type: ignore[no-untyped-def]
60 starts = conn.info.get("_perf_query_starts")
61 if not starts: 61 ↛ 62line 61 didn't jump to line 62 because the condition on line 61 was never true
62 return
63 elapsed_ms = (perf_counter_ns() - starts.pop()) / 1e6
64 acc: _PerfAccumulator | None = getattr(_local, "acc", None)
65 if acc is None:
66 # Query on a thread with no active request (background job, metrics scrape, etc.) - don't attribute it.
67 return
68 acc.db_query_count += 1
69 acc.db_time_ms += elapsed_ms
70 # SQLAlchemy sets these when it compiled an INSERT/UPDATE/DELETE.
71 if context.isinsert or context.isupdate or context.isdelete:
72 acc.db_write_query_count += 1
def register_perf_listeners(engine: Engine) -> None:
    """Attach the before/after cursor-execute timing hooks to the given engine."""
    hooks = (
        ("before_cursor_execute", _before_cursor_execute),
        ("after_cursor_execute", _after_cursor_execute),
    )
    for event_name, handler in hooks:
        event.listen(engine, event_name, handler)