Coverage for app / backend / src / couchers / jobs / handlers.py: 90%
458 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1"""
2Background job servicers
3"""
5import logging
6from collections.abc import Sequence
7from datetime import date, timedelta
8from math import cos, pi, sin, sqrt
9from random import sample
10from typing import Any
12import requests
13from google.protobuf import empty_pb2
14from sqlalchemy import ColumnElement, Float, Function, Integer, select
15from sqlalchemy.orm import aliased
16from sqlalchemy.sql import (
17 and_,
18 case,
19 cast,
20 delete,
21 distinct,
22 exists,
23 extract,
24 func,
25 literal,
26 not_,
27 or_,
28 union_all,
29 update,
30)
32from couchers import urls
33from couchers.config import config
34from couchers.constants import (
35 ACTIVENESS_PROBE_EXPIRY_TIME,
36 ACTIVENESS_PROBE_INACTIVITY_PERIOD,
37 ACTIVENESS_PROBE_TIME_REMINDERS,
38 EVENT_REMINDER_TIMEDELTA,
39 HOST_REQUEST_MAX_REMINDERS,
40 HOST_REQUEST_REMINDER_INTERVAL,
41)
42from couchers.context import make_background_user_context
43from couchers.crypto import (
44 USER_LOCATION_RANDOMIZATION_NAME,
45 asym_encrypt,
46 b64decode,
47 get_secret,
48 simple_decrypt,
49 stable_secure_uniform,
50)
51from couchers.db import session_scope
52from couchers.email.dev import print_dev_email
53from couchers.email.smtp import send_smtp_email
54from couchers.helpers.badges import user_add_badge, user_remove_badge
55from couchers.helpers.completed_profile import has_completed_profile_expression
56from couchers.materialized_views import (
57 UserResponseRate,
58)
59from couchers.metrics import (
60 moderation_auto_approved_counter,
61 push_notification_counter,
62 strong_verification_completions_counter,
63)
64from couchers.models import (
65 AccountDeletionToken,
66 ActivenessProbe,
67 ActivenessProbeStatus,
68 Cluster,
69 ClusterRole,
70 ClusterSubscription,
71 EventOccurrence,
72 EventOccurrenceAttendee,
73 GroupChat,
74 GroupChatSubscription,
75 HostingStatus,
76 HostRequest,
77 HostRequestStatus,
78 LoginToken,
79 MeetupStatus,
80 Message,
81 MessageType,
82 ModerationAction,
83 ModerationLog,
84 ModerationObjectType,
85 ModerationQueueItem,
86 ModerationState,
87 ModerationTrigger,
88 PassportSex,
89 PasswordResetToken,
90 PhotoGallery,
91 PostalVerificationAttempt,
92 PostalVerificationStatus,
93 PushNotificationDeliveryAttempt,
94 PushNotificationSubscription,
95 Reference,
96 StrongVerificationAttempt,
97 StrongVerificationAttemptStatus,
98 User,
99 UserBadge,
100 Volunteer,
101)
102from couchers.models.notifications import NotificationTopicAction
103from couchers.notifications.expo_api import get_expo_push_receipts
104from couchers.notifications.notify import notify
105from couchers.postal.postcard_service import send_postcard
106from couchers.proto import moderation_pb2, notification_data_pb2
107from couchers.proto.internal import internal_pb2, jobs_pb2
108from couchers.resources import get_badge_dict, get_static_badge_dict
109from couchers.servicers.api import user_model_to_pb
110from couchers.servicers.events import (
111 event_to_pb,
112)
113from couchers.servicers.moderation import Moderation
114from couchers.servicers.requests import host_request_to_pb
115from couchers.sql import (
116 users_visible_to_each_other,
117 where_moderated_content_visible,
118 where_moderated_content_visible_to_user_column,
119 where_user_columns_visible_to_each_other,
120 where_users_column_visible,
121)
122from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships
123from couchers.tasks import send_duplicate_strong_verification_email
124from couchers.utils import (
125 Timestamp_from_datetime,
126 create_coordinate,
127 get_coordinates,
128 not_none,
129 now,
130)
# Module-level logger named after this module, per the standard logging convention.
logger = logging.getLogger(__name__)
def send_email(payload: jobs_pb2.SendEmailPayload) -> None:
    """
    Deliver a single email described by `payload` and persist the result.

    Depending on config, the message goes out via SMTP or is just printed to the
    dev logger; either way the chosen sender returns a models.Email object that
    is then stored in the database.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
    # pick the delivery mechanism: real SMTP when email is enabled, dev printer otherwise
    if config["ENABLE_EMAIL"]:
        sender = send_smtp_email
    else:
        sender = print_dev_email
    # the sender must return a models.Email object that can be added to the database
    email_record = sender(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
        list_unsubscribe_header=payload.list_unsubscribe_header,
        source_data=payload.source_data,
    )
    # record the outgoing email in the database
    with session_scope() as session:
        session.add(email_record)
def purge_login_tokens(payload: empty_pb2.Empty) -> None:
    """Delete every invalid (expired/used) login token from the database."""
    logger.info("Purging login tokens")
    # bulk delete without syncing ORM session state — nothing else holds these rows
    stmt = delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False)
    with session_scope() as session:
        session.execute(stmt)
def purge_password_reset_tokens(payload: empty_pb2.Empty) -> None:
    """Delete every invalid (expired/used) password reset token from the database."""
    # fixed copy-paste bug: previously logged "Purging login tokens"
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        # bulk delete without syncing ORM session state — nothing else holds these rows
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )
def purge_account_deletion_tokens(payload: empty_pb2.Empty) -> None:
    """Delete every invalid (expired/used) account deletion token from the database."""
    logger.info("Purging account deletion tokens")
    with session_scope() as session:
        # bulk delete; skip ORM session synchronization since rows aren't loaded
        invalid_tokens = delete(AccountDeletionToken).where(~AccountDeletionToken.is_valid)
        session.execute(invalid_tokens.execution_options(synchronize_session=False))
def send_message_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for messages that have been unseen for a long enough time.

    Two-phase approach: first find users who have at least one unnotified text message
    older than 5 minutes, then for each such user collect *all* their unseen messages
    (regardless of age) and send a single digest notification.
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat;
        # a message counts if it's newer than both the user's last notified message and
        # their last seen message in that chat, within the membership window, not muted
        users = (
            session.execute(
                where_moderated_content_visible_to_user_column(
                    select(User)
                    .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                    .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                    .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id),
                    GroupChat,
                    User.id,
                )
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                .where(Message.time >= GroupChatSubscription.joined)
                # left == None means the user is still in the chat
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                # only consider messages that have sat unseen for at least 5 minutes
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for user in users:
            context = make_background_user_context(user_id=user.id)
            # now actually grab all the group chats, not just less than 5 min old:
            # one row per chat with the latest unseen message id and the unseen count
            subquery = (
                where_users_column_visible(
                    where_moderated_content_visible(
                        select(
                            GroupChatSubscription.group_chat_id.label("group_chat_id"),
                            func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                            func.max(Message.id).label("message_id"),
                            func.count(Message.id).label("count_unseen"),
                        )
                        .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id),
                        context,
                        GroupChat,
                        is_list_operation=True,
                    )
                    .where(GroupChatSubscription.user_id == user.id)
                    .where(not_(GroupChatSubscription.is_muted))
                    .where(Message.id > user.last_notified_message_id)
                    .where(Message.id > GroupChatSubscription.last_seen_message_id)
                    .where(Message.time >= GroupChatSubscription.joined)
                    .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                    .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)),
                    context,
                    Message.author_id,
                )
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            # join back to fetch the actual GroupChat and the latest unseen Message per chat,
            # newest chats first
            unseen_messages = session.execute(
                where_moderated_content_visible(
                    select(GroupChat, Message, subquery.c.count_unseen)
                    .join(subquery, subquery.c.message_id == Message.id)
                    .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id),
                    context,
                    GroupChat,
                    is_list_operation=True,
                ).order_by(subquery.c.message_id.desc())
            ).all()

            # visibility filters may have removed everything that phase 1 found
            if not unseen_messages:
                continue

            # advance the high-water mark so we don't re-notify about these messages
            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            def format_title(message: Message, group_chat: GroupChat, count_unseen: int) -> str:
                # DMs are labelled by sender, group chats by chat title
                if group_chat.is_dm:
                    return f"You missed {count_unseen} message(s) from {message.author.name}"
                else:
                    return f"You missed {count_unseen} message(s) in {group_chat.title}"

            # one digest notification per user, covering all chats with unseen messages
            notify(
                session,
                user_id=user.id,
                topic_action=NotificationTopicAction.chat__missed_messages,
                key="",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                context,
                            ),
                            message=format_title(message, group_chat, count_unseen),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                        )
                        for group_chat, message, count_unseen in unseen_messages
                    ],
                ),
            )
            # commit per user so a crash mid-run doesn't lose or duplicate earlier users
            session.commit()
def send_request_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host).

    First narrows down to candidate users via EXISTS subqueries (cheap), then per
    candidate gathers the actual requests with unseen messages in both roles and
    sends one notification per host request.
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    with session_scope() as session:
        # Get all candidate users who might have unseen request messages: a text message
        # newer than both their per-request last-seen marker and their global
        # last-notified marker, and older than 5 minutes
        candidate_user_ids = (
            session.execute(
                select(User.id)
                .where(User.is_visible)
                .where(
                    or_(
                        # Users with unseen messages as surfer
                        exists(
                            select(1)
                            .select_from(HostRequest)
                            .join(Message, Message.conversation_id == HostRequest.conversation_id)
                            .where(HostRequest.surfer_user_id == User.id)
                            .where(Message.id > HostRequest.surfer_last_seen_message_id)
                            .where(Message.id > User.last_notified_request_message_id)
                            .where(Message.time < now() - timedelta(minutes=5))
                            .where(Message.message_type == MessageType.text)
                        ),
                        # Users with unseen messages as host
                        exists(
                            select(1)
                            .select_from(HostRequest)
                            .join(Message, Message.conversation_id == HostRequest.conversation_id)
                            .where(HostRequest.host_user_id == User.id)
                            .where(Message.id > HostRequest.host_last_seen_message_id)
                            .where(Message.id > User.last_notified_request_message_id)
                            .where(Message.time < now() - timedelta(minutes=5))
                            .where(Message.message_type == MessageType.text)
                        ),
                    )
                )
            )
            .scalars()
            .all()
        )

        for user_id in candidate_user_ids:
            context = make_background_user_context(user_id=user_id)

            # requests where this user is surfing; one row per request with the max unseen
            # message id, filtered through moderation + user-visibility helpers
            surfing_reqs = session.execute(
                where_users_column_visible(
                    where_moderated_content_visible_to_user_column(
                        select(User, HostRequest, func.max(Message.id))
                        .where(User.id == user_id)
                        .join(HostRequest, HostRequest.surfer_user_id == User.id),
                        HostRequest,
                        HostRequest.surfer_user_id,
                    ),
                    context,
                    HostRequest.host_user_id,
                )
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.surfer_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)  # type: ignore[arg-type]
            ).all()

            # where this user is hosting (mirror image of the surfing query)
            hosting_reqs = session.execute(
                where_users_column_visible(
                    where_moderated_content_visible_to_user_column(
                        select(User, HostRequest, func.max(Message.id))
                        .where(User.id == user_id)
                        .join(HostRequest, HostRequest.host_user_id == User.id),
                        HostRequest,
                        HostRequest.host_user_id,
                    ),
                    context,
                    HostRequest.surfer_user_id,
                )
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.host_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)  # type: ignore[arg-type]
            ).all()

            for user, host_request, max_message_id in surfing_reqs:
                # advance the global high-water mark; flush so later queries this run see it
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                notify(
                    session,
                    user_id=user.id,
                    topic_action=NotificationTopicAction.host_request__missed_messages,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        # the "other party" shown to a surfer is the host
                        user=user_model_to_pb(host_request.host, session, context),
                        am_host=False,
                    ),
                )

            for user, host_request, max_message_id in hosting_reqs:
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                notify(
                    session,
                    user_id=user.id,
                    topic_action=NotificationTopicAction.host_request__missed_messages,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        # the "other party" shown to a host is the surfer
                        user=user_model_to_pb(host_request.surfer, session, context),
                        am_host=True,
                    ),
                )
def send_onboarding_emails(payload: empty_pb2.Empty) -> None:
    """
    Sends out onboarding emails
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:
        # first onboarding email: everyone visible who hasn't received any yet
        first_round = (
            session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all()
        )

        for recipient in first_round:
            notify(
                session,
                user_id=recipient.id,
                topic_action=NotificationTopicAction.onboarding__reminder,
                key="1",
            )
            recipient.onboarding_emails_sent = 1
            recipient.last_onboarding_email_sent = now()
            # commit per user so progress survives a crash mid-run
            session.commit()

        # second onboarding email
        # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
        second_round = (
            session.execute(
                select(User)
                .where(User.is_visible)
                .where(User.onboarding_emails_sent == 1)
                .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
                .where(~has_completed_profile_expression())
            )
            .scalars()
            .all()
        )

        for recipient in second_round:
            notify(
                session,
                user_id=recipient.id,
                topic_action=NotificationTopicAction.onboarding__reminder,
                key="2",
            )
            recipient.onboarding_emails_sent = 2
            recipient.last_onboarding_email_sent = now()
            session.commit()
