Coverage for app / backend / src / couchers / jobs / handlers.py: 89%

474 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 09:44 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from collections.abc import Sequence 

7from datetime import date, timedelta 

8from math import cos, pi, sin, sqrt 

9from random import sample 

10from typing import Any 

11 

12import requests 

13from google.protobuf import empty_pb2 

14from sqlalchemy import ColumnElement, Float, Function, Integer, select 

15from sqlalchemy.orm import aliased 

16from sqlalchemy.sql import ( 

17 and_, 

18 case, 

19 cast, 

20 delete, 

21 distinct, 

22 exists, 

23 extract, 

24 func, 

25 literal, 

26 not_, 

27 or_, 

28 union_all, 

29 update, 

30) 

31 

32from couchers.config import config 

33from couchers.constants import ( 

34 ACTIVENESS_PROBE_EXPIRY_TIME, 

35 ACTIVENESS_PROBE_INACTIVITY_PERIOD, 

36 ACTIVENESS_PROBE_TIME_REMINDERS, 

37 EVENT_REMINDER_TIMEDELTA, 

38 HOST_REQUEST_MAX_REMINDERS, 

39 HOST_REQUEST_REMINDER_INTERVAL, 

40) 

41from couchers.context import make_background_user_context 

42from couchers.crypto import ( 

43 USER_LOCATION_RANDOMIZATION_NAME, 

44 asym_encrypt, 

45 b64decode, 

46 get_secret, 

47 simple_decrypt, 

48 stable_secure_uniform, 

49) 

50from couchers.db import session_scope 

51from couchers.email.dev import print_dev_email 

52from couchers.email.smtp import send_smtp_email 

53from couchers.event_log import log_event 

54from couchers.helpers.badges import user_add_badge, user_remove_badge 

55from couchers.helpers.completed_profile import has_completed_profile_expression 

56from couchers.materialized_views import ( 

57 UserResponseRate, 

58) 

59from couchers.metrics import ( 

60 moderation_auto_approved_counter, 

61 postcards_sent_counter, 

62 push_notification_counter, 

63 strong_verification_completions_counter, 

64) 

65from couchers.models import ( 

66 AccountDeletionToken, 

67 ActivenessProbe, 

68 ActivenessProbeStatus, 

69 Cluster, 

70 ClusterRole, 

71 ClusterSubscription, 

72 EventOccurrence, 

73 EventOccurrenceAttendee, 

74 GroupChat, 

75 GroupChatSubscription, 

76 HostingStatus, 

77 HostRequest, 

78 HostRequestStatus, 

79 LoginToken, 

80 MeetupStatus, 

81 Message, 

82 MessageType, 

83 ModerationAction, 

84 ModerationLog, 

85 ModerationObjectType, 

86 ModerationQueueItem, 

87 ModerationState, 

88 ModerationTrigger, 

89 PassportSex, 

90 PasswordResetToken, 

91 PhotoGallery, 

92 PostalVerificationAttempt, 

93 PostalVerificationStatus, 

94 PushNotificationDeliveryAttempt, 

95 PushNotificationSubscription, 

96 Reference, 

97 StrongVerificationAttempt, 

98 StrongVerificationAttemptStatus, 

99 User, 

100 UserBadge, 

101 Volunteer, 

102) 

103from couchers.models.notifications import NotificationTopicAction 

104from couchers.notifications.expo_api import get_expo_push_receipts 

105from couchers.notifications.notify import notify 

106from couchers.postal.my_postcard import get_order_ids, send_postcard 

107from couchers.proto import moderation_pb2, notification_data_pb2 

108from couchers.proto.internal import internal_pb2, jobs_pb2 

109from couchers.resources import get_badge_dict, get_static_badge_dict 

110from couchers.sentry import report_message 

111from couchers.servicers.api import user_model_to_pb 

112from couchers.servicers.events import ( 

113 event_to_pb, 

114) 

115from couchers.servicers.moderation import Moderation 

116from couchers.servicers.requests import host_request_to_pb 

117from couchers.sql import ( 

118 users_visible_to_each_other, 

119 where_moderated_content_visible, 

120 where_moderated_content_visible_to_user_column, 

121 where_user_columns_visible_to_each_other, 

122 where_users_column_visible, 

123) 

124from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

125from couchers.tasks import send_duplicate_strong_verification_email 

126from couchers.utils import ( 

127 Timestamp_from_datetime, 

128 create_coordinate, 

129 get_coordinates, 

130 not_none, 

131 now, 

132) 

133 

# Module-level logger for all background job handlers in this file.
logger = logging.getLogger(__name__)

135 

136 

