Coverage for src / couchers / jobs / handlers.py: 88%
461 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-08 13:48 +0000
1"""
2Background job servicers
3"""
5import logging
6from collections.abc import Sequence
7from datetime import date, timedelta
8from math import cos, pi, sin, sqrt
9from random import sample
10from typing import Any
12import requests
13from google.protobuf import empty_pb2
14from sqlalchemy import ColumnElement, Float, Function, Integer, select
15from sqlalchemy.orm import aliased
16from sqlalchemy.sql import (
17 and_,
18 case,
19 cast,
20 delete,
21 distinct,
22 exists,
23 extract,
24 func,
25 literal,
26 not_,
27 or_,
28 union_all,
29 update,
30)
32from couchers import urls
33from couchers.config import config
34from couchers.constants import (
35 ACTIVENESS_PROBE_EXPIRY_TIME,
36 ACTIVENESS_PROBE_INACTIVITY_PERIOD,
37 ACTIVENESS_PROBE_TIME_REMINDERS,
38 EVENT_REMINDER_TIMEDELTA,
39 HOST_REQUEST_MAX_REMINDERS,
40 HOST_REQUEST_REMINDER_INTERVAL,
41)
42from couchers.context import make_background_user_context
43from couchers.crypto import (
44 USER_LOCATION_RANDOMIZATION_NAME,
45 asym_encrypt,
46 b64decode,
47 get_secret,
48 simple_decrypt,
49 stable_secure_uniform,
50)
51from couchers.db import session_scope
52from couchers.email.dev import print_dev_email
53from couchers.email.smtp import send_smtp_email
54from couchers.helpers.badges import user_add_badge, user_remove_badge
55from couchers.materialized_views import (
56 UserResponseRate,
57)
58from couchers.metrics import (
59 moderation_auto_approved_counter,
60 push_notification_counter,
61 strong_verification_completions_counter,
62)
63from couchers.models import (
64 AccountDeletionToken,
65 ActivenessProbe,
66 ActivenessProbeStatus,
67 Cluster,
68 ClusterRole,
69 ClusterSubscription,
70 EventOccurrence,
71 EventOccurrenceAttendee,
72 GroupChat,
73 GroupChatSubscription,
74 HostingStatus,
75 HostRequest,
76 HostRequestStatus,
77 LoginToken,
78 MeetupStatus,
79 Message,
80 MessageType,
81 ModerationAction,
82 ModerationLog,
83 ModerationObjectType,
84 ModerationQueueItem,
85 ModerationState,
86 ModerationTrigger,
87 PassportSex,
88 PasswordResetToken,
89 PhotoGallery,
90 PostalVerificationAttempt,
91 PostalVerificationStatus,
92 PushNotificationDeliveryAttempt,
93 PushNotificationSubscription,
94 Reference,
95 StrongVerificationAttempt,
96 StrongVerificationAttemptStatus,
97 User,
98 UserBadge,
99 Volunteer,
100)
101from couchers.models.notifications import NotificationTopicAction
102from couchers.notifications.expo_api import get_expo_push_receipts
103from couchers.notifications.notify import notify
104from couchers.postal.postcard_service import send_postcard
105from couchers.proto import moderation_pb2, notification_data_pb2
106from couchers.proto.internal import jobs_pb2, verification_pb2
107from couchers.resources import get_badge_dict, get_static_badge_dict
108from couchers.servicers.api import user_model_to_pb
109from couchers.servicers.events import (
110 event_to_pb,
111)
112from couchers.servicers.moderation import Moderation
113from couchers.servicers.requests import host_request_to_pb
114from couchers.sql import (
115 users_visible_to_each_other,
116 where_moderated_content_visible,
117 where_moderated_content_visible_to_user_column,
118 where_user_columns_visible_to_each_other,
119 where_users_column_visible,
120)
121from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships
122from couchers.tasks import send_duplicate_strong_verification_email
123from couchers.utils import (
124 Timestamp_from_datetime,
125 create_coordinate,
126 get_coordinates,
127 not_none,
128 now,
129)
131logger = logging.getLogger(__name__)
def send_email(payload: jobs_pb2.SendEmailPayload) -> None:
    """
    Background job: send one email described by `payload`.

    Depending on the ENABLE_EMAIL config flag the email either goes out via SMTP
    or is printed to the logger (dev mode). The chosen sender returns a
    models.Email object which is persisted for record keeping.
    """
    # lazy %-style args: the message is only formatted if this level is enabled
    logger.info("Sending email with subject '%s' to '%s'", payload.subject, payload.recipient)
    # selects a "sender", which either prints the email to the logger or sends it out with SMTP
    sender = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email
    # the sender must return a models.Email object that can be added to the database
    email = sender(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
        list_unsubscribe_header=payload.list_unsubscribe_header,
        source_data=payload.source_data,
    )
    with session_scope() as session:
        session.add(email)
def purge_login_tokens(payload: empty_pb2.Empty) -> None:
    """Delete every login token that is no longer valid (used or expired)."""
    logger.info("Purging login tokens")
    with session_scope() as session:
        purge_stmt = delete(LoginToken).where(~LoginToken.is_valid)
        session.execute(purge_stmt.execution_options(synchronize_session=False))
