Coverage for app/backend/src/couchers/jobs/handlers.py: 88%
492 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1"""
2Background job servicers
3"""
5import logging
6from collections.abc import Sequence
7from datetime import date, timedelta
8from math import cos, pi, sin, sqrt
9from random import sample
10from typing import Any
12import requests
13from google.protobuf import empty_pb2
14from sqlalchemy import ColumnElement, Float, Function, Integer, select
15from sqlalchemy.orm import aliased
16from sqlalchemy.sql import (
17 and_,
18 case,
19 cast,
20 delete,
21 distinct,
22 exists,
23 extract,
24 func,
25 literal,
26 not_,
27 or_,
28 union_all,
29 update,
30)
32from couchers import experimentation
33from couchers.config import config
34from couchers.constants import (
35 ACTIVENESS_PROBE_EXPIRY_TIME,
36 ACTIVENESS_PROBE_INACTIVITY_PERIOD,
37 ACTIVENESS_PROBE_TIME_REMINDERS,
38 EVENT_REMINDER_TIMEDELTA,
39 HOST_REQUEST_MAX_REMINDERS,
40 HOST_REQUEST_REMINDER_INTERVAL,
41 MODERATION_AUTO_APPROVE_FLAG_PRIORITY,
42)
43from couchers.context import make_background_user_context, make_notification_user_context
44from couchers.crypto import (
45 USER_LOCATION_RANDOMIZATION_NAME,
46 asym_encrypt,
47 b64decode,
48 get_secret,
49 simple_decrypt,
50 stable_secure_uniform,
51)
52from couchers.db import session_scope
53from couchers.email.dev import print_dev_email
54from couchers.email.smtp import send_smtp_email
55from couchers.event_log import log_event
56from couchers.helpers.badges import user_add_badge, user_remove_badge
57from couchers.helpers.completed_profile import has_completed_profile_expression
58from couchers.materialized_views import (
59 UserResponseRate,
60)
61from couchers.metrics import (
62 moderation_auto_approved_counter,
63 postcards_sent_counter,
64 push_notification_counter,
65 strong_verification_completions_counter,
66)
67from couchers.models import (
68 AccountDeletionToken,
69 ActivenessProbe,
70 ActivenessProbeStatus,
71 Cluster,
72 ClusterRole,
73 ClusterSubscription,
74 EventOccurrence,
75 EventOccurrenceAttendee,
76 GroupChat,
77 GroupChatSubscription,
78 HostingStatus,
79 HostRequest,
80 HostRequestStatus,
81 LoginToken,
82 MeetupStatus,
83 Message,
84 MessageType,
85 ModerationAction,
86 ModerationLog,
87 ModerationObjectType,
88 ModerationQueueItem,
89 ModerationState,
90 ModerationTrigger,
91 PassportSex,
92 PasswordResetToken,
93 PhotoGallery,
94 PostalVerificationAttempt,
95 PostalVerificationStatus,
96 PushNotificationDeliveryAttempt,
97 PushNotificationSubscription,
98 Reference,
99 StrongVerificationAttempt,
100 StrongVerificationAttemptStatus,
101 User,
102 UserBadge,
103 Volunteer,
104)
105from couchers.models.notifications import NotificationTopicAction
106from couchers.notifications.expo_api import get_expo_push_receipts
107from couchers.notifications.notify import notify
108from couchers.postal.my_postcard import get_order_ids, send_postcard
109from couchers.proto import moderation_pb2, notification_data_pb2
110from couchers.proto.internal import internal_pb2, jobs_pb2
111from couchers.resources import get_badge_dict, get_static_badge_dict
112from couchers.sentry import report_message
113from couchers.servicers.api import user_model_to_pb
114from couchers.servicers.events import (
115 event_to_pb,
116)
117from couchers.servicers.moderation import Moderation
118from couchers.servicers.requests import host_request_to_pb
119from couchers.sql import (
120 users_visible_to_each_other,
121 where_moderated_content_visible,
122 where_moderated_content_visible_to_user_column,
123 where_user_columns_visible_to_each_other,
124 where_users_column_visible,
125)
126from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships
127from couchers.tasks import send_duplicate_strong_verification_email
128from couchers.utils import (
129 Timestamp_from_datetime,
130 create_coordinate,
131 get_coordinates,
132 not_none,
133 now,
134)
# module-level logger named after this module; used by every job handler below
logger = logging.getLogger(__name__)
def send_email(payload: jobs_pb2.SendEmailPayload) -> None:
    """
    Delivers a single queued email and records it in the database.

    When ``config.ENABLE_EMAIL`` is set the email goes out over SMTP; otherwise it is only
    printed via the dev printer. Either sender returns a model object that is then persisted.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
    if config.ENABLE_EMAIL:
        email = send_smtp_email(payload)
    else:
        email = print_dev_email(payload)
    # persist the record produced by whichever sender ran
    with session_scope() as session:
        session.add(email)
def purge_login_tokens(payload: empty_pb2.Empty) -> None:
    """
    Deletes every login token that is no longer valid. The payload is unused.
    """
    logger.info("Purging login tokens")
    # bulk DELETE; skip synchronising in-memory ORM objects with the deleted rows
    purge_stmt = delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False)
    with session_scope() as session:
        session.execute(purge_stmt)
def purge_password_reset_tokens(payload: empty_pb2.Empty) -> None:
    """
    Deletes every password reset token that is no longer valid.

    The payload is unused.
    """
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        # bulk DELETE; in-memory ORM objects are not synchronised with the deleted rows
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )
def purge_account_deletion_tokens(payload: empty_pb2.Empty) -> None:
    """
    Deletes every account deletion token that is no longer valid. The payload is unused.
    """
    logger.info("Purging account deletion tokens")
    expired_tokens = ~AccountDeletionToken.is_valid
    # bulk DELETE without synchronising in-memory ORM objects
    purge_stmt = delete(AccountDeletionToken).where(expired_tokens).execution_options(synchronize_session=False)
    with session_scope() as session:
        session.execute(purge_stmt)
def send_message_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for messages that have been unseen for a long enough time

    A user is selected when, in some group chat subscription that is not muted, there is a text
    message that:
    - is more than 5 minutes old,
    - is newer than both ``User.last_notified_message_id`` and the subscription's
      ``last_seen_message_id``,
    - was sent while the user was subscribed (between ``joined`` and ``left``).

    For each selected user, one ``chat__missed_messages`` notification is sent. It lists the
    latest unseen message and the unseen count for every such group chat; this second query has
    no 5-minute age filter. The user's ``last_notified_message_id`` is then advanced and the
    session committed. The payload is unused.
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                where_moderated_content_visible_to_user_column(
                    select(User)
                    .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                    .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                    .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id),
                    GroupChat,
                    User.id,
                )
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            # the joins yield one row per matching message; collapse to one User per user
            .unique()
        )

        for user in users:
            context = make_notification_user_context(user_id=user.id)
            # now actually grab all the group chats, not just those whose unseen messages are
            # older than 5 minutes (this query has no age filter)
            # one row per group chat: latest unseen message id and number of unseen messages
            subquery = (
                # presumably drops messages whose author is not visible to this user (by helper name) — confirm
                where_users_column_visible(
                    where_moderated_content_visible(
                        select(
                            GroupChatSubscription.group_chat_id.label("group_chat_id"),
                            func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                            func.max(Message.id).label("message_id"),
                            func.count(Message.id).label("unseen_count"),
                        )
                        .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id),
                        context,
                        GroupChat,
                        is_list_operation=True,
                    )
                    .where(GroupChatSubscription.user_id == user.id)
                    .where(not_(GroupChatSubscription.is_muted))
                    .where(Message.id > user.last_notified_message_id)
                    .where(Message.id > GroupChatSubscription.last_seen_message_id)
                    .where(Message.time >= GroupChatSubscription.joined)
                    .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                    .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)),
                    context,
                    Message.author_id,
                )
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            # resolve each chat's latest unseen message id into (GroupChat, Message, unseen_count),
            # newest first
            unseen_messages = session.execute(
                where_moderated_content_visible(
                    select(GroupChat, Message, subquery.c.unseen_count)
                    .join(subquery, subquery.c.message_id == Message.id)
                    .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id),
                    context,
                    GroupChat,
                    is_list_operation=True,
                ).order_by(subquery.c.message_id.desc())
            ).all()

            # the visibility filters above may remove everything the first query matched
            if not unseen_messages:
                continue

            # advance the watermark so these messages are not notified again
            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            notify(
                session,
                user_id=user.id,
                topic_action=NotificationTopicAction.chat__missed_messages,
                key="",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                context,
                            ),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                            group_chat_title=group_chat.title or None,
                            unseen_count=unseen_count,
                        )
                        for group_chat, message, unseen_count in unseen_messages
                    ],
                ),
            )
            # commit per user so progress survives a failure on a later user
            session.commit()
def send_request_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host)

    A candidate is a visible user with a text message older than 5 minutes in one of their host
    requests. The message must be newer than both the user's last-seen message for that request
    and their global ``User.last_notified_request_message_id`` watermark.

    For each candidate:
    - the watermark is advanced to the newest such message;
    - a ``host_request__missed_messages`` notification is sent per affected host request, keyed
      by conversation id.

    On the hosting side the notification is skipped when the only text message in the
    conversation is the creation message. The payload is unused.
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    with session_scope() as session:
        # Get all candidate users who might have unseen request messages.
        # Drive from host_requests/messages (selective) rather than scanning all users (expensive).
        surfer_ids = (
            select(User.id)
            .join(HostRequest, HostRequest.initiator_user_id == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(User.is_visible)
            .where(Message.id > HostRequest.initiator_last_seen_message_id)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
        )
        host_ids = (
            select(User.id)
            .join(HostRequest, HostRequest.recipient_user_id == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(User.is_visible)
            .where(Message.id > HostRequest.recipient_last_seen_message_id)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
        )
        # UNION ALL keeps duplicate ids (one per matching message); .unique() dedupes them client-side
        candidate_user_ids = session.execute(union_all(surfer_ids, host_ids)).scalars().unique().all()

        for user_id in candidate_user_ids:
            context = make_notification_user_context(user_id=user_id)

            # requests where this user is surfing
            # rows are (User, HostRequest, newest qualifying message id), one per host request
            surfing_reqs = session.execute(
                where_users_column_visible(
                    where_moderated_content_visible_to_user_column(
                        select(User, HostRequest, func.max(Message.id))
                        .where(User.id == user_id)
                        .join(HostRequest, HostRequest.initiator_user_id == User.id),
                        HostRequest,
                        HostRequest.initiator_user_id,
                    ),
                    context,
                    HostRequest.recipient_user_id,
                )
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.initiator_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)  # type: ignore[arg-type]
            ).all()

            # where this user is hosting
            hosting_reqs = session.execute(
                where_users_column_visible(
                    where_moderated_content_visible_to_user_column(
                        select(User, HostRequest, func.max(Message.id))
                        .where(User.id == user_id)
                        .join(HostRequest, HostRequest.recipient_user_id == User.id),
                        HostRequest,
                        HostRequest.recipient_user_id,
                    ),
                    context,
                    HostRequest.initiator_user_id,
                )
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.recipient_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)  # type: ignore[arg-type]
            ).all()

            for user, host_request, max_message_id in surfing_reqs:
                # watermark only ever moves forward
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                notify(
                    session,
                    user_id=user.id,
                    topic_action=NotificationTopicAction.host_request__missed_messages,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        user=user_model_to_pb(host_request.recipient, session, context),
                        am_host=False,
                    ),
                )

            for user, host_request, max_message_id in hosting_reqs:
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                # When a host request is created, the recipient immediately receives a
                # host_request__create notification that includes the initial message text.
                # A few minutes later, this background job sees that same message as "unseen"
                # (the recipient hasn't opened the request yet) and would send a duplicate
                # missed_messages notification.
                #
                # To prevent this, we check if the only unseen text message in this host
                # request is the very first text message in the conversation (i.e. the
                # creation message). If so, we skip sending missed_messages — the user was
                # already notified via host_request__create.
                #
                # Advancing last_notified_request_message_id above is safe even when we skip
                # the notification: this watermark is only ever advanced when we process all
                # unseen messages for the user, so skipping one notification doesn't cause us
                # to miss future messages in other host requests.
                # (the check is implemented as: the conversation has at most one text message)
                only_creation_message = not session.execute(
                    select(
                        select(func.count())
                        .where(Message.conversation_id == host_request.conversation_id)
                        .where(Message.message_type == MessageType.text)
                        .scalar_subquery()
                        > 1
                    )
                ).scalar_one()
                if only_creation_message:
                    continue

                notify(
                    session,
                    user_id=user.id,
                    topic_action=NotificationTopicAction.host_request__missed_messages,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        user=user_model_to_pb(host_request.initiator, session, context),
                        am_host=True,
                    ),
                )
def send_onboarding_emails(payload: empty_pb2.Empty) -> None:
    """
    Sends out onboarding emails

    Two ``onboarding__reminder`` notifications are sent:
    - reminder "1" to every visible user who has not received any onboarding email yet;
    - reminder "2" to visible users who got reminder 1 more than 7 days ago and still have no
      completed profile (per ``has_completed_profile_expression``).

    Each recipient's ``onboarding_emails_sent`` counter and ``last_onboarding_email_sent`` time
    are updated, and the session is committed after every recipient. The payload is unused.
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:

        def remind(recipients: Sequence[User], step: int) -> None:
            # notify each recipient, record which step they reached, and commit per recipient
            for recipient in recipients:
                notify(
                    session,
                    user_id=recipient.id,
                    topic_action=NotificationTopicAction.onboarding__reminder,
                    key=str(step),
                )
                recipient.onboarding_emails_sent = step
                recipient.last_onboarding_email_sent = now()
                session.commit()

        # first onboarding email
        never_emailed = select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)
        remind(session.execute(never_emailed).scalars().all(), 1)

        # second onboarding email: a week after the first, only if the profile is still incomplete
        due_for_second = (
            select(User)
            .where(User.is_visible)
            .where(User.onboarding_emails_sent == 1)
            .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
            .where(~has_completed_profile_expression())
        )
        remind(session.execute(due_for_second).scalars().all(), 2)
