Coverage for app/backend/src/couchers/metrics.py: 95%

242 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import threading 

2import time 

3from collections.abc import Callable 

4from concurrent.futures import ThreadPoolExecutor 

5from datetime import datetime, timedelta 

6from typing import Any, cast 

7 

8from opentelemetry import trace 

9from prometheus_client import ( 

10 CONTENT_TYPE_LATEST, 

11 CollectorRegistry, 

12 Counter, 

13 Gauge, 

14 Histogram, 

15 exposition, 

16 generate_latest, 

17 multiprocess, 

18) 

19from prometheus_client.registry import CollectorRegistry 

20from sqlalchemy import Engine, and_, case, select 

21from sqlalchemy.pool import QueuePool 

22from sqlalchemy.sql import distinct, func 

23from sqlalchemy.sql.selectable import Select 

24 

25from couchers import experimentation 

26from couchers.config import config 

27from couchers.db import session_scope 

28from couchers.helpers.completed_profile import has_completed_profile_expression 

29from couchers.materialized_views import ClusterSubscriptionCount 

30from couchers.models import ( 

31 BackgroundJob, 

32 ClientPlatform, 

33 Cluster, 

34 EventOccurrenceAttendee, 

35 HostingStatus, 

36 HostRequest, 

37 Message, 

38 Node, 

39 NodeType, 

40 NonvisibleUserAccessType, 

41 NonvisibleUserState, 

42 Reference, 

43 User, 

44 UserActivity, 

45) 

46from couchers.models.moderation import ( 

47 ModerationAction, 

48 ModerationObjectType, 

49 ModerationQueueItem, 

50 ModerationState, 

51 ModerationTrigger, 

52 ModerationVisibility, 

53) 

54from couchers.perf import PerfResult 

55 

# Tracer used to wrap each metric-collection query in its own span.
tracer = trace.get_tracer(__name__)

# Registry that aggregates metric values across worker processes. MultiProcessCollector registers itself on the
# registry it is handed, so the collector object does not need to be kept in a name.
registry: CollectorRegistry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)  # type: ignore[no-untyped-call]

60 

_INF: float = float("inf")

# Dense from 1ms to ~300ms where most calls land, sparse out to 10min for long background jobs.
MACHINE_DURATION_SECONDS: tuple[float, ...] = (
    # under 10ms
    0.001, 0.0025, 0.005, 0.0075,
    # 10ms to 100ms
    0.01, 0.015, 0.02, 0.03, 0.04, 0.05, 0.06, 0.075,
    # 100ms to 1s
    0.1, 0.125, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.75,
    # 1s to 30s
    1.0, 1.5, 2.0, 3.0, 5.0, 7.5, 10.0, 15.0, 30.0,
    # 1min to 10min
    60, 120, 300, 600,
    # explicit catch-all bucket
    _INF,
)

101 

# Process start time. With "max" across processes, the exported value is the latest start time of any process.
start_time_gauge: Gauge = Gauge(
    name="couchers_start_time_seconds",
    documentation="Unix timestamp of when the process started",
    multiprocess_mode="max",
)
start_time_gauge.set(time.time())

# Timestamp of the deployed commit, parsed from the ISO-format COMMIT_TIMESTAMP config value.
commit_timestamp_gauge: Gauge = Gauge(
    name="couchers_commit_timestamp_seconds",
    documentation="Unix timestamp of the deployed commit, 0 if not a CI build",
    multiprocess_mode="max",
)
# Left at its default of 0 when COMMIT_TIMESTAMP is empty (i.e. not a CI build).
if config.COMMIT_TIMESTAMP:
    commit_timestamp_gauge.set(datetime.fromisoformat(config.COMMIT_TIMESTAMP).timestamp())

117 

jobs_duration_histogram: Histogram = Histogram(
    name="couchers_background_jobs_seconds",
    documentation="Durations of background jobs",
    labelnames=("job", "status", "attempt", "exception"),
    buckets=MACHINE_DURATION_SECONDS,
)


def observe_in_jobs_duration_histogram(
    job_type: str, job_state: str, try_count: int, exception_name: str, duration_s: float
) -> None:
    """
    Record the duration (in seconds) of one background job run.

    The try count is converted to a string for the "attempt" label.
    """
    jobs_duration_histogram.labels(
        job=job_type,
        status=job_state,
        attempt=str(try_count),
        exception=exception_name,
    ).observe(duration_s)

130 

131 

# Queue wait: fine-grained under 10s, every 10s up to a minute, every minute from 3 to 10 minutes, then coarse to 1h.
jobs_queued_histogram: Histogram = Histogram(
    name="couchers_background_jobs_queued_seconds",
    documentation="Time background job spent queued before being picked up",
    labelnames=("priority",),
    buckets=(
        0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0,
        *range(10, 61, 10),  # 10, 20, ..., 60
        90, 120,
        *range(180, 601, 60),  # 180, 240, ..., 600
        720, 900, 1800, 3600,
        _INF,
    ),
)

167 

168 

servicer_duration_histogram: Histogram = Histogram(
    name="couchers_servicer_duration_seconds",
    documentation="Durations of processing gRPC calls",
    labelnames=("method", "logged_in", "code", "exception"),
    buckets=MACHINE_DURATION_SECONDS,
)


def observe_in_servicer_duration_histogram(
    method: str, user_id: Any, status_code: str, exception_type: str, duration_s: float
) -> None:
    """
    Record the duration (in seconds) of one gRPC call.

    Only whether a user_id was present is recorded, as the "logged_in" label. prometheus_client stringifies label
    values, so that label reads "True"/"False"; the id itself is never recorded.
    """
    logged_in = user_id is not None
    servicer_duration_histogram.labels(method, logged_in, status_code, exception_type).observe(duration_s)

181 

182 

servicer_setup_errors_counter: Counter = Counter(
    name="couchers_servicer_setup_errors_total",
    documentation="Number of unexpected errors raised during gRPC interceptor setup, before the handler is invoked",
    labelnames=("method", "exception"),
)


def observe_in_servicer_setup_errors_counter(method: str, exception_type: str) -> None:
    """Count one unexpected error raised while setting up a gRPC call, before its handler was invoked."""
    servicer_setup_errors_counter.labels(method=method, exception=exception_type).inc()

192 

193 

