Coverage for app / backend / src / couchers / jobs / handlers.py: 88%
462 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
"""
Background job servicers.

Each handler in this module services one kind of background job, taking the
job's protobuf payload (or an empty message for scheduled jobs) as its single
argument.
"""
5import logging
6from collections.abc import Sequence
7from datetime import date, timedelta
8from math import cos, pi, sin, sqrt
9from random import sample
10from typing import Any
12import requests
13from google.protobuf import empty_pb2
14from sqlalchemy import ColumnElement, Float, Function, Integer, select
15from sqlalchemy.orm import aliased
16from sqlalchemy.sql import (
17 and_,
18 case,
19 cast,
20 delete,
21 distinct,
22 exists,
23 extract,
24 func,
25 literal,
26 not_,
27 or_,
28 union_all,
29 update,
30)
32from couchers import urls
33from couchers.config import config
34from couchers.constants import (
35 ACTIVENESS_PROBE_EXPIRY_TIME,
36 ACTIVENESS_PROBE_INACTIVITY_PERIOD,
37 ACTIVENESS_PROBE_TIME_REMINDERS,
38 EVENT_REMINDER_TIMEDELTA,
39 HOST_REQUEST_MAX_REMINDERS,
40 HOST_REQUEST_REMINDER_INTERVAL,
41)
42from couchers.context import make_background_user_context
43from couchers.crypto import (
44 USER_LOCATION_RANDOMIZATION_NAME,
45 asym_encrypt,
46 b64decode,
47 get_secret,
48 simple_decrypt,
49 stable_secure_uniform,
50)
51from couchers.db import session_scope
52from couchers.email.dev import print_dev_email
53from couchers.email.smtp import send_smtp_email
54from couchers.helpers.badges import user_add_badge, user_remove_badge
55from couchers.helpers.completed_profile import has_completed_profile_expression
56from couchers.materialized_views import (
57 UserResponseRate,
58)
59from couchers.metrics import (
60 moderation_auto_approved_counter,
61 push_notification_counter,
62 strong_verification_completions_counter,
63)
64from couchers.models import (
65 AccountDeletionToken,
66 ActivenessProbe,
67 ActivenessProbeStatus,
68 Cluster,
69 ClusterRole,
70 ClusterSubscription,
71 EventOccurrence,
72 EventOccurrenceAttendee,
73 GroupChat,
74 GroupChatSubscription,
75 HostingStatus,
76 HostRequest,
77 HostRequestStatus,
78 LoginToken,
79 MeetupStatus,
80 Message,
81 MessageType,
82 ModerationAction,
83 ModerationLog,
84 ModerationObjectType,
85 ModerationQueueItem,
86 ModerationState,
87 ModerationTrigger,
88 PassportSex,
89 PasswordResetToken,
90 PhotoGallery,
91 PostalVerificationAttempt,
92 PostalVerificationStatus,
93 PushNotificationDeliveryAttempt,
94 PushNotificationSubscription,
95 Reference,
96 StrongVerificationAttempt,
97 StrongVerificationAttemptStatus,
98 User,
99 UserBadge,
100 Volunteer,
101)
102from couchers.models.notifications import NotificationTopicAction
103from couchers.notifications.expo_api import get_expo_push_receipts
104from couchers.notifications.notify import notify
105from couchers.postal.postcard_service import send_postcard
106from couchers.proto import moderation_pb2, notification_data_pb2
107from couchers.proto.internal import internal_pb2, jobs_pb2
108from couchers.resources import get_badge_dict, get_static_badge_dict
109from couchers.servicers.api import user_model_to_pb
110from couchers.servicers.events import (
111 event_to_pb,
112)
113from couchers.servicers.moderation import Moderation
114from couchers.servicers.requests import host_request_to_pb
115from couchers.sql import (
116 users_visible_to_each_other,
117 where_moderated_content_visible,
118 where_moderated_content_visible_to_user_column,
119 where_user_columns_visible_to_each_other,
120 where_users_column_visible,
121)
122from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships
123from couchers.tasks import send_duplicate_strong_verification_email
124from couchers.utils import (
125 Timestamp_from_datetime,
126 create_coordinate,
127 get_coordinates,
128 not_none,
129 now,
130)
# module-level logger, named after this module
logger = logging.getLogger(__name__)
def send_email(payload: jobs_pb2.SendEmailPayload) -> None:
    """
    Sends out a single email described by the payload and records it in the database.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
    # pick the delivery backend: real SMTP when email is enabled, otherwise a
    # dev backend that just prints the email to the logger
    if config["ENABLE_EMAIL"]:
        deliver = send_smtp_email
    else:
        deliver = print_dev_email
    # either backend returns a models.Email object, which we persist below
    email_record = deliver(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
        list_unsubscribe_header=payload.list_unsubscribe_header,
        source_data=payload.source_data,
    )
    with session_scope() as session:
        session.add(email_record)
def purge_login_tokens(payload: empty_pb2.Empty) -> None:
    """
    Deletes all login tokens that are no longer valid.
    """
    logger.info("Purging login tokens")
    with session_scope() as session:
        stmt = delete(LoginToken).where(~LoginToken.is_valid)
        session.execute(stmt.execution_options(synchronize_session=False))
def purge_password_reset_tokens(payload: empty_pb2.Empty) -> None:
    """
    Deletes all password reset tokens that are no longer valid.
    """
    # fix: this previously logged "Purging login tokens" (copy-paste from
    # purge_login_tokens); it now correctly describes this job
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )
def purge_account_deletion_tokens(payload: empty_pb2.Empty) -> None:
    """
    Deletes all account deletion tokens that are no longer valid.
    """
    logger.info("Purging account deletion tokens")
    with session_scope() as session:
        stmt = delete(AccountDeletionToken).where(~AccountDeletionToken.is_valid)
        session.execute(stmt.execution_options(synchronize_session=False))
def send_message_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for messages that have been unseen for a long enough time

    Two phases: first find every user with at least one unseen, unnotified text
    message older than 5 minutes in a group chat; then, for each such user,
    gather *all* their unseen chat messages (not just the >5 min old ones) and
    send one combined "missed messages" notification.
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                where_moderated_content_visible_to_user_column(
                    select(User)
                    .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                    .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                    .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id),
                    GroupChat,
                    User.id,
                )
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                # only messages sent while the user was in the chat
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                # unnotified and unseen
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for user in users:
            context = make_background_user_context(user_id=user.id)
            # now actually grab all the group chats, not just less than 5 min old
            # per group chat: latest subscription/message ids plus count of unseen messages
            subquery = (
                where_users_column_visible(
                    where_moderated_content_visible(
                        select(
                            GroupChatSubscription.group_chat_id.label("group_chat_id"),
                            func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                            func.max(Message.id).label("message_id"),
                            func.count(Message.id).label("count_unseen"),
                        )
                        .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id),
                        context,
                        GroupChat,
                        is_list_operation=True,
                    )
                    .where(GroupChatSubscription.user_id == user.id)
                    .where(not_(GroupChatSubscription.is_muted))
                    .where(Message.id > user.last_notified_message_id)
                    .where(Message.id > GroupChatSubscription.last_seen_message_id)
                    .where(Message.time >= GroupChatSubscription.joined)
                    .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                    .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)),
                    context,
                    Message.author_id,
                )
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            # one row per group chat: the chat, its latest unseen message, and the unseen count
            unseen_messages = session.execute(
                where_moderated_content_visible(
                    select(GroupChat, Message, subquery.c.count_unseen)
                    .join(subquery, subquery.c.message_id == Message.id)
                    .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id),
                    context,
                    GroupChat,
                    is_list_operation=True,
                ).order_by(subquery.c.message_id.desc())
            ).all()

            if not unseen_messages:
                continue

            # advance the high-water mark so these messages are not re-notified
            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            def format_title(message: Message, group_chat: GroupChat, count_unseen: int) -> str:
                # DMs are titled by the other author's name, group chats by chat title
                if group_chat.is_dm:
                    return f"You missed {count_unseen} message(s) from {message.author.name}"
                else:
                    return f"You missed {count_unseen} message(s) in {group_chat.title}"

            notify(
                session,
                user_id=user.id,
                topic_action=NotificationTopicAction.chat__missed_messages,
                key="",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                context,
                            ),
                            message=format_title(message, group_chat, count_unseen),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                        )
                        for group_chat, message, count_unseen in unseen_messages
                    ],
                ),
            )
            # commit per user so progress (last_notified_message_id) is durable
            session.commit()
def send_request_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host)

    First collects candidate users who have any unseen, unnotified text message
    older than 5 minutes in a host request (in either role), then per user
    fetches the affected requests and notifies once per request.
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    with session_scope() as session:
        # Get all candidate users who might have unseen request messages
        candidate_user_ids = (
            session.execute(
                select(User.id)
                .where(User.is_visible)
                .where(
                    or_(
                        # Users with unseen messages as surfer
                        exists(
                            select(1)
                            .select_from(HostRequest)
                            .join(Message, Message.conversation_id == HostRequest.conversation_id)
                            .where(HostRequest.surfer_user_id == User.id)
                            .where(Message.id > HostRequest.surfer_last_seen_message_id)
                            .where(Message.id > User.last_notified_request_message_id)
                            .where(Message.time < now() - timedelta(minutes=5))
                            .where(Message.message_type == MessageType.text)
                        ),
                        # Users with unseen messages as host
                        exists(
                            select(1)
                            .select_from(HostRequest)
                            .join(Message, Message.conversation_id == HostRequest.conversation_id)
                            .where(HostRequest.host_user_id == User.id)
                            .where(Message.id > HostRequest.host_last_seen_message_id)
                            .where(Message.id > User.last_notified_request_message_id)
                            .where(Message.time < now() - timedelta(minutes=5))
                            .where(Message.message_type == MessageType.text)
                        ),
                    )
                )
            )
            .scalars()
            .all()
        )

        for user_id in candidate_user_ids:
            context = make_background_user_context(user_id=user_id)

            # requests where this user is surfing
            # rows of (User, HostRequest, max unseen Message.id)
            surfing_reqs = session.execute(
                where_users_column_visible(
                    where_moderated_content_visible_to_user_column(
                        select(User, HostRequest, func.max(Message.id))
                        .where(User.id == user_id)
                        .join(HostRequest, HostRequest.surfer_user_id == User.id),
                        HostRequest,
                        HostRequest.surfer_user_id,
                    ),
                    context,
                    HostRequest.host_user_id,
                )
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.surfer_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)  # type: ignore[arg-type]
            ).all()

            # where this user is hosting
            hosting_reqs = session.execute(
                where_users_column_visible(
                    where_moderated_content_visible_to_user_column(
                        select(User, HostRequest, func.max(Message.id))
                        .where(User.id == user_id)
                        .join(HostRequest, HostRequest.host_user_id == User.id),
                        HostRequest,
                        HostRequest.host_user_id,
                    ),
                    context,
                    HostRequest.surfer_user_id,
                )
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.host_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)  # type: ignore[arg-type]
            ).all()

            for user, host_request, max_message_id in surfing_reqs:
                # advance the high-water mark so these messages are not re-notified
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                notify(
                    session,
                    user_id=user.id,
                    topic_action=NotificationTopicAction.host_request__missed_messages,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        # the other party from the surfer's perspective is the host
                        user=user_model_to_pb(host_request.host, session, context),
                        am_host=False,
                    ),
                )

            for user, host_request, max_message_id in hosting_reqs:
                # advance the high-water mark so these messages are not re-notified
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                notify(
                    session,
                    user_id=user.id,
                    topic_action=NotificationTopicAction.host_request__missed_messages,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        # the other party from the host's perspective is the surfer
                        user=user_model_to_pb(host_request.surfer, session, context),
                        am_host=True,
                    ),
                )
def send_onboarding_emails(payload: empty_pb2.Empty) -> None:
    """
    Sends out onboarding emails
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:
        # first onboarding email: visible users who haven't been sent any yet
        first_email_query = select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)
        for user in session.execute(first_email_query).scalars().all():
            notify(
                session,
                user_id=user.id,
                topic_action=NotificationTopicAction.onboarding__reminder,
                key="1",
            )
            user.onboarding_emails_sent = 1
            user.last_onboarding_email_sent = now()
        session.commit()

        # second onboarding email
        # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
        second_email_query = (
            select(User)
            .where(User.is_visible)
            .where(User.onboarding_emails_sent == 1)
            .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
            .where(~has_completed_profile_expression())
        )
        for user in session.execute(second_email_query).scalars().all():
            notify(
                session,
                user_id=user.id,
                topic_action=NotificationTopicAction.onboarding__reminder,
                key="2",
            )
            user.onboarding_emails_sent = 2
            user.last_onboarding_email_sent = now()
        session.commit()
