Coverage for src/couchers/jobs/handlers.py: 88%
460 statements
« prev ^ index » next — coverage.py v7.13.1, created at 2026-01-02 11:17 +0000
1"""
2Background job servicers
3"""
5import logging
6from collections.abc import Sequence
7from datetime import date, timedelta
8from math import cos, pi, sin, sqrt
9from random import sample
10from typing import Any
11from typing import cast as t_cast
13import requests
14from google.protobuf import empty_pb2
15from sqlalchemy import ColumnElement, Float, Function, Integer, Table, select
16from sqlalchemy.orm import aliased
17from sqlalchemy.sql import (
18 and_,
19 case,
20 cast,
21 delete,
22 distinct,
23 exists,
24 extract,
25 func,
26 literal,
27 not_,
28 or_,
29 union_all,
30 update,
31)
33from couchers import urls
34from couchers.config import config
35from couchers.constants import (
36 ACTIVENESS_PROBE_EXPIRY_TIME,
37 ACTIVENESS_PROBE_INACTIVITY_PERIOD,
38 ACTIVENESS_PROBE_TIME_REMINDERS,
39 EVENT_REMINDER_TIMEDELTA,
40 HOST_REQUEST_MAX_REMINDERS,
41 HOST_REQUEST_REMINDER_INTERVAL,
42)
43from couchers.context import make_background_user_context
44from couchers.crypto import (
45 USER_LOCATION_RANDOMIZATION_NAME,
46 asym_encrypt,
47 b64decode,
48 get_secret,
49 simple_decrypt,
50 stable_secure_uniform,
51)
52from couchers.db import session_scope
53from couchers.email.dev import print_dev_email
54from couchers.email.smtp import send_smtp_email
55from couchers.helpers.badges import user_add_badge, user_remove_badge
56from couchers.materialized_views import (
57 UserResponseRate,
58)
59from couchers.metrics import (
60 moderation_auto_approved_counter,
61 push_notification_counter,
62 strong_verification_completions_counter,
63)
64from couchers.models import (
65 AccountDeletionToken,
66 ActivenessProbe,
67 ActivenessProbeStatus,
68 Cluster,
69 ClusterRole,
70 ClusterSubscription,
71 EventOccurrence,
72 EventOccurrenceAttendee,
73 GroupChat,
74 GroupChatSubscription,
75 HostingStatus,
76 HostRequest,
77 HostRequestStatus,
78 LoginToken,
79 MeetupStatus,
80 Message,
81 MessageType,
82 ModerationAction,
83 ModerationLog,
84 ModerationObjectType,
85 ModerationQueueItem,
86 ModerationState,
87 ModerationTrigger,
88 PassportSex,
89 PasswordResetToken,
90 PhotoGallery,
91 PostalVerificationAttempt,
92 PostalVerificationStatus,
93 PushNotificationDeliveryAttempt,
94 PushNotificationSubscription,
95 Reference,
96 StrongVerificationAttempt,
97 StrongVerificationAttemptStatus,
98 User,
99 UserBadge,
100 Volunteer,
101)
102from couchers.notifications.expo_api import get_expo_push_receipts
103from couchers.notifications.notify import notify
104from couchers.postal.postcard_service import send_postcard
105from couchers.proto import moderation_pb2, notification_data_pb2
106from couchers.proto.internal import jobs_pb2, verification_pb2
107from couchers.resources import get_badge_dict, get_static_badge_dict
108from couchers.servicers.api import user_model_to_pb
109from couchers.servicers.events import (
110 event_to_pb,
111)
112from couchers.servicers.moderation import Moderation
113from couchers.servicers.requests import host_request_to_pb
114from couchers.sql import (
115 users_visible_to_each_other,
116 where_moderated_content_visible,
117 where_moderated_content_visible_to_user_column,
118 where_user_columns_visible_to_each_other,
119 where_users_column_visible,
120)
121from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships
122from couchers.tasks import send_duplicate_strong_verification_email
123from couchers.utils import (
124 Timestamp_from_datetime,
125 create_coordinate,
126 get_coordinates,
127 not_none,
128 now,
129)
# module-level logger named after this module, per stdlib logging convention
logger = logging.getLogger(__name__)
def send_email(payload: jobs_pb2.SendEmailPayload) -> None:
    """Deliver one email described by the payload and persist the result.

    Picks a delivery backend based on config: real SMTP when email sending
    is enabled, otherwise a dev backend that prints the email to the logger.
    Either backend returns a models.Email row, which is stored in the db.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
    deliver = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email
    email_row = deliver(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
        list_unsubscribe_header=payload.list_unsubscribe_header,
        source_data=payload.source_data,
    )
    # record the sent email in the database
    with session_scope() as session:
        session.add(email_row)
def purge_login_tokens(payload: empty_pb2.Empty) -> None:
    """Delete all login tokens that are no longer valid."""
    logger.info("Purging login tokens")
    # synchronize_session=False: bulk delete, no ORM identity-map sync needed
    stmt = delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False)
    with session_scope() as session:
        session.execute(stmt)
def purge_password_reset_tokens(payload: empty_pb2.Empty) -> None:
    """Delete all password reset tokens that are no longer valid."""
    # fix: message previously said "Purging login tokens" — a copy-paste from
    # purge_login_tokens that made the two jobs indistinguishable in the logs
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        session.execute(
            # synchronize_session=False: bulk delete, no ORM identity-map sync needed
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )
def purge_account_deletion_tokens(payload: empty_pb2.Empty) -> None:
    """Delete all account deletion tokens that are no longer valid."""
    logger.info("Purging account deletion tokens")
    # synchronize_session=False: bulk delete, no ORM identity-map sync needed
    purge_stmt = (
        delete(AccountDeletionToken)
        .where(~AccountDeletionToken.is_valid)
        .execution_options(synchronize_session=False)
    )
    with session_scope() as session:
        session.execute(purge_stmt)
def send_message_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for messages that have been unseen for a long enough time
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                where_moderated_content_visible_to_user_column(
                    select(User)
                    .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                    .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                    .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id),
                    GroupChat,
                    User.id,
                )
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                # only messages sent during the user's subscription window
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                # not yet notified about, and not yet seen by the user
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                # grace period: give the user 5 minutes to read it themselves first
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for user in users:
            context = make_background_user_context(user_id=user.id)
            # now actually grab all the group chats, not just less than 5 min old
            # one row per group chat: latest unseen message id + count of unseen
            subquery = (
                where_users_column_visible(
                    where_moderated_content_visible(
                        select(
                            GroupChatSubscription.group_chat_id.label("group_chat_id"),
                            func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                            func.max(Message.id).label("message_id"),
                            func.count(Message.id).label("count_unseen"),
                        )
                        .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id),
                        context,
                        GroupChat,
                        is_list_operation=True,
                    )
                    .where(GroupChatSubscription.user_id == user.id)
                    .where(not_(GroupChatSubscription.is_muted))
                    .where(Message.id > user.last_notified_message_id)
                    .where(Message.id > GroupChatSubscription.last_seen_message_id)
                    .where(Message.time >= GroupChatSubscription.joined)
                    .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                    .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)),
                    context,
                    Message.author_id,
                )
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            # join back to get the actual GroupChat and latest Message rows
            unseen_messages = session.execute(
                where_moderated_content_visible(
                    select(GroupChat, Message, subquery.c.count_unseen)
                    .join(subquery, subquery.c.message_id == Message.id)
                    .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id),
                    context,
                    GroupChat,
                    is_list_operation=True,
                ).order_by(subquery.c.message_id.desc())
            ).all()

            if not unseen_messages:
                continue

            # advance the watermark so these messages are never notified about again
            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            def format_title(message: Message, group_chat: GroupChat, count_unseen: int) -> str:
                # DMs are titled by the other person's name, group chats by chat title
                if group_chat.is_dm:
                    return f"You missed {count_unseen} message(s) from {message.author.name}"
                else:
                    return f"You missed {count_unseen} message(s) in {group_chat.title}"

            notify(
                session,
                user_id=user.id,
                topic_action="chat:missed_messages",
                key="",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                context,
                            ),
                            message=format_title(message, group_chat, count_unseen),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                        )
                        for group_chat, message, count_unseen in unseen_messages
                    ],
                ),
            )
            # commit per user so earlier users keep their watermark if a later one fails
            session.commit()
def send_request_notifications(payload: empty_pb2.Empty) -> None:
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host)
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    with session_scope() as session:
        # Get all candidate users who might have unseen request messages
        candidate_user_ids = (
            session.execute(
                select(User.id)
                .where(User.is_visible)
                .where(
                    or_(
                        # Users with unseen messages as surfer
                        exists(
                            select(1)
                            .select_from(HostRequest)
                            .join(Message, Message.conversation_id == HostRequest.conversation_id)
                            .where(HostRequest.surfer_user_id == User.id)
                            .where(Message.id > HostRequest.surfer_last_seen_message_id)
                            .where(Message.id > User.last_notified_request_message_id)
                            # grace period: give the user 5 minutes to read it themselves first
                            .where(Message.time < now() - timedelta(minutes=5))
                            .where(Message.message_type == MessageType.text)
                        ),
                        # Users with unseen messages as host
                        exists(
                            select(1)
                            .select_from(HostRequest)
                            .join(Message, Message.conversation_id == HostRequest.conversation_id)
                            .where(HostRequest.host_user_id == User.id)
                            .where(Message.id > HostRequest.host_last_seen_message_id)
                            .where(Message.id > User.last_notified_request_message_id)
                            .where(Message.time < now() - timedelta(minutes=5))
                            .where(Message.message_type == MessageType.text)
                        ),
                    )
                )
            )
            .scalars()
            .all()
        )

        for user_id in candidate_user_ids:
            context = make_background_user_context(user_id=user_id)

            # requests where this user is surfing
            # one row per request: (User, HostRequest, latest unseen message id)
            surfing_reqs = session.execute(
                where_users_column_visible(
                    where_moderated_content_visible_to_user_column(
                        select(User, HostRequest, func.max(Message.id))
                        .where(User.id == user_id)
                        .join(HostRequest, HostRequest.surfer_user_id == User.id),
                        HostRequest,
                        HostRequest.surfer_user_id,
                    ),
                    context,
                    HostRequest.host_user_id,
                )
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.surfer_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)  # type: ignore[arg-type]
            ).all()

            # where this user is hosting
            hosting_reqs = session.execute(
                where_users_column_visible(
                    where_moderated_content_visible_to_user_column(
                        select(User, HostRequest, func.max(Message.id))
                        .where(User.id == user_id)
                        .join(HostRequest, HostRequest.host_user_id == User.id),
                        HostRequest,
                        HostRequest.host_user_id,
                    ),
                    context,
                    HostRequest.surfer_user_id,
                )
                .join(Message, Message.conversation_id == HostRequest.conversation_id)
                .where(Message.id > HostRequest.host_last_seen_message_id)
                .where(Message.id > User.last_notified_request_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)
                .group_by(User, HostRequest)  # type: ignore[arg-type]
            ).all()

            for user, host_request, max_message_id in surfing_reqs:
                # advance the watermark (and flush) before notifying so we never double-send
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                notify(
                    session,
                    user_id=user.id,
                    topic_action="host_request:missed_messages",
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        user=user_model_to_pb(host_request.host, session, context),
                        am_host=False,
                    ),
                )

            for user, host_request, max_message_id in hosting_reqs:
                # same watermark dance as above, but from the host's perspective
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                notify(
                    session,
                    user_id=user.id,
                    topic_action="host_request:missed_messages",
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        user=user_model_to_pb(host_request.surfer, session, context),
                        am_host=True,
                    ),
                )
def send_onboarding_emails(payload: empty_pb2.Empty) -> None:
    """
    Sends out onboarding emails
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:
        # first onboarding email: any visible user who hasn't received one yet
        users = (
            session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all()
        )

        for user in users:
            notify(
                session,
                user_id=user.id,
                topic_action="onboarding:reminder",
                key="1",
            )
            user.onboarding_emails_sent = 1
            user.last_onboarding_email_sent = now()
            # commit per user so a failure mid-loop doesn't resend earlier emails
            session.commit()

        # second onboarding email
        # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
        users = (
            session.execute(
                select(User)
                .where(User.is_visible)
                .where(User.onboarding_emails_sent == 1)
                .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
                # SQLAlchemy column comparison; `is False` would not build SQL here
                .where(User.has_completed_profile == False)
            )
            .scalars()
            .all()
        )

        for user in users:
            notify(
                session,
                user_id=user.id,
                topic_action="onboarding:reminder",
                key="2",
            )
            user.onboarding_emails_sent = 2
            user.last_onboarding_email_sent = now()
            session.commit()
