Coverage for app/backend/src/couchers/perf.py: 91%

37 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-31 14:08 +0000

1import threading 

2from dataclasses import dataclass 

3from time import perf_counter_ns, thread_time_ns 

4 

5from sqlalchemy import Engine, event 

6 

7# Per-request resource accounting. The gRPC server is thread-per-request with synchronous psycopg, so the SQLAlchemy 

8# cursor-execute listeners fire on the same thread that runs the handler. We keep a thread-local accumulator that the 

9# interceptor arms (start_perf) right before invoking the handler and reads back (read_perf) right after, so the 

10# captured numbers cover the handler span only (not auth lookup or the _store_log insert that runs afterwards). 

11 

# Thread-local holder for the active request's accumulator. The ``acc`` attribute is set by start_perf(), reset to
# None by read_perf(), and looked up with getattr(..., None) so threads that never armed accounting see None.
_local = threading.local()

13 

14 

15@dataclass(slots=True) 

16class _PerfAccumulator: 

17 cpu_start_ns: int 

18 db_query_count: int = 0 

19 db_write_query_count: int = 0 

20 db_time_ms: float = 0.0 

21 

22 

23@dataclass(frozen=True, slots=True) 

24class PerfResult: 

25 db_query_count: int 

26 db_write_query_count: int 

27 db_time_ms: float 

28 cpu_ms: float 

29 

30 

def start_perf() -> None:
    """Begin resource accounting for the calling thread.

    Installs a fresh accumulator on the thread-local, replacing any previous one, with the thread's current CPU
    time recorded as the baseline.
    """
    fresh = _PerfAccumulator(cpu_start_ns=thread_time_ns())
    _local.acc = fresh

34 

35 

def read_perf() -> PerfResult | None:
    """Return the calling thread's accumulated numbers and disarm accounting.

    Returns None when no accumulator is armed on this thread. Otherwise the accumulator is cleared before the
    snapshot is built, so later queries on this thread (e.g. the _store_log insert, or background work reusing the
    thread) are not attributed to the request that just finished.
    """
    armed: _PerfAccumulator | None = getattr(_local, "acc", None)
    if armed is not None:
        # Disarm first; the cursor listeners ignore threads whose accumulator is None.
        _local.acc = None
        return PerfResult(
            db_query_count=armed.db_query_count,
            db_write_query_count=armed.db_write_query_count,
            db_time_ms=armed.db_time_ms,
            # Nanoseconds of thread CPU time since start_perf(), converted to milliseconds.
            cpu_ms=(thread_time_ns() - armed.cpu_start_ns) / 1e6,
        )
    return None

52 

53 

def _before_cursor_execute(conn, cursor, statement, parameters, context, executemany):  # type: ignore[no-untyped-def]
    """Push the current perf_counter_ns() reading onto the connection's stack of query start times."""
    info = conn.info
    # Kept as a stack (created on first use) so re-entrant/nested executes on one connection pair up correctly.
    try:
        pending = info["_perf_query_starts"]
    except KeyError:
        pending = info["_perf_query_starts"] = []
    pending.append(perf_counter_ns())

57 

58 

def _after_cursor_execute(conn, cursor, statement, parameters, context, executemany):  # type: ignore[no-untyped-def]
    """Attribute one finished cursor execute to the calling thread's accumulator.

    Pops the most recent start time pushed by _before_cursor_execute for this connection. If accounting is armed
    on this thread, adds the elapsed time to the accumulator and bumps the query count, plus the write count when
    the execution context marks the statement as an INSERT/UPDATE/DELETE.
    """
    starts = conn.info.get("_perf_query_starts")
    if not starts:
        # No recorded start for this execute, so there is nothing to measure.
        return
    # Always pop, even if the query ends up unattributed, so the stack stays paired with the before-listener.
    elapsed_ms = (perf_counter_ns() - starts.pop()) / 1e6
    acc: _PerfAccumulator | None = getattr(_local, "acc", None)
    if acc is None:
        # Query on a thread with no active request (background job, metrics scrape, etc.) - don't attribute it.
        return
    acc.db_query_count += 1
    acc.db_time_ms += elapsed_ms
    # SQLAlchemy sets these flags when it compiled an INSERT/UPDATE/DELETE. The event documentation says
    # ``context`` may be None, so guard the attribute access; such a query is counted but not classed as a write.
    if context is not None and (context.isinsert or context.isupdate or context.isdelete):
        acc.db_write_query_count += 1

73 

74 

def register_perf_listeners(engine: Engine) -> None:
    """Attach the before/after cursor-execute accounting listeners to ``engine``."""
    hooks = (
        ("before_cursor_execute", _before_cursor_execute),
        ("after_cursor_execute", _after_cursor_execute),
    )
    for event_name, hook in hooks:
        event.listen(engine, event_name, hook)

77 event.listen(engine, "after_cursor_execute", _after_cursor_execute)