def purge_password_reset_tokens(payload: empty_pb2.Empty) -> None:
    """Delete every password reset token that is no longer valid (used or expired)."""
    # fixed: previously logged "Purging login tokens", which belongs to the
    # login-token job and made these two jobs indistinguishable in the logs
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )
def purge_account_deletion_tokens(payload: empty_pb2.Empty) -> None:
    """Delete every account deletion token that is no longer valid (used or expired)."""
    logger.info("Purging account deletion tokens")
    with session_scope() as session:
        purge_stmt = (
            delete(AccountDeletionToken)
            .where(~AccountDeletionToken.is_valid)
            .execution_options(synchronize_session=False)
        )
        session.execute(purge_stmt)
def send_message_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for messages that have been unseen for a long enough time

    For each user with at least one text message older than 5 minutes that is newer
    than both their last-seen and last-notified cursors, gathers all unseen group
    chats, bumps `last_notified_message_id`, and sends one combined notification.
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                where_moderated_content_visible_to_user_column(
                    select(User)
                    .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                    .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                    .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id),
                    GroupChat,
                    User.id,
                )
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                # only messages sent while the user was in the chat
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                # newer than both the notified and seen cursors
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for user in users:
            context = make_background_user_context(user_id=user.id)
            # now actually grab all the group chats, not just less than 5 min old
            # one row per group chat: latest unseen message id + unseen count
            subquery = (
                where_users_column_visible(
                    where_moderated_content_visible(
                        select(
                            GroupChatSubscription.group_chat_id.label("group_chat_id"),
                            func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                            func.max(Message.id).label("message_id"),
                            func.count(Message.id).label("count_unseen"),
                        )
                        .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id),
                        context,
                        GroupChat,
                        is_list_operation=True,
                    )
                    .where(GroupChatSubscription.user_id == user.id)
                    .where(not_(GroupChatSubscription.is_muted))
                    .where(Message.id > user.last_notified_message_id)
                    .where(Message.id > GroupChatSubscription.last_seen_message_id)
                    .where(Message.time >= GroupChatSubscription.joined)
                    .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                    .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)),
                    context,
                    Message.author_id,
                )
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            # resolve the subquery rows into (GroupChat, latest Message, unseen count)
            unseen_messages = session.execute(
                where_moderated_content_visible(
                    select(GroupChat, Message, subquery.c.count_unseen)
                    .join(subquery, subquery.c.message_id == Message.id)
                    .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id),
                    context,
                    GroupChat,
                    is_list_operation=True,
                ).order_by(subquery.c.message_id.desc())
            ).all()

            if not unseen_messages:
                continue

            # advance the cursor so these messages are never notified again
            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            def format_title(message: Message, group_chat: GroupChat, count_unseen: int) -> str:
                # DMs are attributed to the author; group chats use the chat title
                if group_chat.is_dm:
                    return f"You missed {count_unseen} message(s) from {message.author.name}"
                else:
                    return f"You missed {count_unseen} message(s) in {group_chat.title}"

            notify(
                session,
                user_id=user.id,
                topic_action=NotificationTopicAction.chat__missed_messages,
                key="",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                context,
                            ),
                            message=format_title(message, group_chat, count_unseen),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                        )
                        for group_chat, message, count_unseen in unseen_messages
                    ],
                ),
            )
            # commit per user so a failure later doesn't re-notify earlier users
            session.commit()
def send_request_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host)

    Finds users with text messages in their host requests that are older than 5
    minutes and newer than both their last-seen and last-notified cursors, then
    sends one notification per host request and advances the cursor.
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    with session_scope() as session:
        # Get all candidate users who might have unseen request messages
        candidate_user_ids = (
            session.execute(
                select(User.id)
                .where(User.is_visible)
                .where(
                    or_(
                        # Users with unseen messages as surfer
                        exists(
                            select(1)
                            .select_from(HostRequest)
                            .join(Message, Message.conversation_id == HostRequest.conversation_id)
                            .where(HostRequest.surfer_user_id == User.id)
                            .where(Message.id > HostRequest.surfer_last_seen_message_id)
                            .where(Message.id > User.last_notified_request_message_id)
                            .where(Message.time < now() - timedelta(minutes=5))
                            .where(Message.message_type == MessageType.text)
                        ),
                        # Users with unseen messages as host
                        exists(
                            select(1)
                            .select_from(HostRequest)
                            .join(Message, Message.conversation_id == HostRequest.conversation_id)
                            .where(HostRequest.host_user_id == User.id)
                            .where(Message.id > HostRequest.host_last_seen_message_id)
                            .where(Message.id > User.last_notified_request_message_id)
                            .where(Message.time < now() - timedelta(minutes=5))
                            .where(Message.message_type == MessageType.text)
                        ),
                    )
                )
            )
            .scalars()
            .all()
        )

        for user_id in candidate_user_ids:
            context = make_background_user_context(user_id=user_id)

            # requests where this user is surfing
            # rows are (User, HostRequest, max unseen Message.id)
            surfing_reqs = session.execute(
                where_users_column_visible(
                    where_moderated_content_visible_to_user_column(
                        select(User, HostRequest, func.max(Message.id))
                        .where(User.id == user_id)
                        .join(HostRequest, HostRequest.surfer_user_id == User.id),
                        HostRequest,
                        HostRequest.surfer_user_id,
                    ),
                    context,
                    HostRequest.host_user_id,
                )
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.surfer_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)  # type: ignore[arg-type]
            ).all()

            # where this user is hosting
            hosting_reqs = session.execute(
                where_users_column_visible(
                    where_moderated_content_visible_to_user_column(
                        select(User, HostRequest, func.max(Message.id))
                        .where(User.id == user_id)
                        .join(HostRequest, HostRequest.host_user_id == User.id),
                        HostRequest,
                        HostRequest.host_user_id,
                    ),
                    context,
                    HostRequest.surfer_user_id,
                )
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.host_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)  # type: ignore[arg-type]
            ).all()

            for user, host_request, max_message_id in surfing_reqs:
                # advance the notified cursor before notifying so we never double-send
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                notify(
                    session,
                    user_id=user.id,
                    topic_action=NotificationTopicAction.host_request__missed_messages,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        # the "other party" shown to the surfer is the host
                        user=user_model_to_pb(host_request.host, session, context),
                        am_host=False,
                    ),
                )

            for user, host_request, max_message_id in hosting_reqs:
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                notify(
                    session,
                    user_id=user.id,
                    topic_action=NotificationTopicAction.host_request__missed_messages,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        # the "other party" shown to the host is the surfer
                        user=user_model_to_pb(host_request.surfer, session, context),
                        am_host=True,
                    ),
                )