def send_reference_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends out reminders to write references after hosting/staying
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            # two aliases of User: the one who should write the ref, and the other party
            user = aliased(User)
            other_user = aliased(User)
            # surfers needing to write a ref
            q1 = (
                select(literal(True), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.surfer_user_id)
                .join(other_user, other_user.id == HostRequest.host_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the surfer has not written a ref
                        Reference.from_user_id == HostRequest.surfer_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                # only users who haven't yet received this (or a later) reminder
                .where(HostRequest.surfer_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.surfer_reason_didnt_meetup == None)
                .where(users_visible_to_each_other(user, other_user))
            )

            # hosts needing to write a ref
            q2 = (
                select(literal(False), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.host_user_id)
                .join(other_user, other_user.id == HostRequest.surfer_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the host has not written a ref
                        Reference.from_user_id == HostRequest.host_user_id,
                    ),
                )
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.host_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.host_reason_didnt_meetup == None)
                .where(users_visible_to_each_other(user, other_user))
            )

            # combine both directions; column 0 is the literal marking "user surfed"
            union = union_all(q1, q2).subquery()
            query = select(
                union.c[0].label("surfed"),
                aliased(HostRequest, union),
                aliased(user, union),
                aliased(other_user, union),
            )
            reference_reminders = session.execute(query).all()

            for surfed, host_request, user, other_user in reference_reminders:
                # visibility and blocking already checked in sql
                assert user.is_visible
                context = make_background_user_context(user_id=user.id)
                notify(
                    session,
                    user_id=user.id,
                    topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
                    key=str(host_request.conversation_id),
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(other_user, session, context),
                        days_left=reminder_days_left,
                    ),
                )
                # record which reminder number was sent so it isn't repeated
                if surfed:
                    host_request.surfer_sent_reference_reminders = reminder_number
                else:
                    host_request.host_sent_reference_reminders = reminder_number
                session.commit()
