Coverage for app/backend/src/couchers/servicers/bugs.py: 99%
155 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1import json
2import logging
3import time
4import uuid
5from datetime import UTC, datetime
6from functools import lru_cache
7from typing import Any
9import grpc
10import requests
11from google.protobuf import empty_pb2, struct_pb2
12from sqlalchemy import insert, select
13from sqlalchemy.dialects.postgresql import insert as pg_insert
14from sqlalchemy.orm import Session
15from sqlalchemy.sql import func
17from couchers import urls
18from couchers.config import config
19from couchers.constants import STABLE_THRESHOLD_SECONDS
20from couchers.context import CouchersContext
21from couchers.descriptor_pool import get_descriptors_pb
22from couchers.metrics import (
23 observe_native_banned_bundle_hit,
24 observe_native_binary_age,
25 observe_native_bundle_age,
26 observe_native_client_checkin,
27 observe_native_ota_manifest_request,
28 observe_native_update_decision,
29)
30from couchers.models import NativeClientUser, User
31from couchers.models.logging import EventLog, EventSource, ExperimentExposure, ExposureSource
32from couchers.models.ota import OTAPackage, OTAPlatform
33from couchers.native_updates import (
34 NativeClientInfo,
35 Severity,
36 UpdateAction,
37 UpdateCause,
38 client_info_from_request,
39 decide_native_update,
40)
41from couchers.proto import bugs_pb2, bugs_pb2_grpc
42from couchers.proto.google.api import httpbody_pb2
logger = logging.getLogger(__name__)

# Captured once at import on the monotonic clock; Status() compares the process age against
# STABLE_THRESHOLD_SECONDS to decide whether to report "stable".
_start_time = time.monotonic()

# Internal UpdateAction -> API enum. This forward table is the single source of truth; the
# reverse table below is derived from it so the two directions cannot drift apart.
updateaction2api = {
    UpdateAction.unspecified: bugs_pb2.NATIVE_UPDATE_ACTION_UNSPECIFIED,
    UpdateAction.none: bugs_pb2.NATIVE_UPDATE_ACTION_NONE,
    UpdateAction.ota: bugs_pb2.NATIVE_UPDATE_ACTION_OTA,
    UpdateAction.store: bugs_pb2.NATIVE_UPDATE_ACTION_STORE,
    UpdateAction.reinstall: bugs_pb2.NATIVE_UPDATE_ACTION_REINSTALL,
}

# API enum -> internal UpdateAction (inverse of updateaction2api, same key order).
api2updateaction = {api_value: action for action, api_value in updateaction2api.items()}

# Internal UpdateCause -> API enum.
updatecause2api = {
    UpdateCause.unspecified: bugs_pb2.NATIVE_UPDATE_CAUSE_UNSPECIFIED,
    UpdateCause.age: bugs_pb2.NATIVE_UPDATE_CAUSE_AGE,
    UpdateCause.banned: bugs_pb2.NATIVE_UPDATE_CAUSE_BANNED,
}

# API enum -> internal UpdateCause (inverse of updatecause2api, same key order).
api2updatecause = {api_value: cause for cause, api_value in updatecause2api.items()}

# multipart/mixed boundary used when framing Expo Updates responses.
_OTA_BOUNDARY = "COUCHERS_OTA_BOUNDARY"
def _ota_multipart_body(field_name: str, content: dict[str, Any]) -> bytes:
    """Frame an Expo Updates protocol v1 multipart/mixed response body.

    Emits two parts followed by the closing boundary. The first part is named ``field_name`` and
    carries ``content`` serialised as JSON. The second is an ``extensions`` part with empty
    ``assetRequestHeaders``.

    ``field_name`` is "manifest" for an update, or "directive" for a
    noUpdateAvailable/rollBackToEmbedded directive. Returns the body encoded as UTF-8.
    """
    delimiter = f"--{_OTA_BOUNDARY}\r\n"
    # (part name, JSON payload, part content-type), in emission order.
    sections = [
        (field_name, json.dumps(content), "application/json; charset=utf-8"),
        ("extensions", json.dumps({"assetRequestHeaders": {}}), "application/json"),
    ]
    chunks = [
        delimiter
        + f'content-disposition: form-data; name="{name}"\r\n'
        + f"content-type: {ctype}\r\n\r\n"
        + f"{payload}\r\n"
        for name, payload, ctype in sections
    ]
    chunks.append(f"--{_OTA_BOUNDARY}--\r\n")
    return "".join(chunks).encode("utf-8")
