Coverage for src/couchers/jobs/handlers.py: 88%

461 statements  

coverage.py v7.13.1, created at 2026-01-08 13:48 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from collections.abc import Sequence 

7from datetime import date, timedelta 

8from math import cos, pi, sin, sqrt 

9from random import sample 

10from typing import Any 

11 

12import requests 

13from google.protobuf import empty_pb2 

14from sqlalchemy import ColumnElement, Float, Function, Integer, select 

15from sqlalchemy.orm import aliased 

16from sqlalchemy.sql import ( 

17 and_, 

18 case, 

19 cast, 

20 delete, 

21 distinct, 

22 exists, 

23 extract, 

24 func, 

25 literal, 

26 not_, 

27 or_, 

28 union_all, 

29 update, 

30) 

31 

32from couchers import urls 

33from couchers.config import config 

34from couchers.constants import ( 

35 ACTIVENESS_PROBE_EXPIRY_TIME, 

36 ACTIVENESS_PROBE_INACTIVITY_PERIOD, 

37 ACTIVENESS_PROBE_TIME_REMINDERS, 

38 EVENT_REMINDER_TIMEDELTA, 

39 HOST_REQUEST_MAX_REMINDERS, 

40 HOST_REQUEST_REMINDER_INTERVAL, 

41) 

42from couchers.context import make_background_user_context 

43from couchers.crypto import ( 

44 USER_LOCATION_RANDOMIZATION_NAME, 

45 asym_encrypt, 

46 b64decode, 

47 get_secret, 

48 simple_decrypt, 

49 stable_secure_uniform, 

50) 

51from couchers.db import session_scope 

52from couchers.email.dev import print_dev_email 

53from couchers.email.smtp import send_smtp_email 

54from couchers.helpers.badges import user_add_badge, user_remove_badge 

55from couchers.materialized_views import ( 

56 UserResponseRate, 

57) 

58from couchers.metrics import ( 

59 moderation_auto_approved_counter, 

60 push_notification_counter, 

61 strong_verification_completions_counter, 

62) 

63from couchers.models import ( 

64 AccountDeletionToken, 

65 ActivenessProbe, 

66 ActivenessProbeStatus, 

67 Cluster, 

68 ClusterRole, 

69 ClusterSubscription, 

70 EventOccurrence, 

71 EventOccurrenceAttendee, 

72 GroupChat, 

73 GroupChatSubscription, 

74 HostingStatus, 

75 HostRequest, 

76 HostRequestStatus, 

77 LoginToken, 

78 MeetupStatus, 

79 Message, 

80 MessageType, 

81 ModerationAction, 

82 ModerationLog, 

83 ModerationObjectType, 

84 ModerationQueueItem, 

85 ModerationState, 

86 ModerationTrigger, 

87 PassportSex, 

88 PasswordResetToken, 

89 PhotoGallery, 

90 PostalVerificationAttempt, 

91 PostalVerificationStatus, 

92 PushNotificationDeliveryAttempt, 

93 PushNotificationSubscription, 

94 Reference, 

95 StrongVerificationAttempt, 

96 StrongVerificationAttemptStatus, 

97 User, 

98 UserBadge, 

99 Volunteer, 

100) 

101from couchers.models.notifications import NotificationTopicAction 

102from couchers.notifications.expo_api import get_expo_push_receipts 

103from couchers.notifications.notify import notify 

104from couchers.postal.postcard_service import send_postcard 

105from couchers.proto import moderation_pb2, notification_data_pb2 

106from couchers.proto.internal import jobs_pb2, verification_pb2 

107from couchers.resources import get_badge_dict, get_static_badge_dict 

108from couchers.servicers.api import user_model_to_pb 

109from couchers.servicers.events import ( 

110 event_to_pb, 

111) 

112from couchers.servicers.moderation import Moderation 

113from couchers.servicers.requests import host_request_to_pb 

114from couchers.sql import ( 

115 users_visible_to_each_other, 

116 where_moderated_content_visible, 

117 where_moderated_content_visible_to_user_column, 

118 where_user_columns_visible_to_each_other, 

119 where_users_column_visible, 

120) 

121from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

122from couchers.tasks import send_duplicate_strong_verification_email 

123from couchers.utils import ( 

124 Timestamp_from_datetime, 

125 create_coordinate, 

126 get_coordinates, 

127 not_none, 

128 now, 

129) 

130 

131logger = logging.getLogger(__name__) 

132 

133 

134def send_email(payload: jobs_pb2.SendEmailPayload) -> None: 

135 logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'") 

136 # selects a "sender", which either prints the email to the logger or sends it out with SMTP 

137 sender = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email 

138 # the sender must return a models.Email object that can be added to the database 

139 email = sender( 

140 sender_name=payload.sender_name, 

141 sender_email=payload.sender_email, 

142 recipient=payload.recipient, 

143 subject=payload.subject, 

144 plain=payload.plain, 

145 html=payload.html, 

146 list_unsubscribe_header=payload.list_unsubscribe_header, 

147 source_data=payload.source_data, 

148 ) 

149 with session_scope() as session: 

150 session.add(email) 

151 

152 

153def purge_login_tokens(payload: empty_pb2.Empty) -> None: 

154 logger.info("Purging login tokens") 

155 with session_scope() as session: 

156 session.execute(delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False)) 

157 

158 

159def purge_password_reset_tokens(payload: empty_pb2.Empty) -> None: 

160 logger.info("Purging login tokens") 

161 with session_scope() as session: 

162 session.execute( 

163 delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False) 

164 ) 

165 

166 

167def purge_account_deletion_tokens(payload: empty_pb2.Empty) -> None: 

168 logger.info("Purging account deletion tokens") 

169 with session_scope() as session: 

170 session.execute( 

171 delete(AccountDeletionToken) 

172 .where(~AccountDeletionToken.is_valid) 

173 .execution_options(synchronize_session=False) 

174 ) 

175 

176 

177def send_message_notifications(payload: empty_pb2.Empty) -> None: 

178 """ 

179 Sends out email notifications for messages that have been unseen for a long enough time 

180 """ 

181 # very crude and dumb algorithm 
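    # Rough outline of the queries below (a restatement of the code, not new behaviour):
    #   1. find users who have unmuted, unseen, un-notified text messages older than 5 minutes
    #   2. for each such user, collect *all* their unseen messages, grouped per group chat (latest message + count)
    #   3. send one chat__missed_messages notification and advance user.last_notified_message_id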

182 logger.info("Sending out email notifications for unseen messages") 

183 

184 with session_scope() as session: 

185 # users who have unnotified messages older than 5 minutes in any group chat 

186 users = ( 

187 session.execute( 

188 where_moderated_content_visible_to_user_column( 

189 select(User) 

190 .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id) 

191 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

192 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id), 

193 GroupChat, 

194 User.id, 

195 ) 

196 .where(not_(GroupChatSubscription.is_muted)) 

197 .where(User.is_visible) 

198 .where(Message.time >= GroupChatSubscription.joined) 

199 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

200 .where(Message.id > User.last_notified_message_id) 

201 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

202 .where(Message.time < now() - timedelta(minutes=5)) 

203 .where(Message.message_type == MessageType.text) # TODO: only text messages for now 

204 ) 

205 .scalars() 

206 .unique() 

207 ) 

208 

209 for user in users: 

210 context = make_background_user_context(user_id=user.id) 

211 # now actually grab all unseen messages across all their group chats, not just those older than 5 min 

212 subquery = ( 

213 where_users_column_visible( 

214 where_moderated_content_visible( 

215 select( 

216 GroupChatSubscription.group_chat_id.label("group_chat_id"), 

217 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"), 

218 func.max(Message.id).label("message_id"), 

219 func.count(Message.id).label("count_unseen"), 

220 ) 

221 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

222 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id), 

223 context, 

224 GroupChat, 

225 is_list_operation=True, 

226 ) 

227 .where(GroupChatSubscription.user_id == user.id) 

228 .where(not_(GroupChatSubscription.is_muted)) 

229 .where(Message.id > user.last_notified_message_id) 

230 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

231 .where(Message.time >= GroupChatSubscription.joined) 

232 .where(Message.message_type == MessageType.text) # TODO: only text messages for now 

233 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)), 

234 context, 

235 Message.author_id, 

236 ) 

237 .group_by(GroupChatSubscription.group_chat_id) 

238 .order_by(func.max(Message.id).desc()) 

239 .subquery() 

240 ) 

241 

242 unseen_messages = session.execute( 

243 where_moderated_content_visible( 

244 select(GroupChat, Message, subquery.c.count_unseen) 

245 .join(subquery, subquery.c.message_id == Message.id) 

246 .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id), 

247 context, 

248 GroupChat, 

249 is_list_operation=True, 

250 ).order_by(subquery.c.message_id.desc()) 

251 ).all() 

252 

253 if not unseen_messages: 

254 continue 

255 

256 user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages) 

257 

258 def format_title(message: Message, group_chat: GroupChat, count_unseen: int) -> str: 

259 if group_chat.is_dm: 

260 return f"You missed {count_unseen} message(s) from {message.author.name}" 

261 else: 

262 return f"You missed {count_unseen} message(s) in {group_chat.title}" 

263 

264 notify( 

265 session, 

266 user_id=user.id, 

267 topic_action=NotificationTopicAction.chat__missed_messages, 

268 key="", 

269 data=notification_data_pb2.ChatMissedMessages( 

270 messages=[ 

271 notification_data_pb2.ChatMessage( 

272 author=user_model_to_pb( 

273 message.author, 

274 session, 

275 context, 

276 ), 

277 message=format_title(message, group_chat, count_unseen), 

278 text=message.text, 

279 group_chat_id=message.conversation_id, 

280 ) 

281 for group_chat, message, count_unseen in unseen_messages 

282 ], 

283 ), 

284 ) 

285 session.commit() 

286 

287 

288def send_request_notifications(payload: empty_pb2.Empty) -> None: 

289 """ 

290 Sends out email notifications for unseen messages in host requests (as surfer or host) 

291 """ 

292 logger.info("Sending out email notifications for unseen messages in host requests") 

293 

294 with session_scope() as session: 

295 # Get all candidate users who might have unseen request messages 

296 candidate_user_ids = ( 

297 session.execute( 

298 select(User.id) 

299 .where(User.is_visible) 

300 .where( 

301 or_( 

302 # Users with unseen messages as surfer 

303 exists( 

304 select(1) 

305 .select_from(HostRequest) 

306 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

307 .where(HostRequest.surfer_user_id == User.id) 

308 .where(Message.id > HostRequest.surfer_last_seen_message_id) 

309 .where(Message.id > User.last_notified_request_message_id) 

310 .where(Message.time < now() - timedelta(minutes=5)) 

311 .where(Message.message_type == MessageType.text) 

312 ), 

313 # Users with unseen messages as host 

314 exists( 

315 select(1) 

316 .select_from(HostRequest) 

317 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

318 .where(HostRequest.host_user_id == User.id) 

319 .where(Message.id > HostRequest.host_last_seen_message_id) 

320 .where(Message.id > User.last_notified_request_message_id) 

321 .where(Message.time < now() - timedelta(minutes=5)) 

322 .where(Message.message_type == MessageType.text) 

323 ), 

324 ) 

325 ) 

326 ) 

327 .scalars() 

328 .all() 

329 ) 

330 

331 for user_id in candidate_user_ids: 

332 context = make_background_user_context(user_id=user_id) 

333 

334 # requests where this user is surfing 

335 surfing_reqs = session.execute( 

336 where_users_column_visible( 

337 where_moderated_content_visible_to_user_column( 

338 select(User, HostRequest, func.max(Message.id)) 

339 .where(User.id == user_id) 

340 .join(HostRequest, HostRequest.surfer_user_id == User.id), 

341 HostRequest, 

342 HostRequest.surfer_user_id, 

343 ), 

344 context, 

345 HostRequest.host_user_id, 

346 ) 

347 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

348 .where(Message.id > HostRequest.surfer_last_seen_message_id) 

349 .where(Message.id > User.last_notified_request_message_id) 

350 .where(Message.time < now() - timedelta(minutes=5)) 

351 .where(Message.message_type == MessageType.text) 

352 .group_by(User, HostRequest) # type: ignore[arg-type] 

353 ).all() 

354 

355 # where this user is hosting 

356 hosting_reqs = session.execute( 

357 where_users_column_visible( 

358 where_moderated_content_visible_to_user_column( 

359 select(User, HostRequest, func.max(Message.id)) 

360 .where(User.id == user_id) 

361 .join(HostRequest, HostRequest.host_user_id == User.id), 

362 HostRequest, 

363 HostRequest.host_user_id, 

364 ), 

365 context, 

366 HostRequest.surfer_user_id, 

367 ) 

368 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

369 .where(Message.id > HostRequest.host_last_seen_message_id) 

370 .where(Message.id > User.last_notified_request_message_id) 

371 .where(Message.time < now() - timedelta(minutes=5)) 

372 .where(Message.message_type == MessageType.text) 

373 .group_by(User, HostRequest) # type: ignore[arg-type] 

374 ).all() 

375 

376 for user, host_request, max_message_id in surfing_reqs: 

377 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id) 

378 session.flush() 

379 

380 notify( 

381 session, 

382 user_id=user.id, 

383 topic_action=NotificationTopicAction.host_request__missed_messages, 

384 key=str(host_request.conversation_id), 

385 data=notification_data_pb2.HostRequestMissedMessages( 

386 host_request=host_request_to_pb(host_request, session, context), 

387 user=user_model_to_pb(host_request.host, session, context), 

388 am_host=False, 

389 ), 

390 ) 

391 

392 for user, host_request, max_message_id in hosting_reqs: 

393 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id) 

394 session.flush() 

395 

396 notify( 

397 session, 

398 user_id=user.id, 

399 topic_action=NotificationTopicAction.host_request__missed_messages, 

400 key=str(host_request.conversation_id), 

401 data=notification_data_pb2.HostRequestMissedMessages( 

402 host_request=host_request_to_pb(host_request, session, context), 

403 user=user_model_to_pb(host_request.surfer, session, context), 

404 am_host=True, 

405 ), 

406 ) 

407 

408 

409def send_onboarding_emails(payload: empty_pb2.Empty) -> None: 

410 """ 

411 Sends out onboarding emails 

412 """ 

413 logger.info("Sending out onboarding emails") 

414 

415 with session_scope() as session: 

416 # first onboarding email 

417 users = ( 

418 session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all() 

419 ) 

420 

421 for user in users: 

422 notify( 

423 session, 

424 user_id=user.id, 

425 topic_action=NotificationTopicAction.onboarding__reminder, 

426 key="1", 

427 ) 

428 user.onboarding_emails_sent = 1 

429 user.last_onboarding_email_sent = now() 

430 session.commit() 

431 

432 # second onboarding email 

433 # sent after a week if the user has no profile or their "about me" section is less than 20 characters long 

434 users = ( 

435 session.execute( 

436 select(User) 

437 .where(User.is_visible) 

438 .where(User.onboarding_emails_sent == 1) 

439 .where(now() - User.last_onboarding_email_sent > timedelta(days=7)) 

440 .where(User.has_completed_profile == False) 

441 ) 

442 .scalars() 

443 .all() 

444 ) 

445 

446 for user in users: 

447 notify( 

448 session, 

449 user_id=user.id, 

450 topic_action=NotificationTopicAction.onboarding__reminder, 

451 key="2", 

452 ) 

453 user.onboarding_emails_sent = 2 

454 user.last_onboarding_email_sent = now() 

455 session.commit() 

456 

457 

458def send_reference_reminders(payload: empty_pb2.Empty) -> None: 

459 """ 

460 Sends out reminders to write references after hosting/staying 

461 """ 

462 logger.info("Sending out reference reminder emails") 

463 

464 # Keep this in chronological order! 

465 reference_reminder_schedule = [ 

466 # (reminder number, timedelta before we stop being able to write a ref at which to remind, days they have left to write the ref) 

467 # the end time to write a reference is supposed to be midnight in the host's timezone 

468 # 8 pm ish on the last day of the stay 

469 (1, timedelta(days=15) - timedelta(hours=20), 14), 

470 # 2 pm ish a week after stay 

471 (2, timedelta(days=8) - timedelta(hours=14), 7), 

472 # 10 am ish 3 days before end of time to write ref 

473 (3, timedelta(days=4) - timedelta(hours=10), 3), 

474 ] 
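    # Worked example (assuming end_time_to_write_reference is midnight 14 days after the stay ends):
    #   reminder 1 fires once now() > deadline - (15 days - 20 hours), i.e. around 8 pm on the last day of the stay, with 14 days left
    #   reminder 2 fires once now() > deadline - (8 days - 14 hours), i.e. around 2 pm roughly a week later, with 7 days left
    #   reminder 3 fires once now() > deadline - (4 days - 10 hours), i.e. around 10 am a few days before the deadline, with 3 days left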

475 

476 with session_scope() as session: 

477 # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates 

478 for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule): 

479 user = aliased(User) 

480 other_user = aliased(User) 

481 # surfers needing to write a ref 

482 q1 = ( 

483 select(literal(True), HostRequest, user, other_user) 

484 .join(user, user.id == HostRequest.surfer_user_id) 

485 .join(other_user, other_user.id == HostRequest.host_user_id) 

486 .outerjoin( 

487 Reference, 

488 and_( 

489 Reference.host_request_id == HostRequest.conversation_id, 

490 # if no reference is found in this join, then the surfer has not written a ref 

491 Reference.from_user_id == HostRequest.surfer_user_id, 

492 ), 

493 ) 

494 .where(Reference.id == None) 

495 .where(HostRequest.can_write_reference) 

496 .where(HostRequest.surfer_sent_reference_reminders < reminder_number) 

497 .where(HostRequest.end_time_to_write_reference - reminder_time < now()) 

498 .where(HostRequest.surfer_reason_didnt_meetup == None) 

499 .where(users_visible_to_each_other(user, other_user)) 

500 ) 

501 

502 # hosts needing to write a ref 

503 q2 = ( 

504 select(literal(False), HostRequest, user, other_user) 

505 .join(user, user.id == HostRequest.host_user_id) 

506 .join(other_user, other_user.id == HostRequest.surfer_user_id) 

507 .outerjoin( 

508 Reference, 

509 and_( 

510 Reference.host_request_id == HostRequest.conversation_id, 

511 # if no reference is found in this join, then the host has not written a ref 

512 Reference.from_user_id == HostRequest.host_user_id, 

513 ), 

514 ) 

515 .where(Reference.id == None) 

516 .where(HostRequest.can_write_reference) 

517 .where(HostRequest.host_sent_reference_reminders < reminder_number) 

518 .where(HostRequest.end_time_to_write_reference - reminder_time < now()) 

519 .where(HostRequest.host_reason_didnt_meetup == None) 

520 .where(users_visible_to_each_other(user, other_user)) 

521 ) 

522 

523 union = union_all(q1, q2).subquery() 

524 query = select( 

525 union.c[0].label("surfed"), 

526 aliased(HostRequest, union), 

527 aliased(user, union), 

528 aliased(other_user, union), 

529 ) 

530 reference_reminders = session.execute(query).all() 

531 

532 for surfed, host_request, user, other_user in reference_reminders: 

533 # visibility and blocking already checked in sql 

534 assert user.is_visible 

535 context = make_background_user_context(user_id=user.id) 

536 topic_action = ( 

537 NotificationTopicAction.reference__reminder_surfed 

538 if surfed 

539 else NotificationTopicAction.reference__reminder_hosted 

540 ) 

541 notify( 

542 session, 

543 user_id=user.id, 

544 topic_action=topic_action, 

545 key=str(host_request.conversation_id), 

546 data=notification_data_pb2.ReferenceReminder( 

547 host_request_id=host_request.conversation_id, 

548 other_user=user_model_to_pb(other_user, session, context), 

549 days_left=reminder_days_left, 

550 ), 

551 ) 

552 if surfed: 

553 host_request.surfer_sent_reference_reminders = reminder_number 

554 else: 

555 host_request.host_sent_reference_reminders = reminder_number 

556 session.commit() 

557 

558 

559def send_host_request_reminders(payload: empty_pb2.Empty) -> None: 

560 with session_scope() as session: 

561 host_has_sent_message = select(1).where( 

562 Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.host_user_id 

563 ) 

564 

565 requests = ( 

566 session.execute( 

567 where_user_columns_visible_to_each_other( 

568 where_moderated_content_visible_to_user_column( 

569 select(HostRequest), 

570 HostRequest, 

571 HostRequest.host_user_id, 

572 ) 

573 .where(HostRequest.status == HostRequestStatus.pending) 

574 .where(HostRequest.host_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS) 

575 .where(HostRequest.start_time > func.now()) 

576 .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL) 

577 .where(~exists(host_has_sent_message)), 

578 HostRequest.host_user_id, 

579 HostRequest.surfer_user_id, 

580 ) 

581 ) 

582 .scalars() 

583 .all() 

584 ) 

585 

586 for host_request in requests: 

587 host_request.host_sent_request_reminders += 1 

588 host_request.last_sent_request_reminder_time = now() 

589 

590 context = make_background_user_context(user_id=host_request.host_user_id) 

591 notify( 

592 session, 

593 user_id=host_request.host_user_id, 

594 topic_action=NotificationTopicAction.host_request__reminder, 

595 key=str(host_request.conversation_id), 

596 data=notification_data_pb2.HostRequestReminder( 

597 host_request=host_request_to_pb(host_request, session, context), 

598 surfer=user_model_to_pb(host_request.surfer, session, context), 

599 ), 

600 moderation_state_id=host_request.moderation_state_id, 

601 ) 

602 

603 session.commit() 

604 

605 

606def add_users_to_email_list(payload: empty_pb2.Empty) -> None: 

607 if not config["LISTMONK_ENABLED"]: 607 ↛ 608  line 607 didn't jump to line 608 because the condition on line 607 was never true

608 logger.info("Not adding users to mailing list") 

609 return 

610 

611 logger.info("Adding users to mailing list") 

612 

613 while True: 

614 with session_scope() as session: 

615 user = session.execute( 

616 select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1) 

617 ).scalar_one_or_none() 

618 if not user: 

619 logger.info("Finished adding users to mailing list") 

620 return 

621 

622 if user.opt_out_of_newsletter: 

623 user.in_sync_with_newsletter = True 

624 session.commit() 

625 continue 

626 

627 r = requests.post( 

628 config["LISTMONK_BASE_URL"] + "/api/subscribers", 

629 auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]), 

630 json={ 

631 "email": user.email, 

632 "name": user.name, 

633 "lists": [config["LISTMONK_LIST_ID"]], 

634 "preconfirm_subscriptions": True, 

635 "attribs": {"couchers_user_id": user.id}, 

636 "status": "enabled", 

637 }, 

638 timeout=10, 

639 ) 

640 # the API returns 409 if the user is already subscribed 

641 if r.status_code == 200 or r.status_code == 409: 641 ↛ 645  line 641 didn't jump to line 645 because the condition on line 641 was always true

642 user.in_sync_with_newsletter = True 

643 session.commit() 

644 else: 

645 raise Exception("Failed to add users to mailing list") 

646 

647 

648def enforce_community_membership(payload: empty_pb2.Empty) -> None: 

649 tasks_enforce_community_memberships() 

650 

651 

652def update_recommendation_scores(payload: empty_pb2.Empty) -> None: 

653 text_fields = [ 

654 User.hometown, 

655 User.occupation, 

656 User.education, 

657 User.about_me, 

658 User.things_i_like, 

659 User.about_place, 

660 User.additional_information, 

661 User.pet_details, 

662 User.kid_details, 

663 User.housemate_details, 

664 User.other_host_info, 

665 User.sleeping_details, 

666 User.area, 

667 User.house_rules, 

668 ] 

669 home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules] 

670 

671 def poor_man_gaussian() -> ColumnElement[float] | float: 

672 """ 

673 Produces an approximately standard normal random variate 

674 """ 

675 trials = 5 

676 return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12) 

677 

678 def int_(stmt: Any) -> Function[int]: 

679 return func.coalesce(cast(stmt, Integer), 0) 

680 

681 def float_(stmt: Any) -> Function[float]: 

682 return func.coalesce(cast(stmt, Float), 0.0) 

683 

684 with session_scope() as session: 

685 # profile 

686 profile_text = "" 

687 for field in text_fields: 

688 profile_text += func.coalesce(field, "") # type: ignore[assignment] 

689 text_length = func.length(profile_text) 

690 home_text = "" 

691 for field in home_fields: 

692 home_text += func.coalesce(field, "") # type: ignore[assignment] 

693 home_length = func.length(home_text) 

694 

695 filled_profile = int_(User.has_completed_profile) 

696 has_text = int_(text_length > 500) 

697 long_text = int_(text_length > 2000) 

698 can_host = int_(User.hosting_status == HostingStatus.can_host) 

699 may_host = int_(User.hosting_status == HostingStatus.maybe) 

700 cant_host = int_(User.hosting_status == HostingStatus.cant_host) 

701 filled_home = int_(User.has_completed_my_home) 

702 filled_home_lots = int_(home_length > 200) 

703 hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host 

704 profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots 

705 

706 # references 

707 left_ref_expr = int_(1).label("left_reference") 

708 left_refs_subquery = ( 

709 select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery() 

710 ) 

711 left_reference = int_(left_refs_subquery.c.left_reference) 

712 has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference") 

713 ref_count_expr = int_(func.count(Reference.id)).label("ref_count") 

714 ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg") 

715 has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types") 

716 has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label( 

717 "has_bad_ref" 

718 ) 

719 received_ref_subquery = ( 

720 select( 

721 Reference.to_user_id.label("user_id"), 

722 has_reference_expr, 

723 has_multiple_types_expr, 

724 has_bad_ref_expr, 

725 ref_count_expr, 

726 ref_avg_expr, 

727 ) 

728 .group_by(Reference.to_user_id) 

729 .subquery() 

730 ) 

731 has_multiple_types = int_(received_ref_subquery.c.has_multiple_types) 

732 has_reference = int_(received_ref_subquery.c.has_reference) 

733 has_bad_reference = int_(received_ref_subquery.c.has_bad_ref) 

734 rating_score = float_( 

735 received_ref_subquery.c.ref_avg 

736 * ( 

737 2 * func.least(received_ref_subquery.c.ref_count, 5) 

738 + func.greatest(received_ref_subquery.c.ref_count - 5, 0) 

739 ) 

740 ) 

741 ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score 

742 

743 # activeness 

744 recently_active = int_(User.last_active >= now() - timedelta(days=180)) 

745 very_recently_active = int_(User.last_active >= now() - timedelta(days=14)) 

746 recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14)) 

747 messaged_lots = int_(func.count(Message.id) > 5) 

748 messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points") 

749 messaging_subquery = ( 

750 select(Message.author_id.label("user_id"), messaging_points_subquery) 

751 .where(Message.message_type == MessageType.text) 

752 .group_by(Message.author_id) 

753 .subquery() 

754 ) 

755 activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points) 

756 

757 # verification, community builder status, and badge points 

758 cb_subquery = ( 

759 select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id")) 

760 .join(Cluster, Cluster.id == ClusterSubscription.cluster_id) 

761 .where(ClusterSubscription.role == ClusterRole.admin) 

762 .where(Cluster.is_official_cluster) 

763 .group_by(ClusterSubscription.user_id) 

764 .subquery() 

765 ) 

766 min_node_id = cb_subquery.c.min_node_id 

767 cb = int_(min_node_id >= 1) 

768 wcb = int_(min_node_id == 1) 

769 badge_points = { 

770 "founder": 100, 

771 "board_member": 20, 

772 "past_board_member": 5, 

773 "strong_verification": 3, 

774 "volunteer": 3, 

775 "past_volunteer": 2, 

776 "donor": 1, 

777 "phone_verified": 1, 

778 } 

779 

780 badge_subquery = ( 

781 select( 

782 UserBadge.user_id.label("user_id"), 

783 func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"), 

784 ) 

785 .group_by(UserBadge.user_id) 

786 .subquery() 

787 ) 

788 

789 other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points) 

790 

791 # response rate 

792 hr_subquery = select( 

793 UserResponseRate.user_id, 

794 float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"), 

795 float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"), 

796 ).subquery() 

797 response_time_33p = hr_subquery.c.response_time_33p 

798 response_time_66p = hr_subquery.c.response_time_66p 

799 # be careful with nulls 

800 response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0) 

801 

802 recommendation_score = ( 

803 hosting_status_points 

804 + profile_points 

805 + ref_score 

806 + activeness_points 

807 + other_points 

808 + response_rate_points 

809 + 2 * poor_man_gaussian() 

810 ) 
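        # recommendation_score is thus the sum of the hosting/profile/reference/activeness/badge/response-rate
        # sub-scores above, plus a small random jitter (twice an approximately standard normal draw), so the
        # ordering of similarly-scored users is shuffled slightly on each run.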

811 

812 scores = ( 

813 select(User.id.label("user_id"), recommendation_score.label("score")) 

814 .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id) 

815 .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id) 

816 .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id) 

817 .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id) 

818 .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id) 

819 .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id) 

820 ).subquery() 

821 

822 session.execute(update(User).values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)) 

823 

824 logger.info("Updated recommendation scores") 

825 

826 

827def update_badges(payload: empty_pb2.Empty) -> None: 

828 with session_scope() as session: 

829 

830 def update_badge(badge_id: str, members: Sequence[int]) -> None: 

831 badge = get_badge_dict()[badge_id] 

832 user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge.id)).scalars().all() 

833 # in case the user ids don't exist in the db 

834 actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all() 

835 # we should add the badge to these 

836 add = set(actual_members) - set(user_ids) 

837 # we should remove the badge from these 

838 remove = set(user_ids) - set(actual_members) 

839 for user_id in add: 

840 user_add_badge(session, user_id, badge.id) 

841 

842 for user_id in remove: 

843 user_remove_badge(session, user_id, badge.id) 

844 

845 update_badge("founder", get_static_badge_dict()["founder"]) 

846 update_badge("board_member", get_static_badge_dict()["board_member"]) 

847 update_badge("past_board_member", get_static_badge_dict()["past_board_member"]) 

848 update_badge("donor", session.execute(select(User.id).where(User.last_donated.is_not(None))).scalars().all()) 

849 update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all()) 

850 update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all()) 

851 # strong verification requires passport on file + gender/sex correspondence and date of birth match 

852 update_badge( 

853 "strong_verification", 

854 session.execute( 

855 select(User.id) 

856 .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id) 

857 .where(StrongVerificationAttempt.has_strong_verification(User)) 

858 ) 

859 .scalars() 

860 .all(), 

861 ) 

862 # volunteer badge for active volunteers (stopped_volunteering is null) 

863 update_badge( 

864 "volunteer", 

865 session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_(None))).scalars().all(), 

866 ) 

867 # past_volunteer badge for past volunteers (stopped_volunteering is not null) 

868 update_badge( 

869 "past_volunteer", 

870 session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_not(None))) 

871 .scalars() 

872 .all(), 

873 ) 

874 

875 

876def finalize_strong_verification(payload: jobs_pb2.FinalizeStrongVerificationPayload) -> None: 

877 with session_scope() as session: 

878 verification_attempt = session.execute( 

879 select(StrongVerificationAttempt) 

880 .where(StrongVerificationAttempt.id == payload.verification_attempt_id) 

881 .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend) 

882 ).scalar_one() 

883 response = requests.post( 

884 "https://passportreader.app/api/v1/session.get", 

885 auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]), 

886 json={"id": verification_attempt.iris_session_id}, 

887 timeout=10, 

888 verify="/etc/ssl/certs/ca-certificates.crt", 

889 ) 

890 if response.status_code != 200: 890 ↛ 891  line 890 didn't jump to line 891 because the condition on line 890 was never true

891 raise Exception(f"Iris didn't return 200: {response.text}") 

892 json_data = response.json() 

893 reference_payload = verification_pb2.VerificationReferencePayload.FromString( 

894 simple_decrypt("iris_callback", b64decode(json_data["reference"])) 

895 ) 

896 assert verification_attempt.user_id == reference_payload.user_id 

897 assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token 

898 assert verification_attempt.iris_session_id == json_data["id"] 

899 assert json_data["state"] == "APPROVED" 

900 

901 if json_data["document_type"] != "PASSPORT": 

902 verification_attempt.status = StrongVerificationAttemptStatus.failed 

903 notify( 

904 session, 

905 user_id=verification_attempt.user_id, 

906 topic_action=NotificationTopicAction.verification__sv_fail, 

907 key="", 

908 data=notification_data_pb2.VerificationSVFail( 

909 reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT 

910 ), 

911 ) 

912 return 

913 

914 assert json_data["document_type"] == "PASSPORT" 

915 

916 expiry_date = date.fromisoformat(json_data["expiry_date"]) 

917 nationality = json_data["nationality"] 

918 last_three_document_chars = json_data["document_number"][-3:] 

919 

920 existing_attempt = session.execute( 

921 select(StrongVerificationAttempt) 

922 .where(StrongVerificationAttempt.passport_expiry_date == expiry_date) 

923 .where(StrongVerificationAttempt.passport_nationality == nationality) 

924 .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars) 

925 .order_by(StrongVerificationAttempt.id) 

926 .limit(1) 

927 ).scalar_one_or_none() 

928 

929 verification_attempt.has_minimal_data = True 

930 verification_attempt.passport_expiry_date = expiry_date 

931 verification_attempt.passport_nationality = nationality 

932 verification_attempt.passport_last_three_document_chars = last_three_document_chars 

933 

934 if existing_attempt: 

935 verification_attempt.status = StrongVerificationAttemptStatus.duplicate 

936 

937 if existing_attempt.user_id != verification_attempt.user_id: 

938 session.flush() 

939 send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt) 

940 

941 notify( 

942 session, 

943 user_id=verification_attempt.user_id, 

944 topic_action=NotificationTopicAction.verification__sv_fail, 

945 key="", 

946 data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE), 

947 ) 

948 return 

949 

950 verification_attempt.has_full_data = True 

951 verification_attempt.passport_encrypted_data = asym_encrypt( 

952 config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8") 

953 ) 

954 verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"]) 

955 verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()] 

956 verification_attempt.status = StrongVerificationAttemptStatus.succeeded 

957 

958 session.flush() 

959 

960 strong_verification_completions_counter.inc() 

961 

962 user = verification_attempt.user 

963 if verification_attempt.has_strong_verification(user): 963 ↛ 978  line 963 didn't jump to line 978 because the condition on line 963 was always true

964 badge_id = "strong_verification" 

965 if session.execute( 

966 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id) 

967 ).scalar_one_or_none(): 

968 return 

969 

970 user_add_badge(session, user.id, badge_id, do_notify=False) 

971 notify( 

972 session, 

973 user_id=verification_attempt.user_id, 

974 topic_action=NotificationTopicAction.verification__sv_success, 

975 key="", 

976 ) 

977 else: 

978 notify( 

979 session, 

980 user_id=verification_attempt.user_id, 

981 topic_action=NotificationTopicAction.verification__sv_fail, 

982 key="", 

983 data=notification_data_pb2.VerificationSVFail( 

984 reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER 

985 ), 

986 ) 

987 

988 

989def send_activeness_probes(payload: empty_pb2.Empty) -> None: 

990 with session_scope() as session: 

991 ## Step 1: create new activeness probes for those who need it and don't have one (if enabled) 

992 

993 if config["ACTIVENESS_PROBES_ENABLED"]: 

994 # current activeness probes 

995 subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery() 

996 

997 # users who we should send an activeness probe to 

998 new_probe_user_ids = ( 

999 session.execute( 

1000 select(User.id) 

1001 .where(User.is_visible) 

1002 .where(User.hosting_status == HostingStatus.can_host) 

1003 .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD) 

1004 .where(User.id.not_in(select(subquery.c.user_id))) 

1005 ) 

1006 .scalars() 

1007 .all() 

1008 ) 

1009 

1010 total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one() 

1011 probes_today = session.execute( 

1012 select(func.count()) 

1013 .select_from(ActivenessProbe) 

1014 .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24)) 

1015 ).scalar_one() 

1016 

1017 # send probes to max 2% of users per day 

1018 max_probes_per_day = 0.02 * total_users 

1019 max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1)) 
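            # Worked example with hypothetical numbers: with 10,000 visible users, max_probes_per_day = 200;
            # if 150 probes were already initiated in the last 24h, this gives
            # int(max(min(200 - 150, 200 / 24), 1)) = int(max(min(50, 8.33), 1)) = 8 new probes this run,
            # and never fewer than 1.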

1020 

1021 if len(new_probe_user_ids) > max_probe_size: 1021 ↛ 1022  line 1021 didn't jump to line 1022 because the condition on line 1021 was never true

1022 new_probe_user_ids = sample(new_probe_user_ids, max_probe_size) 

1023 

1024 for user_id in new_probe_user_ids: 

1025 session.add(ActivenessProbe(user_id=user_id)) 

1026 

1027 session.commit() 

1028 

1029 ## Step 2: actually send out probe notifications 

1030 for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS): 

1031 probes = ( 

1032 session.execute( 

1033 select(ActivenessProbe) 

1034 .where(ActivenessProbe.notifications_sent == probe_number_minus_1) 

1035 .where(ActivenessProbe.probe_initiated + delay < func.now()) 

1036 .where(ActivenessProbe.is_pending) 

1037 ) 

1038 .scalars() 

1039 .all() 

1040 ) 

1041 

1042 for probe in probes: 

1043 probe.notifications_sent = probe_number_minus_1 + 1 

1044 context = make_background_user_context(user_id=probe.user.id) 

1045 notify( 

1046 session, 

1047 user_id=probe.user.id, 

1048 topic_action=NotificationTopicAction.activeness__probe, 

1049 key=str(probe.id), 

1050 data=notification_data_pb2.ActivenessProbe( 

1051 reminder_number=probe_number_minus_1 + 1, 

1052 deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME), 

1053 ), 

1054 ) 

1055 session.commit() 

1056 

1057 ## Step 3: for those who haven't responded, mark them as failed 

1058 expired_probes = ( 

1059 session.execute( 

1060 select(ActivenessProbe) 

1061 .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS)) 

1062 .where(ActivenessProbe.is_pending) 

1063 .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now()) 

1064 ) 

1065 .scalars() 

1066 .all() 

1067 ) 

1068 

1069 for probe in expired_probes: 

1070 probe.responded = now() 

1071 probe.response = ActivenessProbeStatus.expired 

1072 if probe.user.hosting_status == HostingStatus.can_host: 1072 ↛ 1074  line 1072 didn't jump to line 1074 because the condition on line 1072 was always true

1073 probe.user.hosting_status = HostingStatus.maybe 

1074 if probe.user.meetup_status == MeetupStatus.wants_to_meetup: 1074 ↛ 1076  line 1074 didn't jump to line 1076 because the condition on line 1074 was always true

1075 probe.user.meetup_status = MeetupStatus.open_to_meetup 

1076 session.commit() 

1077 

1078 

1079def update_randomized_locations(payload: empty_pb2.Empty) -> None: 

1080 """ 

1081 We generate for each user a randomized location as follows: 

1082 - Start from a strong random seed (based on the SECRET env var and our key derivation function) 

1083 - For each user, mix in the user_id for randomness 

1084 - Generate a radius from [0.02, 0.1] degrees (about 2-10km) 

1085 - Generate an angle from [0, 360] 

1086 - Randomized location is then a distance `radius` away at an angle `angle` from `geom` 

1087 """ 

1088 randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME) 

1089 

1090 def gen_randomized_coords(user_id: int, lat: float, lng: float) -> tuple[float, float]: 

1091 radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii")) 

1092 angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii")) 

1093 radius = 0.02 + 0.08 * radius_u 

1094 angle_rad = 2 * pi * angle_u 

1095 offset_lng = radius * cos(angle_rad) 

1096 offset_lat = radius * sin(angle_rad) 

1097 return lat + offset_lat, lng + offset_lng 

1098 

1099 user_updates: list[dict[str, Any]] = [] 

1100 

1101 with session_scope() as session: 

1102 users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all() 

1103 

1104 for user_id, geom in users_to_update: 

1105 lat, lng = get_coordinates(geom) 

1106 user_updates.append( 

1107 {"id": user_id, "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, lat, lng))} 

1108 ) 

1109 

1110 with session_scope() as session: 

1111 session.execute(update(User), user_updates) 

1112 

1113 

1114def send_event_reminders(payload: empty_pb2.Empty) -> None: 

1115 """ 

1116 Sends reminders for events that are 24 hours away to users who marked themselves as attending. 

1117 """ 

1118 logger.info("Sending event reminder emails") 

1119 

1120 with session_scope() as session: 

1121 occurrences = ( 

1122 session.execute( 

1123 select(EventOccurrence) 

1124 .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA) 

1125 .where(EventOccurrence.start_time >= now()) 

1126 ) 

1127 .scalars() 

1128 .all() 

1129 ) 

1130 

1131 for occurrence in occurrences: 

1132 results = session.execute( 

1133 select(User, EventOccurrenceAttendee) 

1134 .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id) 

1135 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id) 

1136 .where(EventOccurrenceAttendee.reminder_sent == False) 

1137 ).all() 

1138 

1139 for user, attendee in results: 

1140 context = make_background_user_context(user_id=user.id) 

1141 

1142 notify( 

1143 session, 

1144 user_id=user.id, 

1145 topic_action=NotificationTopicAction.event__reminder, 

1146 key=str(occurrence.id), 

1147 data=notification_data_pb2.EventReminder( 

1148 event=event_to_pb(session, occurrence, context), 

1149 user=user_model_to_pb(user, session, context), 

1150 ), 

1151 ) 

1152 

1153 attendee.reminder_sent = True 

1154 session.commit() 

1155 

1156 

1157def check_expo_push_receipts(payload: empty_pb2.Empty) -> None: 

1158 """ 

1159 Check Expo push receipts in batch and update delivery attempts. 

1160 """ 

1161 MAX_ITERATIONS = 100 # Safety limit: 100 batches * 100 attempts = 10,000 max 

1162 

1163 for iteration in range(MAX_ITERATIONS): 1163 ↛ 1225  line 1163 didn't jump to line 1225 because the loop on line 1163 didn't complete

1164 with session_scope() as session: 

1165 # Find all delivery attempts that need receipt checking 

1166 # Wait 15 minutes per Expo's recommendation before checking receipts 

1167 attempts = ( 

1168 session.execute( 

1169 select(PushNotificationDeliveryAttempt) 

1170 .where(PushNotificationDeliveryAttempt.expo_ticket_id != None) 

1171 .where(PushNotificationDeliveryAttempt.receipt_checked_at == None) 

1172 .where(PushNotificationDeliveryAttempt.time < now() - timedelta(minutes=15)) 

1173 .where(PushNotificationDeliveryAttempt.time > now() - timedelta(hours=24)) 

1174 .limit(100) 

1175 ) 

1176 .scalars() 

1177 .all() 

1178 ) 

1179 

1180 if not attempts: 

1181 logger.debug("No Expo receipts to check") 

1182 return 

1183 

1184 logger.info(f"Checking {len(attempts)} Expo push receipts") 

1185 

1186 receipts = get_expo_push_receipts([not_none(attempt.expo_ticket_id) for attempt in attempts]) 

1187 

1188 for attempt in attempts: 

1189 receipt = receipts.get(not_none(attempt.expo_ticket_id)) 

1190 

1191 # Always mark as checked to avoid infinite loops 

1192 attempt.receipt_checked_at = now() 

1193 

1194 if receipt is None: 

1195 # Receipt not found after 15min - likely expired (>24h) or never existed 

1196 # Per Expo docs: receipts should be available within 15 minutes 

1197 attempt.receipt_status = "not_found" 

1198 continue 

1199 

1200 attempt.receipt_status = receipt.get("status") 

1201 

1202 if receipt.get("status") == "error": 

1203 details = receipt.get("details", {}) 

1204 error_code = details.get("error") 

1205 attempt.receipt_error_code = error_code 

1206 

1207 if error_code == "DeviceNotRegistered": 1207 ↛ 1222  line 1207 didn't jump to line 1222 because the condition on line 1207 was always true

1208 # Device token is no longer valid - disable the subscription 

1209 sub = session.execute( 

1210 select(PushNotificationSubscription).where( 

1211 PushNotificationSubscription.id == attempt.push_notification_subscription_id 

1212 ) 

1213 ).scalar_one() 

1214 

1215 if sub.disabled_at > now(): 1215 ↛ 1188  line 1215 didn't jump to line 1188 because the condition on line 1215 was always true

1216 sub.disabled_at = now() 

1217 logger.info(f"Disabled push sub {sub.id} due to DeviceNotRegistered in receipt") 

1218 push_notification_counter.labels( 

1219 platform="expo", outcome="permanent_subscription_failure_receipt" 

1220 ).inc() 

1221 else: 

1222 logger.warning(f"Expo receipt error for ticket {attempt.expo_ticket_id}: {error_code}") 

1223 

1224 # If we get here, we've exhausted MAX_ITERATIONS without finishing 

1225 raise RuntimeError( 

1226 f"check_expo_push_receipts exceeded {MAX_ITERATIONS} iterations - " 

1227 "there may be an unusually large backlog of receipts to check" 

1228 ) 

1229 

1230 

1231def send_postal_verification_postcard(payload: jobs_pb2.SendPostalVerificationPostcardPayload) -> None: 

1232 """ 

1233 Sends the postcard via external API and updates attempt status. 

1234 """ 

1235 with session_scope() as session: 

1236 attempt = session.execute( 

1237 select(PostalVerificationAttempt).where( 

1238 PostalVerificationAttempt.id == payload.postal_verification_attempt_id 

1239 ) 

1240 ).scalar_one_or_none() 

1241 

1242 if not attempt or attempt.status != PostalVerificationStatus.in_progress: 1242 ↛ 1243  line 1242 didn't jump to line 1243 because the condition on line 1242 was never true

1243 logger.warning( 

1244 f"Postal verification attempt {payload.postal_verification_attempt_id} not found or wrong state" 

1245 ) 

1246 return 

1247 

1248 user_name = session.execute(select(User.name).where(User.id == attempt.user_id)).scalar_one() 

1249 

1250 result = send_postcard( 

1251 recipient_name=user_name, 

1252 address_line_1=attempt.address_line_1, 

1253 address_line_2=attempt.address_line_2, 

1254 city=attempt.city, 

1255 state=attempt.state, 

1256 postal_code=attempt.postal_code, 

1257 country=attempt.country, 

1258 verification_code=not_none(attempt.verification_code), 

1259 qr_code_url=urls.postal_verification_link(code=not_none(attempt.verification_code)), 

1260 ) 

1261 

1262 if result.success: 

1263 attempt.status = PostalVerificationStatus.awaiting_verification 

1264 attempt.postcard_sent_at = func.now() 

1265 

1266 notify( 

1267 session, 

1268 user_id=attempt.user_id, 

1269 topic_action=NotificationTopicAction.postal_verification__postcard_sent, 

1270 key="", 

1271 data=notification_data_pb2.PostalVerificationPostcardSent( 

1272 city=attempt.city, 

1273 country=attempt.country, 

1274 ), 

1275 ) 

1276 else: 

1277 # Could retry or fail - for now, fail 

1278 attempt.status = PostalVerificationStatus.failed 

1279 logger.error(f"Postcard send failed: {result.error_message}") 

1280 

1281 

1282class DatabaseInconsistencyError(Exception): 

1283 """Raised when database consistency checks fail""" 

1284 

1285 pass 

1286 

1287 

1288def check_database_consistency(payload: empty_pb2.Empty) -> None: 

1289 """ 

1290 Checks database consistency and raises an exception if any issues are found. 

1291 """ 

1292 logger.info("Checking database consistency") 

1293 errors = [] 

1294 

1295 with session_scope() as session: 

1296 # Check that all non-deleted users have a profile gallery 

1297 users_without_gallery = session.execute( 

1298 select(User.id, User.username).where(User.is_deleted == False).where(User.profile_gallery_id.is_(None)) 

1299 ).all() 

1300 if users_without_gallery: 

1301 errors.append(f"Users without profile gallery: {users_without_gallery}") 

1302 

1303 # Check that all profile galleries point to their owner 

1304 mismatched_galleries = session.execute( 

1305 select(User.id, User.username, User.profile_gallery_id, PhotoGallery.owner_user_id) 

1306 .join(PhotoGallery, User.profile_gallery_id == PhotoGallery.id) 

1307 .where(User.profile_gallery_id.is_not(None)) 

1308 .where(PhotoGallery.owner_user_id != User.id) 

1309 ).all() 

1310 if mismatched_galleries: 1310 ↛ 1311  line 1310 didn't jump to line 1311 because the condition on line 1310 was never true

1311 errors.append(f"Profile galleries with mismatched owner: {mismatched_galleries}") 

1312 

1313 # === Moderation System Consistency Checks === 

1314 

1315 # Check all ModerationStates have a known object_type 

1316 known_object_types = [ModerationObjectType.HOST_REQUEST, ModerationObjectType.GROUP_CHAT] 

1317 unknown_type_states = session.execute( 

1318 select(ModerationState.id, ModerationState.object_type).where( 

1319 ModerationState.object_type.not_in(known_object_types) 

1320 ) 

1321 ).all() 

1322 if unknown_type_states: 1322 ↛ 1323  line 1322 didn't jump to line 1323 because the condition on line 1322 was never true

1323 errors.append(f"ModerationStates with unknown object_type: {unknown_type_states}") 

1324 

1325 # Check every ModerationState has at least one INITIAL_REVIEW queue item 

1326 # Skip items with ID < 2000000 as they were created before this check was introduced 

1327 states_without_initial_review = session.execute( 

1328 select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where( 

1329 ModerationState.id >= 2000000, 

1330 ~exists( 

1331 select(1) 

1332 .where(ModerationQueueItem.moderation_state_id == ModerationState.id) 

1333 .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW) 

1334 ), 

1335 ) 

1336 ).all() 

1337 if states_without_initial_review: 1337 ↛ 1338  line 1337 didn't jump to line 1338 because the condition on line 1337 was never true

1338 errors.append(f"ModerationStates without INITIAL_REVIEW queue item: {states_without_initial_review}") 

1339 

1340 # Check every ModerationState has a CREATE log entry 

1341 # Skip items with ID < 2000000 as they were created before this check was introduced 

1342 states_without_create_log = session.execute( 

1343 select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where( 

1344 ModerationState.id >= 2000000, 

1345 ~exists( 

1346 select(1) 

1347 .where(ModerationLog.moderation_state_id == ModerationState.id) 

1348 .where(ModerationLog.action == ModerationAction.CREATE) 

1349 ), 

1350 ) 

1351 ).all() 

1352 if states_without_create_log: 1352 ↛ 1353  line 1352 didn't jump to line 1353 because the condition on line 1352 was never true

1353 errors.append(f"ModerationStates without CREATE log entry: {states_without_create_log}") 

1354 

1355 # Check resolved queue items point to log entries for the same moderation state 

1356 resolved_item_log_mismatches = session.execute( 

1357 select(ModerationQueueItem.id, ModerationQueueItem.moderation_state_id, ModerationLog.moderation_state_id) 

1358 .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id) 

1359 .where(ModerationQueueItem.resolved_by_log_id.is_not(None)) 

1360 .where(ModerationQueueItem.moderation_state_id != ModerationLog.moderation_state_id) 

1361 ).all() 

1362 if resolved_item_log_mismatches: 1362 ↛ 1363  line 1362 didn't jump to line 1363 because the condition on line 1362 was never true

1363 errors.append(f"Resolved queue items with mismatched moderation_state_id: {resolved_item_log_mismatches}") 

1364 

1365 # Check every HOST_REQUEST ModerationState has exactly one HostRequest pointing to it 

1366 hr_states = ( 

1367 session.execute( 

1368 select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST) 

1369 ) 

1370 .scalars() 

1371 .all() 

1372 ) 

1373 for state_id in hr_states: 1373 ↛ 1374  line 1373 didn't jump to line 1374 because the loop on line 1373 never started

1374 hr_count = session.execute( 

1375 select(func.count()).where(HostRequest.moderation_state_id == state_id) 

1376 ).scalar_one() 

1377 if hr_count != 1: 

1378 errors.append(f"ModerationState {state_id} (HOST_REQUEST) has {hr_count} HostRequests (expected 1)") 

1379 

1380 # Check every GROUP_CHAT ModerationState has exactly one GroupChat pointing to it 

1381 gc_states = ( 

1382 session.execute( 

1383 select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT) 

1384 ) 

1385 .scalars() 

1386 .all() 

1387 ) 

1388 for state_id in gc_states: 1388 ↛ 1389  line 1388 didn't jump to line 1389 because the loop on line 1388 never started

1389 gc_count = session.execute( 

1390 select(func.count()).where(GroupChat.moderation_state_id == state_id) 

1391 ).scalar_one() 

1392 if gc_count != 1: 

1393 errors.append(f"ModerationState {state_id} (GROUP_CHAT) has {gc_count} GroupChats (expected 1)") 

1394 

1395 # Check ModerationState.object_id matches the actual object's ID 

1396 hr_object_id_mismatches = session.execute( 

1397 select(ModerationState.id, ModerationState.object_id, HostRequest.conversation_id) 

1398 .join(HostRequest, HostRequest.moderation_state_id == ModerationState.id) 

1399 .where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST) 

1400 .where(ModerationState.object_id != HostRequest.conversation_id) 

1401 ).all() 

1402 if hr_object_id_mismatches: 1402 ↛ 1403  line 1402 didn't jump to line 1403 because the condition on line 1402 was never true

1403 errors.append(f"ModerationState object_id mismatch for HOST_REQUEST: {hr_object_id_mismatches}") 

1404 

1405 gc_object_id_mismatches = session.execute( 

1406 select(ModerationState.id, ModerationState.object_id, GroupChat.conversation_id) 

1407 .join(GroupChat, GroupChat.moderation_state_id == ModerationState.id) 

1408 .where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT) 

1409 .where(ModerationState.object_id != GroupChat.conversation_id) 

1410 ).all() 

1411 if gc_object_id_mismatches: 1411 ↛ 1412  line 1411 didn't jump to line 1412 because the condition on line 1411 was never true

1412 errors.append(f"ModerationState object_id mismatch for GROUP_CHAT: {gc_object_id_mismatches}") 

1413 

1414 # Check reverse mapping: HostRequest's moderation_state points to correct ModerationState 

1415 hr_reverse_mismatches = session.execute( 

1416 select( 

1417 HostRequest.conversation_id, 

1418 HostRequest.moderation_state_id, 

1419 ModerationState.object_type, 

1420 ModerationState.object_id, 

1421 ) 

1422 .join(ModerationState, HostRequest.moderation_state_id == ModerationState.id) 

1423 .where( 

1424 (ModerationState.object_type != ModerationObjectType.HOST_REQUEST) 

1425 | (ModerationState.object_id != HostRequest.conversation_id) 

1426 ) 

1427 ).all() 

1428 if hr_reverse_mismatches: 1428 ↛ 1429  line 1428 didn't jump to line 1429 because the condition on line 1428 was never true

1429 errors.append(f"HostRequest points to ModerationState with wrong type/object_id: {hr_reverse_mismatches}") 

1430 

1431 # Check reverse mapping: GroupChat's moderation_state points to correct ModerationState 

1432 gc_reverse_mismatches = session.execute( 

1433 select( 

1434 GroupChat.conversation_id, 

1435 GroupChat.moderation_state_id, 

1436 ModerationState.object_type, 

1437 ModerationState.object_id, 

1438 ) 

1439 .join(ModerationState, GroupChat.moderation_state_id == ModerationState.id) 

1440 .where( 

1441 (ModerationState.object_type != ModerationObjectType.GROUP_CHAT) 

1442 | (ModerationState.object_id != GroupChat.conversation_id) 

1443 ) 

1444 ).all() 

1445 if gc_reverse_mismatches: 1445 ↛ 1446  line 1445 didn't jump to line 1446 because the condition on line 1445 was never true

1446 errors.append(f"GroupChat points to ModerationState with wrong type/object_id: {gc_reverse_mismatches}") 

1447 

1448 # Ensure auto-approve deadline isn't being exceeded by a significant margin 

1449 # The auto-approver runs every 15s, so allow 5 minutes grace before alerting 

1450 deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"] 

1451 if deadline_seconds > 0: 1451 ↛ 1452  line 1451 didn't jump to line 1452 because the condition on line 1451 was never true

1452 grace_period = timedelta(minutes=5) 

1453 stale_initial_review_items = session.execute( 

1454 select( 

1455 ModerationQueueItem.id, 

1456 ModerationQueueItem.moderation_state_id, 

1457 ModerationQueueItem.time_created, 

1458 ) 

1459 .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW) 

1460 .where(ModerationQueueItem.resolved_by_log_id.is_(None)) 

1461 .where(ModerationQueueItem.time_created < now() - timedelta(seconds=deadline_seconds) - grace_period) 

1462 ).all() 

1463 if stale_initial_review_items: 

1464 errors.append( 

1465 f"INITIAL_REVIEW items exceeding auto-approve deadline by >5min: {stale_initial_review_items}" 

1466 ) 

1467 

1468 if errors: 

1469 raise DatabaseInconsistencyError("\n".join(errors)) 

1470 

1471 

1472def auto_approve_moderation_queue(payload: empty_pb2.Empty) -> None: 

1473 """ 

1474 Dead man's switch: auto-approves unresolved INITIAL_REVIEW items older than the deadline. 

1475 Items explicitly actioned by moderators are left alone. 

1476 """ 

1477 deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"] 

1478 if deadline_seconds <= 0: 

1479 return 

1480 

1481 with session_scope() as session: 

1482 ctx = make_background_user_context(user_id=config["MODERATION_BOT_USER_ID"]) 

1483 

1484 items = ( 

1485 Moderation() 

1486 .GetModerationQueue( 

1487 request=moderation_pb2.GetModerationQueueReq( 

1488 triggers=[moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW], 

1489 unresolved_only=True, 

1490 page_size=100, 

1491 created_before=Timestamp_from_datetime(now() - timedelta(seconds=deadline_seconds)), 

1492 ), 

1493 context=ctx, 

1494 session=session, 

1495 ) 

1496 .queue_items 

1497 ) 

1498 

1499 if not items: 

1500 return 

1501 

1502 logger.info(f"Auto-approving {len(items)} moderation queue items") 

1503 for item in items: 

1504 Moderation().ModerateContent( 

1505 request=moderation_pb2.ModerateContentReq( 

1506 moderation_state_id=item.moderation_state_id, 

1507 action=moderation_pb2.MODERATION_ACTION_APPROVE, 

1508 visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE, 

1509 reason=f"Auto-approved: moderation deadline of {deadline_seconds} seconds exceeded.", 

1510 ), 

1511 context=ctx, 

1512 session=session, 

1513 ) 

1514 moderation_auto_approved_counter.inc(len(items))