def send_onboarding_emails(payload: empty_pb2.Empty) -> None:
    """
    Sends out onboarding emails
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:

        def _send_reminders(recipients, email_number):
            # send onboarding reminder `email_number` to each recipient and
            # record progress so the same email is never sent twice
            for recipient in recipients:
                notify(
                    session,
                    user_id=recipient.id,
                    topic_action=NotificationTopicAction.onboarding__reminder,
                    key=str(email_number),
                )
                recipient.onboarding_emails_sent = email_number
                recipient.last_onboarding_email_sent = now()
            session.commit()

        # first onboarding email: every visible user who hasn't received any yet
        first_batch = (
            session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all()
        )
        _send_reminders(first_batch, 1)

        # second onboarding email
        # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
        second_batch = (
            session.execute(
                select(User)
                .where(User.is_visible)
                .where(User.onboarding_emails_sent == 1)
                .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
                .where(User.has_completed_profile == False)
            )
            .scalars()
            .all()
        )
        _send_reminders(second_batch, 2)
def send_reference_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends out reminders to write references after hosting/staying

    Works through a fixed reminder schedule (latest first) and, for each step,
    finds surfers and hosts who met up, can still write a reference, haven't
    written one yet, and haven't received this reminder number, then notifies
    them and records the reminder number on the host request.
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            user = aliased(User)
            other_user = aliased(User)
            # surfers needing to write a ref
            q1 = (
                select(literal(True), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.surfer_user_id)
                .join(other_user, other_user.id == HostRequest.host_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the surfer has not written a ref
                        Reference.from_user_id == HostRequest.surfer_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.surfer_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.surfer_reason_didnt_meetup == None)
                .where(users_visible_to_each_other(user, other_user))
            )

            # hosts needing to write a ref
            q2 = (
                select(literal(False), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.host_user_id)
                .join(other_user, other_user.id == HostRequest.surfer_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the host has not written a ref
                        Reference.from_user_id == HostRequest.host_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.host_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.host_reason_didnt_meetup == None)
                .where(users_visible_to_each_other(user, other_user))
            )

            # combine both sides; column 0 ("surfed") records which query the row came from
            union = union_all(q1, q2).subquery()
            query = select(
                union.c[0].label("surfed"),
                aliased(HostRequest, union),
                aliased(user, union),
                aliased(other_user, union),
            )
            reference_reminders = session.execute(query).all()

            for surfed, host_request, user, other_user in reference_reminders:
                # visibility and blocking already checked in sql
                assert user.is_visible
                context = make_background_user_context(user_id=user.id)
                topic_action = (
                    NotificationTopicAction.reference__reminder_surfed
                    if surfed
                    else NotificationTopicAction.reference__reminder_hosted
                )
                notify(
                    session,
                    user_id=user.id,
                    topic_action=topic_action,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(other_user, session, context),
                        days_left=reminder_days_left,
                    ),
                )
                # record which reminder number was sent, on the right side of the request
                if surfed:
                    host_request.surfer_sent_reference_reminders = reminder_number
                else:
                    host_request.host_sent_reference_reminders = reminder_number
            session.commit()
def send_host_request_reminders(payload: empty_pb2.Empty) -> None:
    """
    Reminds hosts about pending host requests they haven't responded to.

    A reminder goes out only while the request is still pending, the stay hasn't
    started, the host has never sent a message in the conversation, fewer than
    HOST_REQUEST_MAX_REMINDERS were sent, and at least
    HOST_REQUEST_REMINDER_INTERVAL has passed since the last one.
    """
    with session_scope() as session:
        # correlated EXISTS: any message in this conversation authored by the host
        host_has_sent_message = select(1).where(
            Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.host_user_id
        )

        requests = (
            session.execute(
                where_user_columns_visible_to_each_other(
                    where_moderated_content_visible_to_user_column(
                        select(HostRequest),
                        HostRequest,
                        HostRequest.host_user_id,
                    )
                    .where(HostRequest.status == HostRequestStatus.pending)
                    .where(HostRequest.host_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS)
                    .where(HostRequest.start_time > func.now())
                    .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL)
                    .where(~exists(host_has_sent_message)),
                    HostRequest.host_user_id,
                    HostRequest.surfer_user_id,
                )
            )
            .scalars()
            .all()
        )

        for host_request in requests:
            # bump the reminder bookkeeping before notifying
            host_request.host_sent_request_reminders += 1
            host_request.last_sent_request_reminder_time = now()

            context = make_background_user_context(user_id=host_request.host_user_id)
            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action=NotificationTopicAction.host_request__reminder,
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestReminder(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

        session.commit()
def add_users_to_email_list(payload: empty_pb2.Empty) -> None:
    """
    Syncs users to the listmonk mailing list, one user per transaction.

    Loops until no visible user remains with `in_sync_with_newsletter == False`.
    Opted-out users are simply marked as synced; everyone else is subscribed via
    the listmonk subscribers API.
    """
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    while True:
        with session_scope() as session:
            user = session.execute(
                select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
            ).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            if user.opt_out_of_newsletter:
                user.in_sync_with_newsletter = True
                session.commit()
                continue

            r = requests.post(
                config["LISTMONK_BASE_URL"] + "/api/subscribers",
                auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
                json={
                    "email": user.email,
                    "name": user.name,
                    "lists": [config["LISTMONK_LIST_ID"]],
                    "preconfirm_subscriptions": True,
                    "attribs": {"couchers_user_id": user.id},
                    "status": "enabled",
                },
                timeout=10,
            )
            # 200: newly subscribed; 409: already subscribed — both mean in sync
            if r.status_code in (200, 409):
                user.in_sync_with_newsletter = True
                session.commit()
            else:
                # include the user and status so the failure is debuggable from logs
                raise Exception(f"Failed to add user {user.id} to mailing list: status {r.status_code}")
def enforce_community_membership(payload: empty_pb2.Empty) -> None:
    # thin job wrapper: delegates to the implementation in couchers.tasks
    tasks_enforce_community_memberships()
def update_recommendation_scores(payload: empty_pb2.Empty) -> None:
    """
    Recomputes `User.recommendation_score` for every user in one SQL UPDATE.

    The score sums heuristic points for hosting status, profile completeness,
    references given/received, recent activity, community-builder/badge status,
    and host response rate, plus a small random jitter so equal-scored users
    don't always sort identically.
    """
    # text fields whose combined length measures profile completeness
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    # subset of fields describing the user's home specifically
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian() -> ColumnElement[float] | float:
        """
        Produces an approximatley std normal random variate
        """
        # sum of uniforms, shifted/scaled to mean 0 and stddev ~1 (Irwin-Hall)
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt: Any) -> Function[int]:
        # cast to integer, mapping NULL to 0
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt: Any) -> Function[float]:
        # cast to float, mapping NULL to 0.0
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")  # type: ignore[assignment]
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")  # type: ignore[assignment]
        home_length = func.length(home_text)

        filled_profile = int_(User.has_completed_profile)
        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        may_host = int_(User.hosting_status == HostingStatus.maybe)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.has_completed_my_home)
        filled_home_lots = int_(home_length > 200)
        hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host
        profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots

        # references
        # one row per user who has written at least one reference
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        # aggregates over references *received* by each user
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # average rating weighted double for the first 5 references, single after
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        # community builders: admins of official clusters; the root node has id 1
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        cb = int_(min_node_id >= 1)
        wcb = int_(min_node_id == 1)
        # points contributed by each badge id
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }

        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate
        # percentile response times from the materialized view, converted to minutes
        hr_subquery = select(
            UserResponseRate.user_id,
            float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"),
            float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"),
        ).subquery()
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
        response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0)

        recommendation_score = (
            hosting_status_points
            + profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        # outer joins so users missing from any subquery still get a score
        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        session.execute(update(User).values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id))

    logger.info("Updated recommendation scores")
def update_badges(payload: empty_pb2.Empty) -> None:
    """
    Syncs each badge's holder list in the database with its authoritative source
    (static config lists or computed queries), adding and removing as needed.
    """
    with session_scope() as session:

        def update_badge(badge_id: str, members: Sequence[int]) -> None:
            # diff the current holders of `badge_id` against `members`
            badge = get_badge_dict()[badge_id]
            user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge.id)).scalars().all()
            # in case the user ids don't exist in the db
            actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all()
            # we should add the badge to these
            add = set(actual_members) - set(user_ids)
            # we should remove the badge from these
            remove = set(user_ids) - set(actual_members)
            for user_id in add:
                user_add_badge(session, user_id, badge.id)

            for user_id in remove:
                user_remove_badge(session, user_id, badge.id)

        # badges with statically-configured member lists
        update_badge("founder", get_static_badge_dict()["founder"])
        update_badge("board_member", get_static_badge_dict()["board_member"])
        update_badge("past_board_member", get_static_badge_dict()["past_board_member"])
        # badges derived from user state
        update_badge("donor", session.execute(select(User.id).where(User.last_donated.is_not(None))).scalars().all())
        update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all())
        update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all())
        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        update_badge(
            "strong_verification",
            session.execute(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            )
            .scalars()
            .all(),
        )
        # volunteer badge for active volunteers (stopped_volunteering is null)
        update_badge(
            "volunteer",
            session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_(None))).scalars().all(),
        )
        # past_volunteer badge for past volunteers (stopped_volunteering is not null)
        update_badge(
            "past_volunteer",
            session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_not(None)))
            .scalars()
            .all(),
        )
def finalize_strong_verification(payload: jobs_pb2.FinalizeStrongVerificationPayload) -> None:
    """
    Completes a strong verification attempt by fetching the session result from the
    Iris ID (passportreader.app) API and updating the attempt accordingly.

    Outcomes:
    - document is not a passport -> attempt marked failed, user notified
    - passport fingerprint already seen on an earlier attempt -> marked duplicate, user notified
    - otherwise -> full response stored encrypted, attempt marked succeeded, and the
      "strong_verification" badge granted if birthdate/sex correspond to the profile
    """
    with session_scope() as session:
        # only finalize attempts still waiting on us; scalar_one raises if none/multiple match
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
            verify="/etc/ssl/certs/ca-certificates.crt",
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()
        # the reference is a payload we encrypted ourselves at session start; decrypting it
        # ties the Iris session back to our own attempt
        reference_payload = verification_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )
        # sanity checks linking the Iris session to this attempt
        # NOTE(review): asserts are stripped under `python -O`; confirm the job runner never uses -O
        assert verification_attempt.user_id == reference_payload.user_id
        assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token
        assert verification_attempt.iris_session_id == json_data["id"]
        assert json_data["state"] == "APPROVED"

        if json_data["document_type"] != "PASSPORT":
            # only passports are accepted for strong verification
            verification_attempt.status = StrongVerificationAttemptStatus.failed
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
                ),
            )
            return

        assert json_data["document_type"] == "PASSPORT"

        expiry_date = date.fromisoformat(json_data["expiry_date"])
        nationality = json_data["nationality"]
        last_three_document_chars = json_data["document_number"][-3:]

        # passport reuse detection: (expiry, nationality, last 3 document chars) acts as the
        # document fingerprint; the earliest attempt with this fingerprint wins
        existing_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
            .where(StrongVerificationAttempt.passport_nationality == nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = expiry_date
        verification_attempt.passport_nationality = nationality
        verification_attempt.passport_last_three_document_chars = last_three_document_chars

        if existing_attempt:
            verification_attempt.status = StrongVerificationAttemptStatus.duplicate

            if existing_attempt.user_id != verification_attempt.user_id:
                # a different user verified with this passport before: flag it via email
                session.flush()
                send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)

            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
            )
            return

        # store the full API response encrypted with the asymmetric public key; only the
        # (offline) private key holder can decrypt it later
        verification_attempt.has_full_data = True
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = verification_attempt.user
        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            if session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none():
                # badge already present: nothing to add and no duplicate notification
                return

            user_add_badge(session, user.id, badge_id, do_notify=False)
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_success,
                key="",
            )
        else:
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
                ),
            )
def send_activeness_probes(payload: empty_pb2.Empty) -> None:
    """
    Runs the activeness-probe lifecycle for hosts who haven't been active recently.

    Step 1: creates new probes for visible `can_host` users inactive longer than
      ACTIVENESS_PROBE_INACTIVITY_PERIOD (rate-limited to ~2% of users/day).
    Step 2: sends the scheduled reminder notifications for pending probes.
    Step 3: expires unanswered probes past ACTIVENESS_PROBE_EXPIRY_TIME and
      downgrades the user's hosting/meetup statuses.

    Fix: removed an unused local (`context = make_background_user_context(...)`)
    in Step 2 — the context was created but never passed to anything.
    """
    with session_scope() as session:
        ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)

        if config["ACTIVENESS_PROBES_ENABLED"]:
            # users with a currently-unanswered probe (don't double-probe them)
            subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery()

            # users who we should send an activeness probe to
            new_probe_user_ids = (
                session.execute(
                    select(User.id)
                    .where(User.is_visible)
                    .where(User.hosting_status == HostingStatus.can_host)
                    .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
                    .where(User.id.not_in(select(subquery.c.user_id)))
                )
                .scalars()
                .all()
            )

            total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()
            probes_today = session.execute(
                select(func.count())
                .select_from(ActivenessProbe)
                .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24))
            ).scalar_one()

            # send probes to max 2% of users per day
            max_probes_per_day = 0.02 * total_users
            # smooth the remaining daily budget over hourly-ish runs, but always allow at least 1
            max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1))

            if len(new_probe_user_ids) > max_probe_size:
                # randomly pick who gets probed this round
                new_probe_user_ids = sample(new_probe_user_ids, max_probe_size)

            for user_id in new_probe_user_ids:
                session.add(ActivenessProbe(user_id=user_id))

            session.commit()

        ## Step 2: actually send out probe notifications
        for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
            probes = (
                session.execute(
                    select(ActivenessProbe)
                    .where(ActivenessProbe.notifications_sent == probe_number_minus_1)
                    .where(ActivenessProbe.probe_initiated + delay < func.now())
                    .where(ActivenessProbe.is_pending)
                )
                .scalars()
                .all()
            )

            for probe in probes:
                probe.notifications_sent = probe_number_minus_1 + 1
                notify(
                    session,
                    user_id=probe.user.id,
                    topic_action=NotificationTopicAction.activeness__probe,
                    key=str(probe.id),
                    data=notification_data_pb2.ActivenessProbe(
                        reminder_number=probe_number_minus_1 + 1,
                        deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
                    ),
                )
                # commit per probe so partial progress survives a crash mid-batch
                session.commit()

        ## Step 3: for those who haven't responded, mark them as failed
        expired_probes = (
            session.execute(
                select(ActivenessProbe)
                .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
                .where(ActivenessProbe.is_pending)
                .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
            )
            .scalars()
            .all()
        )

        for probe in expired_probes:
            probe.responded = now()
            probe.response = ActivenessProbeStatus.expired
            # downgrade (but don't fully disable) the user's availability
            if probe.user.hosting_status == HostingStatus.can_host:
                probe.user.hosting_status = HostingStatus.maybe
            if probe.user.meetup_status == MeetupStatus.wants_to_meetup:
                probe.user.meetup_status = MeetupStatus.open_to_meetup
            session.commit()
def update_randomized_locations(payload: empty_pb2.Empty) -> None:
    """
    We generate for each user a randomized location as follows:
    - Start from a strong random seed (based on the SECRET env var and our key derivation function)
    - For each user, mix in the user_id for randomness
    - Generate a radius from [0.02, 0.1] degrees (about 2-10km)
    - Generate an angle from [0, 360]
    - Randomized location is then a distance `radius` away at an angle `angle` from `geom`
    """
    secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)

    def _randomized(uid: int, lat: float, lng: float) -> tuple[float, float]:
        # two deterministic per-user uniforms, keyed on the secret and user id
        u_radius = stable_secure_uniform(secret, seed=bytes(f"{uid}|radius", "ascii"))
        u_angle = stable_secure_uniform(secret, seed=bytes(f"{uid}|angle", "ascii"))
        r = 0.02 + 0.08 * u_radius
        theta = 2 * pi * u_angle
        # offset the point by r degrees at angle theta
        return lat + r * sin(theta), lng + r * cos(theta)

    pending: list[dict[str, Any]] = []

    # first pass: read users lacking a randomized location and compute offsets
    with session_scope() as session:
        for uid, geom in session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all():
            lat, lng = get_coordinates(geom)
            new_lat, new_lng = _randomized(uid, lat, lng)
            pending.append({"id": uid, "randomized_geom": create_coordinate(new_lat, new_lng)})

    # second pass: bulk-write all updates in a fresh session
    with session_scope() as session:
        session.execute(update(User), pending)
def send_event_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends reminders for events that are 24 hours away to users who marked themselves as attending.
    """
    logger.info("Sending event reminder emails")

    with session_scope() as session:
        # occurrences starting between now and now + EVENT_REMINDER_TIMEDELTA
        upcoming = (
            session.execute(
                select(EventOccurrence)
                .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA)
                .where(EventOccurrence.start_time >= now())
            )
            .scalars()
            .all()
        )

        for occurrence in upcoming:
            # attendees of this occurrence who haven't been reminded yet
            attendee_rows = session.execute(
                select(User, EventOccurrenceAttendee)
                .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id)
                .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
                .where(EventOccurrenceAttendee.reminder_sent == False)
            ).all()

            for user, attendee in attendee_rows:
                context = make_background_user_context(user_id=user.id)
                notify(
                    session,
                    user_id=user.id,
                    topic_action=NotificationTopicAction.event__reminder,
                    key=str(occurrence.id),
                    data=notification_data_pb2.EventReminder(
                        event=event_to_pb(session, occurrence, context),
                        user=user_model_to_pb(user, session, context),
                    ),
                )
                attendee.reminder_sent = True
                # commit per attendee so a crash doesn't re-send earlier reminders
                session.commit()
