Coverage for src/couchers/jobs/handlers.py: 96%
387 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-12 05:54 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-12 05:54 +0000
1"""
2Background job servicers
3"""
5import logging
6from datetime import date, timedelta
7from math import cos, pi, sin, sqrt
8from random import sample
10import requests
11from google.protobuf import empty_pb2
12from sqlalchemy import Float, Integer
13from sqlalchemy.orm import aliased
14from sqlalchemy.sql import (
15 and_,
16 case,
17 cast,
18 delete,
19 distinct,
20 exists,
21 extract,
22 func,
23 literal,
24 not_,
25 or_,
26 select,
27 union_all,
28 update,
29)
31from couchers.config import config
32from couchers.constants import (
33 ACTIVENESS_PROBE_EXPIRY_TIME,
34 ACTIVENESS_PROBE_INACTIVITY_PERIOD,
35 ACTIVENESS_PROBE_TIME_REMINDERS,
36 EVENT_REMINDER_TIMEDELTA,
37 HOST_REQUEST_MAX_REMINDERS,
38 HOST_REQUEST_REMINDER_INTERVAL,
39)
40from couchers.crypto import (
41 USER_LOCATION_RANDOMIZATION_NAME,
42 asym_encrypt,
43 b64decode,
44 get_secret,
45 simple_decrypt,
46 stable_secure_uniform,
47)
48from couchers.db import session_scope
49from couchers.email.dev import print_dev_email
50from couchers.email.smtp import send_smtp_email
51from couchers.helpers.badges import user_add_badge, user_remove_badge
52from couchers.materialized_views import (
53 refresh_materialized_views,
54 refresh_materialized_views_rapid,
55 user_response_rates,
56)
57from couchers.metrics import strong_verification_completions_counter
58from couchers.models import (
59 AccountDeletionToken,
60 ActivenessProbe,
61 ActivenessProbeStatus,
62 Cluster,
63 ClusterRole,
64 ClusterSubscription,
65 EventOccurrence,
66 EventOccurrenceAttendee,
67 GroupChat,
68 GroupChatSubscription,
69 HostingStatus,
70 HostRequest,
71 HostRequestStatus,
72 Invoice,
73 LoginToken,
74 MeetupStatus,
75 Message,
76 MessageType,
77 PassportSex,
78 PasswordResetToken,
79 Reference,
80 StrongVerificationAttempt,
81 StrongVerificationAttemptStatus,
82 User,
83 UserBadge,
84)
85from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification
86from couchers.notifications.notify import notify
87from couchers.resources import get_badge_dict, get_static_badge_dict
88from couchers.servicers.admin import generate_new_blog_post_notifications
89from couchers.servicers.api import user_model_to_pb
90from couchers.servicers.blocking import is_not_visible
91from couchers.servicers.conversations import generate_message_notifications
92from couchers.servicers.discussions import generate_create_discussion_notifications
93from couchers.servicers.events import (
94 event_to_pb,
95 generate_event_cancel_notifications,
96 generate_event_create_notifications,
97 generate_event_delete_notifications,
98 generate_event_update_notifications,
99)
100from couchers.servicers.requests import host_request_to_pb
101from couchers.servicers.threads import generate_reply_notifications
102from couchers.sql import couchers_select as select
103from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships
104from couchers.tasks import send_duplicate_strong_verification_email
105from couchers.utils import (
106 Timestamp_from_datetime,
107 create_coordinate,
108 get_coordinates,
109 make_user_context,
110 now,
111)
112from proto import notification_data_pb2
113from proto.internal import jobs_pb2, verification_pb2
logger = logging.getLogger(__name__)

# The job handlers below are implemented in other modules and were straight up imported; this module only
# attaches the attributes to them. PAYLOAD is the protobuf message class of the job's payload; SCHEDULE
# (presumably the interval at which the job is run — confirm against the job runner) is a timedelta.