def send_reference_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends out reminders to write references after hosting/staying

    Up to three reminders are sent per side (surfer/host) of a host request, following
    ``reference_reminder_schedule``. A reminder is due once the reference window
    (``end_time_to_write_reference``) is within that step's timedelta of closing. It is sent only
    if all of the following hold:
    - the side has not yet written a reference;
    - ``can_write_reference`` holds;
    - the side did not record a reason for not meeting up;
    - both users are visible to each other.

    The per-side ``*_sent_reference_reminders`` counter is then set to the step number and the
    session committed per reminder. The payload is unused.
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (reminder number,
        #  how long before end_time_to_write_reference the reminder becomes due,
        #  number of days left to write the ref, passed to the notification as days_left)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        # (sending a later reminder sets the counter past the earlier steps' `< reminder_number` filter)
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            # `user` is the person who should write the ref, `other_user` the counterpart
            user = aliased(User)
            other_user = aliased(User)
            # surfers needing to write a ref
            q1 = (
                select(literal(True), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.initiator_user_id)
                .join(other_user, other_user.id == HostRequest.recipient_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the surfer has not written a ref
                        Reference.from_user_id == HostRequest.initiator_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.initiator_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.initiator_reason_didnt_meetup == None)
                .where(users_visible_to_each_other(self_user=user, other_user=other_user))
            )

            # hosts needing to write a ref
            q2 = (
                select(literal(False), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.recipient_user_id)
                .join(other_user, other_user.id == HostRequest.initiator_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the host has not written a ref
                        Reference.from_user_id == HostRequest.recipient_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.recipient_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.recipient_reason_didnt_meetup == None)
                .where(users_visible_to_each_other(self_user=user, other_user=other_user))
            )

            # combine both sides and map the subquery columns back onto ORM entities;
            # column 0 is the literal flag: True = surfer side, False = host side
            union = union_all(q1, q2).subquery()
            query = select(
                union.c[0].label("surfed"),
                aliased(HostRequest, union),
                aliased(user, union),
                aliased(other_user, union),
            )
            reference_reminders = session.execute(query).all()

            # NOTE: this loop rebinds `user`/`other_user` from the aliases to the row's User objects;
            # the aliases are recreated at the top of the next schedule iteration
            for surfed, host_request, user, other_user in reference_reminders:
                # visibility and blocking already checked in sql
                assert user.is_visible
                context = make_notification_user_context(user_id=user.id)
                topic_action = (
                    NotificationTopicAction.reference__reminder_surfed
                    if surfed
                    else NotificationTopicAction.reference__reminder_hosted
                )
                notify(
                    session,
                    user_id=user.id,
                    topic_action=topic_action,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(other_user, session, context),
                        days_left=reminder_days_left,
                    ),
                )
                # record the step so this reminder (and earlier ones) are not re-sent
                if surfed:
                    host_request.initiator_sent_reference_reminders = reminder_number
                else:
                    host_request.recipient_sent_reference_reminders = reminder_number
                session.commit()
