Coverage for app/backend/src/couchers/servicers/bugs.py: 99%

155 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import json 

2import logging 

3import time 

4import uuid 

5from datetime import UTC, datetime 

6from functools import lru_cache 

7from typing import Any 

8 

9import grpc 

10import requests 

11from google.protobuf import empty_pb2, struct_pb2 

12from sqlalchemy import insert, select 

13from sqlalchemy.dialects.postgresql import insert as pg_insert 

14from sqlalchemy.orm import Session 

15from sqlalchemy.sql import func 

16 

17from couchers import urls 

18from couchers.config import config 

19from couchers.constants import STABLE_THRESHOLD_SECONDS 

20from couchers.context import CouchersContext 

21from couchers.descriptor_pool import get_descriptors_pb 

22from couchers.metrics import ( 

23 observe_native_banned_bundle_hit, 

24 observe_native_binary_age, 

25 observe_native_bundle_age, 

26 observe_native_client_checkin, 

27 observe_native_ota_manifest_request, 

28 observe_native_update_decision, 

29) 

30from couchers.models import NativeClientUser, User 

31from couchers.models.logging import EventLog, EventSource, ExperimentExposure, ExposureSource 

32from couchers.models.ota import OTAPackage, OTAPlatform 

33from couchers.native_updates import ( 

34 NativeClientInfo, 

35 Severity, 

36 UpdateAction, 

37 UpdateCause, 

38 client_info_from_request, 

39 decide_native_update, 

40) 

41from couchers.proto import bugs_pb2, bugs_pb2_grpc 

42from couchers.proto.google.api import httpbody_pb2 

43 

logger = logging.getLogger(__name__)

# Monotonic timestamp captured when this module is imported; Status compares against it to decide
# whether the process has been up for at least STABLE_THRESHOLD_SECONDS.
_start_time = time.monotonic()

# Internal UpdateAction -> bugs_pb2 wire enum value.
updateaction2api = {
    UpdateAction.unspecified: bugs_pb2.NATIVE_UPDATE_ACTION_UNSPECIFIED,
    UpdateAction.none: bugs_pb2.NATIVE_UPDATE_ACTION_NONE,
    UpdateAction.ota: bugs_pb2.NATIVE_UPDATE_ACTION_OTA,
    UpdateAction.store: bugs_pb2.NATIVE_UPDATE_ACTION_STORE,
    UpdateAction.reinstall: bugs_pb2.NATIVE_UPDATE_ACTION_REINSTALL,
}

# bugs_pb2 wire enum value -> internal UpdateAction; derived from the forward table so the two
# directions cannot drift apart.
api2updateaction = {api_value: action for action, api_value in updateaction2api.items()}

# Internal UpdateCause -> bugs_pb2 wire enum value.
updatecause2api = {
    UpdateCause.unspecified: bugs_pb2.NATIVE_UPDATE_CAUSE_UNSPECIFIED,
    UpdateCause.age: bugs_pb2.NATIVE_UPDATE_CAUSE_AGE,
    UpdateCause.banned: bugs_pb2.NATIVE_UPDATE_CAUSE_BANNED,
}

# bugs_pb2 wire enum value -> internal UpdateCause; derived from the forward table.
api2updatecause = {api_value: cause for cause, api_value in updatecause2api.items()}

# Multipart boundary used to frame Expo Updates manifest/directive responses.
_OTA_BOUNDARY = "COUCHERS_OTA_BOUNDARY"

77 

78 