def check_expo_push_receipts(payload: empty_pb2.Empty) -> None:
    """
    Check Expo push receipts in batch and update delivery attempts.

    Processes up to 100 delivery attempts per batch; returns when no attempts
    remain, and raises RuntimeError if the backlog exceeds the safety limit.
    """
    MAX_ITERATIONS = 100  # Safety limit: 100 batches * 100 attempts = 10,000 max

    for iteration in range(MAX_ITERATIONS):
        with session_scope() as session:
            # Find all delivery attempts that need receipt checking
            # Wait 15 minutes per Expo's recommendation before checking receipts
            attempts = (
                session.execute(
                    select(PushNotificationDeliveryAttempt)
                    .where(PushNotificationDeliveryAttempt.expo_ticket_id != None)
                    .where(PushNotificationDeliveryAttempt.receipt_checked_at == None)
                    .where(PushNotificationDeliveryAttempt.time < now() - timedelta(minutes=15))
                    .where(PushNotificationDeliveryAttempt.time > now() - timedelta(hours=24))
                    .limit(100)
                )
                .scalars()
                .all()
            )

            if not attempts:
                # backlog drained: done
                logger.debug("No Expo receipts to check")
                return

            logger.info(f"Checking {len(attempts)} Expo push receipts")

            # one batched API call for the whole page of ticket ids
            receipts = get_expo_push_receipts([not_none(attempt.expo_ticket_id) for attempt in attempts])

            for attempt in attempts:
                receipt = receipts.get(not_none(attempt.expo_ticket_id))

                # Always mark as checked to avoid infinite loops
                attempt.receipt_checked_at = now()

                if receipt is None:
                    # Receipt not found after 15min - likely expired (>24h) or never existed
                    # Per Expo docs: receipts should be available within 15 minutes
                    attempt.receipt_status = "not_found"
                    continue

                attempt.receipt_status = receipt.get("status")

                if receipt.get("status") == "error":
                    details = receipt.get("details", {})
                    error_code = details.get("error")
                    attempt.receipt_error_code = error_code

                    if error_code == "DeviceNotRegistered":
                        # Device token is no longer valid - disable the subscription
                        sub = session.execute(
                            select(PushNotificationSubscription).where(
                                PushNotificationSubscription.id == attempt.push_notification_subscription_id
                            )
                        ).scalar_one()

                        # a disabled_at in the future means "currently enabled": pull it back to now
                        if sub.disabled_at > now():
                            sub.disabled_at = now()
                            logger.info(f"Disabled push sub {sub.id} due to DeviceNotRegistered in receipt")
                            push_notification_counter.labels(
                                platform="expo", outcome="permanent_subscription_failure_receipt"
                            ).inc()
                    else:
                        logger.warning(f"Expo receipt error for ticket {attempt.expo_ticket_id}: {error_code}")

    # If we get here, we've exhausted MAX_ITERATIONS without finishing
    raise RuntimeError(
        f"check_expo_push_receipts exceeded {MAX_ITERATIONS} iterations - "
        "there may be an unusually large backlog of receipts to check"
    )