def send_host_request_reminders(payload: empty_pb2.Empty) -> None:
    """
    Reminds hosts about pending host requests they have not replied to.

    A host request gets a ``host_request__reminder`` notification to its recipient when all of
    the following hold:
    - it is still pending and has not started yet;
    - fewer than ``HOST_REQUEST_MAX_REMINDERS`` reminders have been sent;
    - at least ``HOST_REQUEST_REMINDER_INTERVAL`` has passed since the last reminder;
    - the recipient has not authored any message in the conversation;
    - moderation and mutual-visibility checks pass.

    The reminder counter and timestamp are updated before notifying, and the session is committed
    after each request. The payload is unused.
    """
    with session_scope() as session:
        host_has_replied = exists(
            select(1).where(
                Message.conversation_id == HostRequest.conversation_id,
                Message.author_id == HostRequest.recipient_user_id,
            )
        )

        moderated = where_moderated_content_visible_to_user_column(
            select(HostRequest),
            HostRequest,
            HostRequest.recipient_user_id,
        )
        due_for_reminder = (
            moderated.where(HostRequest.status == HostRequestStatus.pending)
            .where(HostRequest.recipient_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS)
            .where(HostRequest.start_time > func.now())
            .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL)
            .where(~host_has_replied)
        )
        reminder_query = where_user_columns_visible_to_each_other(
            due_for_reminder,
            self_column=HostRequest.recipient_user_id,
            other_column=HostRequest.initiator_user_id,
        )
        pending_requests = session.execute(reminder_query).scalars().all()

        for hr in pending_requests:
            hr.recipient_sent_request_reminders += 1
            hr.last_sent_request_reminder_time = now()

            host_id = hr.recipient_user_id
            context = make_notification_user_context(user_id=host_id)
            notify(
                session,
                user_id=host_id,
                topic_action=NotificationTopicAction.host_request__reminder,
                key=str(hr.conversation_id),
                data=notification_data_pb2.HostRequestReminder(
                    host_request=host_request_to_pb(hr, session, context),
                    surfer=user_model_to_pb(hr.initiator, session, context),
                ),
                moderation_state_id=hr.moderation_state_id,
            )

            session.commit()