def _ota_multipart_body(field_name: str, content: dict[str, Any]) -> bytes:
    """Build an Expo Updates protocol v1 ``multipart/mixed`` response body.

    ``field_name`` is ``"manifest"`` for an update or ``"directive"`` for a
    noUpdateAvailable/rollBackToEmbedded directive. The body contains that part (``content``
    JSON-encoded) followed by an ``extensions`` part with empty ``assetRequestHeaders``,
    delimited by ``_OTA_BOUNDARY`` and closed with the terminating boundary line.

    Returns the body encoded as UTF-8 bytes.
    """
    sections = [
        (field_name, json.dumps(content), "application/json; charset=utf-8"),
        ("extensions", json.dumps({"assetRequestHeaders": {}}), "application/json"),
    ]
    chunks: list[str] = []
    for section_name, payload, mime_type in sections:
        chunks.append(f"--{_OTA_BOUNDARY}\r\n")
        chunks.append(f'content-disposition: form-data; name="{section_name}"\r\n')
        chunks.append(f"content-type: {mime_type}\r\n\r\n")
        chunks.append(f"{payload}\r\n")
    # Closing delimiter ends the multipart body.
    chunks.append(f"--{_OTA_BOUNDARY}--\r\n")
    return "".join(chunks).encode("utf-8")

96 

97 

98def _native_ota_manifest_url(*, cdn_root: str, version: str, platform: str) -> str: 

99 return f"{cdn_root}/{version}/{platform}/manifest" 

100 

101 

def _is_update_id_banned(session: Session, info: NativeClientInfo) -> bool:
    """Return True if the client's running OTA update has been banned.

    Returns False without querying when ``info.update_id`` is falsy or ``info.platform`` is not
    an ``OTAPlatform`` member name. Otherwise looks for an ``OTAPackage`` on that platform whose
    ``manifest_id`` equals ``info.update_id`` and whose ``banned_at`` is set.
    """
    if not info.update_id:
        return False
    if info.platform not in OTAPlatform.__members__:
        return False
    banned_id_query = (
        select(OTAPackage.id)
        .where(OTAPackage.platform == OTAPlatform[info.platform])
        .where(OTAPackage.manifest_id == info.update_id)
        .where(OTAPackage.banned_at.is_not(None))
        .limit(1)
    )
    banned_package_id = session.execute(banned_id_query).scalar_one_or_none()
    return banned_package_id is not None

115 

116 

def _newest_non_banned_ota_package(session: Session, platform: str, fingerprint: str) -> OTAPackage | None:
    """Return the newest non-banned OTA package for a platform/fingerprint pair, or None.

    Returns None without querying when ``platform`` is not an ``OTAPlatform`` member name or
    ``fingerprint`` is empty. Otherwise selects, among packages with that platform and
    fingerprint whose ``banned_at`` is NULL, the one with the greatest ``manifest_created_at``
    (ties broken by the greatest ``id``).
    """
    known_platform = platform in OTAPlatform.__members__
    if not known_platform or not fingerprint:
        return None
    newest_query = (
        select(OTAPackage)
        .where(OTAPackage.platform == OTAPlatform[platform])
        .where(OTAPackage.fingerprint == fingerprint)
        .where(OTAPackage.banned_at.is_(None))
        # NOTE(review): PostgreSQL sorts NULLs first under DESC; this assumes manifest_created_at
        # is never NULL — confirm against the model.
        .order_by(OTAPackage.manifest_created_at.desc(), OTAPackage.id.desc())
        .limit(1)
    )
    return session.execute(newest_query).scalar_one_or_none()

128 

129 

def _observe_native_check_metrics(
    request: bugs_pb2.CheckNativeStatusReq,
    info: NativeClientInfo,
    decision: Any,
    now: datetime,
    *,
    banned: bool,
) -> None:
    """Record metrics for one CheckNativeStatus call.

    In order:
    - observes binary age and bundle age in seconds relative to ``now`` (each only when the
      corresponding creation time is known);
    - observes the update decision's action/severity names;
    - records the client check-in;
    - counts a banned-bundle hit when ``banned`` is true.

    ``decision`` must expose ``action`` and ``severity`` enums (read via ``.name``).
    """
    platform = info.platform
    if info.binary_created_at is not None:
        binary_age_seconds = (now - info.binary_created_at).total_seconds()
        observe_native_binary_age(platform, binary_age_seconds)
    if info.bundle_created_at is not None:
        bundle_age_seconds = (now - info.bundle_created_at).total_seconds()
        observe_native_bundle_age(platform, info.is_ota_launch, bundle_age_seconds)
    observe_native_update_decision(platform, decision.action.name, decision.severity.name)
    # The running display version is only an OTA version when launched from an OTA bundle.
    ota_display_version = request.running_display_version if info.is_ota_launch else ""
    observe_native_client_checkin(
        platform=platform,
        is_ota_launch=info.is_ota_launch,
        embedded_display_version=request.embedded_display_version,
        embedded_runtime_version=info.runtime_version,
        ota_display_version=ota_display_version,
        ota_update_id=info.update_id or "",
    )
    if banned:
        observe_native_banned_bundle_hit(platform)