def send_host_request_reminders(payload: empty_pb2.Empty) -> None:
    """Remind hosts about pending host requests they have not responded to yet."""
    with session_scope() as session:
        # correlated EXISTS: has the host written any message in this conversation?
        host_has_sent_message = select(1).where(
            Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.host_user_id
        )

        requests = (
            session.execute(
                where_user_columns_visible_to_each_other(
                    where_moderated_content_visible_to_user_column(
                        select(HostRequest),
                        HostRequest,
                        HostRequest.host_user_id,
                    )
                    .where(HostRequest.status == HostRequestStatus.pending)
                    # cap the total number of reminders per request
                    .where(HostRequest.host_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS)
                    # only upcoming stays
                    .where(HostRequest.start_time > func.now())
                    # rate-limit: wait the configured interval between reminders
                    .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL)
                    # skip requests the host has already replied to
                    .where(~exists(host_has_sent_message)),
                    HostRequest.host_user_id,
                    HostRequest.surfer_user_id,
                )
            )
            .scalars()
            .all()
        )

        for host_request in requests:
            host_request.host_sent_request_reminders += 1
            host_request.last_sent_request_reminder_time = now()

            context = make_background_user_context(user_id=host_request.host_user_id)
            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action="host_request:reminder",
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestReminder(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

        session.commit()
def add_users_to_email_list(payload: empty_pb2.Empty) -> None:
    """Sync visible users into the Listmonk mailing list, one user at a time."""
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    while True:
        # fresh session per iteration: one user is picked, synced, and committed at a time
        with session_scope() as session:
            user = session.execute(
                select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
            ).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            # opted-out users are just marked synced, never sent to Listmonk
            if user.opt_out_of_newsletter:
                user.in_sync_with_newsletter = True
                session.commit()
                continue

            r = requests.post(
                config["LISTMONK_BASE_URL"] + "/api/subscribers",
                auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
                json={
                    "email": user.email,
                    "name": user.name,
                    "lists": [config["LISTMONK_LIST_ID"]],
                    "preconfirm_subscriptions": True,
                    "attribs": {"couchers_user_id": user.id},
                    "status": "enabled",
                },
                timeout=10,
            )
            # the API returns 409 if the user is already subscribed
            if r.status_code == 200 or r.status_code == 409:
                user.in_sync_with_newsletter = True
                session.commit()
            else:
                # fail the job loudly; the job framework can retry later
                raise Exception("Failed to add users to mailing list")
def enforce_community_membership(payload: empty_pb2.Empty) -> None:
    """Thin job wrapper delegating to the tasks module implementation."""
    tasks_enforce_community_memberships()
def update_recommendation_scores(payload: empty_pb2.Empty) -> None:
    """
    Recompute `User.recommendation_score` for all users in a single bulk UPDATE.

    The score is a sum of heuristic point contributions — hosting status,
    profile completeness, references, activeness, badges, response rate —
    plus a small random perturbation from poor_man_gaussian().
    """
    # free-text profile fields counted towards overall profile text length
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    # subset of fields describing the user's home/hosting situation
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian() -> ColumnElement[float] | float:
        """
        Produces an approximately std normal random variate
        """
        # Irwin-Hall-style: sum of uniform randoms, centered and scaled to unit variance
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt: Any) -> Function[int]:
        # SQL-side cast to Integer, mapping NULL to 0
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt: Any) -> Function[float]:
        # SQL-side cast to Float, mapping NULL to 0.0
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile: SQL-side concatenation of text fields to measure how much the user wrote
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")  # type: ignore[assignment]
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")  # type: ignore[assignment]
        home_length = func.length(home_text)

        filled_profile = int_(User.has_completed_profile)
        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        may_host = int_(User.hosting_status == HostingStatus.maybe)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.has_completed_my_home)
        filled_home_lots = int_(home_length > 200)
        hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host
        profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots

        # references
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        # NOTE(review): rating appears rescaled so ~0.3 is neutral — confirm rating range
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # average rating weighted by count: first 5 refs count double, the rest count single
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification: admins of official clusters ("community builders")
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        cb = int_(min_node_id >= 1)
        # NOTE(review): node id 1 presumably is the root/world community — confirm
        wcb = int_(min_node_id == 1)
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }

        # sums up badge points per user via a SQL CASE over badge ids
        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate: percentile response times, converted from seconds to minutes
        hr_subquery = select(
            UserResponseRate.user_id,
            float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"),
            float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"),
        ).subquery()
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
        # penalize slow 33rd-percentile (> 96 h), reward fast 66th-percentile (< 96 h)
        response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0)

        recommendation_score = (
            hosting_status_points
            + profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        # outer joins so users missing from any subquery still get a score (NULLs -> 0 via int_/float_)
        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        # single bulk UPDATE against the users table
        session.execute(
            t_cast(Table, User.__table__)
            .update()
            .values(recommendation_score=scores.c.score)
            .where(User.id == scores.c.user_id)
        )

    logger.info("Updated recommendation scores")
def update_badges(payload: empty_pb2.Empty) -> None:
    """Reconcile every badge's current holders against its canonical member list."""
    with session_scope() as session:

        def _sync_badge(badge_id: str, members: Sequence[int]) -> None:
            """Grant/revoke `badge_id` so its holders exactly match `members`."""
            badge = get_badge_dict()[badge_id]
            current_holders = set(
                session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge.id)).scalars().all()
            )
            # filter the member list down to user ids that actually exist in the db
            wanted_holders = set(session.execute(select(User.id).where(User.id.in_(members))).scalars().all())
            for user_id in wanted_holders - current_holders:
                user_add_badge(session, user_id, badge.id)
            for user_id in current_holders - wanted_holders:
                user_remove_badge(session, user_id, badge.id)

        # statically-configured badges
        _sync_badge("founder", get_static_badge_dict()["founder"])
        _sync_badge("board_member", get_static_badge_dict()["board_member"])
        _sync_badge("past_board_member", get_static_badge_dict()["past_board_member"])
        # db-derived badges
        _sync_badge("donor", session.execute(select(User.id).where(User.last_donated.is_not(None))).scalars().all())
        _sync_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all())
        _sync_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all())
        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        _sync_badge(
            "strong_verification",
            session.execute(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            )
            .scalars()
            .all(),
        )
        # volunteer badge for active volunteers (stopped_volunteering is null)
        _sync_badge(
            "volunteer",
            session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_(None))).scalars().all(),
        )
        # past_volunteer badge for past volunteers (stopped_volunteering is not null)
        _sync_badge(
            "past_volunteer",
            session.execute(select(Volunteer.user_id).where(Volunteer.stopped_volunteering.is_not(None)))
            .scalars()
            .all(),
        )
