Coverage for src/couchers/jobs/handlers.py: 92%

526 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-10 13:55 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from datetime import date, timedelta 

7from math import cos, pi, sin, sqrt 

8from random import sample 

9from typing import Any 

10 

11import requests 

12from google.protobuf import empty_pb2 

13from sqlalchemy import Float, Integer 

14from sqlalchemy.orm import aliased 

15from sqlalchemy.sql import ( 

16 and_, 

17 case, 

18 cast, 

19 delete, 

20 distinct, 

21 exists, 

22 extract, 

23 func, 

24 literal, 

25 not_, 

26 or_, 

27 union_all, 

28 update, 

29) 

30 

31from couchers import urls 

32from couchers.config import config 

33from couchers.constants import ( 

34 ACTIVENESS_PROBE_EXPIRY_TIME, 

35 ACTIVENESS_PROBE_INACTIVITY_PERIOD, 

36 ACTIVENESS_PROBE_TIME_REMINDERS, 

37 EVENT_REMINDER_TIMEDELTA, 

38 HOST_REQUEST_MAX_REMINDERS, 

39 HOST_REQUEST_REMINDER_INTERVAL, 

40) 

41from couchers.context import make_background_user_context 

42from couchers.crypto import ( 

43 USER_LOCATION_RANDOMIZATION_NAME, 

44 asym_encrypt, 

45 b64decode, 

46 get_secret, 

47 simple_decrypt, 

48 stable_secure_uniform, 

49) 

50from couchers.db import session_scope 

51from couchers.email.dev import print_dev_email 

52from couchers.email.smtp import send_smtp_email 

53from couchers.helpers.badges import user_add_badge, user_remove_badge 

54from couchers.materialized_views import ( 

55 UserResponseRate, 

56 refresh_materialized_views, 

57 refresh_materialized_views_rapid, 

58) 

59from couchers.metrics import ( 

60 moderation_auto_approved_counter, 

61 push_notification_counter, 

62 strong_verification_completions_counter, 

63) 

64from couchers.models import ( 

65 AccountDeletionToken, 

66 ActivenessProbe, 

67 ActivenessProbeStatus, 

68 Cluster, 

69 ClusterRole, 

70 ClusterSubscription, 

71 EventOccurrence, 

72 EventOccurrenceAttendee, 

73 GroupChat, 

74 GroupChatSubscription, 

75 HostingStatus, 

76 HostRequest, 

77 HostRequestStatus, 

78 LoginToken, 

79 MeetupStatus, 

80 Message, 

81 MessageType, 

82 ModerationAction, 

83 ModerationLog, 

84 ModerationObjectType, 

85 ModerationQueueItem, 

86 ModerationState, 

87 ModerationTrigger, 

88 ModerationVisibility, 

89 Notification, 

90 NotificationDelivery, 

91 PassportSex, 

92 PasswordResetToken, 

93 PhotoGallery, 

94 PostalVerificationAttempt, 

95 PostalVerificationStatus, 

96 PushNotificationDeliveryAttempt, 

97 PushNotificationSubscription, 

98 Reference, 

99 StrongVerificationAttempt, 

100 StrongVerificationAttemptStatus, 

101 User, 

102 UserBadge, 

103 Volunteer, 

104) 

105from couchers.notifications.background import handle_email_digests, handle_notification 

106from couchers.notifications.expo_api import get_expo_push_receipts 

107from couchers.notifications.notify import notify 

108from couchers.notifications.send_raw_push_notification import send_raw_push_notification_v2 

109from couchers.postal.postcard_service import send_postcard 

110from couchers.proto import moderation_pb2, notification_data_pb2 

111from couchers.proto.internal import jobs_pb2, verification_pb2 

112from couchers.resources import get_badge_dict, get_static_badge_dict 

113from couchers.servicers.api import user_model_to_pb 

114from couchers.servicers.conversations import generate_message_notifications 

115from couchers.servicers.discussions import generate_create_discussion_notifications 

116from couchers.servicers.editor import generate_new_blog_post_notifications 

117from couchers.servicers.events import ( 

118 event_to_pb, 

119 generate_event_cancel_notifications, 

120 generate_event_create_notifications, 

121 generate_event_delete_notifications, 

122 generate_event_update_notifications, 

123) 

124from couchers.servicers.moderation import Moderation 

125from couchers.servicers.requests import host_request_to_pb 

126from couchers.servicers.threads import generate_reply_notifications 

127from couchers.sql import couchers_select as select 

128from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

129from couchers.tasks import send_duplicate_strong_verification_email 

130from couchers.utils import ( 

131 Timestamp_from_datetime, 

132 create_coordinate, 

133 get_coordinates, 

134 now, 

135) 

136 

logger = logging.getLogger(__name__)


# Job handlers that are implemented in other modules and imported above get their
# job attributes attached here.

# handlers that only declare a PAYLOAD type
handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload
send_raw_push_notification_v2.PAYLOAD = jobs_pb2.SendRawPushNotificationPayloadV2
generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload
generate_reply_notifications.PAYLOAD = jobs_pb2.GenerateReplyNotificationsPayload
generate_create_discussion_notifications.PAYLOAD = jobs_pb2.GenerateCreateDiscussionNotificationsPayload
generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload
generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload
generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload
generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload
generate_new_blog_post_notifications.PAYLOAD = jobs_pb2.GenerateNewBlogPostNotificationsPayload

# handlers that take an empty payload and also declare a SCHEDULE interval
handle_email_digests.PAYLOAD = empty_pb2.Empty
handle_email_digests.SCHEDULE = timedelta(minutes=15)

refresh_materialized_views.PAYLOAD = empty_pb2.Empty
refresh_materialized_views.SCHEDULE = timedelta(minutes=5)

refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)

169 

170 

