Coverage for app/backend/src/couchers/jobs/handlers.py: 88%

492 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from collections.abc import Sequence 

7from datetime import date, timedelta 

8from math import cos, pi, sin, sqrt 

9from random import sample 

10from typing import Any 

11 

12import requests 

13from google.protobuf import empty_pb2 

14from sqlalchemy import ColumnElement, Float, Function, Integer, select 

15from sqlalchemy.orm import aliased 

16from sqlalchemy.sql import ( 

17 and_, 

18 case, 

19 cast, 

20 delete, 

21 distinct, 

22 exists, 

23 extract, 

24 func, 

25 literal, 

26 not_, 

27 or_, 

28 union_all, 

29 update, 

30) 

31 

32from couchers import experimentation 

33from couchers.config import config 

34from couchers.constants import ( 

35 ACTIVENESS_PROBE_EXPIRY_TIME, 

36 ACTIVENESS_PROBE_INACTIVITY_PERIOD, 

37 ACTIVENESS_PROBE_TIME_REMINDERS, 

38 EVENT_REMINDER_TIMEDELTA, 

39 HOST_REQUEST_MAX_REMINDERS, 

40 HOST_REQUEST_REMINDER_INTERVAL, 

41 MODERATION_AUTO_APPROVE_FLAG_PRIORITY, 

42) 

43from couchers.context import make_background_user_context, make_notification_user_context 

44from couchers.crypto import ( 

45 USER_LOCATION_RANDOMIZATION_NAME, 

46 asym_encrypt, 

47 b64decode, 

48 get_secret, 

49 simple_decrypt, 

50 stable_secure_uniform, 

51) 

52from couchers.db import session_scope 

53from couchers.email.dev import print_dev_email 

54from couchers.email.smtp import send_smtp_email 

55from couchers.event_log import log_event 

56from couchers.helpers.badges import user_add_badge, user_remove_badge 

57from couchers.helpers.completed_profile import has_completed_profile_expression 

58from couchers.materialized_views import ( 

59 UserResponseRate, 

60) 

61from couchers.metrics import ( 

62 moderation_auto_approved_counter, 

63 postcards_sent_counter, 

64 push_notification_counter, 

65 strong_verification_completions_counter, 

66) 

67from couchers.models import ( 

68 AccountDeletionToken, 

69 ActivenessProbe, 

70 ActivenessProbeStatus, 

71 Cluster, 

72 ClusterRole, 

73 ClusterSubscription, 

74 EventOccurrence, 

75 EventOccurrenceAttendee, 

76 GroupChat, 

77 GroupChatSubscription, 

78 HostingStatus, 

79 HostRequest, 

80 HostRequestStatus, 

81 LoginToken, 

82 MeetupStatus, 

83 Message, 

84 MessageType, 

85 ModerationAction, 

86 ModerationLog, 

87 ModerationObjectType, 

88 ModerationQueueItem, 

89 ModerationState, 

90 ModerationTrigger, 

91 PassportSex, 

92 PasswordResetToken, 

93 PhotoGallery, 

94 PostalVerificationAttempt, 

95 PostalVerificationStatus, 

96 PushNotificationDeliveryAttempt, 

97 PushNotificationSubscription, 

98 Reference, 

99 StrongVerificationAttempt, 

100 StrongVerificationAttemptStatus, 

101 User, 

102 UserBadge, 

103 Volunteer, 

104) 

105from couchers.models.notifications import NotificationTopicAction 

106from couchers.notifications.expo_api import get_expo_push_receipts 

107from couchers.notifications.notify import notify 

108from couchers.postal.my_postcard import get_order_ids, send_postcard 

109from couchers.proto import moderation_pb2, notification_data_pb2 

110from couchers.proto.internal import internal_pb2, jobs_pb2 

111from couchers.resources import get_badge_dict, get_static_badge_dict 

112from couchers.sentry import report_message 

113from couchers.servicers.api import user_model_to_pb 

114from couchers.servicers.events import ( 

115 event_to_pb, 

116) 

117from couchers.servicers.moderation import Moderation 

118from couchers.servicers.requests import host_request_to_pb 

119from couchers.sql import ( 

120 users_visible_to_each_other, 

121 where_moderated_content_visible, 

122 where_moderated_content_visible_to_user_column, 

123 where_user_columns_visible_to_each_other, 

124 where_users_column_visible, 

125) 

126from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

127from couchers.tasks import send_duplicate_strong_verification_email 

128from couchers.utils import ( 

129 Timestamp_from_datetime, 

130 create_coordinate, 

131 get_coordinates, 

132 not_none, 

133 now, 

134) 

135 

136logger = logging.getLogger(__name__) 

137 

138 

139def send_email(payload: jobs_pb2.SendEmailPayload) -> None: 

140 logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'") 

141 # selects a "sender", which either prints the email to the logger or sends it out with SMTP 

142 sender = send_smtp_email if config.ENABLE_EMAIL else print_dev_email 

143 # the sender must return a models.Email object that can be added to the database 

144 email = sender(payload) 

145 with session_scope() as session: 

146 session.add(email) 

147 

148 

149def purge_login_tokens(payload: empty_pb2.Empty) -> None: 

150 logger.info("Purging login tokens") 

151 with session_scope() as session: 

152 session.execute(delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False)) 

153 

154 

155def purge_password_reset_tokens(payload: empty_pb2.Empty) -> None: 

156 logger.info("Purging login tokens") 

157 with session_scope() as session: 

158 session.execute( 

159 delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False) 

160 ) 

161 

162 

163def purge_account_deletion_tokens(payload: empty_pb2.Empty) -> None: 

164 logger.info("Purging account deletion tokens") 

165 with session_scope() as session: 

166 session.execute( 

167 delete(AccountDeletionToken) 

168 .where(~AccountDeletionToken.is_valid) 

169 .execution_options(synchronize_session=False) 

170 ) 

171 

172 

173def send_message_notifications(payload: empty_pb2.Empty) -> None: 

174 """ 

175 Sends out email notifications for messages that have been unseen for a long enough time 

176 """ 

177 # very crude and dumb algorithm 

178 logger.info("Sending out email notifications for unseen messages") 

179 

180 with session_scope() as session: 

181 # users who have unnotified messages older than 5 minutes in any group chat 

182 users = ( 

183 session.execute( 

184 where_moderated_content_visible_to_user_column( 

185 select(User) 

186 .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id) 

187 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

188 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id), 

189 GroupChat, 

190 User.id, 

191 ) 

192 .where(not_(GroupChatSubscription.is_muted)) 

193 .where(User.is_visible) 

194 .where(Message.time >= GroupChatSubscription.joined) 

195 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

196 .where(Message.id > User.last_notified_message_id) 

197 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

198 .where(Message.time < now() - timedelta(minutes=5)) 

199 .where(Message.message_type == MessageType.text) # TODO: only text messages for now 

200 ) 

201 .scalars() 

202 .unique() 

203 ) 

204 

205 for user in users: 

206 context = make_notification_user_context(user_id=user.id) 

207 # now actually grab all the group chats, not just less than 5 min old 

208 subquery = ( 

209 where_users_column_visible( 

210 where_moderated_content_visible( 

211 select( 

212 GroupChatSubscription.group_chat_id.label("group_chat_id"), 

213 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"), 

214 func.max(Message.id).label("message_id"), 

215 func.count(Message.id).label("unseen_count"), 

216 ) 

217 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

218 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id), 

219 context, 

220 GroupChat, 

221 is_list_operation=True, 

222 ) 

223 .where(GroupChatSubscription.user_id == user.id) 

224 .where(not_(GroupChatSubscription.is_muted)) 

225 .where(Message.id > user.last_notified_message_id) 

226 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

227 .where(Message.time >= GroupChatSubscription.joined) 

228 .where(Message.message_type == MessageType.text) # TODO: only text messages for now 

229 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)), 

230 context, 

231 Message.author_id, 

232 ) 

233 .group_by(GroupChatSubscription.group_chat_id) 