def send_email(payload: jobs_pb2.SendEmailPayload) -> None:
    """Deliver one email described by `payload` and persist the resulting record.

    Chooses a delivery backend based on config, sends the email, and stores the
    returned models.Email object in the database.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
    # pick the delivery backend: real SMTP when email is enabled, otherwise a
    # dev backend that just prints the email to the logger
    if config["ENABLE_EMAIL"]:
        deliver = send_smtp_email
    else:
        deliver = print_dev_email
    # the backend must return a models.Email object that can be added to the database
    email_record = deliver(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
        list_unsubscribe_header=payload.list_unsubscribe_header,
        source_data=payload.source_data,
    )
    with session_scope() as session:
        session.add(email_record)

154 

155 

def purge_login_tokens(payload: empty_pb2.Empty) -> None:
    """Delete all login tokens that are no longer valid."""
    logger.info("Purging login tokens")
    with session_scope() as session:
        purge_stmt = delete(LoginToken).where(~LoginToken.is_valid)
        session.execute(purge_stmt.execution_options(synchronize_session=False))

160 

161 

def purge_password_reset_tokens(payload: empty_pb2.Empty) -> None:
    """Delete all password reset tokens that are no longer valid."""
    # fix: previously logged "Purging login tokens" (copy-pasted from
    # purge_login_tokens), which made the two jobs indistinguishable in logs
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )

168 

169 

def purge_account_deletion_tokens(payload: empty_pb2.Empty) -> None:
    """Delete all account deletion tokens that are no longer valid."""
    logger.info("Purging account deletion tokens")
    with session_scope() as session:
        purge_stmt = (
            delete(AccountDeletionToken)
            .where(~AccountDeletionToken.is_valid)
            .execution_options(synchronize_session=False)
        )
        session.execute(purge_stmt)

178 

179 

def send_message_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for messages that have been unseen for a long enough time
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                where_moderated_content_visible_to_user_column(
                    select(User)
                    .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                    .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                    .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id),
                    GroupChat,
                    User.id,
                )
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                # only messages sent during the user's subscription window
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                # not yet notified about, and not yet seen in the chat itself
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                # grace period: give the user 5 minutes to read the message before notifying
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for user in users:
            context = make_background_user_context(user_id=user.id)
            # now actually grab all the group chats, not just less than 5 min old
            # one row per group chat: the latest unseen message id and the count of unseen messages
            subquery = (
                where_users_column_visible(
                    where_moderated_content_visible(
                        select(
                            GroupChatSubscription.group_chat_id.label("group_chat_id"),
                            func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                            func.max(Message.id).label("message_id"),
                            func.count(Message.id).label("count_unseen"),
                        )
                        .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id),
                        context,
                        GroupChat,
                        is_list_operation=True,
                    )
                    .where(GroupChatSubscription.user_id == user.id)
                    .where(not_(GroupChatSubscription.is_muted))
                    .where(Message.id > user.last_notified_message_id)
                    .where(Message.id > GroupChatSubscription.last_seen_message_id)
                    .where(Message.time >= GroupChatSubscription.joined)
                    .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                    .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)),
                    context,
                    Message.author_id,
                )
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            # resolve each group chat's latest unseen message into full ORM rows
            unseen_messages = session.execute(
                where_moderated_content_visible(
                    select(GroupChat, Message, subquery.c.count_unseen)
                    .join(subquery, subquery.c.message_id == Message.id)
                    .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id),
                    context,
                    GroupChat,
                    is_list_operation=True,
                ).order_by(subquery.c.message_id.desc())
            ).all()

            if not unseen_messages:
                continue

            # advance the per-user watermark so these messages aren't notified about again
            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            def format_title(message: Message, group_chat: GroupChat, count_unseen: int) -> str:
                # DMs are titled by the other person's name, group chats by the chat title
                if group_chat.is_dm:
                    return f"You missed {count_unseen} message(s) from {message.author.name}"
                else:
                    return f"You missed {count_unseen} message(s) in {group_chat.title}"

            notify(
                session,
                user_id=user.id,
                topic_action=NotificationTopicAction.chat__missed_messages,
                key="",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                context,
                            ),
                            message=format_title(message, group_chat, count_unseen),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                        )
                        for group_chat, message, count_unseen in unseen_messages
                    ],
                ),
            )
            # commit per user so the watermark sticks even if a later user fails
            session.commit()

289 

290 

def send_request_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host)
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    with session_scope() as session:
        # Get all candidate users who might have unseen request messages.
        # Drive from host_requests/messages (selective) rather than scanning all users (expensive).
        surfer_ids = (
            select(User.id)
            .join(HostRequest, HostRequest.initiator_user_id == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(User.is_visible)
            .where(Message.id > HostRequest.initiator_last_seen_message_id)
            .where(Message.id > User.last_notified_request_message_id)
            # grace period: give the user 5 minutes to read the message themselves
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
        )
        host_ids = (
            select(User.id)
            .join(HostRequest, HostRequest.recipient_user_id == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(User.is_visible)
            .where(Message.id > HostRequest.recipient_last_seen_message_id)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
        )
        candidate_user_ids = session.execute(union_all(surfer_ids, host_ids)).scalars().unique().all()

        for user_id in candidate_user_ids:
            context = make_background_user_context(user_id=user_id)

            # requests where this user is surfing
            surfing_reqs = session.execute(
                where_users_column_visible(
                    where_moderated_content_visible_to_user_column(
                        select(User, HostRequest, func.max(Message.id))
                        .where(User.id == user_id)
                        .join(HostRequest, HostRequest.initiator_user_id == User.id),
                        HostRequest,
                        HostRequest.initiator_user_id,
                    ),
                    context,
                    HostRequest.recipient_user_id,
                )
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.initiator_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)  # type: ignore[arg-type]
            ).all()

            # where this user is hosting
            hosting_reqs = session.execute(
                where_users_column_visible(
                    where_moderated_content_visible_to_user_column(
                        select(User, HostRequest, func.max(Message.id))
                        .where(User.id == user_id)
                        .join(HostRequest, HostRequest.recipient_user_id == User.id),
                        HostRequest,
                        HostRequest.recipient_user_id,
                    ),
                    context,
                    HostRequest.initiator_user_id,
                )
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.recipient_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)  # type: ignore[arg-type]
            ).all()

            for user, host_request, max_message_id in surfing_reqs:
                # advance the watermark before notifying so retries can't double-send
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                notify(
                    session,
                    user_id=user.id,
                    topic_action=NotificationTopicAction.host_request__missed_messages,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        user=user_model_to_pb(host_request.recipient, session, context),
                        am_host=False,
                    ),
                )

            for user, host_request, max_message_id in hosting_reqs:
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                # When a host request is created, the recipient immediately receives a
                # host_request__create notification that includes the initial message text.
                # A few minutes later, this background job sees that same message as "unseen"
                # (the recipient hasn't opened the request yet) and would send a duplicate
                # missed_messages notification.
                #
                # To prevent this, we check if the only unseen text message in this host
                # request is the very first text message in the conversation (i.e. the
                # creation message). If so, we skip sending missed_messages — the user was
                # already notified via host_request__create.
                #
                # Advancing last_notified_request_message_id above is safe even when we skip
                # the notification: this watermark is only ever advanced when we process all
                # unseen messages for the user, so skipping one notification doesn't cause us
                # to miss future messages in other host requests.
                only_creation_message = not session.execute(
                    select(
                        select(func.count())
                        .where(Message.conversation_id == host_request.conversation_id)
                        .where(Message.message_type == MessageType.text)
                        .scalar_subquery()
                        > 1
                    )
                ).scalar_one()
                if only_creation_message:
                    continue

                notify(
                    session,
                    user_id=user.id,
                    topic_action=NotificationTopicAction.host_request__missed_messages,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        user=user_model_to_pb(host_request.initiator, session, context),
                        am_host=True,
                    ),
                )

425 

426 

def send_onboarding_emails(payload: empty_pb2.Empty) -> None:
    """
    Sends out onboarding emails
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:
        # round 1: every visible user who hasn't received any onboarding email yet
        first_round_stmt = select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)
        for user in session.execute(first_round_stmt).scalars().all():
            notify(
                session,
                user_id=user.id,
                topic_action=NotificationTopicAction.onboarding__reminder,
                key="1",
            )
            user.onboarding_emails_sent = 1
            user.last_onboarding_email_sent = now()
        session.commit()

        # round 2: a week after the first email, if the profile is still incomplete
        second_round_stmt = (
            select(User)
            .where(User.is_visible)
            .where(User.onboarding_emails_sent == 1)
            .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
            .where(~has_completed_profile_expression())
        )
        for user in session.execute(second_round_stmt).scalars().all():
            notify(
                session,
                user_id=user.id,
                topic_action=NotificationTopicAction.onboarding__reminder,
                key="2",
            )
            user.onboarding_emails_sent = 2
            user.last_onboarding_email_sent = now()
        session.commit()