def send_postal_verification_postcard(payload: jobs_pb2.SendPostalVerificationPostcardPayload) -> None:
    """
    Sends the postcard via external API and updates attempt status.
    """
    with session_scope() as session:
        attempt = session.execute(
            select(PostalVerificationAttempt).where(
                PostalVerificationAttempt.id == payload.postal_verification_attempt_id
            )
        ).scalar_one_or_none()

        # guard: only act on attempts that exist and are still in progress
        if attempt is None or attempt.status != PostalVerificationStatus.in_progress:
            logger.warning(
                f"Postal verification attempt {payload.postal_verification_attempt_id} not found or wrong state"
            )
            return

        recipient = session.execute(select(User.name).where(User.id == attempt.user_id)).scalar_one()

        code = not_none(attempt.verification_code)
        send_result = send_postcard(
            recipient_name=recipient,
            address_line_1=attempt.address_line_1,
            address_line_2=attempt.address_line_2,
            city=attempt.city,
            state=attempt.state,
            postal_code=attempt.postal_code,
            country=attempt.country,
            verification_code=code,
            qr_code_url=urls.postal_verification_link(code=code),
        )

        if not send_result.success:
            # Could retry or fail - for now, fail
            attempt.status = PostalVerificationStatus.failed
            logger.error(f"Postcard send failed: {send_result.error_message}")
            return

        # postcard is on its way: await the code and tell the user
        attempt.status = PostalVerificationStatus.awaiting_verification
        attempt.postcard_sent_at = func.now()

        notify(
            session,
            user_id=attempt.user_id,
            topic_action=NotificationTopicAction.postal_verification__postcard_sent,
            key="",
            data=notification_data_pb2.PostalVerificationPostcardSent(
                city=attempt.city,
                country=attempt.country,
            ),
        )
