Coverage for app / backend / src / couchers / jobs / handlers.py: 89%
474 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
1"""
2Background job servicers
3"""
5import logging
6from collections.abc import Sequence
7from datetime import date, timedelta
8from math import cos, pi, sin, sqrt
9from random import sample
10from typing import Any
12import requests
13from google.protobuf import empty_pb2
14from sqlalchemy import ColumnElement, Float, Function, Integer, select
15from sqlalchemy.orm import aliased
16from sqlalchemy.sql import (
17 and_,
18 case,
19 cast,
20 delete,
21 distinct,
22 exists,
23 extract,
24 func,
25 literal,
26 not_,
27 or_,
28 union_all,
29 update,
30)
32from couchers.config import config
33from couchers.constants import (
34 ACTIVENESS_PROBE_EXPIRY_TIME,
35 ACTIVENESS_PROBE_INACTIVITY_PERIOD,
36 ACTIVENESS_PROBE_TIME_REMINDERS,
37 EVENT_REMINDER_TIMEDELTA,
38 HOST_REQUEST_MAX_REMINDERS,
39 HOST_REQUEST_REMINDER_INTERVAL,
40)
41from couchers.context import make_background_user_context
42from couchers.crypto import (
43 USER_LOCATION_RANDOMIZATION_NAME,
44 asym_encrypt,
45 b64decode,
46 get_secret,
47 simple_decrypt,
48 stable_secure_uniform,
49)
50from couchers.db import session_scope
51from couchers.email.dev import print_dev_email
52from couchers.email.smtp import send_smtp_email
53from couchers.event_log import log_event
54from couchers.helpers.badges import user_add_badge, user_remove_badge
55from couchers.helpers.completed_profile import has_completed_profile_expression
56from couchers.materialized_views import (
57 UserResponseRate,
58)
59from couchers.metrics import (
60 moderation_auto_approved_counter,
61 postcards_sent_counter,
62 push_notification_counter,
63 strong_verification_completions_counter,
64)
65from couchers.models import (
66 AccountDeletionToken,
67 ActivenessProbe,
68 ActivenessProbeStatus,
69 Cluster,
70 ClusterRole,
71 ClusterSubscription,
72 EventOccurrence,
73 EventOccurrenceAttendee,
74 GroupChat,
75 GroupChatSubscription,
76 HostingStatus,
77 HostRequest,
78 HostRequestStatus,
79 LoginToken,
80 MeetupStatus,
81 Message,
82 MessageType,
83 ModerationAction,
84 ModerationLog,
85 ModerationObjectType,
86 ModerationQueueItem,
87 ModerationState,
88 ModerationTrigger,
89 PassportSex,
90 PasswordResetToken,
91 PhotoGallery,
92 PostalVerificationAttempt,
93 PostalVerificationStatus,
94 PushNotificationDeliveryAttempt,
95 PushNotificationSubscription,
96 Reference,
97 StrongVerificationAttempt,
98 StrongVerificationAttemptStatus,
99 User,
100 UserBadge,
101 Volunteer,
102)
103from couchers.models.notifications import NotificationTopicAction
104from couchers.notifications.expo_api import get_expo_push_receipts
105from couchers.notifications.notify import notify
106from couchers.postal.my_postcard import get_order_ids, send_postcard
107from couchers.proto import moderation_pb2, notification_data_pb2
108from couchers.proto.internal import internal_pb2, jobs_pb2
109from couchers.resources import get_badge_dict, get_static_badge_dict
110from couchers.sentry import report_message
111from couchers.servicers.api import user_model_to_pb
112from couchers.servicers.events import (
113 event_to_pb,
114)
115from couchers.servicers.moderation import Moderation
116from couchers.servicers.requests import host_request_to_pb
117from couchers.sql import (
118 users_visible_to_each_other,
119 where_moderated_content_visible,
120 where_moderated_content_visible_to_user_column,
121 where_user_columns_visible_to_each_other,
122 where_users_column_visible,
123)
124from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships
125from couchers.tasks import send_duplicate_strong_verification_email
126from couchers.utils import (
127 Timestamp_from_datetime,
128 create_coordinate,
129 get_coordinates,
130 not_none,
131 now,
132)
134logger = logging.getLogger(__name__)
def send_email(payload: jobs_pb2.SendEmailPayload) -> None:
    """
    Send a single email described by the job payload.

    Depending on the ENABLE_EMAIL config flag, the message is either sent out
    over SMTP or printed via the dev logger. Either backend returns a
    models.Email object, which is then persisted to the database.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
    # pick the delivery backend: real SMTP in production, dev printer otherwise
    deliver = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email
    email_record = deliver(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
        list_unsubscribe_header=payload.list_unsubscribe_header,
        source_data=payload.source_data,
    )
    # persist the models.Email object returned by the backend
    with session_scope() as session:
        session.add(email_record)
def purge_login_tokens(payload: empty_pb2.Empty) -> None:
    """Delete all login tokens that are no longer valid."""
    logger.info("Purging login tokens")
    with session_scope() as session:
        purge_stmt = (
            delete(LoginToken)
            .where(~LoginToken.is_valid)
            .execution_options(synchronize_session=False)
        )
        session.execute(purge_stmt)
def purge_password_reset_tokens(payload: empty_pb2.Empty) -> None:
    """Delete all password reset tokens that are no longer valid.

    Args:
        payload: unused empty protobuf payload (background job convention).
    """
    # Bug fix: this previously logged "Purging login tokens" (copy-pasted from
    # purge_login_tokens), which made the job logs misleading.
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )
def purge_account_deletion_tokens(payload: empty_pb2.Empty) -> None:
    """Delete all account deletion tokens that are no longer valid."""
    logger.info("Purging account deletion tokens")
    with session_scope() as session:
        purge_stmt = delete(AccountDeletionToken).where(~AccountDeletionToken.is_valid)
        session.execute(purge_stmt.execution_options(synchronize_session=False))
def send_message_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for messages that have been unseen for a long enough time.

    Two-phase algorithm:
      1. Find users who have at least one unseen, unnotified text message older
         than 5 minutes in any unmuted group chat they belong to.
      2. For each such user, collect *all* their unseen chats (no 5 minute cutoff
         this time), send one chat__missed_messages notification summarizing
         them, and advance the user's last_notified_message_id watermark.
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                where_moderated_content_visible_to_user_column(
                    select(User)
                    .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                    .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                    .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id),
                    GroupChat,
                    User.id,
                )
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                # only messages sent while the user was a member of the chat
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                # unnotified and unseen relative to the per-user/per-chat watermarks
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for user in users:
            context = make_background_user_context(user_id=user.id)
            # now actually grab all the group chats, not just less than 5 min old
            # one row per chat: the latest unseen message id and the unseen count
            subquery = (
                where_users_column_visible(
                    where_moderated_content_visible(
                        select(
                            GroupChatSubscription.group_chat_id.label("group_chat_id"),
                            func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                            func.max(Message.id).label("message_id"),
                            func.count(Message.id).label("count_unseen"),
                        )
                        .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id),
                        context,
                        GroupChat,
                        is_list_operation=True,
                    )
                    .where(GroupChatSubscription.user_id == user.id)
                    .where(not_(GroupChatSubscription.is_muted))
                    .where(Message.id > user.last_notified_message_id)
                    .where(Message.id > GroupChatSubscription.last_seen_message_id)
                    .where(Message.time >= GroupChatSubscription.joined)
                    .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                    .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)),
                    context,
                    Message.author_id,
                )
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            # join back to the chat and its newest unseen message for display
            unseen_messages = session.execute(
                where_moderated_content_visible(
                    select(GroupChat, Message, subquery.c.count_unseen)
                    .join(subquery, subquery.c.message_id == Message.id)
                    .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id),
                    context,
                    GroupChat,
                    is_list_operation=True,
                ).order_by(subquery.c.message_id.desc())
            ).all()

            if not unseen_messages:
                continue

            # advance the watermark so these messages are never re-notified
            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            def format_title(message: Message, group_chat: GroupChat, count_unseen: int) -> str:
                # DMs are titled by the other party's name, group chats by their title
                if group_chat.is_dm:
                    return f"You missed {count_unseen} message(s) from {message.author.name}"
                else:
                    return f"You missed {count_unseen} message(s) in {group_chat.title}"

            # one notification summarizing all chats with unseen messages
            notify(
                session,
                user_id=user.id,
                topic_action=NotificationTopicAction.chat__missed_messages,
                key="",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                context,
                            ),
                            message=format_title(message, group_chat, count_unseen),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                        )
                        for group_chat, message, count_unseen in unseen_messages
                    ],
                ),
            )
            # commit per user so the watermark sticks even if a later user fails
            session.commit()
def send_request_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host).

    First computes a small candidate set of user ids (driven from host
    requests/messages rather than a full user scan), then per candidate gathers
    their host requests with unseen text messages older than 5 minutes — once
    for requests where they are the surfer, once where they are the host — and
    sends a host_request__missed_messages notification per request, advancing
    the user's last_notified_request_message_id watermark as it goes.
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    with session_scope() as session:
        # Get all candidate users who might have unseen request messages.
        # Drive from host_requests/messages (selective) rather than scanning all users (expensive).
        surfer_ids = (
            select(User.id)
            .join(HostRequest, HostRequest.initiator_user_id == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(User.is_visible)
            .where(Message.id > HostRequest.initiator_last_seen_message_id)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
        )
        host_ids = (
            select(User.id)
            .join(HostRequest, HostRequest.recipient_user_id == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(User.is_visible)
            .where(Message.id > HostRequest.recipient_last_seen_message_id)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
        )
        candidate_user_ids = session.execute(union_all(surfer_ids, host_ids)).scalars().unique().all()

        for user_id in candidate_user_ids:
            context = make_background_user_context(user_id=user_id)

            # requests where this user is surfing
            surfing_reqs = session.execute(
                where_users_column_visible(
                    where_moderated_content_visible_to_user_column(
                        select(User, HostRequest, func.max(Message.id))
                        .where(User.id == user_id)
                        .join(HostRequest, HostRequest.initiator_user_id == User.id),
                        HostRequest,
                        HostRequest.initiator_user_id,
                    ),
                    context,
                    HostRequest.recipient_user_id,
                )
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.initiator_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)  # type: ignore[arg-type]
            ).all()

            # where this user is hosting
            hosting_reqs = session.execute(
                where_users_column_visible(
                    where_moderated_content_visible_to_user_column(
                        select(User, HostRequest, func.max(Message.id))
                        .where(User.id == user_id)
                        .join(HostRequest, HostRequest.recipient_user_id == User.id),
                        HostRequest,
                        HostRequest.recipient_user_id,
                    ),
                    context,
                    HostRequest.initiator_user_id,
                )
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.recipient_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)  # type: ignore[arg-type]
            ).all()

            for user, host_request, max_message_id in surfing_reqs:
                # advance the watermark before notifying so messages aren't re-notified
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                notify(
                    session,
                    user_id=user.id,
                    topic_action=NotificationTopicAction.host_request__missed_messages,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        user=user_model_to_pb(host_request.recipient, session, context),
                        am_host=False,
                    ),
                )

            for user, host_request, max_message_id in hosting_reqs:
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                # When a host request is created, the recipient immediately receives a
                # host_request__create notification that includes the initial message text.
                # A few minutes later, this background job sees that same message as "unseen"
                # (the recipient hasn't opened the request yet) and would send a duplicate
                # missed_messages notification.
                #
                # To prevent this, we check if the only unseen text message in this host
                # request is the very first text message in the conversation (i.e. the
                # creation message). If so, we skip sending missed_messages — the user was
                # already notified via host_request__create.
                #
                # Advancing last_notified_request_message_id above is safe even when we skip
                # the notification: this watermark is only ever advanced when we process all
                # unseen messages for the user, so skipping one notification doesn't cause us
                # to miss future messages in other host requests.
                only_creation_message = not session.execute(
                    select(
                        select(func.count())
                        .where(Message.conversation_id == host_request.conversation_id)
                        .where(Message.message_type == MessageType.text)
                        .scalar_subquery()
                        > 1
                    )
                ).scalar_one()
                if only_creation_message:
                    continue

                notify(
                    session,
                    user_id=user.id,
                    topic_action=NotificationTopicAction.host_request__missed_messages,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        user=user_model_to_pb(host_request.initiator, session, context),
                        am_host=True,
                    ),
                )
def send_onboarding_emails(payload: empty_pb2.Empty) -> None:
    """
    Send the two onboarding reminder emails.

    The first goes to visible users who have received none yet; the second goes
    a week after the first to users whose profile is still incomplete.
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:
        # first onboarding email: anyone visible who hasn't received one
        first_round = session.execute(
            select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)
        ).scalars().all()

        for recipient in first_round:
            notify(
                session,
                user_id=recipient.id,
                topic_action=NotificationTopicAction.onboarding__reminder,
                key="1",
            )
            recipient.onboarding_emails_sent = 1
            recipient.last_onboarding_email_sent = now()
            # commit per user so a mid-run failure doesn't resend earlier emails
            session.commit()

        # second onboarding email: a week after the first, if the profile is
        # still incomplete
        second_round = session.execute(
            select(User)
            .where(User.is_visible)
            .where(User.onboarding_emails_sent == 1)
            .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
            .where(~has_completed_profile_expression())
        ).scalars().all()

        for recipient in second_round:
            notify(
                session,
                user_id=recipient.id,
                topic_action=NotificationTopicAction.onboarding__reminder,
                key="2",
            )
            recipient.onboarding_emails_sent = 2
            recipient.last_onboarding_email_sent = now()
            session.commit()