234 .order_by(func.max(Message.id).desc()) 

235 .subquery() 

236 ) 

237 

238 unseen_messages = session.execute( 

239 where_moderated_content_visible( 

240 select(GroupChat, Message, subquery.c.unseen_count) 

241 .join(subquery, subquery.c.message_id == Message.id) 

242 .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id), 

243 context, 

244 GroupChat, 

245 is_list_operation=True, 

246 ).order_by(subquery.c.message_id.desc()) 

247 ).all() 

248 

249 if not unseen_messages: 

250 continue 

251 

252 user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages) 

253 

254 notify( 

255 session, 

256 user_id=user.id, 

257 topic_action=NotificationTopicAction.chat__missed_messages, 

258 key="", 

259 data=notification_data_pb2.ChatMissedMessages( 

260 messages=[ 

261 notification_data_pb2.ChatMessage( 

262 author=user_model_to_pb( 

263 message.author, 

264 session, 

265 context, 

266 ), 

267 text=message.text, 

268 group_chat_id=message.conversation_id, 

269 group_chat_title=group_chat.title or None, 

270 unseen_count=unseen_count, 

271 ) 

272 for group_chat, message, unseen_count in unseen_messages 

273 ], 

274 ), 

275 ) 

276 session.commit() 

277 

278 

279def send_request_notifications(payload: empty_pb2.Empty) -> None: 

280 """ 

281 Sends out email notifications for unseen messages in host requests (as surfer or host) 

282 """ 

283 logger.info("Sending out email notifications for unseen messages in host requests") 

284 

285 with session_scope() as session: 

286 # Get all candidate users who might have unseen request messages. 

287 # Drive from host_requests/messages (selective) rather than scanning all users (expensive). 

288 surfer_ids = ( 

289 select(User.id) 

290 .join(HostRequest, HostRequest.initiator_user_id == User.id) 

291 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

292 .where(User.is_visible) 

293 .where(Message.id > HostRequest.initiator_last_seen_message_id) 

294 .where(Message.id > User.last_notified_request_message_id) 

295 .where(Message.time < now() - timedelta(minutes=5)) 

296 .where(Message.message_type == MessageType.text) 

297 ) 

298 host_ids = ( 

299 select(User.id) 

300 .join(HostRequest, HostRequest.recipient_user_id == User.id) 

301 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

302 .where(User.is_visible) 

303 .where(Message.id > HostRequest.recipient_last_seen_message_id) 

304 .where(Message.id > User.last_notified_request_message_id) 

305 .where(Message.time < now() - timedelta(minutes=5)) 

306 .where(Message.message_type == MessageType.text) 

307 ) 

308 candidate_user_ids = session.execute(union_all(surfer_ids, host_ids)).scalars().unique().all() 

309 

310 for user_id in candidate_user_ids: 

311 context = make_notification_user_context(user_id=user_id) 

312 

313 # requests where this user is surfing 

314 surfing_reqs = session.execute( 

315 where_users_column_visible( 

316 where_moderated_content_visible_to_user_column( 

317 select(User, HostRequest, func.max(Message.id)) 

318 .where(User.id == user_id) 

319 .join(HostRequest, HostRequest.initiator_user_id == User.id), 

320 HostRequest, 

321 HostRequest.initiator_user_id, 

322 ), 

323 context, 

324 HostRequest.recipient_user_id, 

325 ) 

326 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

327 .where(Message.id > HostRequest.initiator_last_seen_message_id) 

328 .where(Message.id > User.last_notified_request_message_id) 

329 .where(Message.time < now() - timedelta(minutes=5)) 

330 .where(Message.message_type == MessageType.text) 

331 .group_by(User, HostRequest) # type: ignore[arg-type] 

332 ).all() 

333 

334 # where this user is hosting 

335 hosting_reqs = session.execute( 

336 where_users_column_visible( 

337 where_moderated_content_visible_to_user_column( 

338 select(User, HostRequest, func.max(Message.id)) 

339 .where(User.id == user_id) 

340 .join(HostRequest, HostRequest.recipient_user_id == User.id), 

341 HostRequest, 

342 HostRequest.recipient_user_id, 

343 ), 

344 context, 

345 HostRequest.initiator_user_id, 

346 ) 

347 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

348 .where(Message.id > HostRequest.recipient_last_seen_message_id) 

349 .where(Message.id > User.last_notified_request_message_id) 

350 .where(Message.time < now() - timedelta(minutes=5)) 

351 .where(Message.message_type == MessageType.text) 

352 .group_by(User, HostRequest) # type: ignore[arg-type] 

353 ).all() 

354 

355 for user, host_request, max_message_id in surfing_reqs: 

356 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id) 

357 session.flush() 

358 

359 notify( 

360 session, 

361 user_id=user.id, 

362 topic_action=NotificationTopicAction.host_request__missed_messages, 

363 key=str(host_request.conversation_id), 

364 data=notification_data_pb2.HostRequestMissedMessages( 

365 host_request=host_request_to_pb(host_request, session, context), 

366 user=user_model_to_pb(host_request.recipient, session, context), 

367 am_host=False, 

368 ), 

369 ) 

370 

371 for user, host_request, max_message_id in hosting_reqs: 

372 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id) 

373 session.flush() 

374 

375 # When a host request is created, the recipient immediately receives a 

376 # host_request__create notification that includes the initial message text. 

377 # A few minutes later, this background job sees that same message as "unseen" 

378 # (the recipient hasn't opened the request yet) and would send a duplicate 

379 # missed_messages notification. 

380 # 

381 # To prevent this, we check if the only unseen text message in this host 

382 # request is the very first text message in the conversation (i.e. the 

383 # creation message). If so, we skip sending missed_messages — the user was 

384 # already notified via host_request__create. 

385 # 

386 # Advancing last_notified_request_message_id above is safe even when we skip 

387 # the notification: this watermark is only ever advanced when we process all 

388 # unseen messages for the user, so skipping one notification doesn't cause us 

389 # to miss future messages in other host requests. 

390 only_creation_message = not session.execute( 

391 select( 

392 select(func.count()) 

393 .where(Message.conversation_id == host_request.conversation_id) 

394 .where(Message.message_type == MessageType.text) 

395 .scalar_subquery() 

396 > 1 

397 ) 

398 ).scalar_one() 

399 if only_creation_message: 

400 continue 

401 

402 notify( 

403 session, 

404 user_id=user.id, 

405 topic_action=NotificationTopicAction.host_request__missed_messages, 

406 key=str(host_request.conversation_id), 

407 data=notification_data_pb2.HostRequestMissedMessages( 

408 host_request=host_request_to_pb(host_request, session, context), 

409 user=user_model_to_pb(host_request.initiator, session, context), 

410 am_host=True, 

411 ), 

412 ) 

413 

414 

415def send_onboarding_emails(payload: empty_pb2.Empty) -> None: 

416 """ 

417 Sends out onboarding emails 

418 """ 

419 logger.info("Sending out onboarding emails") 

420 

421 with session_scope() as session: 

422 # first onboarding email 

423 users = ( 

424 session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all() 

425 ) 

426 

427 for user in users: 

428 notify( 

429 session, 

430 user_id=user.id, 

431 topic_action=NotificationTopicAction.onboarding__reminder, 

432 key="1", 

433 ) 

434 user.onboarding_emails_sent = 1 

435 user.last_onboarding_email_sent = now() 

436 session.commit() 

437 

438 # second onboarding email 

439 # sent after a week if the user has no profile or their "about me" section is less than 20 characters long 

440 users = ( 

441 session.execute( 

442 select(User) 

443 .where(User.is_visible) 

444 .where(User.onboarding_emails_sent == 1) 

445 .where(now() - User.last_onboarding_email_sent > timedelta(days=7)) 

446 .where(~has_completed_profile_expression()) 

447 ) 

448 .scalars() 

449 .all() 

450 ) 

451 

452 for user in users: 

453 notify( 

454 session, 

455 user_id=user.id, 

456 topic_action=NotificationTopicAction.onboarding__reminder, 

457 key="2", 

458 ) 