# payload types
handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload
send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload
handle_email_digests.PAYLOAD = empty_pb2.Empty
generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload
generate_reply_notifications.PAYLOAD = jobs_pb2.GenerateReplyNotificationsPayload
generate_create_discussion_notifications.PAYLOAD = jobs_pb2.GenerateCreateDiscussionNotificationsPayload
generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload
generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload
generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload
generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload
generate_new_blog_post_notifications.PAYLOAD = jobs_pb2.GenerateNewBlogPostNotificationsPayload
refresh_materialized_views.PAYLOAD = empty_pb2.Empty
refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty

# schedules
handle_email_digests.SCHEDULE = timedelta(minutes=15)
refresh_materialized_views.SCHEDULE = timedelta(minutes=5)
refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)
def send_email(payload):
    """
    Sends a single email (or prints it when email is disabled) and stores the resulting record

    Reads `subject`, `recipient`, `sender_name`, `sender_email`, `plain`, `html`, `list_unsubscribe_header` and
    `source_data` from the payload.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")

    # with email enabled we go out through SMTP, otherwise the dev sender is used
    if config["ENABLE_EMAIL"]:
        deliver = send_smtp_email
    else:
        deliver = print_dev_email

    # whatever the sender returns is added to the database, so it has to be a models.Email object
    email_record = deliver(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
        list_unsubscribe_header=payload.list_unsubscribe_header,
        source_data=payload.source_data,
    )

    with session_scope() as session:
        session.add(email_record)


send_email.PAYLOAD = jobs_pb2.SendEmailPayload
def purge_login_tokens(payload):
    """
    Deletes all login tokens that are no longer valid
    """
    logger.info("Purging login tokens")

    invalid_tokens = delete(LoginToken).where(~LoginToken.is_valid)
    with session_scope() as session:
        # bulk delete, no need to synchronize objects already loaded in the session
        session.execute(invalid_tokens.execution_options(synchronize_session=False))


purge_login_tokens.PAYLOAD = empty_pb2.Empty
purge_login_tokens.SCHEDULE = timedelta(hours=24)
def purge_password_reset_tokens(payload):
    """
    Deletes all password reset tokens that are no longer valid
    """
    # this message used to read "Purging login tokens" (copy-paste from the job above), which made the two
    # jobs indistinguishable in the logs
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        # bulk delete, no need to synchronize objects already loaded in the session
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )


purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)
def purge_account_deletion_tokens(payload):
    """
    Deletes all account deletion tokens that are no longer valid
    """
    logger.info("Purging account deletion tokens")

    invalid_tokens = delete(AccountDeletionToken).where(~AccountDeletionToken.is_valid)
    with session_scope() as session:
        # bulk delete, no need to synchronize objects already loaded in the session
        session.execute(invalid_tokens.execution_options(synchronize_session=False))


purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)
def send_message_notifications(payload):
    """
    Sends out email notifications for messages that have been unseen for a long enough time

    For every matching user a single "chat:missed_messages" notification is generated, listing the newest unseen
    text message of each group chat that has unseen messages. `user.last_notified_message_id` is then advanced so
    the same messages are not reported again.
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    def format_title(message, group_chat, count_unseen):
        # defined once up here instead of being re-created for every user in the loop below
        if group_chat.is_dm:
            return f"You missed {count_unseen} message(s) from {message.author.name}"
        return f"You missed {count_unseen} message(s) in {group_chat.title}"

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                select(User)
                .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for user in users:
            # now actually grab all the group chats, not just less than 5 min old
            subquery = (
                select(
                    GroupChatSubscription.group_chat_id.label("group_chat_id"),
                    func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                    func.max(Message.id).label("message_id"),
                    func.count(Message.id).label("count_unseen"),
                )
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(GroupChatSubscription.user_id == user.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(Message.id > user.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            unseen_messages = session.execute(
                select(GroupChat, Message, subquery.c.count_unseen)
                .join(subquery, subquery.c.message_id == Message.id)
                .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
                .order_by(subquery.c.message_id.desc())
            ).all()

            if not unseen_messages:
                # this is a separate query from the one that picked the user, so it can come back empty (e.g. the
                # messages were seen in the meantime); max() of an empty sequence would raise ValueError and
                # abort the job for every remaining user
                continue

            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            notify(
                session,
                user_id=user.id,
                topic_action="chat:missed_messages",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                make_user_context(user_id=user.id),
                            ),
                            message=format_title(message, group_chat, count_unseen),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                        )
                        for group_chat, message, count_unseen in unseen_messages
                    ],
                ),
            )
            # commit per user so one failure doesn't roll back the users already handled
            session.commit()