def finalize_strong_verification(payload: jobs_pb2.FinalizeStrongVerificationPayload) -> None:
    """
    Pulls the completed verification session from the Iris ID API and finalizes the attempt.

    Depending on the API result, the attempt is marked as:
    - failed, if the scanned document is not a passport;
    - duplicate, if the same passport was already used in an earlier attempt
      (possibly by a different user, in which case the team is emailed); or
    - succeeded, in which case the full API response is stored asymmetrically
      encrypted and the user is granted the strong_verification badge, provided
      their profile birthdate/gender match the passport.
    """
    with session_scope() as session:
        # only finalize attempts that are waiting on the backend; scalar_one raises if
        # the attempt is missing or in any other state
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
            verify="/etc/ssl/certs/ca-certificates.crt",
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()
        # the reference payload was encrypted by us when the session was created, so
        # successfully decrypting it proves the session is one we initiated
        reference_payload = verification_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )
        # cross-check the API response against the attempt loaded from the database
        assert verification_attempt.user_id == reference_payload.user_id
        assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token
        assert verification_attempt.iris_session_id == json_data["id"]
        assert json_data["state"] == "APPROVED"

        if json_data["document_type"] != "PASSPORT":
            # only passports are accepted for strong verification
            verification_attempt.status = StrongVerificationAttemptStatus.failed
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
                ),
            )
            return

        assert json_data["document_type"] == "PASSPORT"

        expiry_date = date.fromisoformat(json_data["expiry_date"])
        nationality = json_data["nationality"]
        last_three_document_chars = json_data["document_number"][-3:]

        # a passport is identified by (expiry date, nationality, last three document
        # chars); look for the earliest attempt that used the same passport
        existing_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
            .where(StrongVerificationAttempt.passport_nationality == nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = expiry_date
        verification_attempt.passport_nationality = nationality
        verification_attempt.passport_last_three_document_chars = last_three_document_chars

        if existing_attempt:
            verification_attempt.status = StrongVerificationAttemptStatus.duplicate

            if existing_attempt.user_id != verification_attempt.user_id:
                # a different user already verified with this passport: flush so the
                # email sees up-to-date attempt rows, then alert the team
                session.flush()
                send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)

            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                key="",
                data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
            )
            return

        verification_attempt.has_full_data = True
        # store the raw API response encrypted with the verification public key; only
        # the holder of the corresponding private key can read it
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = verification_attempt.user
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            if session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none():
                # badge already present, nothing more to do
                return

            user_add_badge(session, user.id, badge_id, do_notify=False)
            notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success", key="")
        else:
            # passport data doesn't match the user's profile (birthdate or gender)
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                key="",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
                ),
            )