def send_reference_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends out reminders to write references after hosting/staying.

    Three reminders are scheduled relative to the reference-writing deadline; for each
    reminder we find surfers and hosts who can still write a reference, haven't yet,
    and haven't received this reminder number before.
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            # `user` is the one who should write the reference, `other_user` is the counterparty
            user = aliased(User)
            other_user = aliased(User)
            # surfers needing to write a ref
            q1 = (
                select(literal(True), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.surfer_user_id)
                .join(other_user, other_user.id == HostRequest.host_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the surfer has not written a ref
                        Reference.from_user_id == HostRequest.surfer_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.surfer_sent_reference_reminders < reminder_number)
                # the reminder's scheduled moment (deadline minus offset) has passed
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                # skip users who reported they didn't actually meet up
                .where(HostRequest.surfer_reason_didnt_meetup == None)
                .where(users_visible_to_each_other(user, other_user))
            )

            # hosts needing to write a ref (mirror of q1 with roles swapped)
            q2 = (
                select(literal(False), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.host_user_id)
                .join(other_user, other_user.id == HostRequest.surfer_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the host has not written a ref
                        Reference.from_user_id == HostRequest.host_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.host_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.host_reason_didnt_meetup == None)
                .where(users_visible_to_each_other(user, other_user))
            )

            # combine both role queries; first column is the "surfed" flag from the literals
            union = union_all(q1, q2).subquery()
            query = select(
                union.c[0].label("surfed"),
                aliased(HostRequest, union),
                aliased(user, union),
                aliased(other_user, union),
            )
            reference_reminders = session.execute(query).all()

            for surfed, host_request, user, other_user in reference_reminders:
                # visibility and blocking already checked in sql
                assert user.is_visible
                context = make_background_user_context(user_id=user.id)
                topic_action = (
                    NotificationTopicAction.reference__reminder_surfed
                    if surfed
                    else NotificationTopicAction.reference__reminder_hosted
                )
                notify(
                    session,
                    user_id=user.id,
                    topic_action=topic_action,
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(other_user, session, context),
                        days_left=reminder_days_left,
                    ),
                )
                # record which reminder number was sent so it isn't repeated
                if surfed:
                    host_request.surfer_sent_reference_reminders = reminder_number
                else:
                    host_request.host_sent_reference_reminders = reminder_number
            # commit after each reminder tier so partial progress is kept
            session.commit()