474 

475 

def send_reference_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends out reminders to write references after hosting/staying
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            user = aliased(User)
            other_user = aliased(User)
            # surfers needing to write a ref
            q1 = (
                select(literal(True), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.initiator_user_id)
                .join(other_user, other_user.id == HostRequest.recipient_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the surfer has not written a ref
                        Reference.from_user_id == HostRequest.initiator_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                # only remind each person at most once per schedule slot
                .where(HostRequest.initiator_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.initiator_reason_didnt_meetup == None)
                .where(users_visible_to_each_other(user, other_user))
            )

            # hosts needing to write a ref
            q2 = (
                select(literal(False), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.recipient_user_id)
                .join(other_user, other_user.id == HostRequest.initiator_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the host has not written a ref
                        Reference.from_user_id == HostRequest.recipient_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.recipient_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.recipient_reason_didnt_meetup == None)
                .where(users_visible_to_each_other(user, other_user))
            )

            # map the raw union columns back onto ORM entities; column 0 is the
            # literal True/False flag marking whether the reminder is for a surfer
            union = union_all(q1, q2).subquery()
            query = select(
                union.c[0].label("surfed"),
                aliased(HostRequest, union),
                aliased(user, union),
                aliased(other_user, union),
            )
            reference_reminders = session.execute(query).all()

            for surfed, host_request, user, other_user in reference_reminders:
                # visibility and blocking already checked in sql
                assert user.is_visible
                context = make_background_user_context(user_id=user.id)
                topic_action = (
                    NotificationTopicAction.reference__reminder_surfed
                    if surfed
                    else NotificationTopicAction.reference__reminder_hosted
                )
                notify(
                    session,
                    user_id=user.id,
                    topic_action=topic_action,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(other_user, session, context),
                        days_left=reminder_days_left,
                    ),
                )
                # record the reminder so this slot isn't sent again
                if surfed:
                    host_request.initiator_sent_reference_reminders = reminder_number
                else:
                    host_request.recipient_sent_reference_reminders = reminder_number
            session.commit()

575 

576 