# Per-request resource accounting (see couchers/perf.py), labelled by method only to keep cardinality modest. The
# histogram _sum gives the cost rate per endpoint via rate() (DB-seconds/sec, CPU-seconds/sec); the buckets give the
# per-call distribution.
servicer_db_time_histogram: Histogram = Histogram(
    name="couchers_servicer_db_time_seconds",
    documentation="Time spent in DB cursor execution per gRPC call",
    labelnames=("method",),
    buckets=MACHINE_DURATION_SECONDS,
)
servicer_cpu_time_histogram: Histogram = Histogram(
    name="couchers_servicer_cpu_seconds",
    documentation="Backend thread CPU time per gRPC call",
    labelnames=("method",),
    buckets=MACHINE_DURATION_SECONDS,
)

# Fibonacci bucket boundaries: roughly exponential, good resolution for an unbounded value. Shared by both
# query-count histograms below.
_QUERY_COUNT_BUCKETS: tuple[float, ...] = (1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, _INF)
servicer_db_query_count_histogram: Histogram = Histogram(
    name="couchers_servicer_db_query_count",
    documentation="Number of SQL statements executed per gRPC call",
    labelnames=("method",),
    buckets=_QUERY_COUNT_BUCKETS,
)
servicer_db_write_query_count_histogram: Histogram = Histogram(
    name="couchers_servicer_db_write_query_count",
    documentation="Number of INSERT/UPDATE/DELETE statements executed per gRPC call",
    labelnames=("method",),
    buckets=_QUERY_COUNT_BUCKETS,
)


def observe_in_servicer_perf_histograms(method: str, perf: PerfResult | None) -> None:
    """
    Record one gRPC handler's resource usage under `method`.

    Covers DB time, CPU time, query count and write-query count. Millisecond timings are converted to seconds.
    Nothing is recorded when perf is None.
    """
    if perf is None:
        return
    observations = (
        (servicer_db_time_histogram, perf.db_time_ms / 1000),
        (servicer_cpu_time_histogram, perf.cpu_ms / 1000),
        (servicer_db_query_count_histogram, perf.db_query_count),
        (servicer_db_write_query_count_histogram, perf.db_write_query_count),
    )
    for histogram, value in observations:
        histogram.labels(method).observe(value)

231 

232 

# Auth/setup phase (everything before the handler body), same db-vs-cpu split as the handler-body histograms above.
servicer_setup_db_time_histogram: Histogram = Histogram(
    name="couchers_servicer_setup_db_time_seconds",
    documentation="Time spent in DB cursor execution during the auth/setup phase per gRPC call",
    labelnames=("method",),
    buckets=MACHINE_DURATION_SECONDS,
)
servicer_setup_cpu_time_histogram: Histogram = Histogram(
    name="couchers_servicer_setup_cpu_seconds",
    documentation="Backend thread CPU time during the auth/setup phase per gRPC call",
    labelnames=("method",),
    buckets=MACHINE_DURATION_SECONDS,
)


def observe_in_servicer_setup_histogram(method: str, perf: PerfResult | None) -> None:
    """
    Record the DB time and CPU time of a gRPC call's auth/setup phase, converted from ms to seconds.

    Nothing is recorded when perf is None.
    """
    if perf is None:
        return
    setup_histograms = (
        (servicer_setup_db_time_histogram, perf.db_time_ms),
        (servicer_setup_cpu_time_histogram, perf.cpu_ms),
    )
    for histogram, elapsed_ms in setup_histograms:
        histogram.labels(method).observe(elapsed_ms / 1000)

253 

254 

servicer_pool_wait_histogram: Histogram = Histogram(
    name="couchers_servicer_pool_wait_seconds",
    documentation="Time spent waiting to check out a DB connection from the pool per gRPC call",
    labelnames=("method",),
    buckets=MACHINE_DURATION_SECONDS,
)


def observe_in_servicer_pool_wait_histogram(method: str, pool_wait_s: float) -> None:
    """Record how long (in seconds) one gRPC call waited to check out a DB connection."""
    servicer_pool_wait_histogram.labels(method=method).observe(pool_wait_s)

265 

266 

# Separate diagnostic, not part of the additive duration pie: "serialize" runs after the duration window closes.
servicer_serde_histogram: Histogram = Histogram(
    name="couchers_servicer_serde_seconds",
    documentation="Protobuf request deserialization / response serialization time per gRPC call",
    labelnames=("method", "direction"),
    buckets=MACHINE_DURATION_SECONDS,
)


def observe_in_servicer_serde_histogram(method: str, direction: str, serde_s: float) -> None:
    """Record protobuf (de)serialization time in seconds for one gRPC call; `direction` tells which way it went."""
    servicer_serde_histogram.labels(method=method, direction=direction).observe(serde_s)

278 

279 

# liveall keeps one series per worker pid (and drops dead workers), so these also show load balance across workers.
# Updated from inside each worker since the /metrics scrape runs in the parent, which has neither pool.
grpc_in_flight_gauge: Gauge = Gauge(
    name="couchers_grpc_in_flight",
    documentation="Outstanding gRPC calls (running plus queued for a server thread), per worker process",
    multiprocess_mode="liveall",
)
grpc_threadpool_queue_depth_gauge: Gauge = Gauge(
    name="couchers_grpc_threadpool_queue_depth",
    documentation="gRPC calls queued waiting for a free server thread, per worker process",
    multiprocess_mode="liveall",
)
db_pool_checked_out_gauge: Gauge = Gauge(
    name="couchers_db_pool_checked_out",
    documentation="Checked-out DB connections, per worker process",
    multiprocess_mode="liveall",
)


def start_worker_resource_sampler(executor: ThreadPoolExecutor, engine: Engine, interval: float = 1.0) -> None:
    """
    Start a daemon thread named "resource-sampler" that refreshes two gauges every `interval` seconds.

    The gauges are the gRPC thread pool's queue depth and the DB pool's checked-out connection count. The loop never
    exits on its own. engine.pool is re-read on every pass rather than cached.
    """

    def _sample_forever() -> None:
        while True:
            # _work_queue is private but stable: tasks gRPC has submitted that no thread has picked up yet
            queued = executor._work_queue.qsize()
            grpc_threadpool_queue_depth_gauge.set(queued)
            checked_out = cast(QueuePool, engine.pool).checkedout()
            db_pool_checked_out_gauge.set(checked_out)
            time.sleep(interval)

    sampler = threading.Thread(target=_sample_forever, daemon=True, name="resource-sampler")
    sampler.start()