def send_host_request_reminders(payload: empty_pb2.Empty) -> None:
    """
    Remind hosts about pending host requests they haven't responded to.

    A reminder is sent for a pending request when the host has never sent a message
    in the conversation, the stay hasn't started yet, fewer than the max number of
    reminders have gone out, and the reminder interval has elapsed since the last one.
    """
    with session_scope() as session:
        # EXISTS subquery: has the host written anything in this conversation?
        host_has_sent_message = select(1).where(
            Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.host_user_id
        )

        # renamed from `requests` to avoid shadowing the module-level `requests` library import
        pending_requests = (
            session.execute(
                where_user_columns_visible_to_each_other(
                    where_moderated_content_visible_to_user_column(
                        select(HostRequest),
                        HostRequest,
                        HostRequest.host_user_id,
                    )
                    .where(HostRequest.status == HostRequestStatus.pending)
                    .where(HostRequest.host_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS)
                    .where(HostRequest.start_time > func.now())
                    .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL)
                    .where(~exists(host_has_sent_message)),
                    HostRequest.host_user_id,
                    HostRequest.surfer_user_id,
                )
            )
            .scalars()
            .all()
        )

        for host_request in pending_requests:
            # bump the reminder counter and timestamp before notifying
            host_request.host_sent_request_reminders += 1
            host_request.last_sent_request_reminder_time = now()

            context = make_background_user_context(user_id=host_request.host_user_id)
            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action=NotificationTopicAction.host_request__reminder,
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestReminder(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

        session.commit()
def add_users_to_email_list(payload: empty_pb2.Empty) -> None:
    """
    Sync visible users into the Listmonk mailing list, one user per iteration.

    Processes users flagged as not in sync; opted-out users are just marked synced.
    Each iteration uses its own session/commit so progress survives interruption.
    """
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    while True:
        with session_scope() as session:
            # grab one not-yet-synced visible user at a time
            user = session.execute(
                select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
            ).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            # opted-out users are marked synced without contacting Listmonk
            if user.opt_out_of_newsletter:
                user.in_sync_with_newsletter = True
                session.commit()
                continue

            r = requests.post(
                config["LISTMONK_BASE_URL"] + "/api/subscribers",
                auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
                json={
                    "email": user.email,
                    "name": user.name,
                    "lists": [config["LISTMONK_LIST_ID"]],
                    "preconfirm_subscriptions": True,
                    # store our user id on the subscriber so it can be traced back
                    "attribs": {"couchers_user_id": user.id},
                    "status": "enabled",
                },
                timeout=10,
            )
            # the API returns if the user is already subscribed (409 = already exists)
            if r.status_code == 200 or r.status_code == 409:
                user.in_sync_with_newsletter = True
                session.commit()
            else:
                # fail loudly so the job scheduler retries; nothing was marked synced
                raise Exception("Failed to add users to mailing list")
def enforce_community_membership(payload: empty_pb2.Empty) -> None:
    # Thin job wrapper: delegates to couchers.tasks.enforce_community_memberships
    # (imported as tasks_enforce_community_memberships).
    tasks_enforce_community_memberships()
def update_recommendation_scores(payload: empty_pb2.Empty) -> None:
    """
    Recompute `User.recommendation_score` for all users in a single bulk UPDATE.

    The score is a SQL expression summing points for profile completeness, references,
    recent activity, verification/badges, and response rate, plus a small random jitter.
    All component expressions below are SQLAlchemy column expressions, evaluated in
    the database, not Python values.
    """
    # free-text profile fields whose combined length indicates profile effort
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    # subset describing the user's home, used for the "filled home" bonus
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian() -> ColumnElement[float] | float:
        """
        Produces an approximately std normal random variate.

        Sum of 5 uniforms, centred and scaled (central limit theorem), built from
        SQL func.random() so the jitter is computed per row in the database.
        """
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt: Any) -> Function[int]:
        # cast a (possibly NULL) SQL boolean/number to an integer, defaulting to 0
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt: Any) -> Function[float]:
        # cast a (possibly NULL) SQL number to a float, defaulting to 0.0
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile: concatenate all text fields (NULL-safe) and measure total length
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")  # type: ignore[assignment]
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")  # type: ignore[assignment]
        home_length = func.length(home_text)

        filled_profile = int_(has_completed_profile_expression())
        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        may_host = int_(User.hosting_status == HostingStatus.maybe)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.has_completed_my_home)
        filled_home_lots = int_(home_length > 200)
        # hosting willingness dominates: +5 can host, -5 maybe, -10 can't host
        hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host
        profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots

        # references
        # has this user left at least one reference for someone else?
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        # rescale ratings (see constant 1.4 * (rating - 0.3)) before averaging
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        # a "bad ref" is a very low rating or one flagged as not appropriate
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # rating contribution: full weight for the first 5 refs, half weight after
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification: community builders (admins of official clusters) get a bonus;
        # min_node_id == 1 appears to single out the root/world cluster — TODO confirm
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        cb = int_(min_node_id >= 1)
        wcb = int_(min_node_id == 1)
        # per-badge point values summed per user via a CASE over badge_id
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }

        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate: percentile response times from the materialized view, in minutes
        hr_subquery = select(
            UserResponseRate.user_id,
            float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"),
            float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"),
        ).subquery()
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
        # penalize if even the fast third of responses take > 96h; reward if two thirds are faster
        response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0)

        recommendation_score = (
            hosting_status_points
            + profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            # random jitter so equal-scored users don't always rank identically
            + 2 * poor_man_gaussian()
        )

        # outer joins: every component subquery may be missing for a given user
        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        # single bulk UPDATE joining users to their freshly computed scores
        session.execute(update(User).values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id))

    logger.info("Updated recommendation scores")