def add_users_to_email_list(payload: empty_pb2.Empty) -> None:
    """
    Synchronises users' newsletter subscription status with Listmonk.

    Does nothing unless the ``listmonk_enabled`` global flag is on. Otherwise:

    1. Every visible user with ``in_sync_with_newsletter == False`` is marked in sync. Before
       that, users who have not opted out of the newsletter are posted to the configured Listmonk
       list with status ``enabled``.
    2. If the ``remove_removed_users_from_mailing_list_enabled`` flag is on, users who are not
       visible or are shadowed get ``opt_out_of_newsletter`` set (and are flagged out of sync).
       Every such out-of-sync user is then posted to Listmonk with status ``blocklisted``.

    Users are processed one at a time, each in its own DB session, so progress is committed
    incrementally. The payload is unused.

    Raises:
        Exception: if the Listmonk API answers with a status other than 200 or 409.
    """
    if not experimentation.get_global_boolean_value("listmonk_enabled", default=False):
        logger.info("Not adding users to mailing list")
        return

    # use the HTTP session as a context manager so its pooled connections are released when the
    # job finishes or fails, rather than lingering until garbage collection
    with requests.Session() as sess:
        sess.auth = (config.LISTMONK_API_USERNAME, config.LISTMONK_API_KEY)

        def sync_subscriber(user: User, status: str) -> None:
            """Posts ``user`` to the Listmonk subscribers API with the given subscriber ``status``."""
            r = sess.post(
                config.LISTMONK_BASE_URL + "/api/subscribers",
                json={
                    "email": user.email,
                    "name": user.name,
                    "lists": [config.LISTMONK_LIST_ID],
                    "preconfirm_subscriptions": True,
                    "attribs": {"couchers_user_id": user.id},
                    "status": status,
                },
                timeout=10,
            )
            # the API returns 409 if the subscriber already exists
            if r.status_code not in (200, 409):
                # include the status code for debugging; the body is deliberately omitted since it
                # may echo personal data
                raise Exception(f"Failed to update user mailing list status (HTTP {r.status_code})")

        logger.info("Adding users to mailing list")

        while True:
            with session_scope() as session:
                user = session.execute(
                    select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
                ).scalar_one_or_none()
                if not user:
                    logger.info("Finished adding users to mailing list")
                    break

                # users who opted out are marked in sync without contacting Listmonk
                if not user.opt_out_of_newsletter:
                    sync_subscriber(user, "enabled")

                user.in_sync_with_newsletter = True
                session.commit()

        if experimentation.get_global_boolean_value("remove_removed_users_from_mailing_list_enabled", default=False):
            with session_scope() as session:
                # opt out removed (invisible or shadowed) users and flag them for re-sync below
                session.execute(
                    update(User)
                    .where(~User.is_visible | User.is_shadowed)
                    .where(User.opt_out_of_newsletter == False)
                    .values(opt_out_of_newsletter=True, in_sync_with_newsletter=False)
                )
                session.commit()

            while True:
                with session_scope() as session:
                    user = session.execute(
                        select(User)
                        .where(~User.is_visible | User.is_shadowed)
                        .where(User.in_sync_with_newsletter == False)
                        .limit(1)
                    ).scalar_one_or_none()
                    if not user:
                        logger.info("Finished removing users from mailing list")
                        return

                    sync_subscriber(user, "blocklisted")
                    user.in_sync_with_newsletter = True
                    session.commit()
def enforce_community_membership(payload: empty_pb2.Empty) -> None:
    """
    Background job entry point that delegates to ``couchers.tasks.enforce_community_memberships``.

    The payload is unused.
    """
    tasks_enforce_community_memberships()
def update_recommendation_scores(payload: empty_pb2.Empty) -> None:
    """
    Recomputes ``User.recommendation_score`` for every user with a single SQL UPDATE.

    The score is the sum of SQL-computed components (hosting status, profile completeness,
    references, activeness, community-builder and badge points, response rate) plus a random
    jitter term ``2 * poor_man_gaussian()``. Scores can therefore change between runs even when
    no data changed. The payload is unused.
    """
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian() -> ColumnElement[float] | float:
        """
        Produces an approximately std normal random variate

        Sums ``trials`` SQL ``random()`` values (presumably uniform on [0, 1)), subtracts their
        mean ``trials / 2`` and divides by their standard deviation ``sqrt(trials / 12)``.
        """
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt: Any) -> Function[int]:
        # SQL: CAST(stmt AS INTEGER), with NULL mapped to 0
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt: Any) -> Function[float]:
        # SQL: CAST(stmt AS FLOAT), with NULL mapped to 0.0
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile
        # build SQL string concatenations of the (NULL-safe) text fields and measure their length
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")  # type: ignore[assignment]
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")  # type: ignore[assignment]
        home_length = func.length(home_text)

        filled_profile = int_(has_completed_profile_expression())
        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        may_host = int_(User.hosting_status == HostingStatus.maybe)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.has_completed_my_home)
        filled_home_lots = int_(home_length > 200)
        hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host
        profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots

        # references
        # one row (value 1) per user who has written at least one reference
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        # aggregates over the references each user has received
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        # a "bad" reference has rating <= 0.2 or was marked as not appropriate
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # average adjusted rating weighted by reference count: the first 5 count double, the rest once
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        # computed over the text messages each user authored
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        # per user: smallest parent node id among official clusters they administer
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        cb = int_(min_node_id >= 1)
        # presumably node id 1 is the top-level (world) node, so this rewards admins of that cluster — confirm
        wcb = int_(min_node_id == 1)
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }

        # sum of badge_points over each user's badges (badges not listed score 0)
        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate
        # percentile response times converted from intervals to minutes
        hr_subquery = select(
            UserResponseRate.user_id,
            float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"),
            float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"),
        ).subquery()
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
        # thresholds are 96 hours in minutes. Users with no UserResponseRate row get NULL from the
        # outer join, so each comparison is NULL and int_ turns it into 0 points.
        # NOTE(review): a row whose response_time_66p is NULL is coalesced to 0.0 by float_ above
        # and would earn the +5 bonus — confirm such rows cannot occur
        response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0)

        recommendation_score = (
            hosting_status_points
            + profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        # one row per user; outer joins keep users missing from any aggregate subquery
        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        # single UPDATE ... FROM; no explicit commit here — presumably session_scope commits on exit
        session.execute(update(User).values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id))

    logger.info("Updated recommendation scores")