308 

309 

# "mostrecent": the exported value is whichever process set it last.
supervised_children_alive_gauge: Gauge = Gauge(
    name="couchers_supervised_children_alive",
    documentation="Child processes (API workers, background workers, scheduler) the supervisor currently sees alive",
    multiprocess_mode="mostrecent",
)

315 

316 

# Simple count of API calls, broken down by method and the client platform header. Cheap (a counter, no buckets) and
# answers "how much traffic comes from each platform".
api_calls_counter: Counter = Counter(
    name="couchers_api_calls_total",
    documentation="Number of gRPC API calls",
    labelnames=("method", "platform"),
)


def observe_api_call(method: str, client_platform: ClientPlatform | None) -> None:
    """Count one API call for `method`, labelled with the client platform's name, or "unknown" when there is none."""
    platform = "unknown" if client_platform is None else client_platform.name
    api_calls_counter.labels(method=method, platform=platform).inc()

328 

329 

# Pairs of (gauge, function returning the gauge's current value).
# The python prometheus client does not support Gauge.set_function, so instead we hack around it: each gauge is set
# just before collection by calling its paired function.
_set_hacky_gauges_funcs: list[tuple[Gauge, Callable[[], Any]]] = []


def _make_gauge_from_query(name: str, description: str, statement: Select[Any]) -> Gauge:
    """
    Create a "mostrecent" gauge whose value is the single scalar returned by `statement`.

    The query is not run here. A function that runs it (inside a "metric.<name>" span and its own DB session) is
    registered in _set_hacky_gauges_funcs.

    statement should be a sqlalchemy SELECT statement that returns a single number
    """
    gauge = Gauge(name, description, multiprocess_mode="mostrecent")

    def run_query() -> Any:
        with tracer.start_as_current_span(f"metric.{name}"), session_scope() as session:
            return session.execute(statement).scalar_one()

    _set_hacky_gauges_funcs.append((gauge, run_query))
    return gauge

351 

352 

# Pairs of (labeled gauge, function that fills in that gauge's label values), run just before collection.
_set_hacky_labeled_gauges_funcs: list[tuple[Gauge, Callable[[Gauge], None]]] = []


def _make_labeled_gauge_from_query(
    name: str,
    description: str,
    labelname: str,
    statement: Select[Any],
    default_label_values: list[str] | None = None,
) -> Gauge:
    """
    Create a "mostrecent" gauge with one label whose values are filled from `statement`.

    The populate function is registered in _set_hacky_labeled_gauges_funcs.

    statement should be a sqlalchemy SELECT statement that returns rows of (label_value, count). Label values are
    converted with str().

    default_label_values, if given, are seeded to zero before the query results are applied, so that label
    values with no matching rows are still emitted.

    NOTE(review): nothing in this function removes label values that drop out of the query results. Unless the
    caller clears the gauge, those labels keep reporting their last value.
    """
    gauge = Gauge(name, description, labelnames=[labelname], multiprocess_mode="mostrecent")

    def populate(target: Gauge) -> None:
        with tracer.start_as_current_span(f"metric.{name}"):
            with session_scope() as session:
                rows = session.execute(statement).all()
            # Zeros first, so real query results overwrite them.
            for seeded in default_label_values or []:
                target.labels(seeded).set(0)
            for label_value, count in rows:
                target.labels(str(label_value)).set(count)

    _set_hacky_labeled_gauges_funcs.append((gauge, populate))
    return gauge

386 

387 

# (metric-name suffix, human-readable window, lookback interval) for the couchers_active_users_* gauges.
_active_user_periods: list[tuple[str, str, timedelta]] = [
    ("5m", "5 min", timedelta(minutes=5)),
    ("24h", "24 hours", timedelta(hours=24)),
    ("1month", "1 month", timedelta(weeks=4)),
    ("3month", "3 months", timedelta(weeks=13)),
    ("6month", "6 months", timedelta(weeks=26)),
    ("12month", "12 months", timedelta(days=365)),
]

# One gauge per period: visible users whose last_active is within the lookback interval of now().
active_users_gauges: list[Gauge] = [
    _make_gauge_from_query(
        name=f"couchers_active_users_{suffix}",
        description=f"Number of active users in the last {window_text}",
        statement=(
            select(func.count())
            .select_from(User)
            .where(User.is_visible)
            .where(User.last_active > func.now() - lookback)
        ),
    )
    for suffix, window_text, lookback in _active_user_periods
]

users_gauge: Gauge = _make_gauge_from_query(
    name="couchers_users",
    description="Total number of users",
    statement=select(func.count()).select_from(User).where(User.is_visible),
)

409 

# Number of users per community, labeled by community name. Only includes official communities whose node is at the
# region level or broader (world, macroregion, region). Communities without a subscription-count row report 0.
users_per_community_gauge: Gauge = _make_labeled_gauge_from_query(
    name="couchers_users_per_community",
    description="Number of users per community, for regions and broader",
    labelname="community",
    statement=(
        select(Cluster.name, func.coalesce(ClusterSubscriptionCount.count, 0))
        .select_from(Node)
        .join(Cluster, and_(Cluster.parent_node_id == Node.id, Cluster.is_official_cluster))
        .outerjoin(ClusterSubscriptionCount, ClusterSubscriptionCount.cluster_id == Cluster.id)
        .where(Node.node_type <= NodeType.region)
    ),
)

424 

# Number of users bucketed by how recently they were last active.
# Entries are (label, upper bound on time since last activity), in ascending order. Each user lands in the first
# bucket whose bound exceeds their inactivity (CASE takes the first matching WHEN). The final unbounded entry is the
# CASE's ELSE.
_active_users_buckets: list[tuple[str, timedelta | None]] = [
    ("<1d", timedelta(days=1)),
    ("1d-1w", timedelta(days=7)),
    ("1w-1m", timedelta(weeks=4)),
    ("1m-6m", timedelta(weeks=26)),
    ("6m-12m", timedelta(days=365)),
    ("12m-24m", timedelta(days=730)),
    ("24m+", None),
]
_active_users_age = func.now() - User.last_active
active_users_by_recency_gauge: Gauge = _make_labeled_gauge_from_query(
    name="couchers_active_users_by_recency",
    description="Number of users bucketed by how recently they were last active",
    labelname="period",
    statement=(
        select(
            case(
                *[(_active_users_age < bound, bucket_label) for bucket_label, bound in _active_users_buckets if bound],
                else_=_active_users_buckets[-1][0],
            ).label("period"),
            func.count(),
        )
        .select_from(User)
        .where(User.is_visible)
        .group_by("period")
    ),
    # Seed every bucket at 0 so buckets with no users are still exported.
    default_label_values=[bucket[0] for bucket in _active_users_buckets],
)