def _native_ota_manifest_url(*, cdn_root: str, version: str, platform: str) -> str:
    """Build the CDN URL of the signed manifest: ``<cdn_root>/<version>/<platform>/manifest``."""
    relative_path = f"{version}/{platform}/manifest"
    return f"{cdn_root}/{relative_path}"
def _is_update_id_banned(session: Session, info: NativeClientInfo) -> bool:
    """Return True if the client's running OTA update id matches a banned OTAPackage.

    Returns False without querying when the client reports no update id, or when its platform
    is not a member of OTAPlatform. Otherwise it looks for any package on that platform whose
    manifest_id equals the update id and whose banned_at is set.
    """
    if not info.update_id:
        return False
    if info.platform not in OTAPlatform.__members__:
        return False

    banned_lookup = (
        select(OTAPackage.id)
        .where(
            OTAPackage.platform == OTAPlatform[info.platform],
            OTAPackage.manifest_id == info.update_id,
            OTAPackage.banned_at.is_not(None),
        )
        .limit(1)
    )
    banned_package_id = session.execute(banned_lookup).scalar_one_or_none()
    return banned_package_id is not None
def _newest_non_banned_ota_package(session: Session, platform: str, fingerprint: str) -> OTAPackage | None:
    """Return the newest non-banned OTAPackage for a platform/fingerprint pair, or None.

    "Newest" means the greatest manifest_created_at, with ties broken by the greater id. Returns
    None without querying when the fingerprint is empty or the platform is not an OTAPlatform
    member.
    """
    if not fingerprint or platform not in OTAPlatform.__members__:
        return None

    newest_first = (
        select(OTAPackage)
        .where(
            OTAPackage.platform == OTAPlatform[platform],
            OTAPackage.fingerprint == fingerprint,
            OTAPackage.banned_at.is_(None),
        )
        .order_by(OTAPackage.manifest_created_at.desc(), OTAPackage.id.desc())
        .limit(1)
    )
    return session.execute(newest_first).scalar_one_or_none()
def _observe_native_check_metrics(
    request: bugs_pb2.CheckNativeStatusReq,
    info: NativeClientInfo,
    decision: Any,
    now: datetime,
    *,
    banned: bool,
) -> None:
    """Emit the metrics for one CheckNativeStatus call.

    Records the following, in order:

    - binary age and bundle age (each only when its created-at timestamp is known);
    - the update decision (action name, severity name);
    - a client check-in;
    - when ``banned`` is true, a banned-bundle hit.

    Ages are measured against ``now``.
    """
    platform = info.platform

    binary_created = info.binary_created_at
    if binary_created is not None:
        observe_native_binary_age(platform, (now - binary_created).total_seconds())

    bundle_created = info.bundle_created_at
    if bundle_created is not None:
        observe_native_bundle_age(platform, info.is_ota_launch, (now - bundle_created).total_seconds())

    observe_native_update_decision(platform, decision.action.name, decision.severity.name)

    # OTA-specific labels are blanked for launches running the embedded bundle.
    observe_native_client_checkin(
        platform=platform,
        is_ota_launch=info.is_ota_launch,
        embedded_display_version=request.embedded_display_version,
        embedded_runtime_version=info.runtime_version,
        ota_display_version=(request.running_display_version if info.is_ota_launch else ""),
        ota_update_id=info.update_id or "",
    )

    if not banned:
        return
    observe_native_banned_bundle_hit(platform)
@lru_cache(maxsize=64)
def _fetch_signed_manifest(url: str) -> tuple[str, bytes]:
    """Download a pre-signed OTA manifest and return ``(content_type, body)``.

    The publish job signs each manifest and uploads it under its immutable version, so the
    bytes never change once published. Results are therefore memoised per URL (LRU, 64 entries)
    and served untouched, signature and all, so the on-device signature check sees exactly what
    was signed.

    A 4xx/5xx response raises ``requests.HTTPError``. lru_cache does not cache exceptions, so a
    failed fetch is retried on the next call.
    """
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    mime = resp.headers["content-type"]
    return mime, resp.content