459 user.onboarding_emails_sent = 2 

460 user.last_onboarding_email_sent = now() 

461 session.commit() 

462 

463 

464def send_reference_reminders(payload: empty_pb2.Empty) -> None: 

465 """ 

466 Sends out reminders to write references after hosting/staying 

467 """ 

468 logger.info("Sending out reference reminder emails") 

469 

470 # Keep this in chronological order! 

471 reference_reminder_schedule = [ 

472 # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref) 

473 # the end time to write a reference is supposed to be midnight in the host's timezone 

474 # 8 pm ish on the last day of the stay 

475 (1, timedelta(days=15) - timedelta(hours=20), 14), 

476 # 2 pm ish a week after stay 

477 (2, timedelta(days=8) - timedelta(hours=14), 7), 

478 # 10 am ish 3 days before end of time to write ref 

479 (3, timedelta(days=4) - timedelta(hours=10), 3), 

480 ] 

481 

482 with session_scope() as session: 

483 # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates 

484 for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule): 

485 user = aliased(User) 

486 other_user = aliased(User) 

487 # surfers needing to write a ref 

488 q1 = ( 

489 select(literal(True), HostRequest, user, other_user) 

490 .join(user, user.id == HostRequest.initiator_user_id) 

491 .join(other_user, other_user.id == HostRequest.recipient_user_id) 

492 .outerjoin( 

493 Reference, 

494 and_( 

495 Reference.host_request_id == HostRequest.conversation_id, 

496 # if no reference is found in this join, then the surfer has not written a ref 

497 Reference.from_user_id == HostRequest.initiator_user_id, 

498 ), 

499 ) 

500 .where(Reference.id == None) 

501 .where(HostRequest.can_write_reference) 

502 .where(HostRequest.initiator_sent_reference_reminders < reminder_number) 

503 .where(HostRequest.end_time_to_write_reference - reminder_time < now()) 

504 .where(HostRequest.initiator_reason_didnt_meetup == None) 

505 .where(users_visible_to_each_other(self_user=user, other_user=other_user)) 

506 ) 

507 

508 # hosts needing to write a ref 

509 q2 = ( 

510 select(literal(False), HostRequest, user, other_user) 

511 .join(user, user.id == HostRequest.recipient_user_id) 

512 .join(other_user, other_user.id == HostRequest.initiator_user_id) 

513 .outerjoin( 

514 Reference, 

515 and_( 

516 Reference.host_request_id == HostRequest.conversation_id, 

517 # if no reference is found in this join, then the host has not written a ref 

518 Reference.from_user_id == HostRequest.recipient_user_id, 

519 ), 

520 ) 

521 .where(Reference.id == None) 

522 .where(HostRequest.can_write_reference) 

523 .where(HostRequest.recipient_sent_reference_reminders < reminder_number) 

524 .where(HostRequest.end_time_to_write_reference - reminder_time < now()) 

525 .where(HostRequest.recipient_reason_didnt_meetup == None) 

526 .where(users_visible_to_each_other(self_user=user, other_user=other_user)) 

527 ) 

528 

529 union = union_all(q1, q2).subquery() 

530 query = select( 

531 union.c[0].label("surfed"), 

532 aliased(HostRequest, union), 

533 aliased(user, union), 

534 aliased(other_user, union), 

535 ) 

536 reference_reminders = session.execute(query).all() 

537 

538 for surfed, host_request, user, other_user in reference_reminders: 

539 # visibility and blocking already checked in sql 

540 assert user.is_visible 

541 context = make_notification_user_context(user_id=user.id) 

542 topic_action = ( 

543 NotificationTopicAction.reference__reminder_surfed 

544 if surfed 

545 else NotificationTopicAction.reference__reminder_hosted 

546 ) 

547 notify( 

548 session, 

549 user_id=user.id, 

550 topic_action=topic_action, 

551 key=str(host_request.conversation_id), 

552 data=notification_data_pb2.ReferenceReminder( 

553 host_request_id=host_request.conversation_id, 

554 other_user=user_model_to_pb(other_user, session, context), 

555 days_left=reminder_days_left, 

556 ), 

557 ) 

558 if surfed: 

559 host_request.initiator_sent_reference_reminders = reminder_number 

560 else: 

561 host_request.recipient_sent_reference_reminders = reminder_number 

562 session.commit() 

563 

564 

565def send_host_request_reminders(payload: empty_pb2.Empty) -> None: 

566 with session_scope() as session: 

567 host_has_sent_message = select(1).where( 

568 Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.recipient_user_id 

569 ) 

570 

571 requests = ( 

572 session.execute( 

573 where_user_columns_visible_to_each_other( 

574 where_moderated_content_visible_to_user_column( 

575 select(HostRequest), 

576 HostRequest, 

577 HostRequest.recipient_user_id, 

578 ) 

579 .where(HostRequest.status == HostRequestStatus.pending) 

580 .where(HostRequest.recipient_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS) 

581 .where(HostRequest.start_time > func.now()) 

582 .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL) 

583 .where(~exists(host_has_sent_message)), 

584 self_column=HostRequest.recipient_user_id, 

585 other_column=HostRequest.initiator_user_id, 

586 ) 

587 ) 

588 .scalars() 

589 .all() 

590 ) 

591 

592 for host_request in requests: 

593 host_request.recipient_sent_request_reminders += 1 

594 host_request.last_sent_request_reminder_time = now() 

595 

596 context = make_notification_user_context(user_id=host_request.recipient_user_id) 

597 notify( 

598 session, 

599 user_id=host_request.recipient_user_id, 

600 topic_action=NotificationTopicAction.host_request__reminder, 

601 key=str(host_request.conversation_id), 

602 data=notification_data_pb2.HostRequestReminder( 

603 host_request=host_request_to_pb(host_request, session, context), 

604 surfer=user_model_to_pb(host_request.initiator, session, context), 

605 ), 

606 moderation_state_id=host_request.moderation_state_id, 

607 ) 

608 

609 session.commit() 

610 

611 

612def add_users_to_email_list(payload: empty_pb2.Empty) -> None: 

613 if not experimentation.get_global_boolean_value("listmonk_enabled", default=False): 613 ↛ 614line 613 didn't jump to line 614 because the condition on line 613 was never true

614 logger.info("Not adding users to mailing list") 

615 return 

616 

617 sess = requests.Session() 

618 sess.auth = (config.LISTMONK_API_USERNAME, config.LISTMONK_API_KEY) 

619 

620 def sync_subscriber(user: User, status: str) -> None: 

621 r = sess.post( 

622 config.LISTMONK_BASE_URL + "/api/subscribers", 

623 json={ 

624 "email": user.email, 

625 "name": user.name, 

626 "lists": [config.LISTMONK_LIST_ID], 

627 "preconfirm_subscriptions": True, 

628 "attribs": {"couchers_user_id": user.id}, 

629 "status": status, 

630 }, 

631 timeout=10, 

632 ) 

633 # the API returns 409 if the subscriber already exists 

634 if r.status_code not in (200, 409): 634 ↛ 635line 634 didn't jump to line 635 because the condition on line 634 was never true

635 raise Exception("Failed to update user mailing list status") 

636 

637 logger.info("Adding users to mailing list") 

638 

639 while True: 

640 with session_scope() as session: 

641 user = session.execute( 

642 select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1) 

643 ).scalar_one_or_none() 

644 if not user: 

645 logger.info("Finished adding users to mailing list") 

646 break 

647 

648 if not user.opt_out_of_newsletter: 

649 sync_subscriber(user, "enabled") 

650 

651 user.in_sync_with_newsletter = True 

652 session.commit() 

653 

654 if experimentation.get_global_boolean_value("remove_removed_users_from_mailing_list_enabled", default=False): 654 ↛ 655line 654 didn't jump to line 655 because the condition on line 654 was never true

655 with session_scope() as session: 