def update_badges(payload: empty_pb2.Empty) -> None:
    """
    Recomputes which users hold each automatically-managed badge.

    For every badge, the desired holders come from static config or a DB query. Users who should
    hold the badge but don't have it are given it; users who hold it but shouldn't lose it. Ids
    that don't match an existing user are ignored. The payload is unused.
    """
    with session_scope() as session:

        def update_badge(badge_id: str, members: Sequence[int]) -> None:
            badge = get_badge_dict()[badge_id]
            # this batch job has no per-user context to evaluate the gate against, so it's global;
            # with the gate off, nobody should hold the badge
            gate_off = badge.flag is not None and not experimentation.get_global_boolean_value(badge.flag, default=True)
            wanted = [] if gate_off else members
            current_holders = set(
                session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge.id)).scalars().all()
            )
            # in case the user ids don't exist in the db
            target_holders = set(session.execute(select(User.id).where(User.id.in_(wanted))).scalars().all())
            for uid in target_holders - current_holders:
                user_add_badge(session, uid, badge.id)

            for uid in current_holders - target_holders:
                user_remove_badge(session, uid, badge.id)

        def all_ids(stmt: Any) -> Sequence[int]:
            return session.execute(stmt).scalars().all()

        for static_badge_id in ("founder", "board_member", "past_board_member"):
            update_badge(static_badge_id, get_static_badge_dict()[static_badge_id])
        update_badge("donor", all_ids(select(User.id).where(User.last_donated.is_not(None))))
        update_badge("moderator", all_ids(select(User.id).where(User.is_superuser)))
        update_badge("phone_verified", all_ids(select(User.id).where(User.phone_is_verified)))
        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        update_badge(
            "strong_verification",
            all_ids(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            ),
        )
        # active volunteers have not stopped volunteering; past volunteers have
        update_badge("volunteer", all_ids(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_(None))))
        update_badge(
            "past_volunteer",
            all_ids(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_not(None))),
        )
def finalize_strong_verification(payload: jobs_pb2.FinalizeStrongVerificationPayload) -> None:
    """
    Completes a strong verification attempt that is waiting on the backend.

    Fetches the session from Iris ID and checks that it belongs to this attempt and was approved. Then:
    - if the document is not a passport, the attempt fails and the user is notified;
    - if a passport with the same expiry date, nationality and last three document characters is already
      on file, the attempt is marked duplicate (emailing about it when the earlier attempt belongs to a
      different user) and the user is notified;
    - otherwise the passport data is stored (the raw response encrypted), the attempt succeeds, and the
      strong_verification badge is awarded if the passport matches the user's profile.

    Raises:
        Exception: if Iris ID does not answer with HTTP 200.
        AssertionError: if the Iris ID session does not match this attempt or is not approved.
    """
    with session_scope() as session:
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config.IRIS_ID_PUBKEY, config.IRIS_ID_SECRET),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
            verify="/etc/ssl/certs/ca-certificates.crt",
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()
        reference_payload = internal_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )

        # These checks gate a security-sensitive state transition, so they are explicit raises rather than
        # `assert` statements (which are stripped when Python runs with -O). AssertionError is kept as the
        # exception type so existing callers/tests see the same failure.
        if verification_attempt.user_id != reference_payload.user_id:
            raise AssertionError("Iris reference user_id does not match the verification attempt")
        if verification_attempt.verification_attempt_token != reference_payload.verification_attempt_token:
            raise AssertionError("Iris reference token does not match the verification attempt")
        if verification_attempt.iris_session_id != json_data["id"]:
            raise AssertionError("Iris session id does not match the verification attempt")
        if json_data["state"] != "APPROVED":
            raise AssertionError(f"Iris session is not approved (state={json_data['state']!r})")

        if json_data["document_type"] != "PASSPORT":
            verification_attempt.status = StrongVerificationAttemptStatus.failed
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
                ),
            )
            return

        expiry_date = date.fromisoformat(json_data["expiry_date"])
        nationality = json_data["nationality"]
        last_three_document_chars = json_data["document_number"][-3:]

        # the earliest attempt that recorded the same passport, if any
        existing_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
            .where(StrongVerificationAttempt.passport_nationality == nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = expiry_date
        verification_attempt.passport_nationality = nationality
        verification_attempt.passport_last_three_document_chars = last_three_document_chars

        if existing_attempt:
            verification_attempt.status = StrongVerificationAttemptStatus.duplicate

            if existing_attempt.user_id != verification_attempt.user_id:
                session.flush()
                send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)

            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
            )
            return

        verification_attempt.has_full_data = True
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config.VERIFICATION_DATA_PUBLIC_KEY, response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = verification_attempt.user
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            # already holds the badge: nothing more to do (and no duplicate success notification)
            if session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none():
                return

            user_add_badge(session, user.id, badge_id, do_notify=False)
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_success,
                key="",
            )
        else:
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
                ),
            )
def send_activeness_probes(payload: empty_pb2.Empty) -> None:
    """
    Creates, reminds about, and expires activeness probes for hosts.

    Step 1 (only when ACTIVENESS_PROBES_ENABLED): opens a probe for visible users marked as able to host
    who have been inactive longer than ACTIVENESS_PROBE_INACTIVITY_PERIOD and have no open probe, rate
    limited to roughly 2% of visible users per day.
    Step 2: sends the next reminder notification for each pending probe whose reminder delay has passed.
    Step 3: marks probes that received every reminder and passed ACTIVENESS_PROBE_EXPIRY_TIME as expired,
    downgrading the user's hosting and meetup status.
    """
    with session_scope() as session:
        ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)

        if config.ACTIVENESS_PROBES_ENABLED:
            # current activeness probes
            subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded.is_(None)).subquery()

            # users who we should send an activeness probe to
            new_probe_user_ids = (
                session.execute(
                    select(User.id)
                    .where(User.is_visible)
                    .where(User.hosting_status == HostingStatus.can_host)
                    .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
                    .where(User.id.not_in(select(subquery.c.user_id)))
                )
                .scalars()
                .all()
            )

            total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()
            probes_today = session.execute(
                select(func.count())
                .select_from(ActivenessProbe)
                .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24))
            ).scalar_one()

            # send probes to max 2% of users per day, spread over the day, but always at least one
            max_probes_per_day = 0.02 * total_users
            max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1))

            if len(new_probe_user_ids) > max_probe_size:
                new_probe_user_ids = sample(new_probe_user_ids, max_probe_size)

            for user_id in new_probe_user_ids:
                session.add(ActivenessProbe(user_id=user_id))

            session.commit()

        ## Step 2: actually send out probe notifications
        for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
            probes = (
                session.execute(
                    select(ActivenessProbe)
                    .where(ActivenessProbe.notifications_sent == probe_number_minus_1)
                    .where(ActivenessProbe.probe_initiated + delay < func.now())
                    .where(ActivenessProbe.is_pending)
                )
                .scalars()
                .all()
            )

            for probe in probes:
                probe.notifications_sent = probe_number_minus_1 + 1
                notify(
                    session,
                    user_id=probe.user.id,
                    topic_action=NotificationTopicAction.activeness__probe,
                    key=str(probe.id),
                    data=notification_data_pb2.ActivenessProbe(
                        reminder_number=probe_number_minus_1 + 1,
                        deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
                    ),
                )
            session.commit()

        ## Step 3: for those who haven't responded, mark them as failed
        expired_probes = (
            session.execute(
                select(ActivenessProbe)
                .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
                .where(ActivenessProbe.is_pending)
                .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
            )
            .scalars()
            .all()
        )

        for probe in expired_probes:
            probe.responded = now()
            probe.response = ActivenessProbeStatus.expired
            if probe.user.hosting_status == HostingStatus.can_host:
                probe.user.hosting_status = HostingStatus.maybe
            if probe.user.meetup_status == MeetupStatus.wants_to_meetup:
                probe.user.meetup_status = MeetupStatus.open_to_meetup
        session.commit()