send_message_notifications.PAYLOAD = empty_pb2.Empty
send_message_notifications.SCHEDULE = timedelta(minutes=3)
def send_request_notifications(payload):
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host)
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    def requests_with_unseen_messages(session, participant_id, participant_last_seen):
        # (user, host request, newest message id) for text messages older than 5 minutes that the participant has
        # neither seen nor been notified about
        return session.execute(
            select(User, HostRequest, func.max(Message.id))
            .where(User.is_visible)
            .join(HostRequest, participant_id == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(Message.id > participant_last_seen)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
            .group_by(User, HostRequest)
        ).all()

    with session_scope() as session:
        # both lists are fetched up front, before any user is touched
        # requests where this user is surfing
        surfing_reqs = requests_with_unseen_messages(
            session, HostRequest.surfer_user_id, HostRequest.surfer_last_seen_message_id
        )
        # where this user is hosting
        hosting_reqs = requests_with_unseen_messages(
            session, HostRequest.host_user_id, HostRequest.host_last_seen_message_id
        )

        # surfers are handled first, then hosts
        for am_host, rows in ((False, surfing_reqs), (True, hosting_reqs)):
            for user, host_request, max_message_id in rows:
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                context = make_user_context(user_id=user.id)
                notify(
                    session,
                    user_id=user.id,
                    topic_action="host_request:missed_messages",
                    key=host_request.conversation_id,
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        # the user shown in the notification is the other party of the request
                        user=user_model_to_pb(
                            host_request.surfer if am_host else host_request.host, session, context
                        ),
                        am_host=am_host,
                    ),
                )


