Coverage for src/couchers/jobs/handlers.py: 92%

519 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-25 10:58 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from datetime import date, timedelta 

7from math import cos, pi, sin, sqrt 

8from random import sample 

9from typing import Any 

10 

11import requests 

12from google.protobuf import empty_pb2 

13from sqlalchemy import Float, Integer 

14from sqlalchemy.orm import aliased 

15from sqlalchemy.sql import ( 

16 and_, 

17 case, 

18 cast, 

19 delete, 

20 distinct, 

21 exists, 

22 extract, 

23 func, 

24 literal, 

25 not_, 

26 or_, 

27 union_all, 

28 update, 

29) 

30 

31from couchers import urls 

32from couchers.config import config 

33from couchers.constants import ( 

34 ACTIVENESS_PROBE_EXPIRY_TIME, 

35 ACTIVENESS_PROBE_INACTIVITY_PERIOD, 

36 ACTIVENESS_PROBE_TIME_REMINDERS, 

37 EVENT_REMINDER_TIMEDELTA, 

38 HOST_REQUEST_MAX_REMINDERS, 

39 HOST_REQUEST_REMINDER_INTERVAL, 

40) 

41from couchers.context import make_background_user_context 

42from couchers.crypto import ( 

43 USER_LOCATION_RANDOMIZATION_NAME, 

44 asym_encrypt, 

45 b64decode, 

46 get_secret, 

47 simple_decrypt, 

48 stable_secure_uniform, 

49) 

50from couchers.db import session_scope 

51from couchers.email.dev import print_dev_email 

52from couchers.email.smtp import send_smtp_email 

53from couchers.helpers.badges import user_add_badge, user_remove_badge 

54from couchers.materialized_views import ( 

55 UserResponseRate, 

56 refresh_materialized_views, 

57 refresh_materialized_views_rapid, 

58) 

59from couchers.metrics import ( 

60 moderation_auto_approved_counter, 

61 push_notification_counter, 

62 strong_verification_completions_counter, 

63) 

64from couchers.models import ( 

65 AccountDeletionToken, 

66 ActivenessProbe, 

67 ActivenessProbeStatus, 

68 Cluster, 

69 ClusterRole, 

70 ClusterSubscription, 

71 EventOccurrence, 

72 EventOccurrenceAttendee, 

73 GroupChat, 

74 GroupChatSubscription, 

75 HostingStatus, 

76 HostRequest, 

77 HostRequestStatus, 

78 LoginToken, 

79 MeetupStatus, 

80 Message, 

81 MessageType, 

82 ModerationAction, 

83 ModerationLog, 

84 ModerationObjectType, 

85 ModerationQueueItem, 

86 ModerationState, 

87 ModerationTrigger, 

88 PassportSex, 

89 PasswordResetToken, 

90 PhotoGallery, 

91 PostalVerificationAttempt, 

92 PostalVerificationStatus, 

93 PushNotificationDeliveryAttempt, 

94 PushNotificationSubscription, 

95 Reference, 

96 StrongVerificationAttempt, 

97 StrongVerificationAttemptStatus, 

98 User, 

99 UserBadge, 

100 Volunteer, 

101) 

102from couchers.notifications.background import handle_email_digests, handle_notification 

103from couchers.notifications.expo_api import get_expo_push_receipts 

104from couchers.notifications.notify import notify 

105from couchers.notifications.send_raw_push_notification import send_raw_push_notification_v2 

106from couchers.postal.postcard_service import send_postcard 

107from couchers.proto import moderation_pb2, notification_data_pb2 

108from couchers.proto.internal import jobs_pb2, verification_pb2 

109from couchers.resources import get_badge_dict, get_static_badge_dict 

110from couchers.servicers.api import user_model_to_pb 

111from couchers.servicers.conversations import generate_message_notifications 

112from couchers.servicers.discussions import generate_create_discussion_notifications 

113from couchers.servicers.editor import generate_new_blog_post_notifications 

114from couchers.servicers.events import ( 

115 event_to_pb, 

116 generate_event_cancel_notifications, 

117 generate_event_create_notifications, 

118 generate_event_delete_notifications, 

119 generate_event_update_notifications, 

120) 

121from couchers.servicers.moderation import Moderation 

122from couchers.servicers.requests import host_request_to_pb 

123from couchers.servicers.threads import generate_reply_notifications 

124from couchers.sql import couchers_select as select 

125from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

126from couchers.tasks import send_duplicate_strong_verification_email 

127from couchers.utils import ( 

128 Timestamp_from_datetime, 

129 create_coordinate, 

130 get_coordinates, 

131 now, 

132) 

133 

134logger = logging.getLogger(__name__) 

135 

136 

# These handlers are implemented in other modules and imported above; here they only get the job metadata
# attached as function attributes:
#   PAYLOAD  - a protobuf message class (empty_pb2.Empty when the job carries no data)
#   SCHEDULE - a timedelta, set only on some of the handlers
# NOTE(review): presumably the job worker uses PAYLOAD to decode the job's argument and SCHEDULE as the
# interval for recurring runs -- confirm against the worker/scheduler code.
handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload

send_raw_push_notification_v2.PAYLOAD = jobs_pb2.SendRawPushNotificationPayloadV2

handle_email_digests.PAYLOAD = empty_pb2.Empty
handle_email_digests.SCHEDULE = timedelta(minutes=15)

generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload

generate_reply_notifications.PAYLOAD = jobs_pb2.GenerateReplyNotificationsPayload

generate_create_discussion_notifications.PAYLOAD = jobs_pb2.GenerateCreateDiscussionNotificationsPayload

generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload

generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload

generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload

generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload

generate_new_blog_post_notifications.PAYLOAD = jobs_pb2.GenerateNewBlogPostNotificationsPayload

refresh_materialized_views.PAYLOAD = empty_pb2.Empty
refresh_materialized_views.SCHEDULE = timedelta(minutes=5)

refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)

166 

167 