def update_badges(payload: empty_pb2.Empty) -> None:
    """
    Reconcile each badge's member list against its source of truth.

    For every badge we compute the set of users who *should* have it (from static
    config or a database query), diff that against who currently has it, and
    add/remove badges accordingly.
    """
    with session_scope() as session:

        def update_badge(badge_id: str, members: Sequence[int]) -> None:
            # `members` is the desired set of user ids for this badge
            badge = get_badge_dict()[badge_id]
            user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge.id)).scalars().all()
            # in case the user ids don't exist in the db
            actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all()
            # we should add the badge to these
            add = set(actual_members) - set(user_ids)
            # we should remove the badge from these
            remove = set(user_ids) - set(actual_members)
            for user_id in add:
                user_add_badge(session, user_id, badge.id)

            for user_id in remove:
                user_remove_badge(session, user_id, badge.id)

        # statically-configured badges (membership lists live in resources)
        update_badge("founder", get_static_badge_dict()["founder"])
        update_badge("board_member", get_static_badge_dict()["board_member"])
        update_badge("past_board_member", get_static_badge_dict()["past_board_member"])
        # database-derived badges
        update_badge("donor", session.execute(select(User.id).where(User.last_donated.is_not(None))).scalars().all())
        update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all())
        update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all())
        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        update_badge(
            "strong_verification",
            session.execute(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            )
            .scalars()
            .all(),
        )
        # volunteer badge for active volunteers (stopped_volunteering is null)
        update_badge(
            "volunteer",
            session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_(None))).scalars().all(),
        )
        # past_volunteer badge for past volunteers (stopped_volunteering is not null)
        update_badge(
            "past_volunteer",
            session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_not(None)))
            .scalars()
            .all(),
        )