454 

# Window for the per-platform daily-active-user metrics. Kept to 24h so the user_activity scan stays cheap (an index
# scan of just the last day's rows), letting these gauges be computed inline on every scrape.
_ACTIVE_USERS_BY_PLATFORM_WINDOW = timedelta(hours=24)
# Platforms counted as "mobile" for the mobile-share fraction (native apps plus the mobile web viewport).
_MOBILE_PLATFORMS = [ClientPlatform.web_mobile, ClientPlatform.app_ios, ClientPlatform.app_android]


def active_users_by_platform_statement() -> Select[Any]:
    """
    Build a single-row aggregate SELECT over visible users' user_activity rows within
    _ACTIVE_USERS_BY_PLATFORM_WINDOW of now().

    Columns:
      - "total": distinct active users;
      - "mobile": distinct users with any activity on one of _MOBILE_PLATFORMS;
      - one column per ClientPlatform member, named after the member: distinct users active on that platform.

    client_platform is set from a header the client explicitly sends. It's null for some other client (e.g. an API
    key script) or for activity from before the header existed, so the per-platform counts don't sum to the total.
    A user active on several platforms is counted once in each of them, which is why "mobile" is its own distinct
    count rather than a sum of labels.
    """
    distinct_users = func.count(distinct(UserActivity.user_id))
    mobile_users = distinct_users.filter(UserActivity.client_platform.in_(_MOBILE_PLATFORMS))
    per_platform_users = [
        distinct_users.filter(UserActivity.client_platform == platform).label(platform.name)
        for platform in ClientPlatform
    ]
    window_start = func.now() - _ACTIVE_USERS_BY_PLATFORM_WINDOW
    return (
        select(distinct_users.label("total"), mobile_users.label("mobile"), *per_platform_users)
        .select_from(UserActivity)
        .join(User, User.id == UserActivity.user_id)
        .where(User.is_visible)
        .where(UserActivity.period > window_start)
    )

482 

483 

# Distinct active users in the last 24h, split by client platform.
active_users_by_platform_gauge: Gauge = Gauge(
    name="couchers_active_users_by_platform",
    documentation="Distinct active users in the last 24h, split by client platform (web_desktop, web_mobile, app_ios, app_android)",
    labelnames=("platform",),
    multiprocess_mode="mostrecent",
)

# Fraction of the last 24h's distinct active users who had any mobile activity. The headline "mobile is key" number.
active_users_mobile_fraction_gauge: Gauge = Gauge(
    name="couchers_active_users_mobile_fraction",
    documentation="Fraction of distinct active users in the last 24h with any mobile activity (web_mobile, app_ios, app_android)",
    multiprocess_mode="mostrecent",
)


def _set_active_users_by_platform(gauge: Gauge) -> None:
    """
    Run the per-platform active-users query once and set two gauges from its single result row.

    The per-platform gauge is the one passed in; the mobile-fraction gauge is also set here. The fraction is 0.0 when
    the total is zero.
    """
    with tracer.start_as_current_span("metric.couchers_active_users_by_platform"):
        with session_scope() as session:
            counts = session.execute(active_users_by_platform_statement()).one()._mapping
        for platform in ClientPlatform:
            gauge.labels(platform.name).set(counts[platform.name])
        total_users = counts["total"]
        if total_users:
            mobile_fraction = counts["mobile"] / total_users
        else:
            mobile_fraction = 0.0
        active_users_mobile_fraction_gauge.set(mobile_fraction)


# Registered with the labeled-gauge populators so it also runs just before collection.
_set_hacky_labeled_gauges_funcs.append((active_users_by_platform_gauge, _set_active_users_by_platform))

511 

def _visible_users_count(condition: Any) -> Select[Any]:
    """Build a SELECT counting visible users that also satisfy `condition`."""
    return select(func.count()).select_from(User).where(User.is_visible).where(condition)


# Gender breakdown
man_gauge: Gauge = _make_gauge_from_query(
    name="couchers_users_man",
    description="Total number of users with gender 'Man'",
    statement=_visible_users_count(User.gender == "Man"),
)

woman_gauge: Gauge = _make_gauge_from_query(
    name="couchers_users_woman",
    description="Total number of users with gender 'Woman'",
    statement=_visible_users_count(User.gender == "Woman"),
)

nonbinary_gauge: Gauge = _make_gauge_from_query(
    name="couchers_users_nonbinary",
    description="Total number of users with gender 'Non-binary'",
    statement=_visible_users_count(User.gender == "Non-binary"),
)

# Hosting status breakdown
can_host_gauge: Gauge = _make_gauge_from_query(
    name="couchers_users_can_host",
    description="Total number of users with hosting status 'can_host'",
    statement=_visible_users_count(User.hosting_status == HostingStatus.can_host),
)

cant_host_gauge: Gauge = _make_gauge_from_query(
    name="couchers_users_cant_host",
    description="Total number of users with hosting status 'cant_host'",
    statement=_visible_users_count(User.hosting_status == HostingStatus.cant_host),
)

maybe_gauge: Gauge = _make_gauge_from_query(
    name="couchers_users_maybe",
    description="Total number of users with hosting status 'maybe'",
    statement=_visible_users_count(User.hosting_status == HostingStatus.maybe),
)

# Profile completeness
completed_profile_gauge: Gauge = _make_gauge_from_query(
    name="couchers_users_completed_profile",
    description="Total number of users with a completed profile",
    statement=_visible_users_count(has_completed_profile_expression()),
)

completed_my_home_gauge: Gauge = _make_gauge_from_query(
    name="couchers_users_completed_my_home",
    description="Total number of users with a completed my home section",
    statement=_visible_users_count(User.has_completed_my_home),
)

559 

# "Has ever" gauges: distinct visible users appearing in the relevant column of each table.
sent_message_gauge: Gauge = _make_gauge_from_query(
    name="couchers_users_sent_message",
    description="Total number of users who have sent a message",
    statement=(
        select(func.count(distinct(Message.author_id)))
        .join(User, User.id == Message.author_id)
        .where(User.is_visible)
    ),
)