def send_reference_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends out reminders to write references after hosting/staying

    Three reminders are scheduled relative to the deadline for writing a
    reference; for each reminder (newest first) we find surfers and hosts who
    still haven't written one and notify them.
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            user = aliased(User)
            other_user = aliased(User)
            # surfers needing to write a ref; the leading literal(True) marks "surfed" rows
            q1 = (
                select(literal(True), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.surfer_user_id)
                .join(other_user, other_user.id == HostRequest.host_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the surfer has not written a ref
                        Reference.from_user_id == HostRequest.surfer_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.surfer_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.surfer_reason_didnt_meetup == None)
                .where(users_visible_to_each_other(user, other_user))
            )

            # hosts needing to write a ref; literal(False) marks "hosted" rows
            q2 = (
                select(literal(False), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.host_user_id)
                .join(other_user, other_user.id == HostRequest.surfer_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the host has not written a ref
                        Reference.from_user_id == HostRequest.host_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.host_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.host_reason_didnt_meetup == None)
                .where(users_visible_to_each_other(user, other_user))
            )

            # union the two role-specific queries and re-alias the entities out of it
            union = union_all(q1, q2).subquery()
            query = select(
                union.c[0].label("surfed"),
                aliased(HostRequest, union),
                aliased(user, union),
                aliased(other_user, union),
            )
            reference_reminders = session.execute(query).all()

            for surfed, host_request, user, other_user in reference_reminders:
                # visibility and blocking already checked in sql
                assert user.is_visible
                context = make_background_user_context(user_id=user.id)
                topic_action = (
                    NotificationTopicAction.reference__reminder_surfed
                    if surfed
                    else NotificationTopicAction.reference__reminder_hosted
                )
                notify(
                    session,
                    user_id=user.id,
                    topic_action=topic_action,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(other_user, session, context),
                        days_left=reminder_days_left,
                    ),
                )
                # record which reminder number was sent, per role
                if surfed:
                    host_request.surfer_sent_reference_reminders = reminder_number
                else:
                    host_request.host_sent_reference_reminders = reminder_number
            session.commit()