class Bugs(bugs_pb2_grpc.BugsServicer):
    """Servicer for the Bugs API.

    Bundles several small endpoints:

    - version and status probes;
    - the GitHub-backed bug reporting tool;
    - frontend diagnostics ingestion;
    - native-app update checks and Expo OTA manifests;
    - feature-flag evaluation and experiment exposure logging.

    Several methods branch on ``context.is_logged_in()`` and so serve anonymous callers too.
    """

    # Upper bound on the GitHub issue-creation call. Matches the timeout used for CDN
    # manifest fetches in this module.
    _GITHUB_REQUEST_TIMEOUT_SECONDS = 10

    def _version(self) -> str:
        """Backend version string, taken from ``config.VERSION``."""
        return config.VERSION

    def Version(self, request: empty_pb2.Empty, context: CouchersContext, session: Session) -> bugs_pb2.VersionInfo:
        """Return the backend version."""
        return bugs_pb2.VersionInfo(version=self._version())

    def ReportBug(
        self, request: bugs_pb2.ReportBugReq, context: CouchersContext, session: Session
    ) -> bugs_pb2.ReportBugRes:
        """File a bug report as a GitHub issue in ``config.BUG_TOOL_GITHUB_REPO``.

        Aborts in two cases:

        - UNAVAILABLE (``bug_tool_disabled``) when the tool is switched off;
        - INTERNAL (``bug_tool_request_failed``) when GitHub can't be reached, times out, or
          does not answer 201 Created.

        Returns the issue number and its URL.
        """
        if not config.BUG_TOOL_ENABLED:
            context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "bug_tool_disabled")

        repo = config.BUG_TOOL_GITHUB_REPO
        auth = (config.BUG_TOOL_GITHUB_USERNAME, config.BUG_TOOL_GITHUB_TOKEN)

        if context.is_logged_in():
            username = session.execute(select(User.username).where(User.id == context.user_id)).scalar_one()
            user_details = f"[@{username}]({urls.user_link(username=username)}) ({context.user_id})"
        else:
            user_details = "<not logged in>"

        issue_title = request.subject
        issue_body = (
            f"# {request.subject}\n"
            f"## Description\n"
            f"{request.description}\n"
            f"\n"
            f"## Results\n"
            f"{request.results}\n"
            f"\n"
            f"## Diagnostics\n"
            f"**Backend version**: `{self._version()}`\n"
            f"**Frontend version**: `{request.frontend_version}`\n"
            f"**User Agent**: `{request.user_agent}`\n"
            f"**Locale**: `{context.localization.locale}`\n"
            f"**Screen resolution**: {request.screen_resolution.width}x{request.screen_resolution.height}\n"
            f"**Page**: {request.page}\n"
            f"**User**: {user_details} / `{(context._sofa or '')[:12]}`"
        )
        issue_labels = ["bug tool", "bug: triage needed"]

        json_body = {"title": issue_title, "body": issue_body, "labels": issue_labels}

        try:
            # Without a timeout, requests waits indefinitely, so a slow or unreachable GitHub
            # would pin this worker. Transport failures map to the same error as a bad status.
            r = requests.post(
                f"https://api.github.com/repos/{repo}/issues",
                auth=auth,
                json=json_body,
                timeout=self._GITHUB_REQUEST_TIMEOUT_SECONDS,
            )
        except requests.RequestException:
            logger.exception("Bug tool: request to GitHub failed")
            context.abort_with_error_code(grpc.StatusCode.INTERNAL, "bug_tool_request_failed")
        if r.status_code != 201:
            logger.warning("Bug tool: GitHub issue creation returned status %s", r.status_code)
            context.abort_with_error_code(grpc.StatusCode.INTERNAL, "bug_tool_request_failed")

        issue_number = r.json()["number"]

        return bugs_pb2.ReportBugRes(
            bug_id=f"#{issue_number}", bug_url=f"https://github.com/{repo}/issues/{issue_number}"
        )

    def Status(self, request: bugs_pb2.StatusReq, context: CouchersContext, session: Session) -> bugs_pb2.StatusRes:
        """Return a status snapshot.

        The response echoes the caller's nonce and includes:

        - the backend version;
        - the count of visible users;
        - whether this process has been up for at least STABLE_THRESHOLD_SECONDS.
        """
        coucher_count = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()

        return bugs_pb2.StatusRes(
            nonce=request.nonce,
            version=self._version(),
            coucher_count=coucher_count,
            stable=time.monotonic() - _start_time >= STABLE_THRESHOLD_SECONDS,
        )

    def GetDescriptors(
        self, request: empty_pb2.Empty, context: CouchersContext, session: Session
    ) -> httpbody_pb2.HttpBody:
        """Return the bytes from ``get_descriptors_pb()`` as an application/octet-stream body."""
        return httpbody_pb2.HttpBody(
            content_type="application/octet-stream",
            data=get_descriptors_pb(),
        )

    def GetNativeUpdateManifest(
        self, request: httpbody_pb2.HttpBody, context: CouchersContext, session: Session
    ) -> httpbody_pb2.HttpBody:
        """Expo Updates (protocol v1) manifest endpoint.

        Reads the ``expo-platform`` and ``expo-runtime-version`` (fingerprint) request headers
        and looks up the newest non-banned OTA package for them. Two outcomes:

        - No package: respond with a ``noUpdateAvailable`` directive.
        - A package exists: serve its signed manifest, fetched from the CDN and passed through
          unchanged.
        """
        platform = context.get_header("expo-platform") or ""
        fingerprint = context.get_header("expo-runtime-version") or ""
        # The client id is only used for the optional debug log below. A missing or malformed
        # header (untrusted client input) must not turn into an unhandled server error.
        eas_client_id: uuid.UUID | None
        try:
            eas_client_id = uuid.UUID(context.get_header("eas-client-id") or "")
        except ValueError:
            eas_client_id = None
        if context.get_boolean_value("log_native_ota_requests", False):
            logger.info(
                "OTA GetNativeUpdateManifest: platform=%s fingerprint=%s eas_client_id=%s "
                "content_type=%r headers=%s body=%r",
                platform,
                fingerprint,
                eas_client_id,
                request.content_type,
                dict(context.headers),
                request.data,
            )
        # Expo rejects the manifest without these; Envoy forwards them as HTTP response headers.
        context.set_response_headers([("expo-protocol-version", "1"), ("expo-sfv-version", "0")])

        # Newest non-banned bundle for the build's fingerprint, by manifest createdAt. The device's
        # selection policy only applies it if it's newer than what it's running, so a stale store build
        # self-heals while a newer one keeps its embedded bundle.
        package = _newest_non_banned_ota_package(session, platform, fingerprint)

        if package is None:
            observe_native_ota_manifest_request(platform, "no_match" if not fingerprint else "no_update")
            return httpbody_pb2.HttpBody(
                content_type=f"multipart/mixed; boundary={_OTA_BOUNDARY}",
                data=_ota_multipart_body("directive", {"type": "noUpdateAvailable"}),
            )

        cdn_root = context.get_string_value("native_ota_cdn_root", "https://cdn.couchers.org/native/ota")
        url = _native_ota_manifest_url(cdn_root=cdn_root, version=package.version, platform=platform)
        content_type, body = _fetch_signed_manifest(url)
        observe_native_ota_manifest_request(platform, "served")
        return httpbody_pb2.HttpBody(content_type=content_type, data=body)

    def ReportDiagnostics(
        self, request: bugs_pb2.ReportDiagnosticsReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Store frontend diagnostic events in EventLog.

        Aborts with INVALID_ARGUMENT in two cases:

        - more than 100 infos are sent (``too_many_diagnostic_infos``);
        - any ``properties_json`` is not valid JSON (``invalid_diagnostics_json``).

        In the second case nothing from the batch is inserted. Events without an ``occurred``
        timestamp are stamped with the current UTC time.
        """
        if len(request.infos) > 100:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "too_many_diagnostic_infos")

        events = []
        for info in request.infos:
            try:
                properties = json.loads(info.properties_json)
            except ValueError:
                # json.JSONDecodeError subclasses ValueError, so this covers malformed JSON.
                context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_diagnostics_json")

            occurred = info.occurred.ToDatetime(tzinfo=UTC) if info.HasField("occurred") else datetime.now(UTC)

            events.append(
                {
                    "event_type": info.tag,
                    "user_id": context._user_id,
                    "sofa": context._sofa,
                    "version": request.frontend_version,
                    "properties": properties,
                    "value": info.value,
                    "source": EventSource.frontend,
                    "occurred": occurred,
                }
            )

        if events:
            session.execute(insert(EventLog), events)

        return empty_pb2.Empty()

    def CheckNativeStatus(
        self, request: bugs_pb2.CheckNativeStatusReq, context: CouchersContext, session: Session
    ) -> bugs_pb2.CheckNativeStatusRes:
        """Decide whether a native client must or should update, and how.

        Also records check-in metrics and, for logged-in callers, adds a NativeClientUser row
        linking the EAS client id to the user.

        Raises a plain Exception, instead of answering, when the decision would be a blocking
        OTA update but no newer non-banned bundle exists (see inline comment).
        """
        info = client_info_from_request(request)
        logger.info(
            "CheckNativeStatus: user_id=%s install_id=%s eas_client_id=%s platform=%s app_version=%s "
            "running_debug_version_ota=%s update_id=%s launch_source=%s debug_json=%s",
            context._user_id,
            request.install_id,
            info.eas_client_id,
            request.platform,
            request.app_version,
            request.running_debug_version_ota,
            request.update_id,
            request.launch_source,
            request.debug_json,
        )
        now = datetime.now(UTC)
        banned = _is_update_id_banned(session, info)
        decision = decide_native_update(context, info, now, banned=banned)

        # An OTA block with no newer bundle to serve would loop the client on the block screen
        # forever, so refuse to serve it: raise (pages via Sentry) and the client, which ignores
        # these errors, stays unblocked. Store blocks aren't checkable — no record of the latest build.
        if decision.action == UpdateAction.ota and decision.severity == Severity.block:
            newest = _newest_non_banned_ota_package(session, info.platform, info.runtime_version)
            newer_available = newest is not None and (
                info.bundle_created_at is None or newest.manifest_created_at > info.bundle_created_at
            )
            if not newer_available:
                raise Exception(
                    "CheckNativeStatus would force an OTA update with no newer bundle to move to "
                    f"(platform={info.platform!r} fingerprint={info.runtime_version!r} "
                    f"cause={decision.cause.name} update_id={info.update_id!r} "
                    f"running_bundle_created_at={info.bundle_created_at} "
                    f"newest_non_banned_created_at={None if newest is None else newest.manifest_created_at})"
                )

        _observe_native_check_metrics(request, info, decision, now, banned=banned)

        if context.is_logged_in():
            session.add(NativeClientUser(eas_client_id=info.eas_client_id, user_id=context.user_id))

        # message and link_text intentionally left empty for the standard cases — the client
        # hardcodes those. The fields are reserved for special-case overrides; nothing in the
        # current decision logic populates them.
        update_info = bugs_pb2.NativeUpdateInfo(
            action=updateaction2api[decision.action],
            required=decision.severity != Severity.none,
            cause=updatecause2api[decision.cause],
        )
        if decision.act_by is not None:
            update_info.act_by.FromDatetime(decision.act_by)
        return bugs_pb2.CheckNativeStatusRes(update_info=update_info)

    def GeolocationSearchInfo(
        self, request: bugs_pb2.GeolocationSearchInfoReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Accept and ignore the report; always returns Empty."""
        return empty_pb2.Empty()

    def GeolocationClickInfo(
        self, request: bugs_pb2.GeolocationClickInfoReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Accept and ignore the report; always returns Empty."""
        return empty_pb2.Empty()

    def EvaluateFeatureFlag(
        self, request: bugs_pb2.EvaluateFeatureFlagReq, context: CouchersContext, session: Session
    ) -> bugs_pb2.EvaluateFeatureFlagRes:
        """Evaluate one feature flag for the caller and return its value (unset if unconfigured)."""
        # None default: an unconfigured flag comes back as None and the value field is left unset, so
        # the frontend applies its own in-code default. get_object_value is the generic typed
        # accessor; like every value method it fires exposure/usage logging as a side effect, here
        # for exactly the one flag the client is reading.
        value: Any = context.get_object_value(request.flag_key, None)
        res = bugs_pb2.EvaluateFeatureFlagRes()
        if value is not None:
            # google.protobuf.Value has no direct constructor from a Python value; round-trip
            # through a Struct, which knows how to encode bool/number/str/list/dict.
            holder = struct_pb2.Struct()
            holder["value"] = value
            res.value.CopyFrom(holder.fields["value"])
        return res

    def LogExperimentExposure(
        self, request: bugs_pb2.LogExperimentExposureReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Record a client-side experiment exposure for the logged-in user.

        Anonymous callers are ignored. Duplicate (user, experiment, variation) rows are skipped
        via ON CONFLICT DO NOTHING on ``uq_experiment_exposures_user_exp_var``.
        """
        # need a logged-in user to attribute the exposure to
        if context.is_logged_in():
            data = {
                "experiment_name": request.experiment_name,
                "variation_key": request.variation_key,
                "variation_name": request.variation_name,
                "hash_attribute": request.hash_attribute,
                "hash_value": request.hash_value,
                "bucket": request.bucket if request.HasField("bucket") else None,
                "in_experiment": request.in_experiment,
                "hash_used": request.hash_used if request.HasField("hash_used") else None,
                "sticky_bucket_used": (request.sticky_bucket_used if request.HasField("sticky_bucket_used") else None),
                "feature_id": request.feature_id,
            }
            session.execute(
                pg_insert(ExperimentExposure)
                .values(
                    user_id=context.user_id,
                    experiment_key=request.experiment_key,
                    variation_id=request.variation_id,
                    source=ExposureSource.client,
                    data=data,
                )
                .on_conflict_do_nothing(constraint="uq_experiment_exposures_user_exp_var")
            )
        return empty_pb2.Empty()