sent_request_gauge: Gauge = _make_gauge_from_query(
    name="couchers_users_sent_request",
    description="Total number of users who have sent a host request",
    statement=(
        select(func.count(distinct(HostRequest.initiator_user_id)))
        .join(User, User.id == HostRequest.initiator_user_id)
        .where(User.is_visible)
    ),
)

has_reference_gauge: Gauge = _make_gauge_from_query(
    name="couchers_users_has_reference",
    description="Total number of users who have a reference",
    statement=(
        select(func.count(distinct(Reference.to_user_id)))
        .join(User, User.id == Reference.to_user_id)
        .where(User.is_visible)
    ),
)

rsvpd_to_event_gauge: Gauge = _make_gauge_from_query(
    name="couchers_users_rsvpd_to_event",
    description="Total number of users who have RSVPd to an event",
    statement=(
        select(func.count(distinct(EventOccurrenceAttendee.user_id)))
        .join(User, User.id == EventOccurrenceAttendee.user_id)
        .where(User.is_visible)
    ),
)

595 

# Background job queue health
background_jobs_ready_to_execute_gauge: Gauge = _make_gauge_from_query(
    name="couchers_background_jobs_ready_to_execute",
    description="Total number of background jobs ready to execute",
    statement=select(func.count()).select_from(BackgroundJob).where(BackgroundJob.ready_for_retry),
)

# Worker polling outcomes
background_jobs_serialization_errors_counter: Counter = Counter(
    name="couchers_background_jobs_serialization_errors_total",
    documentation="Number of times a bg worker has a serialization error",
)

background_jobs_no_jobs_counter: Counter = Counter(
    name="couchers_background_jobs_no_jobs_total",
    documentation="Number of times a bg worker tries to grab a job but there is none",
)

background_jobs_got_job_counter: Counter = Counter(
    name="couchers_background_jobs_got_job_total",
    documentation="Number of times a bg worker grabbed a job",
)

616 

617 

signup_initiations_counter: Counter = Counter(
    name="couchers_signup_initiations_total",
    documentation="Number of initiated signups",
)
signup_completions_counter: Counter = Counter(
    name="couchers_signup_completions_total",
    documentation="Number of completed signups",
    labelnames=("gender",),
)
# Per-step signup funnel counters. Each fires once, the first time a signup flow satisfies the given gate, so
# that step_total/initiations_total gives the fraction of signups that reached that step. Unlabeled to match
# signup_initiations_counter for clean ratios.
signup_account_filled_counter: Counter = Counter(
    name="couchers_signup_account_filled_total",
    documentation="Number of signup flows that filled in their account details",
)
signup_email_verified_counter: Counter = Counter(
    name="couchers_signup_email_verified_total",
    documentation="Number of signup flows that verified their email address",
)
signup_guidelines_accepted_counter: Counter = Counter(
    name="couchers_signup_guidelines_accepted_total",
    documentation="Number of signup flows that accepted the community guidelines",
)
signup_motivations_filled_counter: Counter = Counter(
    name="couchers_signup_motivations_filled_total",
    documentation="Number of signup flows that filled in their motivations",
)
# Signup duration: every 30s for the first two minutes, every minute up to 10 minutes, then coarse out to 2h.
signup_time_histogram: Histogram = Histogram(
    name="couchers_signup_time_seconds",
    documentation="Time taken for a user to sign up",
    labelnames=("gender",),
    buckets=(30, 60, 90, 120, *range(180, 601, 60), 900, 1200, 1800, 3600, 7200, _INF),
)

652 

# Logins
logins_counter: Counter = Counter(
    name="couchers_logins_total",
    documentation="Number of logins",
    labelnames=("gender",),
)

# Password resets
password_reset_initiations_counter: Counter = Counter(
    name="couchers_password_reset_initiations_total",
    documentation="Number of password reset initiations",
)
password_reset_completions_counter: Counter = Counter(
    name="couchers_password_reset_completions_total",
    documentation="Number of password reset completions",
)

# Account deletion and recovery
account_deletion_initiations_counter: Counter = Counter(
    name="couchers_account_deletion_initiations_total",
    documentation="Number of account deletion initiations",
    labelnames=("gender",),
)
account_deletion_completions_counter: Counter = Counter(
    name="couchers_account_deletion_completions_total",
    documentation="Number of account deletion completions",
    labelnames=("gender",),
)
account_recoveries_counter: Counter = Counter(
    name="couchers_account_recoveries_total",
    documentation="Number of account recoveries",
    labelnames=("gender",),
)

# Strong verification (completions carry no gender label, unlike initiations and data deletions)
strong_verification_initiations_counter: Counter = Counter(
    name="couchers_strong_verification_initiations_total",
    documentation="Number of strong verification initiations",
    labelnames=("gender",),
)
strong_verification_completions_counter: Counter = Counter(
    name="couchers_strong_verification_completions_total",
    documentation="Number of strong verification completions",
)
strong_verification_data_deletions_counter: Counter = Counter(
    name="couchers_strong_verification_data_deletions_total",
    documentation="Number of strong verification data deletions",
    labelnames=("gender",),
)

# Host requests
host_requests_sent_counter: Counter = Counter(
    name="couchers_host_requests_total",
    documentation="Number of host requests sent",
    labelnames=("from_gender", "to_gender"),
)
host_request_responses_counter: Counter = Counter(
    name="couchers_host_requests_responses_total",
    documentation="Number of responses to host requests",
    labelnames=("responder_gender", "other_gender", "response_type"),
)

# Messages
sent_messages_counter: Counter = Counter(
    name="couchers_sent_messages_total",
    documentation="Number of messages sent",
    labelnames=("gender", "message_type"),
)

715 

716 

push_notification_counter: Counter = Counter(
    name="couchers_push_notification_total",
    documentation="Number of push notification delivery attempts",
    labelnames=("platform", "outcome"),
)
emails_counter: Counter = Counter(
    name="couchers_emails_total",
    documentation="Number of emails sent",
)


# Revenue from successful Stripe charges, in cents, split by source (donation vs merch).
revenue_cents_counter: Counter = Counter(
    name="couchers_revenue_cents_total",
    documentation="Revenue from successful Stripe charges, in cents",
    labelnames=("type",),
)