def send_host_request_reminders(payload: empty_pb2.Empty) -> None:
    """Remind hosts about pending host requests they haven't replied to yet.

    Sends up to HOST_REQUEST_MAX_REMINDERS reminders, spaced by
    HOST_REQUEST_REMINDER_INTERVAL, for requests that are still pending,
    haven't started yet, and where the host has never sent a message.
    """
    with session_scope() as session:
        # correlated EXISTS: has the host written anything in this conversation?
        host_has_sent_message = select(1).where(
            Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.recipient_user_id
        )

        requests = (
            session.execute(
                where_user_columns_visible_to_each_other(
                    where_moderated_content_visible_to_user_column(
                        select(HostRequest),
                        HostRequest,
                        HostRequest.recipient_user_id,
                    )
                    .where(HostRequest.status == HostRequestStatus.pending)
                    .where(HostRequest.recipient_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS)
                    .where(HostRequest.start_time > func.now())
                    .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL)
                    .where(~exists(host_has_sent_message)),
                    HostRequest.recipient_user_id,
                    HostRequest.initiator_user_id,
                )
            )
            .scalars()
            .all()
        )

        for host_request in requests:
            # bump the reminder bookkeeping before sending
            host_request.recipient_sent_request_reminders += 1
            host_request.last_sent_request_reminder_time = now()

            context = make_background_user_context(user_id=host_request.recipient_user_id)
            notify(
                session,
                user_id=host_request.recipient_user_id,
                topic_action=NotificationTopicAction.host_request__reminder,
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestReminder(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.initiator, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

        session.commit()

622 

623 

def add_users_to_email_list(payload: empty_pb2.Empty) -> None:
    """Sync visible users into the Listmonk newsletter mailing list.

    Processes one user per iteration (each in its own session/commit) until no
    out-of-sync users remain. Opted-out users are just marked as in sync.
    """
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    while True:
        with session_scope() as session:
            # grab one not-yet-synced visible user at a time
            user = session.execute(
                select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
            ).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            if user.opt_out_of_newsletter:
                # nothing to push to Listmonk; just mark as synced
                user.in_sync_with_newsletter = True
                session.commit()
                continue

            r = requests.post(
                config["LISTMONK_BASE_URL"] + "/api/subscribers",
                auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
                json={
                    "email": user.email,
                    "name": user.name,
                    "lists": [config["LISTMONK_LIST_ID"]],
                    "preconfirm_subscriptions": True,
                    "attribs": {"couchers_user_id": user.id},
                    "status": "enabled",
                },
                timeout=10,
            )
            # the API returns 409 if the user is already subscribed; treat that as success
            if r.status_code == 200 or r.status_code == 409:
                user.in_sync_with_newsletter = True
                session.commit()
            else:
                # fail the job loudly so the job runner can retry/alert
                raise Exception("Failed to add users to mailing list")

664 

665 

def enforce_community_membership(payload: empty_pb2.Empty) -> None:
    """Background job wrapper around couchers.tasks.enforce_community_memberships."""
    tasks_enforce_community_memberships()

668 

669 

def update_recommendation_scores(payload: empty_pb2.Empty) -> None:
    """Recompute User.recommendation_score for all users in a single bulk UPDATE.

    The score is a sum of SQL expressions built from: hosting status, profile
    completeness, references given/received, recent activity and messaging,
    community-builder/badge status, and host-request response rate, plus a small
    random jitter so equal-scored users don't always sort identically.
    """
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian() -> ColumnElement[float] | float:
        """
        Produces an approximately std normal random variate
        """
        # sum of uniforms, centered and scaled (central limit theorem)
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt: Any) -> Function[int]:
        # cast to integer, treating NULL as 0
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt: Any) -> Function[float]:
        # cast to float, treating NULL as 0.0
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile
        # note: these build SQL string-concatenation expressions, not Python strings
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")  # type: ignore[assignment]
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")  # type: ignore[assignment]
        home_length = func.length(home_text)

        filled_profile = int_(has_completed_profile_expression())
        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        may_host = int_(User.hosting_status == HostingStatus.maybe)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.has_completed_my_home)
        filled_home_lots = int_(home_length > 200)
        hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host
        profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots

        # references
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # average rating weighted by reference count, with diminishing returns past 5 refs
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        # community builders: admins of official clusters; min_node_id == 1 is the
        # top-level node (presumably the world community — confirm against node data)
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        cb = int_(min_node_id >= 1)
        wcb = int_(min_node_id == 1)
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }

        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate
        hr_subquery = select(
            UserResponseRate.user_id,
            float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"),
            float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"),
        ).subquery()
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
        response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0)

        recommendation_score = (
            hosting_status_points
            + profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        # single bulk UPDATE from the computed scores
        session.execute(update(User).values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id))

        logger.info("Updated recommendation scores")

843 

844 

def update_badges(payload: empty_pb2.Empty) -> None:
    """Reconcile every managed badge with its current source of truth.

    For each badge, computes the set of users who should hold it and adds or
    removes UserBadge rows so the database matches.
    """
    with session_scope() as session:

        def sync_badge(badge_id: str, members: Sequence[int]) -> None:
            # Diff current badge holders against the desired member list and
            # apply the minimal set of add/remove operations.
            badge = get_badge_dict()[badge_id]
            current_holders = set(
                session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge.id)).scalars().all()
            )
            # restrict to user ids that actually exist in the db
            valid_members = set(session.execute(select(User.id).where(User.id.in_(members))).scalars().all())
            # grant to users who should have the badge but don't
            for user_id in valid_members - current_holders:
                user_add_badge(session, user_id, badge.id)
            # revoke from users who have it but shouldn't
            for user_id in current_holders - valid_members:
                user_remove_badge(session, user_id, badge.id)

        sync_badge("founder", get_static_badge_dict()["founder"])
        sync_badge("board_member", get_static_badge_dict()["board_member"])
        sync_badge("past_board_member", get_static_badge_dict()["past_board_member"])
        sync_badge("donor", session.execute(select(User.id).where(User.last_donated.is_not(None))).scalars().all())
        sync_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all())
        sync_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all())
        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        sync_badge(
            "strong_verification",
            session.execute(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            )
            .scalars()
            .all(),
        )
        # volunteer badge for active volunteers (stopped_volunteering is null)
        sync_badge(
            "volunteer",
            session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_(None))).scalars().all(),
        )
        # past_volunteer badge for past volunteers (stopped_volunteering is not null)
        sync_badge(
            "past_volunteer",
            session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_not(None)))
            .scalars()
            .all(),
        )

892 

893 