def send_host_request_reminders(payload: empty_pb2.Empty) -> None:
    """
    Reminds hosts about pending host requests they haven't responded to.

    Only requests that are still pending, start in the future, have had fewer
    than HOST_REQUEST_MAX_REMINDERS reminders, and where the host has never
    sent a message in the conversation are considered; reminders are spaced by
    HOST_REQUEST_REMINDER_INTERVAL.
    """
    with session_scope() as session:
        # correlated EXISTS: has the host sent any message in this conversation?
        host_has_sent_message = select(1).where(
            Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.host_user_id
        )

        requests = (
            session.execute(
                where_user_columns_visible_to_each_other(
                    where_moderated_content_visible_to_user_column(
                        select(HostRequest),
                        HostRequest,
                        HostRequest.host_user_id,
                    )
                    .where(HostRequest.status == HostRequestStatus.pending)
                    .where(HostRequest.host_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS)
                    .where(HostRequest.start_time > func.now())
                    .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL)
                    .where(~exists(host_has_sent_message)),
                    HostRequest.host_user_id,
                    HostRequest.surfer_user_id,
                )
            )
            .scalars()
            .all()
        )

        for host_request in requests:
            # bump the reminder counter/timestamp before notifying
            host_request.host_sent_request_reminders += 1
            host_request.last_sent_request_reminder_time = now()

            context = make_background_user_context(user_id=host_request.host_user_id)
            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action=NotificationTopicAction.host_request__reminder,
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestReminder(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

        session.commit()
def add_users_to_email_list(payload: empty_pb2.Empty) -> None:
    """
    Syncs visible users into the Listmonk mailing list, one user at a time.
    """
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    # each loop iteration picks one out-of-sync user and syncs them, in its
    # own session, until none remain
    while True:
        with session_scope() as session:
            user = session.execute(
                select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
            ).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            # opted-out users are marked as synced without contacting Listmonk
            if user.opt_out_of_newsletter:
                user.in_sync_with_newsletter = True
                session.commit()
                continue

            response = requests.post(
                config["LISTMONK_BASE_URL"] + "/api/subscribers",
                auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
                json={
                    "email": user.email,
                    "name": user.name,
                    "lists": [config["LISTMONK_LIST_ID"]],
                    "preconfirm_subscriptions": True,
                    "attribs": {"couchers_user_id": user.id},
                    "status": "enabled",
                },
                timeout=10,
            )
            # the API returns 409 if the user is already subscribed; both count as synced
            if response.status_code in (200, 409):
                user.in_sync_with_newsletter = True
                session.commit()
            else:
                raise Exception("Failed to add users to mailing list")
def enforce_community_membership(payload: empty_pb2.Empty) -> None:
    """Delegates to the shared task that enforces community memberships."""
    tasks_enforce_community_memberships()
def update_recommendation_scores(payload: empty_pb2.Empty) -> None:
    """
    Recomputes User.recommendation_score for every user.

    The score is assembled entirely as a SQL expression (profile completeness,
    references, activeness, badges, response rate, plus a small random jitter)
    and applied in a single UPDATE.
    """
    # free-text profile fields that count towards profile length
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    # subset describing the user's home specifically
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian() -> ColumnElement[float] | float:
        """
        Produces an approximately std normal random variate
        """
        # sum of 5 uniforms, centred and scaled (central limit theorem)
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt: Any) -> Function[int]:
        # cast a SQL boolean/expression to an integer, NULL -> 0
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt: Any) -> Function[float]:
        # cast a SQL expression to a float, NULL -> 0.0
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile
        # concatenate text fields (as a SQL expression) and measure total length
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")  # type: ignore[assignment]
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")  # type: ignore[assignment]
        home_length = func.length(home_text)

        filled_profile = int_(has_completed_profile_expression())
        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        may_host = int_(User.hosting_status == HostingStatus.maybe)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.has_completed_my_home)
        filled_home_lots = int_(home_length > 200)
        hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host
        profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots

        # references
        # whether the user has left at least one reference for someone else
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        # remap rating into a roughly [-0.42, 0.98] band before averaging
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        # a "bad" reference is very low-rated or marked not appropriate
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # weight the average rating: first 5 refs count double, the rest singly
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        # community builders: admins of official clusters; min_node_id == 1 is
        # the root node (whole-community builder)
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        cb = int_(min_node_id >= 1)
        wcb = int_(min_node_id == 1)
        # per-badge point values summed per user via a CASE over badge_id
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }

        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate
        # percentile response times in minutes (materialized view stores intervals)
        hr_subquery = select(
            UserResponseRate.user_id,
            float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"),
            float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"),
        ).subquery()
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
        response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0)

        # final score: sum of components plus small gaussian jitter
        recommendation_score = (
            hosting_status_points
            + profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        # outer joins so users missing from any component subquery still get scored
        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        session.execute(update(User).values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id))

    logger.info("Updated recommendation scores")