def send_email(payload: jobs_pb2.SendEmailPayload) -> None:
    """
    Delivers one email described by the payload and stores the resulting record

    The delivery backend is picked from config["ENABLE_EMAIL"]: SMTP when it is truthy, the dev printer otherwise.
    Whatever the backend returns is added to a fresh database session.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")

    # pick the delivery backend: real SMTP, or the dev one that only prints the email
    if config["ENABLE_EMAIL"]:
        deliver = send_smtp_email
    else:
        deliver = print_dev_email

    # both backends are expected to hand back a models.Email object that can be persisted
    email_record = deliver(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
        list_unsubscribe_header=payload.list_unsubscribe_header,
        source_data=payload.source_data,
    )

    with session_scope() as db:
        db.add(email_record)


send_email.PAYLOAD = jobs_pb2.SendEmailPayload

188 

189 

def purge_login_tokens(payload: empty_pb2.Empty) -> None:
    """
    Deletes every login token for which LoginToken.is_valid no longer holds
    """
    logger.info("Purging login tokens")
    with session_scope() as db:
        # bulk delete; no need to synchronize in-session objects
        stale_tokens = delete(LoginToken).where(~LoginToken.is_valid)
        db.execute(stale_tokens.execution_options(synchronize_session=False))


purge_login_tokens.PAYLOAD = empty_pb2.Empty
purge_login_tokens.SCHEDULE = timedelta(hours=24)

198 

199 

def purge_password_reset_tokens(payload: empty_pb2.Empty) -> None:
    """
    Deletes every password reset token for which PasswordResetToken.is_valid no longer holds

    The payload is empty and unused.
    """
    # this message used to read "Purging login tokens" (copied from the login token job), which made the two
    # jobs indistinguishable in the logs
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        # bulk delete; in-session objects are deliberately not synchronized
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )


purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)

210 

211 

def purge_account_deletion_tokens(payload: empty_pb2.Empty) -> None:
    """
    Deletes every account deletion token for which AccountDeletionToken.is_valid no longer holds
    """
    logger.info("Purging account deletion tokens")
    with session_scope() as db:
        # bulk delete; no need to synchronize in-session objects
        stale_tokens = delete(AccountDeletionToken).where(~AccountDeletionToken.is_valid)
        db.execute(stale_tokens.execution_options(synchronize_session=False))


purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)

224 

225 

def send_message_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for messages that have been unseen for a long enough time

    Works in two passes: first find users that have at least one qualifying text message older than 5 minutes,
    then, per user, collect the latest unseen message and unseen count of each group chat and emit a single
    "chat:missed_messages" notification through notify(). The user's last_notified_message_id is advanced to the
    newest message included, and the session is committed after each user.
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                select(User)
                .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
                .where_moderated_content_visible_to_user_column(GroupChat, User.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                # only messages sent while the user was subscribed (left is NULL if they never left)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                # newer than both the notification watermark and what the user has already seen
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            # the joins yield one row per matching message, so collapse duplicates of the same user
            .unique()
        )

        for user in users:
            context = make_background_user_context(user_id=user.id)
            # now actually grab all the group chats, not just less than 5 min old
            # one row per group chat: newest unseen message id and the number of unseen messages
            subquery = (
                select(
                    GroupChatSubscription.group_chat_id.label("group_chat_id"),
                    func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                    func.max(Message.id).label("message_id"),
                    func.count(Message.id).label("count_unseen"),
                )
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
                .where_moderated_content_visible(context, GroupChat, is_list_operation=True)
                .where(GroupChatSubscription.user_id == user.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(Message.id > user.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where_users_column_visible(context, Message.author_id)
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            # resolve the per-chat newest message ids into (GroupChat, Message, count) rows, newest first
            unseen_messages = session.execute(
                select(GroupChat, Message, subquery.c.count_unseen)
                .join(subquery, subquery.c.message_id == Message.id)
                .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
                .where_moderated_content_visible(context, GroupChat, is_list_operation=True)
                .order_by(subquery.c.message_id.desc())
            ).all()

            # the stricter per-user filters (e.g. author visibility) can leave nothing to report
            if not unseen_messages:
                continue

            # advance the watermark so these messages are not notified about again
            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            def format_title(message, group_chat, count_unseen):
                # DMs are titled by the author's name, other chats by the chat title
                if group_chat.is_dm:
                    return f"You missed {count_unseen} message(s) from {message.author.name}"
                else:
                    return f"You missed {count_unseen} message(s) in {group_chat.title}"

            notify(
                session,
                user_id=user.id,
                topic_action="chat:missed_messages",
                key="",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                context,
                            ),
                            message=format_title(message, group_chat, count_unseen),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                        )
                        for group_chat, message, count_unseen in unseen_messages
                    ],
                ),
            )
            # persist the watermark and notification for this user before moving on to the next one
            session.commit()


send_message_notifications.PAYLOAD = empty_pb2.Empty
send_message_notifications.SCHEDULE = timedelta(minutes=3)

326 

327 

def send_request_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host)

    First collects the ids of visible users that have at least one unseen, not yet notified text message older
    than 5 minutes in a host request where they are surfer or host. For each such user it then loads the affected
    requests (once as surfer, once as host), advances User.last_notified_request_message_id and emits one
    "host_request:missed_messages" notification per request through notify().
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    with session_scope() as session:
        # Get all candidate users who might have unseen request messages
        candidate_user_ids = (
            session.execute(
                select(User.id)
                .where(User.is_visible)
                .where(
                    or_(
                        # Users with unseen messages as surfer
                        exists(
                            select(1)
                            .select_from(HostRequest)
                            .join(Message, Message.conversation_id == HostRequest.conversation_id)
                            .where(HostRequest.surfer_user_id == User.id)
                            .where(Message.id > HostRequest.surfer_last_seen_message_id)
                            .where(Message.id > User.last_notified_request_message_id)
                            .where(Message.time < now() - timedelta(minutes=5))
                            .where(Message.message_type == MessageType.text)
                        ),
                        # Users with unseen messages as host
                        exists(
                            select(1)
                            .select_from(HostRequest)
                            .join(Message, Message.conversation_id == HostRequest.conversation_id)
                            .where(HostRequest.host_user_id == User.id)
                            .where(Message.id > HostRequest.host_last_seen_message_id)
                            .where(Message.id > User.last_notified_request_message_id)
                            .where(Message.time < now() - timedelta(minutes=5))
                            .where(Message.message_type == MessageType.text)
                        ),
                    )
                )
            )
            .scalars()
            .all()
        )

        for user_id in candidate_user_ids:
            context = make_background_user_context(user_id=user_id)

            # requests where this user is surfing
            # one row per request, carrying the id of the newest qualifying message; on top of the candidate
            # filter this also applies the moderation and user-visibility filters
            surfing_reqs = session.execute(
                select(User, HostRequest, func.max(Message.id))
                .where(User.id == user_id)
                .join(HostRequest, HostRequest.surfer_user_id == User.id)
                .where_moderated_content_visible_to_user_column(HostRequest, HostRequest.surfer_user_id)
                .where_users_column_visible(context, HostRequest.host_user_id)
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.surfer_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)
            ).all()

            # where this user is hosting
            # fetched before the watermark is advanced below, so the surfing loop cannot hide rows from it
            hosting_reqs = session.execute(
                select(User, HostRequest, func.max(Message.id))
                .where(User.id == user_id)
                .join(HostRequest, HostRequest.host_user_id == User.id)
                .where_moderated_content_visible_to_user_column(HostRequest, HostRequest.host_user_id)
                .where_users_column_visible(context, HostRequest.surfer_user_id)
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.host_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)
            ).all()

            for user, host_request, max_message_id in surfing_reqs:
                # the watermark only ever moves forward, whatever order the requests come back in
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                # the "other" user shown to a surfer is the host
                notify(
                    session,
                    user_id=user.id,
                    topic_action="host_request:missed_messages",
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        user=user_model_to_pb(host_request.host, session, context),
                        am_host=False,
                    ),
                )

            for user, host_request, max_message_id in hosting_reqs:
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                # the "other" user shown to a host is the surfer
                notify(
                    session,
                    user_id=user.id,
                    topic_action="host_request:missed_messages",
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        user=user_model_to_pb(host_request.surfer, session, context),
                        am_host=True,
                    ),
                )
        # NOTE(review): only flush() is called in this function; presumably session_scope() commits on exit -- confirm


send_request_notifications.PAYLOAD = empty_pb2.Empty
send_request_notifications.SCHEDULE = timedelta(minutes=3)

439 

440 

def send_onboarding_emails(payload: empty_pb2.Empty) -> None:
    """
    Sends out the two onboarding reminders to visible users

    Reminder 1 goes to every visible user that has not received any onboarding email yet. Reminder 2 goes to
    visible users whose first reminder is more than 7 days old and whose profile is still not completed.
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:

        def remind(recipient, step: int) -> None:
            # notify, record which reminder was sent and when, then commit so each user is persisted on its own
            notify(
                session,
                user_id=recipient.id,
                topic_action="onboarding:reminder",
                key=str(step),
            )
            recipient.onboarding_emails_sent = step
            recipient.last_onboarding_email_sent = now()
            session.commit()

        # first onboarding email: nobody has contacted these users yet
        never_contacted = select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)
        for recipient in session.execute(never_contacted).scalars().all():
            remind(recipient, 1)

        # second onboarding email: a week after the first one, only if the profile is still incomplete
        still_incomplete = (
            select(User)
            .where(User.is_visible)
            .where(User.onboarding_emails_sent == 1)
            .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
            .where(User.has_completed_profile == False)
        )
        for recipient in session.execute(still_incomplete).scalars().all():
            remind(recipient, 2)