153 

154 

@lru_cache(maxsize=64)
def _fetch_signed_manifest(url: str) -> tuple[str, bytes]:
    """Download a signed OTA manifest and return ``(content_type, body)``.

    The publish job signs each manifest and uploads it under an immutable version, so its bytes
    never change once published. The result is therefore memoized per URL by ``lru_cache``
    (at most 64 entries; least-recently-used entries are evicted). The bytes are returned
    untouched, signature included, so the on-device signature check sees exactly what was signed.

    Raises ``requests.HTTPError`` for an error status. ``lru_cache`` does not memoize exceptions,
    so a failed fetch is retried on the next call.
    """
    manifest_response = requests.get(url, timeout=10)
    manifest_response.raise_for_status()
    content_type = manifest_response.headers["content-type"]
    return content_type, manifest_response.content

163 

164 

class Bugs(bugs_pb2_grpc.BugsServicer):
    """Servicer for the Bugs gRPC service.

    Handles bug reports (filed as GitHub issues), version/status probes, proto descriptors,
    Expo OTA update manifests and native update checks, frontend diagnostics, feature-flag
    evaluation, and client-reported experiment exposures.
    """

    def _version(self) -> str:
        """Return the running backend version (``config.VERSION``)."""
        return config.VERSION

    def Version(self, request: empty_pb2.Empty, context: CouchersContext, session: Session) -> bugs_pb2.VersionInfo:
        """Return the backend version."""
        return bugs_pb2.VersionInfo(version=self._version())

    def ReportBug(
        self, request: bugs_pb2.ReportBugReq, context: CouchersContext, session: Session
    ) -> bugs_pb2.ReportBugRes:
        """File a bug report as an issue on the configured GitHub repository.

        The issue body contains the reporter's description and results plus diagnostics:
        backend/frontend versions, user agent, locale, screen resolution, page, the reporting
        user (when logged in) and the first 12 characters of the sofa value.

        Aborts with UNAVAILABLE ``bug_tool_disabled`` when ``config.BUG_TOOL_ENABLED`` is false.
        Aborts with INTERNAL ``bug_tool_request_failed`` when the GitHub call errors, times out,
        or does not answer 201.

        Returns the issue reference (``#N``) and its github.com URL.
        """
        if not config.BUG_TOOL_ENABLED:
            context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "bug_tool_disabled")

        repo = config.BUG_TOOL_GITHUB_REPO
        auth = (config.BUG_TOOL_GITHUB_USERNAME, config.BUG_TOOL_GITHUB_TOKEN)

        if context.is_logged_in():
            username = session.execute(select(User.username).where(User.id == context.user_id)).scalar_one()
            user_details = f"[@{username}]({urls.user_link(username=username)}) ({context.user_id})"
        else:
            user_details = "<not logged in>"

        issue_title = request.subject
        issue_body = (
            f"# {request.subject}\n"
            "## Description\n"
            f"{request.description}\n"
            "\n"
            "## Results\n"
            f"{request.results}\n"
            "\n"
            "## Diagnostics\n"
            f"**Backend version**: `{self._version()}`\n"
            f"**Frontend version**: `{request.frontend_version}`\n"
            f"**User Agent**: `{request.user_agent}`\n"
            f"**Locale**: `{context.localization.locale}`\n"
            f"**Screen resolution**: {request.screen_resolution.width}x{request.screen_resolution.height}\n"
            f"**Page**: {request.page}\n"
            f"**User**: {user_details} / `{(context._sofa or '')[:12]}`"
        )
        issue_labels = ["bug tool", "bug: triage needed"]

        json_body = {"title": issue_title, "body": issue_body, "labels": issue_labels}

        try:
            # Bound the call (as _fetch_signed_manifest does): without a timeout a stalled GitHub
            # connection would hold this request open indefinitely.
            r = requests.post(
                f"https://api.github.com/repos/{repo}/issues", auth=auth, json=json_body, timeout=10
            )
        except requests.RequestException:
            context.abort_with_error_code(grpc.StatusCode.INTERNAL, "bug_tool_request_failed")
        if r.status_code != 201:
            context.abort_with_error_code(grpc.StatusCode.INTERNAL, "bug_tool_request_failed")

        issue_number = r.json()["number"]

        return bugs_pb2.ReportBugRes(
            bug_id=f"#{issue_number}", bug_url=f"https://github.com/{repo}/issues/{issue_number}"
        )

    def Status(self, request: bugs_pb2.StatusReq, context: CouchersContext, session: Session) -> bugs_pb2.StatusRes:
        """Return a status snapshot.

        The response echoes ``request.nonce`` and includes the backend version and the count of
        visible users. ``stable`` is true once this process has been up for at least
        STABLE_THRESHOLD_SECONDS, measured from module import time (``_start_time``).
        """
        coucher_count = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()

        return bugs_pb2.StatusRes(
            nonce=request.nonce,
            version=self._version(),
            coucher_count=coucher_count,
            stable=time.monotonic() - _start_time >= STABLE_THRESHOLD_SECONDS,
        )

    def GetDescriptors(
        self, request: empty_pb2.Empty, context: CouchersContext, session: Session
    ) -> httpbody_pb2.HttpBody:
        """Return the serialized proto descriptors (``get_descriptors_pb()``) as an octet-stream body."""
        return httpbody_pb2.HttpBody(
            content_type="application/octet-stream",
            data=get_descriptors_pb(),
        )

    def GetNativeUpdateManifest(
        self, request: httpbody_pb2.HttpBody, context: CouchersContext, session: Session
    ) -> httpbody_pb2.HttpBody:
        """Serve the Expo Updates manifest for the calling native client.

        The platform and fingerprint come from the ``expo-platform`` and ``expo-runtime-version``
        headers. Sets the Expo protocol response headers. If no non-banned package matches, it
        returns a multipart ``noUpdateAvailable`` directive. Otherwise it returns the newest
        matching package's signed manifest from the CDN, byte-for-byte, with its content type.
        """
        platform = context.get_header("expo-platform") or ""
        fingerprint = context.get_header("expo-runtime-version") or ""
        # NOTE(review): uuid.UUID("") raises ValueError, so a request with a missing or malformed
        # eas-client-id header fails here even though the parsed value is only used by the log
        # line below — confirm that rejecting such requests is intended.
        eas_client_id = uuid.UUID(context.get_header("eas-client-id") or "")
        if context.get_boolean_value("log_native_ota_requests", False):
            # NOTE(review): this logs every request header verbatim; confirm none carry
            # credentials (cookies/authorization) that should be redacted.
            logger.info(
                "OTA GetNativeUpdateManifest: platform=%s fingerprint=%s eas_client_id=%s "
                "content_type=%r headers=%s body=%r",
                platform,
                fingerprint,
                eas_client_id,
                request.content_type,
                dict(context.headers),
                request.data,
            )
        # Expo rejects the manifest without these; Envoy forwards them as HTTP response headers.
        context.set_response_headers([("expo-protocol-version", "1"), ("expo-sfv-version", "0")])

        # Newest non-banned bundle for the build's fingerprint, by manifest createdAt. The device's
        # selection policy only applies it if it's newer than what it's running, so a stale store build
        # self-heals while a newer one keeps its embedded bundle.
        package = _newest_non_banned_ota_package(session, platform, fingerprint)

        if package is None:
            observe_native_ota_manifest_request(platform, "no_match" if not fingerprint else "no_update")
            return httpbody_pb2.HttpBody(
                content_type=f"multipart/mixed; boundary={_OTA_BOUNDARY}",
                data=_ota_multipart_body("directive", {"type": "noUpdateAvailable"}),
            )

        cdn_root = context.get_string_value("native_ota_cdn_root", "https://cdn.couchers.org/native/ota")
        url = _native_ota_manifest_url(cdn_root=cdn_root, version=package.version, platform=platform)
        content_type, body = _fetch_signed_manifest(url)
        observe_native_ota_manifest_request(platform, "served")
        return httpbody_pb2.HttpBody(content_type=content_type, data=body)

    def ReportDiagnostics(
        self, request: bugs_pb2.ReportDiagnosticsReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Store frontend diagnostic entries as EventLog rows (source ``frontend``).

        Each entry's ``properties_json`` is parsed as JSON. ``occurred`` falls back to the
        current UTC time when unset. Rows carry the caller's user id and sofa values.

        Aborts with INVALID_ARGUMENT ``too_many_diagnostic_infos`` for more than 100 entries, and
        with INVALID_ARGUMENT ``invalid_diagnostics_json`` when any entry's JSON is malformed.
        """
        if len(request.infos) > 100:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "too_many_diagnostic_infos")

        events = []
        for info in request.infos:
            try:
                properties = json.loads(info.properties_json)
            # json.JSONDecodeError is a subclass of ValueError, so this also covers malformed JSON.
            except ValueError:
                context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_diagnostics_json")

            occurred = info.occurred.ToDatetime(tzinfo=UTC) if info.HasField("occurred") else datetime.now(UTC)

            events.append(
                {
                    "event_type": info.tag,
                    "user_id": context._user_id,
                    "sofa": context._sofa,
                    "version": request.frontend_version,
                    "properties": properties,
                    "value": info.value,
                    "source": EventSource.frontend,
                    "occurred": occurred,
                }
            )

        if events:
            session.execute(insert(EventLog), events)

        return empty_pb2.Empty()

    def CheckNativeStatus(
        self, request: bugs_pb2.CheckNativeStatusReq, context: CouchersContext, session: Session
    ) -> bugs_pb2.CheckNativeStatusRes:
        """Tell a native client whether (and how urgently) it should update.

        Logs the check-in, checks whether the running OTA update is banned, and asks
        ``decide_native_update`` for a decision. It then records metrics and, for logged-in
        callers, links the EAS client id to the user.

        Raises ``Exception`` when the decision is a blocking OTA update but no newer non-banned
        bundle exists to move to.
        """
        info = client_info_from_request(request)
        logger.info(
            "CheckNativeStatus: user_id=%s install_id=%s eas_client_id=%s platform=%s app_version=%s "
            "running_debug_version_ota=%s update_id=%s launch_source=%s debug_json=%s",
            context._user_id,
            request.install_id,
            info.eas_client_id,
            request.platform,
            request.app_version,
            request.running_debug_version_ota,
            request.update_id,
            request.launch_source,
            request.debug_json,
        )
        now = datetime.now(UTC)
        banned = _is_update_id_banned(session, info)
        decision = decide_native_update(context, info, now, banned=banned)

        # An OTA block with no newer bundle to serve would loop the client on the block screen
        # forever, so refuse to serve it: raise (pages via Sentry) and the client, which ignores
        # these errors, stays unblocked. Store blocks aren't checkable — no record of the latest build.
        if decision.action == UpdateAction.ota and decision.severity == Severity.block:
            newest = _newest_non_banned_ota_package(session, info.platform, info.runtime_version)
            newer_available = newest is not None and (
                info.bundle_created_at is None or newest.manifest_created_at > info.bundle_created_at
            )
            if not newer_available:
                raise Exception(
                    "CheckNativeStatus would force an OTA update with no newer bundle to move to "
                    f"(platform={info.platform!r} fingerprint={info.runtime_version!r} "
                    f"cause={decision.cause.name} update_id={info.update_id!r} "
                    f"running_bundle_created_at={info.bundle_created_at} "
                    f"newest_non_banned_created_at={None if newest is None else newest.manifest_created_at})"
                )

        _observe_native_check_metrics(request, info, decision, now, banned=banned)

        if context.is_logged_in():
            session.add(NativeClientUser(eas_client_id=info.eas_client_id, user_id=context.user_id))

        # message and link_text intentionally left empty for the standard cases — the client
        # hardcodes those. The fields are reserved for special-case overrides; nothing in the
        # current decision logic populates them.
        update_info = bugs_pb2.NativeUpdateInfo(
            action=updateaction2api[decision.action],
            required=decision.severity != Severity.none,
            cause=updatecause2api[decision.cause],
        )
        if decision.act_by is not None:
            update_info.act_by.FromDatetime(decision.act_by)
        return bugs_pb2.CheckNativeStatusRes(update_info=update_info)

    def GeolocationSearchInfo(
        self, request: bugs_pb2.GeolocationSearchInfoReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Currently a no-op: the request is ignored and Empty is returned."""
        return empty_pb2.Empty()

    def GeolocationClickInfo(
        self, request: bugs_pb2.GeolocationClickInfoReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Currently a no-op: the request is ignored and Empty is returned."""
        return empty_pb2.Empty()

    def EvaluateFeatureFlag(
        self, request: bugs_pb2.EvaluateFeatureFlagReq, context: CouchersContext, session: Session
    ) -> bugs_pb2.EvaluateFeatureFlagRes:
        """Evaluate one feature flag for the caller.

        ``value`` is left unset when the flag is unconfigured.
        """
        # None default: an unconfigured flag comes back as None and the value field is left unset, so
        # the frontend applies its own in-code default. get_object_value is the generic typed
        # accessor; like every value method it fires exposure/usage logging as a side effect, here
        # for exactly the one flag the client is reading.
        value: Any = context.get_object_value(request.flag_key, None)
        res = bugs_pb2.EvaluateFeatureFlagRes()
        if value is not None:
            # google.protobuf.Value has no direct constructor from a Python value; round-trip
            # through a Struct, which knows how to encode bool/number/str/list/dict.
            holder = struct_pb2.Struct()
            holder["value"] = value
            res.value.CopyFrom(holder.fields["value"])
        return res

    def LogExperimentExposure(
        self, request: bugs_pb2.LogExperimentExposureReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Record a client-side experiment exposure for the logged-in user.

        Inserts at most one row per (user, experiment, variation): conflicts on
        ``uq_experiment_exposures_user_exp_var`` are ignored. Anonymous callers are a no-op.
        Optional proto fields that are unset are stored as None.
        """
        # need a logged-in user to attribute the exposure to
        if context.is_logged_in():
            data = {
                "experiment_name": request.experiment_name,
                "variation_key": request.variation_key,
                "variation_name": request.variation_name,
                "hash_attribute": request.hash_attribute,
                "hash_value": request.hash_value,
                "bucket": request.bucket if request.HasField("bucket") else None,
                "in_experiment": request.in_experiment,
                "hash_used": request.hash_used if request.HasField("hash_used") else None,
                "sticky_bucket_used": (request.sticky_bucket_used if request.HasField("sticky_bucket_used") else None),
                "feature_id": request.feature_id,
            }
            session.execute(
                pg_insert(ExperimentExposure)
                .values(
                    user_id=context.user_id,
                    experiment_key=request.experiment_key,
                    variation_id=request.variation_id,
                    source=ExposureSource.client,
                    data=data,
                )
                .on_conflict_do_nothing(constraint="uq_experiment_exposures_user_exp_var")
            )
        return empty_pb2.Empty()