def update_badges(payload: empty_pb2.Empty) -> None:
    """
    Recomputes which users hold each automatically-managed badge.

    For every badge we compute the authoritative member list, then diff it
    against the UserBadge rows in the database, adding and removing badges so
    the two agree.
    """
    with session_scope() as session:

        def update_badge(badge_id: str, members: Sequence[int]) -> None:
            # sync the UserBadge rows for `badge_id` to exactly `members`
            badge = get_badge_dict()[badge_id]
            user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge.id)).scalars().all()
            # in case the user ids don't exist in the db
            actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all()
            # we should add the badge to these
            add = set(actual_members) - set(user_ids)
            # we should remove the badge from these
            remove = set(user_ids) - set(actual_members)
            for user_id in add:
                user_add_badge(session, user_id, badge.id)

            for user_id in remove:
                user_remove_badge(session, user_id, badge.id)

        # statically-defined memberships from the resources files
        update_badge("founder", get_static_badge_dict()["founder"])
        update_badge("board_member", get_static_badge_dict()["board_member"])
        update_badge("past_board_member", get_static_badge_dict()["past_board_member"])
        # database-derived memberships
        update_badge("donor", session.execute(select(User.id).where(User.last_donated.is_not(None))).scalars().all())
        update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all())
        update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all())
        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        update_badge(
            "strong_verification",
            session.execute(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            )
            .scalars()
            .all(),
        )
        # volunteer badge for active volunteers (stopped_volunteering is null)
        update_badge(
            "volunteer",
            session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_(None))).scalars().all(),
        )
        # past_volunteer badge for past volunteers (stopped_volunteering is not null)
        update_badge(
            "past_volunteer",
            session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_not(None)))
            .scalars()
            .all(),
        )