class DatabaseInconsistencyError(Exception):
    """Raised when one or more database consistency checks fail."""
def check_database_consistency(payload: empty_pb2.Empty) -> None:
    """
    Checks database consistency and raises an exception if any issues are found.

    Each check appends a human-readable description to `errors`; all violations
    are collected and reported together in one DatabaseInconsistencyError.
    All checks are read-only.
    """
    logger.info("Checking database consistency")
    errors: list[str] = []

    with session_scope() as session:
        # Check that all non-deleted users have a profile gallery
        users_without_gallery = session.execute(
            select(User.id, User.username).where(User.is_deleted == False).where(User.profile_gallery_id.is_(None))
        ).all()
        if users_without_gallery:
            errors.append(f"Users without profile gallery: {users_without_gallery}")

        # Check that all profile galleries point to their owner
        mismatched_galleries = session.execute(
            select(User.id, User.username, User.profile_gallery_id, PhotoGallery.owner_user_id)
            .join(PhotoGallery, User.profile_gallery_id == PhotoGallery.id)
            .where(User.profile_gallery_id.is_not(None))
            .where(PhotoGallery.owner_user_id != User.id)
        ).all()
        if mismatched_galleries:
            errors.append(f"Profile galleries with mismatched owner: {mismatched_galleries}")

        # === Moderation System Consistency Checks ===

        # Check all ModerationStates have a known object_type
        known_object_types = [ModerationObjectType.HOST_REQUEST, ModerationObjectType.GROUP_CHAT]
        unknown_type_states = session.execute(
            select(ModerationState.id, ModerationState.object_type).where(
                ModerationState.object_type.not_in(known_object_types)
            )
        ).all()
        if unknown_type_states:
            errors.append(f"ModerationStates with unknown object_type: {unknown_type_states}")

        # Check every ModerationState has at least one INITIAL_REVIEW queue item
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_initial_review = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationQueueItem.moderation_state_id == ModerationState.id)
                    .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW)
                ),
            )
        ).all()
        if states_without_initial_review:
            errors.append(f"ModerationStates without INITIAL_REVIEW queue item: {states_without_initial_review}")

        # Check every ModerationState has a CREATE log entry
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_create_log = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationLog.moderation_state_id == ModerationState.id)
                    .where(ModerationLog.action == ModerationAction.CREATE)
                ),
            )
        ).all()
        if states_without_create_log:
            errors.append(f"ModerationStates without CREATE log entry: {states_without_create_log}")

        # Check resolved queue items point to log entries for the same moderation state
        resolved_item_log_mismatches = session.execute(
            select(ModerationQueueItem.id, ModerationQueueItem.moderation_state_id, ModerationLog.moderation_state_id)
            .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id)
            .where(ModerationQueueItem.resolved_by_log_id.is_not(None))
            .where(ModerationQueueItem.moderation_state_id != ModerationLog.moderation_state_id)
        ).all()
        if resolved_item_log_mismatches:
            errors.append(f"Resolved queue items with mismatched moderation_state_id: {resolved_item_log_mismatches}")

        # Check every HOST_REQUEST ModerationState has exactly one HostRequest pointing to it
        # NOTE(review): this loop issues one COUNT query per state (N+1) — acceptable while
        # state counts are small, consider a GROUP BY/HAVING rewrite if this grows
        hr_states = (
            session.execute(
                select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST)
            )
            .scalars()
            .all()
        )
        for state_id in hr_states:
            hr_count = session.execute(
                select(func.count()).where(HostRequest.moderation_state_id == state_id)
            ).scalar_one()
            if hr_count != 1:
                errors.append(f"ModerationState {state_id} (HOST_REQUEST) has {hr_count} HostRequests (expected 1)")

        # Check every GROUP_CHAT ModerationState has exactly one GroupChat pointing to it
        gc_states = (
            session.execute(
                select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT)
            )
            .scalars()
            .all()
        )
        for state_id in gc_states:
            gc_count = session.execute(
                select(func.count()).where(GroupChat.moderation_state_id == state_id)
            ).scalar_one()
            if gc_count != 1:
                errors.append(f"ModerationState {state_id} (GROUP_CHAT) has {gc_count} GroupChats (expected 1)")

        # Check ModerationState.object_id matches the actual object's ID
        hr_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, HostRequest.conversation_id)
            .join(HostRequest, HostRequest.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST)
            .where(ModerationState.object_id != HostRequest.conversation_id)
        ).all()
        if hr_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for HOST_REQUEST: {hr_object_id_mismatches}")

        gc_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, GroupChat.conversation_id)
            .join(GroupChat, GroupChat.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT)
            .where(ModerationState.object_id != GroupChat.conversation_id)
        ).all()
        if gc_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for GROUP_CHAT: {gc_object_id_mismatches}")

        # Check reverse mapping: HostRequest's moderation_state points to correct ModerationState
        hr_reverse_mismatches = session.execute(
            select(
                HostRequest.conversation_id,
                HostRequest.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, HostRequest.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.HOST_REQUEST)
                | (ModerationState.object_id != HostRequest.conversation_id)
            )
        ).all()
        if hr_reverse_mismatches:
            errors.append(f"HostRequest points to ModerationState with wrong type/object_id: {hr_reverse_mismatches}")

        # Check reverse mapping: GroupChat's moderation_state points to correct ModerationState
        gc_reverse_mismatches = session.execute(
            select(
                GroupChat.conversation_id,
                GroupChat.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, GroupChat.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.GROUP_CHAT)
                | (ModerationState.object_id != GroupChat.conversation_id)
            )
        ).all()
        if gc_reverse_mismatches:
            errors.append(f"GroupChat points to ModerationState with wrong type/object_id: {gc_reverse_mismatches}")

        # Ensure auto-approve deadline isn't being exceeded by a significant margin
        # The auto-approver runs every 15s, so allow 5 minutes grace before alerting
        deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
        if deadline_seconds > 0:
            grace_period = timedelta(minutes=5)
            stale_initial_review_items = session.execute(
                select(
                    ModerationQueueItem.id,
                    ModerationQueueItem.moderation_state_id,
                    ModerationQueueItem.time_created,
                )
                .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW)
                .where(ModerationQueueItem.resolved_by_log_id.is_(None))
                .where(ModerationQueueItem.time_created < now() - timedelta(seconds=deadline_seconds) - grace_period)
            ).all()
            if stale_initial_review_items:
                errors.append(
                    f"INITIAL_REVIEW items exceeding auto-approve deadline by >5min: {stale_initial_review_items}"
                )

    if errors:
        raise DatabaseInconsistencyError("\n".join(errors))