def observe_revenue(revenue_type: str, amount_cents: int) -> None:
    """Add `amount_cents` to the revenue counter under the `revenue_type` label."""
    revenue_cents_counter.labels(type=revenue_type).inc(amount_cents)

738 

739 

antibots_assessed_counter: Counter = Counter(
    name="couchers_antibots_assessed_total",
    documentation="Number of times an antibot assessment is created",
    labelnames=("action",),
)

# Score buckets in steps of 0.05 from 0.0 to 1.0 inclusive (21 boundaries).
antibot_score_histogram: Histogram = Histogram(
    name="couchers_antibot_score",
    documentation="Score of antibot assessments",
    labelnames=("action",),
    buckets=tuple(step / 20 for step in range(21)),
)

752 

# Bucket boundaries below are in seconds: 60 = 1 minute, 3_600 = 1 hour, 86_400 = 1 day, 604_800 (= 7 * 86_400) = 1 week.
host_request_first_response_histogram: Histogram = Histogram(
    "couchers_host_request_first_response_seconds",
    "Response time to host requests",
    labelnames=["host_gender", "surfer_gender", "response_type"],
    buckets=(
        1 * 60,  # 1m
        2 * 60,  # 2m
        5 * 60,  # 5m
        10 * 60,  # 10m
        15 * 60,  # 15m
        30 * 60,  # 30m
        45 * 60,  # 45m
        3_600,  # 1h
        2 * 3_600,  # 2h
        3 * 3_600,  # 3h
        6 * 3_600,  # 6h
        12 * 3_600,  # 12h
        86_400,  # 24h
        2 * 86_400,  # 2d
        5 * 86_400,  # 5d
        604_800,  # 1w (was 602_000, which is ~46 minutes short of a week)
        2 * 604_800,  # 2w
        3 * 604_800,  # 3w
        4 * 604_800,  # 4w
        _INF,
    ),
)
account_age_on_host_request_create_histogram: Histogram = Histogram(
    "couchers_account_age_on_host_request_create_histogram_seconds",
    "Age of account sending a host request",
    labelnames=["surfer_gender", "host_gender"],
    buckets=(
        5 * 60,  # 5m
        10 * 60,  # 10m
        15 * 60,  # 15m
        30 * 60,  # 30m
        45 * 60,  # 45m
        3_600,  # 1h
        2 * 3_600,  # 2h
        3 * 3_600,  # 3h
        6 * 3_600,  # 6h
        12 * 3_600,  # 12h
        86_400,  # 24h
        2 * 86_400,  # 2d
        3 * 86_400,  # 3d
        4 * 86_400,  # 4d
        5 * 86_400,  # 5d
        6 * 86_400,  # 6d
        604_800,  # 1w
        2 * 604_800,  # 2w
        3 * 604_800,  # 3w
        4 * 604_800,  # 4w
        5 * 604_800,  # 5w
        10 * 604_800,  # 10w
        25 * 604_800,  # 25w
        52 * 604_800,  # 52w
        104 * 604_800,  # 104w
        _INF,
    ),
)

813 

814 

# =============================================================================
# Moderation metrics
# =============================================================================

# Gauges: Queue lengths
# An item counts as unresolved while its resolved_by_log_id is NULL.
moderation_queue_length_gauge: Gauge = _make_gauge_from_query(
    name="couchers_moderation_queue_length",
    description="Total number of unresolved items in the moderation queue",
    statement=(
        select(func.count())
        .select_from(ModerationQueueItem)
        .where(ModerationQueueItem.resolved_by_log_id.is_(None))
    ),
)

825 

# One gauge per ModerationTrigger member: unresolved queue items with that trigger.
moderation_queue_length_by_trigger_gauges: list[Gauge] = [
    _make_gauge_from_query(
        name=f"couchers_moderation_queue_length_{trg.name.lower()}",
        description=f"Number of unresolved items in the moderation queue with trigger {trg.name}",
        statement=(
            select(func.count())
            .select_from(ModerationQueueItem)
            .where(ModerationQueueItem.resolved_by_log_id.is_(None))
            .where(ModerationQueueItem.trigger == trg)
        ),
    )
    for trg in ModerationTrigger
]

# One gauge per ModerationObjectType member: unresolved queue items whose moderation state has that object type.
# NOTE(review): these share the couchers_moderation_queue_length_ prefix with the per-trigger gauges above. A trigger
# and an object type with the same (lowercased) name would produce the same metric name. Confirm the enums can't
# overlap.
moderation_queue_length_by_object_type_gauges: list[Gauge] = [
    _make_gauge_from_query(
        name=f"couchers_moderation_queue_length_{obj_type.name.lower()}",
        description=f"Number of unresolved items in the moderation queue for {obj_type.name}",
        statement=(
            select(func.count())
            .select_from(ModerationQueueItem)
            .join(ModerationState, ModerationQueueItem.moderation_state_id == ModerationState.id)
            .where(ModerationQueueItem.resolved_by_log_id.is_(None))
            .where(ModerationState.object_type == obj_type)
        ),
    )
    for obj_type in ModerationObjectType
]

850 

# Gauges: Items in each visibility state by object type -- one gauge for every (object type, visibility) pair,
# outer loop over object types, inner loop over visibilities.
moderation_visibility_gauges: list[Gauge] = [
    _make_gauge_from_query(
        f"couchers_moderation_items_{obj_type.name.lower()}_{vis.name.lower()}",
        f"Number of {obj_type.name} items with visibility {vis.name}",
        select(func.count())
        .select_from(ModerationState)
        .where(
            ModerationState.object_type == obj_type,
            ModerationState.visibility == vis,
        ),
    )
    for obj_type in ModerationObjectType
    for vis in ModerationVisibility
]

864 

# Counters: Moderation actions taken
moderation_actions_counter: Counter = Counter(
    name="couchers_moderation_actions_total",
    documentation="Number of moderation actions taken",
    labelnames=("action", "object_type"),
)


def observe_moderation_action(action: ModerationAction, object_type: ModerationObjectType) -> None:
    """Count one moderation ``action`` taken on an item of ``object_type``.

    Both label values are the enum member names.
    """
    moderation_actions_counter.labels(action=action.name, object_type=object_type.name).inc()


# Counters: Visibility state transitions
moderation_visibility_transitions_counter: Counter = Counter(
    name="couchers_moderation_visibility_transitions_total",
    documentation="Number of visibility state transitions",
    labelnames=("from_visibility", "to_visibility", "object_type"),
)