send_onboarding_emails.PAYLOAD = empty_pb2.Empty
send_onboarding_emails.SCHEDULE = timedelta(hours=1)

492 

493 

def send_reference_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends out reminders to write references after hosting/staying

    For each entry of the reminder schedule (latest reminder first) this selects host requests where a reference
    can still be written, the surfer and/or host has not written one and has not given a reason for not meeting
    up, and the reminder's point in time has passed. Each hit produces a "reference:reminder_surfed" or
    "reference:reminder_hosted" notification through notify(), and the per-side reminder counter on the host
    request is set to the reminder number.
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
        # the third element is passed on as days_left in the notification data
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        # (sending reminder N sets the counter to N, which rules the request out for all lower numbers)
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            # user is the one being reminded, other_user the one they should write about
            user = aliased(User)
            other_user = aliased(User)
            # surfers needing to write a ref
            q1 = (
                select(literal(True), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.surfer_user_id)
                .join(other_user, other_user.id == HostRequest.host_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the surfer has not written a ref
                        Reference.from_user_id == HostRequest.surfer_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.surfer_sent_reference_reminders < reminder_number)
                # the reminder's point in time (end time minus offset) has passed
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.surfer_reason_didnt_meetup == None)
                .where_users_visible_to_each_other(user, other_user)
            )

            # hosts needing to write a ref
            q2 = (
                select(literal(False), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.host_user_id)
                .join(other_user, other_user.id == HostRequest.surfer_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the host has not written a ref
                        Reference.from_user_id == HostRequest.host_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.host_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.host_reason_didnt_meetup == None)
                .where_users_visible_to_each_other(user, other_user)
            )

            # combine both sides; the first column (True/False) tells which side the row came from, and the
            # remaining columns are mapped back onto ORM entities
            union = union_all(q1, q2).subquery()
            union = select(
                union.c[0].label("surfed"),
                aliased(HostRequest, union),
                aliased(user, union),
                aliased(other_user, union),
            )
            reference_reminders = session.execute(union).all()

            # note: user/other_user are rebound here from the aliases above to the loaded User instances; the
            # aliases are recreated at the top of the next schedule iteration
            for surfed, host_request, user, other_user in reference_reminders:
                # visibility and blocking already checked in sql
                assert user.is_visible
                context = make_background_user_context(user_id=user.id)
                notify(
                    session,
                    user_id=user.id,
                    topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(other_user, session, context),
                        days_left=reminder_days_left,
                    ),
                )
                # record the reminder on the side that was reminded
                if surfed:
                    host_request.surfer_sent_reference_reminders = reminder_number
                else:
                    host_request.host_sent_reference_reminders = reminder_number
                session.commit()


send_reference_reminders.PAYLOAD = empty_pb2.Empty
send_reference_reminders.SCHEDULE = timedelta(hours=1)

592 

593 

def send_host_request_reminders(payload: empty_pb2.Empty) -> None:
    """
    Reminds hosts about pending host requests they have not written a message in yet

    A request qualifies while it is pending, starts in the future, has received fewer than
    HOST_REQUEST_MAX_REMINDERS reminders, the last reminder is at least HOST_REQUEST_REMINDER_INTERVAL old, and
    host and surfer are visible to each other. Each reminder bumps the counter and timestamp on the request,
    emits a "host_request:reminder" notification through notify(), and is committed on its own.
    """
    with session_scope() as session:
        # correlated subquery: any message in the request's conversation that was authored by the host
        host_replied = select(1).where(
            Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.host_user_id
        )

        due_for_reminder = (
            select(HostRequest)
            .where_moderated_content_visible_to_user_column(HostRequest, HostRequest.host_user_id)
            .where(HostRequest.status == HostRequestStatus.pending)
            .where(HostRequest.host_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS)
            .where(HostRequest.start_time > func.now())
            .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL)
            .where(~exists(host_replied))
            .where_user_columns_visible_to_each_other(HostRequest.host_user_id, HostRequest.surfer_user_id)
        )
        pending_requests = session.execute(due_for_reminder).scalars().all()

        for pending in pending_requests:
            # book-keeping first, so the reminder is counted together with the notification
            pending.host_sent_request_reminders += 1
            pending.last_sent_request_reminder_time = now()

            host_context = make_background_user_context(user_id=pending.host_user_id)
            notify(
                session,
                user_id=pending.host_user_id,
                topic_action="host_request:reminder",
                key=str(pending.conversation_id),
                data=notification_data_pb2.HostRequestReminder(
                    host_request=host_request_to_pb(pending, session, host_context),
                    surfer=user_model_to_pb(pending.surfer, session, host_context),
                ),
                moderation_state_id=pending.moderation_state_id,
            )

            session.commit()


send_host_request_reminders.PAYLOAD = empty_pb2.Empty
send_host_request_reminders.SCHEDULE = timedelta(minutes=15)

637 

638 