send_request_notifications.PAYLOAD = empty_pb2.Empty
send_request_notifications.SCHEDULE = timedelta(minutes=3)
def send_onboarding_emails(payload):
    """
    Sends out onboarding emails

    Visible users who have received none get reminder "1"; those who got the first one more than a week ago and
    still have no completed profile get reminder "2".
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:
        # first onboarding email
        never_emailed = select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)
        for user in session.execute(never_emailed).scalars().all():
            notify(
                session,
                user_id=user.id,
                topic_action="onboarding:reminder",
                key="1",
            )
            user.onboarding_emails_sent = 1
            user.last_onboarding_email_sent = now()
            session.commit()

        # second onboarding email
        # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
        needs_second_email = (
            select(User)
            .where(User.is_visible)
            .where(User.onboarding_emails_sent == 1)
            .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
            .where(User.has_completed_profile == False)
        )
        for user in session.execute(needs_second_email).scalars().all():
            notify(
                session,
                user_id=user.id,
                topic_action="onboarding:reminder",
                key="2",
            )
            user.onboarding_emails_sent = 2
            user.last_onboarding_email_sent = now()
            session.commit()


send_onboarding_emails.PAYLOAD = empty_pb2.Empty
send_onboarding_emails.SCHEDULE = timedelta(hours=1)
def send_reference_reminders(payload):
    """
    Sends out reminders to write references after hosting/staying
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    def missing_reference_query(surfed, writer, other, reminder_number, reminder_time):
        """
        Selects (surfed, HostRequest, writer, other) for host requests where the writer (the surfer if `surfed`,
        otherwise the host) has not written a reference yet and is due reminder `reminder_number`
        """
        if surfed:
            writer_id = HostRequest.surfer_user_id
            other_id = HostRequest.host_user_id
            reminders_sent = HostRequest.surfer_sent_reference_reminders
            reason_didnt_meetup = HostRequest.surfer_reason_didnt_meetup
        else:
            writer_id = HostRequest.host_user_id
            other_id = HostRequest.surfer_user_id
            reminders_sent = HostRequest.host_sent_reference_reminders
            reason_didnt_meetup = HostRequest.host_reason_didnt_meetup

        return (
            select(literal(surfed), HostRequest, writer, other)
            .join(writer, writer.id == writer_id)
            .join(other, other.id == other_id)
            .outerjoin(
                Reference,
                and_(
                    Reference.host_request_id == HostRequest.conversation_id,
                    # if no reference is found in this join, then the writer has not written a ref
                    Reference.from_user_id == writer_id,
                ),
            )
            .where(writer.is_visible)
            .where(other.is_visible)
            .where(Reference.id == None)
            .where(HostRequest.can_write_reference)
            .where(reminders_sent < reminder_number)
            .where(HostRequest.end_time_to_write_reference - reminder_time < now())
            .where(reason_didnt_meetup == None)
        )

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            writer = aliased(User)
            other = aliased(User)

            # surfers needing to write a ref
            surfers_query = missing_reference_query(True, writer, other, reminder_number, reminder_time)
            # hosts needing to write a ref
            hosts_query = missing_reference_query(False, writer, other, reminder_number, reminder_time)

            combined = union_all(surfers_query, hosts_query).subquery()
            reference_reminders = session.execute(
                select(
                    combined.c[0].label("surfed"),
                    aliased(HostRequest, combined),
                    aliased(writer, combined),
                    aliased(other, combined),
                )
            ).all()

            for surfed, host_request, user, other_user in reference_reminders:
                # checked in sql
                assert user.is_visible
                if is_not_visible(session, user.id, other_user.id):
                    # no reminder is sent and nothing is recorded for this pair
                    continue

                context = make_user_context(user_id=user.id)
                notify(
                    session,
                    user_id=user.id,
                    topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(other_user, session, context),
                        days_left=reminder_days_left,
                    ),
                )
                # remember which reminder this side of the request has reached
                if surfed:
                    host_request.surfer_sent_reference_reminders = reminder_number
                else:
                    host_request.host_sent_reference_reminders = reminder_number
                session.commit()


send_reference_reminders.PAYLOAD = empty_pb2.Empty
send_reference_reminders.SCHEDULE = timedelta(hours=1)
def send_host_request_reminders(payload):
    """
    Reminds hosts about pending host requests they have not sent any message in

    A request qualifies while it is pending, its start time is still in the future, it has had fewer than
    HOST_REQUEST_MAX_REMINDERS reminders, and at least HOST_REQUEST_REMINDER_INTERVAL has passed since
    `last_sent_request_reminder_time`.
    """
    with session_scope() as session:
        # correlated on HostRequest: any message in the request's conversation that was authored by the host
        host_has_sent_message = select(1).where(
            Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.host_user_id
        )

        due_for_reminder = (
            select(HostRequest)
            .where(HostRequest.status == HostRequestStatus.pending)
            .where(HostRequest.host_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS)
            .where(HostRequest.start_time > func.now())
            .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL)
            .where(~exists(host_has_sent_message))
        )
        # not called `requests`, so the HTTP library of the same name isn't shadowed in here
        pending_host_requests = session.execute(due_for_reminder).scalars().all()

        for host_request in pending_host_requests:
            host_request.host_sent_request_reminders += 1
            host_request.last_sent_request_reminder_time = now()

            host_id = host_request.host_user_id
            context = make_user_context(user_id=host_id)
            notify(
                session,
                user_id=host_id,
                topic_action="host_request:reminder",
                data=notification_data_pb2.HostRequestReminder(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
            )

            session.commit()


send_host_request_reminders.PAYLOAD = empty_pb2.Empty
send_host_request_reminders.SCHEDULE = timedelta(minutes=15)
def add_users_to_email_list(payload):
    """
    Subscribes visible users that are not yet in sync with the newsletter to the Listmonk list

    Users are handled one at a time, each in its own session. Users who opted out are only marked as in sync.
    Does nothing when Listmonk is disabled in the config.

    Raises an Exception if Listmonk answers with anything other than 200 or 409.
    """
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    next_unsynced_user = select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)

    while True:
        with session_scope() as session:
            user = session.execute(next_unsynced_user).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            if user.opt_out_of_newsletter:
                # nothing to send to Listmonk, just stop picking this user up
                user.in_sync_with_newsletter = True
                session.commit()
                continue

            subscriber = {
                "email": user.email,
                "name": user.name,
                "lists": [config["LISTMONK_LIST_ID"]],
                "preconfirm_subscriptions": True,
                "attribs": {"couchers_user_id": user.id},
                "status": "enabled",
            }
            r = requests.post(
                config["LISTMONK_BASE_URL"] + "/api/subscribers",
                auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
                json=subscriber,
                timeout=10,
            )
            # the API returns 409 if the user is already subscribed, which is fine too
            if r.status_code not in (200, 409):
                raise Exception("Failed to add users to mailing list")

            user.in_sync_with_newsletter = True
            session.commit()