def observe_moderation_visibility_transition(
    from_visibility: ModerationVisibility, to_visibility: ModerationVisibility, object_type: ModerationObjectType
) -> None:
    """Count one change from ``from_visibility`` to ``to_visibility`` on an item of ``object_type``.

    All three label values are the enum member names.
    """
    moderation_visibility_transitions_counter.labels(
        from_visibility=from_visibility.name,
        to_visibility=to_visibility.name,
        object_type=object_type.name,
    ).inc()


# Counters: Auto-approved items (no labels)
moderation_auto_approved_counter: Counter = Counter(
    name="couchers_moderation_auto_approved_total",
    documentation="Number of items that were auto-approved",
)

896 

897 

# Counters: Queue items created
moderation_queue_items_created_counter: Counter = Counter(
    name="couchers_moderation_queue_items_created_total",
    documentation="Number of moderation queue items created",
    labelnames=("trigger", "object_type"),
)


def observe_moderation_queue_item_created(trigger: ModerationTrigger, object_type: ModerationObjectType) -> None:
    """Count one newly created queue item, labeled with the enum names of its trigger and object type."""
    moderation_queue_items_created_counter.labels(trigger=trigger.name, object_type=object_type.name).inc()


# Counters: Queue items resolved
moderation_queue_items_resolved_counter: Counter = Counter(
    name="couchers_moderation_queue_items_resolved_total",
    documentation="Number of moderation queue items resolved",
    labelnames=("trigger", "action", "object_type"),
)


def observe_moderation_queue_item_resolved(
    trigger: ModerationTrigger, action: ModerationAction, object_type: ModerationObjectType
) -> None:
    """Count one resolved queue item, labeled with the enum names of its trigger, action and object type."""
    moderation_queue_items_resolved_counter.labels(
        trigger=trigger.name, action=action.name, object_type=object_type.name
    ).inc()


# Histogram: Time to resolve queue items. Bucket upper bounds run from 100ms out to 30 days.
moderation_queue_resolution_time_histogram: Histogram = Histogram(
    name="couchers_moderation_queue_resolution_seconds",
    documentation="Time taken to resolve moderation queue items",
    labelnames=("trigger", "action", "object_type"),
    buckets=(
        0.1,
        0.25,
        0.5,
        1,
        2.5,
        5,
        10,
        30,
        60,  # 1m
        300,  # 5m
        900,  # 15m
        1_800,  # 30m
        3_600,  # 1h
        7_200,  # 2h
        21_600,  # 6h
        43_200,  # 12h
        86_400,  # 1d
        172_800,  # 2d
        259_200,  # 3d
        604_800,  # 7d
        1_209_600,  # 14d
        2_592_000,  # 30d
        _INF,
    ),
)


def observe_moderation_queue_resolution_time(
    trigger: ModerationTrigger, action: ModerationAction, object_type: ModerationObjectType, duration_s: float
) -> None:
    """Record how long one queue item took to resolve (``duration_s``, in seconds)."""
    moderation_queue_resolution_time_histogram.labels(
        trigger=trigger.name, action=action.name, object_type=object_type.name
    ).observe(duration_s)

961 

962 

# Counter: access events that involve a nonvisible user, split by access type and the target's state.
nonvisible_user_access_counter: Counter = Counter(
    name="couchers_nonvisible_user_access_total",
    documentation="Number of access events involving nonvisible (banned/shadowed/deleted) users",
    labelnames=("access_type", "target_state"),
)


def observe_nonvisible_user_access(access_type: NonvisibleUserAccessType, target_state: NonvisibleUserState) -> None:
    """Count one ``access_type`` event against a user in ``target_state`` (labels are the enum member names)."""
    nonvisible_user_access_counter.labels(access_type=access_type.name, target_state=target_state.name).inc()


# Counter: postcards sent via MyPostcard, labeled by country_code.
postcards_sent_counter: Counter = Counter(
    name="couchers_postcards_sent_total",
    documentation="Number of postcards sent via MyPostcard",
    labelnames=("country_code",),
)

979 

980 

# Native app / OTA update metrics. Bucket upper bounds (seconds) are minute-resolution at the low end, so an OTA
# rollout can be watched, then dense around the OTA (~28d) and store (~91d) windows, then sparse for stragglers.
_NATIVE_AGE_BUCKETS: tuple[float, ...] = (
    # 1m, 5m, 15m, 30m, 1h, 2h, 6h, 12h
    60,
    300,
    900,
    1_800,
    3_600,
    7_200,
    21_600,
    43_200,
    # whole days, from 1 day up to 2 years
    *(
        days * 86_400
        for days in (1, 2, 3, 5, 7, 10, 14, 21, 28, 35, 45, 60, 75, 91, 120, 150, 180, 270, 365, 730)
    ),
    _INF,
)

1014 

native_bundle_age_histogram: Histogram = Histogram(
    name="couchers_native_bundle_age_seconds",
    documentation="Age of the OTA bundle reported by the client at CheckNativeStatus, by platform and launch source",
    labelnames=("platform", "is_ota_launch"),
    buckets=_NATIVE_AGE_BUCKETS,
)


def observe_native_bundle_age(platform: str, is_ota_launch: bool, age_s: float) -> None:
    """Record a client-reported OTA bundle age in seconds.

    A falsy ``platform`` is labeled "unknown"; ``is_ota_launch`` becomes "true"/"false".
    """
    launch_label = "true" if is_ota_launch else "false"
    native_bundle_age_histogram.labels(platform=platform or "unknown", is_ota_launch=launch_label).observe(age_s)


native_binary_age_histogram: Histogram = Histogram(
    name="couchers_native_binary_age_seconds",
    documentation="Age of the embedded native binary reported by the client at CheckNativeStatus, by platform",
    labelnames=("platform",),
    buckets=_NATIVE_AGE_BUCKETS,
)


def observe_native_binary_age(platform: str, age_s: float) -> None:
    """Record a client-reported embedded-binary age in seconds; a falsy ``platform`` is labeled "unknown"."""
    native_binary_age_histogram.labels(platform=platform or "unknown").observe(age_s)

1037 

1038 

native_update_decisions_counter: Counter = Counter(
    name="couchers_native_update_decisions_total",
    documentation="CheckNativeStatus decisions, by platform / action / severity",
    labelnames=("platform", "action", "severity"),
)