def finalize_strong_verification(payload: jobs_pb2.FinalizeStrongVerificationPayload) -> None:
    """
    Completes a strong verification (SV) attempt once its Iris ID session is approved.

    Pulls the session data from the Iris ID API, validates it against the stored attempt,
    rejects non-passport documents, detects duplicate passports, then stores the
    (asymmetrically encrypted) passport data and notifies the user of the outcome.
    """
    with session_scope() as session:
        # only finalize attempts explicitly waiting on the backend; scalar_one() raises otherwise
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
            verify="/etc/ssl/certs/ca-certificates.crt",
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()
        # the "reference" is a payload we encrypted when creating the session: decrypting it
        # ties this Iris session back to our user and verification attempt
        reference_payload = internal_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )
        # consistency checks between our records and what Iris returned
        assert verification_attempt.user_id == reference_payload.user_id
        assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token
        assert verification_attempt.iris_session_id == json_data["id"]
        assert json_data["state"] == "APPROVED"

        # only passports are accepted for SV
        if json_data["document_type"] != "PASSPORT":
            verification_attempt.status = StrongVerificationAttemptStatus.failed
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
                ),
            )
            return

        assert json_data["document_type"] == "PASSPORT"

        expiry_date = date.fromisoformat(json_data["expiry_date"])
        nationality = json_data["nationality"]
        last_three_document_chars = json_data["document_number"][-3:]

        # a passport is identified by (expiry date, nationality, last three document chars);
        # look for the earliest attempt with the same passport to detect duplicates
        existing_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
            .where(StrongVerificationAttempt.passport_nationality == nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        # store the minimal identifying data even if this turns out to be a duplicate
        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = expiry_date
        verification_attempt.passport_nationality = nationality
        verification_attempt.passport_last_three_document_chars = last_three_document_chars

        if existing_attempt:
            verification_attempt.status = StrongVerificationAttemptStatus.duplicate

            # same passport used by a *different* user is suspicious: alert via email
            if existing_attempt.user_id != verification_attempt.user_id:
                session.flush()
                send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)

            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
            )
            return

        # success path: keep the full API response, encrypted with our public key
        verification_attempt.has_full_data = True
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = verification_attempt.user
        # SV additionally requires the passport birthdate/sex to correspond to the user's profile
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            # user already has the badge (e.g. from an earlier attempt): nothing more to do
            if session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none():
                return

            user_add_badge(session, user.id, badge_id, do_notify=False)
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_success,
                key="",
            )
        else:
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action=NotificationTopicAction.verification__sv_fail,
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
                ),
            )
def send_activeness_probes(payload: empty_pb2.Empty) -> None:
    """
    Periodic job managing "activeness probes": check-ins sent to inactive hosts asking
    whether they can still host.

    Three steps:
    1. Create new probes for visible users who can host but have been inactive for
       ACTIVENESS_PROBE_INACTIVITY_PERIOD and don't already have a pending probe
       (rate-limited to ~2% of visible users per day).
    2. Send reminder notifications for pending probes per the
       ACTIVENESS_PROBE_TIME_REMINDERS schedule.
    3. Expire probes whose reminders were all sent without a response, softening the
       user's hosting/meetup statuses.

    Fix over previous revision: removed an unused `make_background_user_context` local
    in step 2 (its result was never referenced).
    """
    with session_scope() as session:
        ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)

        if config["ACTIVENESS_PROBES_ENABLED"]:
            # users with a currently pending (unanswered) probe
            subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery()

            # users who we should send an activeness probe to
            new_probe_user_ids = (
                session.execute(
                    select(User.id)
                    .where(User.is_visible)
                    .where(User.hosting_status == HostingStatus.can_host)
                    .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
                    .where(User.id.not_in(select(subquery.c.user_id)))
                )
                .scalars()
                .all()
            )

            total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()
            probes_today = session.execute(
                select(func.count())
                .select_from(ActivenessProbe)
                .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24))
            ).scalar_one()

            # send probes to max 2% of users per day; each run is further capped at 1/24th
            # of the daily budget (presumably the job runs hourly), but always at least 1
            max_probes_per_day = 0.02 * total_users
            max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1))

            if len(new_probe_user_ids) > max_probe_size:
                # random subset so we don't always probe the same users first
                new_probe_user_ids = sample(new_probe_user_ids, max_probe_size)

            for user_id in new_probe_user_ids:
                session.add(ActivenessProbe(user_id=user_id))

            session.commit()

        ## Step 2: actually send out probe notifications
        for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
            probes = (
                session.execute(
                    select(ActivenessProbe)
                    .where(ActivenessProbe.notifications_sent == probe_number_minus_1)
                    .where(ActivenessProbe.probe_initiated + delay < func.now())
                    .where(ActivenessProbe.is_pending)
                )
                .scalars()
                .all()
            )

            for probe in probes:
                probe.notifications_sent = probe_number_minus_1 + 1
                notify(
                    session,
                    user_id=probe.user.id,
                    topic_action=NotificationTopicAction.activeness__probe,
                    key=str(probe.id),
                    data=notification_data_pb2.ActivenessProbe(
                        reminder_number=probe_number_minus_1 + 1,
                        deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
                    ),
                )
            session.commit()

        ## Step 3: for those who haven't responded, mark them as failed
        expired_probes = (
            session.execute(
                select(ActivenessProbe)
                .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
                .where(ActivenessProbe.is_pending)
                .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
            )
            .scalars()
            .all()
        )

        for probe in expired_probes:
            probe.responded = now()
            probe.response = ActivenessProbeStatus.expired
            # an expired probe suggests the user isn't active: soften their statuses
            if probe.user.hosting_status == HostingStatus.can_host:
                probe.user.hosting_status = HostingStatus.maybe
            if probe.user.meetup_status == MeetupStatus.wants_to_meetup:
                probe.user.meetup_status = MeetupStatus.open_to_meetup
        session.commit()
