Coverage for app/backend/src/couchers/servicers/admin.py: 79%

629 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import json 

2import logging 

3from datetime import UTC, datetime, timedelta 

4from typing import Any 

5 

6import grpc 

7from google.protobuf import empty_pb2 

8from google.protobuf.wrappers_pb2 import Int64Value 

9from sqlalchemy import select, tuple_ 

10from sqlalchemy.orm import Session, aliased, selectinload 

11from sqlalchemy.sql import and_, func, or_ 

12from user_agents import parse as user_agents_parse 

13 

14from couchers import urls 

15from couchers.context import CouchersContext 

16from couchers.crypto import urlsafe_secure_token 

17from couchers.helpers.badges import user_add_badge, user_remove_badge 

18from couchers.helpers.geoip import geoip_approximate_location, geoip_asn 

19from couchers.helpers.strong_verification import get_strong_verification_fields 

20from couchers.helpers.upload_uses import UploadUseType, get_upload_uses_for_keys 

21from couchers.jobs.enqueue import queue_job 

22from couchers.models import ( 

23 AccountDeletionToken, 

24 AdminAction, 

25 AdminActionLevel, 

26 AdminTag, 

27 Comment, 

28 ContentReport, 

29 Discussion, 

30 Event, 

31 EventOccurrence, 

32 FriendRelationship, 

33 GroupChat, 

34 GroupChatSubscription, 

35 HostRequest, 

36 LanguageAbility, 

37 Message, 

38 ModerationUserList, 

39 ModerationVisibility, 

40 ModNote, 

41 NonvisibleUserAccess, 

42 NonvisibleUserAccessType, 

43 NonvisibleUserState, 

44 OTAPackage, 

45 OTAPlatform, 

46 Reference, 

47 Reply, 

48 User, 

49 UserActivity, 

50 UserAdminTag, 

51 UserBadge, 

52) 

53from couchers.models.discussions import ( 

54 CommentVersion, 

55 ContentChangeType, 

56 DiscussionVersion, 

57 ReplyVersion, 

58) 

59from couchers.models.notifications import NotificationTopicAction 

60from couchers.models.uploads import Upload, has_avatar_photo_expression 

61from couchers.notifications.notify import notify 

62from couchers.proto import admin_pb2, admin_pb2_grpc, api_pb2, notification_data_pb2 

63from couchers.proto.internal import jobs_pb2 

64from couchers.resources import get_badge_dict 

65from couchers.servicers.api import user_model_to_pb 

66from couchers.servicers.auth import create_session 

67from couchers.servicers.bugs import _fetch_signed_manifest, _native_ota_manifest_url 

68from couchers.servicers.events import generate_event_delete_notifications 

69from couchers.servicers.moderation import bulk_set_user_content_visibility 

70from couchers.servicers.threads import unpack_thread_id 

71from couchers.sql import to_bool, username_or_email_or_id 

72from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date, to_aware_datetime 

73 

logger = logging.getLogger(__name__)

# Cap on the page size used by paginated admin queries in this module.
MAX_PAGINATION_LENGTH = 250


# Lookup tables between model enums and their proto counterparts.
# "<x>2api" maps model -> proto; "api2<x>" maps proto -> model.

adminactionlevel2api = {
    AdminActionLevel.trace: admin_pb2.ADMIN_ACTION_LEVEL_TRACE,
    AdminActionLevel.debug: admin_pb2.ADMIN_ACTION_LEVEL_DEBUG,
    AdminActionLevel.normal: admin_pb2.ADMIN_ACTION_LEVEL_NORMAL,
    AdminActionLevel.high: admin_pb2.ADMIN_ACTION_LEVEL_HIGH,
}

# Exact inverse of adminactionlevel2api.
api2adminactionlevel = {api_level: level for level, api_level in adminactionlevel2api.items()}

uploadusetype2api = {
    None: admin_pb2.UPLOAD_USE_TYPE_UNSPECIFIED,
    UploadUseType.profile_gallery_photo: admin_pb2.UPLOAD_USE_TYPE_PROFILE_GALLERY_PHOTO,
    UploadUseType.profile_gallery_photo_avatar: admin_pb2.UPLOAD_USE_TYPE_PROFILE_GALLERY_PHOTO_AVATAR,
    UploadUseType.event: admin_pb2.UPLOAD_USE_TYPE_EVENT,
    UploadUseType.page: admin_pb2.UPLOAD_USE_TYPE_PAGE,
}

otaplatform2api = {
    None: admin_pb2.OTA_PLATFORM_UNSPECIFIED,
    OTAPlatform.ios: admin_pb2.OTA_PLATFORM_IOS,
    OTAPlatform.android: admin_pb2.OTA_PLATFORM_ANDROID,
}

# Exact inverse of otaplatform2api (the unspecified proto value maps back to None).
api2otaplatform = {api_platform: platform for platform, api_platform in otaplatform2api.items()}

nonvisibleuseraccesstype2api = {
    None: admin_pb2.NONVISIBLE_USER_ACCESS_TYPE_UNSPECIFIED,
    NonvisibleUserAccessType.login_attempt: admin_pb2.NONVISIBLE_USER_ACCESS_TYPE_LOGIN_ATTEMPT,
    NonvisibleUserAccessType.profile_view: admin_pb2.NONVISIBLE_USER_ACCESS_TYPE_PROFILE_VIEW,
    NonvisibleUserAccessType.ghost_served: admin_pb2.NONVISIBLE_USER_ACCESS_TYPE_GHOST_SERVED,
}

nonvisibleuserstate2api = {
    None: admin_pb2.NONVISIBLE_USER_STATE_UNSPECIFIED,
    NonvisibleUserState.banned: admin_pb2.NONVISIBLE_USER_STATE_BANNED,
    NonvisibleUserState.shadowed: admin_pb2.NONVISIBLE_USER_STATE_SHADOWED,
    NonvisibleUserState.deleted: admin_pb2.NONVISIBLE_USER_STATE_DELETED,
}

126 

127 

def log_admin_action(
    session: Session,
    context: CouchersContext,
    target_user: User,
    action_type: str,
    note: str | None = None,
    data: object | None = None,
    tag: str | None = None,
    level: AdminActionLevel = AdminActionLevel.normal,
) -> AdminAction:
    """Record an admin action performed on ``target_user`` and return the new row.

    The acting admin is taken from ``context.user_id``. The row is added to ``session`` and the session
    is flushed before returning; no commit is issued here.
    """
    fields = {
        "admin_user_id": context.user_id,
        "target_user_id": target_user.id,
        "action_type": action_type,
        "level": level,
        "note": note,
        "data": data,
        "tag": tag,
    }
    entry = AdminAction(**fields)
    session.add(entry)
    session.flush()
    return entry

150 

151 

def _live_ota_package_ids(session: Session) -> set[int]:
    """Return the ids of the OTA packages that are currently live.

    The live package per (platform, fingerprint) is the newest non-banned one by manifest_created_at
    (highest id breaks ties), matching what GetNativeUpdateManifest resolves.
    """
    group_columns = (OTAPackage.platform, OTAPackage.fingerprint)
    newest_first = (OTAPackage.manifest_created_at.desc(), OTAPackage.id.desc())
    # DISTINCT ON keeps the first row of each (platform, fingerprint) group under the ORDER BY below,
    # so the whole lookup is a single index-friendly query.
    statement = (
        select(OTAPackage.id)
        .where(OTAPackage.banned_at.is_(None))
        .distinct(*group_columns)
        .order_by(*group_columns, *newest_first)
    )
    return set(session.scalars(statement))

169 

170 

171def _extract_ota_manifest(body: bytes) -> dict[str, Any] | None: 

172 # The manifest object is the JSON in the "manifest" part of the signed multipart/mixed body. 

173 marker = body.find(b'name="manifest"') 

174 if marker == -1: 

175 return None 

176 body_start = body.find(b"\r\n\r\n", marker) 

177 if body_start == -1: 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true

178 return None 

179 body_end = body.find(b"\r\n--", body_start + 4) 

180 if body_end == -1: 180 ↛ 181line 180 didn't jump to line 181 because the condition on line 180 was never true

181 return None 

182 try: 

183 manifest = json.loads(body[body_start + 4 : body_end]) 

184 except json.JSONDecodeError: 

185 return None 

186 return manifest if isinstance(manifest, dict) else None 

187 

188 

def _ota_package_to_pb(package: OTAPackage, live_ids: set[int]) -> admin_pb2.OTAPackage:
    """Convert an OTAPackage row to its admin proto message.

    ``live`` is set when the package id is in ``live_ids``. Unset ban fields are sent as proto defaults
    (0 / empty string / no timestamp).
    """
    is_banned = package.banned_at is not None
    banned_at = Timestamp_from_datetime(package.banned_at) if package.banned_at else None
    is_live = package.id in live_ids
    return admin_pb2.OTAPackage(
        ota_package_id=package.id,
        created=Timestamp_from_datetime(package.created),
        creator_user_id=package.creator_user_id,
        platform=otaplatform2api[package.platform],
        fingerprint=package.fingerprint,
        version=package.version,
        manifest_created_at=Timestamp_from_datetime(package.manifest_created_at),
        manifest_id=package.manifest_id,
        banned=is_banned,
        banned_at=banned_at,
        banned_by_user_id=package.banned_by_user_id or 0,
        banned_reason=package.banned_reason or "",
        live=is_live,
    )