def auto_approve_moderation_queue(payload: empty_pb2.Empty) -> None:
    """
    Dead man's switch: auto-approves unresolved INITIAL_REVIEW items older than the deadline.
    Items explicitly actioned by moderators are left alone.
    """
    deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
    if deadline_seconds <= 0:
        # auto-approval is disabled
        return

    with session_scope() as session:
        context = make_background_user_context(user_id=config["MODERATION_BOT_USER_ID"])

        # fetch a page of unresolved initial-review items older than the deadline
        cutoff = Timestamp_from_datetime(now() - timedelta(seconds=deadline_seconds))
        queue_response = Moderation().GetModerationQueue(
            request=moderation_pb2.GetModerationQueueReq(
                triggers=[moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW],
                unresolved_only=True,
                page_size=100,
                created_before=cutoff,
            ),
            context=context,
            session=session,
        )
        items = queue_response.queue_items

        if not items:
            return

        logger.info(f"Auto-approving {len(items)} moderation queue items")
        for item in items:
            # approve each item as the moderation bot, making it publicly visible
            Moderation().ModerateContent(
                request=moderation_pb2.ModerateContentReq(
                    moderation_state_id=item.moderation_state_id,
                    action=moderation_pb2.MODERATION_ACTION_APPROVE,
                    visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
                    reason=f"Auto-approved: moderation deadline of {deadline_seconds} seconds exceeded.",
                ),
                context=context,
                session=session,
            )
        moderation_auto_approved_counter.inc(len(items))