Coverage for src/couchers/jobs/handlers.py: 92%
526 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-10 13:55 +0000
1"""
2Background job servicers
3"""
5import logging
6from datetime import date, timedelta
7from math import cos, pi, sin, sqrt
8from random import sample
9from typing import Any
11import requests
12from google.protobuf import empty_pb2
13from sqlalchemy import Float, Integer
14from sqlalchemy.orm import aliased
15from sqlalchemy.sql import (
16 and_,
17 case,
18 cast,
19 delete,
20 distinct,
21 exists,
22 extract,
23 func,
24 literal,
25 not_,
26 or_,
27 union_all,
28 update,
29)
31from couchers import urls
32from couchers.config import config
33from couchers.constants import (
34 ACTIVENESS_PROBE_EXPIRY_TIME,
35 ACTIVENESS_PROBE_INACTIVITY_PERIOD,
36 ACTIVENESS_PROBE_TIME_REMINDERS,
37 EVENT_REMINDER_TIMEDELTA,
38 HOST_REQUEST_MAX_REMINDERS,
39 HOST_REQUEST_REMINDER_INTERVAL,
40)
41from couchers.context import make_background_user_context
42from couchers.crypto import (
43 USER_LOCATION_RANDOMIZATION_NAME,
44 asym_encrypt,
45 b64decode,
46 get_secret,
47 simple_decrypt,
48 stable_secure_uniform,
49)
50from couchers.db import session_scope
51from couchers.email.dev import print_dev_email
52from couchers.email.smtp import send_smtp_email
53from couchers.helpers.badges import user_add_badge, user_remove_badge
54from couchers.materialized_views import (
55 UserResponseRate,
56 refresh_materialized_views,
57 refresh_materialized_views_rapid,
58)
59from couchers.metrics import (
60 moderation_auto_approved_counter,
61 push_notification_counter,
62 strong_verification_completions_counter,
63)
64from couchers.models import (
65 AccountDeletionToken,
66 ActivenessProbe,
67 ActivenessProbeStatus,
68 Cluster,
69 ClusterRole,
70 ClusterSubscription,
71 EventOccurrence,
72 EventOccurrenceAttendee,
73 GroupChat,
74 GroupChatSubscription,
75 HostingStatus,
76 HostRequest,
77 HostRequestStatus,
78 LoginToken,
79 MeetupStatus,
80 Message,
81 MessageType,
82 ModerationAction,
83 ModerationLog,
84 ModerationObjectType,
85 ModerationQueueItem,
86 ModerationState,
87 ModerationTrigger,
88 ModerationVisibility,
89 Notification,
90 NotificationDelivery,
91 PassportSex,
92 PasswordResetToken,
93 PhotoGallery,
94 PostalVerificationAttempt,
95 PostalVerificationStatus,
96 PushNotificationDeliveryAttempt,
97 PushNotificationSubscription,
98 Reference,
99 StrongVerificationAttempt,
100 StrongVerificationAttemptStatus,
101 User,
102 UserBadge,
103 Volunteer,
104)
105from couchers.notifications.background import handle_email_digests, handle_notification
106from couchers.notifications.expo_api import get_expo_push_receipts
107from couchers.notifications.notify import notify
108from couchers.notifications.send_raw_push_notification import send_raw_push_notification_v2
109from couchers.postal.postcard_service import send_postcard
110from couchers.proto import moderation_pb2, notification_data_pb2
111from couchers.proto.internal import jobs_pb2, verification_pb2
112from couchers.resources import get_badge_dict, get_static_badge_dict
113from couchers.servicers.api import user_model_to_pb
114from couchers.servicers.conversations import generate_message_notifications
115from couchers.servicers.discussions import generate_create_discussion_notifications
116from couchers.servicers.editor import generate_new_blog_post_notifications
117from couchers.servicers.events import (
118 event_to_pb,
119 generate_event_cancel_notifications,
120 generate_event_create_notifications,
121 generate_event_delete_notifications,
122 generate_event_update_notifications,
123)
124from couchers.servicers.moderation import Moderation
125from couchers.servicers.requests import host_request_to_pb
126from couchers.servicers.threads import generate_reply_notifications
127from couchers.sql import couchers_select as select
128from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships
129from couchers.tasks import send_duplicate_strong_verification_email
130from couchers.utils import (
131 Timestamp_from_datetime,
132 create_coordinate,
133 get_coordinates,
134 now,
135)
logger = logging.getLogger(__name__)


# Job registration: the background worker looks up two attributes on each job
# function — PAYLOAD (the protobuf message type its payload is deserialized
# from) and, for recurring jobs only, SCHEDULE (how often it is enqueued).

# these were straight up imported
handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload

send_raw_push_notification_v2.PAYLOAD = jobs_pb2.SendRawPushNotificationPayloadV2

handle_email_digests.PAYLOAD = empty_pb2.Empty
handle_email_digests.SCHEDULE = timedelta(minutes=15)

generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload

generate_reply_notifications.PAYLOAD = jobs_pb2.GenerateReplyNotificationsPayload

generate_create_discussion_notifications.PAYLOAD = jobs_pb2.GenerateCreateDiscussionNotificationsPayload

generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload

generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload

generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload

generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload

generate_new_blog_post_notifications.PAYLOAD = jobs_pb2.GenerateNewBlogPostNotificationsPayload

refresh_materialized_views.PAYLOAD = empty_pb2.Empty
refresh_materialized_views.SCHEDULE = timedelta(minutes=5)

refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)
def send_email(payload: jobs_pb2.SendEmailPayload) -> None:
    """
    Deliver a single email described by the payload and record it in the database.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
    # choose a delivery backend: real SMTP when email is enabled, otherwise a
    # dev backend that just prints the email to the logger
    if config["ENABLE_EMAIL"]:
        deliver = send_smtp_email
    else:
        deliver = print_dev_email
    # either backend returns a models.Email object recording what was sent
    email_record = deliver(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
        list_unsubscribe_header=payload.list_unsubscribe_header,
        source_data=payload.source_data,
    )
    with session_scope() as session:
        session.add(email_record)


send_email.PAYLOAD = jobs_pb2.SendEmailPayload
def purge_login_tokens(payload: empty_pb2.Empty) -> None:
    """Delete login tokens that are no longer valid (expired or used)."""
    logger.info("Purging login tokens")
    with session_scope() as session:
        # synchronize_session=False: no ORM objects from this table are loaded here
        stmt = delete(LoginToken).where(~LoginToken.is_valid)
        session.execute(stmt.execution_options(synchronize_session=False))


purge_login_tokens.PAYLOAD = empty_pb2.Empty
purge_login_tokens.SCHEDULE = timedelta(hours=24)
def purge_password_reset_tokens(payload: empty_pb2.Empty) -> None:
    """Delete password reset tokens that are no longer valid (expired or used)."""
    # bug fix: this previously logged "Purging login tokens", copy-pasted from
    # purge_login_tokens, which made the two jobs indistinguishable in the logs
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        # synchronize_session=False: no ORM objects from this table are loaded here
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )


purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)
def purge_account_deletion_tokens(payload: empty_pb2.Empty) -> None:
    """Delete account deletion tokens that are no longer valid (expired or used)."""
    logger.info("Purging account deletion tokens")
    with session_scope() as session:
        # synchronize_session=False: no ORM objects from this table are loaded here
        stmt = delete(AccountDeletionToken).where(~AccountDeletionToken.is_valid)
        session.execute(stmt.execution_options(synchronize_session=False))


purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)
def send_message_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for messages that have been unseen for a long enough time
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                select(User)
                .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                # only messages sent while the user was a member of the chat
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                # not yet notified about and not yet seen
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                # give the user 5 minutes to read it themselves before emailing
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for user in users:
            context = make_background_user_context(user_id=user.id)
            # now actually grab all the group chats, not just less than 5 min old
            # one row per chat: the latest unseen message id and the unseen count
            subquery = (
                select(
                    GroupChatSubscription.group_chat_id.label("group_chat_id"),
                    func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                    func.max(Message.id).label("message_id"),
                    func.count(Message.id).label("count_unseen"),
                )
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(GroupChatSubscription.user_id == user.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(Message.id > user.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                # don't notify about messages from users this user can't see (e.g. blocked)
                .where_users_column_visible(context, Message.author_id)
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            unseen_messages = session.execute(
                select(GroupChat, Message, subquery.c.count_unseen)
                .join(subquery, subquery.c.message_id == Message.id)
                .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
                .order_by(subquery.c.message_id.desc())
            ).all()

            if not unseen_messages:
                continue

            # advance the high-water mark so we never notify about these again
            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            def format_title(message, group_chat, count_unseen):
                # DMs are titled by the other person's name, group chats by the chat title
                if group_chat.is_dm:
                    return f"You missed {count_unseen} message(s) from {message.author.name}"
                else:
                    return f"You missed {count_unseen} message(s) in {group_chat.title}"

            notify(
                session,
                user_id=user.id,
                topic_action="chat:missed_messages",
                key="",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                context,
                            ),
                            message=format_title(message, group_chat, count_unseen),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                        )
                        for group_chat, message, count_unseen in unseen_messages
                    ],
                ),
            )
            # commit per user so a crash mid-run doesn't re-notify earlier users
            session.commit()


send_message_notifications.PAYLOAD = empty_pb2.Empty
send_message_notifications.SCHEDULE = timedelta(minutes=3)
def send_request_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host)
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    with session_scope() as session:
        # Get all candidate users who might have unseen request messages
        candidate_user_ids = (
            session.execute(
                select(User.id)
                .where(User.is_visible)
                .where(
                    or_(
                        # Users with unseen messages as surfer
                        exists(
                            select(1)
                            .select_from(HostRequest)
                            .join(Message, Message.conversation_id == HostRequest.conversation_id)
                            .where(HostRequest.surfer_user_id == User.id)
                            .where(Message.id > HostRequest.surfer_last_seen_message_id)
                            .where(Message.id > User.last_notified_request_message_id)
                            # give the user 5 minutes to read it themselves before emailing
                            .where(Message.time < now() - timedelta(minutes=5))
                            .where(Message.message_type == MessageType.text)
                        ),
                        # Users with unseen messages as host
                        exists(
                            select(1)
                            .select_from(HostRequest)
                            .join(Message, Message.conversation_id == HostRequest.conversation_id)
                            .where(HostRequest.host_user_id == User.id)
                            .where(Message.id > HostRequest.host_last_seen_message_id)
                            .where(Message.id > User.last_notified_request_message_id)
                            .where(Message.time < now() - timedelta(minutes=5))
                            .where(Message.message_type == MessageType.text)
                        ),
                    )
                )
            )
            .scalars()
            .all()
        )

        for user_id in candidate_user_ids:
            context = make_background_user_context(user_id=user_id)

            # requests where this user is surfing
            # one row per request: (user, request, newest unseen message id)
            surfing_reqs = session.execute(
                select(User, HostRequest, func.max(Message.id))
                .where(User.id == user_id)
                .join(HostRequest, HostRequest.surfer_user_id == User.id)
                # skip requests whose counterparty is not visible to this user
                .where_users_column_visible(context, HostRequest.host_user_id)
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.surfer_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)
            ).all()

            # where this user is hosting
            hosting_reqs = session.execute(
                select(User, HostRequest, func.max(Message.id))
                .where(User.id == user_id)
                .join(HostRequest, HostRequest.host_user_id == User.id)
                .where_users_column_visible(context, HostRequest.surfer_user_id)
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.host_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)
            ).all()

            for user, host_request, max_message_id in surfing_reqs:
                # advance the high-water mark before notifying so we never double-notify
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                notify(
                    session,
                    user_id=user.id,
                    topic_action="host_request:missed_messages",
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        # the counterparty: this user surfed, so show the host
                        user=user_model_to_pb(host_request.host, session, context),
                        am_host=False,
                    ),
                )

            for user, host_request, max_message_id in hosting_reqs:
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                notify(
                    session,
                    user_id=user.id,
                    topic_action="host_request:missed_messages",
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        # the counterparty: this user hosted, so show the surfer
                        user=user_model_to_pb(host_request.surfer, session, context),
                        am_host=True,
                    ),
                )


send_request_notifications.PAYLOAD = empty_pb2.Empty
send_request_notifications.SCHEDULE = timedelta(minutes=3)
def send_onboarding_emails(payload: empty_pb2.Empty) -> None:
    """
    Sends out onboarding emails
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:
        # first onboarding email: visible users who haven't received any yet
        first_round = (
            session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all()
        )
        for recipient in first_round:
            notify(
                session,
                user_id=recipient.id,
                topic_action="onboarding:reminder",
                key="1",
            )
            recipient.onboarding_emails_sent = 1
            recipient.last_onboarding_email_sent = now()
            # commit per user so progress survives a crash mid-run
            session.commit()

        # second onboarding email
        # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
        second_round = (
            session.execute(
                select(User)
                .where(User.is_visible)
                .where(User.onboarding_emails_sent == 1)
                .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
                .where(User.has_completed_profile == False)
            )
            .scalars()
            .all()
        )
        for recipient in second_round:
            notify(
                session,
                user_id=recipient.id,
                topic_action="onboarding:reminder",
                key="2",
            )
            recipient.onboarding_emails_sent = 2
            recipient.last_onboarding_email_sent = now()
            session.commit()