205 

206 

def _user_to_details(session: Session, user: User) -> admin_pb2.UserDetails:
    """Build the admin-facing UserDetails message for ``user``.

    Besides the user's own columns this includes the admin action log targeting the user (oldest
    first), their admin tags (sorted by tag), mod note counts, and the latest mod note acknowledgement.
    """
    # Admin actions targeting this user, paired with the acting admin's username
    action_rows = session.execute(
        select(AdminAction, User.username)
        .join(User, AdminAction.admin_user_id == User.id)
        .where(AdminAction.target_user_id == user.id)
        .order_by(AdminAction.created.asc())
    ).all()

    action_log = [
        admin_pb2.AdminActionLog(
            admin_action_id=entry.id,
            created=Timestamp_from_datetime(entry.created),
            admin_user_id=entry.admin_user_id,
            admin_username=acting_admin,
            action_type=entry.action_type,
            level=adminactionlevel2api[entry.level],
            note=entry.note or "",
            data=json.dumps(entry.data) if entry.data is not None else "",
            tag=entry.tag or "",
            target_user_id=entry.target_user_id,
            target_username=user.username,
        )
        for entry, acting_admin in action_rows
    ]

    # Admin tags attached to this user
    tag_statement = (
        select(AdminTag.tag)
        .join(UserAdminTag, UserAdminTag.admin_tag_id == AdminTag.id)
        .where(UserAdminTag.user_id == user.id)
        .order_by(AdminTag.tag)
    )
    tag_names = session.scalars(tag_statement).all()

    latest_ack = session.execute(
        select(func.max(ModNote.acknowledged)).where(ModNote.user_id == user.id)
    ).scalar()

    return admin_pb2.UserDetails(
        user_id=user.id,
        username=user.username,
        name=user.name,
        email=user.email,
        gender=user.gender,
        birthdate=date_to_api(user.birthdate),
        banned=user.banned_at is not None,
        deleted=user.deleted_at is not None,
        shadowed=user.shadowed_at is not None,
        do_not_email=user.do_not_email,
        badges=[badge.badge_id for badge in user.badges],
        **get_strong_verification_fields(session, user),
        has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
        pending_mod_notes_count=user.mod_notes.where(ModNote.is_pending).count(),
        acknowledged_mod_notes_count=user.mod_notes.where(~ModNote.is_pending).count(),
        last_mod_note_acknowledged=(Timestamp_from_datetime(latest_ack) if latest_ack else None),
        admin_actions=action_log,
        admin_tags=list(tag_names),
        mod_score=user.mod_score,
    )

273 

274 

def _content_report_to_pb(content_report: ContentReport) -> admin_pb2.ContentReport:
    """Convert a ContentReport row to its admin proto message, field for field."""
    report = content_report
    message = admin_pb2.ContentReport(
        content_report_id=report.id,
        time=Timestamp_from_datetime(report.time),
        reporting_user_id=report.reporting_user_id,
        author_user_id=report.author_user_id,
        reason=report.reason,
        description=report.description,
        content_ref=report.content_ref,
        user_agent=report.user_agent,
        page=report.page,
    )
    return message

287 

288 

def _reference_to_pb(reference: Reference) -> admin_pb2.AdminReference:
    """Convert a Reference row to its admin proto message.

    The reference type is sent by enum member name; a missing private text or host request id is sent
    as the empty string / 0.
    """
    ref = reference
    message = admin_pb2.AdminReference(
        reference_id=ref.id,
        from_user_id=ref.from_user_id,
        to_user_id=ref.to_user_id,
        reference_type=ref.reference_type.name,
        text=ref.text,
        private_text=ref.private_text or "",
        time=Timestamp_from_datetime(ref.time),
        host_request_id=ref.host_request_id or 0,
        rating=ref.rating,
        was_appropriate=ref.was_appropriate,
    )
    return message

302 

303 

304class Admin(admin_pb2_grpc.AdminServicer): 

305 def GetUserDetails( 

306 self, request: admin_pb2.GetUserDetailsReq, context: CouchersContext, session: Session 

307 ) -> admin_pb2.UserDetails: 

308 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

309 if not user: 309 ↛ 310line 309 didn't jump to line 310 because the condition on line 309 was never true

310 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

311 return _user_to_details(session, user) 

312 

313 def GetUser(self, request: admin_pb2.GetUserReq, context: CouchersContext, session: Session) -> api_pb2.User: 

314 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

315 if not user: 315 ↛ 316line 315 didn't jump to line 316 because the condition on line 315 was never true

316 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

317 return user_model_to_pb(user, session, context, is_admin_see_ghosts=True) 

318 

319 def SearchUsers( 

320 self, request: admin_pb2.SearchUsersReq, context: CouchersContext, session: Session 

321 ) -> admin_pb2.SearchUsersRes: 

322 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

323 next_user_id = int(request.page_token) if request.page_token else 0 

324 statement = select(User) 

325 if request.username: 325 ↛ 326line 325 didn't jump to line 326 because the condition on line 325 was never true

326 statement = statement.where(User.username.ilike(request.username)) 

327 if request.email: 327 ↛ 328line 327 didn't jump to line 328 because the condition on line 327 was never true

328 statement = statement.where(User.email.ilike(request.email)) 

329 if request.name: 329 ↛ 330line 329 didn't jump to line 330 because the condition on line 329 was never true

330 statement = statement.where(User.name.ilike(request.name)) 

331 if request.admin_action_log: 

332 statement = statement.where( 

333 User.id.in_(select(AdminAction.target_user_id).where(AdminAction.note.ilike(request.admin_action_log))) 

334 ) 

335 if request.city: 335 ↛ 336line 335 didn't jump to line 336 because the condition on line 335 was never true

336 statement = statement.where(User.city.ilike(request.city)) 

337 if request.min_user_id: 337 ↛ 338line 337 didn't jump to line 338 because the condition on line 337 was never true

338 statement = statement.where(User.id >= request.min_user_id) 

339 if request.max_user_id: 339 ↛ 340line 339 didn't jump to line 340 because the condition on line 339 was never true

340 statement = statement.where(User.id <= request.max_user_id) 

341 if request.min_birthdate: 341 ↛ 342line 341 didn't jump to line 342 because the condition on line 341 was never true

342 statement = statement.where(User.birthdate >= parse_date(request.min_birthdate)) 

343 if request.max_birthdate: 343 ↛ 344line 343 didn't jump to line 344 because the condition on line 343 was never true

344 statement = statement.where(User.birthdate <= parse_date(request.max_birthdate)) 

345 if request.genders: 345 ↛ 346line 345 didn't jump to line 346 because the condition on line 345 was never true

346 statement = statement.where(User.gender.in_(request.genders)) 

347 if request.min_joined_date: 347 ↛ 348line 347 didn't jump to line 348 because the condition on line 347 was never true

348 statement = statement.where(User.joined >= parse_date(request.min_joined_date)) 

349 if request.max_joined_date: 349 ↛ 350line 349 didn't jump to line 350 because the condition on line 349 was never true

350 statement = statement.where(User.joined <= parse_date(request.max_joined_date)) 

351 if request.min_last_active_date: 351 ↛ 352line 351 didn't jump to line 352 because the condition on line 351 was never true

352 statement = statement.where(User.last_active >= parse_date(request.min_last_active_date)) 

353 if request.max_last_active_date: 353 ↛ 354line 353 didn't jump to line 354 because the condition on line 353 was never true

354 statement = statement.where(User.last_active <= parse_date(request.max_last_active_date)) 

355 if request.genders: 355 ↛ 356line 355 didn't jump to line 356 because the condition on line 355 was never true

356 statement = statement.where(User.gender.in_(request.genders)) 

357 if request.language_codes: 357 ↛ 358line 357 didn't jump to line 358 because the condition on line 357 was never true

358 statement = statement.join( 

359 LanguageAbility, 

360 and_(LanguageAbility.user_id == User.id, LanguageAbility.language_code.in_(request.language_codes)), 

361 ) 

362 if request.HasField("is_deleted"): 362 ↛ 363line 362 didn't jump to line 363 because the condition on line 362 was never true

363 statement = statement.where((User.deleted_at != None) == request.is_deleted.value) 

364 if request.HasField("is_banned"): 364 ↛ 365line 364 didn't jump to line 365 because the condition on line 364 was never true

365 statement = statement.where((User.banned_at != None) == request.is_banned.value) 

366 if request.HasField("is_shadowed"): 366 ↛ 367line 366 didn't jump to line 367 because the condition on line 366 was never true