def finalize_strong_verification(payload: jobs_pb2.FinalizeStrongVerificationPayload) -> None:
    """
    Complete a strong verification (SV) attempt after the vendor has approved the session.

    Fetches the finished session from the Iris ID API, checks it against the attempt
    stored in our DB (user id, attempt token, session id, APPROVED state), then either:
    - fails the attempt if the document is not a passport,
    - marks it a duplicate if the same passport was used by an earlier attempt
      (also sending a duplicate-verification email when that attempt belongs to a
      different user), or
    - stores the encrypted passport data, marks the attempt succeeded, and grants the
      "strong_verification" badge when the passport data matches the user's profile.

    The user is notified of the outcome in every branch. Raises AssertionError on any
    mismatch between our records and the vendor payload, and a generic Exception if
    the vendor API does not return HTTP 200.
    """
    with session_scope() as session:
        # scalar_one raises if the attempt is missing or no longer waiting on the backend
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
            verify="/etc/ssl/certs/ca-certificates.crt",
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()
        # the reference blob was encrypted by us when the session was created; decrypting
        # it ties the vendor session back to this user and attempt
        reference_payload = internal_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )
        # consistency checks between our DB record and the vendor's payload
        assert verification_attempt.user_id == reference_payload.user_id
        assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token
        assert verification_attempt.iris_session_id == json_data["id"]
        assert json_data["state"] == "APPROVED"

        # only passports qualify for strong verification
        if json_data["document_type"] != "PASSPORT":
            verification_attempt.status = StrongVerificationAttemptStatus.failed
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
                ),
            )
            return

        assert json_data["document_type"] == "PASSPORT"

        expiry_date = date.fromisoformat(json_data["expiry_date"])
        nationality = json_data["nationality"]
        last_three_document_chars = json_data["document_number"][-3:]

        # a passport is identified by (expiry date, nationality, last 3 document chars);
        # look for the earliest prior attempt using the same passport
        existing_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
            .where(StrongVerificationAttempt.passport_nationality == nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = expiry_date
        verification_attempt.passport_nationality = nationality
        verification_attempt.passport_last_three_document_chars = last_three_document_chars

        if existing_attempt:
            verification_attempt.status = StrongVerificationAttemptStatus.duplicate

            # the same passport used by a *different* user is suspicious: flag via email
            if existing_attempt.user_id != verification_attempt.user_id:
                session.flush()
                send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)

            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
            )
            return

        # store the raw vendor response encrypted with our public key, so only the holder
        # of the matching private key can read it back
        verification_attempt.has_full_data = True
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = verification_attempt.user
        # SV additionally requires the passport's birthdate/sex to correspond to the profile
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            if session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none():
                # badge already granted (e.g. job retried); nothing more to do
                return

            user_add_badge(session, user.id, badge_id, do_notify=False)
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_success,
                key="",
            )
        else:
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
                ),
            )
def send_activeness_probes(payload: empty_pb2.Empty) -> None:
    """
    Periodic job implementing "activeness probes": ask long-inactive users who are
    marked as able to host whether they are still active.

    Three steps:
    1. If enabled via config, create new probes for inactive can-host users who don't
       already have a pending probe (rate-limited to ~2% of visible users per day).
    2. Send reminder notifications for pending probes per the
       ACTIVENESS_PROBE_TIME_REMINDERS schedule.
    3. Expire probes older than ACTIVENESS_PROBE_EXPIRY_TIME with no response,
       downgrading the user's hosting/meetup statuses one notch.
    """
    with session_scope() as session:
        ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)

        if config["ACTIVENESS_PROBES_ENABLED"]:
            # users who already have a pending (unanswered) probe
            subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded.is_(None)).subquery()

            # users who we should send an activeness probe to
            new_probe_user_ids = (
                session.execute(
                    select(User.id)
                    .where(User.is_visible)
                    .where(User.hosting_status == HostingStatus.can_host)
                    .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
                    .where(User.id.not_in(select(subquery.c.user_id)))
                )
                .scalars()
                .all()
            )

            total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()
            probes_today = session.execute(
                select(func.count())
                .select_from(ActivenessProbe)
                .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24))
            ).scalar_one()

            # send probes to max 2% of users per day; always allow at least 1, and smooth
            # the remaining daily budget across runs (per-run cap of 1/24th of the budget)
            max_probes_per_day = 0.02 * total_users
            max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1))

            if len(new_probe_user_ids) > max_probe_size:
                # randomly choose who gets probed this run; the rest are caught later
                new_probe_user_ids = sample(new_probe_user_ids, max_probe_size)

            for user_id in new_probe_user_ids:
                session.add(ActivenessProbe(user_id=user_id))

            session.commit()

        ## Step 2: actually send out probe notifications
        for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
            # probes that have received exactly `probe_number_minus_1` notifications and
            # whose next reminder (after `delay`) is now due
            probes = (
                session.execute(
                    select(ActivenessProbe)
                    .where(ActivenessProbe.notifications_sent == probe_number_minus_1)
                    .where(ActivenessProbe.probe_initiated + delay < func.now())
                    .where(ActivenessProbe.is_pending)
                )
                .scalars()
                .all()
            )

            for probe in probes:
                probe.notifications_sent = probe_number_minus_1 + 1
                notify(
                    session,
                    user_id=probe.user.id,
                    topic_action=NotificationTopicAction.activeness__probe,
                    key=str(probe.id),
                    data=notification_data_pb2.ActivenessProbe(
                        reminder_number=probe_number_minus_1 + 1,
                        deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
                    ),
                )
            session.commit()

        ## Step 3: for those who haven't responded, mark them as failed
        expired_probes = (
            session.execute(
                select(ActivenessProbe)
                .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
                .where(ActivenessProbe.is_pending)
                .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
            )
            .scalars()
            .all()
        )

        for probe in expired_probes:
            probe.responded = now()
            probe.response = ActivenessProbeStatus.expired
            # downgrade the user's statuses one notch since they appear inactive
            if probe.user.hosting_status == HostingStatus.can_host:
                probe.user.hosting_status = HostingStatus.maybe
            if probe.user.meetup_status == MeetupStatus.wants_to_meetup:
                probe.user.meetup_status = MeetupStatus.open_to_meetup
        session.commit()