def finalize_strong_verification(payload: jobs_pb2.FinalizeStrongVerificationPayload) -> None:
    """
    Fetch the completed Iris ID session for a strong verification attempt and record the outcome.

    Pulls the session from the passportreader.app API, cross-checks it against the stored
    attempt, then either fails the attempt (non-passport document), marks it a duplicate
    (same passport already used on an earlier attempt), or marks it succeeded — awarding the
    strong_verification badge when the user's date of birth and gender/sex match the passport.
    """
    with session_scope() as session:
        # only attempts explicitly waiting on the backend may be finalized; scalar_one
        # raises if the attempt is missing or in any other state
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
            verify="/etc/ssl/certs/ca-certificates.crt",
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()
        # the reference is an encrypted payload we handed to Iris earlier; decrypting it
        # lets us confirm the session really belongs to this attempt
        reference_payload = internal_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )
        # NOTE(review): these asserts are stripped under `python -O` — confirm job workers
        # never run optimized, or consider raising explicit exceptions instead
        assert verification_attempt.user_id == reference_payload.user_id
        assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token
        assert verification_attempt.iris_session_id == json_data["id"]
        assert json_data["state"] == "APPROVED"

        # only passports are accepted for strong verification
        if json_data["document_type"] != "PASSPORT":
            verification_attempt.status = StrongVerificationAttemptStatus.failed
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
                ),
            )
            return

        assert json_data["document_type"] == "PASSPORT"

        expiry_date = date.fromisoformat(json_data["expiry_date"])
        nationality = json_data["nationality"]
        last_three_document_chars = json_data["document_number"][-3:]

        # duplicate detection: (expiry, nationality, last 3 chars of document number)
        # is treated as identifying a unique physical passport
        existing_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
            .where(StrongVerificationAttempt.passport_nationality == nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = expiry_date
        verification_attempt.passport_nationality = nationality
        verification_attempt.passport_last_three_document_chars = last_three_document_chars

        if existing_attempt:
            verification_attempt.status = StrongVerificationAttemptStatus.duplicate

            # same passport used by a *different* user is suspicious — alert via email
            if existing_attempt.user_id != verification_attempt.user_id:
                session.flush()
                send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)

            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
            )
            return

        # store the full API response encrypted with our public key; only holders of the
        # private key can read the raw passport data later
        verification_attempt.has_full_data = True
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        # flush so has_strong_verification below sees the updated attempt
        session.flush()

        strong_verification_completions_counter.inc()

        user = verification_attempt.user
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            # user may already hold the badge from an earlier attempt — nothing to do
            if session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none():
                return

            # suppress the badge notification; the dedicated sv_success one below is sent instead
            user_add_badge(session, user.id, badge_id, do_notify=False)
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_success,
                key="",
            )
        else:
            # passport data didn't match the user's profile (birthdate or gender/sex)
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
                ),
            )

1005 

1006 

def send_activeness_probes(payload: empty_pb2.Empty) -> None:
    """
    Run one round of the activeness-probe lifecycle.

    Step 1: create probes (rate limited) for can-host users who have been inactive too long.
    Step 2: send the next reminder notification for each pending probe that is due one.
    Step 3: expire unanswered probes and downgrade the user's hosting/meetup status one notch.

    Fix over previous revision: removed a dead local in Step 2
    (`context = make_background_user_context(...)` was assigned but never used).
    """
    with session_scope() as session:
        ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)

        if config["ACTIVENESS_PROBES_ENABLED"]:
            # users who already have a pending (unanswered) probe
            subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded.is_(None)).subquery()

            # users who we should send an activeness probe to
            new_probe_user_ids = (
                session.execute(
                    select(User.id)
                    .where(User.is_visible)
                    .where(User.hosting_status == HostingStatus.can_host)
                    .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
                    .where(User.id.not_in(select(subquery.c.user_id)))
                )
                .scalars()
                .all()
            )

            total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()
            probes_today = session.execute(
                select(func.count())
                .select_from(ActivenessProbe)
                .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24))
            ).scalar_one()

            # send probes to max 2% of users per day; spread the daily budget across
            # (hourly) runs but always allow at least one probe per run
            max_probes_per_day = 0.02 * total_users
            max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1))

            if len(new_probe_user_ids) > max_probe_size:
                # random subset so we don't systematically favor any particular users
                new_probe_user_ids = sample(new_probe_user_ids, max_probe_size)

            for user_id in new_probe_user_ids:
                session.add(ActivenessProbe(user_id=user_id))

            session.commit()

        ## Step 2: actually send out probe notifications
        # probes with notifications_sent == k get their (k+1)-th reminder once the
        # k-th configured delay has elapsed
        for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
            probes = (
                session.execute(
                    select(ActivenessProbe)
                    .where(ActivenessProbe.notifications_sent == probe_number_minus_1)
                    .where(ActivenessProbe.probe_initiated + delay < func.now())
                    .where(ActivenessProbe.is_pending)
                )
                .scalars()
                .all()
            )

            for probe in probes:
                probe.notifications_sent = probe_number_minus_1 + 1
                notify(
                    session,
                    user_id=probe.user.id,
                    topic_action=NotificationTopicAction.activeness__probe,
                    key=str(probe.id),
                    data=notification_data_pb2.ActivenessProbe(
                        reminder_number=probe_number_minus_1 + 1,
                        deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
                    ),
                )
                # commit per probe so a crash mid-batch doesn't re-send earlier reminders
                session.commit()

        ## Step 3: for those who haven't responded, mark them as failed
        expired_probes = (
            session.execute(
                select(ActivenessProbe)
                .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
                .where(ActivenessProbe.is_pending)
                .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
            )
            .scalars()
            .all()
        )

        for probe in expired_probes:
            probe.responded = now()
            probe.response = ActivenessProbeStatus.expired
            # downgrade by one notch only, never straight to "can't host"/"doesn't want to meet"
            if probe.user.hosting_status == HostingStatus.can_host:
                probe.user.hosting_status = HostingStatus.maybe
            if probe.user.meetup_status == MeetupStatus.wants_to_meetup:
                probe.user.meetup_status = MeetupStatus.open_to_meetup
            session.commit()

1095 

1096 