367 statement = statement.where((User.shadowed_at != None) == request.is_shadowed.value) 

368 if request.HasField("has_avatar"): 368 ↛ 369line 368 didn't jump to line 369 because the condition on line 368 was never true

369 statement = statement.where(has_avatar_photo_expression(User) == request.has_avatar.value) 

370 if request.admin_tags: 

371 for tag_name in request.admin_tags: 

372 statement = statement.where( 

373 User.id.in_( 

374 select(UserAdminTag.user_id) 

375 .join(AdminTag, UserAdminTag.admin_tag_id == AdminTag.id) 

376 .where(AdminTag.tag == tag_name) 

377 ) 

378 ) 

379 users = ( 

380 session.execute( 

381 statement.where(User.id >= next_user_id) 

382 .order_by(User.id) 

383 .limit(page_size + 1) 

384 .options(selectinload(User.badges)) 

385 ) 

386 .scalars() 

387 .all() 

388 ) 

389 logger.info(users) 

390 return admin_pb2.SearchUsersRes( 

391 users=[_user_to_details(session, user) for user in users[:page_size]], 

392 next_page_token=str(users[-1].id) if len(users) > page_size else None, 

393 ) 

394 

395 def ChangeUserGender( 

396 self, request: admin_pb2.ChangeUserGenderReq, context: CouchersContext, session: Session 

397 ) -> admin_pb2.UserDetails: 

398 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

399 if not user: 399 ↛ 400line 399 didn't jump to line 400 because the condition on line 399 was never true

400 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

401 old_gender = user.gender 

402 user.gender = request.gender 

403 log_admin_action( 

404 session, context, user, "change_gender", note=f"Changed from '{old_gender}' to '{request.gender}'" 

405 ) 

406 session.commit() 

407 

408 notify( 

409 session, 

410 user_id=user.id, 

411 topic_action=NotificationTopicAction.gender__change, 

412 key="", 

413 data=notification_data_pb2.GenderChange( 

414 gender=request.gender, 

415 ), 

416 ) 

417 

418 return _user_to_details(session, user) 

419 

420 def ChangeUserBirthdate( 

421 self, request: admin_pb2.ChangeUserBirthdateReq, context: CouchersContext, session: Session 

422 ) -> admin_pb2.UserDetails: 

423 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

424 if not user: 424 ↛ 425line 424 didn't jump to line 425 because the condition on line 424 was never true

425 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

426 if not (birthdate := parse_date(request.birthdate)): 426 ↛ 427line 426 didn't jump to line 427 because the condition on line 426 was never true

427 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_birthdate") 

428 

429 old_birthdate = user.birthdate 

430 user.birthdate = birthdate 

431 log_admin_action( 

432 session, context, user, "change_birthdate", note=f"Changed from {old_birthdate} to {request.birthdate}" 

433 ) 

434 session.commit() 

435 

436 notify( 

437 session, 

438 user_id=user.id, 

439 topic_action=NotificationTopicAction.birthdate__change, 

440 key="", 

441 data=notification_data_pb2.BirthdateChange( 

442 birthdate=request.birthdate, 

443 ), 

444 ) 

445 

446 return _user_to_details(session, user) 

447 

448 def AddBadge( 

449 self, request: admin_pb2.AddBadgeReq, context: CouchersContext, session: Session 

450 ) -> admin_pb2.UserDetails: 

451 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

452 if not user: 452 ↛ 453line 452 didn't jump to line 453 because the condition on line 452 was never true

453 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

454 

455 badge = get_badge_dict().get(request.badge_id) 

456 if not badge: 

457 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "badge_not_found") 

458 

459 if not badge.admin_editable: 

460 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:cannot_edit_badge") 

461 

462 if badge.id in [b.badge_id for b in user.badges]: 

463 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:user_already_has_badge") 

464 

465 user_add_badge(session, user.id, request.badge_id) 

466 log_admin_action(session, context, user, "add_badge", note=f"Added badge {request.badge_id}") 

467 

468 return _user_to_details(session, user) 

469 

470 def RemoveBadge( 

471 self, request: admin_pb2.RemoveBadgeReq, context: CouchersContext, session: Session 

472 ) -> admin_pb2.UserDetails: 

473 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

474 if not user: 474 ↛ 475line 474 didn't jump to line 475 because the condition on line 474 was never true

475 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

476 

477 badge = get_badge_dict().get(request.badge_id) 

478 if not badge: 478 ↛ 479line 478 didn't jump to line 479 because the condition on line 478 was never true

479 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "badge_not_found") 

480 

481 if not badge.admin_editable: 481 ↛ 482line 481 didn't jump to line 482 because the condition on line 481 was never true

482 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:cannot_edit_badge") 

483 

484 user_badge = session.execute( 

485 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge.id) 

486 ).scalar_one_or_none() 

487 if not user_badge: 

488 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:user_does_not_have_badge") 

489 

490 user_remove_badge(session, user.id, request.badge_id) 

491 log_admin_action(session, context, user, "remove_badge", note=f"Removed badge {request.badge_id}") 

492 

493 return _user_to_details(session, user) 

494 

495 def SetPassportSexGenderException( 

496 self, request: admin_pb2.SetPassportSexGenderExceptionReq, context: CouchersContext, session: Session 

497 ) -> admin_pb2.UserDetails: 

498 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

499 if not user: 499 ↛ 500line 499 didn't jump to line 500 because the condition on line 499 was never true

500 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

501 old_exception = user.has_passport_sex_gender_exception 

502 user.has_passport_sex_gender_exception = request.passport_sex_gender_exception 

503 log_admin_action( 

504 session, 

505 context, 

506 user, 

507 "set_passport_sex_gender_exception", 

508 note=f"Changed from {old_exception} to {request.passport_sex_gender_exception}", 

509 ) 

510 return _user_to_details(session, user) 

511 

512 def BanUser( 

513 self, request: admin_pb2.BanUserReq, context: CouchersContext, session: Session 

514 ) -> admin_pb2.UserDetails: 

515 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

516 if not user: 516 ↛ 517line 516 didn't jump to line 517 because the condition on line 516 was never true

517 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

518 if not request.admin_note.strip(): 518 ↛ 519line 518 didn't jump to line 519 because the condition on line 518 was never true

519 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:note_cant_be_empty") 

520 log_admin_action(session, context, user, "ban", note=request.admin_note, level=AdminActionLevel.high) 

521 user.banned_at = now() 

522 return _user_to_details(session, user) 

523 

524 def UnbanUser( 

525 self, request: admin_pb2.UnbanUserReq, context: CouchersContext, session: Session 

526 ) -> admin_pb2.UserDetails: 

527 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

528 if not user: 528 ↛ 529line 528 didn't jump to line 529 because the condition on line 528 was never true

529 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

530 if not request.admin_note.strip(): 530 ↛ 531line 530 didn't jump to line 531 because the condition on line 530 was never true

531 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:note_cant_be_empty") 

532 log_admin_action(session, context, user, "unban", note=request.admin_note, level=AdminActionLevel.high) 

533 user.banned_at = None 

534 return _user_to_details(session, user) 

535 

536 def ShadowUser( 

537 self, request: admin_pb2.ShadowUserReq, context: CouchersContext, session: Session 

538 ) -> admin_pb2.UserDetails: 

539 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

540 if not user: 540 ↛ 541line 540 didn't jump to line 541 because the condition on line 540 was never true

541 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

542 if not request.admin_note.strip(): 

543 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:note_cant_be_empty") 

544 log_admin_action(session, context, user, "shadow", note=request.admin_note, level=AdminActionLevel.high) 

545 user.shadowed_at = now() 

546 # Bulk-shadow all UMS-governed content authored by this user so existing visible content is hidden too 

547 bulk_set_user_content_visibility( 

548 session=session, 

549 user=user, 

550 new_visibility=ModerationVisibility.shadowed, 

551 moderator_user_id=context.user_id, 

552 reason=f"User {user.id} shadowed: {request.admin_note}", 

553 ) 

554 return _user_to_details(session, user) 

555 

556 def UnshadowUser( 

557 self, request: admin_pb2.UnshadowUserReq, context: CouchersContext, session: Session 

558 ) -> admin_pb2.UserDetails: 

559 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

560 if not user: 560 ↛ 561line 560 didn't jump to line 561 because the condition on line 560 was never true

561 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

562 if not request.admin_note.strip(): 562 ↛ 563line 562 didn't jump to line 563 because the condition on line 562 was never true

563 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:note_cant_be_empty") 

564 log_admin_action(session, context, user, "unshadow", note=request.admin_note, level=AdminActionLevel.high) 

565 user.shadowed_at = None 

566 # Sweep content shadowed by the cascade back to visible; leave hidden/unlisted content where moderators put it 

567 bulk_set_user_content_visibility( 

568 session=session, 

569 user=user, 

570 new_visibility=ModerationVisibility.visible, 

571 moderator_user_id=context.user_id, 

572 from_visibilities={ModerationVisibility.shadowed}, 

573 reason=f"User {user.id} unshadowed: {request.admin_note}", 

574 ) 