add_users_to_email_list.PAYLOAD = empty_pb2.Empty
add_users_to_email_list.SCHEDULE = timedelta(hours=1)
def enforce_community_membership(payload):
    """
    Job wrapper that runs `couchers.tasks.enforce_community_memberships`; the payload is not used
    """
    tasks_enforce_community_memberships()


enforce_community_membership.PAYLOAD = empty_pb2.Empty
enforce_community_membership.SCHEDULE = timedelta(minutes=15)
def update_recommendation_scores(payload):
    """
    Recomputes `User.recommendation_score` with a single UPDATE

    The score is the sum of profile, reference, activeness, community/badge and response rate points plus some
    random noise. Everything below is built as SQL expressions, so the computation happens in the database.
    """
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian():
        """
        Produces an approximately std normal random variate
        """
        # sum of `trials` uniform variates, shifted by its mean and scaled by its standard deviation
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt):
        # cast to integer, with NULL counted as 0
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt):
        # cast to float, with NULL counted as 0.0
        return func.coalesce(cast(stmt, Float), 0.0)

    def concatenated(columns):
        # SQL concatenation of the given text columns, NULLs treated as empty strings
        text = ""
        for column in columns:
            text += func.coalesce(column, "")
        return text

    with session_scope() as session:
        # profile
        text_length = func.length(concatenated(text_fields))
        home_length = func.length(concatenated(home_fields))

        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        has_pic = int_(User.avatar_key != None)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.last_minute != None) * int_(home_length > 200)
        profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host

        # references
        # one row per user who has written at least one reference
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), int_(1).label("left_reference"))
            .group_by(Reference.from_user_id)
            .subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)

        # one row per user who has received references, with aggregates over them
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                int_(func.count(Reference.id) >= 1).label("has_reference"),
                int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types"),
                int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
                    "has_bad_ref"
                ),
                int_(func.count(Reference.id)).label("ref_count"),
                func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg"),
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        received = received_ref_subquery.c
        has_multiple_types = int_(received.has_multiple_types)
        has_reference = int_(received.has_reference)
        has_bad_reference = int_(received.has_bad_ref)
        # the first 5 references count double
        rating_score = float_(
            received.ref_avg * (2 * func.least(received.ref_count, 5) + func.greatest(received.ref_count - 5, 0))
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_subquery = (
            select(
                Message.author_id.label("user_id"),
                (recently_messaged + messaged_lots).label("messaging_points"),
            )
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        # per admin of an official cluster: the smallest parent node id among those clusters
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        cb = int_(min_node_id >= 1)
        wcb = int_(min_node_id == 1)

        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }
        # badges that are not listed above are worth nothing
        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate
        # the percentiles are intervals, converted to minutes here
        hr_subquery = select(
            user_response_rates.c.user_id,
            float_(extract("epoch", user_response_rates.c.response_time_33p) / 60.0).label("response_time_33p"),
            float_(extract("epoch", user_response_rates.c.response_time_66p) / 60.0).label("response_time_66p"),
        ).subquery()
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
        response_rate_points = -10 * int_(response_time_33p > 60 * 72.0) + 5 * int_(response_time_66p < 60 * 48.0)

        recommendation_score = (
            profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        # outer joins throughout, so users with no row in a subquery still get a score
        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        session.execute(
            User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
        )

    logger.info("Updated recommendation scores")


update_recommendation_scores.PAYLOAD = empty_pb2.Empty
update_recommendation_scores.SCHEDULE = timedelta(hours=24)
def update_badges(payload):
    """
    Brings the holders of the automatically managed badges in line with who should currently have them
    """
    with session_scope() as session:

        def update_badge(badge_id: str, members: list[int]):
            """
            Adds the badge to the users in `members` that lack it and removes it from holders not in `members`
            """
            # value unused; presumably kept so that an unknown badge id fails here with a KeyError
            badge = get_badge_dict()[badge_id]
            current_holders = set(
                session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all()
            )
            # in case the user ids don't exist in the db
            actual_members = set(session.execute(select(User.id).where(User.id.in_(members))).scalars().all())

            # we should add the badge to these
            for user_id in actual_members - current_holders:
                user_add_badge(session, user_id, badge_id)

            # we should remove the badge from these
            for user_id in current_holders - actual_members:
                user_remove_badge(session, user_id, badge_id)

        # badges whose members come from the static badge list
        for static_badge_id in ("founder", "board_member", "past_board_member"):
            update_badge(static_badge_id, get_static_badge_dict()[static_badge_id])

        # badges whose members are derived from the database
        donors = session.execute(select(User.id).join(Invoice, Invoice.user_id == User.id)).scalars().all()
        update_badge("donor", donors)

        superusers = session.execute(select(User.id).where(User.is_superuser)).scalars().all()
        update_badge("moderator", superusers)

        phone_verified = session.execute(select(User.id).where(User.phone_is_verified)).scalars().all()
        update_badge("phone_verified", phone_verified)

        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        strongly_verified = (
            session.execute(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            )
            .scalars()
            .all()
        )
        update_badge("strong_verification", strongly_verified)


update_badges.PAYLOAD = empty_pb2.Empty
update_badges.SCHEDULE = timedelta(minutes=15)
def finalize_strong_verification(payload):
    """
    Finishes a strong verification attempt that is waiting on the backend

    Fetches the session identified by the attempt's `iris_session_id` from the passport reader API, checks that
    it belongs to this attempt, and then either
    - fails the attempt if the document is not a passport,
    - marks it as a duplicate if another attempt with the same expiry date, nationality and last three document
      characters exists, or
    - stores the (encrypted) passport data and marks the attempt as succeeded.
    The user is notified of the outcome in each case, except when they already hold the strong verification badge.

    Raises:
        Exception: if the API does not answer with status 200
        AssertionError: if the API response does not match the verification attempt or is not approved
    """
    with session_scope() as session:
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()
        reference_payload = verification_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )

        # These checks guard against acting on data that belongs to a different attempt. They used to be `assert`
        # statements, which are removed entirely when Python runs with -O; explicit raises always run.
        # AssertionError is kept so the exception type does not change for callers.
        if verification_attempt.user_id != reference_payload.user_id:
            raise AssertionError("Iris reference payload is for a different user")
        if verification_attempt.verification_attempt_token != reference_payload.verification_attempt_token:
            raise AssertionError("Iris reference payload has a different verification attempt token")
        if verification_attempt.iris_session_id != json_data["id"]:
            raise AssertionError("Iris returned a different session id")
        if json_data["state"] != "APPROVED":
            raise AssertionError("Iris session is not in state APPROVED")

        if json_data["document_type"] != "PASSPORT":
            verification_attempt.status = StrongVerificationAttemptStatus.failed
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
                ),
            )
            return

        expiry_date = date.fromisoformat(json_data["expiry_date"])
        nationality = json_data["nationality"]
        last_three_document_chars = json_data["document_number"][-3:]

        # look for an earlier attempt with the same passport details; this runs before the details are stored on
        # the current attempt below
        existing_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
            .where(StrongVerificationAttempt.passport_nationality == nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = expiry_date
        verification_attempt.passport_nationality = nationality
        verification_attempt.passport_last_three_document_chars = last_three_document_chars

        if existing_attempt:
            verification_attempt.status = StrongVerificationAttemptStatus.duplicate

            # the duplicate email is only sent when the matching attempt belongs to another user
            if existing_attempt.user_id != verification_attempt.user_id:
                session.flush()
                send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)

            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
            )
            return

        verification_attempt.has_full_data = True
        # the raw API response is stored encrypted with the verification data public key
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = verification_attempt.user
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            if session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none():
                # the user already has the badge: nothing to add and no notification
                return

            user_add_badge(session, user.id, badge_id, do_notify=False)
            notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success")
        else:
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
                ),
            )


finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload
def send_activeness_probes(payload):
    """
    Creates, sends, and expires activeness probes.

    Runs in three steps:

    1. Only if the ACTIVENESS_PROBES_ENABLED config flag is set: creates a probe for every visible user who can host,
       was last active more than ACTIVENESS_PROBE_INACTIVITY_PERIOD ago, and has no unanswered probe. The number of
       probes created per run is capped (see the comments in the code).
    2. For each pending probe whose next reminder delay (from ACTIVENESS_PROBE_TIME_REMINDERS) has elapsed, bumps
       `notifications_sent` and sends an "activeness:probe" notification.
    3. Marks pending probes that received every reminder and are older than ACTIVENESS_PROBE_EXPIRY_TIME as expired,
       and downgrades the user's hosting status (can_host -> cant_host) and meetup status
       (wants_to_meetup -> open_to_meetup).

    `payload` is unused.
    """
    with session_scope() as session:
        ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)

        if config["ACTIVENESS_PROBES_ENABLED"]:
            # current activeness probes (`== None` is a SQL NULL comparison and must not become `is None`)
            subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery()

            # users who we should send an activeness probe to
            new_probe_user_ids = (
                session.execute(
                    select(User.id)
                    .where(User.is_visible)
                    .where(User.hosting_status == HostingStatus.can_host)
                    .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
                    .where(User.id.not_in(select(subquery.c.user_id)))
                )
                .scalars()
                .all()
            )

            total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()
            probes_today = session.execute(
                select(func.count())
                .select_from(ActivenessProbe)
                .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24))
            ).scalar_one()

            # send probes to max 2% of users per day
            max_probes_per_day = 0.02 * total_users
            # per run: whatever is left of the daily budget, but no more than 1/24 of it, and never less than one
            # probe (so a single probe may still be created once the daily budget is used up)
            max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1))

            if len(new_probe_user_ids) > max_probe_size:
                # too many candidates: pick a random subset of the allowed size
                new_probe_user_ids = sample(new_probe_user_ids, max_probe_size)

            for user_id in new_probe_user_ids:
                session.add(ActivenessProbe(user_id=user_id))

            session.commit()

        ## Step 2: actually send out probe notifications
        for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
            reminder_number = probe_number_minus_1 + 1

            probes = (
                session.execute(
                    select(ActivenessProbe)
                    .where(ActivenessProbe.notifications_sent == probe_number_minus_1)
                    .where(ActivenessProbe.probe_initiated + delay < func.now())
                    .where(ActivenessProbe.is_pending)
                )
                .scalars()
                .all()
            )

            for probe in probes:
                probe.notifications_sent = reminder_number
                # use the foreign key directly rather than loading the related user just to read its id
                notify(
                    session,
                    user_id=probe.user_id,
                    topic_action="activeness:probe",
                    key=probe.id,
                    data=notification_data_pb2.ActivenessProbe(
                        reminder_number=reminder_number,
                        deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
                    ),
                )
                # commit per probe so the bumped counter is persisted together with its notification
                session.commit()

        ## Step 3: for those who haven't responded, mark them as failed
        expired_probes = (
            session.execute(
                select(ActivenessProbe)
                .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
                .where(ActivenessProbe.is_pending)
                .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
            )
            .scalars()
            .all()
        )

        for probe in expired_probes:
            probe.responded = now()
            probe.response = ActivenessProbeStatus.expired

            user = probe.user
            if user.hosting_status == HostingStatus.can_host:
                user.hosting_status = HostingStatus.cant_host
            if user.meetup_status == MeetupStatus.wants_to_meetup:
                user.meetup_status = MeetupStatus.open_to_meetup

        session.commit()