def update_randomized_locations(payload: empty_pb2.Empty) -> None:
    """
    We generate for each user a randomized location as follows:
    - Start from a strong random seed (based on the SECRET env var and our key derivation function)
    - For each user, mix in the user_id for randomness
    - Generate a radius from [0.02, 0.1] degrees (about 2-10km)
    - Generate an angle from [0, 360]
    - Randomized location is then a distance `radius` away at an angle `angle` from `geom`

    Only fills in users whose `randomized_geom` is not yet set; the derivation is
    deterministic per user, so re-running never moves an already-randomized user.
    """
    randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)

    def gen_randomized_coords(user_id: int, lat: float, lng: float) -> tuple[float, float]:
        # deterministic per-user uniforms derived from the secret + a per-purpose seed
        radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii"))
        angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii"))
        radius = 0.02 + 0.08 * radius_u
        angle_rad = 2 * pi * angle_u
        offset_lng = radius * cos(angle_rad)
        offset_lat = radius * sin(angle_rad)
        return lat + offset_lat, lng + offset_lng

    user_updates: list[dict[str, Any]] = []

    with session_scope() as session:
        users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom.is_(None))).all()

        for user_id, geom in users_to_update:
            lat, lng = get_coordinates(geom)
            user_updates.append(
                {"id": user_id, "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, lat, lng))}
            )

    # bulk UPDATE-by-primary-key; skip entirely when there's nothing to do, since an
    # empty executemany parameter list is rejected by SQLAlchemy
    if user_updates:
        with session_scope() as session:
            session.execute(update(User), user_updates)
def send_event_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends reminders for events that are 24 hours away to users who marked themselves as attending.
    """
    logger.info("Sending event reminder emails")

    with session_scope() as session:
        # occurrences starting within the reminder window (and not already started)
        upcoming_occurrences = (
            session.execute(
                select(EventOccurrence)
                .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA)
                .where(EventOccurrence.start_time >= now())
            )
            .scalars()
            .all()
        )

        for occurrence in upcoming_occurrences:
            # attendees of this occurrence who have not been reminded yet
            pending_attendee_rows = session.execute(
                select(User, EventOccurrenceAttendee)
                .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id)
                .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
                .where(EventOccurrenceAttendee.reminder_sent == False)
            ).all()

            for attending_user, attendance in pending_attendee_rows:
                user_context = make_background_user_context(user_id=attending_user.id)

                notify(
                    session,
                    user_id=attending_user.id,
                    topic_action=NotificationTopicAction.event__reminder,
                    key=str(occurrence.id),
                    data=notification_data_pb2.EventReminder(
                        event=event_to_pb(session, occurrence, user_context),
                        user=user_model_to_pb(attending_user, session, user_context),
                    ),
                )

                # mark and commit per attendee so a crash doesn't cause duplicate reminders
                attendance.reminder_sent = True
                session.commit()
def check_expo_push_receipts(payload: empty_pb2.Empty) -> None:
    """
    Check Expo push receipts in batch and update delivery attempts.

    Processes up to 100 delivery attempts per batch, each batch in its own session,
    looping until there is nothing left to check. Attempts become eligible 15 minutes
    after sending (per Expo's guidance) and are ignored after 24 hours. Raises
    RuntimeError if MAX_ITERATIONS batches are exhausted without draining the backlog.
    """
    MAX_ITERATIONS = 100  # Safety limit: 100 batches * 100 attempts = 10,000 max

    for iteration in range(MAX_ITERATIONS):
        with session_scope() as session:
            # Find all delivery attempts that need receipt checking
            # Wait 15 minutes per Expo's recommendation before checking receipts
            attempts = (
                session.execute(
                    select(PushNotificationDeliveryAttempt)
                    .where(PushNotificationDeliveryAttempt.expo_ticket_id != None)
                    .where(PushNotificationDeliveryAttempt.receipt_checked_at == None)
                    .where(PushNotificationDeliveryAttempt.time < now() - timedelta(minutes=15))
                    .where(PushNotificationDeliveryAttempt.time > now() - timedelta(hours=24))
                    .limit(100)
                )
                .scalars()
                .all()
            )

            if not attempts:
                # backlog drained; done
                logger.debug("No Expo receipts to check")
                return

            logger.info(f"Checking {len(attempts)} Expo push receipts")

            # one batched API call for all ticket ids in this batch
            receipts = get_expo_push_receipts([not_none(attempt.expo_ticket_id) for attempt in attempts])

            for attempt in attempts:
                receipt = receipts.get(not_none(attempt.expo_ticket_id))

                # Always mark as checked to avoid infinite loops
                attempt.receipt_checked_at = now()

                if receipt is None:
                    # Receipt not found after 15min - likely expired (>24h) or never existed
                    # Per Expo docs: receipts should be available within 15 minutes
                    attempt.receipt_status = "not_found"
                    continue

                attempt.receipt_status = receipt.get("status")

                if receipt.get("status") == "error":
                    details = receipt.get("details", {})
                    error_code = details.get("error")
                    attempt.receipt_error_code = error_code

                    if error_code == "DeviceNotRegistered":
                        # Device token is no longer valid - disable the subscription
                        sub = session.execute(
                            select(PushNotificationSubscription).where(
                                PushNotificationSubscription.id == attempt.push_notification_subscription_id
                            )
                        ).scalar_one()

                        # NOTE(review): disabled_at > now() presumably means "not yet
                        # disabled" (far-future sentinel) — confirm against the model
                        if sub.disabled_at > now():
                            sub.disabled_at = now()
                            logger.info(f"Disabled push sub {sub.id} due to DeviceNotRegistered in receipt")
                            push_notification_counter.labels(
                                platform="expo", outcome="permanent_subscription_failure_receipt"
                            ).inc()
                    else:
                        logger.warning(f"Expo receipt error for ticket {attempt.expo_ticket_id}: {error_code}")

    # If we get here, we've exhausted MAX_ITERATIONS without finishing
    raise RuntimeError(
        f"check_expo_push_receipts exceeded {MAX_ITERATIONS} iterations - "
        "there may be an unusually large backlog of receipts to check"
    )
def send_postal_verification_postcard(payload: jobs_pb2.SendPostalVerificationPostcardPayload) -> None:
    """
    Sends the postcard via external API and updates attempt status.

    On success the attempt moves to awaiting_verification and the user is notified;
    on failure the attempt is marked failed.
    """
    with session_scope() as session:
        pv_attempt = session.execute(
            select(PostalVerificationAttempt).where(
                PostalVerificationAttempt.id == payload.postal_verification_attempt_id
            )
        ).scalar_one_or_none()

        # guard: the attempt must exist and still be in progress
        if not pv_attempt or pv_attempt.status != PostalVerificationStatus.in_progress:
            logger.warning(
                f"Postal verification attempt {payload.postal_verification_attempt_id} not found or wrong state"
            )
            return

        recipient_name = session.execute(select(User.name).where(User.id == pv_attempt.user_id)).scalar_one()

        code = not_none(pv_attempt.verification_code)
        outcome = send_postcard(
            recipient_name=recipient_name,
            address_line_1=pv_attempt.address_line_1,
            address_line_2=pv_attempt.address_line_2,
            city=pv_attempt.city,
            state=pv_attempt.state,
            postal_code=pv_attempt.postal_code,
            country=pv_attempt.country,
            verification_code=code,
            qr_code_url=urls.postal_verification_link(code=code),
        )

        if not outcome.success:
            # Could retry or fail - for now, fail
            pv_attempt.status = PostalVerificationStatus.failed
            logger.error(f"Postcard send failed: {outcome.error_message}")
            return

        pv_attempt.status = PostalVerificationStatus.awaiting_verification
        pv_attempt.postcard_sent_at = func.now()

        notify(
            session,
            user_id=pv_attempt.user_id,
            topic_action=NotificationTopicAction.postal_verification__postcard_sent,
            key="",
            data=notification_data_pb2.PostalVerificationPostcardSent(
                city=pv_attempt.city,
                country=pv_attempt.country,
            ),
        )
class DatabaseInconsistencyError(Exception):
    """Raised when one or more database consistency checks fail."""
def check_database_consistency(payload: empty_pb2.Empty) -> None:
    """
    Checks database consistency and raises an exception if any issues are found.

    Runs a series of independent invariant checks (profile galleries, moderation-system
    cross-references, auto-approve latency), accumulating human-readable error strings,
    then raises a single DatabaseInconsistencyError listing every failure at once.
    """
    logger.info("Checking database consistency")
    errors = []

    with session_scope() as session:
        # Check that all users have a profile gallery
        users_without_gallery = session.execute(
            select(User.id, User.username).where(User.profile_gallery_id.is_(None))
        ).all()
        if users_without_gallery:
            errors.append(f"Users without profile gallery: {users_without_gallery}")

        # Check that all profile galleries point to their owner
        mismatched_galleries = session.execute(
            select(User.id, User.username, User.profile_gallery_id, PhotoGallery.owner_user_id)
            .join(PhotoGallery, User.profile_gallery_id == PhotoGallery.id)
            .where(User.profile_gallery_id.is_not(None))
            .where(PhotoGallery.owner_user_id != User.id)
        ).all()
        if mismatched_galleries:
            errors.append(f"Profile galleries with mismatched owner: {mismatched_galleries}")

        # === Moderation System Consistency Checks ===

        # Check all ModerationStates have a known object_type
        known_object_types = [ModerationObjectType.HOST_REQUEST, ModerationObjectType.GROUP_CHAT]
        unknown_type_states = session.execute(
            select(ModerationState.id, ModerationState.object_type).where(
                ModerationState.object_type.not_in(known_object_types)
            )
        ).all()
        if unknown_type_states:
            errors.append(f"ModerationStates with unknown object_type: {unknown_type_states}")

        # Check every ModerationState has at least one INITIAL_REVIEW queue item
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_initial_review = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationQueueItem.moderation_state_id == ModerationState.id)
                    .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW)
                ),
            )
        ).all()
        if states_without_initial_review:
            errors.append(f"ModerationStates without INITIAL_REVIEW queue item: {states_without_initial_review}")

        # Check every ModerationState has a CREATE log entry
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_create_log = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationLog.moderation_state_id == ModerationState.id)
                    .where(ModerationLog.action == ModerationAction.CREATE)
                ),
            )
        ).all()
        if states_without_create_log:
            errors.append(f"ModerationStates without CREATE log entry: {states_without_create_log}")

        # Check resolved queue items point to log entries for the same moderation state
        resolved_item_log_mismatches = session.execute(
            select(ModerationQueueItem.id, ModerationQueueItem.moderation_state_id, ModerationLog.moderation_state_id)
            .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id)
            .where(ModerationQueueItem.resolved_by_log_id.is_not(None))
            .where(ModerationQueueItem.moderation_state_id != ModerationLog.moderation_state_id)
        ).all()
        if resolved_item_log_mismatches:
            errors.append(f"Resolved queue items with mismatched moderation_state_id: {resolved_item_log_mismatches}")

        # Check every HOST_REQUEST ModerationState has exactly one HostRequest pointing to it
        # NOTE: one COUNT query per state; acceptable while these sets stay small
        hr_states = (
            session.execute(
                select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST)
            )
            .scalars()
            .all()
        )
        for state_id in hr_states:
            hr_count = session.execute(
                select(func.count()).where(HostRequest.moderation_state_id == state_id)
            ).scalar_one()
            if hr_count != 1:
                errors.append(f"ModerationState {state_id} (HOST_REQUEST) has {hr_count} HostRequests (expected 1)")

        # Check every GROUP_CHAT ModerationState has exactly one GroupChat pointing to it
        gc_states = (
            session.execute(
                select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT)
            )
            .scalars()
            .all()
        )
        for state_id in gc_states:
            gc_count = session.execute(
                select(func.count()).where(GroupChat.moderation_state_id == state_id)
            ).scalar_one()
            if gc_count != 1:
                errors.append(f"ModerationState {state_id} (GROUP_CHAT) has {gc_count} GroupChats (expected 1)")

        # Check ModerationState.object_id matches the actual object's ID
        hr_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, HostRequest.conversation_id)
            .join(HostRequest, HostRequest.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST)
            .where(ModerationState.object_id != HostRequest.conversation_id)
        ).all()
        if hr_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for HOST_REQUEST: {hr_object_id_mismatches}")

        gc_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, GroupChat.conversation_id)
            .join(GroupChat, GroupChat.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT)
            .where(ModerationState.object_id != GroupChat.conversation_id)
        ).all()
        if gc_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for GROUP_CHAT: {gc_object_id_mismatches}")

        # Check reverse mapping: HostRequest's moderation_state points to correct ModerationState
        hr_reverse_mismatches = session.execute(
            select(
                HostRequest.conversation_id,
                HostRequest.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, HostRequest.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.HOST_REQUEST)
                | (ModerationState.object_id != HostRequest.conversation_id)
            )
        ).all()
        if hr_reverse_mismatches:
            errors.append(f"HostRequest points to ModerationState with wrong type/object_id: {hr_reverse_mismatches}")

        # Check reverse mapping: GroupChat's moderation_state points to correct ModerationState
        gc_reverse_mismatches = session.execute(
            select(
                GroupChat.conversation_id,
                GroupChat.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, GroupChat.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.GROUP_CHAT)
                | (ModerationState.object_id != GroupChat.conversation_id)
            )
        ).all()
        if gc_reverse_mismatches:
            errors.append(f"GroupChat points to ModerationState with wrong type/object_id: {gc_reverse_mismatches}")

        # Ensure auto-approve deadline isn't being exceeded by a significant margin
        # The auto-approver runs every 15s, so allow 5 minutes grace before alerting
        deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
        if deadline_seconds > 0:
            grace_period = timedelta(minutes=5)
            stale_initial_review_items = session.execute(
                select(
                    ModerationQueueItem.id,
                    ModerationQueueItem.moderation_state_id,
                    ModerationQueueItem.time_created,
                )
                .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW)
                .where(ModerationQueueItem.resolved_by_log_id.is_(None))
                .where(ModerationQueueItem.time_created < now() - timedelta(seconds=deadline_seconds) - grace_period)
            ).all()
            if stale_initial_review_items:
                errors.append(
                    f"INITIAL_REVIEW items exceeding auto-approve deadline by >5min: {stale_initial_review_items}"
                )

    if errors:
        raise DatabaseInconsistencyError("\n".join(errors))
def auto_approve_moderation_queue(payload: empty_pb2.Empty) -> None:
    """
    Dead man's switch: auto-approves unresolved INITIAL_REVIEW items older than the deadline.
    Items explicitly actioned by moderators are left alone.

    A deadline of 0 (or negative) disables auto-approval entirely. Acts as the
    moderation bot user via a background context, and processes at most 100 items per
    run; any remainder is picked up by subsequent runs.
    """
    deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
    if deadline_seconds <= 0:
        # auto-approval disabled via config
        return

    with session_scope() as session:
        ctx = make_background_user_context(user_id=config["MODERATION_BOT_USER_ID"])
        # reuse one servicer instance instead of constructing a new one per call
        moderation = Moderation()

        items = moderation.GetModerationQueue(
            request=moderation_pb2.GetModerationQueueReq(
                triggers=[moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW],
                unresolved_only=True,
                page_size=100,
                created_before=Timestamp_from_datetime(now() - timedelta(seconds=deadline_seconds)),
            ),
            context=ctx,
            session=session,
        ).queue_items

        if not items:
            return

        logger.info(f"Auto-approving {len(items)} moderation queue items")
        for item in items:
            moderation.ModerateContent(
                request=moderation_pb2.ModerateContentReq(
                    moderation_state_id=item.moderation_state_id,
                    action=moderation_pb2.MODERATION_ACTION_APPROVE,
                    visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
                    reason=f"Auto-approved: moderation deadline of {deadline_seconds} seconds exceeded.",
                ),
                context=ctx,
                session=session,
            )
        moderation_auto_approved_counter.inc(len(items))