def send_email(payload: jobs_pb2.SendEmailPayload) -> None:
    """
    Sends the email described by the payload and stores the resulting record

    With config["ENABLE_EMAIL"] set the email goes out through SMTP, otherwise it is handed to the
    dev email printer. Whatever the chosen sender returns is added to the database.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
    if config["ENABLE_EMAIL"]:
        deliver = send_smtp_email
    else:
        deliver = print_dev_email
    # both senders must hand back a models.Email object that can be added to the database
    email_fields = {
        "sender_name": payload.sender_name,
        "sender_email": payload.sender_email,
        "recipient": payload.recipient,
        "subject": payload.subject,
        "plain": payload.plain,
        "html": payload.html,
        "list_unsubscribe_header": payload.list_unsubscribe_header,
        "source_data": payload.source_data,
    }
    email_record = deliver(**email_fields)
    with session_scope() as session:
        session.add(email_record)


send_email.PAYLOAD = jobs_pb2.SendEmailPayload

191 

192 

def purge_login_tokens(payload: empty_pb2.Empty) -> None:
    """
    Deletes all login tokens for which `is_valid` is false
    """
    logger.info("Purging login tokens")
    with session_scope() as session:
        invalid_tokens = delete(LoginToken).where(~LoginToken.is_valid)
        # bulk delete; no need to sync objects already loaded into the session
        session.execute(invalid_tokens.execution_options(synchronize_session=False))


purge_login_tokens.PAYLOAD = empty_pb2.Empty
purge_login_tokens.SCHEDULE = timedelta(hours=24)

201 

202 

def purge_password_reset_tokens(payload: empty_pb2.Empty) -> None:
    """
    Deletes all password reset tokens for which `is_valid` is false
    """
    # this message used to read "Purging login tokens" (copy-paste from the login token job),
    # which made the two purge jobs indistinguishable in the logs
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        # bulk delete; no need to sync objects already loaded into the session
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )


purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)

213 

214 

def purge_account_deletion_tokens(payload: empty_pb2.Empty) -> None:
    """
    Deletes all account deletion tokens for which `is_valid` is false
    """
    logger.info("Purging account deletion tokens")
    with session_scope() as session:
        invalid_tokens = delete(AccountDeletionToken).where(~AccountDeletionToken.is_valid)
        # bulk delete; no need to sync objects already loaded into the session
        session.execute(invalid_tokens.execution_options(synchronize_session=False))


purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)

227 

228 

def send_message_notifications(payload: empty_pb2.Empty) -> None:
    """
    Notifies users about group chat text messages they have not seen yet

    A user is picked up once they have an unseen, not yet notified text message older than 5 minutes
    in an unmuted chat. The notification then covers all of their chats with unseen messages, with
    one entry (the newest message plus an unseen count) per chat.
    """

    def missed_title(message, group_chat, count_unseen):
        # DMs are attributed to the message author, any other chat to the chat title
        return (
            f"You missed {count_unseen} message(s) from {message.author.name}"
            if group_chat.is_dm
            else f"You missed {count_unseen} message(s) in {group_chat.title}"
        )

    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        candidates_query = (
            select(User)
            .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
            .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
            .where(not_(GroupChatSubscription.is_muted))
            .where(User.is_visible)
            .where(Message.time >= GroupChatSubscription.joined)
            .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
            .where(Message.id > User.last_notified_message_id)
            .where(Message.id > GroupChatSubscription.last_seen_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
        )
        users = session.execute(candidates_query).scalars().unique()

        for user in users:
            context = make_background_user_context(user_id=user.id)
            # now actually grab all the group chats, not just less than 5 min old
            # one row per chat: the newest unseen message id and how many unseen messages there are
            per_chat = (
                select(
                    GroupChatSubscription.group_chat_id.label("group_chat_id"),
                    func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                    func.max(Message.id).label("message_id"),
                    func.count(Message.id).label("count_unseen"),
                )
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(GroupChatSubscription.user_id == user.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(Message.id > user.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where_users_column_visible(context, Message.author_id)
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            unseen_messages = session.execute(
                select(GroupChat, Message, per_chat.c.count_unseen)
                .join(per_chat, per_chat.c.message_id == Message.id)
                .join(GroupChat, GroupChat.conversation_id == per_chat.c.group_chat_id)
                .order_by(per_chat.c.message_id.desc())
            ).all()

            if not unseen_messages:
                continue

            # remember the newest message covered so it is not notified about again
            user.last_notified_message_id = max(row[1].id for row in unseen_messages)

            chat_messages = []
            for group_chat, message, count_unseen in unseen_messages:
                chat_messages.append(
                    notification_data_pb2.ChatMessage(
                        author=user_model_to_pb(
                            message.author,
                            session,
                            context,
                        ),
                        message=missed_title(message, group_chat, count_unseen),
                        text=message.text,
                        group_chat_id=message.conversation_id,
                    )
                )

            notify(
                session,
                user_id=user.id,
                topic_action="chat:missed_messages",
                key="",
                data=notification_data_pb2.ChatMissedMessages(messages=chat_messages),
            )
            # commit per user
            session.commit()


send_message_notifications.PAYLOAD = empty_pb2.Empty
send_message_notifications.SCHEDULE = timedelta(minutes=3)

324 

325 

def send_request_notifications(payload: empty_pb2.Empty) -> None:
    """
    Notifies users about unseen text messages in their host requests, both as surfer and as host

    Only messages older than 5 minutes that are newer than both the user's last seen message in the
    request and their `last_notified_request_message_id` count. One notification is generated per
    host request.
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    with session_scope() as session:
        # the user is the surfer of a request with a qualifying unseen message
        unseen_as_surfer = exists(
            select(1)
            .select_from(HostRequest)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(HostRequest.surfer_user_id == User.id)
            .where(Message.id > HostRequest.surfer_last_seen_message_id)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
        )
        # the user is the host of a request with a qualifying unseen message
        unseen_as_host = exists(
            select(1)
            .select_from(HostRequest)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(HostRequest.host_user_id == User.id)
            .where(Message.id > HostRequest.host_last_seen_message_id)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
        )

        # all candidate users who might have unseen request messages
        candidate_user_ids = (
            session.execute(
                select(User.id)
                .where(User.is_visible)
                .where(
                    or_(
                        unseen_as_surfer,
                        unseen_as_host,
                    )
                )
            )
            .scalars()
            .all()
        )

        for user_id in candidate_user_ids:
            context = make_background_user_context(user_id=user_id)

            # requests where this user is surfing, with the newest qualifying message id of each
            surfing_reqs = session.execute(
                select(User, HostRequest, func.max(Message.id))
                .where(User.id == user_id)
                .join(HostRequest, HostRequest.surfer_user_id == User.id)
                .where_users_column_visible(context, HostRequest.host_user_id)
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.surfer_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)
            ).all()

            # requests where this user is hosting
            hosting_reqs = session.execute(
                select(User, HostRequest, func.max(Message.id))
                .where(User.id == user_id)
                .join(HostRequest, HostRequest.host_user_id == User.id)
                .where_users_column_visible(context, HostRequest.surfer_user_id)
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.host_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)
            ).all()

            # surfing requests are handled first, then hosting requests
            for rows, am_host in ((surfing_reqs, False), (hosting_reqs, True)):
                for user, host_request, max_message_id in rows:
                    # only ever move the marker forwards
                    user.last_notified_request_message_id = max(
                        user.last_notified_request_message_id, max_message_id
                    )
                    session.flush()

                    notify(
                        session,
                        user_id=user.id,
                        topic_action="host_request:missed_messages",
                        key=str(host_request.conversation_id),
                        data=notification_data_pb2.HostRequestMissedMessages(
                            host_request=host_request_to_pb(host_request, session, context),
                            # the user shown in the notification is the other party of the request
                            user=user_model_to_pb(
                                host_request.surfer if am_host else host_request.host, session, context
                            ),
                            am_host=am_host,
                        ),
                    )


