Coverage for src/couchers/jobs/handlers.py: 99%
355 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
1"""
2Background job servicers
3"""
5import logging
6from datetime import date, timedelta
7from math import cos, pi, sin, sqrt
9import requests
10from google.protobuf import empty_pb2
11from sqlalchemy import Float, Integer
12from sqlalchemy.orm import aliased
13from sqlalchemy.sql import (
14 and_,
15 case,
16 cast,
17 delete,
18 distinct,
19 extract,
20 func,
21 literal,
22 not_,
23 or_,
24 select,
25 union_all,
26 update,
27)
29from couchers.config import config
30from couchers.constants import (
31 ACTIVENESS_PROBE_EXPIRY_TIME,
32 ACTIVENESS_PROBE_INACTIVITY_PERIOD,
33 ACTIVENESS_PROBE_TIME_REMINDERS,
34)
35from couchers.crypto import (
36 USER_LOCATION_RANDOMIZATION_NAME,
37 asym_encrypt,
38 b64decode,
39 get_secret,
40 simple_decrypt,
41 stable_secure_uniform,
42)
43from couchers.db import session_scope
44from couchers.email.dev import print_dev_email
45from couchers.email.smtp import send_smtp_email
46from couchers.helpers.badges import user_add_badge, user_remove_badge
47from couchers.materialized_views import (
48 refresh_materialized_views,
49 refresh_materialized_views_rapid,
50 user_response_rates,
51)
52from couchers.metrics import strong_verification_completions_counter
53from couchers.models import (
54 AccountDeletionToken,
55 ActivenessProbe,
56 ActivenessProbeStatus,
57 Cluster,
58 ClusterRole,
59 ClusterSubscription,
60 GroupChat,
61 GroupChatSubscription,
62 HostingStatus,
63 HostRequest,
64 Invoice,
65 LoginToken,
66 MeetupStatus,
67 Message,
68 MessageType,
69 PassportSex,
70 PasswordResetToken,
71 Reference,
72 StrongVerificationAttempt,
73 StrongVerificationAttemptStatus,
74 User,
75 UserBadge,
76)
77from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification
78from couchers.notifications.notify import notify
79from couchers.resources import get_badge_dict, get_static_badge_dict
80from couchers.servicers.admin import generate_new_blog_post_notifications
81from couchers.servicers.api import user_model_to_pb
82from couchers.servicers.blocking import are_blocked
83from couchers.servicers.conversations import generate_message_notifications
84from couchers.servicers.discussions import generate_create_discussion_notifications
85from couchers.servicers.events import (
86 generate_event_cancel_notifications,
87 generate_event_create_notifications,
88 generate_event_delete_notifications,
89 generate_event_update_notifications,
90)
91from couchers.servicers.requests import host_request_to_pb
92from couchers.servicers.threads import generate_reply_notifications
93from couchers.sql import couchers_select as select
94from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships
95from couchers.tasks import send_duplicate_strong_verification_email
96from couchers.utils import Timestamp_from_datetime, create_coordinate, get_coordinates, make_user_context, now
97from proto import notification_data_pb2
98from proto.internal import jobs_pb2, verification_pb2
100logger = logging.getLogger(__name__)
102# these were straight up imported
103handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload
105send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload
107handle_email_digests.PAYLOAD = empty_pb2.Empty
108handle_email_digests.SCHEDULE = timedelta(minutes=15)
110generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload
112generate_reply_notifications.PAYLOAD = jobs_pb2.GenerateReplyNotificationsPayload
114generate_create_discussion_notifications.PAYLOAD = jobs_pb2.GenerateCreateDiscussionNotificationsPayload
116generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload
118generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload
120generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload
122generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload
124generate_new_blog_post_notifications.PAYLOAD = jobs_pb2.GenerateNewBlogPostNotificationsPayload
126refresh_materialized_views.PAYLOAD = empty_pb2.Empty
127refresh_materialized_views.SCHEDULE = timedelta(minutes=5)
129refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
130refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)
133def send_email(payload):
134 logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
135 # selects a "sender", which either prints the email to the logger or sends it out with SMTP
136 sender = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email
137 # the sender must return a models.Email object that can be added to the database
138 email = sender(
139 sender_name=payload.sender_name,
140 sender_email=payload.sender_email,
141 recipient=payload.recipient,
142 subject=payload.subject,
143 plain=payload.plain,
144 html=payload.html,
145 list_unsubscribe_header=payload.list_unsubscribe_header,
146 source_data=payload.source_data,
147 )
148 with session_scope() as session:
149 session.add(email)
152send_email.PAYLOAD = jobs_pb2.SendEmailPayload
155def purge_login_tokens(payload):
156 logger.info("Purging login tokens")
157 with session_scope() as session:
158 session.execute(delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False))
161purge_login_tokens.PAYLOAD = empty_pb2.Empty
162purge_login_tokens.SCHEDULE = timedelta(hours=24)
165def purge_password_reset_tokens(payload):
166 logger.info("Purging login tokens")
167 with session_scope() as session:
168 session.execute(
169 delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
170 )
173purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
174purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)
177def purge_account_deletion_tokens(payload):
178 logger.info("Purging account deletion tokens")
179 with session_scope() as session:
180 session.execute(
181 delete(AccountDeletionToken)
182 .where(~AccountDeletionToken.is_valid)
183 .execution_options(synchronize_session=False)
184 )
187purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
188purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)
191def send_message_notifications(payload):
192 """
193 Sends out email notifications for messages that have been unseen for a long enough time
194 """
195 # very crude and dumb algorithm
196 logger.info("Sending out email notifications for unseen messages")
198 with session_scope() as session:
199 # users who have unnotified messages older than 5 minutes in any group chat
200 users = (
201 session.execute(
202 select(User)
203 .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
204 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
205 .where(not_(GroupChatSubscription.is_muted))
206 .where(User.is_visible)
207 .where(Message.time >= GroupChatSubscription.joined)
208 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
209 .where(Message.id > User.last_notified_message_id)
210 .where(Message.id > GroupChatSubscription.last_seen_message_id)
211 .where(Message.time < now() - timedelta(minutes=5))
212 .where(Message.message_type == MessageType.text) # TODO: only text messages for now
213 )
214 .scalars()
215 .unique()
216 )
218 for user in users:
219 # now actually grab all the group chats, not just less than 5 min old
220 subquery = (
221 select(
222 GroupChatSubscription.group_chat_id.label("group_chat_id"),
223 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
224 func.max(Message.id).label("message_id"),
225 func.count(Message.id).label("count_unseen"),
226 )
227 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
228 .where(GroupChatSubscription.user_id == user.id)
229 .where(not_(GroupChatSubscription.is_muted))
230 .where(Message.id > user.last_notified_message_id)
231 .where(Message.id > GroupChatSubscription.last_seen_message_id)
232 .where(Message.time >= GroupChatSubscription.joined)
233 .where(Message.message_type == MessageType.text) # TODO: only text messages for now
234 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
235 .group_by(GroupChatSubscription.group_chat_id)
236 .order_by(func.max(Message.id).desc())
237 .subquery()
238 )
240 unseen_messages = session.execute(
241 select(GroupChat, Message, subquery.c.count_unseen)
242 .join(subquery, subquery.c.message_id == Message.id)
243 .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
244 .order_by(subquery.c.message_id.desc())
245 ).all()
247 user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)
249 def format_title(message, group_chat, count_unseen):
250 if group_chat.is_dm:
251 return f"You missed {count_unseen} message(s) from {message.author.name}"
252 else:
253 return f"You missed {count_unseen} message(s) in {group_chat.title}"
255 notify(
256 session,
257 user_id=user.id,
258 topic_action="chat:missed_messages",
259 data=notification_data_pb2.ChatMissedMessages(
260 messages=[
261 notification_data_pb2.ChatMessage(
262 author=user_model_to_pb(
263 message.author,
264 session,
265 make_user_context(user_id=user.id),
266 ),
267 message=format_title(message, group_chat, count_unseen),
268 text=message.text,
269 group_chat_id=message.conversation_id,
270 )
271 for group_chat, message, count_unseen in unseen_messages
272 ],
273 ),
274 )
275 session.commit()
278send_message_notifications.PAYLOAD = empty_pb2.Empty
279send_message_notifications.SCHEDULE = timedelta(minutes=3)
282def send_request_notifications(payload):
283 """
284 Sends out email notifications for unseen messages in host requests (as surfer or host)
285 """
286 logger.info("Sending out email notifications for unseen messages in host requests")
288 with session_scope() as session:
289 # requests where this user is surfing
290 surfing_reqs = session.execute(
291 select(User, HostRequest, func.max(Message.id))
292 .where(User.is_visible)
293 .join(HostRequest, HostRequest.surfer_user_id == User.id)
294 .join(Message, Message.conversation_id == HostRequest.conversation_id)
295 .where(Message.id > HostRequest.surfer_last_seen_message_id)
296 .where(Message.id > User.last_notified_request_message_id)
297 .where(Message.time < now() - timedelta(minutes=5))
298 .where(Message.message_type == MessageType.text)
299 .group_by(User, HostRequest)
300 ).all()
302 # where this user is hosting
303 hosting_reqs = session.execute(
304 select(User, HostRequest, func.max(Message.id))
305 .where(User.is_visible)
306 .join(HostRequest, HostRequest.host_user_id == User.id)
307 .join(Message, Message.conversation_id == HostRequest.conversation_id)
308 .where(Message.id > HostRequest.host_last_seen_message_id)
309 .where(Message.id > User.last_notified_request_message_id)
310 .where(Message.time < now() - timedelta(minutes=5))
311 .where(Message.message_type == MessageType.text)
312 .group_by(User, HostRequest)
313 ).all()
315 for user, host_request, max_message_id in surfing_reqs:
316 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
317 session.flush()
319 context = make_user_context(user_id=user.id)
320 notify(
321 session,
322 user_id=user.id,
323 topic_action="host_request:missed_messages",
324 key=host_request.conversation_id,
325 data=notification_data_pb2.HostRequestMissedMessages(
326 host_request=host_request_to_pb(host_request, session, context),
327 user=user_model_to_pb(host_request.host, session, context),
328 am_host=False,
329 ),
330 )
332 for user, host_request, max_message_id in hosting_reqs:
333 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
334 session.flush()
336 context = make_user_context(user_id=user.id)
337 notify(
338 session,
339 user_id=user.id,
340 topic_action="host_request:missed_messages",
341 key=host_request.conversation_id,
342 data=notification_data_pb2.HostRequestMissedMessages(
343 host_request=host_request_to_pb(host_request, session, context),
344 user=user_model_to_pb(host_request.surfer, session, context),
345 am_host=True,
346 ),
347 )
350send_request_notifications.PAYLOAD = empty_pb2.Empty
351send_request_notifications.SCHEDULE = timedelta(minutes=3)
354def send_onboarding_emails(payload):
355 """
356 Sends out onboarding emails
357 """
358 logger.info("Sending out onboarding emails")
360 with session_scope() as session:
361 # first onboarding email
362 users = (
363 session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all()
364 )
366 for user in users:
367 notify(
368 session,
369 user_id=user.id,
370 topic_action="onboarding:reminder",
371 key="1",
372 )
373 user.onboarding_emails_sent = 1
374 user.last_onboarding_email_sent = now()
375 session.commit()
377 # second onboarding email
378 # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
379 users = (
380 session.execute(
381 select(User)
382 .where(User.is_visible)
383 .where(User.onboarding_emails_sent == 1)
384 .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
385 .where(User.has_completed_profile == False)
386 )
387 .scalars()
388 .all()
389 )
391 for user in users:
392 notify(
393 session,
394 user_id=user.id,
395 topic_action="onboarding:reminder",
396 key="2",
397 )
398 user.onboarding_emails_sent = 2
399 user.last_onboarding_email_sent = now()
400 session.commit()
403send_onboarding_emails.PAYLOAD = empty_pb2.Empty
404send_onboarding_emails.SCHEDULE = timedelta(hours=1)
407def send_reference_reminders(payload):
408 """
409 Sends out reminders to write references after hosting/staying
410 """
411 logger.info("Sending out reference reminder emails")
413 # Keep this in chronological order!
414 reference_reminder_schedule = [
415 # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
416 # the end time to write a reference is supposed to be midnight in the host's timezone
417 # 8 pm ish on the last day of the stay
418 (1, timedelta(days=15) - timedelta(hours=20), 14),
419 # 2 pm ish a week after stay
420 (2, timedelta(days=8) - timedelta(hours=14), 7),
421 # 10 am ish 3 days before end of time to write ref
422 (3, timedelta(days=4) - timedelta(hours=10), 3),
423 ]
425 with session_scope() as session:
426 # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
427 for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
428 user = aliased(User)
429 other_user = aliased(User)
430 # surfers needing to write a ref
431 q1 = (
432 select(literal(True), HostRequest, user, other_user)
433 .join(user, user.id == HostRequest.surfer_user_id)
434 .join(other_user, other_user.id == HostRequest.host_user_id)
435 .outerjoin(
436 Reference,
437 and_(
438 Reference.host_request_id == HostRequest.conversation_id,
439 # if no reference is found in this join, then the surfer has not written a ref
440 Reference.from_user_id == HostRequest.surfer_user_id,
441 ),
442 )
443 .where(user.is_visible)
444 .where(other_user.is_visible)
445 .where(Reference.id == None)
446 .where(HostRequest.can_write_reference)
447 .where(HostRequest.surfer_sent_reference_reminders < reminder_number)
448 .where(HostRequest.end_time_to_write_reference - reminder_time < now())
449 .where(HostRequest.surfer_reason_didnt_meetup == None)
450 )
452 # hosts needing to write a ref
453 q2 = (
454 select(literal(False), HostRequest, user, other_user)
455 .join(user, user.id == HostRequest.host_user_id)
456 .join(other_user, other_user.id == HostRequest.surfer_user_id)
457 .outerjoin(
458 Reference,
459 and_(
460 Reference.host_request_id == HostRequest.conversation_id,
461 # if no reference is found in this join, then the host has not written a ref
462 Reference.from_user_id == HostRequest.host_user_id,
463 ),
464 )
465 .where(user.is_visible)
466 .where(other_user.is_visible)
467 .where(Reference.id == None)
468 .where(HostRequest.can_write_reference)
469 .where(HostRequest.host_sent_reference_reminders < reminder_number)
470 .where(HostRequest.end_time_to_write_reference - reminder_time < now())
471 .where(HostRequest.host_reason_didnt_meetup == None)
472 )
474 union = union_all(q1, q2).subquery()
475 union = select(
476 union.c[0].label("surfed"),
477 aliased(HostRequest, union),
478 aliased(user, union),
479 aliased(other_user, union),
480 )
481 reference_reminders = session.execute(union).all()
483 for surfed, host_request, user, other_user in reference_reminders:
484 # checked in sql
485 assert user.is_visible
486 if not are_blocked(session, user.id, other_user.id):
487 context = make_user_context(user_id=user.id)
488 notify(
489 session,
490 user_id=user.id,
491 topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
492 data=notification_data_pb2.ReferenceReminder(
493 host_request_id=host_request.conversation_id,
494 other_user=user_model_to_pb(other_user, session, context),
495 days_left=reminder_days_left,
496 ),
497 )
498 if surfed:
499 host_request.surfer_sent_reference_reminders = reminder_number
500 else:
501 host_request.host_sent_reference_reminders = reminder_number
502 session.commit()
505send_reference_reminders.PAYLOAD = empty_pb2.Empty
506send_reference_reminders.SCHEDULE = timedelta(hours=1)
509def add_users_to_email_list(payload):
510 if not config["LISTMONK_ENABLED"]:
511 logger.info("Not adding users to mailing list")
512 return
514 logger.info("Adding users to mailing list")
516 while True:
517 with session_scope() as session:
518 user = session.execute(
519 select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
520 ).scalar_one_or_none()
521 if not user:
522 logger.info("Finished adding users to mailing list")
523 return
525 if user.opt_out_of_newsletter:
526 user.in_sync_with_newsletter = True
527 session.commit()
528 continue
530 r = requests.post(
531 config["LISTMONK_BASE_URL"] + "/api/subscribers",
532 auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
533 json={
534 "email": user.email,
535 "name": user.name,
536 "lists": [config["LISTMONK_LIST_ID"]],
537 "preconfirm_subscriptions": True,
538 "attribs": {"couchers_user_id": user.id},
539 "status": "enabled",
540 },
541 timeout=10,
542 )
543 # the API returns if the user is already subscribed
544 if r.status_code == 200 or r.status_code == 409:
545 user.in_sync_with_newsletter = True
546 session.commit()
547 else:
548 raise Exception("Failed to add users to mailing list")
551add_users_to_email_list.PAYLOAD = empty_pb2.Empty
552add_users_to_email_list.SCHEDULE = timedelta(hours=1)
555def enforce_community_membership(payload):
556 tasks_enforce_community_memberships()
559enforce_community_membership.PAYLOAD = empty_pb2.Empty
560enforce_community_membership.SCHEDULE = timedelta(minutes=15)
563def update_recommendation_scores(payload):
564 text_fields = [
565 User.hometown,
566 User.occupation,
567 User.education,
568 User.about_me,
569 User.things_i_like,
570 User.about_place,
571 User.additional_information,
572 User.pet_details,
573 User.kid_details,
574 User.housemate_details,
575 User.other_host_info,
576 User.sleeping_details,
577 User.area,
578 User.house_rules,
579 ]
580 home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]
582 def poor_man_gaussian():
583 """
584 Produces an approximatley std normal random variate
585 """
586 trials = 5
587 return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)
589 def int_(stmt):
590 return func.coalesce(cast(stmt, Integer), 0)
592 def float_(stmt):
593 return func.coalesce(cast(stmt, Float), 0.0)
595 with session_scope() as session:
596 # profile
597 profile_text = ""
598 for field in text_fields:
599 profile_text += func.coalesce(field, "")
600 text_length = func.length(profile_text)
601 home_text = ""
602 for field in home_fields:
603 home_text += func.coalesce(field, "")
604 home_length = func.length(home_text)
606 has_text = int_(text_length > 500)
607 long_text = int_(text_length > 2000)
608 has_pic = int_(User.avatar_key != None)
609 can_host = int_(User.hosting_status == HostingStatus.can_host)
610 cant_host = int_(User.hosting_status == HostingStatus.cant_host)
611 filled_home = int_(User.last_minute != None) * int_(home_length > 200)
612 profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host
614 # references
615 left_ref_expr = int_(1).label("left_reference")
616 left_refs_subquery = (
617 select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
618 )
619 left_reference = int_(left_refs_subquery.c.left_reference)
620 has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
621 ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
622 ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
623 has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
624 has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
625 "has_bad_ref"
626 )
627 received_ref_subquery = (
628 select(
629 Reference.to_user_id.label("user_id"),
630 has_reference_expr,
631 has_multiple_types_expr,
632 has_bad_ref_expr,
633 ref_count_expr,
634 ref_avg_expr,
635 )
636 .group_by(Reference.to_user_id)
637 .subquery()
638 )
639 has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
640 has_reference = int_(received_ref_subquery.c.has_reference)
641 has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
642 rating_score = float_(
643 received_ref_subquery.c.ref_avg
644 * (
645 2 * func.least(received_ref_subquery.c.ref_count, 5)
646 + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
647 )
648 )
649 ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score
651 # activeness
652 recently_active = int_(User.last_active >= now() - timedelta(days=180))
653 very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
654 recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
655 messaged_lots = int_(func.count(Message.id) > 5)
656 messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
657 messaging_subquery = (
658 select(Message.author_id.label("user_id"), messaging_points_subquery)
659 .where(Message.message_type == MessageType.text)
660 .group_by(Message.author_id)
661 .subquery()
662 )
663 activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)
665 # verification
666 cb_subquery = (
667 select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
668 .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
669 .where(ClusterSubscription.role == ClusterRole.admin)
670 .where(Cluster.is_official_cluster)
671 .group_by(ClusterSubscription.user_id)
672 .subquery()
673 )
674 min_node_id = cb_subquery.c.min_node_id
675 cb = int_(min_node_id >= 1)
676 wcb = int_(min_node_id == 1)
677 badge_points = {
678 "founder": 100,
679 "board_member": 20,
680 "past_board_member": 5,
681 "strong_verification": 3,
682 "volunteer": 3,
683 "past_volunteer": 2,
684 "donor": 1,
685 "phone_verified": 1,
686 }
688 badge_subquery = (
689 select(
690 UserBadge.user_id.label("user_id"),
691 func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
692 )
693 .group_by(UserBadge.user_id)
694 .subquery()
695 )
697 other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)
699 # response rate
700 hr_subquery = select(
701 user_response_rates.c.user_id,
702 float_(extract("epoch", user_response_rates.c.response_time_33p) / 60.0).label("response_time_33p"),
703 float_(extract("epoch", user_response_rates.c.response_time_66p) / 60.0).label("response_time_66p"),
704 ).subquery()
705 response_time_33p = hr_subquery.c.response_time_33p
706 response_time_66p = hr_subquery.c.response_time_66p
707 # be careful with nulls
708 response_rate_points = -10 * int_(response_time_33p > 60 * 72.0) + 5 * int_(response_time_66p < 60 * 48.0)
710 recommendation_score = (
711 profile_points
712 + ref_score
713 + activeness_points
714 + other_points
715 + response_rate_points
716 + 2 * poor_man_gaussian()
717 )
719 scores = (
720 select(User.id.label("user_id"), recommendation_score.label("score"))
721 .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
722 .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
723 .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
724 .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
725 .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
726 .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
727 ).subquery()
729 session.execute(
730 User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
731 )
733 logger.info("Updated recommendation scores")
736update_recommendation_scores.PAYLOAD = empty_pb2.Empty
737update_recommendation_scores.SCHEDULE = timedelta(hours=24)
740def update_badges(payload):
741 with session_scope() as session:
743 def update_badge(badge_id: str, members: list[int]):
744 badge = get_badge_dict()[badge_id]
745 user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all()
746 # in case the user ids don't exist in the db
747 actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all()
748 # we should add the badge to these
749 add = set(actual_members) - set(user_ids)
750 # we should remove the badge from these
751 remove = set(user_ids) - set(actual_members)
752 for user_id in add:
753 user_add_badge(session, user_id, badge_id)
755 for user_id in remove:
756 user_remove_badge(session, user_id, badge_id)
758 update_badge("founder", get_static_badge_dict()["founder"])
759 update_badge("board_member", get_static_badge_dict()["board_member"])
760 update_badge("past_board_member", get_static_badge_dict()["past_board_member"])
761 update_badge(
762 "donor", session.execute(select(User.id).join(Invoice, Invoice.user_id == User.id)).scalars().all()
763 )
764 update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all())
765 update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all())
766 # strong verification requires passport on file + gender/sex correspondence and date of birth match
767 update_badge(
768 "strong_verification",
769 session.execute(
770 select(User.id)
771 .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
772 .where(StrongVerificationAttempt.has_strong_verification(User))
773 )
774 .scalars()
775 .all(),
776 )
779update_badges.PAYLOAD = empty_pb2.Empty
780update_badges.SCHEDULE = timedelta(minutes=15)
783def finalize_strong_verification(payload):
784 with session_scope() as session:
785 verification_attempt = session.execute(
786 select(StrongVerificationAttempt)
787 .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
788 .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
789 ).scalar_one()
790 response = requests.post(
791 "https://passportreader.app/api/v1/session.get",
792 auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
793 json={"id": verification_attempt.iris_session_id},
794 timeout=10,
795 )
796 if response.status_code != 200:
797 raise Exception(f"Iris didn't return 200: {response.text}")
798 json_data = response.json()
799 reference_payload = verification_pb2.VerificationReferencePayload.FromString(
800 simple_decrypt("iris_callback", b64decode(json_data["reference"]))
801 )
802 assert verification_attempt.user_id == reference_payload.user_id
803 assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token
804 assert verification_attempt.iris_session_id == json_data["id"]
805 assert json_data["state"] == "APPROVED"
807 if json_data["document_type"] != "PASSPORT":
808 verification_attempt.status = StrongVerificationAttemptStatus.failed
809 notify(
810 session,
811 user_id=verification_attempt.user_id,
812 topic_action="verification:sv_fail",
813 data=notification_data_pb2.VerificationSVFail(
814 reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
815 ),
816 )
817 return
819 assert json_data["document_type"] == "PASSPORT"
821 expiry_date = date.fromisoformat(json_data["expiry_date"])
822 nationality = json_data["nationality"]
823 last_three_document_chars = json_data["document_number"][-3:]
825 existing_attempt = session.execute(
826 select(StrongVerificationAttempt)
827 .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
828 .where(StrongVerificationAttempt.passport_nationality == nationality)
829 .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
830 .order_by(StrongVerificationAttempt.id)
831 .limit(1)
832 ).scalar_one_or_none()
834 verification_attempt.has_minimal_data = True
835 verification_attempt.passport_expiry_date = expiry_date
836 verification_attempt.passport_nationality = nationality
837 verification_attempt.passport_last_three_document_chars = last_three_document_chars
839 if existing_attempt:
840 verification_attempt.status = StrongVerificationAttemptStatus.duplicate
842 if existing_attempt.user_id != verification_attempt.user_id:
843 session.flush()
844 send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)
846 notify(
847 session,
848 user_id=verification_attempt.user_id,
849 topic_action="verification:sv_fail",
850 data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
851 )
852 return
854 verification_attempt.has_full_data = True
855 verification_attempt.passport_encrypted_data = asym_encrypt(
856 config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
857 )
858 verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
859 verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
860 verification_attempt.status = StrongVerificationAttemptStatus.succeeded
862 session.flush()
864 strong_verification_completions_counter.inc()
866 user = verification_attempt.user
867 if verification_attempt.has_strong_verification(user):
868 badge_id = "strong_verification"
869 if session.execute(
870 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
871 ).scalar_one_or_none():
872 return
874 user_add_badge(session, user.id, badge_id, do_notify=False)
875 notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success")
876 else:
877 notify(
878 session,
879 user_id=verification_attempt.user_id,
880 topic_action="verification:sv_fail",
881 data=notification_data_pb2.VerificationSVFail(
882 reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
883 ),
884 )
887finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload
890def send_activeness_probes(payload):
891 with session_scope() as session:
892 ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)
894 if config["ACTIVENESS_PROBES_ENABLED"]:
895 # current activeness probes
896 subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery()
898 # users who we should send an activeness probe to
899 new_probe_user_ids = (
900 session.execute(
901 select(User.id)
902 .where(User.is_visible)
903 .where(User.hosting_status == HostingStatus.can_host)
904 .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
905 .where(User.id.not_in(select(subquery.c.user_id)))
906 )
907 .scalars()
908 .all()
909 )
911 for user_id in new_probe_user_ids:
912 session.add(ActivenessProbe(user_id=user_id))
914 session.commit()
916 ## Step 2: actually send out probe notifications
917 for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
918 probes = (
919 session.execute(
920 select(ActivenessProbe)
921 .where(ActivenessProbe.notifications_sent == probe_number_minus_1)
922 .where(ActivenessProbe.probe_initiated + delay < func.now())
923 .where(ActivenessProbe.is_pending)
924 )
925 .scalars()
926 .all()
927 )
929 for probe in probes:
930 probe.notifications_sent = probe_number_minus_1 + 1
931 context = make_user_context(user_id=probe.user.id)
932 notify(
933 session,
934 user_id=probe.user.id,
935 topic_action="activeness:probe",
936 key=probe.id,
937 data=notification_data_pb2.ActivenessProbe(
938 reminder_number=probe_number_minus_1 + 1,
939 deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
940 ),
941 )
942 session.commit()
944 ## Step 3: for those who haven't responded, mark them as failed
945 expired_probes = (
946 session.execute(
947 select(ActivenessProbe)
948 .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
949 .where(ActivenessProbe.is_pending)
950 .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
951 )
952 .scalars()
953 .all()
954 )
956 for probe in expired_probes:
957 probe.responded = now()
958 probe.response = ActivenessProbeStatus.expired
959 if probe.user.hosting_status == HostingStatus.can_host:
960 probe.user.hosting_status = HostingStatus.cant_host
961 if probe.user.meetup_status == MeetupStatus.wants_to_meetup:
962 probe.user.meetup_status = MeetupStatus.open_to_meetup
963 session.commit()
966send_activeness_probes.PAYLOAD = empty_pb2.Empty
967send_activeness_probes.SCHEDULE = timedelta(minutes=60)
970def update_randomized_locations(payload):
971 """
972 We generate for each user a randomized location as follows:
973 - Start from a strong random seed (based on the SECRET env var and our key derivation function)
974 - For each user, mix in the user_id for randomness
975 - Generate a radius from [0.02, 0.1] degrees (about 2-10km)
976 - Generate an angle from [0, 360]
977 - Randomized location is then a distance `radius` away at an angle `angle` from `geom`
978 """
979 randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)
981 def gen_randomized_coords(user_id, lat, lng):
982 radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii"))
983 angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii"))
984 radius = 0.02 + 0.08 * radius_u
985 angle_rad = 2 * pi * angle_u
986 offset_lng = radius * cos(angle_rad)
987 offset_lat = radius * sin(angle_rad)
988 return lat + offset_lat, lng + offset_lng
990 user_updates = []
992 with session_scope() as session:
993 users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all()
995 for user_id, geom in users_to_update:
996 lat, lng = get_coordinates(geom)
997 user_updates.append(
998 {"id": user_id, "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, lat, lng))}
999 )
1001 with session_scope() as session:
1002 session.execute(update(User), user_updates)
1005update_randomized_locations.PAYLOAD = empty_pb2.Empty
1006update_randomized_locations.SCHEDULE = timedelta(hours=1)