575 return _user_to_details(session, user) 

576 

577 def AddAdminNote( 

578 self, request: admin_pb2.AddAdminNoteReq, context: CouchersContext, session: Session 

579 ) -> admin_pb2.UserDetails: 

580 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

581 if not user: 581 ↛ 582line 581 didn't jump to line 582 because the condition on line 581 was never true

582 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

583 has_note = bool(request.admin_note.strip()) 

584 has_data = bool(request.data.strip()) 

585 if has_note == has_data: 

586 context.abort_with_error_code( 

587 grpc.StatusCode.INVALID_ARGUMENT, "admin:note_requires_exactly_one_of_note_or_data" 

588 ) 

589 data = None 

590 if has_data: 

591 try: 

592 data = json.loads(request.data) 

593 except json.JSONDecodeError: 

594 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:note_data_must_be_valid_json") 

595 level = api2adminactionlevel.get(request.level, AdminActionLevel.normal) 

596 log_admin_action( 

597 session, 

598 context, 

599 user, 

600 "note", 

601 note=request.admin_note if has_note else None, 

602 data=data, 

603 level=level, 

604 ) 

605 return _user_to_details(session, user) 

606 

607 def GetContentReport( 

608 self, request: admin_pb2.GetContentReportReq, context: CouchersContext, session: Session 

609 ) -> admin_pb2.GetContentReportRes: 

610 content_report = session.execute( 

611 select(ContentReport).where(ContentReport.id == request.content_report_id) 

612 ).scalar_one_or_none() 

613 if not content_report: 

614 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:content_report_not_found") 

615 return admin_pb2.GetContentReportRes( 

616 content_report=_content_report_to_pb(content_report), 

617 ) 

618 

619 def GetContentReportsForAuthor( 

620 self, request: admin_pb2.GetContentReportsForAuthorReq, context: CouchersContext, session: Session 

621 ) -> admin_pb2.GetContentReportsForAuthorRes: 

622 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

623 if not user: 623 ↛ 624line 623 didn't jump to line 624 because the condition on line 623 was never true

624 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

625 content_reports = ( 

626 session.execute( 

627 select(ContentReport).where(ContentReport.author_user_id == user.id).order_by(ContentReport.id.desc()) 

628 ) 

629 .scalars() 

630 .all() 

631 ) 

632 return admin_pb2.GetContentReportsForAuthorRes( 

633 content_reports=[_content_report_to_pb(content_report) for content_report in content_reports], 

634 ) 

635 

636 def SendModNote( 

637 self, request: admin_pb2.SendModNoteReq, context: CouchersContext, session: Session 

638 ) -> admin_pb2.UserDetails: 

639 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

640 if not user: 640 ↛ 641line 640 didn't jump to line 641 because the condition on line 640 was never true

641 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

642 session.add( 

643 ModNote( 

644 user_id=user.id, 

645 internal_id=request.internal_id, 

646 creator_user_id=context.user_id, 

647 note_content=request.content, 

648 ) 

649 ) 

650 session.flush() 

651 notify_user = "No" if request.do_not_notify else "Yes" 

652 log_admin_action( 

653 session, 

654 context, 

655 user, 

656 "send_mod_note", 

657 note=f"Notify user: {notify_user}\n\n{request.content}", 

658 ) 

659 

660 if not request.do_not_notify: 

661 notify( 

662 session, 

663 user_id=user.id, 

664 topic_action=NotificationTopicAction.modnote__create, 

665 key="", 

666 ) 

667 

668 return _user_to_details(session, user) 

669 