send_activeness_probes.PAYLOAD = empty_pb2.Empty
send_activeness_probes.SCHEDULE = timedelta(minutes=60)
def update_randomized_locations(payload):
    """
    Fills in `randomized_geom` for every user that does not have one yet.

    We generate for each user a randomized location as follows:
    - Start from a strong random seed (based on the SECRET env var and our key derivation function)
    - For each user, mix in the user_id for randomness
    - Generate a radius from [0.02, 0.1] degrees (about 2-10km)
    - Generate an angle from [0, 360]
    - Randomized location is then a distance `radius` away at an angle `angle` from `geom`

    The offset is derived only from the secret and the user id, so the same user always gets the same offset.
    The offset is added directly to the latitude and longitude, in degrees.

    `payload` is unused.
    """
    randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)

    def gen_randomized_coords(user_id, lat, lng):
        """Returns the (lat, lng) pair shifted by this user's stable pseudo-random offset."""
        radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii"))
        angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii"))
        radius = 0.02 + 0.08 * radius_u
        angle_rad = 2 * pi * angle_u
        offset_lng = radius * cos(angle_rad)
        offset_lat = radius * sin(angle_rad)
        return lat + offset_lat, lng + offset_lng

    with session_scope() as session:
        users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all()

        # one parameter dict per user, keyed on the primary key, for the bulk UPDATE below
        user_updates = [
            {
                "id": user_id,
                "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, *get_coordinates(geom))),
            }
            for user_id, geom in users_to_update
        ]

    if not user_updates:
        # nothing to randomize: skip the second session and the parameter-less bulk UPDATE
        return

    with session_scope() as session:
        session.execute(update(User), user_updates)