656 session.execute( 

657 update(User) 

658 .where(~User.is_visible | User.is_shadowed) 

659 .where(User.opt_out_of_newsletter == False) 

660 .values(opt_out_of_newsletter=True, in_sync_with_newsletter=False) 

661 ) 

662 session.commit() 

663 

664 while True: 

665 with session_scope() as session: 

666 user = session.execute( 

667 select(User) 

668 .where(~User.is_visible | User.is_shadowed) 

669 .where(User.in_sync_with_newsletter == False) 

670 .limit(1) 

671 ).scalar_one_or_none() 

672 if not user: 

673 logger.info("Finished removing users from mailing list") 

674 return 

675 

676 sync_subscriber(user, "blocklisted") 

677 user.in_sync_with_newsletter = True 

678 session.commit() 

679 

680 

681def enforce_community_membership(payload: empty_pb2.Empty) -> None: 

682 tasks_enforce_community_memberships() 

683 

684 

685def update_recommendation_scores(payload: empty_pb2.Empty) -> None: 

686 text_fields = [ 

687 User.hometown, 

688 User.occupation, 

689 User.education, 

690 User.about_me, 

691 User.things_i_like, 

692 User.about_place, 

693 User.additional_information, 

694 User.pet_details, 

695 User.kid_details, 

696 User.housemate_details, 

697 User.other_host_info, 

698 User.sleeping_details, 

699 User.area, 

700 User.house_rules, 

701 ] 

702 home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules] 

703 

704 def poor_man_gaussian() -> ColumnElement[float] | float: 

705 """ 

706 Produces an approximatley std normal random variate 

707 """ 

708 trials = 5 

709 return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12) 

710 

711 def int_(stmt: Any) -> Function[int]: 

712 return func.coalesce(cast(stmt, Integer), 0) 

713 

714 def float_(stmt: Any) -> Function[float]: 

715 return func.coalesce(cast(stmt, Float), 0.0) 

716 

717 with session_scope() as session: 

718 # profile 

719 profile_text = "" 

720 for field in text_fields: 

721 profile_text += func.coalesce(field, "") # type: ignore[assignment] 

722 text_length = func.length(profile_text) 

723 home_text = "" 

724 for field in home_fields: 

725 home_text += func.coalesce(field, "") # type: ignore[assignment] 

726 home_length = func.length(home_text) 

727 

728 filled_profile = int_(has_completed_profile_expression()) 

729 has_text = int_(text_length > 500) 

730 long_text = int_(text_length > 2000) 

731 can_host = int_(User.hosting_status == HostingStatus.can_host) 

732 may_host = int_(User.hosting_status == HostingStatus.maybe) 

733 cant_host = int_(User.hosting_status == HostingStatus.cant_host) 

734 filled_home = int_(User.has_completed_my_home) 

735 filled_home_lots = int_(home_length > 200) 

736 hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host 

737 profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots 

738 

739 # references 

740 left_ref_expr = int_(1).label("left_reference") 

741 left_refs_subquery = ( 

742 select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery() 

743 ) 

744 left_reference = int_(left_refs_subquery.c.left_reference) 

745 has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference") 

746 ref_count_expr = int_(func.count(Reference.id)).label("ref_count") 

747 ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg") 

748 has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types") 

749 has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label( 

750 "has_bad_ref" 

751 ) 

752 received_ref_subquery = ( 

753 select( 

754 Reference.to_user_id.label("user_id"), 

755 has_reference_expr, 

756 has_multiple_types_expr, 

757 has_bad_ref_expr, 

758 ref_count_expr, 

759 ref_avg_expr, 

760 ) 

761 .group_by(Reference.to_user_id) 

762 .subquery() 

763 ) 

764 has_multiple_types = int_(received_ref_subquery.c.has_multiple_types) 

765 has_reference = int_(received_ref_subquery.c.has_reference) 

766 has_bad_reference = int_(received_ref_subquery.c.has_bad_ref) 

767 rating_score = float_( 

768 received_ref_subquery.c.ref_avg 

769 * ( 

770 2 * func.least(received_ref_subquery.c.ref_count, 5) 

771 + func.greatest(received_ref_subquery.c.ref_count - 5, 0) 

772 ) 

773 ) 

774 ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score 

775 

776 # activeness 

777 recently_active = int_(User.last_active >= now() - timedelta(days=180)) 

778 very_recently_active = int_(User.last_active >= now() - timedelta(days=14)) 

779 recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14)) 

780 messaged_lots = int_(func.count(Message.id) > 5) 

781 messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points") 

782 messaging_subquery = ( 

783 select(Message.author_id.label("user_id"), messaging_points_subquery) 

784 .where(Message.message_type == MessageType.text) 

785 .group_by(Message.author_id) 

786 .subquery() 

787 ) 

788 activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points) 

789 

790 # verification 

791 cb_subquery = ( 

792 select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id")) 

793 .join(Cluster, Cluster.id == ClusterSubscription.cluster_id) 

794 .where(ClusterSubscription.role == ClusterRole.admin) 

795 .where(Cluster.is_official_cluster) 

796 .group_by(ClusterSubscription.user_id) 

797 .subquery() 

798 ) 

799 min_node_id = cb_subquery.c.min_node_id 

800 cb = int_(min_node_id >= 1) 

801 wcb = int_(min_node_id == 1) 

802 badge_points = { 

803 "founder": 100, 

804 "board_member": 20, 

805 "past_board_member": 5, 

806 "strong_verification": 3, 

807 "volunteer": 3, 

808 "past_volunteer": 2, 

809 "donor": 1, 

810 "phone_verified": 1, 

811 } 

812 

813 badge_subquery = ( 

814 select( 

815 UserBadge.user_id.label("user_id"), 

816 func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"), 

817 ) 

818 .group_by(UserBadge.user_id) 

819 .subquery() 

820 ) 

821 

822 other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points) 

823 

824 # response rate 

825 hr_subquery = select( 

826 UserResponseRate.user_id, 

827 float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"), 

828 float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"), 

829 ).subquery() 

830 response_time_33p = hr_subquery.c.response_time_33p 

831 response_time_66p = hr_subquery.c.response_time_66p 

832 # be careful with nulls 

833 response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0) 

834 

835 recommendation_score = ( 

836 hosting_status_points 

837 + profile_points 

838 + ref_score 

839 + activeness_points 

840 + other_points 

841 + response_rate_points 

842 + 2 * poor_man_gaussian() 

843 ) 

844 

845 scores = ( 

846 select(User.id.label("user_id"), recommendation_score.label("score")) 

847 .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id) 

848 .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id) 

849 .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id) 

850 .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id) 

851 .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id) 

852 .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id) 

853 ).subquery() 

854 

855 session.execute(update(User).values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)) 

856 

857 logger.info("Updated recommendation scores") 

858 

859 

860def update_badges(payload: empty_pb2.Empty) -> None: 

861 with session_scope() as session: 

862 

863 def update_badge(badge_id: str, members: Sequence[int]) -> None: 

864 badge = get_badge_dict()[badge_id] 

865 # this batch job has no per-user context to evaluate the gate against, so it's global 

866 if badge.flag is not None and not experimentation.get_global_boolean_value(badge.flag, default=True): 

867 members = [] 

868 user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge.id)).scalars().all() 

869 # in case the user ids don't exist in the db 

870 actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all() 

871 # we should add the badge to these 

872 add = set(actual_members) - set(user_ids) 

873 # we should remove the badge from these 

874 remove = set(user_ids) - set(actual_members) 

875 for user_id in add: 

876 user_add_badge(session, user_id, badge.id) 

877 

878 for user_id in remove: 

879 user_remove_badge(session, user_id, badge.id) 

880 

881 update_badge("founder", get_static_badge_dict()["founder"]) 

882 update_badge("board_member", get_static_badge_dict()["board_member"]) 

883 update_badge("past_board_member", get_static_badge_dict()["past_board_member"]) 

884 update_badge("donor", session.execute(select(User.id).where(User.last_donated.is_not(None))).scalars().all()) 

885 update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all()) 