def send_reference_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends out reminders to write references after hosting/staying.

    Works through a fixed schedule of three reminders; for each reminder it
    finds host requests (from both the surfer's and the host's side) where no
    reference has been written yet and the reminder window has opened, then
    notifies the would-be author and records which reminder number was sent.
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        # (sending reminder N also bumps the counter past earlier reminders)
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            user = aliased(User)
            other_user = aliased(User)
            # surfers needing to write a ref
            q1 = (
                select(literal(True), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.initiator_user_id)
                .join(other_user, other_user.id == HostRequest.recipient_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the surfer has not written a ref
                        Reference.from_user_id == HostRequest.initiator_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.initiator_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.initiator_reason_didnt_meetup == None)
                .where(users_visible_to_each_other(user, other_user))
            )

            # hosts needing to write a ref
            q2 = (
                select(literal(False), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.recipient_user_id)
                .join(other_user, other_user.id == HostRequest.initiator_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the host has not written a ref
                        Reference.from_user_id == HostRequest.recipient_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.recipient_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.recipient_reason_didnt_meetup == None)
                .where(users_visible_to_each_other(user, other_user))
            )

            # combine both sides; the first column ("surfed") tells them apart
            union = union_all(q1, q2).subquery()
            query = select(
                union.c[0].label("surfed"),
                aliased(HostRequest, union),
                aliased(user, union),
                aliased(other_user, union),
            )
            reference_reminders = session.execute(query).all()

            for surfed, host_request, user, other_user in reference_reminders:
                # visibility and blocking already checked in sql
                assert user.is_visible
                context = make_background_user_context(user_id=user.id)
                topic_action = (
                    NotificationTopicAction.reference__reminder_surfed
                    if surfed
                    else NotificationTopicAction.reference__reminder_hosted
                )
                notify(
                    session,
                    user_id=user.id,
                    topic_action=topic_action,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(other_user, session, context),
                        days_left=reminder_days_left,
                    ),
                )
                # record which reminder was sent on the correct side of the request
                if surfed:
                    host_request.initiator_sent_reference_reminders = reminder_number
                else:
                    host_request.recipient_sent_reference_reminders = reminder_number
                session.commit()