def send_activeness_probes(payload: empty_pb2.Empty) -> None:
    """
    Periodically checks that users who claim they can host are still active.

    Runs in three steps:
    1. Creates new probes for inactive can-host users who don't already have a
       pending probe (rate limited to ~2% of visible users per day).
    2. Sends out the scheduled reminder notifications for pending probes.
    3. Expires unanswered probes past their deadline and demotes the user's
       hosting/meetup status one notch.

    :param payload: unused, this is a scheduled job
    """
    with session_scope() as session:
        ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)

        if config["ACTIVENESS_PROBES_ENABLED"]:
            # users with a currently pending (unanswered) activeness probe
            subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded.is_(None)).subquery()

            # users who we should send an activeness probe to: visible, can host, not
            # recently active, and without a pending probe already
            new_probe_user_ids = (
                session.execute(
                    select(User.id)
                    .where(User.is_visible)
                    .where(User.hosting_status == HostingStatus.can_host)
                    .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
                    .where(User.id.not_in(select(subquery.c.user_id)))
                )
                .scalars()
                .all()
            )

            total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()
            probes_today = session.execute(
                select(func.count())
                .select_from(ActivenessProbe)
                .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24))
            ).scalar_one()

            # send probes to max 2% of users per day; since this job runs more often
            # than daily, also cap each run at 1/24th of the budget (but at least 1)
            max_probes_per_day = 0.02 * total_users
            max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1))

            if len(new_probe_user_ids) > max_probe_size:
                # pick a random subset to stay within the rate limit
                new_probe_user_ids = sample(new_probe_user_ids, max_probe_size)

            for user_id in new_probe_user_ids:
                session.add(ActivenessProbe(user_id=user_id))

            session.commit()

        ## Step 2: actually send out probe notifications
        for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
            # probes due for their (probe_number_minus_1 + 1)-th notification
            probes = (
                session.execute(
                    select(ActivenessProbe)
                    .where(ActivenessProbe.notifications_sent == probe_number_minus_1)
                    .where(ActivenessProbe.probe_initiated + delay < func.now())
                    .where(ActivenessProbe.is_pending)
                )
                .scalars()
                .all()
            )

            for probe in probes:
                probe.notifications_sent = probe_number_minus_1 + 1
                # note: notify() doesn't need a user context, so none is built here
                notify(
                    session,
                    user_id=probe.user.id,
                    topic_action="activeness:probe",
                    key=str(probe.id),
                    data=notification_data_pb2.ActivenessProbe(
                        reminder_number=probe_number_minus_1 + 1,
                        deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
                    ),
                )
            session.commit()

        ## Step 3: for those who haven't responded, mark them as failed
        expired_probes = (
            session.execute(
                select(ActivenessProbe)
                .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
                .where(ActivenessProbe.is_pending)
                .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
            )
            .scalars()
            .all()
        )

        for probe in expired_probes:
            probe.responded = now()
            probe.response = ActivenessProbeStatus.expired
            # demote hosting/meetup status one notch since the user appears inactive
            if probe.user.hosting_status == HostingStatus.can_host:
                probe.user.hosting_status = HostingStatus.maybe
            if probe.user.meetup_status == MeetupStatus.wants_to_meetup:
                probe.user.meetup_status = MeetupStatus.open_to_meetup
        session.commit()