send_onboarding_emails.PAYLOAD = empty_pb2.Empty
send_onboarding_emails.SCHEDULE = timedelta(hours=1)
def send_reference_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends out reminders to write references after hosting/staying
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            # user = the one who should write the ref, other_user = the counterparty
            user = aliased(User)
            other_user = aliased(User)
            # surfers needing to write a ref
            q1 = (
                select(literal(True), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.surfer_user_id)
                .join(other_user, other_user.id == HostRequest.host_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the surfer has not written a ref
                        Reference.from_user_id == HostRequest.surfer_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.surfer_sent_reference_reminders < reminder_number)
                # the reminder's send time (deadline minus offset) has passed
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.surfer_reason_didnt_meetup == None)
                .where_users_visible_to_each_other(user, other_user)
            )

            # hosts needing to write a ref
            q2 = (
                select(literal(False), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.host_user_id)
                .join(other_user, other_user.id == HostRequest.surfer_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the host has not written a ref
                        Reference.from_user_id == HostRequest.host_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.host_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.host_reason_didnt_meetup == None)
                .where_users_visible_to_each_other(user, other_user)
            )

            # combine both directions; first column flags whether the writer surfed
            union = union_all(q1, q2).subquery()
            union = select(
                union.c[0].label("surfed"),
                aliased(HostRequest, union),
                aliased(user, union),
                aliased(other_user, union),
            )
            reference_reminders = session.execute(union).all()

            for surfed, host_request, user, other_user in reference_reminders:
                # visibility and blocking already checked in sql
                assert user.is_visible
                context = make_background_user_context(user_id=user.id)
                notify(
                    session,
                    user_id=user.id,
                    topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(other_user, session, context),
                        days_left=reminder_days_left,
                    ),
                )
                # record which reminder was sent so earlier ones are skipped next run
                if surfed:
                    host_request.surfer_sent_reference_reminders = reminder_number
                else:
                    host_request.host_sent_reference_reminders = reminder_number
                session.commit()


send_reference_reminders.PAYLOAD = empty_pb2.Empty
send_reference_reminders.SCHEDULE = timedelta(hours=1)
def send_host_request_reminders(payload: empty_pb2.Empty) -> None:
    """Remind hosts about pending host requests they have not yet replied to."""
    with session_scope() as session:
        # correlated subquery: any message written by the host in this conversation
        host_replied = select(1).where(
            Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.host_user_id
        )

        pending_requests = (
            session.execute(
                select(HostRequest)
                .where(HostRequest.status == HostRequestStatus.pending)
                .where(HostRequest.host_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS)
                # only upcoming stays
                .where(HostRequest.start_time > func.now())
                .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL)
                # only if the host has never replied
                .where(~exists(host_replied))
                .where_user_columns_visible_to_each_other(HostRequest.host_user_id, HostRequest.surfer_user_id)
            )
            .scalars()
            .all()
        )

        for request in pending_requests:
            request.host_sent_request_reminders += 1
            request.last_sent_request_reminder_time = now()

            context = make_background_user_context(user_id=request.host_user_id)
            notify(
                session,
                user_id=request.host_user_id,
                topic_action="host_request:reminder",
                key=str(request.conversation_id),
                data=notification_data_pb2.HostRequestReminder(
                    host_request=host_request_to_pb(request, session, context),
                    surfer=user_model_to_pb(request.surfer, session, context),
                ),
            )

            session.commit()


send_host_request_reminders.PAYLOAD = empty_pb2.Empty
send_host_request_reminders.SCHEDULE = timedelta(minutes=15)
def add_users_to_email_list(payload: empty_pb2.Empty) -> None:
    """Sync visible users into the Listmonk newsletter list, one user per transaction."""
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    while True:
        with session_scope() as session:
            # fetch one out-of-sync user; a fresh session each pass keeps transactions short
            user = session.execute(
                select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
            ).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            # opted-out users are marked as synced without contacting the API
            if user.opt_out_of_newsletter:
                user.in_sync_with_newsletter = True
                session.commit()
                continue

            response = requests.post(
                config["LISTMONK_BASE_URL"] + "/api/subscribers",
                auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
                json={
                    "email": user.email,
                    "name": user.name,
                    "lists": [config["LISTMONK_LIST_ID"]],
                    "preconfirm_subscriptions": True,
                    "attribs": {"couchers_user_id": user.id},
                    "status": "enabled",
                },
                timeout=10,
            )
            # 409 means the user is already subscribed, which also counts as synced
            if response.status_code in (200, 409):
                user.in_sync_with_newsletter = True
                session.commit()
            else:
                raise Exception("Failed to add users to mailing list")