def MarkUserNeedsLocationUpdate(
    self, request: admin_pb2.MarkUserNeedsLocationUpdateReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Set ``needs_to_update_location`` on a user and record the admin action.

    Aborts with NOT_FOUND ("user_not_found") when ``request.user`` matches no user.
    Returns the user's details as built by ``_user_to_details``.
    """
    target_query = select(User).where(username_or_email_or_id(request.user))
    user = session.execute(target_query).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    user.needs_to_update_location = True
    audit_note = "Marked user as needing location update"
    log_admin_action(session, context, user, "mark_needs_location_update", note=audit_note)

    return _user_to_details(session, user)

681 

def DeleteUser(
    self, request: admin_pb2.DeleteUserReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Mark a user as deleted by stamping ``deleted_at`` with the current time.

    Aborts with NOT_FOUND ("user_not_found") when ``request.user`` matches no user.
    The action is logged at ``AdminActionLevel.high``.
    """
    target_query = select(User).where(username_or_email_or_id(request.user))
    user = session.execute(target_query).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    user.deleted_at = now()
    log_admin_action(session, context, user, "delete_user", level=AdminActionLevel.high)

    return _user_to_details(session, user)

691 

def RecoverDeletedUser(
    self, request: admin_pb2.RecoverDeletedUserReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Undo a user deletion by clearing ``deleted_at`` and the undelete token fields.

    Aborts with NOT_FOUND ("user_not_found") when ``request.user`` matches no user.
    The action is logged at ``AdminActionLevel.high``.
    """
    target_query = select(User).where(username_or_email_or_id(request.user))
    user = session.execute(target_query).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # Clear the deletion marker first, then the pending-undelete bookkeeping
    user.deleted_at = None
    user.undelete_token = None
    user.undelete_until = None
    log_admin_action(session, context, user, "recover_user", level=AdminActionLevel.high)

    return _user_to_details(session, user)

703 

def CreateApiKey(
    self, request: admin_pb2.CreateApiKeyReq, context: CouchersContext, session: Session
) -> admin_pb2.CreateApiKeyRes:
    """Create a 365-day API-key session for a user and deliver it via notification.

    Aborts with NOT_FOUND ("user_not_found") when ``request.user`` matches no user.
    The token is not part of the response: it is placed in the ``api_key__create``
    notification sent to the user, and an empty ``CreateApiKeyRes`` is returned.
    """
    target_query = select(User).where(username_or_email_or_id(request.user))
    user = session.execute(target_query).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    api_key, expires_at = create_session(
        context,
        session,
        user,
        long_lived=True,
        is_api_key=True,
        duration=timedelta(days=365),
        set_cookie=False,
    )
    log_admin_action(session, context, user, "create_api_key")

    key_details = notification_data_pb2.ApiKeyCreate(
        api_key=api_key,
        expiry=Timestamp_from_datetime(expires_at),
    )
    notify(
        session,
        user_id=user.id,
        topic_action=NotificationTopicAction.api_key__create,
        key="",
        data=key_details,
    )

    return admin_pb2.CreateApiKeyRes()

727 

def GetChats(
    self, request: admin_pb2.GetChatsReq, context: CouchersContext, session: Session
) -> admin_pb2.GetChatsRes:
    """Return every host request and group chat a user takes part in, with all messages.

    Aborts with NOT_FOUND ("user_not_found") when ``request.user`` matches no user.
    Host requests are those where the user is initiator or recipient. Group chats
    are those the user has a subscription row for. Both lists are ordered by the
    whole-second timestamp of their last message, newest first; a conversation
    without messages sorts as time 0.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # Per-call memo so each participant is loaded and converted only once
    chat_user_infos: dict[int, admin_pb2.ChatUserInfo] = {}

    def get_chat_user_info(user_id: int) -> admin_pb2.ChatUserInfo:
        """Return the (memoised) ChatUserInfo for ``user_id``."""
        try:
            return chat_user_infos[user_id]
        except KeyError:
            pass
        account = session.execute(select(User).where(User.id == user_id)).scalar_one()
        info = admin_pb2.ChatUserInfo(
            user_id=account.id,
            username=account.username,
            name=account.name,
            birthdate=date_to_api(account.birthdate),
            gender=account.gender,
        )
        chat_user_infos[user_id] = info
        return info

    def message_to_pb(message: Message) -> admin_pb2.ChatMessage:
        """Convert one message row; absent enum/text fields become empty strings."""
        return admin_pb2.ChatMessage(
            message_id=message.id,
            author=get_chat_user_info(message.author_id),
            time=Timestamp_from_datetime(message.time),
            message_type=message.message_type.name if message.message_type else "",
            text=message.text or "",
            host_request_status_target=(
                message.host_request_status_target.name if message.host_request_status_target else ""
            ),
            target=get_chat_user_info(message.target_id) if message.target_id else None,
        )

    def get_messages_for_conversation(conversation_id: int) -> list[admin_pb2.ChatMessage]:
        """Load a conversation's messages in ascending id order and convert them."""
        history_query = (
            select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
        )
        rows = session.execute(history_query).scalars().all()
        return [message_to_pb(row) for row in rows]

    def get_host_request_pb(host_request: HostRequest) -> admin_pb2.AdminHostRequest:
        """Convert a host request together with its full message history."""
        return admin_pb2.AdminHostRequest(
            host_request_id=host_request.conversation_id,
            surfer=get_chat_user_info(host_request.initiator_user_id),
            host=get_chat_user_info(host_request.recipient_user_id),
            status=host_request.status.name if host_request.status else "",
            from_date=date_to_api(host_request.from_date),
            to_date=date_to_api(host_request.to_date),
            created=Timestamp_from_datetime(host_request.conversation.created),
            messages=get_messages_for_conversation(host_request.conversation_id),
        )

    def get_group_chat_pb(group_chat: GroupChat) -> admin_pb2.AdminGroupChat:
        """Convert a group chat with its members (by join time) and messages."""
        membership_query = (
            select(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
            .order_by(GroupChatSubscription.joined.asc())
        )
        subscriptions = session.execute(membership_query).scalars().all()
        members = []
        for subscription in subscriptions:
            members.append(
                admin_pb2.GroupChatMember(
                    user=get_chat_user_info(subscription.user_id),
                    joined=Timestamp_from_datetime(subscription.joined),
                    left=Timestamp_from_datetime(subscription.left) if subscription.left else None,
                    role=subscription.role.name if subscription.role else "",
                )
            )
        return admin_pb2.AdminGroupChat(
            group_chat_id=group_chat.conversation_id,
            title=group_chat.title or "",
            is_dm=group_chat.is_dm,
            creator=get_chat_user_info(group_chat.creator_id),
            members=members,
            messages=get_messages_for_conversation(group_chat.conversation_id),
        )

    def last_message_seconds(chat_pb: Any) -> int:
        """Sort key: seconds of the final message, or 0 for an empty conversation."""
        return chat_pb.messages[-1].time.seconds if chat_pb.messages else 0

    # Host requests where the user is on either side
    host_request_query = (
        select(HostRequest)
        .where(or_(HostRequest.recipient_user_id == user.id, HostRequest.initiator_user_id == user.id))
        .order_by(HostRequest.conversation_id.desc())
    )
    host_requests = session.execute(host_request_query).scalars().all()

    # Group chats the user has a subscription row for
    subscribed_ids_query = (
        select(GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.user_id == user.id)
        .order_by(GroupChatSubscription.joined.desc())
    )
    group_chat_ids = session.execute(subscribed_ids_query).scalars().all()
    group_chats = (
        session.execute(select(GroupChat).where(GroupChat.conversation_id.in_(group_chat_ids))).scalars().all()
    )

    # Convert first, then order by most recent activity
    host_request_pbs = [get_host_request_pb(host_request) for host_request in host_requests]
    host_request_pbs.sort(key=last_message_seconds, reverse=True)

    group_chat_pbs = [get_group_chat_pb(group_chat) for group_chat in group_chats]
    group_chat_pbs.sort(key=last_message_seconds, reverse=True)

    return admin_pb2.GetChatsRes(
        user=get_chat_user_info(user.id),
        host_requests=host_request_pbs,
        group_chats=group_chat_pbs,
    )

850 

def DeleteEvent(
    self, request: admin_pb2.DeleteEventReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Flag an event occurrence as deleted and queue its delete notifications.

    ``request.event_id`` is matched against ``EventOccurrence.id``. Aborts with
    NOT_FOUND ("event_not_found") when no non-deleted occurrence has that id.
    """
    lookup = (
        select(Event, EventOccurrence)
        .where(EventOccurrence.id == request.event_id)
        .where(EventOccurrence.event_id == Event.id)
        .where(~EventOccurrence.is_deleted)
    )
    found = session.execute(lookup).one_or_none()
    if not found:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    # Only the occurrence is modified; the parent event row is left untouched
    _event, occurrence = found
    occurrence.is_deleted = True

    notification_payload = jobs_pb2.GenerateEventDeleteNotificationsPayload(
        occurrence_id=occurrence.id,
    )
    queue_job(
        session,
        job=generate_event_delete_notifications,
        payload=notification_payload,
    )

    return empty_pb2.Empty()

877 

def ListUserIds(
    self, request: admin_pb2.ListUserIdsReq, context: CouchersContext, session: Session
) -> admin_pb2.ListUserIdsRes:
    """List ids of users who joined within a time window, highest id first.

    Pagination is keyset-based: ``page_token`` is the id the next page starts at
    (inclusive). An empty token starts from the top. One extra row is fetched to
    detect whether a further page exists; that extra row's id becomes the token.
    """
    joined_from = to_aware_datetime(request.start_time)
    joined_until = to_aware_datetime(request.end_time)

    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    next_user_id = int(request.page_token) if request.page_token else 0

    id_query = (
        select(User.id)
        # A cursor of 0 means "no cursor", so the id bound is switched off
        .where(or_(User.id <= next_user_id, to_bool(next_user_id == 0)))
        .where(User.joined >= joined_from)
        .where(User.joined <= joined_until)
        .order_by(User.id.desc())
        .limit(page_size + 1)
    )
    user_ids = session.execute(id_query).scalars().all()

    has_more = len(user_ids) > page_size
    return admin_pb2.ListUserIdsRes(
        user_ids=user_ids[:page_size],
        next_page_token=str(user_ids[-1]) if has_more else None,
    )

904 

def EditReferenceText(
    self, request: admin_pb2.EditReferenceTextReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Replace the text of a reference with the stripped ``request.new_text``.

    Aborts with NOT_FOUND ("admin:reference_not_found") for an unknown reference id
    and with INVALID_ARGUMENT ("reference_no_text") when the new text is empty after
    stripping. The admin action is logged against the reference's author.
    """
    reference_query = select(Reference).where(Reference.id == request.reference_id)
    reference = session.execute(reference_query).scalar_one_or_none()

    if reference is None:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:reference_not_found")

    cleaned_text = request.new_text.strip()
    if not cleaned_text:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "reference_no_text")

    reference.text = cleaned_text

    # The audit entry targets the user who wrote the reference
    author = session.execute(select(User).where(User.id == reference.from_user_id)).scalar_one()
    log_admin_action(session, context, author, "edit_reference", note=f"Edited reference {reference.id}")

    return empty_pb2.Empty()

921 

def DeleteReference(
    self, request: admin_pb2.DeleteReferenceReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Deprecated endpoint: always aborts with FAILED_PRECONDITION.

    The error code is "admin:deletereference_deprecated_use_ums"; no reference is touched.
    """
    context.abort_with_error_code(
        grpc.StatusCode.FAILED_PRECONDITION,
        "admin:deletereference_deprecated_use_ums",
    )

926 

def GetUserReferences(
    self, request: admin_pb2.GetUserReferencesReq, context: CouchersContext, session: Session
) -> admin_pb2.GetUserReferencesRes:
    """Return the references a user has written and received, newest id first.

    Aborts with NOT_FOUND ("user_not_found") when ``request.user`` matches no user.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    written_query = select(Reference).where(Reference.from_user_id == user.id).order_by(Reference.id.desc())
    written = session.execute(written_query).scalars().all()

    received_query = select(Reference).where(Reference.to_user_id == user.id).order_by(Reference.id.desc())
    received = session.execute(received_query).scalars().all()

    return admin_pb2.GetUserReferencesRes(
        references_from=[_reference_to_pb(reference) for reference in written],
        references_to=[_reference_to_pb(reference) for reference in received],
    )

950 

def GetFriendRequests(
    self, request: admin_pb2.GetFriendRequestsReq, context: CouchersContext, session: Session
) -> admin_pb2.GetFriendRequestsRes:
    """Return the friend relationships a user sent and received, newest id first.

    Aborts with NOT_FOUND ("user_not_found") when ``request.user`` matches no user.
    Each entry carries both parties as ``ChatUserInfo``, the relationship status
    and the visibility name of its moderation state.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # Per-call memo so each party is loaded and converted only once
    party_infos: dict[int, admin_pb2.ChatUserInfo] = {}

    def get_chat_user_info(user_id: int) -> admin_pb2.ChatUserInfo:
        """Return the (memoised) ChatUserInfo for ``user_id``."""
        try:
            return party_infos[user_id]
        except KeyError:
            pass
        account = session.execute(select(User).where(User.id == user_id)).scalar_one()
        info = admin_pb2.ChatUserInfo(
            user_id=account.id,
            username=account.username,
            name=account.name,
            birthdate=date_to_api(account.birthdate),
            gender=account.gender,
        )
        party_infos[user_id] = info
        return info

    def friend_request_to_pb(rel: FriendRelationship) -> admin_pb2.AdminFriendRequest:
        """Convert one relationship row; a missing response time is left unset."""
        return admin_pb2.AdminFriendRequest(
            friend_request_id=rel.id,
            from_user=get_chat_user_info(rel.from_user_id),
            to_user=get_chat_user_info(rel.to_user_id),
            status=rel.status.name if rel.status else "",
            time_sent=Timestamp_from_datetime(rel.time_sent),
            time_responded=Timestamp_from_datetime(rel.time_responded) if rel.time_responded else None,
            moderation_visibility=rel.moderation_state.visibility.name,
        )

    sent_query = (
        select(FriendRelationship)
        .where(FriendRelationship.from_user_id == user.id)
        .order_by(FriendRelationship.id.desc())
    )
    sent = session.execute(sent_query).scalars().all()

    received_query = (
        select(FriendRelationship)
        .where(FriendRelationship.to_user_id == user.id)
        .order_by(FriendRelationship.id.desc())
    )
    received = session.execute(received_query).scalars().all()

    return admin_pb2.GetFriendRequestsRes(
        sent=[friend_request_to_pb(relationship) for relationship in sent],
        received=[friend_request_to_pb(relationship) for relationship in received],
    )

1007 

def GetNonvisibleUserAccessLog(
    self, request: admin_pb2.GetNonvisibleUserAccessLogReq, context: CouchersContext, session: Session
) -> admin_pb2.GetNonvisibleUserAccessLogRes:
    """Return the most recent ``NonvisibleUserAccess`` rows that target a user.

    Aborts with NOT_FOUND ("user_not_found") when ``request.user`` matches no user.
    At most ``MAX_PAGINATION_LENGTH`` entries are returned, newest first. The actor
    is outer-joined, so rows without an actor are kept with an empty username.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    actor = aliased(User)
    access_query = (
        select(NonvisibleUserAccess, actor.username)
        .outerjoin(actor, NonvisibleUserAccess.actor_user_id == actor.id)
        .where(NonvisibleUserAccess.target_user_id == user.id)
        .order_by(NonvisibleUserAccess.time.desc())
        .limit(MAX_PAGINATION_LENGTH)
    )
    rows = session.execute(access_query).all()

    entries = []
    for access, actor_username in rows:
        # The wrapper type lets "no actor" stay distinguishable from an id value
        actor_id = Int64Value(value=access.actor_user_id) if access.actor_user_id is not None else None
        entries.append(
            admin_pb2.NonvisibleUserAccessLogEntry(
                time=Timestamp_from_datetime(access.time),
                access_type=nonvisibleuseraccesstype2api[access.access_type],
                target_state=nonvisibleuserstate2api[access.target_state],
                target_user_id=access.target_user_id,
                actor_user_id=actor_id,
                actor_username=actor_username or "",
                ip_address=access.ip_address or "",
                user_agent=access.user_agent or "",
                sofa=access.sofa or "",
            )
        )

    return admin_pb2.GetNonvisibleUserAccessLogRes(entries=entries)

1040 

def EditDiscussion(
    self, request: admin_pb2.EditDiscussionReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Overwrite a discussion's title and/or content with stripped replacement text.

    A field is updated only when its replacement is non-empty after stripping, so an
    empty or whitespace-only ``new_title`` / ``new_content`` leaves the stored value
    unchanged instead of blanking it.

    Aborts with NOT_FOUND ("discussion_not_found") for an unknown discussion id.
    """
    discussion = session.execute(
        select(Discussion).where(Discussion.id == request.discussion_id)
    ).scalar_one_or_none()
    if not discussion:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "discussion_not_found")

    # Strip before testing: checking the raw value would let "   " through and
    # then store an empty string.
    new_title = request.new_title.strip()
    new_content = request.new_content.strip()

    # NOTE(review): unlike DeleteDiscussion and EditReply, no version row is written
    # here and no admin action is logged - confirm whether that is intentional.
    if new_title:
        discussion.title = new_title
    if new_content:
        discussion.content = new_content
    return empty_pb2.Empty()

1054 

def DeleteDiscussion(
    self, request: admin_pb2.AdminDeleteDiscussionReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Soft-delete a discussion and record a ``delete`` version with its old text.

    Aborts with NOT_FOUND ("discussion_not_found") for an unknown discussion id.
    A discussion whose ``deleted`` is already set is left untouched and no new
    version row is written.
    """
    discussion_query = select(Discussion).where(Discussion.id == request.discussion_id)
    discussion = session.execute(discussion_query).scalar_one_or_none()
    if not discussion:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "discussion_not_found")

    already_deleted = discussion.deleted is not None
    if not already_deleted:
        # Snapshot the title and content being removed before stamping the deletion
        deletion_record = DiscussionVersion(
            discussion_id=discussion.id,
            editor_user_id=context.user_id,
            change_type=ContentChangeType.delete,
            old_title=discussion.title,
            new_title=None,
            old_content=discussion.content,
            new_content=None,
        )
        session.add(deletion_record)
        discussion.deleted = now()

    return empty_pb2.Empty()

1078 

def EditReply(self, request: admin_pb2.EditReplyReq, context: CouchersContext, session: Session) -> empty_pb2.Empty:
    """Replace the content of a comment or reply and record an ``edit`` version.

    ``request.reply_id`` is unpacked into a database id and a depth: depth 1 is
    looked up as a ``Comment``, depth 2 as a ``Reply``. Any other depth, or an
    unknown id, aborts with NOT_FOUND ("admin:object_not_found").
    """
    database_id, depth = unpack_thread_id(request.reply_id)

    # Depth selects the table; unsupported depths resolve to no model at all
    model_by_depth = {1: Comment, 2: Reply}
    model = model_by_depth.get(depth)
    obj: Comment | Reply | None = None
    if model is not None:
        obj = session.execute(select(model).where(model.id == database_id)).scalar_one_or_none()

    if not obj:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:object_not_found")

    previous_content = obj.content
    replacement_content = request.new_content.strip()

    version: CommentVersion | ReplyVersion
    if depth == 1:
        version = CommentVersion(
            comment_id=database_id,
            editor_user_id=context.user_id,
            change_type=ContentChangeType.edit,
            old_content=previous_content,
            new_content=replacement_content,
        )
    else:
        version = ReplyVersion(
            reply_id=database_id,
            editor_user_id=context.user_id,
            change_type=ContentChangeType.edit,
            old_content=previous_content,
            new_content=replacement_content,
        )
    session.add(version)

    obj.content = replacement_content
    return empty_pb2.Empty()

1116 

def AddUsersToModerationUserList(
    self, request: admin_pb2.AddUsersToModerationUserListReq, context: CouchersContext, session: Session
) -> admin_pb2.AddUsersToModerationUserListRes:
    """Add the requested users to a moderation user list and return that list's id.

    When ``request.moderation_list_id`` is unset, a new list is created. Aborts
    with NOT_FOUND if any requested user ("user_not_found") or the given list
    ("admin:moderation_user_list_not_found") does not exist. All users are
    resolved before the list is looked up or created.
    """
    resolved_users = []
    for identifier in request.users:
        match = session.execute(select(User).where(username_or_email_or_id(identifier))).scalar_one_or_none()
        if not match:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
        resolved_users.append(match)

    if not request.moderation_list_id:
        # No list supplied: start a new one. Flushed here, presumably so that its
        # id is populated for the response.
        moderation_user_list = ModerationUserList()
        session.add(moderation_user_list)
        session.flush()
    else:
        moderation_user_list = session.get(ModerationUserList, request.moderation_list_id)
        if not moderation_user_list:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:moderation_user_list_not_found")

    for member in resolved_users:
        # Skip the append for existing members so nobody is listed twice
        if member not in moderation_user_list.users:
            moderation_user_list.users.append(member)
        # Logged for every requested user, including ones already on the list
        log_admin_action(session, context, member, "add_to_moderation_list")

    return admin_pb2.AddUsersToModerationUserListRes(moderation_list_id=moderation_user_list.id)

1148 

def ListModerationUserLists(
    self, request: admin_pb2.ListModerationUserListsReq, context: CouchersContext, session: Session
) -> admin_pb2.ListModerationUserListsRes:
    """Return every moderation user list a user belongs to, with full member details.

    Aborts with NOT_FOUND ("user_not_found") when ``request.user`` matches no user.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    moderation_lists = []
    for moderation_list in user.moderation_user_lists:
        member_details = [_user_to_details(session, member) for member in moderation_list.users]
        moderation_lists.append(
            admin_pb2.ModerationList(
                moderation_list_id=moderation_list.id,
                members=member_details,
            )
        )

    return admin_pb2.ListModerationUserListsRes(moderation_lists=moderation_lists)

1165 

def RemoveUserFromModerationUserList(
    self, request: admin_pb2.RemoveUserFromModerationUserListReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Remove a user from a moderation user list, deleting the list once it is empty.

    Aborts with:
    - NOT_FOUND ("user_not_found") when ``request.user`` matches no user,
    - INVALID_ARGUMENT ("admin:missing_moderation_user_list_id") when no list id is given,
    - NOT_FOUND ("admin:moderation_user_list_not_found") for an unknown list id,
    - FAILED_PRECONDITION ("admin:user_not_in_the_moderation_user_list") when the
      user is not a member of the list.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    list_id = request.moderation_list_id
    if not list_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:missing_moderation_user_list_id")

    moderation_user_list = session.get(ModerationUserList, list_id)
    if not moderation_user_list:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:moderation_user_list_not_found")

    members = moderation_user_list.users
    if user not in members:
        context.abort_with_error_code(
            grpc.StatusCode.FAILED_PRECONDITION, "admin:user_not_in_the_moderation_user_list"
        )

    members.remove(user)
    log_admin_action(session, context, user, "remove_from_moderation_list")

    # A list with no members left is deleted rather than kept around empty
    if not moderation_user_list.users:
        session.delete(moderation_user_list)

    return empty_pb2.Empty()

1191 

def CreateAccountDeletionLink(
    self, request: admin_pb2.CreateAccountDeletionLinkReq, context: CouchersContext, session: Session
) -> admin_pb2.CreateAccountDeletionLinkRes:
    """Issue an account-deletion token for a user and return its confirmation URL.

    The token expires two hours after creation. Aborts with NOT_FOUND
    ("user_not_found") when ``request.user`` matches no user. The action is
    logged at ``AdminActionLevel.high``.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    deletion_token = AccountDeletionToken(
        token=urlsafe_secure_token(),
        user_id=user.id,
        expiry=now() + timedelta(hours=2),
    )
    session.add(deletion_token)
    log_admin_action(session, context, user, "create_account_deletion_link", level=AdminActionLevel.high)

    confirm_url = urls.delete_account_link(account_deletion_token=deletion_token.token)
    return admin_pb2.CreateAccountDeletionLinkRes(account_deletion_confirm_url=confirm_url)

1204 

def AccessStats(
    self, request: admin_pb2.AccessStatsReq, context: CouchersContext, session: Session
) -> admin_pb2.AccessStatsRes:
    """Summarise a user's recorded activity per (IP address, user agent) pair.

    The window defaults to the last 90 days up to now when ``start_time`` /
    ``end_time`` are not set on the request. Each stat carries the summed API
    calls, the number of periods, first/last period seen, plus GeoIP and parsed
    user-agent details. Rows are ordered by last period seen, newest first.

    Aborts with NOT_FOUND ("user_not_found") when ``request.user`` matches no user.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    if request.HasField("start_time"):
        start_time = to_aware_datetime(request.start_time)
    else:
        start_time = now() - timedelta(days=90)

    if request.HasField("end_time"):
        end_time = to_aware_datetime(request.end_time)
    else:
        end_time = now()

    activity_query = (
        select(
            UserActivity.ip_address,
            UserActivity.user_agent,
            func.sum(UserActivity.api_calls),
            func.count(UserActivity.period),
            func.min(UserActivity.period),
            func.max(UserActivity.period),
        )
        .where(UserActivity.user_id == user.id)
        .where(UserActivity.period >= start_time)
        .where(UserActivity.period <= end_time)
        .order_by(func.max(UserActivity.period).desc())
        .group_by(UserActivity.ip_address, UserActivity.user_agent)
    )
    grouped_activity = session.execute(activity_query).all()

    out = admin_pb2.AccessStatsRes()

    for raw_ip, raw_user_agent, call_total, period_total, earliest, latest in grouped_activity:
        ip_text = str(raw_ip) if raw_ip is not None else None
        parsed_agent = user_agents_parse(raw_user_agent or "")
        # geoip_asn yields an indexable result or a falsy value when nothing is known
        asn_info = geoip_asn(ip_text)
        stat = admin_pb2.AccessStat(
            ip_address=ip_text,
            asn=str(asn_info[0]) if asn_info else None,
            asorg=str(asn_info[1]) if asn_info else None,
            asnetwork=str(asn_info[2]) if asn_info else None,
            user_agent=raw_user_agent,
            operating_system=parsed_agent.os.family,
            browser=parsed_agent.browser.family,
            device=parsed_agent.device.family,
            approximate_location=geoip_approximate_location(ip_text) or "Unknown",
            api_call_count=call_total,
            periods_count=period_total,
            first_seen=Timestamp_from_datetime(earliest),
            last_seen=Timestamp_from_datetime(latest),
        )
        out.stats.append(stat)

    return out

1258 

def SetLastDonated(
    self, request: admin_pb2.SetLastDonatedReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Set or clear a user's ``last_donated`` timestamp.

    The value is cleared (set to ``None``) when the request has no ``last_donated``
    field. Aborts with NOT_FOUND ("user_not_found") when ``request.user`` matches
    no user.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # Field presence, not the field's value, decides between setting and clearing
    user.last_donated = to_aware_datetime(request.last_donated) if request.HasField("last_donated") else None

    log_admin_action(session, context, user, "set_last_donated")
    return _user_to_details(session, user)

1273 

def CreateAdminTag(
    self, request: admin_pb2.CreateAdminTagReq, context: CouchersContext, session: Session
) -> admin_pb2.AdminTagInfo:
    """Create a new admin tag from the stripped ``request.tag``.

    Aborts with INVALID_ARGUMENT ("admin:tag_cant_be_empty") when the tag is empty
    after stripping, and with ALREADY_EXISTS ("admin:tag_already_exists") when a
    tag with the same stripped text is already stored.
    """
    tag_text = request.tag.strip()
    if not tag_text:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:tag_cant_be_empty")

    duplicate = session.execute(select(AdminTag).where(AdminTag.tag == tag_text)).scalar_one_or_none()
    if duplicate:
        context.abort_with_error_code(grpc.StatusCode.ALREADY_EXISTS, "admin:tag_already_exists")

    admin_tag = AdminTag(tag=tag_text)
    session.add(admin_tag)
    # Flush before building the response, which reads the new row's id
    session.flush()

    return admin_pb2.AdminTagInfo(admin_tag_id=admin_tag.id, tag=admin_tag.tag)

1286 

def ListAdminTags(
    self, request: admin_pb2.ListAdminTagsReq, context: CouchersContext, session: Session
) -> admin_pb2.ListAdminTagsRes:
    """Return all admin tags ordered by tag text."""
    tag_rows = session.execute(select(AdminTag).order_by(AdminTag.tag)).scalars().all()

    tag_infos = []
    for row in tag_rows:
        tag_infos.append(admin_pb2.AdminTagInfo(admin_tag_id=row.id, tag=row.tag))

    return admin_pb2.ListAdminTagsRes(tags=tag_infos)

1294 

def AddAdminTagToUser(
    self, request: admin_pb2.AddAdminTagToUserReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Attach an existing admin tag to a user.

    Aborts with:
    - NOT_FOUND ("user_not_found") when ``request.user`` matches no user,
    - NOT_FOUND ("admin:tag_not_found") when no tag equals ``request.tag`` exactly,
    - FAILED_PRECONDITION ("admin:user_already_has_admin_tag") when the user
      already carries the tag.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    admin_tag = session.execute(select(AdminTag).where(AdminTag.tag == request.tag)).scalar_one_or_none()
    if not admin_tag:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:tag_not_found")

    link_query = (
        select(UserAdminTag)
        .where(UserAdminTag.user_id == user.id)
        .where(UserAdminTag.admin_tag_id == admin_tag.id)
    )
    current_link = session.execute(link_query).scalar_one_or_none()
    if current_link:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:user_already_has_admin_tag")

    new_link = UserAdminTag(user_id=user.id, admin_tag_id=admin_tag.id)
    session.add(new_link)
    session.flush()

    log_admin_action(session, context, user, "add_tag", tag=request.tag)
    return _user_to_details(session, user)

1313 

def RemoveAdminTagFromUser(
    self, request: admin_pb2.RemoveAdminTagFromUserReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Detach an admin tag from a user.

    Aborts with:
    - NOT_FOUND ("user_not_found") when ``request.user`` matches no user,
    - NOT_FOUND ("admin:tag_not_found") when no tag equals ``request.tag`` exactly,
    - FAILED_PRECONDITION ("admin:user_does_not_have_admin_tag") when the user
      does not carry the tag.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    admin_tag = session.execute(select(AdminTag).where(AdminTag.tag == request.tag)).scalar_one_or_none()
    if not admin_tag:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:tag_not_found")

    link_query = (
        select(UserAdminTag)
        .where(UserAdminTag.user_id == user.id)
        .where(UserAdminTag.admin_tag_id == admin_tag.id)
    )
    user_admin_tag = session.execute(link_query).scalar_one_or_none()
    if not user_admin_tag:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:user_does_not_have_admin_tag")

    session.delete(user_admin_tag)
    session.flush()

    log_admin_action(session, context, user, "remove_tag", tag=request.tag)
    return _user_to_details(session, user)

1332 

def SetModScore(
    self, request: admin_pb2.SetModScoreReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Overwrite a user's ``mod_score`` with ``request.mod_score`` and log the change.

    Aborts with NOT_FOUND ("user_not_found") when ``request.user`` matches no user.
    """
    target_query = select(User).where(username_or_email_or_id(request.user))
    user = session.execute(target_query).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    user.mod_score = request.mod_score
    audit_note = f"mod_score={request.mod_score}"
    log_admin_action(session, context, user, "set_mod_score", note=audit_note)

    return _user_to_details(session, user)

1342 

def ListAdminActions(
    self, request: admin_pb2.ListAdminActionsReq, context: CouchersContext, session: Session
) -> admin_pb2.ListAdminActionsRes:
    """List logged admin actions, newest id first, optionally filtered by admin or target.

    Pagination is keyset-based: ``page_token`` is the id of the last action on the
    previous page and the next page holds strictly smaller ids. One extra row is
    fetched to detect whether a further page exists.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

    # User is joined twice (acting admin and target), hence the two aliases
    admin_user = aliased(User)
    target_user = aliased(User)

    statement = (
        select(AdminAction, admin_user.username, target_user.username)
        .join(admin_user, AdminAction.admin_user_id == admin_user.id)
        .join(target_user, AdminAction.target_user_id == target_user.id)
    )

    if request.admin_user_id:
        statement = statement.where(AdminAction.admin_user_id == request.admin_user_id)
    if request.target_user_id:
        statement = statement.where(AdminAction.target_user_id == request.target_user_id)
    if request.page_token:
        statement = statement.where(AdminAction.id < int(request.page_token))

    statement = statement.order_by(AdminAction.id.desc()).limit(page_size + 1)
    rows = session.execute(statement).all()

    action_pbs = []
    for action, admin_username, target_username in rows[:page_size]:
        serialized_data = json.dumps(action.data) if action.data is not None else ""
        action_pbs.append(
            admin_pb2.AdminActionLog(
                admin_action_id=action.id,
                created=Timestamp_from_datetime(action.created),
                admin_user_id=action.admin_user_id,
                admin_username=admin_username,
                action_type=action.action_type,
                level=adminactionlevel2api[action.level],
                note=action.note or "",
                data=serialized_data,
                tag=action.tag or "",
                target_user_id=action.target_user_id,
                target_username=target_username,
            )
        )

    has_more = len(rows) > page_size
    return admin_pb2.ListAdminActionsRes(
        admin_actions=action_pbs,
        next_page_token=str(rows[page_size - 1][0].id) if has_more else None,
    )

1389 

def ListUserUploads(
    self, request: admin_pb2.ListUserUploadsReq, context: CouchersContext, session: Session
) -> admin_pb2.ListUserUploadsRes:
    """List the uploads a user created, newest first, with where each one is used.

    Pagination is keyset-based on ``(created, key)``: ``page_token`` is the key of
    the last upload on the previous page. One extra row is fetched to detect
    whether a further page exists.

    Aborts with NOT_FOUND ("user_not_found") when ``request.user`` matches no user.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

    statement = select(Upload).where(Upload.creator_user_id == user.id)
    if request.page_token:
        # Resolve the cursor key to its creation time so the row-value comparison
        # below can continue strictly after the previous page's last upload
        cursor_created = session.execute(
            select(Upload.created).where(Upload.key == request.page_token)
        ).scalar_one()
        statement = statement.where(tuple_(Upload.created, Upload.key) < (cursor_created, request.page_token))

    ordered_statement = statement.order_by(Upload.created.desc(), Upload.key.desc()).limit(page_size + 1)
    uploads = session.execute(ordered_statement).scalars().all()

    page = uploads[:page_size]
    uses_by_key = get_upload_uses_for_keys(session, [upload.key for upload in page])

    upload_pbs = []
    for upload in page:
        use_pbs = []
        for use in uses_by_key.get(upload.key, []):
            use_pbs.append(
                admin_pb2.UploadUse(
                    type=uploadusetype2api[use.use_type],
                    is_current=use.is_current,
                    user_id=use.user_id,
                    event_id=use.event_id,
                    page_id=use.page_id,
                    url=use.url,
                )
            )
        upload_pbs.append(
            admin_pb2.UserUpload(
                key=upload.key,
                filename=upload.filename,
                full_url=upload.full_url,
                thumbnail_url=upload.thumbnail_url,
                credit=upload.credit or "",
                created=Timestamp_from_datetime(upload.created),
                uses=use_pbs,
            )
        )

    has_more = len(uploads) > page_size
    return admin_pb2.ListUserUploadsRes(
        uploads=upload_pbs,
        next_page_token=uploads[page_size - 1].key if has_more else None,
    )

1440 

def CreateOTAPackage(
    self, request: admin_pb2.CreateOTAPackageReq, context: CouchersContext, session: Session
) -> admin_pb2.OTAPackage:
    """
    Registers an OTA package for a (platform, version) pair, taking its metadata from the published manifest.

    Aborts with INVALID_ARGUMENT for an unknown platform, an empty version, or a manifest that is missing or has
    a malformed `runtimeVersion`, `id` or `createdAt`; aborts with FAILED_PRECONDITION if a package with the same
    platform and version already exists.
    """
    platform = api2otaplatform.get(request.platform)
    if platform is None:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:invalid_ota_platform")

    if not request.version:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:invalid_ota_version")

    duplicate_lookup = select(OTAPackage.id).where(
        OTAPackage.platform == platform, OTAPackage.version == request.version
    )
    if session.execute(duplicate_lookup).scalar_one_or_none() is not None:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:ota_package_already_exists")

    # Read the keying/ordering fields out of the manifest we're about to serve, so the row can't
    # disagree with the bytes on the CDN.
    cdn_root = context.get_string_value("native_ota_cdn_root", "https://cdn.couchers.org/native/ota")
    manifest_url = _native_ota_manifest_url(cdn_root=cdn_root, version=request.version, platform=platform.name)
    _content_type, body = _fetch_signed_manifest(manifest_url)
    manifest = _extract_ota_manifest(body)

    # A missing/empty manifest yields all-None fields, which then fail the string check below
    fields = manifest or {}
    fingerprint = fields.get("runtimeVersion")
    manifest_id = fields.get("id")
    created_at_raw = fields.get("createdAt")

    required_values = (fingerprint, manifest_id, created_at_raw)
    all_present = all(isinstance(value, str) and value for value in required_values)
    if manifest is None or not all_present:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:invalid_ota_manifest")

    try:
        parsed_created_at = datetime.fromisoformat(created_at_raw)
    except ValueError:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:invalid_ota_manifest")

    # Timestamps without an offset are taken to be UTC
    if parsed_created_at.tzinfo is None:
        manifest_created_at = parsed_created_at.replace(tzinfo=UTC)
    else:
        manifest_created_at = parsed_created_at

    new_package = OTAPackage(
        creator_user_id=context.user_id,
        platform=platform,
        fingerprint=fingerprint,
        version=request.version,
        manifest_created_at=manifest_created_at,
        manifest_id=manifest_id,
    )
    session.add(new_package)
    session.flush()

    return _ota_package_to_pb(new_package, _live_ota_package_ids(session))

1496 

def ListOTAPackages(
    self, request: admin_pb2.ListOTAPackagesReq, context: CouchersContext, session: Session
) -> admin_pb2.ListOTAPackagesRes:
    """
    Lists OTA packages ordered by manifest creation time (then id), newest first.

    Optionally filtered by platform and fingerprint; banned packages are left out unless
    `request.include_banned` is set. Aborts with INVALID_ARGUMENT if a platform is given but not recognised.
    """
    conditions = []

    if request.platform != admin_pb2.OTA_PLATFORM_UNSPECIFIED:
        wanted_platform = api2otaplatform.get(request.platform)
        if wanted_platform is None:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:invalid_ota_platform")
        conditions.append(OTAPackage.platform == wanted_platform)

    if request.fingerprint:
        conditions.append(OTAPackage.fingerprint == request.fingerprint)

    if not request.include_banned:
        conditions.append(OTAPackage.banned_at.is_(None))

    query = select(OTAPackage)
    for condition in conditions:
        query = query.where(condition)
    query = query.order_by(OTAPackage.manifest_created_at.desc(), OTAPackage.id.desc())

    found = session.execute(query).scalars().all()
    live_ids = _live_ota_package_ids(session)

    package_pbs = [_ota_package_to_pb(item, live_ids) for item in found]
    return admin_pb2.ListOTAPackagesRes(packages=package_pbs)

1514 

def BanOTAPackage(
    self, request: admin_pb2.BanOTAPackageReq, context: CouchersContext, session: Session
) -> admin_pb2.OTAPackage:
    """
    Marks an OTA package as banned, recording when, by whom and why.

    A package that is already banned is left untouched (its original ban details are kept). Aborts with
    INVALID_ARGUMENT if the reason is blank and with NOT_FOUND if no package has the given id.
    """
    # Bans are irreversible — to roll back an accidental ban, republish the bundle as a new
    # package — so a reason is required for the audit trail.
    reason_is_blank = not request.reason.strip()
    if reason_is_blank:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:ota_ban_reason_required")

    lookup = select(OTAPackage).where(OTAPackage.id == request.ota_package_id)
    target = session.execute(lookup).scalar_one_or_none()
    if target is None:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:ota_package_not_found")

    already_banned = target.banned_at is not None
    if not already_banned:
        target.banned_at = now()
        target.banned_by_user_id = context.user_id
        target.banned_reason = request.reason
    # flushed whether or not anything changed
    session.flush()

    return _ota_package_to_pb(target, _live_ota_package_ids(session))