def update_randomized_locations(payload: empty_pb2.Empty) -> None:
    """
    Fills in a randomized location for every user who doesn't have one yet.

    The randomized point is derived deterministically from the user's id and a
    server secret (via our key derivation function), so re-running this job never
    moves anyone:
    - draw a radius uniformly from [0.02, 0.1] degrees (about 2-10km),
    - draw an angle uniformly from [0, 360] degrees,
    - offset the user's true location (`geom`) by that polar displacement.
    """
    randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)

    def _randomized(uid: int, lat: float, lng: float) -> tuple[float, float]:
        # two independent deterministic uniform draws, keyed on user id and purpose
        u_radius = stable_secure_uniform(randomization_secret, seed=bytes(f"{uid}|radius", "ascii"))
        u_angle = stable_secure_uniform(randomization_secret, seed=bytes(f"{uid}|angle", "ascii"))
        r = 0.02 + 0.08 * u_radius
        theta = 2 * pi * u_angle
        # longitude offset along cos, latitude offset along sin
        return lat + r * sin(theta), lng + r * cos(theta)

    with session_scope() as session:
        pending = session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all()
        user_updates: list[dict[str, Any]] = [
            {"id": uid, "randomized_geom": create_coordinate(*_randomized(uid, *get_coordinates(geom)))}
            for uid, geom in pending
        ]

    with session_scope() as session:
        # bulk ORM-enabled UPDATE keyed on the primary key in each dict
        session.execute(update(User), user_updates)