add_users_to_email_list.PAYLOAD = empty_pb2.Empty
add_users_to_email_list.SCHEDULE = timedelta(hours=1)
def enforce_community_membership(payload: empty_pb2.Empty) -> None:
    """Periodic job wrapper around tasks.enforce_community_memberships."""
    tasks_enforce_community_memberships()


enforce_community_membership.PAYLOAD = empty_pb2.Empty
enforce_community_membership.SCHEDULE = timedelta(minutes=15)
def update_recommendation_scores(payload: empty_pb2.Empty) -> None:
    """
    Recompute every user's recommendation_score in a single SQL UPDATE.

    The score is a sum of points from: hosting status, profile completeness,
    references (given and received), recent activity, badges/community-builder
    status, response rate, plus a small random jitter.
    """
    # free-text profile fields whose combined length counts towards completeness
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    # subset describing the user's home specifically
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian():
        """
        Produces an approximately std normal random variate
        """
        # sum of uniforms via the central limit theorem, computed in SQL
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt):
        # cast a SQL boolean/number to integer, treating NULL as 0
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt):
        # cast a SQL expression to float, treating NULL as 0.0
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile
        # build SQL concat expressions of all (coalesced) text fields
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")
        home_length = func.length(home_text)

        filled_profile = int_(User.has_completed_profile)
        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        may_host = int_(User.hosting_status == HostingStatus.maybe)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.has_completed_my_home)
        filled_home_lots = int_(home_length > 200)
        hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host
        profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots

        # references
        # one point for having written any reference at all
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        # rescale ratings so mediocre ratings contribute less
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        # any very low rating or "not appropriate" flag marks a bad reference
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # weight by count with diminishing returns: first 5 refs count double
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        # community builders: admins of official clusters; node id 1 appears to
        # be the root/world community (wcb) — see cb/wcb below
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        cb = int_(min_node_id >= 1)
        wcb = int_(min_node_id == 1)
        # per-badge point values summed per user below
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }

        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate
        # convert percentile response times from intervals to minutes
        hr_subquery = select(
            UserResponseRate.user_id,
            float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"),
            float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"),
        ).subquery()
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
        # 60 * 96.0 minutes = 4 days: penalize slow responders, reward fast ones
        response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0)

        recommendation_score = (
            hosting_status_points
            + profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            # jitter so equal-scored users don't always appear in the same order
            + 2 * poor_man_gaussian()
        )

        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        # single correlated UPDATE writing all scores at once
        session.execute(
            User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
        )

    logger.info("Updated recommendation scores")


update_recommendation_scores.PAYLOAD = empty_pb2.Empty
update_recommendation_scores.SCHEDULE = timedelta(hours=24)
def update_badges(payload: empty_pb2.Empty) -> None:
    """Reconcile automatically-managed badges so each badge's holders match its membership source."""
    with session_scope() as session:

        def update_badge(badge_id: str, members: list[int]) -> None:
            # make the set of users holding badge_id exactly equal to `members`
            # NOTE: the lookup below raises KeyError for an unknown badge_id,
            # so it doubles as validation even though `badge` is otherwise unused
            badge = get_badge_dict()[badge_id]
            user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all()
            # in case the user ids don't exist in the db
            actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all()
            # we should add the badge to these
            add = set(actual_members) - set(user_ids)
            # we should remove the badge from these
            remove = set(user_ids) - set(actual_members)
            for user_id in add:
                user_add_badge(session, user_id, badge_id)

            for user_id in remove:
                user_remove_badge(session, user_id, badge_id)

        # membership lists hard-coded in static resources
        update_badge("founder", get_static_badge_dict()["founder"])
        update_badge("board_member", get_static_badge_dict()["board_member"])
        update_badge("past_board_member", get_static_badge_dict()["past_board_member"])
        # membership lists derived from database state
        update_badge("donor", session.execute(select(User.id).where(User.last_donated.is_not(None))).scalars().all())
        update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all())
        update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all())
        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        update_badge(
            "strong_verification",
            session.execute(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            )
            .scalars()
            .all(),
        )
        # volunteer badge for active volunteers (stopped_volunteering is null)
        update_badge(
            "volunteer",
            session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_(None))).scalars().all(),
        )
        # past_volunteer badge for past volunteers (stopped_volunteering is not null)
        update_badge(
            "past_volunteer",
            session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_not(None)))
            .scalars()
            .all(),
        )


