Coverage for app/backend/src/couchers/jobs/handlers.py: 88%

462 statements  

coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from collections.abc import Sequence 

7from datetime import date, timedelta 

8from math import cos, pi, sin, sqrt 

9from random import sample 

10from typing import Any 

11 

12import requests 

13from google.protobuf import empty_pb2 

14from sqlalchemy import ColumnElement, Float, Function, Integer, select 

15from sqlalchemy.orm import aliased 

16from sqlalchemy.sql import ( 

17 and_, 

18 case, 

19 cast, 

20 delete, 

21 distinct, 

22 exists, 

23 extract, 

24 func, 

25 literal, 

26 not_, 

27 or_, 

28 union_all, 

29 update, 

30) 

31 

32from couchers import urls 

33from couchers.config import config 

34from couchers.constants import ( 

35 ACTIVENESS_PROBE_EXPIRY_TIME, 

36 ACTIVENESS_PROBE_INACTIVITY_PERIOD, 

37 ACTIVENESS_PROBE_TIME_REMINDERS, 

38 EVENT_REMINDER_TIMEDELTA, 

39 HOST_REQUEST_MAX_REMINDERS, 

40 HOST_REQUEST_REMINDER_INTERVAL, 

41) 

42from couchers.context import make_background_user_context 

43from couchers.crypto import ( 

44 USER_LOCATION_RANDOMIZATION_NAME, 

45 asym_encrypt, 

46 b64decode, 

47 get_secret, 

48 simple_decrypt, 

49 stable_secure_uniform, 

50) 

51from couchers.db import session_scope 

52from couchers.email.dev import print_dev_email 

53from couchers.email.smtp import send_smtp_email 

54from couchers.helpers.badges import user_add_badge, user_remove_badge 

55from couchers.helpers.completed_profile import has_completed_profile_expression 

56from couchers.materialized_views import ( 

57 UserResponseRate, 

58) 

59from couchers.metrics import ( 

60 moderation_auto_approved_counter, 

61 push_notification_counter, 

62 strong_verification_completions_counter, 

63) 

64from couchers.models import ( 

65 AccountDeletionToken, 

66 ActivenessProbe, 

67 ActivenessProbeStatus, 

68 Cluster, 

69 ClusterRole, 

70 ClusterSubscription, 

71 EventOccurrence, 

72 EventOccurrenceAttendee, 

73 GroupChat, 

74 GroupChatSubscription, 

75 HostingStatus, 

76 HostRequest, 

77 HostRequestStatus, 

78 LoginToken, 

79 MeetupStatus, 

80 Message, 

81 MessageType, 

82 ModerationAction, 

83 ModerationLog, 

84 ModerationObjectType, 

85 ModerationQueueItem, 

86 ModerationState, 

87 ModerationTrigger, 

88 PassportSex, 

89 PasswordResetToken, 

90 PhotoGallery, 

91 PostalVerificationAttempt, 

92 PostalVerificationStatus, 

93 PushNotificationDeliveryAttempt, 

94 PushNotificationSubscription, 

95 Reference, 

96 StrongVerificationAttempt, 

97 StrongVerificationAttemptStatus, 

98 User, 

99 UserBadge, 

100 Volunteer, 

101) 

102from couchers.models.notifications import NotificationTopicAction 

103from couchers.notifications.expo_api import get_expo_push_receipts 

104from couchers.notifications.notify import notify 

105from couchers.postal.postcard_service import send_postcard 

106from couchers.proto import moderation_pb2, notification_data_pb2 

107from couchers.proto.internal import internal_pb2, jobs_pb2 

108from couchers.resources import get_badge_dict, get_static_badge_dict 

109from couchers.servicers.api import user_model_to_pb 

110from couchers.servicers.events import ( 

111 event_to_pb, 

112) 

113from couchers.servicers.moderation import Moderation 

114from couchers.servicers.requests import host_request_to_pb 

115from couchers.sql import ( 

116 users_visible_to_each_other, 

117 where_moderated_content_visible, 

118 where_moderated_content_visible_to_user_column, 

119 where_user_columns_visible_to_each_other, 

120 where_users_column_visible, 

121) 

122from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

123from couchers.tasks import send_duplicate_strong_verification_email 

124from couchers.utils import ( 

125 Timestamp_from_datetime, 

126 create_coordinate, 

127 get_coordinates, 

128 not_none, 

129 now, 

130) 

131 

132logger = logging.getLogger(__name__) 

133 

134 

135def send_email(payload: jobs_pb2.SendEmailPayload) -> None: 

136 logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'") 

137 # selects a "sender", which either prints the email to the logger or sends it out with SMTP 

138 sender = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email 

139 # the sender must return a models.Email object that can be added to the database 

140 email = sender( 

141 sender_name=payload.sender_name, 

142 sender_email=payload.sender_email, 

143 recipient=payload.recipient, 

144 subject=payload.subject, 

145 plain=payload.plain, 

146 html=payload.html, 

147 list_unsubscribe_header=payload.list_unsubscribe_header, 

148 source_data=payload.source_data, 

149 ) 

150 with session_scope() as session: 

151 session.add(email) 

152 

153 

154def purge_login_tokens(payload: empty_pb2.Empty) -> None: 

155 logger.info("Purging login tokens") 

156 with session_scope() as session: 

157 session.execute(delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False)) 

158 

159 

160def purge_password_reset_tokens(payload: empty_pb2.Empty) -> None: 

161 logger.info("Purging login tokens") 

162 with session_scope() as session: 

163 session.execute( 

164 delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False) 

165 ) 

166 

167 

168def purge_account_deletion_tokens(payload: empty_pb2.Empty) -> None: 

169 logger.info("Purging account deletion tokens") 

170 with session_scope() as session: 

171 session.execute( 

172 delete(AccountDeletionToken) 

173 .where(~AccountDeletionToken.is_valid) 

174 .execution_options(synchronize_session=False) 

175 ) 

176 

177 

178def send_message_notifications(payload: empty_pb2.Empty) -> None: 

179 """ 

180 Sends out email notifications for messages that have been unseen for a long enough time 

181 """ 

182 # very crude and dumb algorithm 

183 logger.info("Sending out email notifications for unseen messages") 

184 

185 with session_scope() as session: 

186 # users who have unnotified messages older than 5 minutes in any group chat 

187 users = ( 

188 session.execute( 

189 where_moderated_content_visible_to_user_column( 

190 select(User) 

191 .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id) 

192 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

193 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id), 

194 GroupChat, 

195 User.id, 

196 ) 

197 .where(not_(GroupChatSubscription.is_muted)) 

198 .where(User.is_visible) 

199 .where(Message.time >= GroupChatSubscription.joined) 

200 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

201 .where(Message.id > User.last_notified_message_id) 

202 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

203 .where(Message.time < now() - timedelta(minutes=5)) 

204 .where(Message.message_type == MessageType.text) # TODO: only text messages for now 

205 ) 

206 .scalars() 

207 .unique() 

208 ) 

209 

210 for user in users: 

211 context = make_background_user_context(user_id=user.id) 

212 # now actually grab all the group chats with unseen messages, not just those where the messages are less than 5 min old 

213 subquery = ( 

214 where_users_column_visible( 

215 where_moderated_content_visible( 

216 select( 

217 GroupChatSubscription.group_chat_id.label("group_chat_id"), 

218 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"), 

219 func.max(Message.id).label("message_id"), 

220 func.count(Message.id).label("count_unseen"), 

221 ) 

222 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

223 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id), 

224 context, 

225 GroupChat, 

226 is_list_operation=True, 

227 ) 

228 .where(GroupChatSubscription.user_id == user.id) 

229 .where(not_(GroupChatSubscription.is_muted)) 

230 .where(Message.id > user.last_notified_message_id) 

231 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

232 .where(Message.time >= GroupChatSubscription.joined) 

233 .where(Message.message_type == MessageType.text) # TODO: only text messages for now 

234 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)), 

235 context, 

236 Message.author_id, 

237 ) 

238 .group_by(GroupChatSubscription.group_chat_id) 

239 .order_by(func.max(Message.id).desc()) 

240 .subquery() 

241 ) 

242 

243 unseen_messages = session.execute( 

244 where_moderated_content_visible( 

245 select(GroupChat, Message, subquery.c.count_unseen) 

246 .join(subquery, subquery.c.message_id == Message.id) 

247 .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id), 

248 context, 

249 GroupChat, 

250 is_list_operation=True, 

251 ).order_by(subquery.c.message_id.desc()) 

252 ).all() 

253 

254 if not unseen_messages: 

255 continue 

256 

257 user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages) 

258 

259 def format_title(message: Message, group_chat: GroupChat, count_unseen: int) -> str: 

260 if group_chat.is_dm: 

261 return f"You missed {count_unseen} message(s) from {message.author.name}" 

262 else: 

263 return f"You missed {count_unseen} message(s) in {group_chat.title}" 

264 

265 notify( 

266 session, 

267 user_id=user.id, 

268 topic_action=NotificationTopicAction.chat__missed_messages, 

269 key="", 

270 data=notification_data_pb2.ChatMissedMessages( 

271 messages=[ 

272 notification_data_pb2.ChatMessage( 

273 author=user_model_to_pb( 

274 message.author, 

275 session, 

276 context, 

277 ), 

278 message=format_title(message, group_chat, count_unseen), 

279 text=message.text, 

280 group_chat_id=message.conversation_id, 

281 ) 

282 for group_chat, message, count_unseen in unseen_messages 

283 ], 

284 ), 

285 ) 

286 session.commit() 

287 

288 

289def send_request_notifications(payload: empty_pb2.Empty) -> None: 

290 """ 

291 Sends out email notifications for unseen messages in host requests (as surfer or host) 

292 """ 

293 logger.info("Sending out email notifications for unseen messages in host requests") 

294 

295 with session_scope() as session: 

296 # Get all candidate users who might have unseen request messages 

297 candidate_user_ids = ( 

298 session.execute( 

299 select(User.id) 

300 .where(User.is_visible) 

301 .where( 

302 or_( 

303 # Users with unseen messages as surfer 

304 exists( 

305 select(1) 

306 .select_from(HostRequest) 

307 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

308 .where(HostRequest.surfer_user_id == User.id) 

309 .where(Message.id > HostRequest.surfer_last_seen_message_id) 

310 .where(Message.id > User.last_notified_request_message_id) 

311 .where(Message.time < now() - timedelta(minutes=5)) 

312 .where(Message.message_type == MessageType.text) 

313 ), 

314 # Users with unseen messages as host 

315 exists( 

316 select(1) 

317 .select_from(HostRequest) 

318 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

319 .where(HostRequest.host_user_id == User.id) 

320 .where(Message.id > HostRequest.host_last_seen_message_id) 

321 .where(Message.id > User.last_notified_request_message_id) 

322 .where(Message.time < now() - timedelta(minutes=5)) 

323 .where(Message.message_type == MessageType.text) 

324 ), 

325 ) 

326 ) 

327 ) 

328 .scalars() 

329 .all() 

330 ) 

331 

332 for user_id in candidate_user_ids: 

333 context = make_background_user_context(user_id=user_id) 

334 

335 # requests where this user is surfing 

336 surfing_reqs = session.execute( 

337 where_users_column_visible( 

338 where_moderated_content_visible_to_user_column( 

339 select(User, HostRequest, func.max(Message.id)) 

340 .where(User.id == user_id) 

341 .join(HostRequest, HostRequest.surfer_user_id == User.id), 

342 HostRequest, 

343 HostRequest.surfer_user_id, 

344 ), 

345 context, 

346 HostRequest.host_user_id, 

347 ) 

348 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

349 .where(Message.id > HostRequest.surfer_last_seen_message_id) 

350 .where(Message.id > User.last_notified_request_message_id) 

351 .where(Message.time < now() - timedelta(minutes=5)) 

352 .where(Message.message_type == MessageType.text) 

353 .group_by(User, HostRequest) # type: ignore[arg-type] 

354 ).all() 

355 

356 # where this user is hosting 

357 hosting_reqs = session.execute( 

358 where_users_column_visible( 

359 where_moderated_content_visible_to_user_column( 

360 select(User, HostRequest, func.max(Message.id)) 

361 .where(User.id == user_id) 

362 .join(HostRequest, HostRequest.host_user_id == User.id), 

363 HostRequest, 

364 HostRequest.host_user_id, 

365 ), 

366 context, 

367 HostRequest.surfer_user_id, 

368 ) 

369 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

370 .where(Message.id > HostRequest.host_last_seen_message_id) 

371 .where(Message.id > User.last_notified_request_message_id) 

372 .where(Message.time < now() - timedelta(minutes=5)) 

373 .where(Message.message_type == MessageType.text) 

374 .group_by(User, HostRequest) # type: ignore[arg-type] 

375 ).all() 

376 

377 for user, host_request, max_message_id in surfing_reqs: 

378 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id) 

379 session.flush() 

380 

381 notify( 

382 session, 

383 user_id=user.id, 

384 topic_action=NotificationTopicAction.host_request__missed_messages, 

385 key=str(host_request.conversation_id), 

386 data=notification_data_pb2.HostRequestMissedMessages( 

387 host_request=host_request_to_pb(host_request, session, context), 

388 user=user_model_to_pb(host_request.host, session, context), 

389 am_host=False, 

390 ), 

391 ) 

392 

393 for user, host_request, max_message_id in hosting_reqs: 

394 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id) 

395 session.flush() 

396 

397 notify( 

398 session, 

399 user_id=user.id, 

400 topic_action=NotificationTopicAction.host_request__missed_messages, 

401 key=str(host_request.conversation_id), 

402 data=notification_data_pb2.HostRequestMissedMessages( 

403 host_request=host_request_to_pb(host_request, session, context), 

404 user=user_model_to_pb(host_request.surfer, session, context), 

405 am_host=True, 

406 ), 

407 ) 

408 

409 

410def send_onboarding_emails(payload: empty_pb2.Empty) -> None: 

411 """ 

412 Sends out onboarding emails 

413 """ 

414 logger.info("Sending out onboarding emails") 

415 

416 with session_scope() as session: 

417 # first onboarding email 

418 users = ( 

419 session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all() 

420 ) 

421 

422 for user in users: 

423 notify( 

424 session, 

425 user_id=user.id, 

426 topic_action=NotificationTopicAction.onboarding__reminder, 

427 key="1", 

428 ) 

429 user.onboarding_emails_sent = 1 

430 user.last_onboarding_email_sent = now() 

431 session.commit() 

432 

433 # second onboarding email 

434 # sent after a week if the user has no profile or their "about me" section is less than 20 characters long 

435 users = ( 

436 session.execute( 

437 select(User) 

438 .where(User.is_visible) 

439 .where(User.onboarding_emails_sent == 1) 

440 .where(now() - User.last_onboarding_email_sent > timedelta(days=7)) 

441 .where(~has_completed_profile_expression()) 

442 ) 

443 .scalars() 

444 .all() 

445 ) 

446 

447 for user in users: 

448 notify( 

449 session, 

450 user_id=user.id, 

451 topic_action=NotificationTopicAction.onboarding__reminder, 

452 key="2", 

453 ) 

454 user.onboarding_emails_sent = 2 

455 user.last_onboarding_email_sent = now() 

456 session.commit() 

457 

458 

459def send_reference_reminders(payload: empty_pb2.Empty) -> None: 

460 """ 

461 Sends out reminders to write references after hosting/staying 

462 """ 

463 logger.info("Sending out reference reminder emails") 

464 

465 # Keep this in chronological order! 

466 reference_reminder_schedule = [ 

467 # (number, timedelta before we stop being able to write a ref, number of days they have left to write the ref) 

468 # the end time to write a reference is supposed to be midnight in the host's timezone 

469 # 8 pm ish on the last day of the stay 

470 (1, timedelta(days=15) - timedelta(hours=20), 14), 

471 # 2 pm ish a week after stay 

472 (2, timedelta(days=8) - timedelta(hours=14), 7), 

473 # 10 am ish 3 days before end of time to write ref 

474 (3, timedelta(days=4) - timedelta(hours=10), 3), 

475 ] 
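# Illustrative aside, not part of handlers.py: a worked example of the schedule above with a
# hypothetical deadline. Assuming end_time_to_write_reference falls at midnight on 2026-01-15:
#
#   2026-01-15 00:00 - (timedelta(days=15) - timedelta(hours=20)) = 2025-12-31 20:00  (~8 pm on the last day of the stay)
#   2026-01-15 00:00 - (timedelta(days=8)  - timedelta(hours=14)) = 2026-01-07 14:00  (~2 pm a week after the stay)
#   2026-01-15 00:00 - (timedelta(days=4)  - timedelta(hours=10)) = 2026-01-11 10:00  (~10 am, about 3 days before the deadline)
#
# Each reminder fires once now() passes the corresponding time, which is what the "8 pm ish" /
# "2 pm ish" / "10 am ish" comments refer to.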

476 

477 with session_scope() as session: 

478 # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates 

479 for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule): 

480 user = aliased(User) 

481 other_user = aliased(User) 

482 # surfers needing to write a ref 

483 q1 = ( 

484 select(literal(True), HostRequest, user, other_user) 

485 .join(user, user.id == HostRequest.surfer_user_id) 

486 .join(other_user, other_user.id == HostRequest.host_user_id) 

487 .outerjoin( 

488 Reference, 

489 and_( 

490 Reference.host_request_id == HostRequest.conversation_id, 

491 # if no reference is found in this join, then the surfer has not written a ref 

492 Reference.from_user_id == HostRequest.surfer_user_id, 

493 ), 

494 ) 

495 .where(Reference.id == None) 

496 .where(HostRequest.can_write_reference) 

497 .where(HostRequest.surfer_sent_reference_reminders < reminder_number) 

498 .where(HostRequest.end_time_to_write_reference - reminder_time < now()) 

499 .where(HostRequest.surfer_reason_didnt_meetup == None) 

500 .where(users_visible_to_each_other(user, other_user)) 

501 ) 

502 

503 # hosts needing to write a ref 

504 q2 = ( 

505 select(literal(False), HostRequest, user, other_user) 

506 .join(user, user.id == HostRequest.host_user_id) 

507 .join(other_user, other_user.id == HostRequest.surfer_user_id) 

508 .outerjoin( 

509 Reference, 

510 and_( 

511 Reference.host_request_id == HostRequest.conversation_id, 

512 # if no reference is found in this join, then the host has not written a ref 

513 Reference.from_user_id == HostRequest.host_user_id, 

514 ), 

515 ) 

516 .where(Reference.id == None) 

517 .where(HostRequest.can_write_reference) 

518 .where(HostRequest.host_sent_reference_reminders < reminder_number) 

519 .where(HostRequest.end_time_to_write_reference - reminder_time < now()) 

520 .where(HostRequest.host_reason_didnt_meetup == None) 

521 .where(users_visible_to_each_other(user, other_user)) 

522 ) 

523 

524 union = union_all(q1, q2).subquery() 

525 query = select( 

526 union.c[0].label("surfed"), 

527 aliased(HostRequest, union), 

528 aliased(user, union), 

529 aliased(other_user, union), 

530 ) 

531 reference_reminders = session.execute(query).all() 

532 

533 for surfed, host_request, user, other_user in reference_reminders: 

534 # visibility and blocking already checked in sql 

535 assert user.is_visible 

536 context = make_background_user_context(user_id=user.id) 

537 topic_action = ( 

538 NotificationTopicAction.reference__reminder_surfed 

539 if surfed 

540 else NotificationTopicAction.reference__reminder_hosted 

541 ) 

542 notify( 

543 session, 

544 user_id=user.id, 

545 topic_action=topic_action, 

546 key=str(host_request.conversation_id), 

547 data=notification_data_pb2.ReferenceReminder( 

548 host_request_id=host_request.conversation_id, 

549 other_user=user_model_to_pb(other_user, session, context), 

550 days_left=reminder_days_left, 

551 ), 

552 ) 

553 if surfed: 

554 host_request.surfer_sent_reference_reminders = reminder_number 

555 else: 

556 host_request.host_sent_reference_reminders = reminder_number 

557 session.commit() 

558 

559 

560def send_host_request_reminders(payload: empty_pb2.Empty) -> None: 

561 with session_scope() as session: 

562 host_has_sent_message = select(1).where( 

563 Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.host_user_id 

564 ) 

565 

566 requests = ( 

567 session.execute( 

568 where_user_columns_visible_to_each_other( 

569 where_moderated_content_visible_to_user_column( 

570 select(HostRequest), 

571 HostRequest, 

572 HostRequest.host_user_id, 

573 ) 

574 .where(HostRequest.status == HostRequestStatus.pending) 

575 .where(HostRequest.host_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS) 

576 .where(HostRequest.start_time > func.now()) 

577 .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL) 

578 .where(~exists(host_has_sent_message)), 

579 HostRequest.host_user_id, 

580 HostRequest.surfer_user_id, 

581 ) 

582 ) 

583 .scalars() 

584 .all() 

585 ) 

586 

587 for host_request in requests: 

588 host_request.host_sent_request_reminders += 1 

589 host_request.last_sent_request_reminder_time = now() 

590 

591 context = make_background_user_context(user_id=host_request.host_user_id) 

592 notify( 

593 session, 

594 user_id=host_request.host_user_id, 

595 topic_action=NotificationTopicAction.host_request__reminder, 

596 key=str(host_request.conversation_id), 

597 data=notification_data_pb2.HostRequestReminder( 

598 host_request=host_request_to_pb(host_request, session, context), 

599 surfer=user_model_to_pb(host_request.surfer, session, context), 

600 ), 

601 moderation_state_id=host_request.moderation_state_id, 

602 ) 

603 

604 session.commit() 

605 

606 

607def add_users_to_email_list(payload: empty_pb2.Empty) -> None: 

608 if not config["LISTMONK_ENABLED"]: 608 ↛ 609: line 608 didn't jump to line 609 because the condition on line 608 was never true

609 logger.info("Not adding users to mailing list") 

610 return 

611 

612 logger.info("Adding users to mailing list") 

613 

614 while True: 

615 with session_scope() as session: 

616 user = session.execute( 

617 select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1) 

618 ).scalar_one_or_none() 

619 if not user: 

620 logger.info("Finished adding users to mailing list") 

621 return 

622 

623 if user.opt_out_of_newsletter: 

624 user.in_sync_with_newsletter = True 

625 session.commit() 

626 continue 

627 

628 r = requests.post( 

629 config["LISTMONK_BASE_URL"] + "/api/subscribers", 

630 auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]), 

631 json={ 

632 "email": user.email, 

633 "name": user.name, 

634 "lists": [config["LISTMONK_LIST_ID"]], 

635 "preconfirm_subscriptions": True, 

636 "attribs": {"couchers_user_id": user.id}, 

637 "status": "enabled", 

638 }, 

639 timeout=10, 

640 ) 

641 # the API returns 409 if the user is already subscribed 

642 if r.status_code == 200 or r.status_code == 409: 642 ↛ 646: line 642 didn't jump to line 646 because the condition on line 642 was always true

643 user.in_sync_with_newsletter = True 

644 session.commit() 

645 else: 

646 raise Exception("Failed to add users to mailing list") 

647 

648 

649def enforce_community_membership(payload: empty_pb2.Empty) -> None: 

650 tasks_enforce_community_memberships() 

651 

652 

653def update_recommendation_scores(payload: empty_pb2.Empty) -> None: 

654 text_fields = [ 

655 User.hometown, 

656 User.occupation, 

657 User.education, 

658 User.about_me, 

659 User.things_i_like, 

660 User.about_place, 

661 User.additional_information, 

662 User.pet_details, 

663 User.kid_details, 

664 User.housemate_details, 

665 User.other_host_info, 

666 User.sleeping_details, 

667 User.area, 

668 User.house_rules, 

669 ] 

670 home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules] 

671 

672 def poor_man_gaussian() -> ColumnElement[float] | float: 

673 """ 

674 Produces an approximately std normal random variate 

675 """ 

676 trials = 5 

677 return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12) 
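# Illustrative note, not part of handlers.py: this is the Irwin-Hall construction. The sum of
# `trials` independent uniform(0, 1) variates (func.random() on the Postgres side) has mean
# trials / 2 and variance trials / 12, so standardizing it yields an approximately standard
# normal variate by the central limit theorem. With trials = 5 the output is bounded to about
# +/- sqrt(15) ~= 3.87. A plain-Python sketch of the same idea, using only the standard library:
#
#   from math import sqrt
#   from random import random
#
#   def poor_man_gaussian_py(trials: int = 5) -> float:
#       return (sum(random() for _ in range(trials)) - trials / 2) / sqrt(trials / 12)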

678 

679 def int_(stmt: Any) -> Function[int]: 

680 return func.coalesce(cast(stmt, Integer), 0) 

681 

682 def float_(stmt: Any) -> Function[float]: 

683 return func.coalesce(cast(stmt, Float), 0.0) 

684 

685 with session_scope() as session: 

686 # profile 

687 profile_text = "" 

688 for field in text_fields: 

689 profile_text += func.coalesce(field, "") # type: ignore[assignment] 

690 text_length = func.length(profile_text) 

691 home_text = "" 

692 for field in home_fields: 

693 home_text += func.coalesce(field, "") # type: ignore[assignment] 

694 home_length = func.length(home_text) 

695 

696 filled_profile = int_(has_completed_profile_expression()) 

697 has_text = int_(text_length > 500) 

698 long_text = int_(text_length > 2000) 

699 can_host = int_(User.hosting_status == HostingStatus.can_host) 

700 may_host = int_(User.hosting_status == HostingStatus.maybe) 

701 cant_host = int_(User.hosting_status == HostingStatus.cant_host) 

702 filled_home = int_(User.has_completed_my_home) 

703 filled_home_lots = int_(home_length > 200) 

704 hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host 

705 profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots 
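# Illustrative aside, not part of handlers.py, with hypothetical values: a user who can host
# (hosting_status_points = 5), has a completed profile, more than 2000 characters of profile
# text (which also satisfies the 500-character check), a completed "my home" section, and more
# than 200 characters of home text scores profile_points = 5 + 2 + 3 + 5 + 10 = 25.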

706 

707 # references 

708 left_ref_expr = int_(1).label("left_reference") 

709 left_refs_subquery = ( 

710 select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery() 

711 ) 

712 left_reference = int_(left_refs_subquery.c.left_reference) 

713 has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference") 

714 ref_count_expr = int_(func.count(Reference.id)).label("ref_count") 

715 ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg") 

716 has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types") 

717 has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label( 

718 "has_bad_ref" 

719 ) 

720 received_ref_subquery = ( 

721 select( 

722 Reference.to_user_id.label("user_id"), 

723 has_reference_expr, 

724 has_multiple_types_expr, 

725 has_bad_ref_expr, 

726 ref_count_expr, 

727 ref_avg_expr, 

728 ) 

729 .group_by(Reference.to_user_id) 

730 .subquery() 

731 ) 

732 has_multiple_types = int_(received_ref_subquery.c.has_multiple_types) 

733 has_reference = int_(received_ref_subquery.c.has_reference) 

734 has_bad_reference = int_(received_ref_subquery.c.has_bad_ref) 

735 rating_score = float_( 

736 received_ref_subquery.c.ref_avg 

737 * ( 

738 2 * func.least(received_ref_subquery.c.ref_count, 5) 

739 + func.greatest(received_ref_subquery.c.ref_count - 5, 0) 

740 ) 

741 ) 

742 ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score 
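# Illustrative aside, not part of handlers.py, assuming ratings are stored on a 0-1 scale (as
# the 1.4 * (rating - 0.3) rescaling and the <= 0.2 bad-reference threshold suggest): a user
# who has left a reference, received 5 references all rated 1.0 spanning at least two reference
# types, and has no bad references gets ref_avg = 1.4 * (1.0 - 0.3) = 0.98, a count multiplier
# of 2 * min(5, 5) + max(5 - 5, 0) = 10, so rating_score = 9.8 and
# ref_score = 2 + 1 + 1 - 0 + 9.8 = 13.8.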

743 

744 # activeness 

745 recently_active = int_(User.last_active >= now() - timedelta(days=180)) 

746 very_recently_active = int_(User.last_active >= now() - timedelta(days=14)) 

747 recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14)) 

748 messaged_lots = int_(func.count(Message.id) > 5) 

749 messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points") 

750 messaging_subquery = ( 

751 select(Message.author_id.label("user_id"), messaging_points_subquery) 

752 .where(Message.message_type == MessageType.text) 

753 .group_by(Message.author_id) 

754 .subquery() 

755 ) 

756 activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points) 

757 

758 # verification 

759 cb_subquery = ( 

760 select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id")) 

761 .join(Cluster, Cluster.id == ClusterSubscription.cluster_id) 

762 .where(ClusterSubscription.role == ClusterRole.admin) 

763 .where(Cluster.is_official_cluster) 

764 .group_by(ClusterSubscription.user_id) 

765 .subquery() 

766 ) 

767 min_node_id = cb_subquery.c.min_node_id 

768 cb = int_(min_node_id >= 1) 

769 wcb = int_(min_node_id == 1) 

770 badge_points = { 

771 "founder": 100, 

772 "board_member": 20, 

773 "past_board_member": 5, 

774 "strong_verification": 3, 

775 "volunteer": 3, 

776 "past_volunteer": 2, 

777 "donor": 1, 

778 "phone_verified": 1, 

779 } 

780 

781 badge_subquery = ( 

782 select( 

783 UserBadge.user_id.label("user_id"), 

784 func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"), 

785 ) 

786 .group_by(UserBadge.user_id) 

787 .subquery() 

788 ) 
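# Illustrative note, not part of handlers.py: case(badge_points, value=UserBadge.badge_id, else_=0)
# maps each badge row through the dictionary above (unlisted badges count as 0) and func.sum
# totals the values per user. For example, a hypothetical user holding the "founder" and "donor"
# badges gets badge_points = 100 + 1 = 101 from this subquery.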

789 

790 other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points) 

791 

792 # response rate 

793 hr_subquery = select( 

794 UserResponseRate.user_id, 

795 float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"), 

796 float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"), 

797 ).subquery() 

798 response_time_33p = hr_subquery.c.response_time_33p 

799 response_time_66p = hr_subquery.c.response_time_66p 

800 # be careful with nulls 

801 response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0) 
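# Illustrative note, not part of handlers.py: the percentiles above are converted to minutes
# (epoch seconds / 60), so 60 * 96.0 is a 96-hour (4-day) threshold. A user whose 33rd-percentile
# response time exceeds 4 days loses 10 points; one whose 66th-percentile is under 4 days gains 5.
# Because int_() wraps each comparison in coalesce(..., 0), users with no response-rate row
# (NULL via the outer join below) contribute 0 from both terms instead of NULL.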

802 

803 recommendation_score = ( 

804 hosting_status_points 

805 + profile_points 

806 + ref_score 

807 + activeness_points 

808 + other_points 

809 + response_rate_points 

810 + 2 * poor_man_gaussian() 

811 ) 

812 

813 scores = ( 

814 select(User.id.label("user_id"), recommendation_score.label("score")) 

815 .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id) 

816 .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id) 

817 .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id) 

818 .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id) 

819 .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id) 

820 .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id) 

821 ).subquery() 

822 

823 session.execute(update(User).values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)) 

824 

825 logger.info("Updated recommendation scores") 

826 

827 

828def update_badges(payload: empty_pb2.Empty) -> None: 

829 with session_scope() as session: 

830 

831 def update_badge(badge_id: str, members: Sequence[int]) -> None: 

832 badge = get_badge_dict()[badge_id] 

833 user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge.id)).scalars().all() 

834 # in case the user ids don't exist in the db 

835 actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all() 

836 # we should add the badge to these 

837 add = set(actual_members) - set(user_ids) 

838 # we should remove the badge from these 

839 remove = set(user_ids) - set(actual_members) 

840 for user_id in add: 

841 user_add_badge(session, user_id, badge.id) 

842 

843 for user_id in remove: 

844 user_remove_badge(session, user_id, badge.id) 

845 

846 update_badge("founder", get_static_badge_dict()["founder"]) 

847 update_badge("board_member", get_static_badge_dict()["board_member"]) 

848 update_badge("past_board_member", get_static_badge_dict()["past_board_member"]) 

849 update_badge("donor", session.execute(select(User.id).where(User.last_donated.is_not(None))).scalars().all()) 

850 update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all()) 

851 update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all()) 

852 # strong verification requires passport on file + gender/sex correspondence and date of birth match 

853 update_badge( 

854 "strong_verification", 

855 session.execute( 

856 select(User.id) 

857 .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id) 

858 .where(StrongVerificationAttempt.has_strong_verification(User)) 

859 ) 

860 .scalars() 

861 .all(), 

862 ) 

863 # volunteer badge for active volunteers (stopped_volunteering is null) 

864 update_badge( 

865 "volunteer", 

866 session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_(None))).scalars().all(), 

867 ) 

868 # past_volunteer badge for past volunteers (stopped_volunteering is not null) 

869 update_badge( 

870 "past_volunteer", 

871 session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_not(None))) 

872 .scalars() 

873 .all(), 

874 ) 

875 

876 

877def finalize_strong_verification(payload: jobs_pb2.FinalizeStrongVerificationPayload) -> None: 

878 with session_scope() as session: 

879 verification_attempt = session.execute( 

880 select(StrongVerificationAttempt) 

881 .where(StrongVerificationAttempt.id == payload.verification_attempt_id) 

882 .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend) 

883 ).scalar_one() 

884 response = requests.post( 

885 "https://passportreader.app/api/v1/session.get", 

886 auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]), 

887 json={"id": verification_attempt.iris_session_id}, 

888 timeout=10, 

889 verify="/etc/ssl/certs/ca-certificates.crt", 

890 ) 

891 if response.status_code != 200: 891 ↛ 892: line 891 didn't jump to line 892 because the condition on line 891 was never true

892 raise Exception(f"Iris didn't return 200: {response.text}") 

893 json_data = response.json() 

894 reference_payload = internal_pb2.VerificationReferencePayload.FromString( 

895 simple_decrypt("iris_callback", b64decode(json_data["reference"])) 

896 ) 

897 assert verification_attempt.user_id == reference_payload.user_id 

898 assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token 

899 assert verification_attempt.iris_session_id == json_data["id"] 

900 assert json_data["state"] == "APPROVED" 

901 

902 if json_data["document_type"] != "PASSPORT": 

903 verification_attempt.status = StrongVerificationAttemptStatus.failed 

904 notify( 

905 session, 

906 user_id=verification_attempt.user_id, 

907 topic_action=NotificationTopicAction.verification__sv_fail, 

908 key="", 

909 data=notification_data_pb2.VerificationSVFail( 

910 reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT 

911 ), 

912 ) 

913 return 

914 

915 assert json_data["document_type"] == "PASSPORT" 

916 

917 expiry_date = date.fromisoformat(json_data["expiry_date"]) 

918 nationality = json_data["nationality"] 

919 last_three_document_chars = json_data["document_number"][-3:] 

920 

921 existing_attempt = session.execute( 

922 select(StrongVerificationAttempt) 

923 .where(StrongVerificationAttempt.passport_expiry_date == expiry_date) 

924 .where(StrongVerificationAttempt.passport_nationality == nationality) 

925 .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars) 

926 .order_by(StrongVerificationAttempt.id) 

927 .limit(1) 

928 ).scalar_one_or_none() 

929 

930 verification_attempt.has_minimal_data = True 

931 verification_attempt.passport_expiry_date = expiry_date 

932 verification_attempt.passport_nationality = nationality 

933 verification_attempt.passport_last_three_document_chars = last_three_document_chars 

934 

935 if existing_attempt: 

936 verification_attempt.status = StrongVerificationAttemptStatus.duplicate 

937 

938 if existing_attempt.user_id != verification_attempt.user_id: 

939 session.flush() 

940 send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt) 

941 

942 notify( 

943 session, 

944 user_id=verification_attempt.user_id, 

945 topic_action=NotificationTopicAction.verification__sv_fail, 

946 key="", 

947 data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE), 

948 ) 

949 return 

950 

951 verification_attempt.has_full_data = True 

952 verification_attempt.passport_encrypted_data = asym_encrypt( 

953 config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8") 

954 ) 

955 verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"]) 

956 verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()] 

957 verification_attempt.status = StrongVerificationAttemptStatus.succeeded 

958 

959 session.flush() 

960 

961 strong_verification_completions_counter.inc() 

962 

963 user = verification_attempt.user 

964 if verification_attempt.has_strong_verification(user): 964 ↛ 979: line 964 didn't jump to line 979 because the condition on line 964 was always true

965 badge_id = "strong_verification" 

966 if session.execute( 

967 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id) 

968 ).scalar_one_or_none(): 

969 return 

970 

971 user_add_badge(session, user.id, badge_id, do_notify=False) 

972 notify( 

973 session, 

974 user_id=verification_attempt.user_id, 

975 topic_action=NotificationTopicAction.verification__sv_success, 

976 key="", 

977 ) 

978 else: 

979 notify( 

980 session, 

981 user_id=verification_attempt.user_id, 

982 topic_action=NotificationTopicAction.verification__sv_fail, 

983 key="", 

984 data=notification_data_pb2.VerificationSVFail( 

985 reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER 

986 ), 

987 ) 

988 

989 

990def send_activeness_probes(payload: empty_pb2.Empty) -> None: 

991 with session_scope() as session: 

992 ## Step 1: create new activeness probes for those who need it and don't have one (if enabled) 

993 

994 if config["ACTIVENESS_PROBES_ENABLED"]: 

995 # current activeness probes 

996 subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery() 

997 

998 # users who we should send an activeness probe to 

999 new_probe_user_ids = ( 

1000 session.execute( 

1001 select(User.id) 

1002 .where(User.is_visible) 

1003 .where(User.hosting_status == HostingStatus.can_host) 

1004 .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD) 

1005 .where(User.id.not_in(select(subquery.c.user_id))) 

1006 ) 

1007 .scalars() 

1008 .all() 

1009 ) 

1010 

1011 total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one() 

1012 probes_today = session.execute( 

1013 select(func.count()) 

1014 .select_from(ActivenessProbe) 

1015 .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24)) 

1016 ).scalar_one() 

1017 

1018 # send probes to max 2% of users per day 

1019 max_probes_per_day = 0.02 * total_users 

1020 max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1)) 
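# Illustrative aside, not part of handlers.py, with hypothetical numbers: with 10,000 visible
# users, max_probes_per_day = 200. If 190 probes were already initiated in the last 24 hours,
# min(200 - 190, 200 / 24) ~= 8.3 and max_probe_size = int(max(8.3, 1)) = 8, so at most 8 new
# probes are created on this run; the max(..., 1) keeps the job able to send at least one probe.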

1021 

1022 if len(new_probe_user_ids) > max_probe_size: 1022 ↛ 1023: line 1022 didn't jump to line 1023 because the condition on line 1022 was never true

1023 new_probe_user_ids = sample(new_probe_user_ids, max_probe_size) 

1024 

1025 for user_id in new_probe_user_ids: 

1026 session.add(ActivenessProbe(user_id=user_id)) 

1027 

1028 session.commit() 

1029 

1030 ## Step 2: actually send out probe notifications 

1031 for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS): 

1032 probes = ( 

1033 session.execute( 

1034 select(ActivenessProbe) 

1035 .where(ActivenessProbe.notifications_sent == probe_number_minus_1) 

1036 .where(ActivenessProbe.probe_initiated + delay < func.now()) 

1037 .where(ActivenessProbe.is_pending) 

1038 ) 

1039 .scalars() 

1040 .all() 

1041 ) 

1042 

1043 for probe in probes: 

1044 probe.notifications_sent = probe_number_minus_1 + 1 

1045 context = make_background_user_context(user_id=probe.user.id) 

1046 notify( 

1047 session, 

1048 user_id=probe.user.id, 

1049 topic_action=NotificationTopicAction.activeness__probe, 

1050 key=str(probe.id), 

1051 data=notification_data_pb2.ActivenessProbe( 

1052 reminder_number=probe_number_minus_1 + 1, 

1053 deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME), 

1054 ), 

1055 ) 

1056 session.commit() 

1057 

1058 ## Step 3: for those who haven't responded, mark them as failed 

1059 expired_probes = ( 

1060 session.execute( 

1061 select(ActivenessProbe) 

1062 .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS)) 

1063 .where(ActivenessProbe.is_pending) 

1064 .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now()) 

1065 ) 

1066 .scalars() 

1067 .all() 

1068 ) 

1069 

1070 for probe in expired_probes: 

1071 probe.responded = now() 

1072 probe.response = ActivenessProbeStatus.expired 

1073 if probe.user.hosting_status == HostingStatus.can_host: 1073 ↛ 1075: line 1073 didn't jump to line 1075 because the condition on line 1073 was always true

1074 probe.user.hosting_status = HostingStatus.maybe 

1075 if probe.user.meetup_status == MeetupStatus.wants_to_meetup: 1075 ↛ 1077: line 1075 didn't jump to line 1077 because the condition on line 1075 was always true

1076 probe.user.meetup_status = MeetupStatus.open_to_meetup 

1077 session.commit() 

1078 

1079 

1080def update_randomized_locations(payload: empty_pb2.Empty) -> None: 

1081 """ 

1082 We generate for each user a randomized location as follows: 

1083 - Start from a strong random seed (based on the SECRET env var and our key derivation function) 

1084 - For each user, mix in the user_id for randomness 

1085 - Generate a radius from [0.02, 0.1] degrees (about 2-10km) 

1086 - Generate an angle from [0, 360] 

1087 - Randomized location is then a distance `radius` away at an angle `angle` from `geom` 

1088 """ 

1089 randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME) 

1090 

1091 def gen_randomized_coords(user_id: int, lat: float, lng: float) -> tuple[float, float]: 

1092 radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii")) 

1093 angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii")) 

1094 radius = 0.02 + 0.08 * radius_u 

1095 angle_rad = 2 * pi * angle_u 

1096 offset_lng = radius * cos(angle_rad) 

1097 offset_lat = radius * sin(angle_rad) 

1098 return lat + offset_lat, lng + offset_lng 
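# Illustrative note, not part of handlers.py: assuming stable_secure_uniform returns a
# deterministic value in [0, 1) for a given secret and seed, radius lands in [0.02, 0.1)
# degrees. One degree of latitude is roughly 111 km, so that is about 2-11 km, in line with
# the docstring's "about 2-10km"; the longitude offset covers fewer kilometres away from the
# equator, since a degree of longitude spans about 111 km * cos(latitude).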

1099 

1100 user_updates: list[dict[str, Any]] = [] 

1101 

1102 with session_scope() as session: 

1103 users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all() 

1104 

1105 for user_id, geom in users_to_update: 

1106 lat, lng = get_coordinates(geom) 

1107 user_updates.append( 

1108 {"id": user_id, "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, lat, lng))} 

1109 ) 

1110 

1111 with session_scope() as session: 

1112 session.execute(update(User), user_updates) 

1113 

1114 

1115def send_event_reminders(payload: empty_pb2.Empty) -> None: 

1116 """ 

1117 Sends reminders for events that are 24 hours away to users who marked themselves as attending. 

1118 """ 

1119 logger.info("Sending event reminder emails") 

1120 

1121 with session_scope() as session: 

1122 occurrences = ( 

1123 session.execute( 

1124 select(EventOccurrence) 

1125 .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA) 

1126 .where(EventOccurrence.start_time >= now()) 

1127 ) 

1128 .scalars() 

1129 .all() 

1130 ) 

1131 

1132 for occurrence in occurrences: 

1133 results = session.execute( 

1134 select(User, EventOccurrenceAttendee) 

1135 .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id) 

1136 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id) 

1137 .where(EventOccurrenceAttendee.reminder_sent == False) 

1138 ).all() 

1139 

1140 for user, attendee in results: 

1141 context = make_background_user_context(user_id=user.id) 

1142 

1143 notify( 

1144 session, 

1145 user_id=user.id, 

1146 topic_action=NotificationTopicAction.event__reminder, 

1147 key=str(occurrence.id), 

1148 data=notification_data_pb2.EventReminder( 

1149 event=event_to_pb(session, occurrence, context), 

1150 user=user_model_to_pb(user, session, context), 

1151 ), 

1152 ) 

1153 

1154 attendee.reminder_sent = True 

1155 session.commit() 

1156 

1157 

1158def check_expo_push_receipts(payload: empty_pb2.Empty) -> None: 

1159 """ 

1160 Check Expo push receipts in batch and update delivery attempts. 

1161 """ 

1162 MAX_ITERATIONS = 100 # Safety limit: 100 batches * 100 attempts = 10,000 max 

1163 

1164 for iteration in range(MAX_ITERATIONS): 1164 ↛ 1226: line 1164 didn't jump to line 1226 because the loop on line 1164 didn't complete

1165 with session_scope() as session: 

1166 # Find all delivery attempts that need receipt checking 

1167 # Wait 15 minutes per Expo's recommendation before checking receipts 

1168 attempts = ( 

1169 session.execute( 

1170 select(PushNotificationDeliveryAttempt) 

1171 .where(PushNotificationDeliveryAttempt.expo_ticket_id != None) 

1172 .where(PushNotificationDeliveryAttempt.receipt_checked_at == None) 

1173 .where(PushNotificationDeliveryAttempt.time < now() - timedelta(minutes=15)) 

1174 .where(PushNotificationDeliveryAttempt.time > now() - timedelta(hours=24)) 

1175 .limit(100) 

1176 ) 

1177 .scalars() 

1178 .all() 

1179 ) 

1180 

1181 if not attempts: 

1182 logger.debug("No Expo receipts to check") 

1183 return 

1184 

1185 logger.info(f"Checking {len(attempts)} Expo push receipts") 

1186 

1187 receipts = get_expo_push_receipts([not_none(attempt.expo_ticket_id) for attempt in attempts]) 

1188 

1189 for attempt in attempts: 

1190 receipt = receipts.get(not_none(attempt.expo_ticket_id)) 

1191 

1192 # Always mark as checked to avoid infinite loops 

1193 attempt.receipt_checked_at = now() 

1194 

1195 if receipt is None: 

1196 # Receipt not found after 15min - likely expired (>24h) or never existed 

1197 # Per Expo docs: receipts should be available within 15 minutes 

1198 attempt.receipt_status = "not_found" 

1199 continue 

1200 

1201 attempt.receipt_status = receipt.get("status") 

1202 

1203 if receipt.get("status") == "error": 

1204 details = receipt.get("details", {}) 

1205 error_code = details.get("error") 

1206 attempt.receipt_error_code = error_code 

1207 

1208 if error_code == "DeviceNotRegistered": 1208 ↛ 1223: line 1208 didn't jump to line 1223 because the condition on line 1208 was always true

1209 # Device token is no longer valid - disable the subscription 

1210 sub = session.execute( 

1211 select(PushNotificationSubscription).where( 

1212 PushNotificationSubscription.id == attempt.push_notification_subscription_id 

1213 ) 

1214 ).scalar_one() 

1215 

1216 if sub.disabled_at > now(): 1216 ↛ 1189: line 1216 didn't jump to line 1189 because the condition on line 1216 was always true

1217 sub.disabled_at = now() 

1218 logger.info(f"Disabled push sub {sub.id} due to DeviceNotRegistered in receipt") 

1219 push_notification_counter.labels( 

1220 platform="expo", outcome="permanent_subscription_failure_receipt" 

1221 ).inc() 

1222 else: 

1223 logger.warning(f"Expo receipt error for ticket {attempt.expo_ticket_id}: {error_code}") 

1224 

1225 # If we get here, we've exhausted MAX_ITERATIONS without finishing 

1226 raise RuntimeError( 

1227 f"check_expo_push_receipts exceeded {MAX_ITERATIONS} iterations - " 

1228 "there may be an unusually large backlog of receipts to check" 

1229 ) 

1230 

1231 

1232def send_postal_verification_postcard(payload: jobs_pb2.SendPostalVerificationPostcardPayload) -> None: 

1233 """ 

1234 Sends the postcard via external API and updates attempt status. 

1235 """ 

1236 with session_scope() as session: 

1237 attempt = session.execute( 

1238 select(PostalVerificationAttempt).where( 

1239 PostalVerificationAttempt.id == payload.postal_verification_attempt_id 

1240 ) 

1241 ).scalar_one_or_none() 

1242 

1243 if not attempt or attempt.status != PostalVerificationStatus.in_progress: 1243 ↛ 1244: line 1243 didn't jump to line 1244 because the condition on line 1243 was never true

1244 logger.warning( 

1245 f"Postal verification attempt {payload.postal_verification_attempt_id} not found or wrong state" 

1246 ) 

1247 return 

1248 

1249 user_name = session.execute(select(User.name).where(User.id == attempt.user_id)).scalar_one() 

1250 

1251 result = send_postcard( 

1252 recipient_name=user_name, 

1253 address_line_1=attempt.address_line_1, 

1254 address_line_2=attempt.address_line_2, 

1255 city=attempt.city, 

1256 state=attempt.state, 

1257 postal_code=attempt.postal_code, 

1258 country=attempt.country, 

1259 verification_code=not_none(attempt.verification_code), 

1260 qr_code_url=urls.postal_verification_link(code=not_none(attempt.verification_code)), 

1261 ) 

1262 

1263 if result.success: 

1264 attempt.status = PostalVerificationStatus.awaiting_verification 

1265 attempt.postcard_sent_at = func.now() 

1266 

1267 notify( 

1268 session, 

1269 user_id=attempt.user_id, 

1270 topic_action=NotificationTopicAction.postal_verification__postcard_sent, 

1271 key="", 

1272 data=notification_data_pb2.PostalVerificationPostcardSent( 

1273 city=attempt.city, 

1274 country=attempt.country, 

1275 ), 

1276 ) 

1277 else: 

1278 # Could retry or fail - for now, fail 

1279 attempt.status = PostalVerificationStatus.failed 

1280 logger.error(f"Postcard send failed: {result.error_message}") 

1281 

1282 

1283class DatabaseInconsistencyError(Exception): 

1284 """Raised when database consistency checks fail""" 

1285 

1286 pass 

1287 

1288 

1289def check_database_consistency(payload: empty_pb2.Empty) -> None: 

1290 """ 

1291 Checks database consistency and raises an exception if any issues are found. 

1292 """ 

1293 logger.info("Checking database consistency") 

1294 errors = [] 

1295 

1296 with session_scope() as session: 

1297 # Check that all users have a profile gallery 

1298 users_without_gallery = session.execute( 

1299 select(User.id, User.username).where(User.profile_gallery_id.is_(None)) 

1300 ).all() 

1301 if users_without_gallery: 

1302 errors.append(f"Users without profile gallery: {users_without_gallery}") 

1303 

1304 # Check that all profile galleries point to their owner 

1305 mismatched_galleries = session.execute( 

1306 select(User.id, User.username, User.profile_gallery_id, PhotoGallery.owner_user_id) 

1307 .join(PhotoGallery, User.profile_gallery_id == PhotoGallery.id) 

1308 .where(User.profile_gallery_id.is_not(None)) 

1309 .where(PhotoGallery.owner_user_id != User.id) 

1310 ).all() 

1311 if mismatched_galleries: 1311 ↛ 1312: line 1311 didn't jump to line 1312 because the condition on line 1311 was never true

1312 errors.append(f"Profile galleries with mismatched owner: {mismatched_galleries}") 

1313 

1314 # === Moderation System Consistency Checks === 

1315 

1316 # Check all ModerationStates have a known object_type 

1317 known_object_types = [ModerationObjectType.HOST_REQUEST, ModerationObjectType.GROUP_CHAT] 

1318 unknown_type_states = session.execute( 

1319 select(ModerationState.id, ModerationState.object_type).where( 

1320 ModerationState.object_type.not_in(known_object_types) 

1321 ) 

1322 ).all() 

1323 if unknown_type_states: 1323 ↛ 1324: line 1323 didn't jump to line 1324 because the condition on line 1323 was never true

1324 errors.append(f"ModerationStates with unknown object_type: {unknown_type_states}") 

1325 

1326 # Check every ModerationState has at least one INITIAL_REVIEW queue item 

1327 # Skip items with ID < 2000000 as they were created before this check was introduced 

1328 states_without_initial_review = session.execute( 

1329 select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where( 

1330 ModerationState.id >= 2000000, 

1331 ~exists( 

1332 select(1) 

1333 .where(ModerationQueueItem.moderation_state_id == ModerationState.id) 

1334 .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW) 

1335 ), 

1336 ) 

1337 ).all() 

1338 if states_without_initial_review: 1338 ↛ 1339: line 1338 didn't jump to line 1339 because the condition on line 1338 was never true

1339 errors.append(f"ModerationStates without INITIAL_REVIEW queue item: {states_without_initial_review}") 

1340 

1341 # Check every ModerationState has a CREATE log entry 

1342 # Skip items with ID < 2000000 as they were created before this check was introduced 

1343 states_without_create_log = session.execute( 

1344 select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where( 

1345 ModerationState.id >= 2000000, 

1346 ~exists( 

1347 select(1) 

1348 .where(ModerationLog.moderation_state_id == ModerationState.id) 

1349 .where(ModerationLog.action == ModerationAction.CREATE) 

1350 ), 

1351 ) 

1352 ).all() 

1353 if states_without_create_log: 1353 ↛ 1354: line 1353 didn't jump to line 1354 because the condition on line 1353 was never true

1354 errors.append(f"ModerationStates without CREATE log entry: {states_without_create_log}") 

1355 

1356 # Check resolved queue items point to log entries for the same moderation state 

1357 resolved_item_log_mismatches = session.execute( 

1358 select(ModerationQueueItem.id, ModerationQueueItem.moderation_state_id, ModerationLog.moderation_state_id) 

1359 .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id) 

1360 .where(ModerationQueueItem.resolved_by_log_id.is_not(None)) 

1361 .where(ModerationQueueItem.moderation_state_id != ModerationLog.moderation_state_id) 

1362 ).all() 

1363 if resolved_item_log_mismatches: 1363 ↛ 1364: line 1363 didn't jump to line 1364 because the condition on line 1363 was never true

1364 errors.append(f"Resolved queue items with mismatched moderation_state_id: {resolved_item_log_mismatches}") 

1365 

1366 # Check every HOST_REQUEST ModerationState has exactly one HostRequest pointing to it 

1367 hr_states = ( 

1368 session.execute( 

1369 select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST) 

1370 ) 

1371 .scalars() 

1372 .all() 

1373 ) 

1374 for state_id in hr_states: 1374 ↛ 1375: line 1374 didn't jump to line 1375 because the loop on line 1374 never started

1375 hr_count = session.execute( 

1376 select(func.count()).where(HostRequest.moderation_state_id == state_id) 

1377 ).scalar_one() 

1378 if hr_count != 1: 

1379 errors.append(f"ModerationState {state_id} (HOST_REQUEST) has {hr_count} HostRequests (expected 1)") 

1380 

1381 # Check every GROUP_CHAT ModerationState has exactly one GroupChat pointing to it 

1382 gc_states = ( 

1383 session.execute( 

1384 select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT) 

1385 ) 

1386 .scalars() 

1387 .all() 

1388 ) 

1389 for state_id in gc_states: 1389 ↛ 1390: line 1389 didn't jump to line 1390 because the loop on line 1389 never started

1390 gc_count = session.execute( 

1391 select(func.count()).where(GroupChat.moderation_state_id == state_id) 

1392 ).scalar_one() 

1393 if gc_count != 1: 

1394 errors.append(f"ModerationState {state_id} (GROUP_CHAT) has {gc_count} GroupChats (expected 1)") 

1395 

1396 # Check ModerationState.object_id matches the actual object's ID 

1397 hr_object_id_mismatches = session.execute( 

1398 select(ModerationState.id, ModerationState.object_id, HostRequest.conversation_id) 

1399 .join(HostRequest, HostRequest.moderation_state_id == ModerationState.id) 

1400 .where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST) 

1401 .where(ModerationState.object_id != HostRequest.conversation_id) 

1402 ).all() 

1403 if hr_object_id_mismatches: 1403 ↛ 1404: line 1403 didn't jump to line 1404 because the condition on line 1403 was never true

1404 errors.append(f"ModerationState object_id mismatch for HOST_REQUEST: {hr_object_id_mismatches}") 

1405 

1406 gc_object_id_mismatches = session.execute( 

1407 select(ModerationState.id, ModerationState.object_id, GroupChat.conversation_id) 

1408 .join(GroupChat, GroupChat.moderation_state_id == ModerationState.id) 

1409 .where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT) 

1410 .where(ModerationState.object_id != GroupChat.conversation_id) 

1411 ).all() 

1412 if gc_object_id_mismatches: 1412 ↛ 1413: line 1412 didn't jump to line 1413 because the condition on line 1412 was never true

1413 errors.append(f"ModerationState object_id mismatch for GROUP_CHAT: {gc_object_id_mismatches}") 

1414 

1415 # Check reverse mapping: HostRequest's moderation_state points to correct ModerationState 

1416 hr_reverse_mismatches = session.execute( 

1417 select( 

1418 HostRequest.conversation_id, 

1419 HostRequest.moderation_state_id, 

1420 ModerationState.object_type, 

1421 ModerationState.object_id, 

1422 ) 

1423 .join(ModerationState, HostRequest.moderation_state_id == ModerationState.id) 

1424 .where( 

1425 (ModerationState.object_type != ModerationObjectType.HOST_REQUEST) 

1426 | (ModerationState.object_id != HostRequest.conversation_id) 

1427 ) 

1428 ).all() 

1429 if hr_reverse_mismatches: 1429 ↛ 1430: line 1429 didn't jump to line 1430 because the condition on line 1429 was never true

1430 errors.append(f"HostRequest points to ModerationState with wrong type/object_id: {hr_reverse_mismatches}") 

1431 

1432 # Check reverse mapping: GroupChat's moderation_state points to correct ModerationState 

1433 gc_reverse_mismatches = session.execute( 

1434 select( 

1435 GroupChat.conversation_id, 

1436 GroupChat.moderation_state_id, 

1437 ModerationState.object_type, 

1438 ModerationState.object_id, 

1439 ) 

1440 .join(ModerationState, GroupChat.moderation_state_id == ModerationState.id) 

1441 .where( 

1442 (ModerationState.object_type != ModerationObjectType.GROUP_CHAT) 

1443 | (ModerationState.object_id != GroupChat.conversation_id) 

1444 ) 

1445 ).all() 

1446 if gc_reverse_mismatches: 1446 ↛ 1447: line 1446 didn't jump to line 1447 because the condition on line 1446 was never true

1447 errors.append(f"GroupChat points to ModerationState with wrong type/object_id: {gc_reverse_mismatches}") 

1448 

1449 # Ensure auto-approve deadline isn't being exceeded by a significant margin 

1450 # The auto-approver runs every 15s, so allow 5 minutes grace before alerting 

1451 deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"] 

1452 if deadline_seconds > 0: 1452 ↛ 1453: line 1452 didn't jump to line 1453 because the condition on line 1452 was never true

1453 grace_period = timedelta(minutes=5) 

1454 stale_initial_review_items = session.execute( 

1455 select( 

1456 ModerationQueueItem.id, 

1457 ModerationQueueItem.moderation_state_id, 

1458 ModerationQueueItem.time_created, 

1459 ) 

1460 .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW) 

1461 .where(ModerationQueueItem.resolved_by_log_id.is_(None)) 

1462 .where(ModerationQueueItem.time_created < now() - timedelta(seconds=deadline_seconds) - grace_period) 

1463 ).all() 

1464 if stale_initial_review_items: 

1465 errors.append( 

1466 f"INITIAL_REVIEW items exceeding auto-approve deadline by >5min: {stale_initial_review_items}" 

1467 ) 

1468 

1469 if errors: 

1470 raise DatabaseInconsistencyError("\n".join(errors)) 

1471 

1472 

1473def auto_approve_moderation_queue(payload: empty_pb2.Empty) -> None: 

1474 """ 

1475 Dead man's switch: auto-approves unresolved INITIAL_REVIEW items older than the deadline. 

1476 Items explicitly actioned by moderators are left alone. 

1477 """ 

1478 deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"] 

1479 if deadline_seconds <= 0: 

1480 return 

1481 

1482 with session_scope() as session: 

1483 ctx = make_background_user_context(user_id=config["MODERATION_BOT_USER_ID"]) 

1484 

1485 items = ( 

1486 Moderation() 

1487 .GetModerationQueue( 

1488 request=moderation_pb2.GetModerationQueueReq( 

1489 triggers=[moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW], 

1490 unresolved_only=True, 

1491 page_size=100, 

1492 created_before=Timestamp_from_datetime(now() - timedelta(seconds=deadline_seconds)), 

1493 ), 

1494 context=ctx, 

1495 session=session, 

1496 ) 

1497 .queue_items 

1498 ) 

1499 

1500 if not items: 

1501 return 

1502 

1503 logger.info(f"Auto-approving {len(items)} moderation queue items") 

1504 for item in items: 

1505 Moderation().ModerateContent( 

1506 request=moderation_pb2.ModerateContentReq( 

1507 moderation_state_id=item.moderation_state_id, 

1508 action=moderation_pb2.MODERATION_ACTION_APPROVE, 

1509 visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE, 

1510 reason=f"Auto-approved: moderation deadline of {deadline_seconds} seconds exceeded.", 

1511 ), 

1512 context=ctx, 

1513 session=session, 

1514 ) 

1515 moderation_auto_approved_counter.inc(len(items))