def send_host_request_reminders(payload: empty_pb2.Empty) -> None:
    """
    Remind hosts about pending host requests they haven't responded to.

    Targets pending requests whose start time is still in the future, where the
    host has never sent a message, fewer than HOST_REQUEST_MAX_REMINDERS
    reminders have been sent, and at least HOST_REQUEST_REMINDER_INTERVAL has
    passed since the last one.
    """
    with session_scope() as session:
        # correlated EXISTS: has the host (recipient) replied in this conversation?
        host_has_sent_message = select(1).where(
            Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.recipient_user_id
        )

        # NOTE(review): this local name shadows the imported `requests` module;
        # harmless here since the module isn't used below, but worth renaming.
        requests = (
            session.execute(
                where_user_columns_visible_to_each_other(
                    where_moderated_content_visible_to_user_column(
                        select(HostRequest),
                        HostRequest,
                        HostRequest.recipient_user_id,
                    )
                    .where(HostRequest.status == HostRequestStatus.pending)
                    .where(HostRequest.recipient_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS)
                    .where(HostRequest.start_time > func.now())
                    .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL)
                    .where(~exists(host_has_sent_message)),
                    HostRequest.recipient_user_id,
                    HostRequest.initiator_user_id,
                )
            )
            .scalars()
            .all()
        )

        for host_request in requests:
            # bump the reminder bookkeeping before notifying
            host_request.recipient_sent_request_reminders += 1
            host_request.last_sent_request_reminder_time = now()

            context = make_background_user_context(user_id=host_request.recipient_user_id)
            notify(
                session,
                user_id=host_request.recipient_user_id,
                topic_action=NotificationTopicAction.host_request__reminder,
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestReminder(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.initiator, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

        session.commit()
def add_users_to_email_list(payload: empty_pb2.Empty) -> None:
    """
    Sync users into the Listmonk mailing list, one user per transaction.

    Loops until no visible user with in_sync_with_newsletter == False remains.
    Opt-outs are marked synced without contacting Listmonk. On any non-200/409
    response the job raises, presumably so the job queue retries later — TODO
    confirm retry semantics with the scheduler.
    """
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    while True:
        # fresh session per user keeps each sync its own transaction
        with session_scope() as session:
            user = session.execute(
                select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
            ).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            if user.opt_out_of_newsletter:
                user.in_sync_with_newsletter = True
                session.commit()
                continue

            r = requests.post(
                config["LISTMONK_BASE_URL"] + "/api/subscribers",
                auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
                json={
                    "email": user.email,
                    "name": user.name,
                    "lists": [config["LISTMONK_LIST_ID"]],
                    "preconfirm_subscriptions": True,
                    "attribs": {"couchers_user_id": user.id},
                    "status": "enabled",
                },
                timeout=10,
            )
            # the API returns 409 if the user is already subscribed
            if r.status_code == 200 or r.status_code == 409:
                user.in_sync_with_newsletter = True
                session.commit()
            else:
                raise Exception("Failed to add users to mailing list")
def enforce_community_membership(payload: empty_pb2.Empty) -> None:
    """Background job wrapper: delegates to the community membership
    enforcement task in couchers.tasks."""
    tasks_enforce_community_memberships()
def update_recommendation_scores(payload: empty_pb2.Empty) -> None:
    """
    Recompute User.recommendation_score for all users in a single SQL UPDATE.

    The score is a sum of point components, each built as a SQL expression:
    hosting status, profile completeness, references given/received, recent
    activity, badges/community-builder status, response rate, plus a small
    random jitter so equal-scoring users don't always sort identically.
    """
    # free-text profile fields concatenated to measure profile effort
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    # subset describing the user's home specifically
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian() -> ColumnElement[float] | float:
        """
        Produces an approximately std normal random variate
        (sum of uniforms, central limit theorem with 5 trials)
        """
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt: Any) -> Function[int]:
        # cast a boolean/NULLable SQL expression to an integer, NULL -> 0
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt: Any) -> Function[float]:
        # cast a NULLable SQL expression to a float, NULL -> 0.0
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")  # type: ignore[assignment]
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")  # type: ignore[assignment]
        home_length = func.length(home_text)

        filled_profile = int_(has_completed_profile_expression())
        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        may_host = int_(User.hosting_status == HostingStatus.maybe)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.has_completed_my_home)
        filled_home_lots = int_(home_length > 200)
        hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host
        profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots

        # references
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        # rescale ratings so the average is centered away from the raw 0..1 range
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # first 5 refs count double; further refs count single
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        # community builders: admins of official clusters; min_node_id == 1 is
        # presumably the world/root node ("wcb") — TODO confirm node numbering
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        cb = int_(min_node_id >= 1)
        wcb = int_(min_node_id == 1)
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }

        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate
        hr_subquery = select(
            UserResponseRate.user_id,
            float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"),
            float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"),
        ).subquery()
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
        response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0)

        recommendation_score = (
            hosting_status_points
            + profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        # assemble per-user scores; outer joins so missing components score 0
        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        session.execute(update(User).values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id))

    logger.info("Updated recommendation scores")