def update_randomized_locations(payload: empty_pb2.Empty) -> None:
    """
    We generate for each user a randomized location as follows:
    - Start from a strong random seed (based on the SECRET env var and our key derivation function)
    - For each user, mix in the user_id for randomness
    - Generate a radius from [0.02, 0.1] degrees (about 2-10km)
    - Generate an angle from [0, 360]
    - Randomized location is then a distance `radius` away at an angle `angle` from `geom`

    Fix over previous revision: skip the bulk UPDATE entirely when there are no users to
    update — SQLAlchemy raises when `Session.execute` is given an empty parameter list.
    """
    randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)

    def gen_randomized_coords(user_id: int, lat: float, lng: float) -> tuple[float, float]:
        """Deterministically offset (lat, lng) for this user; stable across runs."""
        # uniform values in [0, 1), keyed on the secret and user_id so the offset is reproducible
        radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii"))
        angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii"))
        radius = 0.02 + 0.08 * radius_u
        angle_rad = 2 * pi * angle_u
        # NOTE: offsets are in raw degrees (no cos(lat) correction for longitude), so the
        # physical east-west distance shrinks towards the poles — fine for obfuscation
        offset_lng = radius * cos(angle_rad)
        offset_lat = radius * sin(angle_rad)
        return lat + offset_lat, lng + offset_lng

    user_updates: list[dict[str, Any]] = []

    with session_scope() as session:
        # users who don't yet have a randomized location
        users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all()

        for user_id, geom in users_to_update:
            lat, lng = get_coordinates(geom)
            user_updates.append(
                {"id": user_id, "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, lat, lng))}
            )

    # nothing to do: avoid passing an empty executemany parameter list to execute()
    if not user_updates:
        return

    with session_scope() as session:
        # ORM-enabled bulk UPDATE by primary key: one executemany keyed on "id"
        session.execute(update(User), user_updates)
def _remind_occurrence_attendees(session, occurrence) -> None:
    """Notify each not-yet-reminded attendee of `occurrence` and flag them as reminded."""
    pending_attendees = session.execute(
        select(User, EventOccurrenceAttendee)
        .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id)
        .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
        .where(EventOccurrenceAttendee.reminder_sent == False)
    ).all()

    for user, attendee in pending_attendees:
        context = make_background_user_context(user_id=user.id)

        notify(
            session,
            user_id=user.id,
            topic_action=NotificationTopicAction.event__reminder,
            key=str(occurrence.id),
            data=notification_data_pb2.EventReminder(
                event=event_to_pb(session, occurrence, context),
                user=user_model_to_pb(user, session, context),
            ),
            moderation_state_id=occurrence.moderation_state_id,
        )

        attendee.reminder_sent = True