886 update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all()) 

887 # strong verification requires passport on file + gender/sex correspondence and date of birth match 

888 update_badge( 

889 "strong_verification", 

890 session.execute( 

891 select(User.id) 

892 .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id) 

893 .where(StrongVerificationAttempt.has_strong_verification(User)) 

894 ) 

895 .scalars() 

896 .all(), 

897 ) 

898 # volunteer badge for active volunteers (stopped_volunteering is null) 

899 update_badge( 

900 "volunteer", 

901 session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_(None))).scalars().all(), 

902 ) 

903 # past_volunteer badge for past volunteers (stopped_volunteering is not null) 

904 update_badge( 

905 "past_volunteer", 

906 session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_not(None))) 

907 .scalars() 

908 .all(), 

909 ) 

910 

911 

912def finalize_strong_verification(payload: jobs_pb2.FinalizeStrongVerificationPayload) -> None: 

913 with session_scope() as session: 

914 verification_attempt = session.execute( 

915 select(StrongVerificationAttempt) 

916 .where(StrongVerificationAttempt.id == payload.verification_attempt_id) 

917 .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend) 

918 ).scalar_one() 

919 response = requests.post( 

920 "https://passportreader.app/api/v1/session.get", 

921 auth=(config.IRIS_ID_PUBKEY, config.IRIS_ID_SECRET), 

922 json={"id": verification_attempt.iris_session_id}, 

923 timeout=10, 

924 verify="/etc/ssl/certs/ca-certificates.crt", 

925 ) 

926 if response.status_code != 200: 926 ↛ 927line 926 didn't jump to line 927 because the condition on line 926 was never true

927 raise Exception(f"Iris didn't return 200: {response.text}") 

928 json_data = response.json() 

929 reference_payload = internal_pb2.VerificationReferencePayload.FromString( 

930 simple_decrypt("iris_callback", b64decode(json_data["reference"])) 

931 ) 

932 assert verification_attempt.user_id == reference_payload.user_id 

933 assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token 

934 assert verification_attempt.iris_session_id == json_data["id"] 

935 assert json_data["state"] == "APPROVED" 

936 

937 if json_data["document_type"] != "PASSPORT": 

938 verification_attempt.status = StrongVerificationAttemptStatus.failed 

939 notify( 

940 session, 

941 user_id=verification_attempt.user_id, 

942 topic_action=NotificationTopicAction.verification__sv_fail, 

943 key="", 

944 data=notification_data_pb2.VerificationSVFail( 

945 reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT 

946 ), 

947 ) 

948 return 

949 

950 assert json_data["document_type"] == "PASSPORT" 

951 

952 expiry_date = date.fromisoformat(json_data["expiry_date"]) 

953 nationality = json_data["nationality"] 

954 last_three_document_chars = json_data["document_number"][-3:] 

955 

956 existing_attempt = session.execute( 

957 select(StrongVerificationAttempt) 

958 .where(StrongVerificationAttempt.passport_expiry_date == expiry_date) 

959 .where(StrongVerificationAttempt.passport_nationality == nationality) 

960 .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars) 

961 .order_by(StrongVerificationAttempt.id) 

962 .limit(1) 

963 ).scalar_one_or_none() 

964 

965 verification_attempt.has_minimal_data = True 

966 verification_attempt.passport_expiry_date = expiry_date 

967 verification_attempt.passport_nationality = nationality 

968 verification_attempt.passport_last_three_document_chars = last_three_document_chars 

969 

970 if existing_attempt: 

971 verification_attempt.status = StrongVerificationAttemptStatus.duplicate 

972 

973 if existing_attempt.user_id != verification_attempt.user_id: 

974 session.flush() 

975 send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt) 

976 

977 notify( 

978 session, 

979 user_id=verification_attempt.user_id, 

980 topic_action=NotificationTopicAction.verification__sv_fail, 

981 key="", 

982 data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE), 

983 ) 

984 return 

985 

986 verification_attempt.has_full_data = True 

987 verification_attempt.passport_encrypted_data = asym_encrypt( 

988 config.VERIFICATION_DATA_PUBLIC_KEY, response.text.encode("utf8") 

989 ) 

990 verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"]) 

991 verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()] 

992 verification_attempt.status = StrongVerificationAttemptStatus.succeeded 

993 

994 session.flush() 

995 

996 strong_verification_completions_counter.inc() 

997 

998 user = verification_attempt.user 

999 if verification_attempt.has_strong_verification(user): 999 ↛ 1014line 999 didn't jump to line 1014 because the condition on line 999 was always true

1000 badge_id = "strong_verification" 

1001 if session.execute( 

1002 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id) 

1003 ).scalar_one_or_none(): 

1004 return 

1005 

1006 user_add_badge(session, user.id, badge_id, do_notify=False) 

1007 notify( 

1008 session, 

1009 user_id=verification_attempt.user_id, 

1010 topic_action=NotificationTopicAction.verification__sv_success, 

1011 key="", 

1012 ) 

1013 else: 

1014 notify( 

1015 session, 

1016 user_id=verification_attempt.user_id, 

1017 topic_action=NotificationTopicAction.verification__sv_fail, 

1018 key="", 

1019 data=notification_data_pb2.VerificationSVFail( 

1020 reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER 

1021 ), 

1022 ) 

1023 

1024 

1025def send_activeness_probes(payload: empty_pb2.Empty) -> None: 

1026 with session_scope() as session: 

1027 ## Step 1: create new activeness probes for those who need it and don't have one (if enabled) 

1028 

1029 if config.ACTIVENESS_PROBES_ENABLED: 

1030 # current activeness probes 

1031 subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery() 

1032 

1033 # users who we should send an activeness probe to 

1034 new_probe_user_ids = ( 

1035 session.execute( 

1036 select(User.id) 

1037 .where(User.is_visible) 

1038 .where(User.hosting_status == HostingStatus.can_host) 

1039 .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD) 

1040 .where(User.id.not_in(select(subquery.c.user_id))) 

1041 ) 

1042 .scalars() 

1043 .all() 

1044 ) 

1045 

1046 total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one() 

1047 probes_today = session.execute( 

1048 select(func.count()) 

1049 .select_from(ActivenessProbe) 

1050 .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24)) 

1051 ).scalar_one() 

1052 

1053 # send probes to max 2% of users per day 

1054 max_probes_per_day = 0.02 * total_users 

1055 max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1)) 

1056 

1057 if len(new_probe_user_ids) > max_probe_size: 1057 ↛ 1058line 1057 didn't jump to line 1058 because the condition on line 1057 was never true

1058 new_probe_user_ids = sample(new_probe_user_ids, max_probe_size) 

1059 

1060 for user_id in new_probe_user_ids: 

1061 session.add(ActivenessProbe(user_id=user_id)) 

1062 

1063 session.commit() 

1064 

1065 ## Step 2: actually send out probe notifications 

1066 for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS): 

1067 probes = ( 

1068 session.execute( 

1069 select(ActivenessProbe) 

1070 .where(ActivenessProbe.notifications_sent == probe_number_minus_1) 

1071 .where(ActivenessProbe.probe_initiated + delay < func.now()) 

1072 .where(ActivenessProbe.is_pending) 

1073 ) 

1074 .scalars() 

1075 .all() 

1076 ) 

1077 

1078 for probe in probes: 

1079 probe.notifications_sent = probe_number_minus_1 + 1 

1080 context = make_notification_user_context(user_id=probe.user.id) 

1081 notify( 

1082 session, 

1083 user_id=probe.user.id, 

1084 topic_action=NotificationTopicAction.activeness__probe, 

1085 key=str(probe.id), 

1086 data=notification_data_pb2.ActivenessProbe( 

1087 reminder_number=probe_number_minus_1 + 1, 

1088 deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME), 

1089 ), 

1090 ) 

1091 session.commit() 

1092 

1093 ## Step 3: for those who haven't responded, mark them as failed 

1094 expired_probes = ( 

1095 session.execute( 

1096 select(ActivenessProbe) 

1097 .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS)) 