def update_randomized_locations(payload: empty_pb2.Empty) -> None:
    """
    We generate for each user a randomized location as follows:
    - Start from a strong random seed (based on the SECRET env var and our key derivation function)
    - For each user, mix in the user_id for randomness
    - Generate a radius from [0.02, 0.1] degrees (about 2-10km)
    - Generate an angle from [0, 360]
    - Randomized location is then a distance `radius` away at an angle `angle` from `geom`
    """
    randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)

    def randomized_coords(user_id: int, lat: float, lng: float) -> tuple[float, float]:
        # deterministic per-user draws in [0, 1): the same user always gets the same offset
        radius_draw = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii"))
        angle_draw = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii"))
        radius = 0.02 + 0.08 * radius_draw
        theta = 2 * pi * angle_draw
        # latitude shifts by the sin component, longitude by the cos component
        return lat + radius * sin(theta), lng + radius * cos(theta)

    with session_scope() as session:
        # only users who don't yet have a randomized location
        missing = session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all()

        pending_updates: list[dict[str, Any]] = [
            {"id": user_id, "randomized_geom": create_coordinate(*randomized_coords(user_id, *get_coordinates(geom)))}
            for user_id, geom in missing
        ]

    with session_scope() as session:
        # bulk update (executemany-style) keyed on the primary key in each dict
        session.execute(update(User), pending_updates)

1130 

1131 