def add_users_to_email_list(payload: empty_pb2.Empty) -> None:
    """
    Subscribes visible users that are not yet in sync with the newsletter to the Listmonk list

    Does nothing when config["LISTMONK_ENABLED"] is falsy. Otherwise users are processed one at a time, each in
    its own session: users that opted out are just marked as in sync, everyone else is posted to the Listmonk
    subscribers API first. Raises an Exception as soon as one request comes back with an unexpected status.
    """
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    while True:
        with session_scope() as session:
            next_unsynced = select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
            user = session.execute(next_unsynced).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            if not user.opt_out_of_newsletter:
                endpoint = config["LISTMONK_BASE_URL"] + "/api/subscribers"
                credentials = (config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"])
                subscriber = {
                    "email": user.email,
                    "name": user.name,
                    "lists": [config["LISTMONK_LIST_ID"]],
                    "preconfirm_subscriptions": True,
                    "attribs": {"couchers_user_id": user.id},
                    "status": "enabled",
                }
                response = requests.post(endpoint, auth=credentials, json=subscriber, timeout=10)
                # 409 is accepted as well: the API returns it if the user is already subscribed
                if response.status_code not in (200, 409):
                    raise Exception("Failed to add users to mailing list")

            # reached for opted-out users and for successful (or already existing) subscriptions
            user.in_sync_with_newsletter = True
            session.commit()


add_users_to_email_list.PAYLOAD = empty_pb2.Empty
add_users_to_email_list.SCHEDULE = timedelta(hours=1)

683 

684 

def enforce_community_membership(payload: empty_pb2.Empty) -> None:
    """
    Job wrapper around couchers.tasks.enforce_community_memberships

    The payload is empty and unused; all work happens in the imported task function.
    """
    tasks_enforce_community_memberships()


enforce_community_membership.PAYLOAD = empty_pb2.Empty
enforce_community_membership.SCHEDULE = timedelta(minutes=15)

691 

692 

693def update_recommendation_scores(payload: empty_pb2.Empty) -> None: 

694 text_fields = [ 

695 User.hometown, 

696 User.occupation, 

697 User.education, 

698 User.about_me, 

699 User.things_i_like, 

700 User.about_place, 

701 User.additional_information, 

702 User.pet_details, 

703 User.kid_details, 

704 User.housemate_details, 

705 User.other_host_info, 

706 User.sleeping_details, 

707 User.area, 

708 User.house_rules, 

709 ] 

710 home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules] 

711 

712 def poor_man_gaussian(): 

713 """ 

714 Produces an approximatley std normal random variate 

715 """ 

716 trials = 5 

717 return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12) 

718 

719 def int_(stmt): 

720 return func.coalesce(cast(stmt, Integer), 0) 

721 

722 def float_(stmt): 

723 return func.coalesce(cast(stmt, Float), 0.0) 

724 

725 with session_scope() as session: 

726 # profile 

727 profile_text = "" 

728 for field in text_fields: 

729 profile_text += func.coalesce(field, "") 

730 text_length = func.length(profile_text) 

731 home_text = "" 

732 for field in home_fields: 

733 home_text += func.coalesce(field, "") 

734 home_length = func.length(home_text) 

735 

736 filled_profile = int_(User.has_completed_profile) 

737 has_text = int_(text_length > 500) 

738 long_text = int_(text_length > 2000) 

739 can_host = int_(User.hosting_status == HostingStatus.can_host) 

740 may_host = int_(User.hosting_status == HostingStatus.maybe) 

741 cant_host = int_(User.hosting_status == HostingStatus.cant_host) 

742 filled_home = int_(User.has_completed_my_home) 

743 filled_home_lots = int_(home_length > 200) 

744 hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host 

745 profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots 

746 

747 # references 

748 left_ref_expr = int_(1).label("left_reference") 

749 left_refs_subquery = ( 

750 select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery() 

751 ) 

752 left_reference = int_(left_refs_subquery.c.left_reference) 

753 has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference") 

754 ref_count_expr = int_(func.count(Reference.id)).label("ref_count") 

755 ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg") 

756 has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types") 

757 has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label( 

758 "has_bad_ref" 

759 ) 

760 received_ref_subquery = ( 

761 select( 

762 Reference.to_user_id.label("user_id"), 

763 has_reference_expr, 

764 has_multiple_types_expr, 

765 has_bad_ref_expr, 

766 ref_count_expr, 

767 ref_avg_expr, 

768 ) 

769 .group_by(Reference.to_user_id) 

770 .subquery() 

771 ) 

772 has_multiple_types = int_(received_ref_subquery.c.has_multiple_types) 

773 has_reference = int_(received_ref_subquery.c.has_reference) 

774 has_bad_reference = int_(received_ref_subquery.c.has_bad_ref) 

775 rating_score = float_( 

776 received_ref_subquery.c.ref_avg 

777 * ( 

778 2 * func.least(received_ref_subquery.c.ref_count, 5) 

779 + func.greatest(received_ref_subquery.c.ref_count - 5, 0) 

780 ) 

781 ) 

782 ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score 

783 

784 # activeness 

785 recently_active = int_(User.last_active >= now() - timedelta(days=180)) 

786 very_recently_active = int_(User.last_active >= now() - timedelta(days=14)) 

787 recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14)) 

788 messaged_lots = int_(func.count(Message.id) > 5) 

789 messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points") 

790 messaging_subquery = ( 

791 select(Message.author_id.label("user_id"), messaging_points_subquery) 

792 .where(Message.message_type == MessageType.text) 

793 .group_by(Message.author_id) 

794 .subquery() 

795 ) 

796 activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points) 

797 

798 # verification 

799 cb_subquery = ( 

800 select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id")) 

801 .join(Cluster, Cluster.id == ClusterSubscription.cluster_id) 

802 .where(ClusterSubscription.role == ClusterRole.admin) 

803 .where(Cluster.is_official_cluster) 

804 .group_by(ClusterSubscription.user_id) 

805 .subquery() 

806 ) 

807 min_node_id = cb_subquery.c.min_node_id 

808 cb = int_(min_node_id >= 1) 

809 wcb = int_(min_node_id == 1) 

810 badge_points = { 

811 "founder": 100, 

812 "board_member": 20, 

813 "past_board_member": 5, 

814 "strong_verification": 3, 

815 "volunteer": 3, 

816 "past_volunteer": 2, 

817 "donor": 1, 

818 "phone_verified": 1, 

819 } 

820 

821 badge_subquery = ( 

822 select( 

823 UserBadge.user_id.label("user_id"), 

824 func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"), 

825 ) 

826 .group_by(UserBadge.user_id) 

827 .subquery() 

828 ) 

829 

830 other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points) 

831 

832 # response rate 

833 hr_subquery = select( 

834 UserResponseRate.user_id, 

835 float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"), 

836 float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"), 

837 ).subquery() 

838 response_time_33p = hr_subquery.c.response_time_33p 

839 response_time_66p = hr_subquery.c.response_time_66p 

840 # be careful with nulls 

841 response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0) 