def send_event_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends reminders for events that are 24 hours away to users who marked themselves as attending.
    """
    logger.info("Sending event reminder emails")

    with session_scope() as session:
        # occurrences that start within the reminder window from now
        upcoming = (
            session.execute(
                select(EventOccurrence).where(
                    EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA,
                    EventOccurrence.start_time >= now(),
                )
            )
            .scalars()
            .all()
        )

        for occurrence in upcoming:
            # attendees of this occurrence who haven't been reminded yet
            attendee_rows = session.execute(
                select(User, EventOccurrenceAttendee)
                .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id)
                .where(
                    EventOccurrenceAttendee.occurrence_id == occurrence.id,
                    EventOccurrenceAttendee.reminder_sent == False,
                )
            ).all()

            for user, attendance in attendee_rows:
                # the protos are rendered from the recipient's point of view
                context = make_background_user_context(user_id=user.id)

                notify(
                    session,
                    user_id=user.id,
                    topic_action="event:reminder",
                    key=str(occurrence.id),
                    data=notification_data_pb2.EventReminder(
                        event=event_to_pb(session, occurrence, context),
                        user=user_model_to_pb(user, session, context),
                    ),
                )

                attendance.reminder_sent = True
            # commit after each occurrence so partial progress is kept
            session.commit()
def check_expo_push_receipts(payload: empty_pb2.Empty) -> None:
    """
    Check Expo push receipts in batch and update delivery attempts.

    Processes unchecked delivery attempts in batches of 100, one database session
    per batch, until none remain (early return) or the safety iteration limit is
    hit (RuntimeError). Attempts whose receipt reports DeviceNotRegistered get
    their push subscription disabled so we stop sending to dead tokens.
    """
    MAX_ITERATIONS = 100  # Safety limit: 100 batches * 100 attempts = 10,000 max

    for iteration in range(MAX_ITERATIONS):
        with session_scope() as session:
            # Find all delivery attempts that need receipt checking
            # Wait 15 minutes per Expo's recommendation before checking receipts
            attempts = (
                session.execute(
                    select(PushNotificationDeliveryAttempt)
                    .where(PushNotificationDeliveryAttempt.expo_ticket_id != None)
                    .where(PushNotificationDeliveryAttempt.receipt_checked_at == None)
                    .where(PushNotificationDeliveryAttempt.time < now() - timedelta(minutes=15))
                    .where(PushNotificationDeliveryAttempt.time > now() - timedelta(hours=24))
                    .limit(100)
                )
                .scalars()
                .all()
            )

            if not attempts:
                # backlog drained: this is the normal exit path
                logger.debug("No Expo receipts to check")
                return

            logger.info(f"Checking {len(attempts)} Expo push receipts")

            # fetch all receipts for this batch in one API call
            receipts = get_expo_push_receipts([not_none(attempt.expo_ticket_id) for attempt in attempts])

            for attempt in attempts:
                receipt = receipts.get(not_none(attempt.expo_ticket_id))

                # Always mark as checked to avoid infinite loops
                attempt.receipt_checked_at = now()

                if receipt is None:
                    # Receipt not found after 15min - likely expired (>24h) or never existed
                    # Per Expo docs: receipts should be available within 15 minutes
                    attempt.receipt_status = "not_found"
                    continue

                attempt.receipt_status = receipt.get("status")

                if receipt.get("status") == "error":
                    details = receipt.get("details", {})
                    error_code = details.get("error")
                    attempt.receipt_error_code = error_code

                    if error_code == "DeviceNotRegistered":
                        # Device token is no longer valid - disable the subscription
                        sub = session.execute(
                            select(PushNotificationSubscription).where(
                                PushNotificationSubscription.id == attempt.push_notification_subscription_id
                            )
                        ).scalar_one()

                        # presumably disabled_at defaults to a far-future time for
                        # active subs, so this only disables once — TODO confirm
                        if sub.disabled_at > now():
                            sub.disabled_at = now()
                            logger.info(f"Disabled push sub {sub.id} due to DeviceNotRegistered in receipt")
                            push_notification_counter.labels(
                                platform="expo", outcome="permanent_subscription_failure_receipt"
                            ).inc()
                    else:
                        logger.warning(f"Expo receipt error for ticket {attempt.expo_ticket_id}: {error_code}")

    # If we get here, we've exhausted MAX_ITERATIONS without finishing
    raise RuntimeError(
        f"check_expo_push_receipts exceeded {MAX_ITERATIONS} iterations - "
        "there may be an unusually large backlog of receipts to check"
    )
def send_postal_verification_postcard(payload: jobs_pb2.SendPostalVerificationPostcardPayload) -> None:
    """
    Sends the postcard via external API and updates attempt status.
    """
    with session_scope() as session:
        attempt = session.execute(
            select(PostalVerificationAttempt).where(
                PostalVerificationAttempt.id == payload.postal_verification_attempt_id
            )
        ).scalar_one_or_none()

        # guard: the attempt must exist and still be in progress
        if not attempt or attempt.status != PostalVerificationStatus.in_progress:
            logger.warning(
                f"Postal verification attempt {payload.postal_verification_attempt_id} not found or wrong state"
            )
            return

        recipient = session.execute(select(User.name).where(User.id == attempt.user_id)).scalar_one()
        code = not_none(attempt.verification_code)

        outcome = send_postcard(
            recipient_name=recipient,
            address_line_1=attempt.address_line_1,
            address_line_2=attempt.address_line_2,
            city=attempt.city,
            state=attempt.state,
            postal_code=attempt.postal_code,
            country=attempt.country,
            verification_code=code,
            qr_code_url=urls.postal_verification_link(code=code),
        )

        if not outcome.success:
            # Could retry or fail - for now, fail
            attempt.status = PostalVerificationStatus.failed
            logger.error(f"Postcard send failed: {outcome.error_message}")
            return

        attempt.status = PostalVerificationStatus.awaiting_verification
        attempt.postcard_sent_at = func.now()

        notify(
            session,
            user_id=attempt.user_id,
            topic_action="postal_verification:postcard_sent",
            key="",
            data=notification_data_pb2.PostalVerificationPostcardSent(
                city=attempt.city,
                country=attempt.country,
            ),
        )
class DatabaseInconsistencyError(Exception):
    """Raised when database consistency checks fail"""
def check_database_consistency(payload: empty_pb2.Empty) -> None:
    """
    Checks database consistency and raises an exception if any issues are found.

    Each failing check appends a human-readable description to `errors`; a single
    DatabaseInconsistencyError listing all violations is raised at the end, so
    one failing check doesn't mask the others.

    :raises DatabaseInconsistencyError: if any check finds inconsistent rows
    """
    logger.info("Checking database consistency")
    errors = []

    with session_scope() as session:
        # Check that all non-deleted users have a profile gallery
        users_without_gallery = session.execute(
            select(User.id, User.username).where(User.is_deleted == False).where(User.profile_gallery_id.is_(None))
        ).all()
        if users_without_gallery:
            errors.append(f"Users without profile gallery: {users_without_gallery}")

        # Check that all profile galleries point to their owner
        mismatched_galleries = session.execute(
            select(User.id, User.username, User.profile_gallery_id, PhotoGallery.owner_user_id)
            .join(PhotoGallery, User.profile_gallery_id == PhotoGallery.id)
            .where(User.profile_gallery_id.is_not(None))
            .where(PhotoGallery.owner_user_id != User.id)
        ).all()
        if mismatched_galleries:
            errors.append(f"Profile galleries with mismatched owner: {mismatched_galleries}")

        # === Moderation System Consistency Checks ===

        # Check all ModerationStates have a known object_type
        known_object_types = [ModerationObjectType.HOST_REQUEST, ModerationObjectType.GROUP_CHAT]
        unknown_type_states = session.execute(
            select(ModerationState.id, ModerationState.object_type).where(
                ModerationState.object_type.not_in(known_object_types)
            )
        ).all()
        if unknown_type_states:
            errors.append(f"ModerationStates with unknown object_type: {unknown_type_states}")

        # Check every ModerationState has at least one INITIAL_REVIEW queue item
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_initial_review = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationQueueItem.moderation_state_id == ModerationState.id)
                    .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW)
                ),
            )
        ).all()
        if states_without_initial_review:
            errors.append(f"ModerationStates without INITIAL_REVIEW queue item: {states_without_initial_review}")

        # Check every ModerationState has a CREATE log entry
        # Skip items with ID < 2000000 as they were created before this check was introduced
        states_without_create_log = session.execute(
            select(ModerationState.id, ModerationState.object_type, ModerationState.object_id).where(
                ModerationState.id >= 2000000,
                ~exists(
                    select(1)
                    .where(ModerationLog.moderation_state_id == ModerationState.id)
                    .where(ModerationLog.action == ModerationAction.CREATE)
                ),
            )
        ).all()
        if states_without_create_log:
            errors.append(f"ModerationStates without CREATE log entry: {states_without_create_log}")

        # Check resolved queue items point to log entries for the same moderation state
        resolved_item_log_mismatches = session.execute(
            select(ModerationQueueItem.id, ModerationQueueItem.moderation_state_id, ModerationLog.moderation_state_id)
            .join(ModerationLog, ModerationQueueItem.resolved_by_log_id == ModerationLog.id)
            .where(ModerationQueueItem.resolved_by_log_id.is_not(None))
            .where(ModerationQueueItem.moderation_state_id != ModerationLog.moderation_state_id)
        ).all()
        if resolved_item_log_mismatches:
            errors.append(f"Resolved queue items with mismatched moderation_state_id: {resolved_item_log_mismatches}")

        # Check every HOST_REQUEST ModerationState has exactly one HostRequest pointing to it
        hr_states = (
            session.execute(
                select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST)
            )
            .scalars()
            .all()
        )
        # one count query per state; count must be exactly 1 (0 = orphan, >1 = shared)
        for state_id in hr_states:
            hr_count = session.execute(
                select(func.count()).where(HostRequest.moderation_state_id == state_id)
            ).scalar_one()
            if hr_count != 1:
                errors.append(f"ModerationState {state_id} (HOST_REQUEST) has {hr_count} HostRequests (expected 1)")

        # Check every GROUP_CHAT ModerationState has exactly one GroupChat pointing to it
        gc_states = (
            session.execute(
                select(ModerationState.id).where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT)
            )
            .scalars()
            .all()
        )
        for state_id in gc_states:
            gc_count = session.execute(
                select(func.count()).where(GroupChat.moderation_state_id == state_id)
            ).scalar_one()
            if gc_count != 1:
                errors.append(f"ModerationState {state_id} (GROUP_CHAT) has {gc_count} GroupChats (expected 1)")

        # Check ModerationState.object_id matches the actual object's ID
        hr_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, HostRequest.conversation_id)
            .join(HostRequest, HostRequest.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.HOST_REQUEST)
            .where(ModerationState.object_id != HostRequest.conversation_id)
        ).all()
        if hr_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for HOST_REQUEST: {hr_object_id_mismatches}")

        gc_object_id_mismatches = session.execute(
            select(ModerationState.id, ModerationState.object_id, GroupChat.conversation_id)
            .join(GroupChat, GroupChat.moderation_state_id == ModerationState.id)
            .where(ModerationState.object_type == ModerationObjectType.GROUP_CHAT)
            .where(ModerationState.object_id != GroupChat.conversation_id)
        ).all()
        if gc_object_id_mismatches:
            errors.append(f"ModerationState object_id mismatch for GROUP_CHAT: {gc_object_id_mismatches}")

        # Check reverse mapping: HostRequest's moderation_state points to correct ModerationState
        hr_reverse_mismatches = session.execute(
            select(
                HostRequest.conversation_id,
                HostRequest.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, HostRequest.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.HOST_REQUEST)
                | (ModerationState.object_id != HostRequest.conversation_id)
            )
        ).all()
        if hr_reverse_mismatches:
            errors.append(f"HostRequest points to ModerationState with wrong type/object_id: {hr_reverse_mismatches}")

        # Check reverse mapping: GroupChat's moderation_state points to correct ModerationState
        gc_reverse_mismatches = session.execute(
            select(
                GroupChat.conversation_id,
                GroupChat.moderation_state_id,
                ModerationState.object_type,
                ModerationState.object_id,
            )
            .join(ModerationState, GroupChat.moderation_state_id == ModerationState.id)
            .where(
                (ModerationState.object_type != ModerationObjectType.GROUP_CHAT)
                | (ModerationState.object_id != GroupChat.conversation_id)
            )
        ).all()
        if gc_reverse_mismatches:
            errors.append(f"GroupChat points to ModerationState with wrong type/object_id: {gc_reverse_mismatches}")

        # Ensure auto-approve deadline isn't being exceeded by a significant margin
        # The auto-approver runs every 15s, so allow 5 minutes grace before alerting
        deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
        if deadline_seconds > 0:
            grace_period = timedelta(minutes=5)
            stale_initial_review_items = session.execute(
                select(
                    ModerationQueueItem.id,
                    ModerationQueueItem.moderation_state_id,
                    ModerationQueueItem.time_created,
                )
                .where(ModerationQueueItem.trigger == ModerationTrigger.INITIAL_REVIEW)
                .where(ModerationQueueItem.resolved_by_log_id.is_(None))
                .where(ModerationQueueItem.time_created < now() - timedelta(seconds=deadline_seconds) - grace_period)
            ).all()
            if stale_initial_review_items:
                errors.append(
                    f"INITIAL_REVIEW items exceeding auto-approve deadline by >5min: {stale_initial_review_items}"
                )

    if errors:
        raise DatabaseInconsistencyError("\n".join(errors))
def auto_approve_moderation_queue(payload: empty_pb2.Empty) -> None:
    """
    Dead man's switch: auto-approves unresolved INITIAL_REVIEW items older than the deadline.
    Items explicitly actioned by moderators are left alone.

    Disabled entirely when MODERATION_AUTO_APPROVE_DEADLINE_SECONDS is zero or
    negative. Handles at most one page (100 items) per run.

    :param payload: unused, this is a scheduled job
    """
    deadline_seconds = config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"]
    if deadline_seconds <= 0:
        return

    with session_scope() as session:
        # act on behalf of the moderation bot user
        ctx = make_background_user_context(user_id=config["MODERATION_BOT_USER_ID"])
        # reuse one servicer instance instead of re-instantiating per call
        # NOTE(review): assumes Moderation() holds no per-call state — confirm
        moderation = Moderation()

        items = moderation.GetModerationQueue(
            request=moderation_pb2.GetModerationQueueReq(
                triggers=[moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW],
                unresolved_only=True,
                page_size=100,
                created_before=Timestamp_from_datetime(now() - timedelta(seconds=deadline_seconds)),
            ),
            context=ctx,
            session=session,
        ).queue_items

        if not items:
            return

        logger.info(f"Auto-approving {len(items)} moderation queue items")
        for item in items:
            moderation.ModerateContent(
                request=moderation_pb2.ModerateContentReq(
                    moderation_state_id=item.moderation_state_id,
                    action=moderation_pb2.MODERATION_ACTION_APPROVE,
                    visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
                    reason=f"Auto-approved: moderation deadline of {deadline_seconds} seconds exceeded.",
                ),
                context=ctx,
                session=session,
            )
        moderation_auto_approved_counter.inc(len(items))