def update_badges(payload: empty_pb2.Empty) -> None:
    """
    Reconcile UserBadge rows against the current source of truth for each badge.

    For each badge, computes the set of users who should hold it, then adds
    the badge to users missing it and removes it from users no longer eligible.
    """
    with session_scope() as session:

        def update_badge(badge_id: str, members: Sequence[int]) -> None:
            # diff the current badge holders against `members` and apply the delta
            badge = get_badge_dict()[badge_id]
            user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge.id)).scalars().all()
            # in case the user ids don't exist in the db
            actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all()
            # we should add the badge to these
            add = set(actual_members) - set(user_ids)
            # we should remove the badge from these
            remove = set(user_ids) - set(actual_members)
            for user_id in add:
                user_add_badge(session, user_id, badge.id)

            for user_id in remove:
                user_remove_badge(session, user_id, badge.id)

        # statically-configured badges: membership comes from the static badge lists
        update_badge("founder", get_static_badge_dict()["founder"])
        update_badge("board_member", get_static_badge_dict()["board_member"])
        update_badge("past_board_member", get_static_badge_dict()["past_board_member"])
        update_badge("donor", session.execute(select(User.id).where(User.last_donated.is_not(None))).scalars().all())
        update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all())
        update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all())
        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        update_badge(
            "strong_verification",
            session.execute(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            )
            .scalars()
            .all(),
        )
        # volunteer badge for active volunteers (stopped_volunteering is null)
        update_badge(
            "volunteer",
            session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_(None))).scalars().all(),
        )
        # past_volunteer badge for past volunteers (stopped_volunteering is not null)
        update_badge(
            "past_volunteer",
            session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_not(None)))
            .scalars()
            .all(),
        )