send_request_notifications.PAYLOAD = empty_pb2.Empty
send_request_notifications.SCHEDULE = timedelta(minutes=3)

435 

436 

def send_onboarding_emails(payload: empty_pb2.Empty) -> None:
    """
    Sends out the first and second onboarding reminders

    The first goes to every visible user with `onboarding_emails_sent == 0`. The second goes to
    visible users whose first reminder is more than 7 days old and whose profile is not completed.
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:

        def send_reminder(user, number):
            # notify, record which reminder was sent and when, then commit for this user
            notify(
                session,
                user_id=user.id,
                topic_action="onboarding:reminder",
                key=str(number),
            )
            user.onboarding_emails_sent = number
            user.last_onboarding_email_sent = now()
            session.commit()

        # first onboarding email
        first_query = select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)
        for user in session.execute(first_query).scalars().all():
            send_reminder(user, 1)

        # second onboarding email
        # sent after a week if the user's `has_completed_profile` is false
        second_query = (
            select(User)
            .where(User.is_visible)
            .where(User.onboarding_emails_sent == 1)
            .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
            .where(User.has_completed_profile == False)
        )
        for user in session.execute(second_query).scalars().all():
            send_reminder(user, 2)


send_onboarding_emails.PAYLOAD = empty_pb2.Empty
send_onboarding_emails.SCHEDULE = timedelta(hours=1)

488 

489 

def send_reference_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends out reminders to write references after hosting/staying

    Works through a fixed schedule of three reminders. For each one it finds surfers and hosts who
    have not written a reference for a host request, have not given a reason for not meeting up,
    and have been sent fewer reminders than this one, then notifies them and records the reminder
    number on the host request.
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    schedule = [
        # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    with session_scope() as session:
        # go through the schedule latest-first, so if we missed out on one we don't send duplicates
        for number, lead_time, days_left in reversed(schedule):
            user = aliased(User)
            other_user = aliased(User)

            # surfers needing to write a ref
            surfer_query = (
                select(literal(True), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.surfer_user_id)
                .join(other_user, other_user.id == HostRequest.host_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the surfer has not written a ref
                        Reference.from_user_id == HostRequest.surfer_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.surfer_sent_reference_reminders < number)
                .where(HostRequest.end_time_to_write_reference - lead_time < now())
                .where(HostRequest.surfer_reason_didnt_meetup == None)
                .where_users_visible_to_each_other(user, other_user)
            )

            # hosts needing to write a ref
            host_query = (
                select(literal(False), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.host_user_id)
                .join(other_user, other_user.id == HostRequest.surfer_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the host has not written a ref
                        Reference.from_user_id == HostRequest.host_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.host_sent_reference_reminders < number)
                .where(HostRequest.end_time_to_write_reference - lead_time < now())
                .where(HostRequest.host_reason_didnt_meetup == None)
                .where_users_visible_to_each_other(user, other_user)
            )

            # select both halves in one go, mapping the union's columns back onto the ORM entities
            combined = union_all(surfer_query, host_query).subquery()
            combined_select = select(
                combined.c[0].label("surfed"),
                aliased(HostRequest, combined),
                aliased(user, combined),
                aliased(other_user, combined),
            )
            due_reminders = session.execute(combined_select).all()

            for surfed, host_request, recipient, counterpart in due_reminders:
                # visibility and blocking already checked in sql
                assert recipient.is_visible
                context = make_background_user_context(user_id=recipient.id)
                notify(
                    session,
                    user_id=recipient.id,
                    topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(counterpart, session, context),
                        days_left=days_left,
                    ),
                )
                # record the reminder on the side of the request that was just notified
                if surfed:
                    host_request.surfer_sent_reference_reminders = number
                else:
                    host_request.host_sent_reference_reminders = number
                session.commit()


send_reference_reminders.PAYLOAD = empty_pb2.Empty
send_reference_reminders.SCHEDULE = timedelta(hours=1)

588 

589 

def send_host_request_reminders(payload: empty_pb2.Empty) -> None:
    """
    Reminds hosts about pending host requests in which they have not sent any message

    A request qualifies if it is pending, starts in the future, has had fewer than
    HOST_REQUEST_MAX_REMINDERS reminders, its last reminder is at least
    HOST_REQUEST_REMINDER_INTERVAL old, and host and surfer are visible to each other.
    """
    logger.info("Sending out host request reminders")

    with session_scope() as session:
        # correlated subquery: any message in the request's conversation written by the host
        host_has_sent_message = select(1).where(
            Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.host_user_id
        )

        # NOTE: deliberately not called `requests`, which would shadow the imported `requests` HTTP module
        pending_requests = (
            session.execute(
                select(HostRequest)
                .where(HostRequest.status == HostRequestStatus.pending)
                .where(HostRequest.host_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS)
                .where(HostRequest.start_time > func.now())
                .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL)
                .where(~exists(host_has_sent_message))
                .where_user_columns_visible_to_each_other(HostRequest.host_user_id, HostRequest.surfer_user_id)
            )
            .scalars()
            .all()
        )

        for host_request in pending_requests:
            # bump the counter and timestamp first so the interval check above skips this request next run
            host_request.host_sent_request_reminders += 1
            host_request.last_sent_request_reminder_time = now()

            context = make_background_user_context(user_id=host_request.host_user_id)
            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action="host_request:reminder",
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestReminder(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
            )

            session.commit()


send_host_request_reminders.PAYLOAD = empty_pb2.Empty
send_host_request_reminders.SCHEDULE = timedelta(minutes=15)

631 

632 

def add_users_to_email_list(payload: empty_pb2.Empty) -> None:
    """
    Subscribes visible users that are not yet in sync with the newsletter to the Listmonk list

    Does nothing when config["LISTMONK_ENABLED"] is falsy. Users are processed one at a time, each
    in its own session, until none are left. Users who opted out are marked as in sync without
    being subscribed.

    Raises:
        Exception: if the Listmonk API answers with a status other than 200 or 409.
    """
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    while True:
        with session_scope() as session:
            next_user_query = select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
            user = session.execute(next_user_query).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            if user.opt_out_of_newsletter:
                # nothing to subscribe; mark as handled so the user is not picked up again
                user.in_sync_with_newsletter = True
                session.commit()
                continue

            response = requests.post(
                config["LISTMONK_BASE_URL"] + "/api/subscribers",
                auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
                json={
                    "email": user.email,
                    "name": user.name,
                    "lists": [config["LISTMONK_LIST_ID"]],
                    "preconfirm_subscriptions": True,
                    "attribs": {"couchers_user_id": user.id},
                    "status": "enabled",
                },
                timeout=10,
            )
            # 409 is treated like success: the API returns it if the user is already subscribed
            if response.status_code not in (200, 409):
                raise Exception("Failed to add users to mailing list")

            user.in_sync_with_newsletter = True
            session.commit()


add_users_to_email_list.PAYLOAD = empty_pb2.Empty
add_users_to_email_list.SCHEDULE = timedelta(hours=1)

677 

678 

def enforce_community_membership(payload: empty_pb2.Empty) -> None:
    """
    Job wrapper that runs `couchers.tasks.enforce_community_memberships`; the payload is unused
    """
    tasks_enforce_community_memberships()


enforce_community_membership.PAYLOAD = empty_pb2.Empty
enforce_community_membership.SCHEDULE = timedelta(minutes=15)

685 

686 

def update_recommendation_scores(payload: empty_pb2.Empty) -> None:
    """
    Recomputes `recommendation_score` for all users with a single SQL UPDATE

    The score is the sum of points for hosting status, profile completeness, references,
    activeness, badges/community roles and response rate, plus a random term. All of it is built
    as SQL expressions and evaluated in the database.
    """
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian():
        """
        Produces an approximately std normal random variate

        Sums `trials` uniform SQL random() values, then centers and rescales the sum.
        """
        trials = 5
        uniform_sum = sum([func.random() for _ in range(trials)])
        return (uniform_sum - trials / 2) / sqrt(trials / 12)

    def int_(stmt):
        # cast to integer, treating NULL as 0
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt):
        # cast to float, treating NULL as 0.0
        return func.coalesce(cast(stmt, Float), 0.0)

    def concatenated(fields):
        # SQL concatenation of the given text columns, with NULL columns counted as empty strings
        joined = ""
        for field in fields:
            joined += func.coalesce(field, "")
        return joined

    with session_scope() as session:
        # profile
        text_length = func.length(concatenated(text_fields))
        home_length = func.length(concatenated(home_fields))

        filled_profile = int_(User.has_completed_profile)
        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        may_host = int_(User.hosting_status == HostingStatus.maybe)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.has_completed_my_home)
        filled_home_lots = int_(home_length > 200)
        hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host
        profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots

        # references
        # one row per user who has left at least one reference
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        # aggregates over the references each user has received
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # the first 5 references count double, every one after that counts once
        weighted_ref_count = 2 * func.least(received_ref_subquery.c.ref_count, 5) + func.greatest(
            received_ref_subquery.c.ref_count - 5, 0
        )
        rating_score = float_(received_ref_subquery.c.ref_avg * (weighted_ref_count))
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        # smallest parent node id among the official clusters the user is an admin of
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        cb = int_(min_node_id >= 1)
        wcb = int_(min_node_id == 1)
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }

        # total badge points per user; badges missing from the table above count as 0
        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate
        # epoch seconds divided by 60, i.e. response times in minutes
        hr_subquery = select(
            UserResponseRate.user_id,
            float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"),
            float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"),
        ).subquery()
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
        response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0)

        recommendation_score = (
            hosting_status_points
            + profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        # outer joins, so users without rows in a subquery still get a score (NULLs are coalesced above)
        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        session.execute(
            User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
        )

    logger.info("Updated recommendation scores")


update_recommendation_scores.PAYLOAD = empty_pb2.Empty
update_recommendation_scores.SCHEDULE = timedelta(hours=24)

866 

867 

def update_badges(payload: empty_pb2.Empty) -> None:
    """
    Synchronizes the automatically managed user badges with the current state of the database.

    For each managed badge, the set of users who should hold it is computed; the badge is then added to users
    who are missing it and removed from holders who no longer qualify. Member ids that do not correspond to an
    existing user row are ignored.
    """
    with session_scope() as session:

        def fetch_ids(statement):
            # runs a single-column select and returns all of its values
            return session.execute(statement).scalars().all()

        def update_badge(badge_id: str, members: list[int]) -> None:
            # lookup only, the value itself is not used below
            # NOTE(review): presumably this makes an unknown badge id fail fast - confirm
            _badge = get_badge_dict()[badge_id]

            current_holders = set(fetch_ids(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)))
            # restrict the requested members to ids that actually exist in the db
            eligible = set(fetch_ids(select(User.id).where(User.id.in_(members))))

            # eligible users who don't hold the badge yet
            for user_id in eligible - current_holders:
                user_add_badge(session, user_id, badge_id)

            # holders who are no longer eligible
            for user_id in current_holders - eligible:
                user_remove_badge(session, user_id, badge_id)

        # badges whose members come from the static badge dict
        for static_badge_id in ("founder", "board_member", "past_board_member"):
            update_badge(static_badge_id, get_static_badge_dict()[static_badge_id])

        # badges derived from user columns
        update_badge("donor", fetch_ids(select(User.id).where(User.last_donated.is_not(None))))
        update_badge("moderator", fetch_ids(select(User.id).where(User.is_superuser)))
        update_badge("phone_verified", fetch_ids(select(User.id).where(User.phone_is_verified)))

        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        strongly_verified = fetch_ids(
            select(User.id)
            .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
            .where(StrongVerificationAttempt.has_strong_verification(User))
        )
        update_badge("strong_verification", strongly_verified)

        # volunteer badge for active volunteers (stopped_volunteering is null)
        active_volunteers = fetch_ids(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_(None)))
        update_badge("volunteer", active_volunteers)

        # past_volunteer badge for past volunteers (stopped_volunteering is not null)
        past_volunteers = fetch_ids(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_not(None)))
        update_badge("past_volunteer", past_volunteers)


update_badges.PAYLOAD = empty_pb2.Empty
update_badges.SCHEDULE = timedelta(minutes=15)

919 

920 

def finalize_strong_verification(payload: "jobs_pb2.FinalizeStrongVerificationPayload") -> None:
    """
    Completes a strong verification attempt that is waiting on the backend.

    Fetches the session result from Iris ID, cross-checks it against the stored attempt, and then:
    - fails the attempt if the document is not a passport,
    - marks it as a duplicate if another attempt already recorded a passport with the same expiry date,
      nationality and last three document characters (emailing about it when that attempt belongs to a
      different user),
    - otherwise stores the passport data and marks the attempt as succeeded; the user then gets the
      "strong_verification" badge and a success notification (unless they already hold the badge), or a
      failure notification if `has_strong_verification` is false for them.

    Args:
        payload: carries `verification_attempt_id`, the id of the attempt to finalize.

    Raises:
        Exception: if Iris ID does not answer with HTTP 200.
        AssertionError: if the Iris ID response does not match the stored attempt or is not approved.
    """

    def require(condition: bool, message: str) -> None:
        # Explicit raise instead of an `assert` statement: asserts are removed when Python runs with -O,
        # which would silently skip these security-relevant checks. AssertionError is kept as the type so
        # the failure looks the same to whatever handles job errors.
        if not condition:
            raise AssertionError(message)

    with session_scope() as session:
        # scalar_one() raises unless exactly one attempt with this id is waiting on the backend
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
            verify="/etc/ssl/certs/ca-certificates.crt",
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()

        # the reference is an encrypted payload that comes back from Iris; it must match the stored attempt
        reference_payload = verification_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )
        require(
            verification_attempt.user_id == reference_payload.user_id,
            "Iris reference payload is for a different user",
        )
        require(
            verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token,
            "Iris reference payload has a different verification attempt token",
        )
        require(verification_attempt.iris_session_id == json_data["id"], "Iris returned a different session id")
        require(json_data["state"] == "APPROVED", "Iris session is not in state APPROVED")

        if json_data["document_type"] != "PASSPORT":
            verification_attempt.status = StrongVerificationAttemptStatus.failed
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
                ),
            )
            return

        # from here on the document is known to be a passport

        expiry_date = date.fromisoformat(json_data["expiry_date"])
        nationality = json_data["nationality"]
        last_three_document_chars = json_data["document_number"][-3:]

        # look for an earlier attempt with the same minimal passport data; this runs before the fields are
        # set on the current attempt, so the current attempt cannot match itself
        existing_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
            .where(StrongVerificationAttempt.passport_nationality == nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = expiry_date
        verification_attempt.passport_nationality = nationality
        verification_attempt.passport_last_three_document_chars = last_three_document_chars

        if existing_attempt:
            verification_attempt.status = StrongVerificationAttemptStatus.duplicate

            if existing_attempt.user_id != verification_attempt.user_id:
                session.flush()
                send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)

            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                key="",
                data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
            )
            return

        verification_attempt.has_full_data = True
        # the full Iris response is stored encrypted with the verification data public key
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = verification_attempt.user
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            already_has_badge = session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none()
            if already_has_badge:
                # nothing to add and no notification is sent in this case
                return

            user_add_badge(session, user.id, badge_id, do_notify=False)
            notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success", key="")
        else:
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
                ),
            )


finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload

1030 

1031 

def send_activeness_probes(payload: empty_pb2.Empty) -> None:
    """
    Runs the three stages of the activeness probe system:

    1. (only if ACTIVENESS_PROBES_ENABLED) creates probes for visible, can-host users who have been inactive
       for longer than ACTIVENESS_PROBE_INACTIVITY_PERIOD and have no unanswered probe, limited in number
    2. sends the initial notification and the reminders listed in ACTIVENESS_PROBE_TIME_REMINDERS
    3. expires probes that got all notifications but no response in time, and downgrades the user's hosting
       and meetup status
    """
    with session_scope() as session:
        ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)

        if config["ACTIVENESS_PROBES_ENABLED"]:
            # users who currently have a probe they haven't responded to
            unanswered = select(ActivenessProbe.user_id).where(ActivenessProbe.responded.is_(None)).subquery()

            # users who we should send an activeness probe to
            candidates_query = (
                select(User.id)
                .where(User.is_visible)
                .where(User.hosting_status == HostingStatus.can_host)
                .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
                .where(User.id.not_in(select(unanswered.c.user_id)))
            )
            new_probe_user_ids = session.execute(candidates_query).scalars().all()

            total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()
            probes_today = session.execute(
                select(func.count())
                .select_from(ActivenessProbe)
                .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24))
            ).scalar_one()

            # send probes to max 2% of users per day
            daily_cap = 0.02 * total_users
            remaining_today = daily_cap - probes_today
            per_run_cap = daily_cap / 24
            # never below 1, whatever the caps work out to
            batch_limit = int(max(min(remaining_today, per_run_cap), 1))

            if len(new_probe_user_ids) > batch_limit:
                new_probe_user_ids = sample(new_probe_user_ids, batch_limit)

            session.add_all([ActivenessProbe(user_id=user_id) for user_id in new_probe_user_ids])

            session.commit()

        ## Step 2: actually send out probe notifications
        for reminder_number, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS, start=1):
            # pending probes that have had exactly the previous notifications and whose delay has passed
            due_query = (
                select(ActivenessProbe)
                .where(ActivenessProbe.notifications_sent == reminder_number - 1)
                .where(ActivenessProbe.probe_initiated + delay < func.now())
                .where(ActivenessProbe.is_pending)
            )

            for probe in session.execute(due_query).scalars().all():
                probe.notifications_sent = reminder_number
                # NOTE(review): this context is not used further down
                context = make_background_user_context(user_id=probe.user.id)
                probe_data = notification_data_pb2.ActivenessProbe(
                    reminder_number=reminder_number,
                    deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
                )
                notify(
                    session,
                    user_id=probe.user.id,
                    topic_action="activeness:probe",
                    key=str(probe.id),
                    data=probe_data,
                )
                session.commit()

        ## Step 3: for those who haven't responded, mark them as failed
        expired_query = (
            select(ActivenessProbe)
            .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
            .where(ActivenessProbe.is_pending)
            .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
        )

        for probe in session.execute(expired_query).scalars().all():
            probe.responded = now()
            probe.response = ActivenessProbeStatus.expired
            probed_user = probe.user
            # only the most "available" statuses get downgraded
            if probed_user.hosting_status == HostingStatus.can_host:
                probed_user.hosting_status = HostingStatus.maybe
            if probed_user.meetup_status == MeetupStatus.wants_to_meetup:
                probed_user.meetup_status = MeetupStatus.open_to_meetup
            session.commit()


send_activeness_probes.PAYLOAD = empty_pb2.Empty
send_activeness_probes.SCHEDULE = timedelta(minutes=60)

1124 

1125 

def update_randomized_locations(payload: empty_pb2.Empty) -> None:
    """
    Fills in `randomized_geom` for every user that doesn't have one yet.

    We generate for each user a randomized location as follows:
    - Start from a strong random seed (based on the SECRET env var and our key derivation function)
    - For each user, mix in the user_id for randomness
    - Generate a radius from [0.02, 0.1] degrees (about 2-10km)
    - Generate an angle from [0, 360]
    - Randomized location is then a distance `radius` away at an angle `angle` from `geom`

    The offset depends only on the secret and the user id, so the same user always gets the same offset.
    If no user is missing a randomized location, nothing is written.
    """
    randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)

    def gen_randomized_coords(user_id: int, lat: float, lng: float) -> tuple[float, float]:
        # two independent values derived from the secret, one per quantity
        radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii"))
        angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii"))
        # NOTE(review): the scaling below assumes stable_secure_uniform returns values in [0, 1] - confirm
        radius = 0.02 + 0.08 * radius_u
        angle_rad = 2 * pi * angle_u
        offset_lng = radius * cos(angle_rad)
        offset_lat = radius * sin(angle_rad)
        return lat + offset_lat, lng + offset_lng

    user_updates: list[dict[str, Any]] = []

    with session_scope() as session:
        users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom.is_(None))).all()

        for user_id, geom in users_to_update:
            lat, lng = get_coordinates(geom)
            user_updates.append(
                {"id": user_id, "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, lat, lng))}
            )

    if not user_updates:
        # nothing to write: don't open another transaction or issue a bulk UPDATE with no parameter sets
        return

    with session_scope() as session:
        # bulk UPDATE by primary key: one parameter set per user, matched on "id"
        session.execute(update(User), user_updates)


update_randomized_locations.PAYLOAD = empty_pb2.Empty
update_randomized_locations.SCHEDULE = timedelta(hours=1)

1163 

1164 

def send_event_reminders(payload: empty_pb2.Empty) -> None:
    """
    Notifies attendees of event occurrences that start within EVENT_REMINDER_TIMEDELTA from now.

    Each attendee is reminded at most once per occurrence: `reminder_sent` is set and committed right after
    their notification is created.
    """
    logger.info("Sending event reminder emails")

    with session_scope() as session:
        # occurrences that haven't started yet but start within the reminder window
        upcoming_query = select(EventOccurrence).where(
            EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA,
            EventOccurrence.start_time >= now(),
        )

        for occurrence in session.execute(upcoming_query).scalars().all():
            # attendees of this occurrence who haven't been reminded yet
            unreminded_query = (
                select(User, EventOccurrenceAttendee)
                .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id)
                .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
                .where(EventOccurrenceAttendee.reminder_sent == False)
            )

            for user, attendee in session.execute(unreminded_query).all():
                context = make_background_user_context(user_id=user.id)
                reminder = notification_data_pb2.EventReminder(
                    event=event_to_pb(session, occurrence, context),
                    user=user_model_to_pb(user, session, context),
                )

                notify(
                    session,
                    user_id=user.id,
                    topic_action="event:reminder",
                    key=str(occurrence.id),
                    data=reminder,
                )

                attendee.reminder_sent = True
                session.commit()


send_event_reminders.PAYLOAD = empty_pb2.Empty
send_event_reminders.SCHEDULE = timedelta(hours=1)

1210 

1211 

def check_expo_push_receipts(payload: empty_pb2.Empty) -> None:
    """
    Fetches Expo push receipts in batches of up to 100 and records the outcome on each delivery attempt.

    Only attempts with a ticket id, no receipt check yet, and an age between 15 minutes and 24 hours are
    considered. Every processed attempt is marked as checked. A "DeviceNotRegistered" error disables the
    corresponding push subscription if it isn't disabled already.

    Raises:
        RuntimeError: if there is still work left after MAX_ITERATIONS batches.
    """
    MAX_ITERATIONS = 100  # Safety limit: 100 batches * 100 attempts = 10,000 max

    for _ in range(MAX_ITERATIONS):
        # one session per batch
        with session_scope() as session:
            # Find all delivery attempts that need receipt checking
            # Wait 15 minutes per Expo's recommendation before checking receipts
            pending_query = (
                select(PushNotificationDeliveryAttempt)
                .where(PushNotificationDeliveryAttempt.expo_ticket_id.is_not(None))
                .where(PushNotificationDeliveryAttempt.receipt_checked_at.is_(None))
                .where(PushNotificationDeliveryAttempt.time < now() - timedelta(minutes=15))
                .where(PushNotificationDeliveryAttempt.time > now() - timedelta(hours=24))
                .limit(100)
            )
            attempts = session.execute(pending_query).scalars().all()

            if not attempts:
                logger.debug("No Expo receipts to check")
                return

            logger.info(f"Checking {len(attempts)} Expo push receipts")

            ticket_ids = [attempt.expo_ticket_id for attempt in attempts]
            receipts = get_expo_push_receipts(ticket_ids)

            for attempt in attempts:
                receipt = receipts.get(attempt.expo_ticket_id)

                # Always mark as checked to avoid infinite loops
                attempt.receipt_checked_at = now()

                if receipt is None:
                    # Receipt not found after 15min - likely expired (>24h) or never existed
                    # Per Expo docs: receipts should be available within 15 minutes
                    attempt.receipt_status = "not_found"
                    continue

                status = receipt.get("status")
                attempt.receipt_status = status

                if status != "error":
                    continue

                error_code = receipt.get("details", {}).get("error")
                attempt.receipt_error_code = error_code

                if error_code != "DeviceNotRegistered":
                    logger.warning(f"Expo receipt error for ticket {attempt.expo_ticket_id}: {error_code}")
                    continue

                # Device token is no longer valid - disable the subscription
                sub = session.execute(
                    select(PushNotificationSubscription).where(
                        PushNotificationSubscription.id == attempt.push_notification_subscription_id
                    )
                ).scalar_one()

                # a disabled_at in the future means the subscription is still enabled
                if sub.disabled_at > now():
                    sub.disabled_at = now()
                    logger.info(f"Disabled push sub {sub.id} due to DeviceNotRegistered in receipt")
                    push_notification_counter.labels(
                        platform="expo", outcome="permanent_subscription_failure_receipt"
                    ).inc()

    # If we get here, we've exhausted MAX_ITERATIONS without finishing
    raise RuntimeError(
        f"check_expo_push_receipts exceeded {MAX_ITERATIONS} iterations - "
        "there may be an unusually large backlog of receipts to check"
    )


check_expo_push_receipts.PAYLOAD = empty_pb2.Empty
check_expo_push_receipts.SCHEDULE = timedelta(minutes=5)

1288 

1289 

def send_postal_verification_postcard(payload: jobs_pb2.SendPostalVerificationPostcardPayload) -> None:
    """
    Sends the verification postcard for a postal verification attempt and records the outcome.

    Does nothing (besides logging a warning) if the attempt doesn't exist or isn't in progress. On success the
    attempt moves to awaiting_verification and the user is notified; on failure the attempt is marked failed.
    """
    attempt_id = payload.postal_verification_attempt_id

    with session_scope() as session:
        attempt = session.execute(
            select(PostalVerificationAttempt).where(PostalVerificationAttempt.id == attempt_id)
        ).scalar_one_or_none()

        if not attempt or attempt.status != PostalVerificationStatus.in_progress:
            logger.warning(
                f"Postal verification attempt {payload.postal_verification_attempt_id} not found or wrong state"
            )
            return

        user_name = session.execute(select(User.name).where(User.id == attempt.user_id)).scalar_one()

        # the QR code on the postcard links to the verification page for this attempt's code
        qr_code_url = urls.postal_verification_link(code=attempt.verification_code)
        result = send_postcard(
            recipient_name=user_name,
            address_line_1=attempt.address_line_1,
            address_line_2=attempt.address_line_2,
            city=attempt.city,
            state=attempt.state,
            postal_code=attempt.postal_code,
            country=attempt.country,
            verification_code=attempt.verification_code,
            qr_code_url=qr_code_url,
        )

        if not result.success:
            # Could retry or fail - for now, fail
            attempt.status = PostalVerificationStatus.failed
            logger.error(f"Postcard send failed: {result.error_message}")
            return

        attempt.status = PostalVerificationStatus.awaiting_verification
        attempt.postcard_sent_at = func.now()

        sent_data = notification_data_pb2.PostalVerificationPostcardSent(
            city=attempt.city,
            country=attempt.country,
        )
        notify(
            session,
            user_id=attempt.user_id,
            topic_action="postal_verification:postcard_sent",
            key="",
            data=sent_data,
        )


send_postal_verification_postcard.PAYLOAD = jobs_pb2.SendPostalVerificationPostcardPayload

1342 

1343 

class DatabaseInconsistencyError(Exception):
    """Signals that one or more database consistency checks found a problem"""

1348 

1349 

def check_database_consistency(payload: empty_pb2.Empty) -> None:
    """
    Checks database consistency and raises an exception if any issues are found.

    Every check is run and its findings are collected, so one run reports all detected inconsistencies
    together rather than stopping at the first.

    Raises:
        DatabaseInconsistencyError: if at least one check found a problem; the message has one finding per line.
    """
    logger.info("Checking database consistency")
    errors: list[str] = []

    with session_scope() as session:

        def report_rows(description: str, statement) -> None:
            # a check passes when its query selects nothing; any selected rows are reported verbatim
            rows = session.execute(statement).all()
            if rows:
                errors.append(f"{description}: {rows}")

        def report_owner_counts(object_type, model, type_label: str, model_label: str) -> None:
            # Reports every ModerationState of `object_type` that doesn't have exactly one `model` row pointing
            # to it. Done as one grouped outer join instead of one COUNT query per state. Counting the join
            # column gives 0 for states without any matching row.
            owner_count = func.count(model.moderation_state_id)
            offenders = session.execute(
                select(ModerationState.id, owner_count)
                .select_from(ModerationState)
                .outerjoin(model, model.moderation_state_id == ModerationState.id)
                .where(ModerationState.object_type == object_type)
                .group_by(ModerationState.id)
                .having(owner_count != 1)
                .order_by(ModerationState.id)
            ).all()
            for state_id, count in offenders:
                errors.append(f"ModerationState {state_id} ({type_label}) has {count} {model_label} (expected 1)")

        # Check that all non-deleted users have a profile gallery
        report_rows(
            "Users without profile gallery",
            select(User.id, User.username).where(User.is_deleted == False).where(User.profile_gallery_id.is_(None)),
        )

        # Check that all profile galleries point to their owner
        report_rows(
            "Profile galleries with mismatched owner",
            select(User.id, User.username, User.profile_gallery_id, PhotoGallery.owner_user_id)
            .join(PhotoGallery, User.profile_gallery_id == PhotoGallery.id)
            .where(User.profile_gallery_id.is_not(None))
            .where(PhotoGallery.owner_user_id != User.id),
        )

        # === Moderation System Consistency Checks ===

        # Check all ModerationStates have a known object_type
        known_object_types = [ModerationObjectType.HOST_REQUEST, ModerationObjectType.GROUP_CHAT]
        report_rows(
            "ModerationStates with unknown object_type",
            select(ModerationState.id, ModerationState.object_type).where(
                ModerationState.object_type.not_in(known_object_types)
            ),
        )

        # Check resolved queue items point to log entries with resolving actions (APPROVE/HIDE, not CREATE/FLAG/UNFLAG)
        resolving_actions = [ModerationAction.APPROVE, ModerationAction.HIDE]
        report_rows(
            "Queue items resolved by non-resolving actions",
            select(ModerationQueueItem.id, ModerationQueueItem.resolved_by_log_id, ModerationLog.action)
            .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id)
            .where(ModerationQueueItem.resolved_by_log_id.is_not(None))
            .where(ModerationLog.action.not_in(resolving_actions)),
        )

        # Check every ModerationState has at least one INITIAL_REVIEW queue item
        # Skip items with ID < 2000000 as they were created before this check was introduced
        report_rows(
            "ModerationStates without INITIAL_REVIEW queue item",
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationQueueItem.moderation_state_id == ModerationState.id)
                    .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW)
                ),
            ),
        )

        # Check every ModerationState has a CREATE log entry
        # Skip items with ID < 2000000 as they were created before this check was introduced
        report_rows(
            "ModerationStates without CREATE log entry",
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationLog.moderation_state_id == ModerationState.id)
                    .where(ModerationLog.action == ModerationAction.CREATE)
                ),
            ),
        )

        # Check resolved queue items point to log entries for the same moderation state
        report_rows(
            "Resolved queue items with mismatched moderation_state_id",
            select(ModerationQueueItem.id, ModerationQueueItem.moderation_state_id, ModerationLog.moderation_state_id)
            .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id)
            .where(ModerationQueueItem.resolved_by_log_id.is_not(None))
            .where(ModerationQueueItem.moderation_state_id != ModerationLog.moderation_state_id),
        )

        # Check every HOST_REQUEST ModerationState has exactly one HostRequest pointing to it
        report_owner_counts(ModerationObjectType.HOST_REQUEST, HostRequest, "HOST_REQUEST", "HostRequests")

        # Check every GROUP_CHAT ModerationState has exactly one GroupChat pointing to it
        report_owner_counts(ModerationObjectType.GROUP_CHAT, GroupChat, "GROUP_CHAT", "GroupChats")

        # Check ModerationState.object_id matches the actual object's ID
        report_rows(
            "ModerationState object_id mismatch for HOST_REQUEST",
            select(ModerationState.id, ModerationState.object_id, HostRequest.conversation_id)
            .join(HostRequest, HostRequest.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST)
            .where(ModerationState.object_id != HostRequest.conversation_id),
        )
        report_rows(
            "ModerationState object_id mismatch for GROUP_CHAT",
            select(ModerationState.id, ModerationState.object_id, GroupChat.conversation_id)
            .join(GroupChat, GroupChat.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT)
            .where(ModerationState.object_id != GroupChat.conversation_id),
        )

        # Check reverse mapping: HostRequest's moderation_state points to correct ModerationState
        report_rows(
            "HostRequest points to ModerationState with wrong type/object_id",
            select(
                HostRequest.conversation_id,
                HostRequest.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, HostRequest.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.HOST_REQUEST)
                | (ModerationState.object_id != HostRequest.conversation_id)
            ),
        )

        # Check reverse mapping: GroupChat's moderation_state points to correct ModerationState
        report_rows(
            "GroupChat points to ModerationState with wrong type/object_id",
            select(
                GroupChat.conversation_id,
                GroupChat.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, GroupChat.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.GROUP_CHAT)
                | (ModerationState.object_id != GroupChat.conversation_id)
            ),
        )

        # Check notifications linked to VISIBLE/UNLISTED content have been processed
        # When content becomes visible, handle_notification should create NotificationDelivery records.
        # Notifications with moderation_state_id pointing to visible content but no delivery records are "hanging".
        report_rows(
            "Notifications for VISIBLE/UNLISTED content without delivery records",
            select(Notification.id, Notification.user_id, Notification.topic_action, ModerationState.visibility)
            .join(ModerationState, Notification.moderation_state_id == ModerationState.id)
            .where(ModerationState.visibility.in_([ModerationVisibility.VISIBLE, ModerationVisibility.UNLISTED]))
            .where(~exists(select(1).where(NotificationDelivery.notification_id == Notification.id))),
        )

        # Ensure auto-approve deadline isn't being exceeded by a significant margin
        # The auto-approver runs every 15s, so allow 5 minutes grace before alerting
        deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
        if deadline_seconds > 0:
            grace_period = timedelta(minutes=5)
            report_rows(
                "INITIAL_REVIEW items exceeding auto-approve deadline by >5min",
                select(
                    ModerationQueueItem.id,
                    ModerationQueueItem.moderation_state_id,
                    ModerationQueueItem.time_created,
                )
                .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW)
                .where(ModerationQueueItem.resolved_by_log_id.is_(None))
                .where(ModerationQueueItem.time_created < now() - timedelta(seconds=deadline_seconds) - grace_period),
            )

    if errors:
        raise DatabaseInconsistencyError("\n".join(errors))


check_database_consistency.PAYLOAD = empty_pb2.Empty
check_database_consistency.SCHEDULE = timedelta(hours=24)

1561 

1562 

def auto_approve_moderation_queue(payload: empty_pb2.Empty) -> None:
    """
    Dead man's switch for the moderation queue.

    Approves (as the moderation bot user) up to 100 unresolved INITIAL_REVIEW queue items per run that were
    created more than MODERATION_AUTO_APPROVE_DEADLINE_SECONDS ago. A non-positive deadline turns this off.
    """
    deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
    if deadline_seconds <= 0:
        return

    with session_scope() as session:
        ctx = make_background_user_context(user_id=config["MODERATION_BOT_USER_ID"])
        moderation = Moderation()

        # only items that were created before the cutoff and are still unresolved
        cutoff = Timestamp_from_datetime(now() - timedelta(seconds=deadline_seconds))
        queue_request = moderation_pb2.GetModerationQueueReq(
            triggers=[moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW],
            unresolved_only=True,
            page_size=100,
            created_before=cutoff,
        )
        items = moderation.GetModerationQueue(request=queue_request, context=ctx, session=session).queue_items

        if not items:
            return

        logger.info(f"Auto-approving {len(items)} moderation queue items")
        for item in items:
            approve_request = moderation_pb2.ModerateContentReq(
                moderation_state_id=item.moderation_state_id,
                action=moderation_pb2.MODERATION_ACTION_APPROVE,
                visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
                reason=f"Auto-approved: moderation deadline of {deadline_seconds} seconds exceeded.",
            )
            moderation.ModerateContent(request=approve_request, context=ctx, session=session)

        # counted once for the whole batch, after every item went through
        moderation_auto_approved_counter.inc(len(items))


auto_approve_moderation_queue.PAYLOAD = empty_pb2.Empty
auto_approve_moderation_queue.SCHEDULE = timedelta(seconds=15)