update_badges.PAYLOAD = empty_pb2.Empty
update_badges.SCHEDULE = timedelta(minutes=15)
def finalize_strong_verification(payload: "jobs_pb2.FinalizeStrongVerificationPayload") -> None:
    """
    Completes a strong verification attempt by fetching the session result from
    the Iris ID API (passportreader.app) and updating the attempt accordingly.

    Outcomes:
    * non-passport document: attempt marked failed, user notified
    * passport data matches an earlier attempt: marked duplicate, user notified
      (plus an email if the earlier attempt belongs to a different user)
    * otherwise: full passport data is stored asymmetrically encrypted and the
      attempt succeeds; the strong_verification badge is granted when the user's
      profile matches the passport, else a failure notification is sent.
    """
    with session_scope() as session:
        # Only attempts waiting on this backend step are eligible; scalar_one()
        # raises if the attempt is missing or already in another state.
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
            verify="/etc/ssl/certs/ca-certificates.crt",
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()
        # The "reference" field round-trips our own encrypted payload, proving the
        # Iris session belongs to this verification attempt.
        reference_payload = verification_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )
        # NOTE(review): these consistency checks use `assert`, which is stripped
        # under `python -O`; consider explicit raises if -O is ever used.
        assert verification_attempt.user_id == reference_payload.user_id
        assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token
        assert verification_attempt.iris_session_id == json_data["id"]
        assert json_data["state"] == "APPROVED"

        # Only passports are accepted for strong verification.
        if json_data["document_type"] != "PASSPORT":
            verification_attempt.status = StrongVerificationAttemptStatus.failed
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
                ),
            )
            return

        assert json_data["document_type"] == "PASSPORT"

        expiry_date = date.fromisoformat(json_data["expiry_date"])
        nationality = json_data["nationality"]
        last_three_document_chars = json_data["document_number"][-3:]

        # A passport is identified by (expiry, nationality, last 3 document chars);
        # the earliest attempt with the same triple wins.
        existing_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
            .where(StrongVerificationAttempt.passport_nationality == nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        # Minimal (non-sensitive) data is stored even for duplicates.
        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = expiry_date
        verification_attempt.passport_nationality = nationality
        verification_attempt.passport_last_three_document_chars = last_three_document_chars

        if existing_attempt:
            verification_attempt.status = StrongVerificationAttemptStatus.duplicate

            # Same passport used by a different user: alert the team by email.
            if existing_attempt.user_id != verification_attempt.user_id:
                session.flush()
                send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)

            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                key="",
                data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
            )
            return

        # Success path: store the full API response encrypted with the public key,
        # so only offline holders of the private key can read it.
        verification_attempt.has_full_data = True
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = verification_attempt.user
        # Badge + success notification only if profile birthdate/gender match the passport.
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            # Skip if the badge already exists (avoids duplicate badge rows).
            if session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none():
                return

            user_add_badge(session, user.id, badge_id, do_notify=False)
            notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success", key="")
        else:
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
                ),
            )


finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload
def send_activeness_probes(payload: empty_pb2.Empty) -> None:
    """
    Drives the activeness-probe lifecycle for hosts:

    1. creates probes for visible `can_host` users inactive longer than
       ACTIVENESS_PROBE_INACTIVITY_PERIOD (rate limited to ~2% of users/day),
    2. sends the scheduled reminder notifications for pending probes,
    3. expires unanswered probes, demoting the user's hosting/meetup status.
    """
    with session_scope() as session:
        ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)

        if config["ACTIVENESS_PROBES_ENABLED"]:
            # users with a currently-pending (unanswered) probe
            subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery()

            # users who we should send an activeness probe to
            new_probe_user_ids = (
                session.execute(
                    select(User.id)
                    .where(User.is_visible)
                    .where(User.hosting_status == HostingStatus.can_host)
                    .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
                    .where(User.id.not_in(select(subquery.c.user_id)))
                )
                .scalars()
                .all()
            )

            total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()
            probes_today = session.execute(
                select(func.count())
                .select_from(ActivenessProbe)
                .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24))
            ).scalar_one()

            # send probes to max 2% of users per day; this job runs hourly, so
            # cap each run at roughly 1/24 of the daily budget (but at least 1)
            max_probes_per_day = 0.02 * total_users
            max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1))

            if len(new_probe_user_ids) > max_probe_size:
                new_probe_user_ids = sample(new_probe_user_ids, max_probe_size)

            for user_id in new_probe_user_ids:
                session.add(ActivenessProbe(user_id=user_id))

            session.commit()

        ## Step 2: actually send out probe notifications
        for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
            probes = (
                session.execute(
                    select(ActivenessProbe)
                    .where(ActivenessProbe.notifications_sent == probe_number_minus_1)
                    .where(ActivenessProbe.probe_initiated + delay < func.now())
                    .where(ActivenessProbe.is_pending)
                )
                .scalars()
                .all()
            )

            for probe in probes:
                probe.notifications_sent = probe_number_minus_1 + 1
                # notify() takes the user_id directly; no per-user context needed here
                notify(
                    session,
                    user_id=probe.user.id,
                    topic_action="activeness:probe",
                    key=str(probe.id),
                    data=notification_data_pb2.ActivenessProbe(
                        reminder_number=probe_number_minus_1 + 1,
                        deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
                    ),
                )
                session.commit()

        ## Step 3: for those who haven't responded, mark them as failed
        expired_probes = (
            session.execute(
                select(ActivenessProbe)
                .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
                .where(ActivenessProbe.is_pending)
                .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
            )
            .scalars()
            .all()
        )

        for probe in expired_probes:
            probe.responded = now()
            probe.response = ActivenessProbeStatus.expired
            # demote one notch: can_host -> maybe, wants_to_meetup -> open_to_meetup
            if probe.user.hosting_status == HostingStatus.can_host:
                probe.user.hosting_status = HostingStatus.maybe
            if probe.user.meetup_status == MeetupStatus.wants_to_meetup:
                probe.user.meetup_status = MeetupStatus.open_to_meetup
            session.commit()


send_activeness_probes.PAYLOAD = empty_pb2.Empty
send_activeness_probes.SCHEDULE = timedelta(minutes=60)
def update_randomized_locations(payload: empty_pb2.Empty) -> None:
    """
    Fills in `randomized_geom` for every user that doesn't have one yet.

    The offset is deterministic per user: a key derived from the app secret
    (via our key derivation function) is mixed with the user id to draw a
    radius in [0.02, 0.1] degrees (roughly 2-10 km) and an angle in [0, 2*pi),
    and the randomized point is placed that far from the user's true location.
    """
    randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)

    def offset_coords(user_id: int, lat: float, lng: float) -> tuple[float, float]:
        # deterministic uniforms in [0, 1) keyed on the secret and the user id
        u_radius = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii"))
        u_angle = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii"))
        r = 0.02 + 0.08 * u_radius
        theta = 2 * pi * u_angle
        return lat + r * sin(theta), lng + r * cos(theta)

    with session_scope() as session:
        pending = session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all()

    updates: list[dict[str, Any]] = []
    for user_id, geom in pending:
        lat, lng = get_coordinates(geom)
        updates.append({"id": user_id, "randomized_geom": create_coordinate(*offset_coords(user_id, lat, lng))})

    with session_scope() as session:
        session.execute(update(User), updates)