842 

843 recommendation_score = ( 

844 hosting_status_points 

845 + profile_points 

846 + ref_score 

847 + activeness_points 

848 + other_points 

849 + response_rate_points 

850 + 2 * poor_man_gaussian() 

851 ) 

852 

853 scores = ( 

854 select(User.id.label("user_id"), recommendation_score.label("score")) 

855 .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id) 

856 .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id) 

857 .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id) 

858 .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id) 

859 .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id) 

860 .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id) 

861 ).subquery() 

862 

863 session.execute( 

864 User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id) 

865 ) 

866 

867 logger.info("Updated recommendation scores") 

868 

869 

# Job metadata for update_recommendation_scores: the payload type is the empty protobuf message and the schedule
# interval is 24 hours (presumably consumed by the job scheduler -- confirm against the worker code).
update_recommendation_scores.PAYLOAD = empty_pb2.Empty
update_recommendation_scores.SCHEDULE = timedelta(hours=24)

872 

873 

def update_badges(payload: empty_pb2.Empty) -> None:
    """
    Brings the automatically managed badges in line with the current state of the database.

    For every managed badge, the users who should hold it are compared with the users who currently hold it:
    the badge is added where it is missing and removed where it is no longer warranted.
    """
    with session_scope() as session:

        def fetch_ids(query: Any) -> list[int]:
            # runs a single-column select and returns its values
            return session.execute(query).scalars().all()

        def update_badge(badge_id: str, members: list[int]) -> None:
            # the looked-up value isn't used; indexing raises KeyError for a badge id that isn't defined
            _ = get_badge_dict()[badge_id]
            current_holders = set(fetch_ids(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)))
            # member ids that don't correspond to a row in the users table are dropped here
            wanted_holders = set(fetch_ids(select(User.id).where(User.id.in_(members))))

            for missing_user_id in wanted_holders - current_holders:
                user_add_badge(session, missing_user_id, badge_id)

            for stale_user_id in current_holders - wanted_holders:
                user_remove_badge(session, stale_user_id, badge_id)

        # badges whose membership comes from the static badge dict
        for static_badge_id in ("founder", "board_member", "past_board_member"):
            update_badge(static_badge_id, get_static_badge_dict()[static_badge_id])

        # badges whose membership is derived from a flag or timestamp on the user
        donors = fetch_ids(select(User.id).where(User.last_donated.is_not(None)))
        update_badge("donor", donors)

        moderators = fetch_ids(select(User.id).where(User.is_superuser))
        update_badge("moderator", moderators)

        phone_verified_users = fetch_ids(select(User.id).where(User.phone_is_verified))
        update_badge("phone_verified", phone_verified_users)

        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        strongly_verified_users = fetch_ids(
            select(User.id)
            .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
            .where(StrongVerificationAttempt.has_strong_verification(User))
        )
        update_badge("strong_verification", strongly_verified_users)

        # active volunteers are those whose stopped_volunteering is null
        active_volunteers = fetch_ids(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_(None)))
        update_badge("volunteer", active_volunteers)

        # past volunteers are those whose stopped_volunteering is set
        past_volunteers = fetch_ids(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_not(None)))
        update_badge("past_volunteer", past_volunteers)


update_badges.PAYLOAD = empty_pb2.Empty
update_badges.SCHEDULE = timedelta(minutes=15)

925 

926 

def finalize_strong_verification(payload: "jobs_pb2.FinalizeStrongVerificationPayload") -> None:
    """
    Completes a strong verification attempt that is waiting on the backend.

    Fetches the session for the attempt from the passport reader API, cross-checks it against the stored
    attempt, records the passport details and notifies the user of the outcome:

    - a document that isn't a passport marks the attempt as failed
    - a passport matching an earlier attempt (same expiry date, nationality and last three document number
      characters) marks the attempt as a duplicate; if that earlier attempt belongs to another user, the
      duplicate strong verification email is sent as well
    - otherwise the attempt succeeds; the user gets the strong_verification badge and a success notification
      if has_strong_verification() holds for them, or a failure notification if it doesn't

    The attempt is looked up with scalar_one(), so the job fails if there is no attempt with the given id in
    the in_progress_waiting_on_backend state.

    Raises:
        Exception: if the passport reader API doesn't respond with HTTP 200
        AssertionError: if the returned session doesn't match the stored attempt or isn't approved
    """
    with session_scope() as session:
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
            verify="/etc/ssl/certs/ca-certificates.crt",
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()
        reference_payload = verification_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )

        # Explicit checks rather than assert statements: asserts are compiled out under `python -O`, and these
        # guard against acting on a session that belongs to someone else or was never approved. AssertionError
        # is raised so the failure type is the one an assert would produce.
        if verification_attempt.user_id != reference_payload.user_id:
            raise AssertionError("Iris reference payload user_id doesn't match the verification attempt")
        if verification_attempt.verification_attempt_token != reference_payload.verification_attempt_token:
            raise AssertionError("Iris reference payload token doesn't match the verification attempt")
        if verification_attempt.iris_session_id != json_data["id"]:
            raise AssertionError("Iris session id doesn't match the verification attempt")
        if json_data["state"] != "APPROVED":
            raise AssertionError("Iris session is not in the APPROVED state")

        if json_data["document_type"] != "PASSPORT":
            verification_attempt.status = StrongVerificationAttemptStatus.failed
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
                ),
            )
            return

        expiry_date = date.fromisoformat(json_data["expiry_date"])
        nationality = json_data["nationality"]
        last_three_document_chars = json_data["document_number"][-3:]

        # this runs before the minimal data is written to the current attempt below, so the current attempt
        # cannot match itself here
        existing_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
            .where(StrongVerificationAttempt.passport_nationality == nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = expiry_date
        verification_attempt.passport_nationality = nationality
        verification_attempt.passport_last_three_document_chars = last_three_document_chars

        if existing_attempt:
            verification_attempt.status = StrongVerificationAttemptStatus.duplicate

            if existing_attempt.user_id != verification_attempt.user_id:
                session.flush()
                send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)

            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                key="",
                data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
            )
            return

        verification_attempt.has_full_data = True
        # the full API response is stored encrypted with the verification data public key
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = verification_attempt.user
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            # nothing more to do if the user already holds the badge
            if session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none():
                return

            user_add_badge(session, user.id, badge_id, do_notify=False)
            notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success", key="")
        else:
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
                ),
            )


finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload

1036 

1037 