def update_randomized_locations(payload: empty_pb2.Empty) -> None:
    """
    Assigns a randomized location to every user whose randomized_geom is not yet set.

    The randomized location for each user is generated as follows:
    - Start from a strong random seed (based on the SECRET env var and our key derivation function)
    - For each user, mix in the user_id for randomness
    - Generate a radius from [0.02, 0.1] degrees (about 2-10km)
    - Generate an angle covering a full turn (0 to 2*pi radians)
    - Randomized location is then a distance `radius` away at an angle `angle` from `geom`

    Coordinates are read in one transaction and written back in a second one with a single ORM bulk UPDATE.
    """
    randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)

    def gen_randomized_coords(user_id: int, lat: float, lng: float) -> tuple[float, float]:
        radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii"))
        angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii"))
        radius = 0.02 + 0.08 * radius_u
        angle_rad = 2 * pi * angle_u
        offset_lng = radius * cos(angle_rad)
        offset_lat = radius * sin(angle_rad)
        return lat + offset_lat, lng + offset_lng

    user_updates: list[dict[str, Any]] = []

    with session_scope() as session:
        users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom.is_(None))).all()

        for user_id, geom in users_to_update:
            lat, lng = get_coordinates(geom)
            user_updates.append(
                {"id": user_id, "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, lat, lng))}
            )

    # Nothing to write: don't open a second transaction or issue a bulk UPDATE with an empty parameter list.
    if not user_updates:
        return

    with session_scope() as session:
        session.execute(update(User), user_updates)
def send_event_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends reminders for events that are 24 hours away to users who marked themselves as attending.

    Considers occurrences that are neither cancelled nor deleted and whose start time lies between now and
    now + EVENT_REMINDER_TIMEDELTA. Each visible, non-shadowed attendee who has not been reminded yet gets
    one notification, and their attendance row is flagged (and committed) so later runs skip them.
    """
    logger.info("Sending event reminder emails")

    with session_scope() as session:
        upcoming = (
            select(EventOccurrence)
            .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA)
            .where(EventOccurrence.start_time >= now())
            .where(~EventOccurrence.is_cancelled)
            .where(~EventOccurrence.is_deleted)
        )

        for occurrence in session.execute(upcoming).scalars().all():
            unreminded = (
                select(User, EventOccurrenceAttendee)
                .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id)
                .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
                .where(EventOccurrenceAttendee.reminder_sent == False)
                .where(User.is_visible)
                .where(~User.is_shadowed)
            )

            for attendee_user, attendance in session.execute(unreminded).all():
                user_context = make_notification_user_context(user_id=attendee_user.id)
                reminder = notification_data_pb2.EventReminder(
                    event=event_to_pb(session, occurrence, user_context),
                    user=user_model_to_pb(attendee_user, session, user_context),
                )
                notify(
                    session,
                    user_id=attendee_user.id,
                    topic_action=NotificationTopicAction.event__reminder,
                    key=str(occurrence.id),
                    data=reminder,
                    moderation_state_id=occurrence.moderation_state_id,
                )

                # commit per attendee so a later failure doesn't cause duplicate reminders on retry
                attendance.reminder_sent = True
                session.commit()
def check_expo_push_receipts(payload: empty_pb2.Empty) -> None:
    """
    Check Expo push receipts in batch and update delivery attempts.

    Works through delivery attempts that have an Expo ticket, have not been checked yet, and were sent
    between 15 minutes and 24 hours ago, 100 at a time, each batch in its own transaction. Every attempt in
    a batch is marked as checked; a DeviceNotRegistered error disables the push subscription.

    Raises:
        RuntimeError: if the backlog is not drained within MAX_ITERATIONS batches.
    """
    MAX_ITERATIONS = 100  # Safety limit: 100 batches * 100 attempts = 10,000 max

    for _ in range(MAX_ITERATIONS):
        with session_scope() as session:
            # Wait 15 minutes per Expo's recommendation before checking receipts
            pending_query = (
                select(PushNotificationDeliveryAttempt)
                .where(PushNotificationDeliveryAttempt.expo_ticket_id != None)
                .where(PushNotificationDeliveryAttempt.receipt_checked_at == None)
                .where(PushNotificationDeliveryAttempt.time < now() - timedelta(minutes=15))
                .where(PushNotificationDeliveryAttempt.time > now() - timedelta(hours=24))
                .limit(100)
            )
            batch = session.execute(pending_query).scalars().all()

            if not batch:
                logger.debug("No Expo receipts to check")
                return

            logger.info(f"Checking {len(batch)} Expo push receipts")

            receipts = get_expo_push_receipts([not_none(attempt.expo_ticket_id) for attempt in batch])

            for attempt in batch:
                receipt = receipts.get(not_none(attempt.expo_ticket_id))

                # Always mark as checked to avoid infinite loops
                attempt.receipt_checked_at = now()

                if receipt is None:
                    # Receipt not found after 15min - likely expired (>24h) or never existed
                    # Per Expo docs: receipts should be available within 15 minutes
                    attempt.receipt_status = "not_found"
                    continue

                status = receipt.get("status")
                attempt.receipt_status = status
                if status != "error":
                    continue

                error_code = receipt.get("details", {}).get("error")
                attempt.receipt_error_code = error_code

                if error_code != "DeviceNotRegistered":
                    logger.warning(f"Expo receipt error for ticket {attempt.expo_ticket_id}: {error_code}")
                    continue

                # Device token is no longer valid - disable the subscription
                subscription = session.execute(
                    select(PushNotificationSubscription).where(
                        PushNotificationSubscription.id == attempt.push_notification_subscription_id
                    )
                ).scalar_one()

                if subscription.disabled_at > now():
                    subscription.disabled_at = now()
                    logger.info(f"Disabled push sub {subscription.id} due to DeviceNotRegistered in receipt")
                    push_notification_counter.labels(
                        platform="expo", outcome="permanent_subscription_failure_receipt"
                    ).inc()

    # Only reachable when every batch came back non-empty
    raise RuntimeError(
        f"check_expo_push_receipts exceeded {MAX_ITERATIONS} iterations - "
        "there may be an unusually large backlog of receipts to check"
    )
def send_postal_verification_postcard(payload: jobs_pb2.SendPostalVerificationPostcardPayload) -> None:
    """
    Sends the postcard via external API and updates attempt status.

    Does nothing (besides logging a warning) if the attempt doesn't exist or is not in_progress. Otherwise
    orders the postcard, moves the attempt to awaiting_verification, records metrics and an event log entry,
    and notifies the user.
    """
    with session_scope() as session:
        attempt = session.execute(
            select(PostalVerificationAttempt).where(
                PostalVerificationAttempt.id == payload.postal_verification_attempt_id
            )
        ).scalar_one_or_none()

        if not attempt or attempt.status != PostalVerificationStatus.in_progress:
            logger.warning(
                f"Postal verification attempt {payload.postal_verification_attempt_id} not found or wrong state"
            )
            return

        user_name = session.execute(select(User.name).where(User.id == attempt.user_id)).scalar_one()

        job_id = send_postcard(
            recipient_name=user_name,
            address_line_1=attempt.address_line_1,
            address_line_2=attempt.address_line_2,
            city=attempt.city,
            state=attempt.state,
            postal_code=attempt.postal_code,
            country=attempt.country_code,
            verification_code=not_none(attempt.verification_code),
        )

        attempt.mypostcard_job_id = job_id
        attempt.status = PostalVerificationStatus.awaiting_verification
        attempt.postcard_sent_at = func.now()
        # The postcard has now been ordered and cannot be un-sent. Commit immediately so a failure in the
        # bookkeeping below (metrics, event log, notification) can't roll this back: otherwise the job would
        # be retried while the attempt is still in_progress and a second postcard would be ordered.
        session.commit()

        postcards_sent_counter.labels(country_code=attempt.country_code).inc()

        context = make_background_user_context(attempt.user_id)
        log_event(
            context,
            session,
            "postcard.sent",
            {
                "attempt_id": attempt.id,
                "country": attempt.country_code,
                "city": attempt.city,
                "mypostcard_job_id": job_id,
            },
        )

        notify(
            session,
            user_id=attempt.user_id,
            topic_action=NotificationTopicAction.postal_verification__postcard_sent,
            key="",
            data=notification_data_pb2.PostalVerificationPostcardSent(
                city=attempt.city,
                country=attempt.country_code,
            ),
        )
def check_mypostcard_jobs(payload: empty_pb2.Empty) -> None:
    """
    Checks that all MyPostcard jobs from the last week are tied to a postal verification attempt.

    Only runs while the global "postal_verification_enabled" flag is on. Orders placed with MyPostcard over
    the last 7 days are compared with the job ids recorded on attempts created in the last 14 days (a wider
    window, so attempts created just before an order's date still match). Any orphaned job ids are reported.
    """
    if not experimentation.get_global_boolean_value("postal_verification_enabled", default=False):
        return

    # Take a single snapshot so the date window and the DB cutoff agree even if the job runs across midnight.
    current_time = now()

    # Call the external API before opening a session so no DB transaction is held open across the request.
    mypostcard_job_ids = set(
        get_order_ids(
            date_from=(current_time - timedelta(days=7)).date(),
            date_to=current_time.date(),
        )
    )

    with session_scope() as session:
        known_job_ids = set(
            session.execute(
                select(PostalVerificationAttempt.mypostcard_job_id).where(
                    PostalVerificationAttempt.mypostcard_job_id.is_not(None),
                    PostalVerificationAttempt.created >= current_time - timedelta(days=14),
                )
            )
            .scalars()
            .all()
        )

    orphaned = mypostcard_job_ids - known_job_ids
    if orphaned:
        report_message(
            f"Found {len(orphaned)} orphaned MyPostcard jobs not tied to any verification attempt: {orphaned}"
        )
class DatabaseInconsistencyError(Exception):
    """Signals that one or more database consistency checks did not hold; the message lists each failure."""
def check_database_consistency(payload: empty_pb2.Empty) -> None:
    """
    Checks database consistency and raises an exception if any issues are found.

    Runs read-only invariant checks over profile galleries, the moderation system (queue items, logs,
    and the mapping between moderation states and the host requests / group chats they moderate), and,
    when auto-approval is enabled, that INITIAL_REVIEW items are not left unresolved past the deadline.

    Raises:
        DatabaseInconsistencyError: with one line per failed check, if any check fails.
    """
    logger.info("Checking database consistency")
    errors = []

    with session_scope() as session:
        # Check that all users have a profile gallery
        users_without_gallery = session.execute(
            select(User.id, User.username).where(User.profile_gallery_id.is_(None))
        ).all()
        if users_without_gallery:
            errors.append(f"Users without profile gallery: {users_without_gallery}")

        # Check that all profile galleries point to their owner
        mismatched_galleries = session.execute(
            select(User.id, User.username, User.profile_gallery_id, PhotoGallery.owner_user_id)
            .join(PhotoGallery, User.profile_gallery_id == PhotoGallery.id)
            .where(User.profile_gallery_id.is_not(None))
            .where(PhotoGallery.owner_user_id != User.id)
        ).all()
        if mismatched_galleries:
            errors.append(f"Profile galleries with mismatched owner: {mismatched_galleries}")

        # === Moderation System Consistency Checks ===

        # Check every ModerationState has at least one INITIAL_REVIEW queue item
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_initial_review = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationQueueItem.moderation_state_id == ModerationState.id)
                    .where(ModerationQueueItem.trigger == ModerationTrigger.initial_review)
                ),
            )
        ).all()
        if states_without_initial_review:
            errors.append(f"ModerationStates without INITIAL_REVIEW queue item: {states_without_initial_review}")

        # Check every ModerationState has a CREATE log entry
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_create_log = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationLog.moderation_state_id == ModerationState.id)
                    .where(ModerationLog.action == ModerationAction.create)
                ),
            )
        ).all()
        if states_without_create_log:
            errors.append(f"ModerationStates without CREATE log entry: {states_without_create_log}")

        # Check resolved queue items point to log entries for the same moderation state
        resolved_item_log_mismatches = session.execute(
            select(ModerationQueueItem.id, ModerationQueueItem.moderation_state_id, ModerationLog.moderation_state_id)
            .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id)
            .where(ModerationQueueItem.resolved_by_log_id.is_not(None))
            .where(ModerationQueueItem.moderation_state_id != ModerationLog.moderation_state_id)
        ).all()
        if resolved_item_log_mismatches:
            errors.append(f"Resolved queue items with mismatched moderation_state_id: {resolved_item_log_mismatches}")

        # Check every HOST_REQUEST ModerationState has exactly one HostRequest pointing to it.
        # One aggregate query instead of a COUNT per state; the outer join makes states with no
        # HostRequest show up with a count of 0.
        hr_count_mismatches = session.execute(
            select(ModerationState.id, func.count(HostRequest.conversation_id))
            .outerjoin(HostRequest, HostRequest.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.host_request)
            .group_by(ModerationState.id)
            .having(func.count(HostRequest.conversation_id) != 1)
            .order_by(ModerationState.id)
        ).all()
        for state_id, hr_count in hr_count_mismatches:
            errors.append(f"ModerationState {state_id} (HOST_REQUEST) has {hr_count} HostRequests (expected 1)")

        # Check every GROUP_CHAT ModerationState has exactly one GroupChat pointing to it (same approach)
        gc_count_mismatches = session.execute(
            select(ModerationState.id, func.count(GroupChat.conversation_id))
            .outerjoin(GroupChat, GroupChat.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.group_chat)
            .group_by(ModerationState.id)
            .having(func.count(GroupChat.conversation_id) != 1)
            .order_by(ModerationState.id)
        ).all()
        for state_id, gc_count in gc_count_mismatches:
            errors.append(f"ModerationState {state_id} (GROUP_CHAT) has {gc_count} GroupChats (expected 1)")

        # Check ModerationState.object_id matches the actual object's ID
        hr_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, HostRequest.conversation_id)
            .join(HostRequest, HostRequest.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.host_request)
            .where(ModerationState.object_id != HostRequest.conversation_id)
        ).all()
        if hr_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for HOST_REQUEST: {hr_object_id_mismatches}")

        gc_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, GroupChat.conversation_id)
            .join(GroupChat, GroupChat.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.group_chat)
            .where(ModerationState.object_id != GroupChat.conversation_id)
        ).all()
        if gc_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for GROUP_CHAT: {gc_object_id_mismatches}")

        # Check reverse mapping: HostRequest's moderation_state points to correct ModerationState
        hr_reverse_mismatches = session.execute(
            select(
                HostRequest.conversation_id,
                HostRequest.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, HostRequest.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.host_request)
                | (ModerationState.object_id != HostRequest.conversation_id)
            )
        ).all()
        if hr_reverse_mismatches:
            errors.append(f"HostRequest points to ModerationState with wrong type/object_id: {hr_reverse_mismatches}")

        # Check reverse mapping: GroupChat's moderation_state points to correct ModerationState
        gc_reverse_mismatches = session.execute(
            select(
                GroupChat.conversation_id,
                GroupChat.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, GroupChat.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.group_chat)
                | (ModerationState.object_id != GroupChat.conversation_id)
            )
        ).all()
        if gc_reverse_mismatches:
            errors.append(f"GroupChat points to ModerationState with wrong type/object_id: {gc_reverse_mismatches}")

        # Ensure auto-approve deadline isn't being exceeded by a significant margin
        # The auto-approver runs every 15s, so allow 5 minutes grace before alerting
        # NOTE(review): auto_approve_moderation_queue deliberately skips items whose author is shadowed, so
        # such items may stay unresolved and be reported here on every run — confirm whether they should be
        # excluded from this check.
        deadline_seconds = config.MODERATION_AUTO_APPROVE_DEADLINE_SECONDS
        if deadline_seconds > 0:
            grace_period = timedelta(minutes=5)
            stale_initial_review_items = session.execute(
                select(
                    ModerationQueueItem.id,
                    ModerationQueueItem.moderation_state_id,
                    ModerationQueueItem.time_created,
                )
                .where(ModerationQueueItem.trigger == ModerationTrigger.initial_review)
                .where(ModerationQueueItem.resolved_by_log_id.is_(None))
                .where(ModerationQueueItem.time_created < now() - timedelta(seconds=deadline_seconds) - grace_period)
            ).all()
            if stale_initial_review_items:
                errors.append(
                    f"INITIAL_REVIEW items exceeding auto-approve deadline by >5min: {stale_initial_review_items}"
                )

    if errors:
        raise DatabaseInconsistencyError("\n".join(errors))
def auto_approve_moderation_queue(payload: empty_pb2.Empty) -> None:
    """
    Dead man's switch for the moderation queue.

    Unresolved INITIAL_REVIEW items older than MODERATION_AUTO_APPROVE_DEADLINE_SECONDS are approved to
    VISIBLE (leaving other flags in place) and then re-flagged as a high-priority MACHINE_FLAG that
    supersedes only that INITIAL_REVIEW item, so a human still reviews everything that was auto-approved.
    Items whose author is shadowed are skipped. A non-positive deadline disables the job.
    """
    deadline_seconds = config.MODERATION_AUTO_APPROVE_DEADLINE_SECONDS
    if deadline_seconds <= 0:
        return

    with session_scope() as session:
        ctx = make_background_user_context(user_id=config.MODERATION_BOT_USER_ID)

        overdue_query = moderation_pb2.GetModerationQueueReq(
            triggers=[moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW],
            unresolved_only=True,
            page_size=100,
            created_before=Timestamp_from_datetime(now() - timedelta(seconds=deadline_seconds)),
        )
        overdue = Moderation().GetModerationQueue(request=overdue_query, context=ctx, session=session).queue_items
        if not overdue:
            return

        # Skip items whose author is shadowed; their content stays in shadowed state indefinitely.
        # NOTE(review): only one page of up to 100 items is fetched and skipped items are never resolved
        # here, so if 100+ overdue shadowed items sort ahead of the rest they could stall auto-approval
        # entirely — confirm GetModerationQueue's ordering/pagination.
        approvable = [entry for entry in overdue if not entry.moderation_state.author.shadowed]
        if not approvable:
            return

        reason = f"Auto-approved: moderation deadline of {deadline_seconds} seconds exceeded."
        logger.info(f"Auto-approving {len(approvable)} moderation queue items")
        for entry in approvable:
            approve_request = moderation_pb2.ModerateContentReq(
                moderation_state_id=entry.moderation_state_id,
                action=moderation_pb2.MODERATION_ACTION_APPROVE,
                visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
                reason=reason,
                clear_flags=False,
            )
            reflag_request = moderation_pb2.ModerateContentReq(
                moderation_state_id=entry.moderation_state_id,
                action=moderation_pb2.MODERATION_ACTION_FLAG,
                trigger=moderation_pb2.MODERATION_TRIGGER_MACHINE_FLAG,
                priority=MODERATION_AUTO_APPROVE_FLAG_PRIORITY,
                reason=reason,
                supersede_queue_item_id=entry.queue_item_id,
            )
            Moderation().ModerateContent(request=approve_request, context=ctx, session=session)
            Moderation().ModerateContent(request=reflag_request, context=ctx, session=session)
        moderation_auto_approved_counter.inc(len(approvable))