def finalize_strong_verification(payload: jobs_pb2.FinalizeStrongVerificationPayload) -> None:
    """
    Completes a strong verification attempt.

    Pulls the finished session data from the Iris ID API for the attempt referenced by
    `payload.verification_attempt_id` (which must be in the
    `in_progress_waiting_on_backend` state), validates it against what we stored when
    the session was created, deduplicates against prior attempts by passport identity,
    and either marks the attempt succeeded/failed/duplicate and notifies the user.

    Raises on API failure or if the sanity assertions below fail, so the job system
    can retry/alert.
    """
    with session_scope() as session:
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        # fetch the completed verification session from the Iris ID provider
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
            verify="/etc/ssl/certs/ca-certificates.crt",
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()
        # the "reference" field round-trips the encrypted payload we attached when
        # creating the session; decrypting it proves the session is ours
        reference_payload = internal_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )
        # sanity checks: the returned session must match the attempt we stored
        assert verification_attempt.user_id == reference_payload.user_id
        assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token
        assert verification_attempt.iris_session_id == json_data["id"]
        assert json_data["state"] == "APPROVED"

        # only passports are accepted for strong verification
        if json_data["document_type"] != "PASSPORT":
            verification_attempt.status = StrongVerificationAttemptStatus.failed
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
                ),
            )
            return

        assert json_data["document_type"] == "PASSPORT"

        expiry_date = date.fromisoformat(json_data["expiry_date"])
        nationality = json_data["nationality"]
        last_three_document_chars = json_data["document_number"][-3:]

        # dedupe: look for any earlier attempt with the same (expiry, nationality,
        # last-three-chars) passport fingerprint
        existing_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
            .where(StrongVerificationAttempt.passport_nationality == nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        # the minimal (dedupe) data is stored regardless of outcome
        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = expiry_date
        verification_attempt.passport_nationality = nationality
        verification_attempt.passport_last_three_document_chars = last_three_document_chars

        if existing_attempt:
            verification_attempt.status = StrongVerificationAttemptStatus.duplicate

            # same passport used by a different account: flag for manual review
            if existing_attempt.user_id != verification_attempt.user_id:
                session.flush()
                send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)

            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
            )
            return

        # success path: store the full (encrypted) document data
        verification_attempt.has_full_data = True
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = verification_attempt.user
        # has_strong_verification additionally checks the attempt against the user's
        # profile (per the failure reason below, presumably birthdate/gender match)
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            # skip if the badge was already granted
            if session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none():
                return

            user_add_badge(session, user.id, badge_id, do_notify=False)
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_success,
                key="",
            )
        else:
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
                ),
            )
def send_activeness_probes(payload: empty_pb2.Empty) -> None:
    """
    Manages the activeness probe lifecycle:

    1. Creates probes for visible "can host" users who have been inactive longer than
       ACTIVENESS_PROBE_INACTIVITY_PERIOD (if enabled), rate limited to roughly 2% of
       visible users per day.
    2. Sends reminder notifications for pending probes per the
       ACTIVENESS_PROBE_TIME_REMINDERS schedule.
    3. Expires unanswered probes and downgrades the user's hosting/meetup statuses.

    Fix vs previous revision: removed a dead `context = make_background_user_context(...)`
    assignment in step 2 that was never used.
    """
    with session_scope() as session:
        ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)

        if config["ACTIVENESS_PROBES_ENABLED"]:
            # users with a currently pending (unanswered) probe
            subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery()

            # users who we should send an activeness probe to
            new_probe_user_ids = (
                session.execute(
                    select(User.id)
                    .where(User.is_visible)
                    .where(User.hosting_status == HostingStatus.can_host)
                    .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
                    .where(User.id.not_in(select(subquery.c.user_id)))
                )
                .scalars()
                .all()
            )

            total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()
            probes_today = session.execute(
                select(func.count())
                .select_from(ActivenessProbe)
                .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24))
            ).scalar_one()

            # send probes to max 2% of users per day; each run is additionally capped at
            # 1/24th of the daily budget (but always at least 1)
            max_probes_per_day = 0.02 * total_users
            max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1))

            if len(new_probe_user_ids) > max_probe_size:
                new_probe_user_ids = sample(new_probe_user_ids, max_probe_size)

            for user_id in new_probe_user_ids:
                session.add(ActivenessProbe(user_id=user_id))

            session.commit()

        ## Step 2: actually send out probe notifications
        for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
            # probes that have had exactly `probe_number_minus_1` reminders and whose
            # next reminder is now due
            probes = (
                session.execute(
                    select(ActivenessProbe)
                    .where(ActivenessProbe.notifications_sent == probe_number_minus_1)
                    .where(ActivenessProbe.probe_initiated + delay < func.now())
                    .where(ActivenessProbe.is_pending)
                )
                .scalars()
                .all()
            )

            for probe in probes:
                probe.notifications_sent = probe_number_minus_1 + 1
                notify(
                    session,
                    user_id=probe.user.id,
                    topic_action=NotificationTopicAction.activeness__probe,
                    key=str(probe.id),
                    data=notification_data_pb2.ActivenessProbe(
                        reminder_number=probe_number_minus_1 + 1,
                        deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
                    ),
                )
            session.commit()

        ## Step 3: for those who haven't responded, mark them as failed
        expired_probes = (
            session.execute(
                select(ActivenessProbe)
                .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
                .where(ActivenessProbe.is_pending)
                .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
            )
            .scalars()
            .all()
        )

        for probe in expired_probes:
            probe.responded = now()
            probe.response = ActivenessProbeStatus.expired
            # unanswered probe: downgrade hosting/meetup statuses one notch
            if probe.user.hosting_status == HostingStatus.can_host:
                probe.user.hosting_status = HostingStatus.maybe
            if probe.user.meetup_status == MeetupStatus.wants_to_meetup:
                probe.user.meetup_status = MeetupStatus.open_to_meetup
        session.commit()