def send_activeness_probes(payload: empty_pb2.Empty) -> None:
    """
    Creates, sends and expires activeness probes.

    1. If ACTIVENESS_PROBES_ENABLED is set, creates probes for visible users who can host, were last active
       longer ago than ACTIVENESS_PROBE_INACTIVITY_PERIOD and have no unanswered probe, subject to a rate limit.
    2. Sends the next "activeness:probe" notification for every pending probe whose reminder delay (from
       ACTIVENESS_PROBE_TIME_REMINDERS) has passed.
    3. Marks pending probes that have had every reminder and are older than ACTIVENESS_PROBE_EXPIRY_TIME as
       expired, downgrading can_host to maybe and wants_to_meetup to open_to_meetup for the user.
    """
    with session_scope() as session:
        ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)

        if config["ACTIVENESS_PROBES_ENABLED"]:
            # current activeness probes
            subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded.is_(None)).subquery()

            # users who we should send an activeness probe to
            new_probe_user_ids = (
                session.execute(
                    select(User.id)
                    .where(User.is_visible)
                    .where(User.hosting_status == HostingStatus.can_host)
                    .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
                    .where(User.id.not_in(select(subquery.c.user_id)))
                )
                .scalars()
                .all()
            )

            total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()
            probes_today = session.execute(
                select(func.count())
                .select_from(ActivenessProbe)
                .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24))
            ).scalar_one()

            # send probes to max 2% of users per day
            max_probes_per_day = 0.02 * total_users
            # a single run takes at most 1/24 of the daily budget or what is left of it, but never less than 1
            max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1))

            if len(new_probe_user_ids) > max_probe_size:
                new_probe_user_ids = sample(new_probe_user_ids, max_probe_size)

            for user_id in new_probe_user_ids:
                session.add(ActivenessProbe(user_id=user_id))

            session.commit()

        ## Step 2: actually send out probe notifications
        for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
            probes = (
                session.execute(
                    select(ActivenessProbe)
                    .where(ActivenessProbe.notifications_sent == probe_number_minus_1)
                    .where(ActivenessProbe.probe_initiated + delay < func.now())
                    .where(ActivenessProbe.is_pending)
                )
                .scalars()
                .all()
            )

            for probe in probes:
                probe.notifications_sent = probe_number_minus_1 + 1
                notify(
                    session,
                    user_id=probe.user.id,
                    topic_action="activeness:probe",
                    key=str(probe.id),
                    data=notification_data_pb2.ActivenessProbe(
                        reminder_number=probe_number_minus_1 + 1,
                        deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
                    ),
                )
                # commit per probe so the incremented counter is persisted together with its notification
                session.commit()

        ## Step 3: for those who haven't responded, mark them as failed
        expired_probes = (
            session.execute(
                select(ActivenessProbe)
                .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
                .where(ActivenessProbe.is_pending)
                .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
            )
            .scalars()
            .all()
        )

        for probe in expired_probes:
            probe.responded = now()
            probe.response = ActivenessProbeStatus.expired
            if probe.user.hosting_status == HostingStatus.can_host:
                probe.user.hosting_status = HostingStatus.maybe
            if probe.user.meetup_status == MeetupStatus.wants_to_meetup:
                probe.user.meetup_status = MeetupStatus.open_to_meetup
        session.commit()


send_activeness_probes.PAYLOAD = empty_pb2.Empty
send_activeness_probes.SCHEDULE = timedelta(minutes=60)

1130 

1131 

def update_randomized_locations(payload: empty_pb2.Empty) -> None:
    """
    Fills in `randomized_geom` for every user that doesn't have one yet.

    We generate for each user a randomized location as follows:
    - Start from a strong random seed (based on the SECRET env var and our key derivation function)
    - For each user, mix in the user_id for randomness
    - Generate a radius from [0.02, 0.1] degrees (about 2-10km)
    - Generate an angle from [0, 360]
    - Randomized location is then a distance `radius` away at an angle `angle` from `geom`

    The offset only depends on the secret and the user id, so the same user always gets the same offset.
    """
    randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)

    def gen_randomized_coords(user_id: int, lat: float, lng: float) -> tuple[float, float]:
        # separate seeds so the radius and the angle are drawn independently
        radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii"))
        angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii"))
        radius = 0.02 + 0.08 * radius_u
        angle_rad = 2 * pi * angle_u
        offset_lng = radius * cos(angle_rad)
        offset_lat = radius * sin(angle_rad)
        return lat + offset_lat, lng + offset_lng

    user_updates: list[dict[str, Any]] = []

    with session_scope() as session:
        users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom.is_(None))).all()

        for user_id, geom in users_to_update:
            lat, lng = get_coordinates(geom)
            user_updates.append(
                {"id": user_id, "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, lat, lng))}
            )

    if not user_updates:
        # nothing is missing a randomized location, so don't issue a bulk UPDATE with an empty parameter list
        return

    with session_scope() as session:
        # bulk UPDATE by primary key: each dict carries the user's id plus the new column value
        session.execute(update(User), user_updates)


update_randomized_locations.PAYLOAD = empty_pb2.Empty
update_randomized_locations.SCHEDULE = timedelta(hours=1)

1169 

1170 

def send_event_reminders(payload: empty_pb2.Empty) -> None:
    """
    Notifies attendees about event occurrences that are about to start.

    An occurrence qualifies when it starts between now and EVENT_REMINDER_TIMEDELTA from now. Every attendee of
    such an occurrence who hasn't had a reminder yet gets an "event:reminder" notification and is then flagged
    as reminded.
    """
    logger.info("Sending event reminder emails")

    with session_scope() as session:
        upcoming_occurrences_query = (
            select(EventOccurrence)
            .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA)
            .where(EventOccurrence.start_time >= now())
        )

        for occurrence in session.execute(upcoming_occurrences_query).scalars().all():
            unreminded_attendees_query = (
                select(User, EventOccurrenceAttendee)
                .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id)
                .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
                .where(EventOccurrenceAttendee.reminder_sent == False)
            )

            for user, attendee in session.execute(unreminded_attendees_query).all():
                context = make_background_user_context(user_id=user.id)
                reminder_data = notification_data_pb2.EventReminder(
                    event=event_to_pb(session, occurrence, context),
                    user=user_model_to_pb(user, session, context),
                )

                notify(
                    session,
                    user_id=user.id,
                    topic_action="event:reminder",
                    key=str(occurrence.id),
                    data=reminder_data,
                )

                # flag and commit per attendee, right after their notification
                attendee.reminder_sent = True
                session.commit()


send_event_reminders.PAYLOAD = empty_pb2.Empty
send_event_reminders.SCHEDULE = timedelta(hours=1)

1216 

1217 

