Coverage for src/couchers/jobs/handlers.py: 92% (519 statements)
1"""
2Background job servicers
3"""
5import logging
6from datetime import date, timedelta
7from math import cos, pi, sin, sqrt
8from random import sample
9from typing import Any
11import requests
12from google.protobuf import empty_pb2
13from sqlalchemy import Float, Integer
14from sqlalchemy.orm import aliased
15from sqlalchemy.sql import (
16 and_,
17 case,
18 cast,
19 delete,
20 distinct,
21 exists,
22 extract,
23 func,
24 literal,
25 not_,
26 or_,
27 union_all,
28 update,
29)
31from couchers import urls
32from couchers.config import config
33from couchers.constants import (
34 ACTIVENESS_PROBE_EXPIRY_TIME,
35 ACTIVENESS_PROBE_INACTIVITY_PERIOD,
36 ACTIVENESS_PROBE_TIME_REMINDERS,
37 EVENT_REMINDER_TIMEDELTA,
38 HOST_REQUEST_MAX_REMINDERS,
39 HOST_REQUEST_REMINDER_INTERVAL,
40)
41from couchers.context import make_background_user_context
42from couchers.crypto import (
43 USER_LOCATION_RANDOMIZATION_NAME,
44 asym_encrypt,
45 b64decode,
46 get_secret,
47 simple_decrypt,
48 stable_secure_uniform,
49)
50from couchers.db import session_scope
51from couchers.email.dev import print_dev_email
52from couchers.email.smtp import send_smtp_email
53from couchers.helpers.badges import user_add_badge, user_remove_badge
54from couchers.materialized_views import (
55 UserResponseRate,
56 refresh_materialized_views,
57 refresh_materialized_views_rapid,
58)
59from couchers.metrics import (
60 moderation_auto_approved_counter,
61 push_notification_counter,
62 strong_verification_completions_counter,
63)
64from couchers.models import (
65 AccountDeletionToken,
66 ActivenessProbe,
67 ActivenessProbeStatus,
68 Cluster,
69 ClusterRole,
70 ClusterSubscription,
71 EventOccurrence,
72 EventOccurrenceAttendee,
73 GroupChat,
74 GroupChatSubscription,
75 HostingStatus,
76 HostRequest,
77 HostRequestStatus,
78 LoginToken,
79 MeetupStatus,
80 Message,
81 MessageType,
82 ModerationAction,
83 ModerationLog,
84 ModerationObjectType,
85 ModerationQueueItem,
86 ModerationState,
87 ModerationTrigger,
88 PassportSex,
89 PasswordResetToken,
90 PhotoGallery,
91 PostalVerificationAttempt,
92 PostalVerificationStatus,
93 PushNotificationDeliveryAttempt,
94 PushNotificationSubscription,
95 Reference,
96 StrongVerificationAttempt,
97 StrongVerificationAttemptStatus,
98 User,
99 UserBadge,
100 Volunteer,
101)
102from couchers.notifications.background import handle_email_digests, handle_notification
103from couchers.notifications.expo_api import get_expo_push_receipts
104from couchers.notifications.notify import notify
105from couchers.notifications.send_raw_push_notification import send_raw_push_notification_v2
106from couchers.postal.postcard_service import send_postcard
107from couchers.proto import moderation_pb2, notification_data_pb2
108from couchers.proto.internal import jobs_pb2, verification_pb2
109from couchers.resources import get_badge_dict, get_static_badge_dict
110from couchers.servicers.api import user_model_to_pb
111from couchers.servicers.conversations import generate_message_notifications
112from couchers.servicers.discussions import generate_create_discussion_notifications
113from couchers.servicers.editor import generate_new_blog_post_notifications
114from couchers.servicers.events import (
115 event_to_pb,
116 generate_event_cancel_notifications,
117 generate_event_create_notifications,
118 generate_event_delete_notifications,
119 generate_event_update_notifications,
120)
121from couchers.servicers.moderation import Moderation
122from couchers.servicers.requests import host_request_to_pb
123from couchers.servicers.threads import generate_reply_notifications
124from couchers.sql import couchers_select as select
125from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships
126from couchers.tasks import send_duplicate_strong_verification_email
127from couchers.utils import (
128 Timestamp_from_datetime,
129 create_coordinate,
130 get_coordinates,
131 now,
132)
134logger = logging.getLogger(__name__)

# these handlers are imported from other modules; we just attach their payload types (and schedules) here
handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload

send_raw_push_notification_v2.PAYLOAD = jobs_pb2.SendRawPushNotificationPayloadV2

handle_email_digests.PAYLOAD = empty_pb2.Empty
handle_email_digests.SCHEDULE = timedelta(minutes=15)

generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload

generate_reply_notifications.PAYLOAD = jobs_pb2.GenerateReplyNotificationsPayload

generate_create_discussion_notifications.PAYLOAD = jobs_pb2.GenerateCreateDiscussionNotificationsPayload

generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload

generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload

generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload

generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload

generate_new_blog_post_notifications.PAYLOAD = jobs_pb2.GenerateNewBlogPostNotificationsPayload

refresh_materialized_views.PAYLOAD = empty_pb2.Empty
refresh_materialized_views.SCHEDULE = timedelta(minutes=5)

refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)
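
# A note on the convention above (the runner itself lives elsewhere; this sketch is
# an assumption about it, not the actual queue implementation): each handler carries
# a PAYLOAD protobuf type, and handlers with a SCHEDULE are re-enqueued periodically.
# A runner might dispatch roughly like:
#
#     def run_job(handler, raw_payload: bytes) -> None:  # hypothetical runner
#         handler(handler.PAYLOAD.FromString(raw_payload))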


def send_email(payload: jobs_pb2.SendEmailPayload) -> None:
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
    # selects a "sender", which either prints the email to the logger or sends it out with SMTP
    sender = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email
    # the sender must return a models.Email object that can be added to the database
    email = sender(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
        list_unsubscribe_header=payload.list_unsubscribe_header,
        source_data=payload.source_data,
    )
    with session_scope() as session:
        session.add(email)


send_email.PAYLOAD = jobs_pb2.SendEmailPayload


def purge_login_tokens(payload: empty_pb2.Empty) -> None:
    logger.info("Purging login tokens")
    with session_scope() as session:
        session.execute(delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False))


purge_login_tokens.PAYLOAD = empty_pb2.Empty
purge_login_tokens.SCHEDULE = timedelta(hours=24)


def purge_password_reset_tokens(payload: empty_pb2.Empty) -> None:
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )


purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)


def purge_account_deletion_tokens(payload: empty_pb2.Empty) -> None:
    logger.info("Purging account deletion tokens")
    with session_scope() as session:
        session.execute(
            delete(AccountDeletionToken)
            .where(~AccountDeletionToken.is_valid)
            .execution_options(synchronize_session=False)
        )


purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)


def send_message_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for messages that have been unseen for a long enough time
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                select(User)
                .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
                .where_moderated_content_visible_to_user_column(GroupChat, User.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for user in users:
            context = make_background_user_context(user_id=user.id)
            # now actually grab all the unseen messages in these group chats, not just those more than 5 minutes old
            subquery = (
                select(
                    GroupChatSubscription.group_chat_id.label("group_chat_id"),
                    func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                    func.max(Message.id).label("message_id"),
                    func.count(Message.id).label("count_unseen"),
                )
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
                .where_moderated_content_visible(context, GroupChat, is_list_operation=True)
                .where(GroupChatSubscription.user_id == user.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(Message.id > user.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where_users_column_visible(context, Message.author_id)
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            unseen_messages = session.execute(
                select(GroupChat, Message, subquery.c.count_unseen)
                .join(subquery, subquery.c.message_id == Message.id)
                .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
                .where_moderated_content_visible(context, GroupChat, is_list_operation=True)
                .order_by(subquery.c.message_id.desc())
            ).all()

            if not unseen_messages:
                continue

            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            def format_title(message, group_chat, count_unseen):
                if group_chat.is_dm:
                    return f"You missed {count_unseen} message(s) from {message.author.name}"
                else:
                    return f"You missed {count_unseen} message(s) in {group_chat.title}"

            notify(
                session,
                user_id=user.id,
                topic_action="chat:missed_messages",
                key="",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                context,
                            ),
                            message=format_title(message, group_chat, count_unseen),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                        )
                        for group_chat, message, count_unseen in unseen_messages
                    ],
                ),
            )
            session.commit()


send_message_notifications.PAYLOAD = empty_pb2.Empty
send_message_notifications.SCHEDULE = timedelta(minutes=3)


def send_request_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host)
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    with session_scope() as session:
        # Get all candidate users who might have unseen request messages
        candidate_user_ids = (
            session.execute(
                select(User.id)
                .where(User.is_visible)
                .where(
                    or_(
                        # Users with unseen messages as surfer
                        exists(
                            select(1)
                            .select_from(HostRequest)
                            .join(Message, Message.conversation_id == HostRequest.conversation_id)
                            .where(HostRequest.surfer_user_id == User.id)
                            .where(Message.id > HostRequest.surfer_last_seen_message_id)
                            .where(Message.id > User.last_notified_request_message_id)
                            .where(Message.time < now() - timedelta(minutes=5))
                            .where(Message.message_type == MessageType.text)
                        ),
                        # Users with unseen messages as host
                        exists(
                            select(1)
                            .select_from(HostRequest)
                            .join(Message, Message.conversation_id == HostRequest.conversation_id)
                            .where(HostRequest.host_user_id == User.id)
                            .where(Message.id > HostRequest.host_last_seen_message_id)
                            .where(Message.id > User.last_notified_request_message_id)
                            .where(Message.time < now() - timedelta(minutes=5))
                            .where(Message.message_type == MessageType.text)
                        ),
                    )
                )
            )
            .scalars()
            .all()
        )

        for user_id in candidate_user_ids:
            context = make_background_user_context(user_id=user_id)

            # requests where this user is surfing
            surfing_reqs = session.execute(
                select(User, HostRequest, func.max(Message.id))
                .where(User.id == user_id)
                .join(HostRequest, HostRequest.surfer_user_id == User.id)
                .where_moderated_content_visible_to_user_column(HostRequest, HostRequest.surfer_user_id)
                .where_users_column_visible(context, HostRequest.host_user_id)
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.surfer_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)
            ).all()

            # where this user is hosting
            hosting_reqs = session.execute(
                select(User, HostRequest, func.max(Message.id))
                .where(User.id == user_id)
                .join(HostRequest, HostRequest.host_user_id == User.id)
                .where_moderated_content_visible_to_user_column(HostRequest, HostRequest.host_user_id)
                .where_users_column_visible(context, HostRequest.surfer_user_id)
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.host_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)
            ).all()

            for user, host_request, max_message_id in surfing_reqs:
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                notify(
                    session,
                    user_id=user.id,
                    topic_action="host_request:missed_messages",
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        user=user_model_to_pb(host_request.host, session, context),
                        am_host=False,
                    ),
                )

            for user, host_request, max_message_id in hosting_reqs:
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                notify(
                    session,
                    user_id=user.id,
                    topic_action="host_request:missed_messages",
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        user=user_model_to_pb(host_request.surfer, session, context),
                        am_host=True,
                    ),
                )


send_request_notifications.PAYLOAD = empty_pb2.Empty
send_request_notifications.SCHEDULE = timedelta(minutes=3)


def send_onboarding_emails(payload: empty_pb2.Empty) -> None:
    """
    Sends out onboarding emails
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:
        # first onboarding email
        users = (
            session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all()
        )

        for user in users:
            notify(
                session,
                user_id=user.id,
                topic_action="onboarding:reminder",
                key="1",
            )
            user.onboarding_emails_sent = 1
            user.last_onboarding_email_sent = now()
            session.commit()

        # second onboarding email
        # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
        users = (
            session.execute(
                select(User)
                .where(User.is_visible)
                .where(User.onboarding_emails_sent == 1)
                .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
                .where(User.has_completed_profile == False)
            )
            .scalars()
            .all()
        )

        for user in users:
            notify(
                session,
                user_id=user.id,
                topic_action="onboarding:reminder",
                key="2",
            )
            user.onboarding_emails_sent = 2
            user.last_onboarding_email_sent = now()
            session.commit()


send_onboarding_emails.PAYLOAD = empty_pb2.Empty
send_onboarding_emails.SCHEDULE = timedelta(hours=1)


def send_reference_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends out reminders to write references after hosting/staying
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (reminder number, timedelta before the reference window closes, days left to write the ref)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]
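    # worked example for reminder 1 (assuming a 14-day window closing at midnight):
    # it fires once now() > end_time_to_write_reference - (15 days - 20 hours),
    # i.e. at 20:00 on the final evening of the stay, matching "8 pm ish" above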

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            user = aliased(User)
            other_user = aliased(User)
            # surfers needing to write a ref
            q1 = (
                select(literal(True), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.surfer_user_id)
                .join(other_user, other_user.id == HostRequest.host_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the surfer has not written a ref
                        Reference.from_user_id == HostRequest.surfer_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.surfer_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.surfer_reason_didnt_meetup == None)
                .where_users_visible_to_each_other(user, other_user)
            )

            # hosts needing to write a ref
            q2 = (
                select(literal(False), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.host_user_id)
                .join(other_user, other_user.id == HostRequest.surfer_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the host has not written a ref
                        Reference.from_user_id == HostRequest.host_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.host_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.host_reason_didnt_meetup == None)
                .where_users_visible_to_each_other(user, other_user)
            )

            union = union_all(q1, q2).subquery()
            union = select(
                union.c[0].label("surfed"),
                aliased(HostRequest, union),
                aliased(user, union),
                aliased(other_user, union),
            )
            reference_reminders = session.execute(union).all()

            for surfed, host_request, user, other_user in reference_reminders:
                # visibility and blocking already checked in sql
                assert user.is_visible
                context = make_background_user_context(user_id=user.id)
                notify(
                    session,
                    user_id=user.id,
                    topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(other_user, session, context),
                        days_left=reminder_days_left,
                    ),
                )
                if surfed:
                    host_request.surfer_sent_reference_reminders = reminder_number
                else:
                    host_request.host_sent_reference_reminders = reminder_number
                session.commit()


send_reference_reminders.PAYLOAD = empty_pb2.Empty
send_reference_reminders.SCHEDULE = timedelta(hours=1)


def send_host_request_reminders(payload: empty_pb2.Empty) -> None:
    with session_scope() as session:
        host_has_sent_message = select(1).where(
            Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.host_user_id
        )

        requests = (
            session.execute(
                select(HostRequest)
                .where_moderated_content_visible_to_user_column(HostRequest, HostRequest.host_user_id)
                .where(HostRequest.status == HostRequestStatus.pending)
                .where(HostRequest.host_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS)
                .where(HostRequest.start_time > func.now())
                .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL)
                .where(~exists(host_has_sent_message))
                .where_user_columns_visible_to_each_other(HostRequest.host_user_id, HostRequest.surfer_user_id)
            )
            .scalars()
            .all()
        )

        for host_request in requests:
            host_request.host_sent_request_reminders += 1
            host_request.last_sent_request_reminder_time = now()

            context = make_background_user_context(user_id=host_request.host_user_id)
            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action="host_request:reminder",
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestReminder(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

        session.commit()


send_host_request_reminders.PAYLOAD = empty_pb2.Empty
send_host_request_reminders.SCHEDULE = timedelta(minutes=15)


def add_users_to_email_list(payload: empty_pb2.Empty) -> None:
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    while True:
        with session_scope() as session:
            user = session.execute(
                select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
            ).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            if user.opt_out_of_newsletter:
                user.in_sync_with_newsletter = True
                session.commit()
                continue

            r = requests.post(
                config["LISTMONK_BASE_URL"] + "/api/subscribers",
                auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
                json={
                    "email": user.email,
                    "name": user.name,
                    "lists": [config["LISTMONK_LIST_ID"]],
                    "preconfirm_subscriptions": True,
                    "attribs": {"couchers_user_id": user.id},
                    "status": "enabled",
                },
                timeout=10,
            )
            # the API returns 409 if the user is already subscribed
            if r.status_code == 200 or r.status_code == 409:
                user.in_sync_with_newsletter = True
                session.commit()
            else:
                raise Exception("Failed to add user to mailing list")


add_users_to_email_list.PAYLOAD = empty_pb2.Empty
add_users_to_email_list.SCHEDULE = timedelta(hours=1)


def enforce_community_membership(payload: empty_pb2.Empty) -> None:
    tasks_enforce_community_memberships()


enforce_community_membership.PAYLOAD = empty_pb2.Empty
enforce_community_membership.SCHEDULE = timedelta(minutes=15)


def update_recommendation_scores(payload: empty_pb2.Empty) -> None:
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian():
        """
        Produces an approximately standard normal random variate
        """
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)
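
    # why the normalization works: each func.random() is uniform on [0, 1) with mean
    # 1/2 and variance 1/12, so the sum of `trials` draws has mean trials / 2 and
    # variance trials / 12; centering and dividing by sqrt(trials / 12) gives an
    # approximately standard normal variate by the central limit theorem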

    def int_(stmt):
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt):
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")
        home_length = func.length(home_text)

        filled_profile = int_(User.has_completed_profile)
        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        may_host = int_(User.hosting_status == HostingStatus.maybe)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.has_completed_my_home)
        filled_home_lots = int_(home_length > 200)
        hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host
        profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots

        # references
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # community builder status and badges
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        cb = int_(min_node_id >= 1)
        wcb = int_(min_node_id == 1)
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }
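
        # passing a mapping to case() with value= compiles to a plain CASE expression,
        # roughly: CASE user_badges.badge_id WHEN 'founder' THEN 100 ... ELSE 0 END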
        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate
        hr_subquery = select(
            UserResponseRate.user_id,
            float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"),
            float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"),
        ).subquery()
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
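        # response times were converted to minutes above, so 60 * 96.0 means 96 hours (4 days)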
        response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0)

        recommendation_score = (
            hosting_status_points
            + profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        session.execute(
            User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
        )

    logger.info("Updated recommendation scores")


update_recommendation_scores.PAYLOAD = empty_pb2.Empty
update_recommendation_scores.SCHEDULE = timedelta(hours=24)


def update_badges(payload: empty_pb2.Empty) -> None:
    with session_scope() as session:

        def update_badge(badge_id: str, members: list[int]) -> None:
            badge = get_badge_dict()[badge_id]
            user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all()
            # in case the user ids don't exist in the db
            actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all()
            # we should add the badge to these
            add = set(actual_members) - set(user_ids)
            # we should remove the badge from these
            remove = set(user_ids) - set(actual_members)
            for user_id in add:
                user_add_badge(session, user_id, badge_id)

            for user_id in remove:
                user_remove_badge(session, user_id, badge_id)

        update_badge("founder", get_static_badge_dict()["founder"])
        update_badge("board_member", get_static_badge_dict()["board_member"])
        update_badge("past_board_member", get_static_badge_dict()["past_board_member"])
        update_badge("donor", session.execute(select(User.id).where(User.last_donated.is_not(None))).scalars().all())
        update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all())
        update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all())
        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        update_badge(
            "strong_verification",
            session.execute(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            )
            .scalars()
            .all(),
        )
        # volunteer badge for active volunteers (stopped_volunteering is null)
        update_badge(
            "volunteer",
            session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_(None))).scalars().all(),
        )
        # past_volunteer badge for past volunteers (stopped_volunteering is not null)
        update_badge(
            "past_volunteer",
            session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_not(None)))
            .scalars()
            .all(),
        )


update_badges.PAYLOAD = empty_pb2.Empty
update_badges.SCHEDULE = timedelta(minutes=15)


def finalize_strong_verification(payload: "jobs_pb2.FinalizeStrongVerificationPayload") -> None:
    with session_scope() as session:
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
            verify="/etc/ssl/certs/ca-certificates.crt",
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()
        reference_payload = verification_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )
        assert verification_attempt.user_id == reference_payload.user_id
        assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token
        assert verification_attempt.iris_session_id == json_data["id"]
        assert json_data["state"] == "APPROVED"

        if json_data["document_type"] != "PASSPORT":
            verification_attempt.status = StrongVerificationAttemptStatus.failed
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
                ),
            )
            return

        assert json_data["document_type"] == "PASSPORT"

        expiry_date = date.fromisoformat(json_data["expiry_date"])
        nationality = json_data["nationality"]
        last_three_document_chars = json_data["document_number"][-3:]

        existing_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
            .where(StrongVerificationAttempt.passport_nationality == nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = expiry_date
        verification_attempt.passport_nationality = nationality
        verification_attempt.passport_last_three_document_chars = last_three_document_chars

        if existing_attempt:
            verification_attempt.status = StrongVerificationAttemptStatus.duplicate

            if existing_attempt.user_id != verification_attempt.user_id:
                session.flush()
                send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)

            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                key="",
                data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
            )
            return

        verification_attempt.has_full_data = True
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = verification_attempt.user
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            if session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none():
                return

            user_add_badge(session, user.id, badge_id, do_notify=False)
            notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success", key="")
        else:
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
                ),
            )


finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload


def send_activeness_probes(payload: empty_pb2.Empty) -> None:
    with session_scope() as session:
        ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)

        if config["ACTIVENESS_PROBES_ENABLED"]:
            # current activeness probes
            subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery()

            # users who we should send an activeness probe to
            new_probe_user_ids = (
                session.execute(
                    select(User.id)
                    .where(User.is_visible)
                    .where(User.hosting_status == HostingStatus.can_host)
                    .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
                    .where(User.id.not_in(select(subquery.c.user_id)))
                )
                .scalars()
                .all()
            )

            total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()
            probes_today = session.execute(
                select(func.count())
                .select_from(ActivenessProbe)
                .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24))
            ).scalar_one()

            # send probes to max 2% of users per day
            max_probes_per_day = 0.02 * total_users
            max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1))
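
            # e.g. with 10,000 visible users the daily cap is 200; if 150 probes went out
            # in the last 24h, this run sends at most int(max(min(50, 200 / 24), 1)) == 8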
            if len(new_probe_user_ids) > max_probe_size:
                new_probe_user_ids = sample(new_probe_user_ids, max_probe_size)

            for user_id in new_probe_user_ids:
                session.add(ActivenessProbe(user_id=user_id))

            session.commit()

        ## Step 2: actually send out probe notifications
        for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
            probes = (
                session.execute(
                    select(ActivenessProbe)
                    .where(ActivenessProbe.notifications_sent == probe_number_minus_1)
                    .where(ActivenessProbe.probe_initiated + delay < func.now())
                    .where(ActivenessProbe.is_pending)
                )
                .scalars()
                .all()
            )

            for probe in probes:
                probe.notifications_sent = probe_number_minus_1 + 1
                context = make_background_user_context(user_id=probe.user.id)
                notify(
                    session,
                    user_id=probe.user.id,
                    topic_action="activeness:probe",
                    key=str(probe.id),
                    data=notification_data_pb2.ActivenessProbe(
                        reminder_number=probe_number_minus_1 + 1,
                        deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
                    ),
                )
                session.commit()

        ## Step 3: for those who haven't responded, mark them as failed
        expired_probes = (
            session.execute(
                select(ActivenessProbe)
                .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
                .where(ActivenessProbe.is_pending)
                .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
            )
            .scalars()
            .all()
        )

        for probe in expired_probes:
            probe.responded = now()
            probe.response = ActivenessProbeStatus.expired
            if probe.user.hosting_status == HostingStatus.can_host:
                probe.user.hosting_status = HostingStatus.maybe
            if probe.user.meetup_status == MeetupStatus.wants_to_meetup:
                probe.user.meetup_status = MeetupStatus.open_to_meetup
            session.commit()


send_activeness_probes.PAYLOAD = empty_pb2.Empty
send_activeness_probes.SCHEDULE = timedelta(minutes=60)


def update_randomized_locations(payload: empty_pb2.Empty) -> None:
    """
    We generate for each user a randomized location as follows:
    - Start from a strong random seed (based on the SECRET env var and our key derivation function)
    - For each user, mix in the user_id for randomness
    - Generate a radius from [0.02, 0.1] degrees (about 2-10km)
    - Generate an angle from [0, 360]
    - Randomized location is then a distance `radius` away at an angle `angle` from `geom`
    """
    randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)

    def gen_randomized_coords(user_id: int, lat: float, lng: float) -> tuple[float, float]:
        radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii"))
        angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii"))
        radius = 0.02 + 0.08 * radius_u
        angle_rad = 2 * pi * angle_u
        offset_lng = radius * cos(angle_rad)
        offset_lat = radius * sin(angle_rad)
        return lat + offset_lat, lng + offset_lng
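
    # scale note: a degree of latitude is ~111 km, so 0.02-0.1 degrees is roughly
    # 2-11 km north-south; a degree of longitude spans cos(latitude) * 111 km, so the
    # same offset covers less east-west ground away from the equator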

    user_updates: list[dict[str, Any]] = []

    with session_scope() as session:
        users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all()

        for user_id, geom in users_to_update:
            lat, lng = get_coordinates(geom)
            user_updates.append(
                {"id": user_id, "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, lat, lng))}
            )

    with session_scope() as session:
        session.execute(update(User), user_updates)


update_randomized_locations.PAYLOAD = empty_pb2.Empty
update_randomized_locations.SCHEDULE = timedelta(hours=1)


def send_event_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends reminders for events that are 24 hours away to users who marked themselves as attending.
    """
    logger.info("Sending event reminder emails")

    with session_scope() as session:
        occurrences = (
            session.execute(
                select(EventOccurrence)
                .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA)
                .where(EventOccurrence.start_time >= now())
            )
            .scalars()
            .all()
        )

        for occurrence in occurrences:
            results = session.execute(
                select(User, EventOccurrenceAttendee)
                .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id)
                .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
                .where(EventOccurrenceAttendee.reminder_sent == False)
            ).all()

            for user, attendee in results:
                context = make_background_user_context(user_id=user.id)

                notify(
                    session,
                    user_id=user.id,
                    topic_action="event:reminder",
                    key=str(occurrence.id),
                    data=notification_data_pb2.EventReminder(
                        event=event_to_pb(session, occurrence, context),
                        user=user_model_to_pb(user, session, context),
                    ),
                )

                attendee.reminder_sent = True
                session.commit()


send_event_reminders.PAYLOAD = empty_pb2.Empty
send_event_reminders.SCHEDULE = timedelta(hours=1)


def check_expo_push_receipts(payload: empty_pb2.Empty) -> None:
    """
    Check Expo push receipts in batch and update delivery attempts.
    """
    MAX_ITERATIONS = 100  # Safety limit: 100 batches * 100 attempts = 10,000 max

    for iteration in range(MAX_ITERATIONS):
        with session_scope() as session:
            # Find all delivery attempts that need receipt checking
            # Wait 15 minutes per Expo's recommendation before checking receipts
            attempts = (
                session.execute(
                    select(PushNotificationDeliveryAttempt)
                    .where(PushNotificationDeliveryAttempt.expo_ticket_id != None)
                    .where(PushNotificationDeliveryAttempt.receipt_checked_at == None)
                    .where(PushNotificationDeliveryAttempt.time < now() - timedelta(minutes=15))
                    .where(PushNotificationDeliveryAttempt.time > now() - timedelta(hours=24))
                    .limit(100)
                )
                .scalars()
                .all()
            )

            if not attempts:
                logger.debug("No Expo receipts to check")
                return

            logger.info(f"Checking {len(attempts)} Expo push receipts")

            receipts = get_expo_push_receipts([attempt.expo_ticket_id for attempt in attempts])

            for attempt in attempts:
                receipt = receipts.get(attempt.expo_ticket_id)

                # Always mark as checked to avoid infinite loops
                attempt.receipt_checked_at = now()

                if receipt is None:
                    # Receipt not found after 15min - likely expired (>24h) or never existed
                    # Per Expo docs: receipts should be available within 15 minutes
                    attempt.receipt_status = "not_found"
                    continue

                attempt.receipt_status = receipt.get("status")

                if receipt.get("status") == "error":
                    details = receipt.get("details", {})
                    error_code = details.get("error")
                    attempt.receipt_error_code = error_code

                    if error_code == "DeviceNotRegistered":
                        # Device token is no longer valid - disable the subscription
                        sub = session.execute(
                            select(PushNotificationSubscription).where(
                                PushNotificationSubscription.id == attempt.push_notification_subscription_id
                            )
                        ).scalar_one()

                        if sub.disabled_at > now():
                            sub.disabled_at = now()
                            logger.info(f"Disabled push sub {sub.id} due to DeviceNotRegistered in receipt")
                            push_notification_counter.labels(
                                platform="expo", outcome="permanent_subscription_failure_receipt"
                            ).inc()
                    else:
                        logger.warning(f"Expo receipt error for ticket {attempt.expo_ticket_id}: {error_code}")

    # If we get here, we've exhausted MAX_ITERATIONS without finishing
    raise RuntimeError(
        f"check_expo_push_receipts exceeded {MAX_ITERATIONS} iterations - "
        "there may be an unusually large backlog of receipts to check"
    )


check_expo_push_receipts.PAYLOAD = empty_pb2.Empty
check_expo_push_receipts.SCHEDULE = timedelta(minutes=5)


def send_postal_verification_postcard(payload: jobs_pb2.SendPostalVerificationPostcardPayload) -> None:
    """
    Sends the postcard via external API and updates attempt status.
    """
    with session_scope() as session:
        attempt = session.execute(
            select(PostalVerificationAttempt).where(
                PostalVerificationAttempt.id == payload.postal_verification_attempt_id
            )
        ).scalar_one_or_none()

        if not attempt or attempt.status != PostalVerificationStatus.in_progress:
            logger.warning(
                f"Postal verification attempt {payload.postal_verification_attempt_id} not found or wrong state"
            )
            return

        user_name = session.execute(select(User.name).where(User.id == attempt.user_id)).scalar_one()

        result = send_postcard(
            recipient_name=user_name,
            address_line_1=attempt.address_line_1,
            address_line_2=attempt.address_line_2,
            city=attempt.city,
            state=attempt.state,
            postal_code=attempt.postal_code,
            country=attempt.country,
            verification_code=attempt.verification_code,
            qr_code_url=urls.postal_verification_link(code=attempt.verification_code),
        )

        if result.success:
            attempt.status = PostalVerificationStatus.awaiting_verification
            attempt.postcard_sent_at = func.now()

            notify(
                session,
                user_id=attempt.user_id,
                topic_action="postal_verification:postcard_sent",
                key="",
                data=notification_data_pb2.PostalVerificationPostcardSent(
                    city=attempt.city,
                    country=attempt.country,
                ),
            )
        else:
            # Could retry or fail - for now, fail
            attempt.status = PostalVerificationStatus.failed
            logger.error(f"Postcard send failed: {result.error_message}")


send_postal_verification_postcard.PAYLOAD = jobs_pb2.SendPostalVerificationPostcardPayload


class DatabaseInconsistencyError(Exception):
    """Raised when database consistency checks fail"""

    pass


def check_database_consistency(payload: empty_pb2.Empty) -> None:
    """
    Checks database consistency and raises an exception if any issues are found.
    """
    logger.info("Checking database consistency")
    errors = []

    with session_scope() as session:
        # Check that all non-deleted users have a profile gallery
        users_without_gallery = session.execute(
            select(User.id, User.username).where(User.is_deleted == False).where(User.profile_gallery_id.is_(None))
        ).all()
        if users_without_gallery:
            errors.append(f"Users without profile gallery: {users_without_gallery}")

        # Check that all profile galleries point to their owner
        mismatched_galleries = session.execute(
            select(User.id, User.username, User.profile_gallery_id, PhotoGallery.owner_user_id)
            .join(PhotoGallery, User.profile_gallery_id == PhotoGallery.id)
            .where(User.profile_gallery_id.is_not(None))
            .where(PhotoGallery.owner_user_id != User.id)
        ).all()
        if mismatched_galleries:
            errors.append(f"Profile galleries with mismatched owner: {mismatched_galleries}")

        # === Moderation System Consistency Checks ===

        # Check all ModerationStates have a known object_type
        known_object_types = [ModerationObjectType.HOST_REQUEST, ModerationObjectType.GROUP_CHAT]
        unknown_type_states = session.execute(
            select(ModerationState.id, ModerationState.object_type).where(
                ModerationState.object_type.not_in(known_object_types)
            )
        ).all()
        if unknown_type_states:
            errors.append(f"ModerationStates with unknown object_type: {unknown_type_states}")

        # Check every ModerationState has at least one INITIAL_REVIEW queue item
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_initial_review = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationQueueItem.moderation_state_id == ModerationState.id)
                    .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW)
                ),
            )
        ).all()
        if states_without_initial_review:
            errors.append(f"ModerationStates without INITIAL_REVIEW queue item: {states_without_initial_review}")

        # Check every ModerationState has a CREATE log entry
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_create_log = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationLog.moderation_state_id == ModerationState.id)
                    .where(ModerationLog.action == ModerationAction.CREATE)
                ),
            )
        ).all()
        if states_without_create_log:
            errors.append(f"ModerationStates without CREATE log entry: {states_without_create_log}")

        # Check resolved queue items point to log entries for the same moderation state
        resolved_item_log_mismatches = session.execute(
            select(ModerationQueueItem.id, ModerationQueueItem.moderation_state_id, ModerationLog.moderation_state_id)
            .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id)
            .where(ModerationQueueItem.resolved_by_log_id.is_not(None))
            .where(ModerationQueueItem.moderation_state_id != ModerationLog.moderation_state_id)
        ).all()
        if resolved_item_log_mismatches:
            errors.append(f"Resolved queue items with mismatched moderation_state_id: {resolved_item_log_mismatches}")

        # Check every HOST_REQUEST ModerationState has exactly one HostRequest pointing to it
        hr_states = (
            session.execute(
                select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST)
            )
            .scalars()
            .all()
        )
        for state_id in hr_states:
            hr_count = session.execute(
                select(func.count()).where(HostRequest.moderation_state_id == state_id)
            ).scalar_one()
            if hr_count != 1:
                errors.append(f"ModerationState {state_id} (HOST_REQUEST) has {hr_count} HostRequests (expected 1)")

        # Check every GROUP_CHAT ModerationState has exactly one GroupChat pointing to it
        gc_states = (
            session.execute(
                select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT)
            )
            .scalars()
            .all()
        )
        for state_id in gc_states:
            gc_count = session.execute(
                select(func.count()).where(GroupChat.moderation_state_id == state_id)
            ).scalar_one()
            if gc_count != 1:
                errors.append(f"ModerationState {state_id} (GROUP_CHAT) has {gc_count} GroupChats (expected 1)")

        # Check ModerationState.object_id matches the actual object's ID
        hr_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, HostRequest.conversation_id)
            .join(HostRequest, HostRequest.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST)
            .where(ModerationState.object_id != HostRequest.conversation_id)
        ).all()
        if hr_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for HOST_REQUEST: {hr_object_id_mismatches}")

        gc_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, GroupChat.conversation_id)
            .join(GroupChat, GroupChat.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT)
            .where(ModerationState.object_id != GroupChat.conversation_id)
        ).all()
        if gc_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for GROUP_CHAT: {gc_object_id_mismatches}")

        # Check reverse mapping: HostRequest's moderation_state points to correct ModerationState
        hr_reverse_mismatches = session.execute(
            select(
                HostRequest.conversation_id,
                HostRequest.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, HostRequest.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.HOST_REQUEST)
                | (ModerationState.object_id != HostRequest.conversation_id)
            )
        ).all()
        if hr_reverse_mismatches:
            errors.append(f"HostRequest points to ModerationState with wrong type/object_id: {hr_reverse_mismatches}")

        # Check reverse mapping: GroupChat's moderation_state points to correct ModerationState
        gc_reverse_mismatches = session.execute(
            select(
                GroupChat.conversation_id,
                GroupChat.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, GroupChat.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.GROUP_CHAT)
                | (ModerationState.object_id != GroupChat.conversation_id)
            )
        ).all()
        if gc_reverse_mismatches:
            errors.append(f"GroupChat points to ModerationState with wrong type/object_id: {gc_reverse_mismatches}")

        # Ensure auto-approve deadline isn't being exceeded by a significant margin
        # The auto-approver runs every 15s, so allow 5 minutes grace before alerting
        deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
        if deadline_seconds > 0:
            grace_period = timedelta(minutes=5)
            stale_initial_review_items = session.execute(
                select(
                    ModerationQueueItem.id,
                    ModerationQueueItem.moderation_state_id,
                    ModerationQueueItem.time_created,
                )
                .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW)
                .where(ModerationQueueItem.resolved_by_log_id.is_(None))
                .where(ModerationQueueItem.time_created < now() - timedelta(seconds=deadline_seconds) - grace_period)
            ).all()
            if stale_initial_review_items:
                errors.append(
                    f"INITIAL_REVIEW items exceeding auto-approve deadline by >5min: {stale_initial_review_items}"
                )

    if errors:
        raise DatabaseInconsistencyError("\n".join(errors))


check_database_consistency.PAYLOAD = empty_pb2.Empty
check_database_consistency.SCHEDULE = timedelta(hours=24)


def auto_approve_moderation_queue(payload: empty_pb2.Empty) -> None:
    """
    Dead man's switch: auto-approves unresolved INITIAL_REVIEW items older than the deadline.
    Items explicitly actioned by moderators are left alone.
    """
    deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
    if deadline_seconds <= 0:
        return

    with session_scope() as session:
        ctx = make_background_user_context(user_id=config["MODERATION_BOT_USER_ID"])

        items = (
            Moderation()
            .GetModerationQueue(
                request=moderation_pb2.GetModerationQueueReq(
                    triggers=[moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW],
                    unresolved_only=True,
                    page_size=100,
                    created_before=Timestamp_from_datetime(now() - timedelta(seconds=deadline_seconds)),
                ),
                context=ctx,
                session=session,
            )
            .queue_items
        )

        if not items:
            return

        logger.info(f"Auto-approving {len(items)} moderation queue items")
        for item in items:
            Moderation().ModerateContent(
                request=moderation_pb2.ModerateContentReq(
                    moderation_state_id=item.moderation_state_id,
                    action=moderation_pb2.MODERATION_ACTION_APPROVE,
                    visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
                    reason=f"Auto-approved: moderation deadline of {deadline_seconds} seconds exceeded.",
                ),
                context=ctx,
                session=session,
            )
        moderation_auto_approved_counter.inc(len(items))


auto_approve_moderation_queue.PAYLOAD = empty_pb2.Empty
auto_approve_moderation_queue.SCHEDULE = timedelta(seconds=15)