def update_randomized_locations(payload: empty_pb2.Empty) -> None:
    """
    Assign each user without one a deterministic randomized location:
    - a strong random seed is derived from the SECRET env var via the key derivation function
    - the user_id is mixed in for per-user randomness
    - a radius is drawn from [0.02, 0.1] degrees (about 2-10km)
    - an angle is drawn from [0, 360]
    - the randomized location is `radius` away from `geom` at that angle
    """
    randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)

    def randomized_point(uid: int, lat: float, lng: float) -> tuple[float, float]:
        """Deterministically offset (lat, lng) for this user."""
        radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{uid}|radius", "ascii"))
        angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{uid}|angle", "ascii"))
        radius = 0.02 + 0.08 * radius_u
        angle_rad = 2 * pi * angle_u
        return lat + radius * sin(angle_rad), lng + radius * cos(angle_rad)

    updates: list[dict[str, Any]] = []

    # first pass (read-only): compute new coordinates for users missing a randomized_geom
    with session_scope() as session:
        for uid, geom in session.execute(select(User.id, User.geom).where(User.randomized_geom.is_(None))).all():
            lat, lng = get_coordinates(geom)
            updates.append({"id": uid, "randomized_geom": create_coordinate(*randomized_point(uid, lat, lng))})

    # second pass: apply all updates in bulk
    with session_scope() as session:
        session.execute(update(User), updates)
