Coverage for src/couchers/jobs/handlers.py: 96%
393 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
1"""
2Background job servicers
3"""
5import logging
6from datetime import date, timedelta
7from math import cos, pi, sin, sqrt
8from random import sample
10import requests
11from google.protobuf import empty_pb2
12from sqlalchemy import Float, Integer
13from sqlalchemy.orm import aliased
14from sqlalchemy.sql import (
15 and_,
16 case,
17 cast,
18 delete,
19 distinct,
20 exists,
21 extract,
22 func,
23 literal,
24 not_,
25 or_,
26 select,
27 union_all,
28 update,
29)
31from couchers.config import config
32from couchers.constants import (
33 ACTIVENESS_PROBE_EXPIRY_TIME,
34 ACTIVENESS_PROBE_INACTIVITY_PERIOD,
35 ACTIVENESS_PROBE_TIME_REMINDERS,
36 EVENT_REMINDER_TIMEDELTA,
37 HOST_REQUEST_MAX_REMINDERS,
38 HOST_REQUEST_REMINDER_INTERVAL,
39)
40from couchers.context import make_background_user_context
41from couchers.crypto import (
42 USER_LOCATION_RANDOMIZATION_NAME,
43 asym_encrypt,
44 b64decode,
45 get_secret,
46 simple_decrypt,
47 stable_secure_uniform,
48)
49from couchers.db import session_scope
50from couchers.email.dev import print_dev_email
51from couchers.email.smtp import send_smtp_email
52from couchers.helpers.badges import user_add_badge, user_remove_badge
53from couchers.materialized_views import (
54 UserResponseRate,
55 refresh_materialized_views,
56 refresh_materialized_views_rapid,
57)
58from couchers.metrics import strong_verification_completions_counter
59from couchers.models import (
60 AccountDeletionToken,
61 ActivenessProbe,
62 ActivenessProbeStatus,
63 Cluster,
64 ClusterRole,
65 ClusterSubscription,
66 EventOccurrence,
67 EventOccurrenceAttendee,
68 GroupChat,
69 GroupChatSubscription,
70 HostingStatus,
71 HostRequest,
72 HostRequestStatus,
73 Invoice,
74 LoginToken,
75 MeetupStatus,
76 Message,
77 MessageType,
78 PassportSex,
79 PasswordResetToken,
80 Reference,
81 StrongVerificationAttempt,
82 StrongVerificationAttemptStatus,
83 User,
84 UserBadge,
85)
86from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification
87from couchers.notifications.notify import notify
88from couchers.resources import get_badge_dict, get_static_badge_dict
89from couchers.servicers.admin import generate_new_blog_post_notifications
90from couchers.servicers.api import user_model_to_pb
91from couchers.servicers.blocking import is_not_visible
92from couchers.servicers.conversations import generate_message_notifications
93from couchers.servicers.discussions import generate_create_discussion_notifications
94from couchers.servicers.events import (
95 event_to_pb,
96 generate_event_cancel_notifications,
97 generate_event_create_notifications,
98 generate_event_delete_notifications,
99 generate_event_update_notifications,
100)
101from couchers.servicers.requests import host_request_to_pb
102from couchers.servicers.threads import generate_reply_notifications
103from couchers.sql import couchers_select as select
104from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships
105from couchers.tasks import send_duplicate_strong_verification_email
106from couchers.utils import (
107 Timestamp_from_datetime,
108 create_coordinate,
109 get_coordinates,
110 now,
111)
112from proto import notification_data_pb2
113from proto.internal import jobs_pb2, verification_pb2
115logger = logging.getLogger(__name__)
117# these were straight up imported
118handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload
120send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload
122handle_email_digests.PAYLOAD = empty_pb2.Empty
123handle_email_digests.SCHEDULE = timedelta(minutes=15)
125generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload
127generate_reply_notifications.PAYLOAD = jobs_pb2.GenerateReplyNotificationsPayload
129generate_create_discussion_notifications.PAYLOAD = jobs_pb2.GenerateCreateDiscussionNotificationsPayload
131generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload
133generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload
135generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload
137generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload
139generate_new_blog_post_notifications.PAYLOAD = jobs_pb2.GenerateNewBlogPostNotificationsPayload
141refresh_materialized_views.PAYLOAD = empty_pb2.Empty
142refresh_materialized_views.SCHEDULE = timedelta(minutes=5)
144refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
145refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)
148def send_email(payload):
149 logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
150 # selects a "sender", which either prints the email to the logger or sends it out with SMTP
151 sender = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email
152 # the sender must return a models.Email object that can be added to the database
153 email = sender(
154 sender_name=payload.sender_name,
155 sender_email=payload.sender_email,
156 recipient=payload.recipient,
157 subject=payload.subject,
158 plain=payload.plain,
159 html=payload.html,
160 list_unsubscribe_header=payload.list_unsubscribe_header,
161 source_data=payload.source_data,
162 )
163 with session_scope() as session:
164 session.add(email)
167send_email.PAYLOAD = jobs_pb2.SendEmailPayload
170def purge_login_tokens(payload):
171 logger.info("Purging login tokens")
172 with session_scope() as session:
173 session.execute(delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False))
176purge_login_tokens.PAYLOAD = empty_pb2.Empty
177purge_login_tokens.SCHEDULE = timedelta(hours=24)
180def purge_password_reset_tokens(payload):
181 logger.info("Purging login tokens")
182 with session_scope() as session:
183 session.execute(
184 delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
185 )
188purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
189purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)
192def purge_account_deletion_tokens(payload):
193 logger.info("Purging account deletion tokens")
194 with session_scope() as session:
195 session.execute(
196 delete(AccountDeletionToken)
197 .where(~AccountDeletionToken.is_valid)
198 .execution_options(synchronize_session=False)
199 )
202purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
203purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)
206def send_message_notifications(payload):
207 """
208 Sends out email notifications for messages that have been unseen for a long enough time
209 """
210 # very crude and dumb algorithm
211 logger.info("Sending out email notifications for unseen messages")
213 with session_scope() as session:
214 # users who have unnotified messages older than 5 minutes in any group chat
215 users = (
216 session.execute(
217 select(User)
218 .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
219 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
220 .where(not_(GroupChatSubscription.is_muted))
221 .where(User.is_visible)
222 .where(Message.time >= GroupChatSubscription.joined)
223 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
224 .where(Message.id > User.last_notified_message_id)
225 .where(Message.id > GroupChatSubscription.last_seen_message_id)
226 .where(Message.time < now() - timedelta(minutes=5))
227 .where(Message.message_type == MessageType.text) # TODO: only text messages for now
228 )
229 .scalars()
230 .unique()
231 )
233 for user in users:
234 # now actually grab all the group chats, not just less than 5 min old
235 subquery = (
236 select(
237 GroupChatSubscription.group_chat_id.label("group_chat_id"),
238 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
239 func.max(Message.id).label("message_id"),
240 func.count(Message.id).label("count_unseen"),
241 )
242 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
243 .where(GroupChatSubscription.user_id == user.id)
244 .where(not_(GroupChatSubscription.is_muted))
245 .where(Message.id > user.last_notified_message_id)
246 .where(Message.id > GroupChatSubscription.last_seen_message_id)
247 .where(Message.time >= GroupChatSubscription.joined)
248 .where(Message.message_type == MessageType.text) # TODO: only text messages for now
249 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
250 .group_by(GroupChatSubscription.group_chat_id)
251 .order_by(func.max(Message.id).desc())
252 .subquery()
253 )
255 unseen_messages = session.execute(
256 select(GroupChat, Message, subquery.c.count_unseen)
257 .join(subquery, subquery.c.message_id == Message.id)
258 .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
259 .order_by(subquery.c.message_id.desc())
260 ).all()
262 if not unseen_messages:
263 continue
265 user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)
267 def format_title(message, group_chat, count_unseen):
268 if group_chat.is_dm:
269 return f"You missed {count_unseen} message(s) from {message.author.name}"
270 else:
271 return f"You missed {count_unseen} message(s) in {group_chat.title}"
273 notify(
274 session,
275 user_id=user.id,
276 topic_action="chat:missed_messages",
277 data=notification_data_pb2.ChatMissedMessages(
278 messages=[
279 notification_data_pb2.ChatMessage(
280 author=user_model_to_pb(
281 message.author,
282 session,
283 make_background_user_context(user_id=user.id),
284 ),
285 message=format_title(message, group_chat, count_unseen),
286 text=message.text,
287 group_chat_id=message.conversation_id,
288 )
289 for group_chat, message, count_unseen in unseen_messages
290 ],
291 ),
292 )
293 session.commit()
296send_message_notifications.PAYLOAD = empty_pb2.Empty
297send_message_notifications.SCHEDULE = timedelta(minutes=3)
300def send_request_notifications(payload):
301 """
302 Sends out email notifications for unseen messages in host requests (as surfer or host)
303 """
304 logger.info("Sending out email notifications for unseen messages in host requests")
306 with session_scope() as session:
307 # requests where this user is surfing
308 surfing_reqs = session.execute(
309 select(User, HostRequest, func.max(Message.id))
310 .where(User.is_visible)
311 .join(HostRequest, HostRequest.surfer_user_id == User.id)
312 .join(Message, Message.conversation_id == HostRequest.conversation_id)
313 .where(Message.id > HostRequest.surfer_last_seen_message_id)
314 .where(Message.id > User.last_notified_request_message_id)
315 .where(Message.time < now() - timedelta(minutes=5))
316 .where(Message.message_type == MessageType.text)
317 .group_by(User, HostRequest)
318 ).all()
320 # where this user is hosting
321 hosting_reqs = session.execute(
322 select(User, HostRequest, func.max(Message.id))
323 .where(User.is_visible)
324 .join(HostRequest, HostRequest.host_user_id == User.id)
325 .join(Message, Message.conversation_id == HostRequest.conversation_id)
326 .where(Message.id > HostRequest.host_last_seen_message_id)
327 .where(Message.id > User.last_notified_request_message_id)
328 .where(Message.time < now() - timedelta(minutes=5))
329 .where(Message.message_type == MessageType.text)
330 .group_by(User, HostRequest)
331 ).all()
333 for user, host_request, max_message_id in surfing_reqs:
334 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
335 session.flush()
337 context = make_background_user_context(user_id=user.id)
338 notify(
339 session,
340 user_id=user.id,
341 topic_action="host_request:missed_messages",
342 key=host_request.conversation_id,
343 data=notification_data_pb2.HostRequestMissedMessages(
344 host_request=host_request_to_pb(host_request, session, context),
345 user=user_model_to_pb(host_request.host, session, context),
346 am_host=False,
347 ),
348 )
350 for user, host_request, max_message_id in hosting_reqs:
351 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
352 session.flush()
354 context = make_background_user_context(user_id=user.id)
355 notify(
356 session,
357 user_id=user.id,
358 topic_action="host_request:missed_messages",
359 key=host_request.conversation_id,
360 data=notification_data_pb2.HostRequestMissedMessages(
361 host_request=host_request_to_pb(host_request, session, context),
362 user=user_model_to_pb(host_request.surfer, session, context),
363 am_host=True,
364 ),
365 )
368send_request_notifications.PAYLOAD = empty_pb2.Empty
369send_request_notifications.SCHEDULE = timedelta(minutes=3)
372def send_onboarding_emails(payload):
373 """
374 Sends out onboarding emails
375 """
376 logger.info("Sending out onboarding emails")
378 with session_scope() as session:
379 # first onboarding email
380 users = (
381 session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all()
382 )
384 for user in users:
385 notify(
386 session,
387 user_id=user.id,
388 topic_action="onboarding:reminder",
389 key="1",
390 )
391 user.onboarding_emails_sent = 1
392 user.last_onboarding_email_sent = now()
393 session.commit()
395 # second onboarding email
396 # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
397 users = (
398 session.execute(
399 select(User)
400 .where(User.is_visible)
401 .where(User.onboarding_emails_sent == 1)
402 .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
403 .where(User.has_completed_profile == False)
404 )
405 .scalars()
406 .all()
407 )
409 for user in users:
410 notify(
411 session,
412 user_id=user.id,
413 topic_action="onboarding:reminder",
414 key="2",
415 )
416 user.onboarding_emails_sent = 2
417 user.last_onboarding_email_sent = now()
418 session.commit()
421send_onboarding_emails.PAYLOAD = empty_pb2.Empty
422send_onboarding_emails.SCHEDULE = timedelta(hours=1)
425def send_reference_reminders(payload):
426 """
427 Sends out reminders to write references after hosting/staying
428 """
429 logger.info("Sending out reference reminder emails")
431 # Keep this in chronological order!
432 reference_reminder_schedule = [
433 # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
434 # the end time to write a reference is supposed to be midnight in the host's timezone
435 # 8 pm ish on the last day of the stay
436 (1, timedelta(days=15) - timedelta(hours=20), 14),
437 # 2 pm ish a week after stay
438 (2, timedelta(days=8) - timedelta(hours=14), 7),
439 # 10 am ish 3 days before end of time to write ref
440 (3, timedelta(days=4) - timedelta(hours=10), 3),
441 ]
443 with session_scope() as session:
444 # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
445 for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
446 user = aliased(User)
447 other_user = aliased(User)
448 # surfers needing to write a ref
449 q1 = (
450 select(literal(True), HostRequest, user, other_user)
451 .join(user, user.id == HostRequest.surfer_user_id)
452 .join(other_user, other_user.id == HostRequest.host_user_id)
453 .outerjoin(
454 Reference,
455 and_(
456 Reference.host_request_id == HostRequest.conversation_id,
457 # if no reference is found in this join, then the surfer has not written a ref
458 Reference.from_user_id == HostRequest.surfer_user_id,
459 ),
460 )
461 .where(user.is_visible)
462 .where(other_user.is_visible)
463 .where(Reference.id == None)
464 .where(HostRequest.can_write_reference)
465 .where(HostRequest.surfer_sent_reference_reminders < reminder_number)
466 .where(HostRequest.end_time_to_write_reference - reminder_time < now())
467 .where(HostRequest.surfer_reason_didnt_meetup == None)
468 )
470 # hosts needing to write a ref
471 q2 = (
472 select(literal(False), HostRequest, user, other_user)
473 .join(user, user.id == HostRequest.host_user_id)
474 .join(other_user, other_user.id == HostRequest.surfer_user_id)
475 .outerjoin(
476 Reference,
477 and_(
478 Reference.host_request_id == HostRequest.conversation_id,
479 # if no reference is found in this join, then the host has not written a ref
480 Reference.from_user_id == HostRequest.host_user_id,
481 ),
482 )
483 .where(user.is_visible)
484 .where(other_user.is_visible)
485 .where(Reference.id == None)
486 .where(HostRequest.can_write_reference)
487 .where(HostRequest.host_sent_reference_reminders < reminder_number)
488 .where(HostRequest.end_time_to_write_reference - reminder_time < now())
489 .where(HostRequest.host_reason_didnt_meetup == None)
490 )
492 union = union_all(q1, q2).subquery()
493 union = select(
494 union.c[0].label("surfed"),
495 aliased(HostRequest, union),
496 aliased(user, union),
497 aliased(other_user, union),
498 )
499 reference_reminders = session.execute(union).all()
501 for surfed, host_request, user, other_user in reference_reminders:
502 # checked in sql
503 assert user.is_visible
504 if not is_not_visible(session, user.id, other_user.id):
505 context = make_background_user_context(user_id=user.id)
506 notify(
507 session,
508 user_id=user.id,
509 topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
510 data=notification_data_pb2.ReferenceReminder(
511 host_request_id=host_request.conversation_id,
512 other_user=user_model_to_pb(other_user, session, context),
513 days_left=reminder_days_left,
514 ),
515 )
516 if surfed:
517 host_request.surfer_sent_reference_reminders = reminder_number
518 else:
519 host_request.host_sent_reference_reminders = reminder_number
520 session.commit()
523send_reference_reminders.PAYLOAD = empty_pb2.Empty
524send_reference_reminders.SCHEDULE = timedelta(hours=1)
527def send_host_request_reminders(payload):
528 with session_scope() as session:
529 host_has_sent_message = select(1).where(
530 Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.host_user_id
531 )
533 requests = (
534 session.execute(
535 select(HostRequest)
536 .where(HostRequest.status == HostRequestStatus.pending)
537 .where(HostRequest.host_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS)
538 .where(HostRequest.start_time > func.now())
539 .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL)
540 .where(~exists(host_has_sent_message))
541 )
542 .scalars()
543 .all()
544 )
546 for host_request in requests:
547 host_request.host_sent_request_reminders += 1
548 host_request.last_sent_request_reminder_time = now()
550 context = make_background_user_context(user_id=host_request.host_user_id)
551 notify(
552 session,
553 user_id=host_request.host_user_id,
554 topic_action="host_request:reminder",
555 data=notification_data_pb2.HostRequestReminder(
556 host_request=host_request_to_pb(host_request, session, context),
557 surfer=user_model_to_pb(host_request.surfer, session, context),
558 ),
559 )
561 session.commit()
564send_host_request_reminders.PAYLOAD = empty_pb2.Empty
565send_host_request_reminders.SCHEDULE = timedelta(minutes=15)
568def add_users_to_email_list(payload):
569 if not config["LISTMONK_ENABLED"]:
570 logger.info("Not adding users to mailing list")
571 return
573 logger.info("Adding users to mailing list")
575 while True:
576 with session_scope() as session:
577 user = session.execute(
578 select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
579 ).scalar_one_or_none()
580 if not user:
581 logger.info("Finished adding users to mailing list")
582 return
584 if user.opt_out_of_newsletter:
585 user.in_sync_with_newsletter = True
586 session.commit()
587 continue
589 r = requests.post(
590 config["LISTMONK_BASE_URL"] + "/api/subscribers",
591 auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
592 json={
593 "email": user.email,
594 "name": user.name,
595 "lists": [config["LISTMONK_LIST_ID"]],
596 "preconfirm_subscriptions": True,
597 "attribs": {"couchers_user_id": user.id},
598 "status": "enabled",
599 },
600 timeout=10,
601 )
602 # the API returns if the user is already subscribed
603 if r.status_code == 200 or r.status_code == 409:
604 user.in_sync_with_newsletter = True
605 session.commit()
606 else:
607 raise Exception("Failed to add users to mailing list")
610add_users_to_email_list.PAYLOAD = empty_pb2.Empty
611add_users_to_email_list.SCHEDULE = timedelta(hours=1)
614def enforce_community_membership(payload):
615 tasks_enforce_community_memberships()
618enforce_community_membership.PAYLOAD = empty_pb2.Empty
619enforce_community_membership.SCHEDULE = timedelta(minutes=15)
622def update_recommendation_scores(payload):
623 text_fields = [
624 User.hometown,
625 User.occupation,
626 User.education,
627 User.about_me,
628 User.things_i_like,
629 User.about_place,
630 User.additional_information,
631 User.pet_details,
632 User.kid_details,
633 User.housemate_details,
634 User.other_host_info,
635 User.sleeping_details,
636 User.area,
637 User.house_rules,
638 ]
639 home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]
641 def poor_man_gaussian():
642 """
643 Produces an approximatley std normal random variate
644 """
645 trials = 5
646 return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)
648 def int_(stmt):
649 return func.coalesce(cast(stmt, Integer), 0)
651 def float_(stmt):
652 return func.coalesce(cast(stmt, Float), 0.0)
654 with session_scope() as session:
655 # profile
656 profile_text = ""
657 for field in text_fields:
658 profile_text += func.coalesce(field, "")
659 text_length = func.length(profile_text)
660 home_text = ""
661 for field in home_fields:
662 home_text += func.coalesce(field, "")
663 home_length = func.length(home_text)
665 filled_profile = int_(User.has_completed_profile)
666 has_text = int_(text_length > 500)
667 long_text = int_(text_length > 2000)
668 can_host = int_(User.hosting_status == HostingStatus.can_host)
669 may_host = int_(User.hosting_status == HostingStatus.maybe)
670 cant_host = int_(User.hosting_status == HostingStatus.cant_host)
671 filled_home = int_(User.has_completed_my_home)
672 filled_home_lots = int_(home_length > 200)
673 hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host
674 profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots
676 # references
677 left_ref_expr = int_(1).label("left_reference")
678 left_refs_subquery = (
679 select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
680 )
681 left_reference = int_(left_refs_subquery.c.left_reference)
682 has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
683 ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
684 ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
685 has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
686 has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
687 "has_bad_ref"
688 )
689 received_ref_subquery = (
690 select(
691 Reference.to_user_id.label("user_id"),
692 has_reference_expr,
693 has_multiple_types_expr,
694 has_bad_ref_expr,
695 ref_count_expr,
696 ref_avg_expr,
697 )
698 .group_by(Reference.to_user_id)
699 .subquery()
700 )
701 has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
702 has_reference = int_(received_ref_subquery.c.has_reference)
703 has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
704 rating_score = float_(
705 received_ref_subquery.c.ref_avg
706 * (
707 2 * func.least(received_ref_subquery.c.ref_count, 5)
708 + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
709 )
710 )
711 ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score
713 # activeness
714 recently_active = int_(User.last_active >= now() - timedelta(days=180))
715 very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
716 recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
717 messaged_lots = int_(func.count(Message.id) > 5)
718 messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
719 messaging_subquery = (
720 select(Message.author_id.label("user_id"), messaging_points_subquery)
721 .where(Message.message_type == MessageType.text)
722 .group_by(Message.author_id)
723 .subquery()
724 )
725 activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)
727 # verification
728 cb_subquery = (
729 select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
730 .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
731 .where(ClusterSubscription.role == ClusterRole.admin)
732 .where(Cluster.is_official_cluster)
733 .group_by(ClusterSubscription.user_id)
734 .subquery()
735 )
736 min_node_id = cb_subquery.c.min_node_id
737 cb = int_(min_node_id >= 1)
738 wcb = int_(min_node_id == 1)
739 badge_points = {
740 "founder": 100,
741 "board_member": 20,
742 "past_board_member": 5,
743 "strong_verification": 3,
744 "volunteer": 3,
745 "past_volunteer": 2,
746 "donor": 1,
747 "phone_verified": 1,
748 }
750 badge_subquery = (
751 select(
752 UserBadge.user_id.label("user_id"),
753 func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
754 )
755 .group_by(UserBadge.user_id)
756 .subquery()
757 )
759 other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)
761 # response rate
762 hr_subquery = select(
763 UserResponseRate.user_id,
764 float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"),
765 float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"),
766 ).subquery()
767 response_time_33p = hr_subquery.c.response_time_33p
768 response_time_66p = hr_subquery.c.response_time_66p
769 # be careful with nulls
770 response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0)
772 recommendation_score = (
773 hosting_status_points
774 + profile_points
775 + ref_score
776 + activeness_points
777 + other_points
778 + response_rate_points
779 + 2 * poor_man_gaussian()
780 )
782 scores = (
783 select(User.id.label("user_id"), recommendation_score.label("score"))
784 .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
785 .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
786 .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
787 .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
788 .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
789 .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
790 ).subquery()
792 session.execute(
793 User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
794 )
796 logger.info("Updated recommendation scores")
799update_recommendation_scores.PAYLOAD = empty_pb2.Empty
800update_recommendation_scores.SCHEDULE = timedelta(hours=24)
803def update_badges(payload):
804 with session_scope() as session:
806 def update_badge(badge_id: str, members: list[int]):
807 badge = get_badge_dict()[badge_id]
808 user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all()
809 # in case the user ids don't exist in the db
810 actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all()
811 # we should add the badge to these
812 add = set(actual_members) - set(user_ids)
813 # we should remove the badge from these
814 remove = set(user_ids) - set(actual_members)
815 for user_id in add:
816 user_add_badge(session, user_id, badge_id)
818 for user_id in remove:
819 user_remove_badge(session, user_id, badge_id)
821 update_badge("founder", get_static_badge_dict()["founder"])
822 update_badge("board_member", get_static_badge_dict()["board_member"])
823 update_badge("past_board_member", get_static_badge_dict()["past_board_member"])
824 update_badge(
825 "donor", session.execute(select(User.id).join(Invoice, Invoice.user_id == User.id)).scalars().all()
826 )
827 update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all())
828 update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all())
829 # strong verification requires passport on file + gender/sex correspondence and date of birth match
830 update_badge(
831 "strong_verification",
832 session.execute(
833 select(User.id)
834 .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
835 .where(StrongVerificationAttempt.has_strong_verification(User))
836 )
837 .scalars()
838 .all(),
839 )
842update_badges.PAYLOAD = empty_pb2.Empty
843update_badges.SCHEDULE = timedelta(minutes=15)
846def finalize_strong_verification(payload):
847 with session_scope() as session:
848 verification_attempt = session.execute(
849 select(StrongVerificationAttempt)
850 .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
851 .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
852 ).scalar_one()
853 response = requests.post(
854 "https://passportreader.app/api/v1/session.get",
855 auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
856 json={"id": verification_attempt.iris_session_id},
857 timeout=10,
858 )
859 if response.status_code != 200:
860 raise Exception(f"Iris didn't return 200: {response.text}")
861 json_data = response.json()
862 reference_payload = verification_pb2.VerificationReferencePayload.FromString(
863 simple_decrypt("iris_callback", b64decode(json_data["reference"]))
864 )
865 assert verification_attempt.user_id == reference_payload.user_id
866 assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token
867 assert verification_attempt.iris_session_id == json_data["id"]
868 assert json_data["state"] == "APPROVED"
870 if json_data["document_type"] != "PASSPORT":
871 verification_attempt.status = StrongVerificationAttemptStatus.failed
872 notify(
873 session,
874 user_id=verification_attempt.user_id,
875 topic_action="verification:sv_fail",
876 data=notification_data_pb2.VerificationSVFail(
877 reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
878 ),
879 )
880 return
882 assert json_data["document_type"] == "PASSPORT"
884 expiry_date = date.fromisoformat(json_data["expiry_date"])
885 nationality = json_data["nationality"]
886 last_three_document_chars = json_data["document_number"][-3:]
888 existing_attempt = session.execute(
889 select(StrongVerificationAttempt)
890 .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
891 .where(StrongVerificationAttempt.passport_nationality == nationality)
892 .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
893 .order_by(StrongVerificationAttempt.id)
894 .limit(1)
895 ).scalar_one_or_none()
897 verification_attempt.has_minimal_data = True
898 verification_attempt.passport_expiry_date = expiry_date
899 verification_attempt.passport_nationality = nationality
900 verification_attempt.passport_last_three_document_chars = last_three_document_chars
902 if existing_attempt:
903 verification_attempt.status = StrongVerificationAttemptStatus.duplicate
905 if existing_attempt.user_id != verification_attempt.user_id:
906 session.flush()
907 send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)
909 notify(
910 session,
911 user_id=verification_attempt.user_id,
912 topic_action="verification:sv_fail",
913 data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
914 )
915 return
917 verification_attempt.has_full_data = True
918 verification_attempt.passport_encrypted_data = asym_encrypt(
919 config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
920 )
921 verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
922 verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
923 verification_attempt.status = StrongVerificationAttemptStatus.succeeded
925 session.flush()
927 strong_verification_completions_counter.inc()
929 user = verification_attempt.user
930 if verification_attempt.has_strong_verification(user):
931 badge_id = "strong_verification"
932 if session.execute(
933 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
934 ).scalar_one_or_none():
935 return
937 user_add_badge(session, user.id, badge_id, do_notify=False)
938 notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success")
939 else:
940 notify(
941 session,
942 user_id=verification_attempt.user_id,
943 topic_action="verification:sv_fail",
944 data=notification_data_pb2.VerificationSVFail(
945 reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
946 ),
947 )
950finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload
953def send_activeness_probes(payload):
954 with session_scope() as session:
955 ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)
957 if config["ACTIVENESS_PROBES_ENABLED"]:
958 # current activeness probes
959 subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery()
961 # users who we should send an activeness probe to
962 new_probe_user_ids = (
963 session.execute(
964 select(User.id)
965 .where(User.is_visible)
966 .where(User.hosting_status == HostingStatus.can_host)
967 .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
968 .where(User.id.not_in(select(subquery.c.user_id)))
969 )
970 .scalars()
971 .all()
972 )
974 total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()
975 probes_today = session.execute(
976 select(func.count())
977 .select_from(ActivenessProbe)
978 .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24))
979 ).scalar_one()
981 # send probes to max 2% of users per day
982 max_probes_per_day = 0.02 * total_users
983 max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1))
985 if len(new_probe_user_ids) > max_probe_size:
986 new_probe_user_ids = sample(new_probe_user_ids, max_probe_size)
988 for user_id in new_probe_user_ids:
989 session.add(ActivenessProbe(user_id=user_id))
991 session.commit()
993 ## Step 2: actually send out probe notifications
994 for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
995 probes = (
996 session.execute(
997 select(ActivenessProbe)
998 .where(ActivenessProbe.notifications_sent == probe_number_minus_1)
999 .where(ActivenessProbe.probe_initiated + delay < func.now())
1000 .where(ActivenessProbe.is_pending)
1001 )
1002 .scalars()
1003 .all()
1004 )
1006 for probe in probes:
1007 probe.notifications_sent = probe_number_minus_1 + 1
1008 context = make_background_user_context(user_id=probe.user.id)
1009 notify(
1010 session,
1011 user_id=probe.user.id,
1012 topic_action="activeness:probe",
1013 key=probe.id,
1014 data=notification_data_pb2.ActivenessProbe(
1015 reminder_number=probe_number_minus_1 + 1,
1016 deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
1017 ),
1018 )
1019 session.commit()
1021 ## Step 3: for those who haven't responded, mark them as failed
1022 expired_probes = (
1023 session.execute(
1024 select(ActivenessProbe)
1025 .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
1026 .where(ActivenessProbe.is_pending)
1027 .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
1028 )
1029 .scalars()
1030 .all()
1031 )
1033 for probe in expired_probes:
1034 probe.responded = now()
1035 probe.response = ActivenessProbeStatus.expired
1036 if probe.user.hosting_status == HostingStatus.can_host:
1037 probe.user.hosting_status = HostingStatus.maybe
1038 if probe.user.meetup_status == MeetupStatus.wants_to_meetup:
1039 probe.user.meetup_status = MeetupStatus.open_to_meetup
1040 session.commit()
1043send_activeness_probes.PAYLOAD = empty_pb2.Empty
1044send_activeness_probes.SCHEDULE = timedelta(minutes=60)
1047def update_randomized_locations(payload):
1048 """
1049 We generate for each user a randomized location as follows:
1050 - Start from a strong random seed (based on the SECRET env var and our key derivation function)
1051 - For each user, mix in the user_id for randomness
1052 - Generate a radius from [0.02, 0.1] degrees (about 2-10km)
1053 - Generate an angle from [0, 360]
1054 - Randomized location is then a distance `radius` away at an angle `angle` from `geom`
1055 """
1056 randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)
1058 def gen_randomized_coords(user_id, lat, lng):
1059 radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii"))
1060 angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii"))
1061 radius = 0.02 + 0.08 * radius_u
1062 angle_rad = 2 * pi * angle_u
1063 offset_lng = radius * cos(angle_rad)
1064 offset_lat = radius * sin(angle_rad)
1065 return lat + offset_lat, lng + offset_lng
1067 user_updates = []
1069 with session_scope() as session:
1070 users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all()
1072 for user_id, geom in users_to_update:
1073 lat, lng = get_coordinates(geom)
1074 user_updates.append(
1075 {"id": user_id, "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, lat, lng))}
1076 )
1078 with session_scope() as session:
1079 session.execute(update(User), user_updates)
1082update_randomized_locations.PAYLOAD = empty_pb2.Empty
1083update_randomized_locations.SCHEDULE = timedelta(hours=1)
1086def send_event_reminders(payload: empty_pb2.Empty):
1087 """
1088 Sends reminders for events that are 24 hours away to users who marked themselves as attending.
1089 """
1090 logger.info("Sending event reminder emails")
1092 with session_scope() as session:
1093 occurrences = (
1094 session.execute(
1095 select(EventOccurrence)
1096 .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA)
1097 .where(EventOccurrence.start_time >= now())
1098 )
1099 .scalars()
1100 .all()
1101 )
1103 for occurrence in occurrences:
1104 results = session.execute(
1105 select(User, EventOccurrenceAttendee)
1106 .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id)
1107 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
1108 .where(EventOccurrenceAttendee.reminder_sent == False)
1109 ).all()
1111 for user, attendee in results:
1112 context = make_background_user_context(user_id=user.id)
1114 notify(
1115 session,
1116 user_id=user.id,
1117 topic_action="event:reminder",
1118 data=notification_data_pb2.EventReminder(
1119 event=event_to_pb(session, occurrence, context),
1120 user=user_model_to_pb(user, session, context),
1121 ),
1122 )
1124 attendee.reminder_sent = True
1125 session.commit()
1128send_event_reminders.PAYLOAD = empty_pb2.Empty
1129send_event_reminders.SCHEDULE = timedelta(hours=1)