update_randomized_locations.PAYLOAD = empty_pb2.Empty
update_randomized_locations.SCHEDULE = timedelta(hours=1)
def send_event_reminders(payload: empty_pb2.Empty):
    """
    Sends reminders for upcoming events to users who are attending them.

    Selects every event occurrence that starts between now and EVENT_REMINDER_TIMEDELTA from now, and sends an
    "event:reminder" notification to each attendee whose `reminder_sent` flag is still false. The flag is then set
    and committed, one attendee at a time. `payload` is unused.
    """
    logger.info("Sending event reminder emails")

    with session_scope() as session:
        upcoming_occurrences_query = (
            select(EventOccurrence)
            .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA)
            .where(EventOccurrence.start_time >= now())
        )

        # materialize the list up front, since the loop below commits repeatedly
        for occurrence in session.execute(upcoming_occurrences_query).scalars().all():
            unreminded_attendees_query = (
                select(User, EventOccurrenceAttendee)
                .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id)
                .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
                .where(EventOccurrenceAttendee.reminder_sent == False)
            )

            for user, attendee in session.execute(unreminded_attendees_query).all():
                context = make_user_context(user_id=user.id)
                reminder = notification_data_pb2.EventReminder(
                    event=event_to_pb(session, occurrence, context),
                    user=user_model_to_pb(user, session, context),
                )
                notify(session, user_id=user.id, topic_action="event:reminder", data=reminder)

                # presumably committed per attendee so that a failure part-way through does not cause
                # already-notified attendees to be reminded again on the next run
                attendee.reminder_sent = True
                session.commit()


send_event_reminders.PAYLOAD = empty_pb2.Empty
send_event_reminders.SCHEDULE = timedelta(hours=1)