1098 .where(ActivenessProbe.is_pending) 

1099 .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now()) 

1100 ) 

1101 .scalars() 

1102 .all() 

1103 ) 

1104 

1105 for probe in expired_probes: 

1106 probe.responded = now() 

1107 probe.response = ActivenessProbeStatus.expired 

1108 if probe.user.hosting_status == HostingStatus.can_host: 1108 ↛ 1110line 1108 didn't jump to line 1110 because the condition on line 1108 was always true

1109 probe.user.hosting_status = HostingStatus.maybe 

1110 if probe.user.meetup_status == MeetupStatus.wants_to_meetup: 1110 ↛ 1112line 1110 didn't jump to line 1112 because the condition on line 1110 was always true

1111 probe.user.meetup_status = MeetupStatus.open_to_meetup 

1112 session.commit() 

1113 

1114 

1115def update_randomized_locations(payload: empty_pb2.Empty) -> None: 

1116 """ 

1117 We generate for each user a randomized location as follows: 

1118 - Start from a strong random seed (based on the SECRET env var and our key derivation function) 

1119 - For each user, mix in the user_id for randomness 

1120 - Generate a radius from [0.02, 0.1] degrees (about 2-10km) 

1121 - Generate an angle from [0, 360] 

1122 - Randomized location is then a distance `radius` away at an angle `angle` from `geom` 

1123 """ 

1124 randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME) 

1125 

1126 def gen_randomized_coords(user_id: int, lat: float, lng: float) -> tuple[float, float]: 

1127 radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii")) 

1128 angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii")) 

1129 radius = 0.02 + 0.08 * radius_u 

1130 angle_rad = 2 * pi * angle_u 

1131 offset_lng = radius * cos(angle_rad) 

1132 offset_lat = radius * sin(angle_rad) 

1133 return lat + offset_lat, lng + offset_lng 

1134 

1135 user_updates: list[dict[str, Any]] = [] 

1136 

1137 with session_scope() as session: 

1138 users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all() 

1139 

1140 for user_id, geom in users_to_update: 

1141 lat, lng = get_coordinates(geom) 

1142 user_updates.append( 

1143 {"id": user_id, "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, lat, lng))} 

1144 ) 

1145 

1146 with session_scope() as session: 

1147 session.execute(update(User), user_updates) 

1148 

1149 

1150def send_event_reminders(payload: empty_pb2.Empty) -> None: 

1151 """ 

1152 Sends reminders for events that are 24 hours away to users who marked themselves as attending. 

1153 """ 

1154 logger.info("Sending event reminder emails") 

1155 

1156 with session_scope() as session: 

1157 occurrences = ( 

1158 session.execute( 

1159 select(EventOccurrence) 

1160 .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA) 

1161 .where(EventOccurrence.start_time >= now()) 

1162 .where(~EventOccurrence.is_cancelled) 

1163 .where(~EventOccurrence.is_deleted) 

1164 ) 

1165 .scalars() 

1166 .all() 

1167 ) 

1168 

1169 for occurrence in occurrences: 

1170 results = session.execute( 

1171 select(User, EventOccurrenceAttendee) 

1172 .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id) 

1173 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id) 

1174 .where(EventOccurrenceAttendee.reminder_sent == False) 

1175 .where(User.is_visible) 

1176 .where(~User.is_shadowed) 

1177 ).all() 

1178 

1179 for user, attendee in results: 

1180 context = make_notification_user_context(user_id=user.id) 

1181 

1182 notify( 

1183 session, 

1184 user_id=user.id, 

1185 topic_action=NotificationTopicAction.event__reminder, 

1186 key=str(occurrence.id), 

1187 data=notification_data_pb2.EventReminder( 

1188 event=event_to_pb(session, occurrence, context), 

1189 user=user_model_to_pb(user, session, context), 

1190 ), 

1191 moderation_state_id=occurrence.moderation_state_id, 

1192 ) 

1193 

1194 attendee.reminder_sent = True 

1195 session.commit() 

1196 

1197 

1198def check_expo_push_receipts(payload: empty_pb2.Empty) -> None: 

1199 """ 

1200 Check Expo push receipts in batch and update delivery attempts. 

1201 """ 

1202 MAX_ITERATIONS = 100 # Safety limit: 100 batches * 100 attempts = 10,000 max 

1203 

1204 for iteration in range(MAX_ITERATIONS): 1204 ↛ 1266line 1204 didn't jump to line 1266 because the loop on line 1204 didn't complete

1205 with session_scope() as session: 

1206 # Find all delivery attempts that need receipt checking 

1207 # Wait 15 minutes per Expo's recommendation before checking receipts 

1208 attempts = ( 

1209 session.execute( 

1210 select(PushNotificationDeliveryAttempt) 

1211 .where(PushNotificationDeliveryAttempt.expo_ticket_id != None) 

1212 .where(PushNotificationDeliveryAttempt.receipt_checked_at == None) 

1213 .where(PushNotificationDeliveryAttempt.time < now() - timedelta(minutes=15)) 

1214 .where(PushNotificationDeliveryAttempt.time > now() - timedelta(hours=24)) 

1215 .limit(100) 

1216 ) 

1217 .scalars() 

1218 .all() 

1219 ) 

1220 

1221 if not attempts: 

1222 logger.debug("No Expo receipts to check") 

1223 return 

1224 

1225 logger.info(f"Checking {len(attempts)} Expo push receipts") 

1226 

1227 receipts = get_expo_push_receipts([not_none(attempt.expo_ticket_id) for attempt in attempts]) 

1228 

1229 for attempt in attempts: 

1230 receipt = receipts.get(not_none(attempt.expo_ticket_id)) 

1231 

1232 # Always mark as checked to avoid infinite loops 

1233 attempt.receipt_checked_at = now() 

1234 

1235 if receipt is None: 

1236 # Receipt not found after 15min - likely expired (>24h) or never existed 

1237 # Per Expo docs: receipts should be available within 15 minutes 

1238 attempt.receipt_status = "not_found" 

1239 continue 

1240 

1241 attempt.receipt_status = receipt.get("status") 

1242 

1243 if receipt.get("status") == "error": 

1244 details = receipt.get("details", {}) 

1245 error_code = details.get("error") 

1246 attempt.receipt_error_code = error_code 

1247 

1248 if error_code == "DeviceNotRegistered": 1248 ↛ 1263line 1248 didn't jump to line 1263 because the condition on line 1248 was always true

1249 # Device token is no longer valid - disable the subscription 

1250 sub = session.execute( 

1251 select(PushNotificationSubscription).where( 

1252 PushNotificationSubscription.id == attempt.push_notification_subscription_id 

1253 ) 

1254 ).scalar_one() 

1255 

1256 if sub.disabled_at > now(): 1256 ↛ 1229line 1256 didn't jump to line 1229 because the condition on line 1256 was always true

1257 sub.disabled_at = now() 

1258 logger.info(f"Disabled push sub {sub.id} due to DeviceNotRegistered in receipt") 

1259 push_notification_counter.labels( 

1260 platform="expo", outcome="permanent_subscription_failure_receipt" 

1261 ).inc() 

1262 else: 

1263 logger.warning(f"Expo receipt error for ticket {attempt.expo_ticket_id}: {error_code}") 

1264 

1265 # If we get here, we've exhausted MAX_ITERATIONS without finishing 

1266 raise RuntimeError( 

1267 f"check_expo_push_receipts exceeded {MAX_ITERATIONS} iterations - " 

1268 "there may be an unusually large backlog of receipts to check" 

1269 ) 

1270 

1271 

1272def send_postal_verification_postcard(payload: jobs_pb2.SendPostalVerificationPostcardPayload) -> None: 

1273 """ 

1274 Sends the postcard via external API and updates attempt status. 

1275 """ 

1276 with session_scope() as session: 

1277 attempt = session.execute( 

1278 select(PostalVerificationAttempt).where( 

1279 PostalVerificationAttempt.id == payload.postal_verification_attempt_id 

1280 ) 

1281 ).scalar_one_or_none() 