def check_expo_push_receipts(payload: empty_pb2.Empty) -> None:
    """
    Check Expo push receipts in batch and update delivery attempts.

    Works through delivery attempts that have an Expo ticket id, no recorded receipt check, and were made
    between 15 minutes and 24 hours ago. Attempts are handled in batches of up to 100, each batch in its own
    session. Every attempt in a batch is stamped as checked and gets its receipt status recorded; a
    "DeviceNotRegistered" error additionally disables the push subscription the attempt was sent to.

    Raises:
        RuntimeError: if MAX_ITERATIONS batches were processed without running out of attempts
    """
    MAX_ITERATIONS = 100  # Safety limit: 100 batches * 100 attempts = 10,000 max

    for _ in range(MAX_ITERATIONS):
        with session_scope() as session:
            # Find all delivery attempts that need receipt checking
            # Wait 15 minutes per Expo's recommendation before checking receipts
            attempts = (
                session.execute(
                    select(PushNotificationDeliveryAttempt)
                    .where(PushNotificationDeliveryAttempt.expo_ticket_id.is_not(None))
                    .where(PushNotificationDeliveryAttempt.receipt_checked_at.is_(None))
                    .where(PushNotificationDeliveryAttempt.time < now() - timedelta(minutes=15))
                    .where(PushNotificationDeliveryAttempt.time > now() - timedelta(hours=24))
                    .limit(100)
                )
                .scalars()
                .all()
            )

            if not attempts:
                logger.debug("No Expo receipts to check")
                return

            logger.info(f"Checking {len(attempts)} Expo push receipts")

            receipts = get_expo_push_receipts([attempt.expo_ticket_id for attempt in attempts])

            for attempt in attempts:
                receipt = receipts.get(attempt.expo_ticket_id)

                # Always mark as checked to avoid infinite loops
                attempt.receipt_checked_at = now()

                if receipt is None:
                    # Receipt not found after 15min - likely expired (>24h) or never existed
                    # Per Expo docs: receipts should be available within 15 minutes
                    attempt.receipt_status = "not_found"
                    continue

                status = receipt.get("status")
                attempt.receipt_status = status

                if status != "error":
                    continue

                # `or {}` also covers a receipt that carries an explicit null for "details"; a default passed
                # to .get() would only cover a missing key
                details = receipt.get("details") or {}
                error_code = details.get("error")
                attempt.receipt_error_code = error_code

                if error_code == "DeviceNotRegistered":
                    # Device token is no longer valid - disable the subscription
                    sub = session.execute(
                        select(PushNotificationSubscription).where(
                            PushNotificationSubscription.id == attempt.push_notification_subscription_id
                        )
                    ).scalar_one()

                    # only touch subscriptions whose disabled_at still lies in the future
                    if sub.disabled_at > now():
                        sub.disabled_at = now()
                        logger.info(f"Disabled push sub {sub.id} due to DeviceNotRegistered in receipt")
                        push_notification_counter.labels(
                            platform="expo", outcome="permanent_subscription_failure_receipt"
                        ).inc()
                else:
                    logger.warning(f"Expo receipt error for ticket {attempt.expo_ticket_id}: {error_code}")

    # If we get here, we've exhausted MAX_ITERATIONS without finishing
    raise RuntimeError(
        f"check_expo_push_receipts exceeded {MAX_ITERATIONS} iterations - "
        "there may be an unusually large backlog of receipts to check"
    )


check_expo_push_receipts.PAYLOAD = empty_pb2.Empty
check_expo_push_receipts.SCHEDULE = timedelta(minutes=5)

1294 

1295 

def send_postal_verification_postcard(payload: jobs_pb2.SendPostalVerificationPostcardPayload) -> None:
    """
    Mails the verification postcard for a postal verification attempt.

    Does nothing beyond logging a warning when the attempt doesn't exist or isn't in progress. Otherwise the
    postcard is sent through send_postcard(): on success the attempt moves to awaiting_verification and the user
    is notified, on failure the attempt is marked as failed.
    """
    with session_scope() as session:
        attempt_query = select(PostalVerificationAttempt).where(
            PostalVerificationAttempt.id == payload.postal_verification_attempt_id
        )
        attempt = session.execute(attempt_query).scalar_one_or_none()

        if not attempt or attempt.status != PostalVerificationStatus.in_progress:
            logger.warning(
                f"Postal verification attempt {payload.postal_verification_attempt_id} not found or wrong state"
            )
            return

        recipient = session.execute(select(User.name).where(User.id == attempt.user_id)).scalar_one()

        outcome = send_postcard(
            recipient_name=recipient,
            address_line_1=attempt.address_line_1,
            address_line_2=attempt.address_line_2,
            city=attempt.city,
            state=attempt.state,
            postal_code=attempt.postal_code,
            country=attempt.country,
            verification_code=attempt.verification_code,
            qr_code_url=urls.postal_verification_link(code=attempt.verification_code),
        )

        if not outcome.success:
            # no retry at this point: a failed send fails the whole attempt
            attempt.status = PostalVerificationStatus.failed
            logger.error(f"Postcard send failed: {outcome.error_message}")
            return

        attempt.status = PostalVerificationStatus.awaiting_verification
        # SQL expression, so the timestamp is evaluated by the database when the row is written
        attempt.postcard_sent_at = func.now()

        sent_data = notification_data_pb2.PostalVerificationPostcardSent(
            city=attempt.city,
            country=attempt.country,
        )
        notify(
            session,
            user_id=attempt.user_id,
            topic_action="postal_verification:postcard_sent",
            key="",
            data=sent_data,
        )


send_postal_verification_postcard.PAYLOAD = jobs_pb2.SendPostalVerificationPostcardPayload

1348 

1349 

class DatabaseInconsistencyError(Exception):
    """
    Signals that one or more database consistency checks found a problem
    """

1354 

1355 