def send_event_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends reminders for events that are 24 hours away to users who marked themselves as attending.
    """
    logger.info("Sending event reminder emails")

    with session_scope() as session:
        # occurrences starting between now and the reminder window
        upcoming = (
            session.execute(
                select(EventOccurrence)
                .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA)
                .where(EventOccurrence.start_time >= now())
            )
            .scalars()
            .all()
        )

        for occurrence in upcoming:
            _remind_occurrence_attendees(session, occurrence)
            # persist reminder flags per occurrence
            session.commit()
def check_expo_push_receipts(payload: empty_pb2.Empty) -> None:
    """
    Check Expo push receipts in batch and update delivery attempts.

    Processes up to 100 delivery attempts per transaction, looping until none remain;
    disables push subscriptions whose device token Expo reports as no longer registered.
    Raises RuntimeError if the backlog isn't drained within MAX_ITERATIONS batches.
    """
    MAX_ITERATIONS = 100  # Safety limit: 100 batches * 100 attempts = 10,000 max

    for iteration in range(MAX_ITERATIONS):
        with session_scope() as session:
            # Find all delivery attempts that need receipt checking
            # Wait 15 minutes per Expo's recommendation before checking receipts
            attempts = (
                session.execute(
                    select(PushNotificationDeliveryAttempt)
                    .where(PushNotificationDeliveryAttempt.expo_ticket_id != None)
                    .where(PushNotificationDeliveryAttempt.receipt_checked_at == None)
                    .where(PushNotificationDeliveryAttempt.time < now() - timedelta(minutes=15))
                    .where(PushNotificationDeliveryAttempt.time > now() - timedelta(hours=24))
                    .limit(100)
                )
                .scalars()
                .all()
            )

            if not attempts:
                # backlog fully drained: normal exit
                logger.debug("No Expo receipts to check")
                return

            logger.info(f"Checking {len(attempts)} Expo push receipts")

            # one batched API call covering every ticket id in this chunk
            receipts = get_expo_push_receipts([not_none(attempt.expo_ticket_id) for attempt in attempts])

            for attempt in attempts:
                receipt = receipts.get(not_none(attempt.expo_ticket_id))

                # Always mark as checked to avoid infinite loops
                attempt.receipt_checked_at = now()

                if receipt is None:
                    # Receipt not found after 15min - likely expired (>24h) or never existed
                    # Per Expo docs: receipts should be available within 15 minutes
                    attempt.receipt_status = "not_found"
                    continue

                attempt.receipt_status = receipt.get("status")

                if receipt.get("status") == "error":
                    details = receipt.get("details", {})
                    error_code = details.get("error")
                    attempt.receipt_error_code = error_code

                    if error_code == "DeviceNotRegistered":
                        # Device token is no longer valid - disable the subscription
                        sub = session.execute(
                            select(PushNotificationSubscription).where(
                                PushNotificationSubscription.id == attempt.push_notification_subscription_id
                            )
                        ).scalar_one()

                        # NOTE(review): a disabled_at in the future appears to mean "still
                        # enabled" (presumably a far-future sentinel default) — confirm in models
                        if sub.disabled_at > now():
                            sub.disabled_at = now()
                            logger.info(f"Disabled push sub {sub.id} due to DeviceNotRegistered in receipt")
                            push_notification_counter.labels(
                                platform="expo", outcome="permanent_subscription_failure_receipt"
                            ).inc()
                    else:
                        logger.warning(f"Expo receipt error for ticket {attempt.expo_ticket_id}: {error_code}")

    # If we get here, we've exhausted MAX_ITERATIONS without finishing
    raise RuntimeError(
        f"check_expo_push_receipts exceeded {MAX_ITERATIONS} iterations - "
        "there may be an unusually large backlog of receipts to check"
    )
def send_postal_verification_postcard(payload: jobs_pb2.SendPostalVerificationPostcardPayload) -> None:
    """
    Sends the postal verification postcard via the external API and updates the attempt's
    status: awaiting_verification on success, failed otherwise.
    """
    with session_scope() as session:
        attempt = session.execute(
            select(PostalVerificationAttempt).where(
                PostalVerificationAttempt.id == payload.postal_verification_attempt_id
            )
        ).scalar_one_or_none()

        # only act on attempts that exist and are still in progress
        if attempt is None or attempt.status != PostalVerificationStatus.in_progress:
            logger.warning(
                f"Postal verification attempt {payload.postal_verification_attempt_id} not found or wrong state"
            )
            return

        recipient = session.execute(select(User.name).where(User.id == attempt.user_id)).scalar_one()
        code = not_none(attempt.verification_code)

        outcome = send_postcard(
            recipient_name=recipient,
            address_line_1=attempt.address_line_1,
            address_line_2=attempt.address_line_2,
            city=attempt.city,
            state=attempt.state,
            postal_code=attempt.postal_code,
            country=attempt.country,
            verification_code=code,
            qr_code_url=urls.postal_verification_link(code=code),
        )

        if not outcome.success:
            # Could retry or fail - for now, fail
            attempt.status = PostalVerificationStatus.failed
            logger.error(f"Postcard send failed: {outcome.error_message}")
            return

        attempt.status = PostalVerificationStatus.awaiting_verification
        attempt.postcard_sent_at = func.now()

        notify(
            session,
            user_id=attempt.user_id,
            topic_action=NotificationTopicAction.postal_verification__postcard_sent,
            key="",
            data=notification_data_pb2.PostalVerificationPostcardSent(
                city=attempt.city,
                country=attempt.country,
            ),
        )
class DatabaseInconsistencyError(Exception):
    """Raised when one or more database consistency checks fail."""
def check_database_consistency(payload: empty_pb2.Empty) -> None:
    """
    Checks database consistency and raises an exception if any issues are found.

    Every check appends a human-readable message to `errors`; all checks run before a
    single DatabaseInconsistencyError is raised listing every failure.
    """
    logger.info("Checking database consistency")
    errors = []

    with session_scope() as session:
        # Check that all users have a profile gallery
        users_without_gallery = session.execute(
            select(User.id, User.username).where(User.profile_gallery_id.is_(None))
        ).all()
        if users_without_gallery:
            errors.append(f"Users without profile gallery: {users_without_gallery}")

        # Check that all profile galleries point to their owner
        mismatched_galleries = session.execute(
            select(User.id, User.username, User.profile_gallery_id, PhotoGallery.owner_user_id)
            .join(PhotoGallery, User.profile_gallery_id == PhotoGallery.id)
            .where(User.profile_gallery_id.is_not(None))
            .where(PhotoGallery.owner_user_id != User.id)
        ).all()
        if mismatched_galleries:
            errors.append(f"Profile galleries with mismatched owner: {mismatched_galleries}")

        # === Moderation System Consistency Checks ===

        # Check every ModerationState has at least one INITIAL_REVIEW queue item
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_initial_review = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationQueueItem.moderation_state_id == ModerationState.id)
                    .where(ModerationQueueItem.trigger == ModerationTrigger.initial_review)
                ),
            )
        ).all()
        if states_without_initial_review:
            errors.append(f"ModerationStates without INITIAL_REVIEW queue item: {states_without_initial_review}")

        # Check every ModerationState has a CREATE log entry
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_create_log = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationLog.moderation_state_id == ModerationState.id)
                    .where(ModerationLog.action == ModerationAction.create)
                ),
            )
        ).all()
        if states_without_create_log:
            errors.append(f"ModerationStates without CREATE log entry: {states_without_create_log}")

        # Check resolved queue items point to log entries for the same moderation state
        resolved_item_log_mismatches = session.execute(
            select(ModerationQueueItem.id, ModerationQueueItem.moderation_state_id, ModerationLog.moderation_state_id)
            .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id)
            .where(ModerationQueueItem.resolved_by_log_id.is_not(None))
            .where(ModerationQueueItem.moderation_state_id != ModerationLog.moderation_state_id)
        ).all()
        if resolved_item_log_mismatches:
            errors.append(f"Resolved queue items with mismatched moderation_state_id: {resolved_item_log_mismatches}")

        # Check every HOST_REQUEST ModerationState has exactly one HostRequest pointing to it
        # NOTE(review): this issues one COUNT query per state (N+1); tolerable for a periodic
        # job, but could become a single GROUP BY/HAVING query if the state count grows
        hr_states = (
            session.execute(
                select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.host_request)
            )
            .scalars()
            .all()
        )
        for state_id in hr_states:
            hr_count = session.execute(
                select(func.count()).where(HostRequest.moderation_state_id == state_id)
            ).scalar_one()
            if hr_count != 1:
                errors.append(f"ModerationState {state_id} (HOST_REQUEST) has {hr_count} HostRequests (expected 1)")

        # Check every GROUP_CHAT ModerationState has exactly one GroupChat pointing to it
        gc_states = (
            session.execute(
                select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.group_chat)
            )
            .scalars()
            .all()
        )
        for state_id in gc_states:
            gc_count = session.execute(
                select(func.count()).where(GroupChat.moderation_state_id == state_id)
            ).scalar_one()
            if gc_count != 1:
                errors.append(f"ModerationState {state_id} (GROUP_CHAT) has {gc_count} GroupChats (expected 1)")

        # Check ModerationState.object_id matches the actual object's ID
        hr_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, HostRequest.conversation_id)
            .join(HostRequest, HostRequest.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.host_request)
            .where(ModerationState.object_id != HostRequest.conversation_id)
        ).all()
        if hr_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for HOST_REQUEST: {hr_object_id_mismatches}")

        gc_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, GroupChat.conversation_id)
            .join(GroupChat, GroupChat.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.group_chat)
            .where(ModerationState.object_id != GroupChat.conversation_id)
        ).all()
        if gc_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for GROUP_CHAT: {gc_object_id_mismatches}")

        # Check reverse mapping: HostRequest's moderation_state points to correct ModerationState
        hr_reverse_mismatches = session.execute(
            select(
                HostRequest.conversation_id,
                HostRequest.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, HostRequest.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.host_request)
                | (ModerationState.object_id != HostRequest.conversation_id)
            )
        ).all()
        if hr_reverse_mismatches:
            errors.append(f"HostRequest points to ModerationState with wrong type/object_id: {hr_reverse_mismatches}")

        # Check reverse mapping: GroupChat's moderation_state points to correct ModerationState
        gc_reverse_mismatches = session.execute(
            select(
                GroupChat.conversation_id,
                GroupChat.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, GroupChat.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.group_chat)
                | (ModerationState.object_id != GroupChat.conversation_id)
            )
        ).all()
        if gc_reverse_mismatches:
            errors.append(f"GroupChat points to ModerationState with wrong type/object_id: {gc_reverse_mismatches}")

        # Ensure auto-approve deadline isn't being exceeded by a significant margin
        # The auto-approver runs every 15s, so allow 5 minutes grace before alerting
        deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
        if deadline_seconds > 0:
            grace_period = timedelta(minutes=5)
            stale_initial_review_items = session.execute(
                select(
                    ModerationQueueItem.id,
                    ModerationQueueItem.moderation_state_id,
                    ModerationQueueItem.time_created,
                )
                .where(ModerationQueueItem.trigger == ModerationTrigger.initial_review)
                .where(ModerationQueueItem.resolved_by_log_id.is_(None))
                .where(ModerationQueueItem.time_created < now() - timedelta(seconds=deadline_seconds) - grace_period)
            ).all()
            if stale_initial_review_items:
                errors.append(
                    f"INITIAL_REVIEW items exceeding auto-approve deadline by >5min: {stale_initial_review_items}"
                )

    # raise once, with all failures aggregated
    if errors:
        raise DatabaseInconsistencyError("\n".join(errors))
def auto_approve_moderation_queue(payload: empty_pb2.Empty) -> None:
    """
    Dead man's switch: auto-approves unresolved INITIAL_REVIEW items older than the deadline.
    Items explicitly actioned by moderators are left alone.
    """
    deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
    # a non-positive deadline disables auto-approval entirely
    if deadline_seconds <= 0:
        return

    # only items created before this cutoff are eligible
    cutoff = Timestamp_from_datetime(now() - timedelta(seconds=deadline_seconds))

    with session_scope() as session:
        bot_context = make_background_user_context(user_id=config["MODERATION_BOT_USER_ID"])

        queue_response = Moderation().GetModerationQueue(
            request=moderation_pb2.GetModerationQueueReq(
                triggers=[moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW],
                unresolved_only=True,
                page_size=100,
                created_before=cutoff,
            ),
            context=bot_context,
            session=session,
        )
        stale_items = queue_response.queue_items

        if not stale_items:
            return

        logger.info(f"Auto-approving {len(stale_items)} moderation queue items")
        for item in stale_items:
            Moderation().ModerateContent(
                request=moderation_pb2.ModerateContentReq(
                    moderation_state_id=item.moderation_state_id,
                    action=moderation_pb2.MODERATION_ACTION_APPROVE,
                    visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
                    reason=f"Auto-approved: moderation deadline of {deadline_seconds} seconds exceeded.",
                ),
                context=bot_context,
                session=session,
            )
        moderation_auto_approved_counter.inc(len(stale_items))