1282 

1283 if not attempt or attempt.status != PostalVerificationStatus.in_progress: 1283 ↛ 1284line 1283 didn't jump to line 1284 because the condition on line 1283 was never true

1284 logger.warning( 

1285 f"Postal verification attempt {payload.postal_verification_attempt_id} not found or wrong state" 

1286 ) 

1287 return 

1288 

1289 user_name = session.execute(select(User.name).where(User.id == attempt.user_id)).scalar_one() 

1290 

1291 job_id = send_postcard( 

1292 recipient_name=user_name, 

1293 address_line_1=attempt.address_line_1, 

1294 address_line_2=attempt.address_line_2, 

1295 city=attempt.city, 

1296 state=attempt.state, 

1297 postal_code=attempt.postal_code, 

1298 country=attempt.country_code, 

1299 verification_code=not_none(attempt.verification_code), 

1300 ) 

1301 

1302 attempt.mypostcard_job_id = job_id 

1303 attempt.status = PostalVerificationStatus.awaiting_verification 

1304 attempt.postcard_sent_at = func.now() 

1305 

1306 postcards_sent_counter.labels(country_code=attempt.country_code).inc() 

1307 

1308 context = make_background_user_context(attempt.user_id) 

1309 log_event( 

1310 context, 

1311 session, 

1312 "postcard.sent", 

1313 { 

1314 "attempt_id": attempt.id, 

1315 "country": attempt.country_code, 

1316 "city": attempt.city, 

1317 "mypostcard_job_id": job_id, 

1318 }, 

1319 ) 

1320 

1321 notify( 

1322 session, 

1323 user_id=attempt.user_id, 

1324 topic_action=NotificationTopicAction.postal_verification__postcard_sent, 

1325 key="", 

1326 data=notification_data_pb2.PostalVerificationPostcardSent( 

1327 city=attempt.city, 

1328 country=attempt.country_code, 

1329 ), 

1330 ) 

1331 

1332 

1333def check_mypostcard_jobs(payload: empty_pb2.Empty) -> None: 

1334 """ 

1335 Checks that all MyPostcard jobs from the last week are tied to a postal verification attempt. 

1336 """ 

1337 if not experimentation.get_global_boolean_value("postal_verification_enabled", default=False): 

1338 return 

1339 

1340 with session_scope() as session: 

1341 mypostcard_job_ids = set( 

1342 get_order_ids( 

1343 date_from=(now() - timedelta(days=7)).date(), 

1344 date_to=now().date(), 

1345 ) 

1346 ) 

1347 

1348 known_job_ids = set( 

1349 session.execute( 

1350 select(PostalVerificationAttempt.mypostcard_job_id).where( 

1351 PostalVerificationAttempt.mypostcard_job_id.isnot(None), 

1352 PostalVerificationAttempt.created >= now() - timedelta(days=14), 

1353 ) 

1354 ) 

1355 .scalars() 

1356 .all() 

1357 ) 

1358 

1359 orphaned = mypostcard_job_ids - known_job_ids 

1360 if orphaned: 

1361 report_message( 

1362 f"Found {len(orphaned)} orphaned MyPostcard jobs not tied to any verification attempt: {orphaned}" 

1363 ) 

1364 

1365 

1366class DatabaseInconsistencyError(Exception): 

1367 """Raised when database consistency checks fail""" 

1368 

1369 pass 

1370 

1371 

1372def check_database_consistency(payload: empty_pb2.Empty) -> None: 

1373 """ 

1374 Checks database consistency and raises an exception if any issues are found. 

1375 """ 

1376 logger.info("Checking database consistency") 

1377 errors = [] 

1378 

1379 with session_scope() as session: 

1380 # Check that all users have a profile gallery 

1381 users_without_gallery = session.execute( 

1382 select(User.id, User.username).where(User.profile_gallery_id.is_(None)) 

1383 ).all() 

1384 if users_without_gallery: 

1385 errors.append(f"Users without profile gallery: {users_without_gallery}") 

1386 

1387 # Check that all profile galleries point to their owner 

1388 mismatched_galleries = session.execute( 

1389 select(User.id, User.username, User.profile_gallery_id, PhotoGallery.owner_user_id) 

1390 .join(PhotoGallery, User.profile_gallery_id == PhotoGallery.id) 

1391 .where(User.profile_gallery_id.is_not(None)) 

1392 .where(PhotoGallery.owner_user_id != User.id) 

1393 ).all() 

1394 if mismatched_galleries: 1394 ↛ 1395line 1394 didn't jump to line 1395 because the condition on line 1394 was never true

1395 errors.append(f"Profile galleries with mismatched owner: {mismatched_galleries}") 

1396 

1397 # === Moderation System Consistency Checks === 

1398 

1399 # Check every ModerationState has at least one INITIAL_REVIEW queue item 

1400 # Skip items with ID < 2000000 as they were created before this check was introduced 

1401 states_without_initial_review = session.execute( 

1402 select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where( 

1403 ModerationState.id >= 2000000, 

1404 ~exists( 

1405 select(1) 

1406 .where(ModerationQueueItem.moderation_state_id == ModerationState.id) 

1407 .where(ModerationQueueItem.trigger == ModerationTrigger.initial_review) 

1408 ), 

1409 ) 

1410 ).all() 

1411 if states_without_initial_review: 1411 ↛ 1412line 1411 didn't jump to line 1412 because the condition on line 1411 was never true

1412 errors.append(f"ModerationStates without INITIAL_REVIEW queue item: {states_without_initial_review}") 

1413 

1414 # Check every ModerationState has a CREATE log entry 

1415 # Skip items with ID < 2000000 as they were created before this check was introduced 

1416 states_without_create_log = session.execute( 

1417 select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where( 

1418 ModerationState.id >= 2000000, 

1419 ~exists( 

1420 select(1) 

1421 .where(ModerationLog.moderation_state_id == ModerationState.id) 

1422 .where(ModerationLog.action == ModerationAction.create) 

1423 ), 

1424 ) 

1425 ).all() 

1426 if states_without_create_log: 1426 ↛ 1427line 1426 didn't jump to line 1427 because the condition on line 1426 was never true

1427 errors.append(f"ModerationStates without CREATE log entry: {states_without_create_log}") 

1428 

1429 # Check resolved queue items point to log entries for the same moderation state 

1430 resolved_item_log_mismatches = session.execute( 

1431 select(ModerationQueueItem.id, ModerationQueueItem.moderation_state_id, ModerationLog.moderation_state_id) 

1432 .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id) 

1433 .where(ModerationQueueItem.resolved_by_log_id.is_not(None)) 

1434 .where(ModerationQueueItem.moderation_state_id != ModerationLog.moderation_state_id) 

1435 ).all() 

1436 if resolved_item_log_mismatches: 1436 ↛ 1437line 1436 didn't jump to line 1437 because the condition on line 1436 was never true

1437 errors.append(f"Resolved queue items with mismatched moderation_state_id: {resolved_item_log_mismatches}") 

1438 

1439 # Check every HOST_REQUEST ModerationState has exactly one HostRequest pointing to it 

1440 hr_states = ( 

1441 session.execute( 

1442 select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.host_request) 

1443 ) 

1444 .scalars() 

1445 .all() 

1446 ) 

1447 for state_id in hr_states: 1447 ↛ 1448line 1447 didn't jump to line 1448 because the loop on line 1447 never started

1448 hr_count = session.execute( 

1449 select(func.count()).where(HostRequest.moderation_state_id == state_id) 

1450 ).scalar_one() 

1451 if hr_count != 1: 

1452 errors.append(f"ModerationState {state_id} (HOST_REQUEST) has {hr_count} HostRequests (expected 1)") 

1453 

1454 # Check every GROUP_CHAT ModerationState has exactly one GroupChat pointing to it 

1455 gc_states = ( 

1456 session.execute( 

1457 select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.group_chat) 

1458 ) 

