Coverage for src/couchers/jobs/handlers.py: 88%

460 statements  

coverage.py v7.13.1, created at 2026-01-02 11:17 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from collections.abc import Sequence 

7from datetime import date, timedelta 

8from math import cos, pi, sin, sqrt 

9from random import sample 

10from typing import Any 

11from typing import cast as t_cast 

12 

13import requests 

14from google.protobuf import empty_pb2 

15from sqlalchemy import ColumnElement, Float, Function, Integer, Table, select 

16from sqlalchemy.orm import aliased 

17from sqlalchemy.sql import ( 

18 and_, 

19 case, 

20 cast, 

21 delete, 

22 distinct, 

23 exists, 

24 extract, 

25 func, 

26 literal, 

27 not_, 

28 or_, 

29 union_all, 

30 update, 

31) 

32 

33from couchers import urls 

34from couchers.config import config 

35from couchers.constants import ( 

36 ACTIVENESS_PROBE_EXPIRY_TIME, 

37 ACTIVENESS_PROBE_INACTIVITY_PERIOD, 

38 ACTIVENESS_PROBE_TIME_REMINDERS, 

39 EVENT_REMINDER_TIMEDELTA, 

40 HOST_REQUEST_MAX_REMINDERS, 

41 HOST_REQUEST_REMINDER_INTERVAL, 

42) 

43from couchers.context import make_background_user_context 

44from couchers.crypto import ( 

45 USER_LOCATION_RANDOMIZATION_NAME, 

46 asym_encrypt, 

47 b64decode, 

48 get_secret, 

49 simple_decrypt, 

50 stable_secure_uniform, 

51) 

52from couchers.db import session_scope 

53from couchers.email.dev import print_dev_email 

54from couchers.email.smtp import send_smtp_email 

55from couchers.helpers.badges import user_add_badge, user_remove_badge 

56from couchers.materialized_views import ( 

57 UserResponseRate, 

58) 

59from couchers.metrics import ( 

60 moderation_auto_approved_counter, 

61 push_notification_counter, 

62 strong_verification_completions_counter, 

63) 

64from couchers.models import ( 

65 AccountDeletionToken, 

66 ActivenessProbe, 

67 ActivenessProbeStatus, 

68 Cluster, 

69 ClusterRole, 

70 ClusterSubscription, 

71 EventOccurrence, 

72 EventOccurrenceAttendee, 

73 GroupChat, 

74 GroupChatSubscription, 

75 HostingStatus, 

76 HostRequest, 

77 HostRequestStatus, 

78 LoginToken, 

79 MeetupStatus, 

80 Message, 

81 MessageType, 

82 ModerationAction, 

83 ModerationLog, 

84 ModerationObjectType, 

85 ModerationQueueItem, 

86 ModerationState, 

87 ModerationTrigger, 

88 PassportSex, 

89 PasswordResetToken, 

90 PhotoGallery, 

91 PostalVerificationAttempt, 

92 PostalVerificationStatus, 

93 PushNotificationDeliveryAttempt, 

94 PushNotificationSubscription, 

95 Reference, 

96 StrongVerificationAttempt, 

97 StrongVerificationAttemptStatus, 

98 User, 

99 UserBadge, 

100 Volunteer, 

101) 

102from couchers.notifications.expo_api import get_expo_push_receipts 

103from couchers.notifications.notify import notify 

104from couchers.postal.postcard_service import send_postcard 

105from couchers.proto import moderation_pb2, notification_data_pb2 

106from couchers.proto.internal import jobs_pb2, verification_pb2 

107from couchers.resources import get_badge_dict, get_static_badge_dict 

108from couchers.servicers.api import user_model_to_pb 

109from couchers.servicers.events import ( 

110 event_to_pb, 

111) 

112from couchers.servicers.moderation import Moderation 

113from couchers.servicers.requests import host_request_to_pb 

114from couchers.sql import ( 

115 users_visible_to_each_other, 

116 where_moderated_content_visible, 

117 where_moderated_content_visible_to_user_column, 

118 where_user_columns_visible_to_each_other, 

119 where_users_column_visible, 

120) 

121from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

122from couchers.tasks import send_duplicate_strong_verification_email 

123from couchers.utils import ( 

124 Timestamp_from_datetime, 

125 create_coordinate, 

126 get_coordinates, 

127 not_none, 

128 now, 

129) 

130 

131logger = logging.getLogger(__name__) 

132 

133 

134def send_email(payload: jobs_pb2.SendEmailPayload) -> None: 

135 logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'") 

136 # selects a "sender", which either prints the email to the logger or sends it out with SMTP 

137 sender = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email 

138 # the sender must return a models.Email object that can be added to the database 

139 email = sender( 

140 sender_name=payload.sender_name, 

141 sender_email=payload.sender_email, 

142 recipient=payload.recipient, 

143 subject=payload.subject, 

144 plain=payload.plain, 

145 html=payload.html, 

146 list_unsubscribe_header=payload.list_unsubscribe_header, 

147 source_data=payload.source_data, 

148 ) 

149 with session_scope() as session: 

150 session.add(email) 

151 

152 

153def purge_login_tokens(payload: empty_pb2.Empty) -> None: 

154 logger.info("Purging login tokens") 

155 with session_scope() as session: 

156 session.execute(delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False)) 

157 

158 

159def purge_password_reset_tokens(payload: empty_pb2.Empty) -> None: 

160 logger.info("Purging login tokens") 

161 with session_scope() as session: 

162 session.execute( 

163 delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False) 

164 ) 

165 

166 

167def purge_account_deletion_tokens(payload: empty_pb2.Empty) -> None: 

168 logger.info("Purging account deletion tokens") 

169 with session_scope() as session: 

170 session.execute( 

171 delete(AccountDeletionToken) 

172 .where(~AccountDeletionToken.is_valid) 

173 .execution_options(synchronize_session=False) 

174 ) 

175 

176 

177def send_message_notifications(payload: empty_pb2.Empty) -> None: 

178 """ 

179 Sends out email notifications for messages that have been unseen for a long enough time 

180 """ 

181 # very crude and dumb algorithm 

182 logger.info("Sending out email notifications for unseen messages") 

183 

184 with session_scope() as session: 

185 # users who have unnotified messages older than 5 minutes in any group chat 

186 users = ( 

187 session.execute( 

188 where_moderated_content_visible_to_user_column( 

189 select(User) 

190 .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id) 

191 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

192 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id), 

193 GroupChat, 

194 User.id, 

195 ) 

196 .where(not_(GroupChatSubscription.is_muted)) 

197 .where(User.is_visible) 

198 .where(Message.time >= GroupChatSubscription.joined) 

199 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

200 .where(Message.id > User.last_notified_message_id) 

201 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

202 .where(Message.time < now() - timedelta(minutes=5)) 

203 .where(Message.message_type == MessageType.text) # TODO: only text messages for now 

204 ) 

205 .scalars() 

206 .unique() 

207 ) 

208 

209 for user in users: 

210 context = make_background_user_context(user_id=user.id) 

211 # now actually grab all the group chats, not just less than 5 min old 

212 subquery = ( 

213 where_users_column_visible( 

214 where_moderated_content_visible( 

215 select( 

216 GroupChatSubscription.group_chat_id.label("group_chat_id"), 

217 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"), 

218 func.max(Message.id).label("message_id"), 

219 func.count(Message.id).label("count_unseen"), 

220 ) 

221 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

222 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id), 

223 context, 

224 GroupChat, 

225 is_list_operation=True, 

226 ) 

227 .where(GroupChatSubscription.user_id == user.id) 

228 .where(not_(GroupChatSubscription.is_muted)) 

229 .where(Message.id > user.last_notified_message_id) 

230 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

231 .where(Message.time >= GroupChatSubscription.joined) 

232 .where(Message.message_type == MessageType.text) # TODO: only text messages for now 

233 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)), 

234 context, 

235 Message.author_id, 

236 ) 

237 .group_by(GroupChatSubscription.group_chat_id) 

238 .order_by(func.max(Message.id).desc()) 

239 .subquery() 

240 ) 

241 

242 unseen_messages = session.execute( 

243 where_moderated_content_visible( 

244 select(GroupChat, Message, subquery.c.count_unseen) 

245 .join(subquery, subquery.c.message_id == Message.id) 

246 .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id), 

247 context, 

248 GroupChat, 

249 is_list_operation=True, 

250 ).order_by(subquery.c.message_id.desc()) 

251 ).all() 

252 

253 if not unseen_messages: 

254 continue 

255 

256 user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages) 

257 

258 def format_title(message: Message, group_chat: GroupChat, count_unseen: int) -> str: 

259 if group_chat.is_dm: 

260 return f"You missed {count_unseen} message(s) from {message.author.name}" 

261 else: 

262 return f"You missed {count_unseen} message(s) in {group_chat.title}" 

263 

264 notify( 

265 session, 

266 user_id=user.id, 

267 topic_action="chat:missed_messages", 

268 key="", 

269 data=notification_data_pb2.ChatMissedMessages( 

270 messages=[ 

271 notification_data_pb2.ChatMessage( 

272 author=user_model_to_pb( 

273 message.author, 

274 session, 

275 context, 

276 ), 

277 message=format_title(message, group_chat, count_unseen), 

278 text=message.text, 

279 group_chat_id=message.conversation_id, 

280 ) 

281 for group_chat, message, count_unseen in unseen_messages 

282 ], 

283 ), 

284 ) 

285 session.commit() 

286 

287 

288def send_request_notifications(payload: empty_pb2.Empty) -> None: 

289 """ 

290 Sends out email notifications for unseen messages in host requests (as surfer or host) 

291 """ 

292 logger.info("Sending out email notifications for unseen messages in host requests") 

293 

294 with session_scope() as session: 

295 # Get all candidate users who might have unseen request messages 

296 candidate_user_ids = ( 

297 session.execute( 

298 select(User.id) 

299 .where(User.is_visible) 

300 .where( 

301 or_( 

302 # Users with unseen messages as surfer 

303 exists( 

304 select(1) 

305 .select_from(HostRequest) 

306 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

307 .where(HostRequest.surfer_user_id == User.id) 

308 .where(Message.id > HostRequest.surfer_last_seen_message_id) 

309 .where(Message.id > User.last_notified_request_message_id) 

310 .where(Message.time < now() - timedelta(minutes=5)) 

311 .where(Message.message_type == MessageType.text) 

312 ), 

313 # Users with unseen messages as host 

314 exists( 

315 select(1) 

316 .select_from(HostRequest) 

317 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

318 .where(HostRequest.host_user_id == User.id) 

319 .where(Message.id > HostRequest.host_last_seen_message_id) 

320 .where(Message.id > User.last_notified_request_message_id) 

321 .where(Message.time < now() - timedelta(minutes=5)) 

322 .where(Message.message_type == MessageType.text) 

323 ), 

324 ) 

325 ) 

326 ) 

327 .scalars() 

328 .all() 

329 ) 

330 

331 for user_id in candidate_user_ids: 

332 context = make_background_user_context(user_id=user_id) 

333 

334 # requests where this user is surfing 

335 surfing_reqs = session.execute( 

336 where_users_column_visible( 

337 where_moderated_content_visible_to_user_column( 

338 select(User, HostRequest, func.max(Message.id)) 

339 .where(User.id == user_id) 

340 .join(HostRequest, HostRequest.surfer_user_id == User.id), 

341 HostRequest, 

342 HostRequest.surfer_user_id, 

343 ), 

344 context, 

345 HostRequest.host_user_id, 

346 ) 

347 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

348 .where(Message.id > HostRequest.surfer_last_seen_message_id) 

349 .where(Message.id > User.last_notified_request_message_id) 

350 .where(Message.time < now() - timedelta(minutes=5)) 

351 .where(Message.message_type == MessageType.text) 

352 .group_by(User, HostRequest) # type: ignore[arg-type] 

353 ).all() 

354 

355 # where this user is hosting 

356 hosting_reqs = session.execute( 

357 where_users_column_visible( 

358 where_moderated_content_visible_to_user_column( 

359 select(User, HostRequest, func.max(Message.id)) 

360 .where(User.id == user_id) 

361 .join(HostRequest, HostRequest.host_user_id == User.id), 

362 HostRequest, 

363 HostRequest.host_user_id, 

364 ), 

365 context, 

366 HostRequest.surfer_user_id, 

367 ) 

368 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

369 .where(Message.id > HostRequest.host_last_seen_message_id) 

370 .where(Message.id > User.last_notified_request_message_id) 

371 .where(Message.time < now() - timedelta(minutes=5)) 

372 .where(Message.message_type == MessageType.text) 

373 .group_by(User, HostRequest) # type: ignore[arg-type] 

374 ).all() 

375 

376 for user, host_request, max_message_id in surfing_reqs: 

377 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id) 

378 session.flush() 

379 

380 notify( 

381 session, 

382 user_id=user.id, 

383 topic_action="host_request:missed_messages", 

384 key=str(host_request.conversation_id), 

385 data=notification_data_pb2.HostRequestMissedMessages( 

386 host_request=host_request_to_pb(host_request, session, context), 

387 user=user_model_to_pb(host_request.host, session, context), 

388 am_host=False, 

389 ), 

390 ) 

391 

392 for user, host_request, max_message_id in hosting_reqs: 

393 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id) 

394 session.flush() 

395 

396 notify( 

397 session, 

398 user_id=user.id, 

399 topic_action="host_request:missed_messages", 

400 key=str(host_request.conversation_id), 

401 data=notification_data_pb2.HostRequestMissedMessages( 

402 host_request=host_request_to_pb(host_request, session, context), 

403 user=user_model_to_pb(host_request.surfer, session, context), 

404 am_host=True, 

405 ), 

406 ) 

407 

408 

409def send_onboarding_emails(payload: empty_pb2.Empty) -> None: 

410 """ 

411 Sends out onboarding emails 

412 """ 

413 logger.info("Sending out onboarding emails") 

414 

415 with session_scope() as session: 

416 # first onboarding email 

417 users = ( 

418 session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all() 

419 ) 

420 

421 for user in users: 

422 notify( 

423 session, 

424 user_id=user.id, 

425 topic_action="onboarding:reminder", 

426 key="1", 

427 ) 

428 user.onboarding_emails_sent = 1 

429 user.last_onboarding_email_sent = now() 

430 session.commit() 

431 

432 # second onboarding email 

433 # sent after a week if the user has no profile or their "about me" section is less than 20 characters long 

434 users = ( 

435 session.execute( 

436 select(User) 

437 .where(User.is_visible) 

438 .where(User.onboarding_emails_sent == 1) 

439 .where(now() - User.last_onboarding_email_sent > timedelta(days=7)) 

440 .where(User.has_completed_profile == False) 

441 ) 

442 .scalars() 

443 .all() 

444 ) 

445 

446 for user in users: 

447 notify( 

448 session, 

449 user_id=user.id, 

450 topic_action="onboarding:reminder", 

451 key="2", 

452 ) 

453 user.onboarding_emails_sent = 2 

454 user.last_onboarding_email_sent = now() 

455 session.commit() 

456 

457 

458def send_reference_reminders(payload: empty_pb2.Empty) -> None: 

459 """ 

460 Sends out reminders to write references after hosting/staying 

461 """ 

462 logger.info("Sending out reference reminder emails") 

463 

464 # Keep this in chronological order! 

465 reference_reminder_schedule = [ 

466 # (reminder number, timedelta before the reference window closes, days they have left to write the ref; see the arithmetic note after this list) 

467 # the end time to write a reference is supposed to be midnight in the host's timezone 

468 # 8 pm ish on the last day of the stay 

469 (1, timedelta(days=15) - timedelta(hours=20), 14), 

470 # 2 pm ish a week after stay 

471 (2, timedelta(days=8) - timedelta(hours=14), 7), 

472 # 10 am ish 3 days before end of time to write ref 

473 (3, timedelta(days=4) - timedelta(hours=10), 3), 

474 ] 
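# Illustrative arithmetic for the schedule above (editorial note, not part of the job logic): the
# window closes at midnight in the host's timezone, so timedelta(days=15) - timedelta(hours=20)
# equals 14 days and 4 hours before closing, i.e. roughly 8 pm on the last day of the stay;
# timedelta(days=8) - timedelta(hours=14) lands around 2 pm about a week after the stay; and
# timedelta(days=4) - timedelta(hours=10) lands around 10 am with 3 days left to write the reference.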

475 

476 with session_scope() as session: 

477 # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates 

478 for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule): 

479 user = aliased(User) 

480 other_user = aliased(User) 

481 # surfers needing to write a ref 

482 q1 = ( 

483 select(literal(True), HostRequest, user, other_user) 

484 .join(user, user.id == HostRequest.surfer_user_id) 

485 .join(other_user, other_user.id == HostRequest.host_user_id) 

486 .outerjoin( 

487 Reference, 

488 and_( 

489 Reference.host_request_id == HostRequest.conversation_id, 

490 # if no reference is found in this join, then the surfer has not written a ref 

491 Reference.from_user_id == HostRequest.surfer_user_id, 

492 ), 

493 ) 

494 .where(Reference.id == None) 

495 .where(HostRequest.can_write_reference) 

496 .where(HostRequest.surfer_sent_reference_reminders < reminder_number) 

497 .where(HostRequest.end_time_to_write_reference - reminder_time < now()) 

498 .where(HostRequest.surfer_reason_didnt_meetup == None) 

499 .where(users_visible_to_each_other(user, other_user)) 

500 ) 

501 

502 # hosts needing to write a ref 

503 q2 = ( 

504 select(literal(False), HostRequest, user, other_user) 

505 .join(user, user.id == HostRequest.host_user_id) 

506 .join(other_user, other_user.id == HostRequest.surfer_user_id) 

507 .outerjoin( 

508 Reference, 

509 and_( 

510 Reference.host_request_id == HostRequest.conversation_id, 

511 # if no reference is found in this join, then the host has not written a ref 

512 Reference.from_user_id == HostRequest.host_user_id, 

513 ), 

514 ) 

515 .where(Reference.id == None) 

516 .where(HostRequest.can_write_reference) 

517 .where(HostRequest.host_sent_reference_reminders < reminder_number) 

518 .where(HostRequest.end_time_to_write_reference - reminder_time < now()) 

519 .where(HostRequest.host_reason_didnt_meetup == None) 

520 .where(users_visible_to_each_other(user, other_user)) 

521 ) 

522 

523 union = union_all(q1, q2).subquery() 

524 query = select( 

525 union.c[0].label("surfed"), 

526 aliased(HostRequest, union), 

527 aliased(user, union), 

528 aliased(other_user, union), 

529 ) 

530 reference_reminders = session.execute(query).all() 

531 

532 for surfed, host_request, user, other_user in reference_reminders: 

533 # visibility and blocking already checked in sql 

534 assert user.is_visible 

535 context = make_background_user_context(user_id=user.id) 

536 notify( 

537 session, 

538 user_id=user.id, 

539 topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted", 

540 key=str(host_request.conversation_id), 

541 data=notification_data_pb2.ReferenceReminder( 

542 host_request_id=host_request.conversation_id, 

543 other_user=user_model_to_pb(other_user, session, context), 

544 days_left=reminder_days_left, 

545 ), 

546 ) 

547 if surfed: 

548 host_request.surfer_sent_reference_reminders = reminder_number 

549 else: 

550 host_request.host_sent_reference_reminders = reminder_number 

551 session.commit() 

552 

553 

554def send_host_request_reminders(payload: empty_pb2.Empty) -> None: 

555 with session_scope() as session: 

556 host_has_sent_message = select(1).where( 

557 Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.host_user_id 

558 ) 

559 

560 requests = ( 

561 session.execute( 

562 where_user_columns_visible_to_each_other( 

563 where_moderated_content_visible_to_user_column( 

564 select(HostRequest), 

565 HostRequest, 

566 HostRequest.host_user_id, 

567 ) 

568 .where(HostRequest.status == HostRequestStatus.pending) 

569 .where(HostRequest.host_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS) 

570 .where(HostRequest.start_time > func.now()) 

571 .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL) 

572 .where(~exists(host_has_sent_message)), 

573 HostRequest.host_user_id, 

574 HostRequest.surfer_user_id, 

575 ) 

576 ) 

577 .scalars() 

578 .all() 

579 ) 

580 

581 for host_request in requests: 

582 host_request.host_sent_request_reminders += 1 

583 host_request.last_sent_request_reminder_time = now() 

584 

585 context = make_background_user_context(user_id=host_request.host_user_id) 

586 notify( 

587 session, 

588 user_id=host_request.host_user_id, 

589 topic_action="host_request:reminder", 

590 key=str(host_request.conversation_id), 

591 data=notification_data_pb2.HostRequestReminder( 

592 host_request=host_request_to_pb(host_request, session, context), 

593 surfer=user_model_to_pb(host_request.surfer, session, context), 

594 ), 

595 moderation_state_id=host_request.moderation_state_id, 

596 ) 

597 

598 session.commit() 

599 

600 

601def add_users_to_email_list(payload: empty_pb2.Empty) -> None: 

602 if not config["LISTMONK_ENABLED"]: 602 ↛ 603line 602 didn't jump to line 603 because the condition on line 602 was never true

603 logger.info("Not adding users to mailing list") 

604 return 

605 

606 logger.info("Adding users to mailing list") 

607 

608 while True: 

609 with session_scope() as session: 

610 user = session.execute( 

611 select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1) 

612 ).scalar_one_or_none() 

613 if not user: 

614 logger.info("Finished adding users to mailing list") 

615 return 

616 

617 if user.opt_out_of_newsletter: 

618 user.in_sync_with_newsletter = True 

619 session.commit() 

620 continue 

621 

622 r = requests.post( 

623 config["LISTMONK_BASE_URL"] + "/api/subscribers", 

624 auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]), 

625 json={ 

626 "email": user.email, 

627 "name": user.name, 

628 "lists": [config["LISTMONK_LIST_ID"]], 

629 "preconfirm_subscriptions": True, 

630 "attribs": {"couchers_user_id": user.id}, 

631 "status": "enabled", 

632 }, 

633 timeout=10, 

634 ) 

635 # the API returns if the user is already subscribed 

636 if r.status_code == 200 or r.status_code == 409:    [636 ↛ 640: line 636 didn't jump to line 640 because the condition on line 636 was always true]

637 user.in_sync_with_newsletter = True 

638 session.commit() 

639 else: 

640 raise Exception("Failed to add users to mailing list") 

641 

642 

643def enforce_community_membership(payload: empty_pb2.Empty) -> None: 

644 tasks_enforce_community_memberships() 

645 

646 

647def update_recommendation_scores(payload: empty_pb2.Empty) -> None: 

648 text_fields = [ 

649 User.hometown, 

650 User.occupation, 

651 User.education, 

652 User.about_me, 

653 User.things_i_like, 

654 User.about_place, 

655 User.additional_information, 

656 User.pet_details, 

657 User.kid_details, 

658 User.housemate_details, 

659 User.other_host_info, 

660 User.sleeping_details, 

661 User.area, 

662 User.house_rules, 

663 ] 

664 home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules] 

665 

666 def poor_man_gaussian() -> ColumnElement[float] | float: 

667 """ 

668 Produces an approximately standard normal random variate 

669 """ 

670 trials = 5 

671 return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12) 
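# Why this works (illustrative note, not part of the job): the sum of `trials` independent
# Uniform(0, 1) draws has mean trials / 2 and variance trials / 12 (Irwin-Hall), so subtracting
# trials / 2 and dividing by sqrt(trials / 12) gives a mean-0, variance-1 variate that is
# already roughly normal at trials = 5.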

672 

673 def int_(stmt: Any) -> Function[int]: 

674 return func.coalesce(cast(stmt, Integer), 0) 

675 

676 def float_(stmt: Any) -> Function[float]: 

677 return func.coalesce(cast(stmt, Float), 0.0) 

678 

679 with session_scope() as session: 

680 # profile 

681 profile_text = "" 

682 for field in text_fields: 

683 profile_text += func.coalesce(field, "") # type: ignore[assignment] 

684 text_length = func.length(profile_text) 

685 home_text = "" 

686 for field in home_fields: 

687 home_text += func.coalesce(field, "") # type: ignore[assignment] 

688 home_length = func.length(home_text) 

689 

690 filled_profile = int_(User.has_completed_profile) 

691 has_text = int_(text_length > 500) 

692 long_text = int_(text_length > 2000) 

693 can_host = int_(User.hosting_status == HostingStatus.can_host) 

694 may_host = int_(User.hosting_status == HostingStatus.maybe) 

695 cant_host = int_(User.hosting_status == HostingStatus.cant_host) 

696 filled_home = int_(User.has_completed_my_home) 

697 filled_home_lots = int_(home_length > 200) 

698 hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host 

699 profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots 

700 

701 # references 

702 left_ref_expr = int_(1).label("left_reference") 

703 left_refs_subquery = ( 

704 select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery() 

705 ) 

706 left_reference = int_(left_refs_subquery.c.left_reference) 

707 has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference") 

708 ref_count_expr = int_(func.count(Reference.id)).label("ref_count") 

709 ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg") 
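# Scale check for the expression above (arithmetic only, assuming ratings are stored in [0, 1]):
# 1.4 * (rating - 0.3) maps 0.3 to 0.0 and 1.0 to 0.98, so middling ratings contribute nothing
# and ratings below 0.3 pull ref_avg negative.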

710 has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types") 

711 has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label( 

712 "has_bad_ref" 

713 ) 

714 received_ref_subquery = ( 

715 select( 

716 Reference.to_user_id.label("user_id"), 

717 has_reference_expr, 

718 has_multiple_types_expr, 

719 has_bad_ref_expr, 

720 ref_count_expr, 

721 ref_avg_expr, 

722 ) 

723 .group_by(Reference.to_user_id) 

724 .subquery() 

725 ) 

726 has_multiple_types = int_(received_ref_subquery.c.has_multiple_types) 

727 has_reference = int_(received_ref_subquery.c.has_reference) 

728 has_bad_reference = int_(received_ref_subquery.c.has_bad_ref) 

729 rating_score = float_( 

730 received_ref_subquery.c.ref_avg 

731 * ( 

732 2 * func.least(received_ref_subquery.c.ref_count, 5) 

733 + func.greatest(received_ref_subquery.c.ref_count - 5, 0) 

734 ) 

735 ) 

736 ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score 

737 

738 # activeness 

739 recently_active = int_(User.last_active >= now() - timedelta(days=180)) 

740 very_recently_active = int_(User.last_active >= now() - timedelta(days=14)) 

741 recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14)) 

742 messaged_lots = int_(func.count(Message.id) > 5) 

743 messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points") 

744 messaging_subquery = ( 

745 select(Message.author_id.label("user_id"), messaging_points_subquery) 

746 .where(Message.message_type == MessageType.text) 

747 .group_by(Message.author_id) 

748 .subquery() 

749 ) 

750 activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points) 

751 

752 # verification 

753 cb_subquery = ( 

754 select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id")) 

755 .join(Cluster, Cluster.id == ClusterSubscription.cluster_id) 

756 .where(ClusterSubscription.role == ClusterRole.admin) 

757 .where(Cluster.is_official_cluster) 

758 .group_by(ClusterSubscription.user_id) 

759 .subquery() 

760 ) 

761 min_node_id = cb_subquery.c.min_node_id 

762 cb = int_(min_node_id >= 1) 

763 wcb = int_(min_node_id == 1) 

764 badge_points = { 

765 "founder": 100, 

766 "board_member": 20, 

767 "past_board_member": 5, 

768 "strong_verification": 3, 

769 "volunteer": 3, 

770 "past_volunteer": 2, 

771 "donor": 1, 

772 "phone_verified": 1, 

773 } 

774 

775 badge_subquery = ( 

776 select( 

777 UserBadge.user_id.label("user_id"), 

778 func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"), 

779 ) 

780 .group_by(UserBadge.user_id) 

781 .subquery() 

782 ) 
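# Note on the case() call above: passing a dict together with value=UserBadge.badge_id uses
# SQLAlchemy's "simple CASE" shorthand, which should compile to roughly
# CASE badge_id WHEN 'founder' THEN 100 WHEN 'board_member' THEN 20 ... ELSE 0 END,
# then summed per user by the surrounding func.sum() / group_by.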

783 

784 other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points) 

785 

786 # response rate 

787 hr_subquery = select( 

788 UserResponseRate.user_id, 

789 float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"), 

790 float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"), 

791 ).subquery() 

792 response_time_33p = hr_subquery.c.response_time_33p 

793 response_time_66p = hr_subquery.c.response_time_66p 

794 # be careful with nulls 

795 response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0) 
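# Unit check for the thresholds above: hr_subquery converts response times to minutes, so
# 60 * 96.0 minutes is 96 hours (4 days); a 33rd-percentile response time over 4 days costs
# 10 points, while a 66th-percentile response time under 4 days earns 5 points.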

796 

797 recommendation_score = ( 

798 hosting_status_points 

799 + profile_points 

800 + ref_score 

801 + activeness_points 

802 + other_points 

803 + response_rate_points 

804 + 2 * poor_man_gaussian() 

805 ) 

806 

807 scores = ( 

808 select(User.id.label("user_id"), recommendation_score.label("score")) 

809 .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id) 

810 .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id) 

811 .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id) 

812 .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id) 

813 .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id) 

814 .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id) 

815 ).subquery() 

816 

817 session.execute( 

818 t_cast(Table, User.__table__) 

819 .update() 

820 .values(recommendation_score=scores.c.score) 

821 .where(User.id == scores.c.user_id) 

822 ) 

823 

824 logger.info("Updated recommendation scores") 

825 

826 

827def update_badges(payload: empty_pb2.Empty) -> None: 

828 with session_scope() as session: 

829 

830 def update_badge(badge_id: str, members: Sequence[int]) -> None: 

831 badge = get_badge_dict()[badge_id] 

832 user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge.id)).scalars().all() 

833 # in case the user ids don't exist in the db 

834 actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all() 

835 # we should add the badge to these 

836 add = set(actual_members) - set(user_ids) 

837 # we should remove the badge from these 

838 remove = set(user_ids) - set(actual_members) 

839 for user_id in add: 

840 user_add_badge(session, user_id, badge.id) 

841 

842 for user_id in remove: 

843 user_remove_badge(session, user_id, badge.id) 

844 

845 update_badge("founder", get_static_badge_dict()["founder"]) 

846 update_badge("board_member", get_static_badge_dict()["board_member"]) 

847 update_badge("past_board_member", get_static_badge_dict()["past_board_member"]) 

848 update_badge("donor", session.execute(select(User.id).where(User.last_donated.is_not(None))).scalars().all()) 

849 update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all()) 

850 update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all()) 

851 # strong verification requires passport on file + gender/sex correspondence and date of birth match 

852 update_badge( 

853 "strong_verification", 

854 session.execute( 

855 select(User.id) 

856 .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id) 

857 .where(StrongVerificationAttempt.has_strong_verification(User)) 

858 ) 

859 .scalars() 

860 .all(), 

861 ) 

862 # volunteer badge for active volunteers (stopped_volunteering is null) 

863 update_badge( 

864 "volunteer", 

865 session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_(None))).scalars().all(), 

866 ) 

867 # past_volunteer badge for past volunteers (stopped_volunteering is not null) 

868 update_badge( 

869 "past_volunteer", 

870 session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_not(None))) 

871 .scalars() 

872 .all(), 

873 ) 

874 

875 

876def finalize_strong_verification(payload: jobs_pb2.FinalizeStrongVerificationPayload) -> None: 

877 with session_scope() as session: 

878 verification_attempt = session.execute( 

879 select(StrongVerificationAttempt) 

880 .where(StrongVerificationAttempt.id == payload.verification_attempt_id) 

881 .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend) 

882 ).scalar_one() 

883 response = requests.post( 

884 "https://passportreader.app/api/v1/session.get", 

885 auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]), 

886 json={"id": verification_attempt.iris_session_id}, 

887 timeout=10, 

888 verify="/etc/ssl/certs/ca-certificates.crt", 

889 ) 

890 if response.status_code != 200:    [890 ↛ 891: line 890 didn't jump to line 891 because the condition on line 890 was never true]

891 raise Exception(f"Iris didn't return 200: {response.text}") 

892 json_data = response.json() 

893 reference_payload = verification_pb2.VerificationReferencePayload.FromString( 

894 simple_decrypt("iris_callback", b64decode(json_data["reference"])) 

895 ) 

896 assert verification_attempt.user_id == reference_payload.user_id 

897 assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token 

898 assert verification_attempt.iris_session_id == json_data["id"] 

899 assert json_data["state"] == "APPROVED" 

900 

901 if json_data["document_type"] != "PASSPORT": 

902 verification_attempt.status = StrongVerificationAttemptStatus.failed 

903 notify( 

904 session, 

905 user_id=verification_attempt.user_id, 

906 topic_action="verification:sv_fail", 

907 key="", 

908 data=notification_data_pb2.VerificationSVFail( 

909 reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT 

910 ), 

911 ) 

912 return 

913 

914 assert json_data["document_type"] == "PASSPORT" 

915 

916 expiry_date = date.fromisoformat(json_data["expiry_date"]) 

917 nationality = json_data["nationality"] 

918 last_three_document_chars = json_data["document_number"][-3:] 

919 

920 existing_attempt = session.execute( 

921 select(StrongVerificationAttempt) 

922 .where(StrongVerificationAttempt.passport_expiry_date == expiry_date) 

923 .where(StrongVerificationAttempt.passport_nationality == nationality) 

924 .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars) 

925 .order_by(StrongVerificationAttempt.id) 

926 .limit(1) 

927 ).scalar_one_or_none() 

928 

929 verification_attempt.has_minimal_data = True 

930 verification_attempt.passport_expiry_date = expiry_date 

931 verification_attempt.passport_nationality = nationality 

932 verification_attempt.passport_last_three_document_chars = last_three_document_chars 

933 

934 if existing_attempt: 

935 verification_attempt.status = StrongVerificationAttemptStatus.duplicate 

936 

937 if existing_attempt.user_id != verification_attempt.user_id: 

938 session.flush() 

939 send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt) 

940 

941 notify( 

942 session, 

943 user_id=verification_attempt.user_id, 

944 topic_action="verification:sv_fail", 

945 key="", 

946 data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE), 

947 ) 

948 return 

949 

950 verification_attempt.has_full_data = True 

951 verification_attempt.passport_encrypted_data = asym_encrypt( 

952 config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8") 

953 ) 

954 verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"]) 

955 verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()] 

956 verification_attempt.status = StrongVerificationAttemptStatus.succeeded 

957 

958 session.flush() 

959 

960 strong_verification_completions_counter.inc() 

961 

962 user = verification_attempt.user 

963 if verification_attempt.has_strong_verification(user):    [963 ↛ 973: line 963 didn't jump to line 973 because the condition on line 963 was always true]

964 badge_id = "strong_verification" 

965 if session.execute( 

966 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id) 

967 ).scalar_one_or_none(): 

968 return 

969 

970 user_add_badge(session, user.id, badge_id, do_notify=False) 

971 notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success", key="") 

972 else: 

973 notify( 

974 session, 

975 user_id=verification_attempt.user_id, 

976 topic_action="verification:sv_fail", 

977 key="", 

978 data=notification_data_pb2.VerificationSVFail( 

979 reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER 

980 ), 

981 ) 

982 

983 

984def send_activeness_probes(payload: empty_pb2.Empty) -> None: 

985 with session_scope() as session: 

986 ## Step 1: create new activeness probes for those who need it and don't have one (if enabled) 

987 

988 if config["ACTIVENESS_PROBES_ENABLED"]: 

989 # current activeness probes 

990 subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery() 

991 

992 # users who we should send an activeness probe to 

993 new_probe_user_ids = ( 

994 session.execute( 

995 select(User.id) 

996 .where(User.is_visible) 

997 .where(User.hosting_status == HostingStatus.can_host) 

998 .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD) 

999 .where(User.id.not_in(select(subquery.c.user_id))) 

1000 ) 

1001 .scalars() 

1002 .all() 

1003 ) 

1004 

1005 total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one() 

1006 probes_today = session.execute( 

1007 select(func.count()) 

1008 .select_from(ActivenessProbe) 

1009 .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24)) 

1010 ).scalar_one() 

1011 

1012 # send probes to max 2% of users per day 

1013 max_probes_per_day = 0.02 * total_users 

1014 max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1)) 
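# Worked example of the budget above (illustrative numbers): with 10,000 visible users,
# max_probes_per_day = 200; if 50 probes were already started in the last 24 hours, then
# max_probe_size = int(max(min(200 - 50, 200 / 24), 1)) = int(max(min(150, 8.33), 1)) = 8,
# so this run would start at most 8 new probes.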

1015 

1016 if len(new_probe_user_ids) > max_probe_size:    [1016 ↛ 1017: line 1016 didn't jump to line 1017 because the condition on line 1016 was never true]

1017 new_probe_user_ids = sample(new_probe_user_ids, max_probe_size) 

1018 

1019 for user_id in new_probe_user_ids: 

1020 session.add(ActivenessProbe(user_id=user_id)) 

1021 

1022 session.commit() 

1023 

1024 ## Step 2: actually send out probe notifications 

1025 for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS): 

1026 probes = ( 

1027 session.execute( 

1028 select(ActivenessProbe) 

1029 .where(ActivenessProbe.notifications_sent == probe_number_minus_1) 

1030 .where(ActivenessProbe.probe_initiated + delay < func.now()) 

1031 .where(ActivenessProbe.is_pending) 

1032 ) 

1033 .scalars() 

1034 .all() 

1035 ) 

1036 

1037 for probe in probes: 

1038 probe.notifications_sent = probe_number_minus_1 + 1 

1039 context = make_background_user_context(user_id=probe.user.id) 

1040 notify( 

1041 session, 

1042 user_id=probe.user.id, 

1043 topic_action="activeness:probe", 

1044 key=str(probe.id), 

1045 data=notification_data_pb2.ActivenessProbe( 

1046 reminder_number=probe_number_minus_1 + 1, 

1047 deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME), 

1048 ), 

1049 ) 

1050 session.commit() 

1051 

1052 ## Step 3: for those who haven't responded, mark them as failed 

1053 expired_probes = ( 

1054 session.execute( 

1055 select(ActivenessProbe) 

1056 .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS)) 

1057 .where(ActivenessProbe.is_pending) 

1058 .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now()) 

1059 ) 

1060 .scalars() 

1061 .all() 

1062 ) 

1063 

1064 for probe in expired_probes: 

1065 probe.responded = now() 

1066 probe.response = ActivenessProbeStatus.expired 

1067 if probe.user.hosting_status == HostingStatus.can_host:    [1067 ↛ 1069: line 1067 didn't jump to line 1069 because the condition on line 1067 was always true]

1068 probe.user.hosting_status = HostingStatus.maybe 

1069 if probe.user.meetup_status == MeetupStatus.wants_to_meetup:    [1069 ↛ 1071: line 1069 didn't jump to line 1071 because the condition on line 1069 was always true]

1070 probe.user.meetup_status = MeetupStatus.open_to_meetup 

1071 session.commit() 

1072 

1073 

1074def update_randomized_locations(payload: empty_pb2.Empty) -> None: 

1075 """ 

1076 We generate for each user a randomized location as follows: 

1077 - Start from a strong random seed (based on the SECRET env var and our key derivation function) 

1078 - For each user, mix in the user_id for randomness 

1079 - Generate a radius from [0.02, 0.1] degrees (about 2-10km) 

1080 - Generate an angle from [0, 360] 

1081 - Randomized location is then a distance `radius` away at an angle `angle` from `geom` 

1082 """ 

1083 randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME) 

1084 

1085 def gen_randomized_coords(user_id: int, lat: float, lng: float) -> tuple[float, float]: 

1086 radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii")) 

1087 angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii")) 

1088 radius = 0.02 + 0.08 * radius_u 

1089 angle_rad = 2 * pi * angle_u 

1090 offset_lng = radius * cos(angle_rad) 

1091 offset_lat = radius * sin(angle_rad) 

1092 return lat + offset_lat, lng + offset_lng 
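# Rough scale of the offsets above (illustrative, for locations away from the poles): one degree
# of latitude is about 111 km, so a radius of 0.02-0.1 degrees is roughly 2-11 km of displacement;
# the east-west component covers less ground at higher latitudes, since a degree of longitude
# spans about cos(latitude) times fewer kilometres than a degree of latitude.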

1093 

1094 user_updates: list[dict[str, Any]] = [] 

1095 

1096 with session_scope() as session: 

1097 users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all() 

1098 

1099 for user_id, geom in users_to_update: 

1100 lat, lng = get_coordinates(geom) 

1101 user_updates.append( 

1102 {"id": user_id, "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, lat, lng))} 

1103 ) 

1104 

1105 with session_scope() as session: 

1106 session.execute(update(User), user_updates) 

1107 

1108 

1109def send_event_reminders(payload: empty_pb2.Empty) -> None: 

1110 """ 

1111 Sends reminders for events that are 24 hours away to users who marked themselves as attending. 

1112 """ 

1113 logger.info("Sending event reminder emails") 

1114 

1115 with session_scope() as session: 

1116 occurrences = ( 

1117 session.execute( 

1118 select(EventOccurrence) 

1119 .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA) 

1120 .where(EventOccurrence.start_time >= now()) 

1121 ) 

1122 .scalars() 

1123 .all() 

1124 ) 

1125 

1126 for occurrence in occurrences: 

1127 results = session.execute( 

1128 select(User, EventOccurrenceAttendee) 

1129 .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id) 

1130 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id) 

1131 .where(EventOccurrenceAttendee.reminder_sent == False) 

1132 ).all() 

1133 

1134 for user, attendee in results: 

1135 context = make_background_user_context(user_id=user.id) 

1136 

1137 notify( 

1138 session, 

1139 user_id=user.id, 

1140 topic_action="event:reminder", 

1141 key=str(occurrence.id), 

1142 data=notification_data_pb2.EventReminder( 

1143 event=event_to_pb(session, occurrence, context), 

1144 user=user_model_to_pb(user, session, context), 

1145 ), 

1146 ) 

1147 

1148 attendee.reminder_sent = True 

1149 session.commit() 

1150 

1151 

1152def check_expo_push_receipts(payload: empty_pb2.Empty) -> None: 

1153 """ 

1154 Check Expo push receipts in batch and update delivery attempts. 

1155 """ 

1156 MAX_ITERATIONS = 100 # Safety limit: 100 batches * 100 attempts = 10,000 max 

1157 

1158 for iteration in range(MAX_ITERATIONS):    [1158 ↛ 1220: line 1158 didn't jump to line 1220 because the loop on line 1158 didn't complete]

1159 with session_scope() as session: 

1160 # Find all delivery attempts that need receipt checking 

1161 # Wait 15 minutes per Expo's recommendation before checking receipts 

1162 attempts = ( 

1163 session.execute( 

1164 select(PushNotificationDeliveryAttempt) 

1165 .where(PushNotificationDeliveryAttempt.expo_ticket_id != None) 

1166 .where(PushNotificationDeliveryAttempt.receipt_checked_at == None) 

1167 .where(PushNotificationDeliveryAttempt.time < now() - timedelta(minutes=15)) 

1168 .where(PushNotificationDeliveryAttempt.time > now() - timedelta(hours=24)) 

1169 .limit(100) 

1170 ) 

1171 .scalars() 

1172 .all() 

1173 ) 

1174 

1175 if not attempts: 

1176 logger.debug("No Expo receipts to check") 

1177 return 

1178 

1179 logger.info(f"Checking {len(attempts)} Expo push receipts") 

1180 

1181 receipts = get_expo_push_receipts([not_none(attempt.expo_ticket_id) for attempt in attempts]) 

1182 

1183 for attempt in attempts: 

1184 receipt = receipts.get(not_none(attempt.expo_ticket_id)) 

1185 

1186 # Always mark as checked to avoid infinite loops 

1187 attempt.receipt_checked_at = now() 

1188 

1189 if receipt is None: 

1190 # Receipt not found after 15min - likely expired (>24h) or never existed 

1191 # Per Expo docs: receipts should be available within 15 minutes 

1192 attempt.receipt_status = "not_found" 

1193 continue 

1194 

1195 attempt.receipt_status = receipt.get("status") 

1196 

1197 if receipt.get("status") == "error": 

1198 details = receipt.get("details", {}) 

1199 error_code = details.get("error") 

1200 attempt.receipt_error_code = error_code 

1201 

1202 if error_code == "DeviceNotRegistered":    [1202 ↛ 1217: line 1202 didn't jump to line 1217 because the condition on line 1202 was always true]

1203 # Device token is no longer valid - disable the subscription 

1204 sub = session.execute( 

1205 select(PushNotificationSubscription).where( 

1206 PushNotificationSubscription.id == attempt.push_notification_subscription_id 

1207 ) 

1208 ).scalar_one() 

1209 

1210 if sub.disabled_at > now():    [1210 ↛ 1183: line 1210 didn't jump to line 1183 because the condition on line 1210 was always true]

1211 sub.disabled_at = now() 

1212 logger.info(f"Disabled push sub {sub.id} due to DeviceNotRegistered in receipt") 

1213 push_notification_counter.labels( 

1214 platform="expo", outcome="permanent_subscription_failure_receipt" 

1215 ).inc() 

1216 else: 

1217 logger.warning(f"Expo receipt error for ticket {attempt.expo_ticket_id}: {error_code}") 

1218 

1219 # If we get here, we've exhausted MAX_ITERATIONS without finishing 

1220 raise RuntimeError( 

1221 f"check_expo_push_receipts exceeded {MAX_ITERATIONS} iterations - " 

1222 "there may be an unusually large backlog of receipts to check" 

1223 ) 

1224 

1225 

1226def send_postal_verification_postcard(payload: jobs_pb2.SendPostalVerificationPostcardPayload) -> None: 

1227 """ 

1228 Sends the postcard via external API and updates attempt status. 

1229 """ 

1230 with session_scope() as session: 

1231 attempt = session.execute( 

1232 select(PostalVerificationAttempt).where( 

1233 PostalVerificationAttempt.id == payload.postal_verification_attempt_id 

1234 ) 

1235 ).scalar_one_or_none() 

1236 

1237 if not attempt or attempt.status != PostalVerificationStatus.in_progress:    [1237 ↛ 1238: line 1237 didn't jump to line 1238 because the condition on line 1237 was never true]

1238 logger.warning( 

1239 f"Postal verification attempt {payload.postal_verification_attempt_id} not found or wrong state" 

1240 ) 

1241 return 

1242 

1243 user_name = session.execute(select(User.name).where(User.id == attempt.user_id)).scalar_one() 

1244 

1245 result = send_postcard( 

1246 recipient_name=user_name, 

1247 address_line_1=attempt.address_line_1, 

1248 address_line_2=attempt.address_line_2, 

1249 city=attempt.city, 

1250 state=attempt.state, 

1251 postal_code=attempt.postal_code, 

1252 country=attempt.country, 

1253 verification_code=not_none(attempt.verification_code), 

1254 qr_code_url=urls.postal_verification_link(code=not_none(attempt.verification_code)), 

1255 ) 

1256 

1257 if result.success: 

1258 attempt.status = PostalVerificationStatus.awaiting_verification 

1259 attempt.postcard_sent_at = func.now() 

1260 

1261 notify( 

1262 session, 

1263 user_id=attempt.user_id, 

1264 topic_action="postal_verification:postcard_sent", 

1265 key="", 

1266 data=notification_data_pb2.PostalVerificationPostcardSent( 

1267 city=attempt.city, 

1268 country=attempt.country, 

1269 ), 

1270 ) 

1271 else: 

1272 # Could retry or fail - for now, fail 

1273 attempt.status = PostalVerificationStatus.failed 

1274 logger.error(f"Postcard send failed: {result.error_message}") 

1275 

1276 

1277class DatabaseInconsistencyError(Exception): 

1278 """Raised when database consistency checks fail""" 

1279 

1280 pass 

1281 

1282 

1283def check_database_consistency(payload: empty_pb2.Empty) -> None: 

1284 """ 

1285 Checks database consistency and raises an exception if any issues are found. 

1286 """ 

1287 logger.info("Checking database consistency") 

1288 errors = [] 

1289 

1290 with session_scope() as session: 

1291 # Check that all non-deleted users have a profile gallery 

1292 users_without_gallery = session.execute( 

1293 select(User.id, User.username).where(User.is_deleted == False).where(User.profile_gallery_id.is_(None)) 

1294 ).all() 

1295 if users_without_gallery: 

1296 errors.append(f"Users without profile gallery: {users_without_gallery}") 

1297 

1298 # Check that all profile galleries point to their owner 

1299 mismatched_galleries = session.execute( 

1300 select(User.id, User.username, User.profile_gallery_id, PhotoGallery.owner_user_id) 

1301 .join(PhotoGallery, User.profile_gallery_id == PhotoGallery.id) 

1302 .where(User.profile_gallery_id.is_not(None)) 

1303 .where(PhotoGallery.owner_user_id != User.id) 

1304 ).all() 

1305 if mismatched_galleries:    [1305 ↛ 1306: line 1305 didn't jump to line 1306 because the condition on line 1305 was never true]

1306 errors.append(f"Profile galleries with mismatched owner: {mismatched_galleries}") 

1307 

1308 # === Moderation System Consistency Checks === 

1309 

1310 # Check all ModerationStates have a known object_type 

1311 known_object_types = [ModerationObjectType.HOST_REQUEST, ModerationObjectType.GROUP_CHAT] 

1312 unknown_type_states = session.execute( 

1313 select(ModerationState.id, ModerationState.object_type).where( 

1314 ModerationState.object_type.not_in(known_object_types) 

1315 ) 

1316 ).all() 

1317 if unknown_type_states:    [1317 ↛ 1318: line 1317 didn't jump to line 1318 because the condition on line 1317 was never true]

1318 errors.append(f"ModerationStates with unknown object_type: {unknown_type_states}") 

1319 

1320 # Check every ModerationState has at least one INITIAL_REVIEW queue item 

1321 # Skip items with ID < 2000000 as they were created before this check was introduced 

1322 states_without_initial_review = session.execute( 

1323 select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where( 

1324 ModerationState.id >= 2000000, 

1325 ~exists( 

1326 select(1) 

1327 .where(ModerationQueueItem.moderation_state_id == ModerationState.id) 

1328 .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW) 

1329 ), 

1330 ) 

1331 ).all() 

1332 if states_without_initial_review:    [1332 ↛ 1333: line 1332 didn't jump to line 1333 because the condition on line 1332 was never true]

1333 errors.append(f"ModerationStates without INITIAL_REVIEW queue item: {states_without_initial_review}") 

1334 

1335 # Check every ModerationState has a CREATE log entry 

1336 # Skip items with ID < 2000000 as they were created before this check was introduced 

1337 states_without_create_log = session.execute( 

1338 select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where( 

1339 ModerationState.id >= 2000000, 

1340 ~exists( 

1341 select(1) 

1342 .where(ModerationLog.moderation_state_id == ModerationState.id) 

1343 .where(ModerationLog.action == ModerationAction.CREATE) 

1344 ), 

1345 ) 

1346 ).all() 

1347 if states_without_create_log:    [1347 ↛ 1348: line 1347 didn't jump to line 1348 because the condition on line 1347 was never true]

1348 errors.append(f"ModerationStates without CREATE log entry: {states_without_create_log}") 

1349 

1350 # Check resolved queue items point to log entries for the same moderation state 

1351 resolved_item_log_mismatches = session.execute( 

1352 select(ModerationQueueItem.id, ModerationQueueItem.moderation_state_id, ModerationLog.moderation_state_id) 

1353 .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id) 

1354 .where(ModerationQueueItem.resolved_by_log_id.is_not(None)) 

1355 .where(ModerationQueueItem.moderation_state_id != ModerationLog.moderation_state_id) 

1356 ).all() 

1357 if resolved_item_log_mismatches:    [1357 ↛ 1358: line 1357 didn't jump to line 1358 because the condition on line 1357 was never true]

1358 errors.append(f"Resolved queue items with mismatched moderation_state_id: {resolved_item_log_mismatches}") 

1359 

1360 # Check every HOST_REQUEST ModerationState has exactly one HostRequest pointing to it 

1361 hr_states = ( 

1362 session.execute( 

1363 select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST) 

1364 ) 

1365 .scalars() 

1366 .all() 

1367 ) 

1368 for state_id in hr_states:    [1368 ↛ 1369: line 1368 didn't jump to line 1369 because the loop on line 1368 never started]

1369 hr_count = session.execute( 

1370 select(func.count()).where(HostRequest.moderation_state_id == state_id) 

1371 ).scalar_one() 

1372 if hr_count != 1: 

1373 errors.append(f"ModerationState {state_id} (HOST_REQUEST) has {hr_count} HostRequests (expected 1)") 

1374 

1375 # Check every GROUP_CHAT ModerationState has exactly one GroupChat pointing to it 

1376 gc_states = ( 

1377 session.execute( 

1378 select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT) 

1379 ) 

1380 .scalars() 

1381 .all() 

1382 ) 

1383 for state_id in gc_states:    [1383 ↛ 1384: line 1383 didn't jump to line 1384 because the loop on line 1383 never started]

1384 gc_count = session.execute( 

1385 select(func.count()).where(GroupChat.moderation_state_id == state_id) 

1386 ).scalar_one() 

1387 if gc_count != 1: 

1388 errors.append(f"ModerationState {state_id} (GROUP_CHAT) has {gc_count} GroupChats (expected 1)") 

1389 

1390 # Check ModerationState.object_id matches the actual object's ID 

1391 hr_object_id_mismatches = session.execute( 

1392 select(ModerationState.id, ModerationState.object_id, HostRequest.conversation_id) 

1393 .join(HostRequest, HostRequest.moderation_state_id == ModerationState.id) 

1394 .where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST) 

1395 .where(ModerationState.object_id != HostRequest.conversation_id) 

1396 ).all() 

1397 if hr_object_id_mismatches:    [1397 ↛ 1398: line 1397 didn't jump to line 1398 because the condition on line 1397 was never true]

1398 errors.append(f"ModerationState object_id mismatch for HOST_REQUEST: {hr_object_id_mismatches}") 

1399 

1400 gc_object_id_mismatches = session.execute( 

1401 select(ModerationState.id, ModerationState.object_id, GroupChat.conversation_id) 

1402 .join(GroupChat, GroupChat.moderation_state_id == ModerationState.id) 

1403 .where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT) 

1404 .where(ModerationState.object_id != GroupChat.conversation_id) 

1405 ).all() 

1406 if gc_object_id_mismatches:    [1406 ↛ 1407: line 1406 didn't jump to line 1407 because the condition on line 1406 was never true]

1407 errors.append(f"ModerationState object_id mismatch for GROUP_CHAT: {gc_object_id_mismatches}") 

1408 

1409 # Check reverse mapping: HostRequest's moderation_state points to correct ModerationState 

1410 hr_reverse_mismatches = session.execute( 

1411 select( 

1412 HostRequest.conversation_id, 

1413 HostRequest.moderation_state_id, 

1414 ModerationState.object_type, 

1415 ModerationState.object_id, 

1416 ) 

1417 .join(ModerationState, HostRequest.moderation_state_id == ModerationState.id) 

1418 .where( 

1419 (ModerationState.object_type != ModerationObjectType.HOST_REQUEST) 

1420 | (ModerationState.object_id != HostRequest.conversation_id) 

1421 ) 

1422 ).all() 

1423 if hr_reverse_mismatches:    [1423 ↛ 1424: line 1423 didn't jump to line 1424 because the condition on line 1423 was never true]

1424 errors.append(f"HostRequest points to ModerationState with wrong type/object_id: {hr_reverse_mismatches}") 

1425 

1426 # Check reverse mapping: GroupChat's moderation_state points to correct ModerationState 

1427 gc_reverse_mismatches = session.execute( 

1428 select( 

1429 GroupChat.conversation_id, 

1430 GroupChat.moderation_state_id, 

1431 ModerationState.object_type, 

1432 ModerationState.object_id, 

1433 ) 

1434 .join(ModerationState, GroupChat.moderation_state_id == ModerationState.id) 

1435 .where( 

1436 (ModerationState.object_type != ModerationObjectType.GROUP_CHAT) 

1437 | (ModerationState.object_id != GroupChat.conversation_id) 

1438 ) 

1439 ).all() 

1440 if gc_reverse_mismatches:    [1440 ↛ 1441: line 1440 didn't jump to line 1441 because the condition on line 1440 was never true]

1441 errors.append(f"GroupChat points to ModerationState with wrong type/object_id: {gc_reverse_mismatches}") 

1442 

1443 # Ensure auto-approve deadline isn't being exceeded by a significant margin 

1444 # The auto-approver runs every 15s, so allow 5 minutes grace before alerting 

1445 deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"] 

1446 if deadline_seconds > 0:    [1446 ↛ 1447: line 1446 didn't jump to line 1447 because the condition on line 1446 was never true]

1447 grace_period = timedelta(minutes=5) 

1448 stale_initial_review_items = session.execute( 

1449 select( 

1450 ModerationQueueItem.id, 

1451 ModerationQueueItem.moderation_state_id, 

1452 ModerationQueueItem.time_created, 

1453 ) 

1454 .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW) 

1455 .where(ModerationQueueItem.resolved_by_log_id.is_(None)) 

1456 .where(ModerationQueueItem.time_created < now() - timedelta(seconds=deadline_seconds) - grace_period) 

1457 ).all() 

1458 if stale_initial_review_items: 

1459 errors.append( 

1460 f"INITIAL_REVIEW items exceeding auto-approve deadline by >5min: {stale_initial_review_items}" 

1461 ) 

1462 

1463 if errors: 

1464 raise DatabaseInconsistencyError("\n".join(errors)) 

1465 

1466 

1467def auto_approve_moderation_queue(payload: empty_pb2.Empty) -> None: 

1468 """ 

1469 Dead man's switch: auto-approves unresolved INITIAL_REVIEW items older than the deadline. 

1470 Items explicitly actioned by moderators are left alone. 

1471 """ 

1472 deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"] 

1473 if deadline_seconds <= 0: 

1474 return 

1475 

1476 with session_scope() as session: 

1477 ctx = make_background_user_context(user_id=config["MODERATION_BOT_USER_ID"]) 

1478 

1479 items = ( 

1480 Moderation() 

1481 .GetModerationQueue( 

1482 request=moderation_pb2.GetModerationQueueReq( 

1483 triggers=[moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW], 

1484 unresolved_only=True, 

1485 page_size=100, 

1486 created_before=Timestamp_from_datetime(now() - timedelta(seconds=deadline_seconds)), 

1487 ), 

1488 context=ctx, 

1489 session=session, 

1490 ) 

1491 .queue_items 

1492 ) 

1493 

1494 if not items: 

1495 return 

1496 

1497 logger.info(f"Auto-approving {len(items)} moderation queue items") 

1498 for item in items: 

1499 Moderation().ModerateContent( 

1500 request=moderation_pb2.ModerateContentReq( 

1501 moderation_state_id=item.moderation_state_id, 

1502 action=moderation_pb2.MODERATION_ACTION_APPROVE, 

1503 visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE, 

1504 reason=f"Auto-approved: moderation deadline of {deadline_seconds} seconds exceeded.", 

1505 ), 

1506 context=ctx, 

1507 session=session, 

1508 ) 

1509 moderation_auto_approved_counter.inc(len(items))