def observe_native_update_decision(platform: str, action: str, severity: str) -> None:
    """Count one CheckNativeStatus decision. Only ``platform`` gets a fallback ("unknown") when falsy."""
    native_update_decisions_counter.labels(platform=platform or "unknown", action=action, severity=severity).inc()


native_banned_bundle_hits_counter: Counter = Counter(
    name="couchers_native_banned_bundle_hits_total",
    documentation="CheckNativeStatus calls from a device running a banned OTA bundle, by platform",
    labelnames=("platform",),
)


def observe_native_banned_bundle_hit(platform: str) -> None:
    """Count one CheckNativeStatus call from a banned OTA bundle; a falsy ``platform`` is labeled "unknown"."""
    native_banned_bundle_hits_counter.labels(platform=platform or "unknown").inc()


native_ota_manifest_requests_counter: Counter = Counter(
    name="couchers_native_ota_manifest_requests_total",
    documentation="GetNativeUpdateManifest requests, by platform and result (served, no_update, no_match)",
    labelnames=("platform", "result"),
)


def observe_native_ota_manifest_request(platform: str, result: str) -> None:
    """Count one GetNativeUpdateManifest request with its ``result``; a falsy ``platform`` is labeled "unknown"."""
    native_ota_manifest_requests_counter.labels(platform=platform or "unknown", result=result).inc()

1070 

1071 

# One increment per CheckNativeStatus call, labeled by build/bundle identity, so the live mix of versions and
# bundles running in the fleet can be observed.
native_client_checkins_counter: Counter = Counter(
    name="couchers_native_client_checkins_total",
    documentation="CheckNativeStatus calls, labeled by build/bundle identity",
    labelnames=(
        "platform",
        "is_ota_launch",
        "embedded_display_version",
        "embedded_runtime_version",
        "ota_display_version",
        "ota_update_id",
    ),
)


def observe_native_client_checkin(
    platform: str,
    is_ota_launch: bool,
    embedded_display_version: str,
    embedded_runtime_version: str,
    ota_display_version: str,
    ota_update_id: str,
) -> None:
    """Count one CheckNativeStatus call under the caller's build/bundle identity.

    Falsy identity fields are replaced by placeholders: "unknown" for the platform and the embedded versions,
    "none" for the two OTA fields. ``is_ota_launch`` becomes "true"/"false".
    """
    native_client_checkins_counter.labels(
        platform=platform or "unknown",
        is_ota_launch="true" if is_ota_launch else "false",
        embedded_display_version=embedded_display_version or "unknown",
        embedded_runtime_version=embedded_runtime_version or "unknown",
        ota_display_version=ota_display_version or "none",
        ota_update_id=ota_update_id or "none",
    ).inc()

1104 

1105 

# Refreshed on every scrape by the hacky-gauge hook registered below, so the exported value tracks the live age.
# Falls back to 0 when no age is available (per the original note: flags disabled or never pulled).
def _feature_flags_staleness_seconds() -> float:
    """Return seconds since feature flags were last fetched, or 0.0 when that figure is falsy/unavailable."""
    elapsed = experimentation.seconds_since_last_fetch()
    if not elapsed:
        return 0.0
    return elapsed


feature_flags_staleness_gauge: Gauge = Gauge(
    name="couchers_feature_flags_staleness_seconds",
    documentation="Seconds since feature flags were last successfully fetched from GrowthBook",
    multiprocess_mode="mostrecent",
)
_set_hacky_gauges_funcs.append((feature_flags_staleness_gauge, _feature_flags_staleness_seconds))

1118 

1119 

# Counter: one increment per feature flag evaluation, labeled by flag key, evaluation source and resolved value.
feature_flag_evaluations_counter: Counter = Counter(
    name="couchers_feature_flag_evaluations_total",
    documentation="Number of feature flag evaluations, by flag key, evaluation source, and resolved value",
    labelnames=("flag_key", "source", "value"),
)

1125 

1126_MAX_FLAG_VALUE_LABEL_LEN = 32 

1127 

1128 

1129def _stringify_flag_value(value: Any) -> str: 

1130 if isinstance(value, bool): 

1131 return "true" if value else "false" 

1132 if isinstance(value, (int, float, str)): 

1133 s = str(value) 

1134 return s if len(s) <= _MAX_FLAG_VALUE_LABEL_LEN else f"<{type(value).__name__}>" 

1135 if value is None: 1135 ↛ 1137line 1135 didn't jump to line 1137 because the condition on line 1135 was always true

1136 return "None" 

1137 return f"<{type(value).__name__}>" 

1138 

1139 

def observe_feature_flag_evaluation(flag_key: str, source: str, value: Any) -> None:
    """Count one evaluation of ``flag_key`` via ``source``; ``value`` is rendered by ``_stringify_flag_value``."""
    feature_flag_evaluations_counter.labels(
        flag_key=flag_key,
        source=source,
        value=_stringify_flag_value(value),
    ).inc()

1142 

1143 

def create_prometheus_server(port: int) -> Any:
    """Start a background HTTP server that serves the module-level metrics ``registry``.

    This is a custom replacement for prometheus_client's built-in server start, written to work around the
    problem described in https://github.com/prometheus/client_python/issues/155. Before every scrape, each
    registered "hacky" gauge is refreshed: plain ones via ``gauge.set(f())``, labeled ones by calling their
    populate function with the gauge.

    Args:
        port: TCP port to listen on. The host is ``""``, i.e. all interfaces.

    Returns:
        The WSGI server object. Its ``serve_forever`` loop runs on a daemon thread.
    """

    def serve_metrics(environ: Any, start_response: Any) -> Any:
        # Refresh gauges whose values are computed on demand rather than set by application code.
        for hacky_gauge, compute in _set_hacky_gauges_funcs:
            hacky_gauge.set(compute())
        for labeled_gauge, populate in _set_hacky_labeled_gauges_funcs:
            populate(labeled_gauge)

        payload = generate_latest(registry)
        headers = [("Content-type", CONTENT_TYPE_LATEST), ("Content-Length", str(len(payload)))]
        start_response("200 OK", headers)
        return [payload]

    server = exposition.make_server(  # type: ignore[attr-defined]
        "", port, serve_metrics, exposition.ThreadingWSGIServer, handler_class=exposition._SilentHandler
    )
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server