1459 .scalars() 

1460 .all() 

1461 ) 

1462 for state_id in gc_states: 

1463 gc_count = session.execute( 

1464 select(func.count()).where(GroupChat.moderation_state_id == state_id) 

1465 ).scalar_one() 

1466 if gc_count != 1: 1466 ↛ 1467line 1466 didn't jump to line 1467 because the condition on line 1466 was never true

1467 errors.append(f"ModerationState {state_id} (GROUP_CHAT) has {gc_count} GroupChats (expected 1)") 

1468 

1469 # Check ModerationState.object_id matches the actual object's ID 

1470 hr_object_id_mismatches = session.execute( 

1471 select(ModerationState.id, ModerationState.object_id, HostRequest.conversation_id) 

1472 .join(HostRequest, HostRequest.moderation_state_id == ModerationState.id) 

1473 .where(ModerationState.object_type == ModerationObjectType.host_request) 

1474 .where(ModerationState.object_id != HostRequest.conversation_id) 

1475 ).all() 

1476 if hr_object_id_mismatches: 1476 ↛ 1477line 1476 didn't jump to line 1477 because the condition on line 1476 was never true

1477 errors.append(f"ModerationState object_id mismatch for HOST_REQUEST: {hr_object_id_mismatches}") 

1478 

1479 gc_object_id_mismatches = session.execute( 

1480 select(ModerationState.id, ModerationState.object_id, GroupChat.conversation_id) 

1481 .join(GroupChat, GroupChat.moderation_state_id == ModerationState.id) 

1482 .where(ModerationState.object_type == ModerationObjectType.group_chat) 

1483 .where(ModerationState.object_id != GroupChat.conversation_id) 

1484 ).all() 

1485 if gc_object_id_mismatches: 1485 ↛ 1486line 1485 didn't jump to line 1486 because the condition on line 1485 was never true

1486 errors.append(f"ModerationState object_id mismatch for GROUP_CHAT: {gc_object_id_mismatches}") 

1487 

1488 # Check reverse mapping: HostRequest's moderation_state points to correct ModerationState 

1489 hr_reverse_mismatches = session.execute( 

1490 select( 

1491 HostRequest.conversation_id, 

1492 HostRequest.moderation_state_id, 

1493 ModerationState.object_type, 

1494 ModerationState.object_id, 

1495 ) 

1496 .join(ModerationState, HostRequest.moderation_state_id == ModerationState.id) 

1497 .where( 

1498 (ModerationState.object_type != ModerationObjectType.host_request) 

1499 | (ModerationState.object_id != HostRequest.conversation_id) 

1500 ) 

1501 ).all() 

1502 if hr_reverse_mismatches: 1502 ↛ 1503line 1502 didn't jump to line 1503 because the condition on line 1502 was never true

1503 errors.append(f"HostRequest points to ModerationState with wrong type/object_id: {hr_reverse_mismatches}") 

1504 

1505 # Check reverse mapping: GroupChat's moderation_state points to correct ModerationState 

1506 gc_reverse_mismatches = session.execute( 

1507 select( 

1508 GroupChat.conversation_id, 

1509 GroupChat.moderation_state_id, 

1510 ModerationState.object_type, 

1511 ModerationState.object_id, 

1512 ) 

1513 .join(ModerationState, GroupChat.moderation_state_id == ModerationState.id) 

1514 .where( 

1515 (ModerationState.object_type != ModerationObjectType.group_chat) 

1516 | (ModerationState.object_id != GroupChat.conversation_id) 

1517 ) 

1518 ).all() 

1519 if gc_reverse_mismatches: 1519 ↛ 1520line 1519 didn't jump to line 1520 because the condition on line 1519 was never true

1520 errors.append(f"GroupChat points to ModerationState with wrong type/object_id: {gc_reverse_mismatches}") 

1521 

1522 # Ensure auto-approve deadline isn't being exceeded by a significant margin 

1523 # The auto-approver runs every 15s, so allow 5 minutes grace before alerting 

1524 deadline_seconds = config.MODERATION_AUTO_APPROVE_DEADLINE_SECONDS 

1525 if deadline_seconds > 0: 1525 ↛ 1526line 1525 didn't jump to line 1526 because the condition on line 1525 was never true

1526 grace_period = timedelta(minutes=5) 

1527 stale_initial_review_items = session.execute( 

1528 select( 

1529 ModerationQueueItem.id, 

1530 ModerationQueueItem.moderation_state_id, 

1531 ModerationQueueItem.time_created, 

1532 ) 

1533 .where(ModerationQueueItem.trigger == ModerationTrigger.initial_review) 

1534 .where(ModerationQueueItem.resolved_by_log_id.is_(None)) 

1535 .where(ModerationQueueItem.time_created < now() - timedelta(seconds=deadline_seconds) - grace_period) 

1536 ).all() 

1537 if stale_initial_review_items: 

1538 errors.append( 

1539 f"INITIAL_REVIEW items exceeding auto-approve deadline by >5min: {stale_initial_review_items}" 

1540 ) 

1541 

1542 if errors: 

1543 raise DatabaseInconsistencyError("\n".join(errors)) 

1544 

1545 

1546def auto_approve_moderation_queue(payload: empty_pb2.Empty) -> None: 

1547 """ 

1548 Dead man's switch: approves unresolved INITIAL_REVIEW content older than the deadline to VISIBLE, then 

1549 re-flags it as a high-priority MACHINE_FLAG superseding only the INITIAL_REVIEW item. The switch only fires 

1550 when moderators are behind, so every auto-approved item stays in the queue for a human to check. Other open 

1551 flags are untouched, and items already actioned by moderators are left alone. 

1552 """ 

1553 deadline_seconds = config.MODERATION_AUTO_APPROVE_DEADLINE_SECONDS 

1554 if deadline_seconds <= 0: 

1555 return 

1556 

1557 with session_scope() as session: 

1558 ctx = make_background_user_context(user_id=config.MODERATION_BOT_USER_ID) 

1559 

1560 items = ( 

1561 Moderation() 

1562 .GetModerationQueue( 

1563 request=moderation_pb2.GetModerationQueueReq( 

1564 triggers=[moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW], 

1565 unresolved_only=True, 

1566 page_size=100, 

1567 created_before=Timestamp_from_datetime(now() - timedelta(seconds=deadline_seconds)), 

1568 ), 

1569 context=ctx, 

1570 session=session, 

1571 ) 

1572 .queue_items 

1573 ) 

1574 

1575 if not items: 

1576 return 

1577 

1578 # Skip items whose author is shadowed; their content stays in shadowed state indefinitely 

1579 approvable = [item for item in items if not item.moderation_state.author.shadowed] 

1580 if not approvable: 

1581 return 

1582 

1583 logger.info(f"Auto-approving {len(approvable)} moderation queue items") 

1584 reason = f"Auto-approved: moderation deadline of {deadline_seconds} seconds exceeded." 

1585 for item in approvable: 

1586 Moderation().ModerateContent( 

1587 request=moderation_pb2.ModerateContentReq( 

1588 moderation_state_id=item.moderation_state_id, 

1589 action=moderation_pb2.MODERATION_ACTION_APPROVE, 

1590 visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE, 

1591 reason=reason, 

1592 clear_flags=False, 

1593 ), 

1594 context=ctx, 

1595 session=session, 

1596 ) 

1597 Moderation().ModerateContent( 

1598 request=moderation_pb2.ModerateContentReq( 

1599 moderation_state_id=item.moderation_state_id, 

1600 action=moderation_pb2.MODERATION_ACTION_FLAG, 

1601 trigger=moderation_pb2.MODERATION_TRIGGER_MACHINE_FLAG, 

1602 priority=MODERATION_AUTO_APPROVE_FLAG_PRIORITY, 

1603 reason=reason, 

1604 supersede_queue_item_id=item.queue_item_id, 

1605 ), 

1606 context=ctx, 

1607 session=session, 

1608 ) 

1609 moderation_auto_approved_counter.inc(len(approvable))