def send_event_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends reminders for events that are 24 hours away to users who marked themselves as attending.
    """
    logger.info("Sending event reminder emails")

    with session_scope() as session:
        # upcoming, live occurrences inside the reminder window
        upcoming = (
            session.execute(
                select(EventOccurrence)
                .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA)
                .where(EventOccurrence.start_time >= now())
                .where(~EventOccurrence.is_cancelled)
                .where(~EventOccurrence.is_deleted)
            )
            .scalars()
            .all()
        )

        for occ in upcoming:
            # attendees of this occurrence who haven't been reminded yet
            rows = session.execute(
                select(User, EventOccurrenceAttendee)
                .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id)
                .where(EventOccurrenceAttendee.occurrence_id == occ.id)
                .where(EventOccurrenceAttendee.reminder_sent == False)
            ).all()

            for attending_user, attendance in rows:
                ctx = make_background_user_context(user_id=attending_user.id)

                notify(
                    session,
                    user_id=attending_user.id,
                    topic_action=NotificationTopicAction.event__reminder,
                    key=str(occ.id),
                    data=notification_data_pb2.EventReminder(
                        event=event_to_pb(session, occ, ctx),
                        user=user_model_to_pb(attending_user, session, ctx),
                    ),
                    moderation_state_id=occ.moderation_state_id,
                )

                # commit per attendee so a crash mid-run doesn't re-send earlier reminders
                attendance.reminder_sent = True
                session.commit()
def check_expo_push_receipts(payload: empty_pb2.Empty) -> None:
    """
    Check Expo push receipts in batch and update delivery attempts.

    Processes up to 100 attempts per batch, each batch in its own session so progress
    is committed incrementally. Returns once nothing is left to check; raises if the
    backlog exceeds the safety limit.
    """
    MAX_ITERATIONS = 100  # Safety limit: 100 batches * 100 attempts = 10,000 max

    for iteration in range(MAX_ITERATIONS):
        with session_scope() as session:
            # Find all delivery attempts that need receipt checking
            # Wait 15 minutes per Expo's recommendation before checking receipts
            attempts = (
                session.execute(
                    select(PushNotificationDeliveryAttempt)
                    .where(PushNotificationDeliveryAttempt.expo_ticket_id != None)
                    .where(PushNotificationDeliveryAttempt.receipt_checked_at == None)
                    .where(PushNotificationDeliveryAttempt.time < now() - timedelta(minutes=15))
                    .where(PushNotificationDeliveryAttempt.time > now() - timedelta(hours=24))
                    .limit(100)
                )
                .scalars()
                .all()
            )

            if not attempts:
                logger.debug("No Expo receipts to check")
                return

            logger.info(f"Checking {len(attempts)} Expo push receipts")

            # single batched API call for all ticket ids in this batch
            receipts = get_expo_push_receipts([not_none(attempt.expo_ticket_id) for attempt in attempts])

            for attempt in attempts:
                receipt = receipts.get(not_none(attempt.expo_ticket_id))

                # Always mark as checked to avoid infinite loops
                attempt.receipt_checked_at = now()

                if receipt is None:
                    # Receipt not found after 15min - likely expired (>24h) or never existed
                    # Per Expo docs: receipts should be available within 15 minutes
                    attempt.receipt_status = "not_found"
                    continue

                attempt.receipt_status = receipt.get("status")

                if receipt.get("status") == "error":
                    details = receipt.get("details", {})
                    error_code = details.get("error")
                    attempt.receipt_error_code = error_code

                    if error_code == "DeviceNotRegistered":
                        # Device token is no longer valid - disable the subscription
                        sub = session.execute(
                            select(PushNotificationSubscription).where(
                                PushNotificationSubscription.id == attempt.push_notification_subscription_id
                            )
                        ).scalar_one()

                        # NOTE(review): a disabled_at in the future appears to mean the
                        # subscription is still active (far-future sentinel), and setting
                        # it to now() disables it — confirm against the model definition
                        if sub.disabled_at > now():
                            sub.disabled_at = now()
                            logger.info(f"Disabled push sub {sub.id} due to DeviceNotRegistered in receipt")
                            push_notification_counter.labels(
                                platform="expo", outcome="permanent_subscription_failure_receipt"
                            ).inc()
                    else:
                        logger.warning(f"Expo receipt error for ticket {attempt.expo_ticket_id}: {error_code}")

    # If we get here, we've exhausted MAX_ITERATIONS without finishing
    raise RuntimeError(
        f"check_expo_push_receipts exceeded {MAX_ITERATIONS} iterations - "
        "there may be an unusually large backlog of receipts to check"
    )
def send_postal_verification_postcard(payload: jobs_pb2.SendPostalVerificationPostcardPayload) -> None:
    """
    Sends the postcard via external API and updates attempt status.

    Loads the attempt referenced in the payload (must be `in_progress`), submits the
    postcard through the MyPostcard integration, then records the provider job id,
    advances the attempt to `awaiting_verification`, bumps metrics, logs an event,
    and notifies the user.
    """
    with session_scope() as session:
        attempt = session.execute(
            select(PostalVerificationAttempt).where(
                PostalVerificationAttempt.id == payload.postal_verification_attempt_id
            )
        ).scalar_one_or_none()

        # the attempt may have been removed or advanced since the job was queued
        if not attempt or attempt.status != PostalVerificationStatus.in_progress:
            logger.warning(
                f"Postal verification attempt {payload.postal_verification_attempt_id} not found or wrong state"
            )
            return

        user_name = session.execute(select(User.name).where(User.id == attempt.user_id)).scalar_one()

        # submit to the external postcard API; returns the provider's job id
        job_id = send_postcard(
            recipient_name=user_name,
            address_line_1=attempt.address_line_1,
            address_line_2=attempt.address_line_2,
            city=attempt.city,
            state=attempt.state,
            postal_code=attempt.postal_code,
            country=attempt.country_code,
            verification_code=not_none(attempt.verification_code),
        )

        attempt.mypostcard_job_id = job_id
        attempt.status = PostalVerificationStatus.awaiting_verification
        # server-side timestamp, evaluated by the database on flush
        attempt.postcard_sent_at = func.now()

        postcards_sent_counter.labels(country_code=attempt.country_code).inc()

        context = make_background_user_context(attempt.user_id)
        log_event(
            context,
            session,
            "postcard.sent",
            {
                "attempt_id": attempt.id,
                "country": attempt.country_code,
                "city": attempt.city,
                "mypostcard_job_id": job_id,
            },
        )

        notify(
            session,
            user_id=attempt.user_id,
            topic_action=NotificationTopicAction.postal_verification__postcard_sent,
            key="",
            data=notification_data_pb2.PostalVerificationPostcardSent(
                city=attempt.city,
                country=attempt.country_code,
            ),
        )
def check_mypostcard_jobs(payload: empty_pb2.Empty) -> None:
    """
    Checks that all MyPostcard jobs from the last week are tied to a postal verification attempt.

    Reports (rather than raises) when orphaned provider jobs are found, since this is a
    reconciliation alert, not a hard failure.

    Fix vs previous revision: the external `get_order_ids` call is now made *before*
    opening the DB session, so we no longer hold a transaction open across a network call.
    """
    if not config["ENABLE_POSTAL_VERIFICATION"]:
        return

    # fetch the provider-side job ids for the last week (external API call)
    mypostcard_job_ids = set(
        get_order_ids(
            date_from=(now() - timedelta(days=7)).date(),
            date_to=now().date(),
        )
    )

    # our recorded job ids; the 14-day window comfortably covers the provider's 7-day window
    with session_scope() as session:
        known_job_ids = set(
            session.execute(
                select(PostalVerificationAttempt.mypostcard_job_id).where(
                    PostalVerificationAttempt.mypostcard_job_id.isnot(None),
                    PostalVerificationAttempt.created >= now() - timedelta(days=14),
                )
            )
            .scalars()
            .all()
        )

    orphaned = mypostcard_job_ids - known_job_ids
    if orphaned:
        report_message(
            f"Found {len(orphaned)} orphaned MyPostcard jobs not tied to any verification attempt: {orphaned}"
        )
class DatabaseInconsistencyError(Exception):
    """Raised when database consistency checks fail."""
def check_database_consistency(payload: empty_pb2.Empty) -> None:
    """
    Checks database consistency and raises an exception if any issues are found.

    Every violated invariant is appended to `errors`, and a single
    DatabaseInconsistencyError carrying all of them is raised at the end, so one run
    reports every problem at once.
    """
    logger.info("Checking database consistency")
    errors = []

    with session_scope() as session:
        # Check that all users have a profile gallery
        users_without_gallery = session.execute(
            select(User.id, User.username).where(User.profile_gallery_id.is_(None))
        ).all()
        if users_without_gallery:
            errors.append(f"Users without profile gallery: {users_without_gallery}")

        # Check that all profile galleries point to their owner
        mismatched_galleries = session.execute(
            select(User.id, User.username, User.profile_gallery_id, PhotoGallery.owner_user_id)
            .join(PhotoGallery, User.profile_gallery_id == PhotoGallery.id)
            .where(User.profile_gallery_id.is_not(None))
            .where(PhotoGallery.owner_user_id != User.id)
        ).all()
        if mismatched_galleries:
            errors.append(f"Profile galleries with mismatched owner: {mismatched_galleries}")

        # === Moderation System Consistency Checks ===

        # Check every ModerationState has at least one INITIAL_REVIEW queue item
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_initial_review = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationQueueItem.moderation_state_id == ModerationState.id)
                    .where(ModerationQueueItem.trigger == ModerationTrigger.initial_review)
                ),
            )
        ).all()
        if states_without_initial_review:
            errors.append(f"ModerationStates without INITIAL_REVIEW queue item: {states_without_initial_review}")

        # Check every ModerationState has a CREATE log entry
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_create_log = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationLog.moderation_state_id == ModerationState.id)
                    .where(ModerationLog.action == ModerationAction.create)
                ),
            )
        ).all()
        if states_without_create_log:
            errors.append(f"ModerationStates without CREATE log entry: {states_without_create_log}")

        # Check resolved queue items point to log entries for the same moderation state
        resolved_item_log_mismatches = session.execute(
            select(ModerationQueueItem.id, ModerationQueueItem.moderation_state_id, ModerationLog.moderation_state_id)
            .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id)
            .where(ModerationQueueItem.resolved_by_log_id.is_not(None))
            .where(ModerationQueueItem.moderation_state_id != ModerationLog.moderation_state_id)
        ).all()
        if resolved_item_log_mismatches:
            errors.append(f"Resolved queue items with mismatched moderation_state_id: {resolved_item_log_mismatches}")

        # Check every HOST_REQUEST ModerationState has exactly one HostRequest pointing to it
        # NOTE(review): N+1 query pattern (one count per state); this also catches
        # count == 0, which a simple GROUP BY on HostRequest would miss — consider an
        # outer-joined grouped query if these tables grow large
        hr_states = (
            session.execute(
                select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.host_request)
            )
            .scalars()
            .all()
        )
        for state_id in hr_states:
            hr_count = session.execute(
                select(func.count()).where(HostRequest.moderation_state_id == state_id)
            ).scalar_one()
            if hr_count != 1:
                errors.append(f"ModerationState {state_id} (HOST_REQUEST) has {hr_count} HostRequests (expected 1)")

        # Check every GROUP_CHAT ModerationState has exactly one GroupChat pointing to it
        gc_states = (
            session.execute(
                select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.group_chat)
            )
            .scalars()
            .all()
        )
        for state_id in gc_states:
            gc_count = session.execute(
                select(func.count()).where(GroupChat.moderation_state_id == state_id)
            ).scalar_one()
            if gc_count != 1:
                errors.append(f"ModerationState {state_id} (GROUP_CHAT) has {gc_count} GroupChats (expected 1)")

        # Check ModerationState.object_id matches the actual object's ID
        hr_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, HostRequest.conversation_id)
            .join(HostRequest, HostRequest.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.host_request)
            .where(ModerationState.object_id != HostRequest.conversation_id)
        ).all()
        if hr_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for HOST_REQUEST: {hr_object_id_mismatches}")

        gc_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, GroupChat.conversation_id)
            .join(GroupChat, GroupChat.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.group_chat)
            .where(ModerationState.object_id != GroupChat.conversation_id)
        ).all()
        if gc_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for GROUP_CHAT: {gc_object_id_mismatches}")

        # Check reverse mapping: HostRequest's moderation_state points to correct ModerationState
        hr_reverse_mismatches = session.execute(
            select(
                HostRequest.conversation_id,
                HostRequest.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, HostRequest.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.host_request)
                | (ModerationState.object_id != HostRequest.conversation_id)
            )
        ).all()
        if hr_reverse_mismatches:
            errors.append(f"HostRequest points to ModerationState with wrong type/object_id: {hr_reverse_mismatches}")

        # Check reverse mapping: GroupChat's moderation_state points to correct ModerationState
        gc_reverse_mismatches = session.execute(
            select(
                GroupChat.conversation_id,
                GroupChat.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, GroupChat.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.group_chat)
                | (ModerationState.object_id != GroupChat.conversation_id)
            )
        ).all()
        if gc_reverse_mismatches:
            errors.append(f"GroupChat points to ModerationState with wrong type/object_id: {gc_reverse_mismatches}")

        # Ensure auto-approve deadline isn't being exceeded by a significant margin
        # The auto-approver runs every 15s, so allow 5 minutes grace before alerting
        deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
        if deadline_seconds > 0:
            grace_period = timedelta(minutes=5)
            stale_initial_review_items = session.execute(
                select(
                    ModerationQueueItem.id,
                    ModerationQueueItem.moderation_state_id,
                    ModerationQueueItem.time_created,
                )
                .where(ModerationQueueItem.trigger == ModerationTrigger.initial_review)
                .where(ModerationQueueItem.resolved_by_log_id.is_(None))
                .where(ModerationQueueItem.time_created < now() - timedelta(seconds=deadline_seconds) - grace_period)
            ).all()
            if stale_initial_review_items:
                errors.append(
                    f"INITIAL_REVIEW items exceeding auto-approve deadline by >5min: {stale_initial_review_items}"
                )

    if errors:
        raise DatabaseInconsistencyError("\n".join(errors))
def auto_approve_moderation_queue(payload: empty_pb2.Empty) -> None:
    """
    Dead man's switch: auto-approves unresolved INITIAL_REVIEW items older than the deadline.
    Items explicitly actioned by moderators are left alone.

    A non-positive MODERATION_AUTO_APPROVE_DEADLINE_SECONDS disables the mechanism.
    Approvals go through the Moderation servicer as the moderation bot user, so they
    take the same code path as a manual approval.
    """
    deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
    if deadline_seconds <= 0:
        return

    with session_scope() as session:
        ctx = make_background_user_context(user_id=config["MODERATION_BOT_USER_ID"])

        # up to 100 unresolved initial-review items created before the deadline cutoff
        items = (
            Moderation()
            .GetModerationQueue(
                request=moderation_pb2.GetModerationQueueReq(
                    triggers=[moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW],
                    unresolved_only=True,
                    page_size=100,
                    created_before=Timestamp_from_datetime(now() - timedelta(seconds=deadline_seconds)),
                ),
                context=ctx,
                session=session,
            )
            .queue_items
        )

        if not items:
            return

        logger.info(f"Auto-approving {len(items)} moderation queue items")
        for item in items:
            Moderation().ModerateContent(
                request=moderation_pb2.ModerateContentReq(
                    moderation_state_id=item.moderation_state_id,
                    action=moderation_pb2.MODERATION_ACTION_APPROVE,
                    visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
                    reason=f"Auto-approved: moderation deadline of {deadline_seconds} seconds exceeded.",
                ),
                context=ctx,
                session=session,
            )
        moderation_auto_approved_counter.inc(len(items))