update_randomized_locations.PAYLOAD = empty_pb2.Empty
update_randomized_locations.SCHEDULE = timedelta(hours=1)
def send_event_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends a reminder notification for each upcoming event occurrence (starting
    within EVENT_REMINDER_TIMEDELTA) to attendees who haven't been reminded yet.
    """
    logger.info("Sending event reminder emails")

    with session_scope() as session:
        upcoming = (
            session.execute(
                select(EventOccurrence)
                .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA)
                .where(EventOccurrence.start_time >= now())
            )
            .scalars()
            .all()
        )

        for occurrence in upcoming:
            # attendees of this occurrence who have not yet received a reminder
            pending = session.execute(
                select(User, EventOccurrenceAttendee)
                .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id)
                .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
                .where(EventOccurrenceAttendee.reminder_sent == False)
            ).all()

            for attendee_user, attendance in pending:
                # render the event/user protos as seen by this particular user
                ctx = make_background_user_context(user_id=attendee_user.id)

                notify(
                    session,
                    user_id=attendee_user.id,
                    topic_action="event:reminder",
                    key=str(occurrence.id),
                    data=notification_data_pb2.EventReminder(
                        event=event_to_pb(session, occurrence, ctx),
                        user=user_model_to_pb(attendee_user, session, ctx),
                    ),
                )

                attendance.reminder_sent = True
                session.commit()


send_event_reminders.PAYLOAD = empty_pb2.Empty
send_event_reminders.SCHEDULE = timedelta(hours=1)
def check_expo_push_receipts(payload: empty_pb2.Empty) -> None:
    """
    Check Expo push receipts in batch and update delivery attempts.

    Processes up to 100 attempts per batch, each batch in its own session, and
    returns when no attempts remain; raises if MAX_ITERATIONS batches were not
    enough (signalling an abnormal backlog).
    """
    MAX_ITERATIONS = 100  # Safety limit: 100 batches * 100 attempts = 10,000 max

    for iteration in range(MAX_ITERATIONS):
        with session_scope() as session:
            # Find all delivery attempts that need receipt checking
            # Wait 15 minutes per Expo's recommendation before checking receipts;
            # attempts older than 24h are skipped (Expo receipts expire).
            attempts = (
                session.execute(
                    select(PushNotificationDeliveryAttempt)
                    .where(PushNotificationDeliveryAttempt.expo_ticket_id != None)
                    .where(PushNotificationDeliveryAttempt.receipt_checked_at == None)
                    .where(PushNotificationDeliveryAttempt.time < now() - timedelta(minutes=15))
                    .where(PushNotificationDeliveryAttempt.time > now() - timedelta(hours=24))
                    .limit(100)
                )
                .scalars()
                .all()
            )

            if not attempts:
                logger.debug("No Expo receipts to check")
                return

            logger.info(f"Checking {len(attempts)} Expo push receipts")

            # one batched API call for the whole page of tickets
            receipts = get_expo_push_receipts([attempt.expo_ticket_id for attempt in attempts])

            for attempt in attempts:
                receipt = receipts.get(attempt.expo_ticket_id)

                # Always mark as checked to avoid infinite loops
                attempt.receipt_checked_at = now()

                if receipt is None:
                    # Receipt not found after 15min - likely expired (>24h) or never existed
                    # Per Expo docs: receipts should be available within 15 minutes
                    attempt.receipt_status = "not_found"
                    continue

                attempt.receipt_status = receipt.get("status")

                if receipt.get("status") == "error":
                    details = receipt.get("details", {})
                    error_code = details.get("error")
                    attempt.receipt_error_code = error_code

                    if error_code == "DeviceNotRegistered":
                        # Device token is no longer valid - disable the subscription
                        sub = session.execute(
                            select(PushNotificationSubscription).where(
                                PushNotificationSubscription.id == attempt.push_notification_subscription_id
                            )
                        ).scalar_one()

                        # only move disabled_at earlier, never later; a future
                        # disabled_at presumably means "still active" — TODO confirm
                        # the column defaults to a far-future timestamp
                        if sub.disabled_at > now():
                            sub.disabled_at = now()
                            logger.info(f"Disabled push sub {sub.id} due to DeviceNotRegistered in receipt")
                            push_notification_counter.labels(
                                platform="expo", outcome="permanent_subscription_failure_receipt"
                            ).inc()
                    else:
                        logger.warning(f"Expo receipt error for ticket {attempt.expo_ticket_id}: {error_code}")

    # If we get here, we've exhausted MAX_ITERATIONS without finishing
    raise RuntimeError(
        f"check_expo_push_receipts exceeded {MAX_ITERATIONS} iterations - "
        "there may be an unusually large backlog of receipts to check"
    )


check_expo_push_receipts.PAYLOAD = empty_pb2.Empty
check_expo_push_receipts.SCHEDULE = timedelta(minutes=5)
def send_postal_verification_postcard(payload: jobs_pb2.SendPostalVerificationPostcardPayload) -> None:
    """
    Sends the postcard via external API and updates attempt status.

    On success the attempt moves to `awaiting_verification` and the user is
    notified; on failure the attempt is marked `failed`. No-ops if the attempt
    is missing or not `in_progress`.
    """
    with session_scope() as session:
        attempt = session.execute(
            select(PostalVerificationAttempt).where(
                PostalVerificationAttempt.id == payload.postal_verification_attempt_id
            )
        ).scalar_one_or_none()

        # guard: the attempt may have been cancelled or already processed
        if not attempt or attempt.status != PostalVerificationStatus.in_progress:
            logger.warning(
                f"Postal verification attempt {payload.postal_verification_attempt_id} not found or wrong state"
            )
            return

        user_name = session.execute(select(User.name).where(User.id == attempt.user_id)).scalar_one()

        result = send_postcard(
            recipient_name=user_name,
            address_line_1=attempt.address_line_1,
            address_line_2=attempt.address_line_2,
            city=attempt.city,
            state=attempt.state,
            postal_code=attempt.postal_code,
            country=attempt.country,
            verification_code=attempt.verification_code,
            qr_code_url=urls.postal_verification_link(code=attempt.verification_code),
        )

        if result.success:
            attempt.status = PostalVerificationStatus.awaiting_verification
            # SQL-side now(): timestamp is set by the database at commit
            attempt.postcard_sent_at = func.now()

            notify(
                session,
                user_id=attempt.user_id,
                topic_action="postal_verification:postcard_sent",
                key="",
                data=notification_data_pb2.PostalVerificationPostcardSent(
                    city=attempt.city,
                    country=attempt.country,
                ),
            )
        else:
            # Could retry or fail - for now, fail
            attempt.status = PostalVerificationStatus.failed
            logger.error(f"Postcard send failed: {result.error_message}")


send_postal_verification_postcard.PAYLOAD = jobs_pb2.SendPostalVerificationPostcardPayload
class DatabaseInconsistencyError(Exception):
    """Raised when the periodic database consistency checks detect a violation."""
def check_database_consistency(payload: empty_pb2.Empty) -> None:
    """
    Checks database consistency and raises an exception if any issues are found.

    Each check appends a human-readable description to `errors`; a single
    DatabaseInconsistencyError containing all findings is raised at the end,
    so one run surfaces every violation at once.
    """
    logger.info("Checking database consistency")
    errors = []

    with session_scope() as session:
        # Check that all non-deleted users have a profile gallery
        users_without_gallery = session.execute(
            select(User.id, User.username).where(User.is_deleted == False).where(User.profile_gallery_id.is_(None))
        ).all()
        if users_without_gallery:
            errors.append(f"Users without profile gallery: {users_without_gallery}")

        # Check that all profile galleries point to their owner
        mismatched_galleries = session.execute(
            select(User.id, User.username, User.profile_gallery_id, PhotoGallery.owner_user_id)
            .join(PhotoGallery, User.profile_gallery_id == PhotoGallery.id)
            .where(User.profile_gallery_id.is_not(None))
            .where(PhotoGallery.owner_user_id != User.id)
        ).all()
        if mismatched_galleries:
            errors.append(f"Profile galleries with mismatched owner: {mismatched_galleries}")

        # === Moderation System Consistency Checks ===

        # Check all ModerationStates have a known object_type
        known_object_types = [ModerationObjectType.HOST_REQUEST, ModerationObjectType.GROUP_CHAT]
        unknown_type_states = session.execute(
            select(ModerationState.id, ModerationState.object_type).where(
                ModerationState.object_type.not_in(known_object_types)
            )
        ).all()
        if unknown_type_states:
            errors.append(f"ModerationStates with unknown object_type: {unknown_type_states}")

        # Check resolved queue items point to log entries with resolving actions (APPROVE/HIDE, not CREATE/FLAG/UNFLAG)
        resolving_actions = [ModerationAction.APPROVE, ModerationAction.HIDE]
        invalid_resolved_actions = session.execute(
            select(ModerationQueueItem.id, ModerationQueueItem.resolved_by_log_id, ModerationLog.action)
            .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id)
            .where(ModerationQueueItem.resolved_by_log_id.is_not(None))
            .where(ModerationLog.action.not_in(resolving_actions))
        ).all()
        if invalid_resolved_actions:
            errors.append(f"Queue items resolved by non-resolving actions: {invalid_resolved_actions}")

        # Check every ModerationState has at least one INITIAL_REVIEW queue item
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_initial_review = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationQueueItem.moderation_state_id == ModerationState.id)
                    .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW)
                ),
            )
        ).all()
        if states_without_initial_review:
            errors.append(f"ModerationStates without INITIAL_REVIEW queue item: {states_without_initial_review}")

        # Check every ModerationState has a CREATE log entry
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_create_log = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationLog.moderation_state_id == ModerationState.id)
                    .where(ModerationLog.action == ModerationAction.CREATE)
                ),
            )
        ).all()
        if states_without_create_log:
            errors.append(f"ModerationStates without CREATE log entry: {states_without_create_log}")

        # Check resolved queue items point to log entries for the same moderation state
        resolved_item_log_mismatches = session.execute(
            select(ModerationQueueItem.id, ModerationQueueItem.moderation_state_id, ModerationLog.moderation_state_id)
            .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id)
            .where(ModerationQueueItem.resolved_by_log_id.is_not(None))
            .where(ModerationQueueItem.moderation_state_id != ModerationLog.moderation_state_id)
        ).all()
        if resolved_item_log_mismatches:
            errors.append(f"Resolved queue items with mismatched moderation_state_id: {resolved_item_log_mismatches}")

        # Check every HOST_REQUEST ModerationState has exactly one HostRequest pointing to it
        # NOTE(review): one COUNT query per state; fine at current scale, could be
        # grouped into a single aggregate query if this table grows large
        hr_states = (
            session.execute(
                select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST)
            )
            .scalars()
            .all()
        )
        for state_id in hr_states:
            hr_count = session.execute(
                select(func.count()).where(HostRequest.moderation_state_id == state_id)
            ).scalar_one()
            if hr_count != 1:
                errors.append(f"ModerationState {state_id} (HOST_REQUEST) has {hr_count} HostRequests (expected 1)")

        # Check every GROUP_CHAT ModerationState has exactly one GroupChat pointing to it
        gc_states = (
            session.execute(
                select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT)
            )
            .scalars()
            .all()
        )
        for state_id in gc_states:
            gc_count = session.execute(
                select(func.count()).where(GroupChat.moderation_state_id == state_id)
            ).scalar_one()
            if gc_count != 1:
                errors.append(f"ModerationState {state_id} (GROUP_CHAT) has {gc_count} GroupChats (expected 1)")

        # Check ModerationState.object_id matches the actual object's ID
        hr_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, HostRequest.conversation_id)
            .join(HostRequest, HostRequest.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST)
            .where(ModerationState.object_id != HostRequest.conversation_id)
        ).all()
        if hr_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for HOST_REQUEST: {hr_object_id_mismatches}")

        gc_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, GroupChat.conversation_id)
            .join(GroupChat, GroupChat.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT)
            .where(ModerationState.object_id != GroupChat.conversation_id)
        ).all()
        if gc_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for GROUP_CHAT: {gc_object_id_mismatches}")

        # Check reverse mapping: HostRequest's moderation_state points to correct ModerationState
        hr_reverse_mismatches = session.execute(
            select(
                HostRequest.conversation_id,
                HostRequest.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, HostRequest.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.HOST_REQUEST)
                | (ModerationState.object_id != HostRequest.conversation_id)
            )
        ).all()
        if hr_reverse_mismatches:
            errors.append(f"HostRequest points to ModerationState with wrong type/object_id: {hr_reverse_mismatches}")

        # Check reverse mapping: GroupChat's moderation_state points to correct ModerationState
        gc_reverse_mismatches = session.execute(
            select(
                GroupChat.conversation_id,
                GroupChat.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, GroupChat.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.GROUP_CHAT)
                | (ModerationState.object_id != GroupChat.conversation_id)
            )
        ).all()
        if gc_reverse_mismatches:
            errors.append(f"GroupChat points to ModerationState with wrong type/object_id: {gc_reverse_mismatches}")

        # Check notifications linked to VISIBLE/UNLISTED content have been processed
        # When content becomes visible, handle_notification should create NotificationDelivery records.
        # Notifications with moderation_state_id pointing to visible content but no delivery records are "hanging".
        hanging_notifications = session.execute(
            select(Notification.id, Notification.user_id, Notification.topic_action, ModerationState.visibility)
            .join(ModerationState, Notification.moderation_state_id == ModerationState.id)
            .where(ModerationState.visibility.in_([ModerationVisibility.VISIBLE, ModerationVisibility.UNLISTED]))
            .where(~exists(select(1).where(NotificationDelivery.notification_id == Notification.id)))
        ).all()
        if hanging_notifications:
            errors.append(
                f"Notifications for VISIBLE/UNLISTED content without delivery records: {hanging_notifications}"
            )

        # Ensure auto-approve deadline isn't being exceeded by a significant margin
        # The auto-approver runs every 15s, so allow 5 minutes grace before alerting
        deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
        if deadline_seconds > 0:
            grace_period = timedelta(minutes=5)
            stale_initial_review_items = session.execute(
                select(
                    ModerationQueueItem.id,
                    ModerationQueueItem.moderation_state_id,
                    ModerationQueueItem.time_created,
                )
                .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW)
                .where(ModerationQueueItem.resolved_by_log_id.is_(None))
                .where(ModerationQueueItem.time_created < now() - timedelta(seconds=deadline_seconds) - grace_period)
            ).all()
            if stale_initial_review_items:
                errors.append(
                    f"INITIAL_REVIEW items exceeding auto-approve deadline by >5min: {stale_initial_review_items}"
                )

    if errors:
        raise DatabaseInconsistencyError("\n".join(errors))


check_database_consistency.PAYLOAD = empty_pb2.Empty
check_database_consistency.SCHEDULE = timedelta(hours=24)
def auto_approve_moderation_queue(payload: empty_pb2.Empty) -> None:
    """
    Dead man's switch: auto-approves unresolved INITIAL_REVIEW items older than the deadline.
    Items explicitly actioned by moderators are left alone.

    Disabled entirely when MODERATION_AUTO_APPROVE_DEADLINE_SECONDS <= 0.
    Approvals are performed through the Moderation servicer, acting as the
    configured moderation bot user.
    """
    deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
    if deadline_seconds <= 0:
        return

    with session_scope() as session:
        # act as the moderation bot user for all servicer calls below
        ctx = make_background_user_context(user_id=config["MODERATION_BOT_USER_ID"])

        # unresolved INITIAL_REVIEW items created before the deadline cutoff
        # (page_size=100 bounds how many are processed per run)
        items = (
            Moderation()
            .GetModerationQueue(
                request=moderation_pb2.GetModerationQueueReq(
                    triggers=[moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW],
                    unresolved_only=True,
                    page_size=100,
                    created_before=Timestamp_from_datetime(now() - timedelta(seconds=deadline_seconds)),
                ),
                context=ctx,
                session=session,
            )
            .queue_items
        )

        if not items:
            return

        logger.info(f"Auto-approving {len(items)} moderation queue items")
        for item in items:
            Moderation().ModerateContent(
                request=moderation_pb2.ModerateContentReq(
                    moderation_state_id=item.moderation_state_id,
                    action=moderation_pb2.MODERATION_ACTION_APPROVE,
                    visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
                    reason=f"Auto-approved: moderation deadline of {deadline_seconds} seconds exceeded.",
                ),
                context=ctx,
                session=session,
            )
        moderation_auto_approved_counter.inc(len(items))


auto_approve_moderation_queue.PAYLOAD = empty_pb2.Empty
auto_approve_moderation_queue.SCHEDULE = timedelta(seconds=15)