def send_event_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends reminders for events that are 24 hours away to users who marked themselves as attending.
    """
    logger.info("Sending event reminder emails")

    with session_scope() as session:
        # occurrences starting within the reminder window, excluding cancelled/deleted ones
        upcoming = (
            session.execute(
                select(EventOccurrence)
                .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA)
                .where(EventOccurrence.start_time >= now())
                .where(~EventOccurrence.is_cancelled)
                .where(~EventOccurrence.is_deleted)
            )
            .scalars()
            .all()
        )

        for occurrence in upcoming:
            # attendees of this occurrence who haven't been reminded yet
            pending_attendees = session.execute(
                select(User, EventOccurrenceAttendee)
                .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id)
                .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
                .where(EventOccurrenceAttendee.reminder_sent == False)
            ).all()

            for user, attendee in pending_attendees:
                context = make_background_user_context(user_id=user.id)

                notify(
                    session,
                    user_id=user.id,
                    topic_action=NotificationTopicAction.event__reminder,
                    key=str(occurrence.id),
                    data=notification_data_pb2.EventReminder(
                        event=event_to_pb(session, occurrence, context),
                        user=user_model_to_pb(user, session, context),
                    ),
                    moderation_state_id=occurrence.moderation_state_id,
                )

                attendee.reminder_sent = True
                # commit per attendee so a crash can't cause duplicate reminders
                session.commit()

1176 

1177 

def check_expo_push_receipts(payload: empty_pb2.Empty) -> None:
    """
    Check Expo push receipts in batch and update delivery attempts.

    Processes up to 100 unchecked delivery attempts per iteration; returns as soon as
    a batch comes back empty, and raises RuntimeError if the backlog is so large that
    MAX_ITERATIONS batches don't drain it.
    """
    MAX_ITERATIONS = 100  # Safety limit: 100 batches * 100 attempts = 10,000 max

    for iteration in range(MAX_ITERATIONS):
        with session_scope() as session:
            # Find all delivery attempts that need receipt checking
            # Wait 15 minutes per Expo's recommendation before checking receipts;
            # attempts older than 24h are skipped (Expo receipts expire)
            attempts = (
                session.execute(
                    select(PushNotificationDeliveryAttempt)
                    .where(PushNotificationDeliveryAttempt.expo_ticket_id != None)
                    .where(PushNotificationDeliveryAttempt.receipt_checked_at == None)
                    .where(PushNotificationDeliveryAttempt.time < now() - timedelta(minutes=15))
                    .where(PushNotificationDeliveryAttempt.time > now() - timedelta(hours=24))
                    .limit(100)
                )
                .scalars()
                .all()
            )

            if not attempts:
                # backlog drained — normal exit path
                logger.debug("No Expo receipts to check")
                return

            logger.info(f"Checking {len(attempts)} Expo push receipts")

            # one batched API call for the whole page of ticket ids
            receipts = get_expo_push_receipts([not_none(attempt.expo_ticket_id) for attempt in attempts])

            for attempt in attempts:
                receipt = receipts.get(not_none(attempt.expo_ticket_id))

                # Always mark as checked to avoid infinite loops
                attempt.receipt_checked_at = now()

                if receipt is None:
                    # Receipt not found after 15min - likely expired (>24h) or never existed
                    # Per Expo docs: receipts should be available within 15 minutes
                    attempt.receipt_status = "not_found"
                    continue

                attempt.receipt_status = receipt.get("status")

                if receipt.get("status") == "error":
                    details = receipt.get("details", {})
                    error_code = details.get("error")
                    attempt.receipt_error_code = error_code

                    if error_code == "DeviceNotRegistered":
                        # Device token is no longer valid - disable the subscription
                        sub = session.execute(
                            select(PushNotificationSubscription).where(
                                PushNotificationSubscription.id == attempt.push_notification_subscription_id
                            )
                        ).scalar_one()

                        # NOTE(review): this reads as "disabled_at in the future means still
                        # enabled" (a far-future sentinel) — confirm against the
                        # PushNotificationSubscription model's default for disabled_at
                        if sub.disabled_at > now():
                            sub.disabled_at = now()
                            logger.info(f"Disabled push sub {sub.id} due to DeviceNotRegistered in receipt")
                            push_notification_counter.labels(
                                platform="expo", outcome="permanent_subscription_failure_receipt"
                            ).inc()
                    else:
                        logger.warning(f"Expo receipt error for ticket {attempt.expo_ticket_id}: {error_code}")

    # If we get here, we've exhausted MAX_ITERATIONS without finishing
    raise RuntimeError(
        f"check_expo_push_receipts exceeded {MAX_ITERATIONS} iterations - "
        "there may be an unusually large backlog of receipts to check"
    )

1250 

1251 

def send_postal_verification_postcard(payload: jobs_pb2.SendPostalVerificationPostcardPayload) -> None:
    """
    Dispatch the physical postcard for a postal verification attempt.

    Looks up the attempt, sends the postcard via the external provider, records the
    provider job id, advances the attempt to awaiting_verification, logs the event,
    and notifies the user.
    """
    with session_scope() as session:
        attempt = session.execute(
            select(PostalVerificationAttempt).where(
                PostalVerificationAttempt.id == payload.postal_verification_attempt_id
            )
        ).scalar_one_or_none()

        # only act on attempts that are still in progress; anything else is stale
        if attempt is None or attempt.status != PostalVerificationStatus.in_progress:
            logger.warning(
                f"Postal verification attempt {payload.postal_verification_attempt_id} not found or wrong state"
            )
            return

        recipient = session.execute(select(User.name).where(User.id == attempt.user_id)).scalar_one()

        postcard_job_id = send_postcard(
            recipient_name=recipient,
            address_line_1=attempt.address_line_1,
            address_line_2=attempt.address_line_2,
            city=attempt.city,
            state=attempt.state,
            postal_code=attempt.postal_code,
            country=attempt.country_code,
            verification_code=not_none(attempt.verification_code),
        )

        attempt.mypostcard_job_id = postcard_job_id
        attempt.status = PostalVerificationStatus.awaiting_verification
        attempt.postcard_sent_at = func.now()

        postcards_sent_counter.labels(country_code=attempt.country_code).inc()

        context = make_background_user_context(attempt.user_id)
        log_event(
            context,
            session,
            "postcard.sent",
            {
                "attempt_id": attempt.id,
                "country": attempt.country_code,
                "city": attempt.city,
                "mypostcard_job_id": postcard_job_id,
            },
        )

        notify(
            session,
            user_id=attempt.user_id,
            topic_action=NotificationTopicAction.postal_verification__postcard_sent,
            key="",
            data=notification_data_pb2.PostalVerificationPostcardSent(
                city=attempt.city,
                country=attempt.country_code,
            ),
        )

1311 

1312 

def check_mypostcard_jobs(payload: empty_pb2.Empty) -> None:
    """
    Checks that all MyPostcard jobs from the last week are tied to a postal verification attempt.
    """
    if not config["ENABLE_POSTAL_VERIFICATION"]:
        return

    with session_scope() as session:
        # job ids the provider knows about from the past week
        provider_job_ids = set(
            get_order_ids(
                date_from=(now() - timedelta(days=7)).date(),
                date_to=now().date(),
            )
        )

        # job ids we recorded locally (wider 14-day window to cover boundary effects)
        recorded_job_ids = set(
            session.execute(
                select(PostalVerificationAttempt.mypostcard_job_id).where(
                    PostalVerificationAttempt.mypostcard_job_id.isnot(None),
                    PostalVerificationAttempt.created >= now() - timedelta(days=14),
                )
            )
            .scalars()
            .all()
        )

        orphaned = provider_job_ids - recorded_job_ids
        if orphaned:
            report_message(
                f"Found {len(orphaned)} orphaned MyPostcard jobs not tied to any verification attempt: {orphaned}"
            )

1344 

1345 

class DatabaseInconsistencyError(Exception):
    """Raised by check_database_consistency when one or more consistency checks fail."""

1350 

1351 

def check_database_consistency(payload: empty_pb2.Empty) -> None:
    """
    Checks database consistency and raises an exception if any issues are found.

    Runs a series of independent invariant checks (profile galleries, moderation
    state/queue/log cross-references, auto-approve deadline), collects all failures,
    and raises a single DatabaseInconsistencyError listing every violation.
    """
    logger.info("Checking database consistency")
    # accumulate all violations so one run reports everything at once
    errors = []

    with session_scope() as session:
        # Check that all users have a profile gallery
        users_without_gallery = session.execute(
            select(User.id, User.username).where(User.profile_gallery_id.is_(None))
        ).all()
        if users_without_gallery:
            errors.append(f"Users without profile gallery: {users_without_gallery}")

        # Check that all profile galleries point to their owner
        mismatched_galleries = session.execute(
            select(User.id, User.username, User.profile_gallery_id, PhotoGallery.owner_user_id)
            .join(PhotoGallery, User.profile_gallery_id == PhotoGallery.id)
            .where(User.profile_gallery_id.is_not(None))
            .where(PhotoGallery.owner_user_id != User.id)
        ).all()
        if mismatched_galleries:
            errors.append(f"Profile galleries with mismatched owner: {mismatched_galleries}")

        # === Moderation System Consistency Checks ===

        # Check every ModerationState has at least one INITIAL_REVIEW queue item
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_initial_review = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationQueueItem.moderation_state_id == ModerationState.id)
                    .where(ModerationQueueItem.trigger == ModerationTrigger.initial_review)
                ),
            )
        ).all()
        if states_without_initial_review:
            errors.append(f"ModerationStates without INITIAL_REVIEW queue item: {states_without_initial_review}")

        # Check every ModerationState has a CREATE log entry
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_create_log = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationLog.moderation_state_id == ModerationState.id)
                    .where(ModerationLog.action == ModerationAction.create)
                ),
            )
        ).all()
        if states_without_create_log:
            errors.append(f"ModerationStates without CREATE log entry: {states_without_create_log}")

        # Check resolved queue items point to log entries for the same moderation state
        resolved_item_log_mismatches = session.execute(
            select(ModerationQueueItem.id, ModerationQueueItem.moderation_state_id, ModerationLog.moderation_state_id)
            .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id)
            .where(ModerationQueueItem.resolved_by_log_id.is_not(None))
            .where(ModerationQueueItem.moderation_state_id != ModerationLog.moderation_state_id)
        ).all()
        if resolved_item_log_mismatches:
            errors.append(f"Resolved queue items with mismatched moderation_state_id: {resolved_item_log_mismatches}")

        # Check every HOST_REQUEST ModerationState has exactly one HostRequest pointing to it
        # NOTE(review): this loop issues one count query per state (N+1); fine while the
        # state count is small, consider a single grouped query if it grows
        hr_states = (
            session.execute(
                select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.host_request)
            )
            .scalars()
            .all()
        )
        for state_id in hr_states:
            hr_count = session.execute(
                select(func.count()).where(HostRequest.moderation_state_id == state_id)
            ).scalar_one()
            if hr_count != 1:
                errors.append(f"ModerationState {state_id} (HOST_REQUEST) has {hr_count} HostRequests (expected 1)")

        # Check every GROUP_CHAT ModerationState has exactly one GroupChat pointing to it
        # NOTE(review): same N+1 pattern as the HOST_REQUEST check above
        gc_states = (
            session.execute(
                select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.group_chat)
            )
            .scalars()
            .all()
        )
        for state_id in gc_states:
            gc_count = session.execute(
                select(func.count()).where(GroupChat.moderation_state_id == state_id)
            ).scalar_one()
            if gc_count != 1:
                errors.append(f"ModerationState {state_id} (GROUP_CHAT) has {gc_count} GroupChats (expected 1)")

        # Check ModerationState.object_id matches the actual object's ID
        hr_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, HostRequest.conversation_id)
            .join(HostRequest, HostRequest.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.host_request)
            .where(ModerationState.object_id != HostRequest.conversation_id)
        ).all()
        if hr_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for HOST_REQUEST: {hr_object_id_mismatches}")

        gc_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, GroupChat.conversation_id)
            .join(GroupChat, GroupChat.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.group_chat)
            .where(ModerationState.object_id != GroupChat.conversation_id)
        ).all()
        if gc_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for GROUP_CHAT: {gc_object_id_mismatches}")

        # Check reverse mapping: HostRequest's moderation_state points to correct ModerationState
        hr_reverse_mismatches = session.execute(
            select(
                HostRequest.conversation_id,
                HostRequest.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, HostRequest.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.host_request)
                | (ModerationState.object_id != HostRequest.conversation_id)
            )
        ).all()
        if hr_reverse_mismatches:
            errors.append(f"HostRequest points to ModerationState with wrong type/object_id: {hr_reverse_mismatches}")

        # Check reverse mapping: GroupChat's moderation_state points to correct ModerationState
        gc_reverse_mismatches = session.execute(
            select(
                GroupChat.conversation_id,
                GroupChat.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, GroupChat.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.group_chat)
                | (ModerationState.object_id != GroupChat.conversation_id)
            )
        ).all()
        if gc_reverse_mismatches:
            errors.append(f"GroupChat points to ModerationState with wrong type/object_id: {gc_reverse_mismatches}")

        # Ensure auto-approve deadline isn't being exceeded by a significant margin
        # The auto-approver runs every 15s, so allow 5 minutes grace before alerting
        deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
        if deadline_seconds > 0:
            grace_period = timedelta(minutes=5)
            stale_initial_review_items = session.execute(
                select(
                    ModerationQueueItem.id,
                    ModerationQueueItem.moderation_state_id,
                    ModerationQueueItem.time_created,
                )
                .where(ModerationQueueItem.trigger == ModerationTrigger.initial_review)
                .where(ModerationQueueItem.resolved_by_log_id.is_(None))
                .where(ModerationQueueItem.time_created < now() - timedelta(seconds=deadline_seconds) - grace_period)
            ).all()
            if stale_initial_review_items:
                errors.append(
                    f"INITIAL_REVIEW items exceeding auto-approve deadline by >5min: {stale_initial_review_items}"
                )

    if errors:
        raise DatabaseInconsistencyError("\n".join(errors))

1524 

1525 

def auto_approve_moderation_queue(payload: empty_pb2.Empty) -> None:
    """
    Dead man's switch: auto-approves unresolved INITIAL_REVIEW items older than the deadline.
    Items explicitly actioned by moderators are left alone.
    """
    deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
    # a non-positive deadline disables the auto-approver entirely
    if deadline_seconds <= 0:
        return

    with session_scope() as session:
        # act as the moderation bot user
        bot_context = make_background_user_context(user_id=config["MODERATION_BOT_USER_ID"])

        # only items created before this cutoff have exceeded the deadline
        cutoff = Timestamp_from_datetime(now() - timedelta(seconds=deadline_seconds))
        pending = (
            Moderation()
            .GetModerationQueue(
                request=moderation_pb2.GetModerationQueueReq(
                    triggers=[moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW],
                    unresolved_only=True,
                    page_size=100,
                    created_before=cutoff,
                ),
                context=bot_context,
                session=session,
            )
            .queue_items
        )

        if not pending:
            return

        logger.info(f"Auto-approving {len(pending)} moderation queue items")
        for item in pending:
            Moderation().ModerateContent(
                request=moderation_pb2.ModerateContentReq(
                    moderation_state_id=item.moderation_state_id,
                    action=moderation_pb2.MODERATION_ACTION_APPROVE,
                    visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
                    reason=f"Auto-approved: moderation deadline of {deadline_seconds} seconds exceeded.",
                ),
                context=bot_context,
                session=session,
            )
        moderation_auto_approved_counter.inc(len(pending))