def check_database_consistency(payload: empty_pb2.Empty) -> None:
    """
    Checks database consistency and raises an exception if any issues are found.

    Every check appends a description of what it found to a list, so one run reports all detected problems
    rather than only the first one.

    Raises:
        DatabaseInconsistencyError: if any check found a problem; the message has one line per finding
    """
    logger.info("Checking database consistency")
    errors = []

    with session_scope() as session:

        def states_with_wrong_object_count(object_type: Any, model: Any) -> Any:
            # One grouped query per object type instead of one COUNT query per ModerationState. The outer join
            # keeps states that nothing points to, and counting the joined column (NULL for them) yields 0.
            linked_objects = func.count(model.moderation_state_id)
            return session.execute(
                select(ModerationState.id, linked_objects)
                .select_from(ModerationState)
                .outerjoin(model, model.moderation_state_id == ModerationState.id)
                .where(ModerationState.object_type == object_type)
                .group_by(ModerationState.id)
                .having(linked_objects != 1)
                .order_by(ModerationState.id)
            ).all()

        # Check that all non-deleted users have a profile gallery
        users_without_gallery = session.execute(
            select(User.id, User.username).where(User.is_deleted == False).where(User.profile_gallery_id.is_(None))
        ).all()
        if users_without_gallery:
            errors.append(f"Users without profile gallery: {users_without_gallery}")

        # Check that all profile galleries point to their owner
        mismatched_galleries = session.execute(
            select(User.id, User.username, User.profile_gallery_id, PhotoGallery.owner_user_id)
            .join(PhotoGallery, User.profile_gallery_id == PhotoGallery.id)
            .where(User.profile_gallery_id.is_not(None))
            .where(PhotoGallery.owner_user_id != User.id)
        ).all()
        if mismatched_galleries:
            errors.append(f"Profile galleries with mismatched owner: {mismatched_galleries}")

        # === Moderation System Consistency Checks ===

        # Check all ModerationStates have a known object_type
        known_object_types = [ModerationObjectType.HOST_REQUEST, ModerationObjectType.GROUP_CHAT]
        unknown_type_states = session.execute(
            select(ModerationState.id, ModerationState.object_type).where(
                ModerationState.object_type.not_in(known_object_types)
            )
        ).all()
        if unknown_type_states:
            errors.append(f"ModerationStates with unknown object_type: {unknown_type_states}")

        # Check every ModerationState has at least one INITIAL_REVIEW queue item
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_initial_review = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationQueueItem.moderation_state_id == ModerationState.id)
                    .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW)
                ),
            )
        ).all()
        if states_without_initial_review:
            errors.append(f"ModerationStates without INITIAL_REVIEW queue item: {states_without_initial_review}")

        # Check every ModerationState has a CREATE log entry
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_create_log = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationLog.moderation_state_id == ModerationState.id)
                    .where(ModerationLog.action == ModerationAction.CREATE)
                ),
            )
        ).all()
        if states_without_create_log:
            errors.append(f"ModerationStates without CREATE log entry: {states_without_create_log}")

        # Check resolved queue items point to log entries for the same moderation state
        resolved_item_log_mismatches = session.execute(
            select(ModerationQueueItem.id, ModerationQueueItem.moderation_state_id, ModerationLog.moderation_state_id)
            .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id)
            .where(ModerationQueueItem.resolved_by_log_id.is_not(None))
            .where(ModerationQueueItem.moderation_state_id != ModerationLog.moderation_state_id)
        ).all()
        if resolved_item_log_mismatches:
            errors.append(f"Resolved queue items with mismatched moderation_state_id: {resolved_item_log_mismatches}")

        # Check every HOST_REQUEST ModerationState has exactly one HostRequest pointing to it
        for state_id, hr_count in states_with_wrong_object_count(ModerationObjectType.HOST_REQUEST, HostRequest):
            errors.append(f"ModerationState {state_id} (HOST_REQUEST) has {hr_count} HostRequests (expected 1)")

        # Check every GROUP_CHAT ModerationState has exactly one GroupChat pointing to it
        for state_id, gc_count in states_with_wrong_object_count(ModerationObjectType.GROUP_CHAT, GroupChat):
            errors.append(f"ModerationState {state_id} (GROUP_CHAT) has {gc_count} GroupChats (expected 1)")

        # Check ModerationState.object_id matches the actual object's ID
        hr_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, HostRequest.conversation_id)
            .join(HostRequest, HostRequest.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST)
            .where(ModerationState.object_id != HostRequest.conversation_id)
        ).all()
        if hr_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for HOST_REQUEST: {hr_object_id_mismatches}")

        gc_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, GroupChat.conversation_id)
            .join(GroupChat, GroupChat.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT)
            .where(ModerationState.object_id != GroupChat.conversation_id)
        ).all()
        if gc_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for GROUP_CHAT: {gc_object_id_mismatches}")

        # Check reverse mapping: HostRequest's moderation_state points to correct ModerationState
        hr_reverse_mismatches = session.execute(
            select(
                HostRequest.conversation_id,
                HostRequest.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, HostRequest.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.HOST_REQUEST)
                | (ModerationState.object_id != HostRequest.conversation_id)
            )
        ).all()
        if hr_reverse_mismatches:
            errors.append(f"HostRequest points to ModerationState with wrong type/object_id: {hr_reverse_mismatches}")

        # Check reverse mapping: GroupChat's moderation_state points to correct ModerationState
        gc_reverse_mismatches = session.execute(
            select(
                GroupChat.conversation_id,
                GroupChat.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, GroupChat.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.GROUP_CHAT)
                | (ModerationState.object_id != GroupChat.conversation_id)
            )
        ).all()
        if gc_reverse_mismatches:
            errors.append(f"GroupChat points to ModerationState with wrong type/object_id: {gc_reverse_mismatches}")

        # Ensure auto-approve deadline isn't being exceeded by a significant margin
        # The auto-approver runs every 15s, so allow 5 minutes grace before alerting
        deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
        # a deadline of zero or less means auto-approval is off, so there is nothing to check
        if deadline_seconds > 0:
            grace_period = timedelta(minutes=5)
            stale_initial_review_items = session.execute(
                select(
                    ModerationQueueItem.id,
                    ModerationQueueItem.moderation_state_id,
                    ModerationQueueItem.time_created,
                )
                .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW)
                .where(ModerationQueueItem.resolved_by_log_id.is_(None))
                .where(ModerationQueueItem.time_created < now() - timedelta(seconds=deadline_seconds) - grace_period)
            ).all()
            if stale_initial_review_items:
                errors.append(
                    f"INITIAL_REVIEW items exceeding auto-approve deadline by >5min: {stale_initial_review_items}"
                )

    if errors:
        raise DatabaseInconsistencyError("\n".join(errors))


check_database_consistency.PAYLOAD = empty_pb2.Empty
check_database_consistency.SCHEDULE = timedelta(hours=24)

1542 

1543 

def auto_approve_moderation_queue(payload: empty_pb2.Empty) -> None:
    """
    Dead man's switch for the moderation queue.

    Unresolved INITIAL_REVIEW queue items created more than MODERATION_AUTO_APPROVE_DEADLINE_SECONDS ago are
    approved and made visible on behalf of the moderation bot user, up to 100 items per run. A deadline of
    zero or less turns the job into a no-op.
    """
    deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
    if deadline_seconds <= 0:
        return

    with session_scope() as session:
        ctx = make_background_user_context(user_id=config["MODERATION_BOT_USER_ID"])

        overdue_request = moderation_pb2.GetModerationQueueReq(
            triggers=[moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW],
            unresolved_only=True,
            page_size=100,
            created_before=Timestamp_from_datetime(now() - timedelta(seconds=deadline_seconds)),
        )
        overdue_response = Moderation().GetModerationQueue(request=overdue_request, context=ctx, session=session)
        items = overdue_response.queue_items

        if not items:
            return

        logger.info(f"Auto-approving {len(items)} moderation queue items")
        for item in items:
            approve_request = moderation_pb2.ModerateContentReq(
                moderation_state_id=item.moderation_state_id,
                action=moderation_pb2.MODERATION_ACTION_APPROVE,
                visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
                reason=f"Auto-approved: moderation deadline of {deadline_seconds} seconds exceeded.",
            )
            Moderation().ModerateContent(request=approve_request, context=ctx, session=session)

        # counted once for the whole batch, after every item went through
        moderation_auto_approved_counter.inc(len(items))


auto_approve_moderation_queue.PAYLOAD = empty_pb2.Empty
auto_approve_moderation_queue.SCHEDULE = timedelta(seconds=15)