Coverage for src/couchers/jobs/handlers.py: 99%
334 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-04-16 15:13 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-04-16 15:13 +0000
1"""
2Background job servicers
3"""
5import logging
6from datetime import date, timedelta
7from math import sqrt
8from types import SimpleNamespace
10import requests
11from google.protobuf import empty_pb2
12from sqlalchemy import Float, Integer
13from sqlalchemy.orm import aliased
14from sqlalchemy.sql import and_, case, cast, delete, distinct, extract, func, literal, not_, or_, select, union_all
16from couchers.config import config
17from couchers.constants import (
18 ACTIVENESS_PROBE_EXPIRY_TIME,
19 ACTIVENESS_PROBE_INACTIVITY_PERIOD,
20 ACTIVENESS_PROBE_TIME_REMINDERS,
21)
22from couchers.crypto import asym_encrypt, b64decode, simple_decrypt
23from couchers.db import session_scope
24from couchers.email.dev import print_dev_email
25from couchers.email.smtp import send_smtp_email
26from couchers.helpers.badges import user_add_badge, user_remove_badge
27from couchers.materialized_views import (
28 refresh_materialized_views,
29 refresh_materialized_views_rapid,
30 user_response_rates,
31)
32from couchers.metrics import strong_verification_completions_counter
33from couchers.models import (
34 AccountDeletionToken,
35 ActivenessProbe,
36 ActivenessProbeStatus,
37 Cluster,
38 ClusterRole,
39 ClusterSubscription,
40 GroupChat,
41 GroupChatSubscription,
42 HostingStatus,
43 HostRequest,
44 Invoice,
45 LoginToken,
46 MeetupStatus,
47 Message,
48 MessageType,
49 PassportSex,
50 PasswordResetToken,
51 Reference,
52 StrongVerificationAttempt,
53 StrongVerificationAttemptStatus,
54 User,
55 UserBadge,
56)
57from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification
58from couchers.notifications.notify import notify
59from couchers.resources import get_badge_dict, get_static_badge_dict
60from couchers.servicers.api import user_model_to_pb
61from couchers.servicers.blocking import are_blocked
62from couchers.servicers.conversations import generate_message_notifications
63from couchers.servicers.discussions import generate_create_discussion_notifications
64from couchers.servicers.events import (
65 generate_event_cancel_notifications,
66 generate_event_create_notifications,
67 generate_event_delete_notifications,
68 generate_event_update_notifications,
69)
70from couchers.servicers.requests import host_request_to_pb
71from couchers.servicers.threads import generate_reply_notifications
72from couchers.sql import couchers_select as select
73from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships
74from couchers.tasks import send_duplicate_strong_verification_email
75from couchers.utils import Timestamp_from_datetime, now
76from proto import notification_data_pb2
77from proto.internal import jobs_pb2, verification_pb2
79logger = logging.getLogger(__name__)
81# these were straight up imported
82handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload
84send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload
86handle_email_digests.PAYLOAD = empty_pb2.Empty
87handle_email_digests.SCHEDULE = timedelta(minutes=15)
89generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload
91generate_reply_notifications.PAYLOAD = jobs_pb2.GenerateReplyNotificationsPayload
93generate_create_discussion_notifications.PAYLOAD = jobs_pb2.GenerateCreateDiscussionNotificationsPayload
95generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload
97generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload
99generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload
101generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload
104refresh_materialized_views.PAYLOAD = empty_pb2.Empty
105refresh_materialized_views.SCHEDULE = timedelta(minutes=5)
107refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
108refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)
111def send_email(payload):
112 logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
113 # selects a "sender", which either prints the email to the logger or sends it out with SMTP
114 sender = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email
115 # the sender must return a models.Email object that can be added to the database
116 email = sender(
117 sender_name=payload.sender_name,
118 sender_email=payload.sender_email,
119 recipient=payload.recipient,
120 subject=payload.subject,
121 plain=payload.plain,
122 html=payload.html,
123 list_unsubscribe_header=payload.list_unsubscribe_header,
124 source_data=payload.source_data,
125 )
126 with session_scope() as session:
127 session.add(email)
130send_email.PAYLOAD = jobs_pb2.SendEmailPayload
133def purge_login_tokens(payload):
134 logger.info("Purging login tokens")
135 with session_scope() as session:
136 session.execute(delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False))
139purge_login_tokens.PAYLOAD = empty_pb2.Empty
140purge_login_tokens.SCHEDULE = timedelta(hours=24)
143def purge_password_reset_tokens(payload):
144 logger.info("Purging login tokens")
145 with session_scope() as session:
146 session.execute(
147 delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
148 )
151purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
152purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)
155def purge_account_deletion_tokens(payload):
156 logger.info("Purging account deletion tokens")
157 with session_scope() as session:
158 session.execute(
159 delete(AccountDeletionToken)
160 .where(~AccountDeletionToken.is_valid)
161 .execution_options(synchronize_session=False)
162 )
165purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
166purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)
169def send_message_notifications(payload):
170 """
171 Sends out email notifications for messages that have been unseen for a long enough time
172 """
173 # very crude and dumb algorithm
174 logger.info("Sending out email notifications for unseen messages")
176 with session_scope() as session:
177 # users who have unnotified messages older than 5 minutes in any group chat
178 users = (
179 session.execute(
180 select(User)
181 .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
182 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
183 .where(not_(GroupChatSubscription.is_muted))
184 .where(User.is_visible)
185 .where(Message.time >= GroupChatSubscription.joined)
186 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
187 .where(Message.id > User.last_notified_message_id)
188 .where(Message.id > GroupChatSubscription.last_seen_message_id)
189 .where(Message.time < now() - timedelta(minutes=5))
190 .where(Message.message_type == MessageType.text) # TODO: only text messages for now
191 )
192 .scalars()
193 .unique()
194 )
196 for user in users:
197 # now actually grab all the group chats, not just less than 5 min old
198 subquery = (
199 select(
200 GroupChatSubscription.group_chat_id.label("group_chat_id"),
201 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
202 func.max(Message.id).label("message_id"),
203 func.count(Message.id).label("count_unseen"),
204 )
205 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
206 .where(GroupChatSubscription.user_id == user.id)
207 .where(not_(GroupChatSubscription.is_muted))
208 .where(Message.id > user.last_notified_message_id)
209 .where(Message.id > GroupChatSubscription.last_seen_message_id)
210 .where(Message.time >= GroupChatSubscription.joined)
211 .where(Message.message_type == MessageType.text) # TODO: only text messages for now
212 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
213 .group_by(GroupChatSubscription.group_chat_id)
214 .order_by(func.max(Message.id).desc())
215 .subquery()
216 )
218 unseen_messages = session.execute(
219 select(GroupChat, Message, subquery.c.count_unseen)
220 .join(subquery, subquery.c.message_id == Message.id)
221 .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
222 .order_by(subquery.c.message_id.desc())
223 ).all()
225 user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)
227 def format_title(message, group_chat, count_unseen):
228 if group_chat.is_dm:
229 return f"You missed {count_unseen} message(s) from {message.author.name}"
230 else:
231 return f"You missed {count_unseen} message(s) in {group_chat.title}"
233 notify(
234 session,
235 user_id=user.id,
236 topic_action="chat:missed_messages",
237 data=notification_data_pb2.ChatMissedMessages(
238 messages=[
239 notification_data_pb2.ChatMessage(
240 author=user_model_to_pb(
241 message.author,
242 session,
243 SimpleNamespace(user_id=user.id),
244 ),
245 message=format_title(message, group_chat, count_unseen),
246 text=message.text,
247 group_chat_id=message.conversation_id,
248 )
249 for group_chat, message, count_unseen in unseen_messages
250 ],
251 ),
252 )
253 session.commit()
256send_message_notifications.PAYLOAD = empty_pb2.Empty
257send_message_notifications.SCHEDULE = timedelta(minutes=3)
260def send_request_notifications(payload):
261 """
262 Sends out email notifications for unseen messages in host requests (as surfer or host)
263 """
264 logger.info("Sending out email notifications for unseen messages in host requests")
266 with session_scope() as session:
267 # requests where this user is surfing
268 surfing_reqs = session.execute(
269 select(User, HostRequest, func.max(Message.id))
270 .where(User.is_visible)
271 .join(HostRequest, HostRequest.surfer_user_id == User.id)
272 .join(Message, Message.conversation_id == HostRequest.conversation_id)
273 .where(Message.id > HostRequest.surfer_last_seen_message_id)
274 .where(Message.id > User.last_notified_request_message_id)
275 .where(Message.time < now() - timedelta(minutes=5))
276 .where(Message.message_type == MessageType.text)
277 .group_by(User, HostRequest)
278 ).all()
280 # where this user is hosting
281 hosting_reqs = session.execute(
282 select(User, HostRequest, func.max(Message.id))
283 .where(User.is_visible)
284 .join(HostRequest, HostRequest.host_user_id == User.id)
285 .join(Message, Message.conversation_id == HostRequest.conversation_id)
286 .where(Message.id > HostRequest.host_last_seen_message_id)
287 .where(Message.id > User.last_notified_request_message_id)
288 .where(Message.time < now() - timedelta(minutes=5))
289 .where(Message.message_type == MessageType.text)
290 .group_by(User, HostRequest)
291 ).all()
293 for user, host_request, max_message_id in surfing_reqs:
294 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
295 session.flush()
297 context = SimpleNamespace(user_id=user.id)
298 notify(
299 session,
300 user_id=user.id,
301 topic_action="host_request:missed_messages",
302 key=host_request.conversation_id,
303 data=notification_data_pb2.HostRequestMissedMessages(
304 host_request=host_request_to_pb(host_request, session, context),
305 user=user_model_to_pb(host_request.host, session, context),
306 am_host=False,
307 ),
308 )
310 for user, host_request, max_message_id in hosting_reqs:
311 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
312 session.flush()
314 context = SimpleNamespace(user_id=user.id)
315 notify(
316 session,
317 user_id=user.id,
318 topic_action="host_request:missed_messages",
319 key=host_request.conversation_id,
320 data=notification_data_pb2.HostRequestMissedMessages(
321 host_request=host_request_to_pb(host_request, session, context),
322 user=user_model_to_pb(host_request.surfer, session, context),
323 am_host=True,
324 ),
325 )
328send_request_notifications.PAYLOAD = empty_pb2.Empty
329send_request_notifications.SCHEDULE = timedelta(minutes=3)
332def send_onboarding_emails(payload):
333 """
334 Sends out onboarding emails
335 """
336 logger.info("Sending out onboarding emails")
338 with session_scope() as session:
339 # first onboarding email
340 users = (
341 session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all()
342 )
344 for user in users:
345 notify(
346 session,
347 user_id=user.id,
348 topic_action="onboarding:reminder",
349 key="1",
350 )
351 user.onboarding_emails_sent = 1
352 user.last_onboarding_email_sent = now()
353 session.commit()
355 # second onboarding email
356 # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
357 users = (
358 session.execute(
359 select(User)
360 .where(User.is_visible)
361 .where(User.onboarding_emails_sent == 1)
362 .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
363 .where(User.has_completed_profile == False)
364 )
365 .scalars()
366 .all()
367 )
369 for user in users:
370 notify(
371 session,
372 user_id=user.id,
373 topic_action="onboarding:reminder",
374 key="2",
375 )
376 user.onboarding_emails_sent = 2
377 user.last_onboarding_email_sent = now()
378 session.commit()
381send_onboarding_emails.PAYLOAD = empty_pb2.Empty
382send_onboarding_emails.SCHEDULE = timedelta(hours=1)
385def send_reference_reminders(payload):
386 """
387 Sends out reminders to write references after hosting/staying
388 """
389 logger.info("Sending out reference reminder emails")
391 # Keep this in chronological order!
392 reference_reminder_schedule = [
393 # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
394 # the end time to write a reference is supposed to be midnight in the host's timezone
395 # 8 pm ish on the last day of the stay
396 (1, timedelta(days=15) - timedelta(hours=20), 14),
397 # 2 pm ish a week after stay
398 (2, timedelta(days=8) - timedelta(hours=14), 7),
399 # 10 am ish 3 days before end of time to write ref
400 (3, timedelta(days=4) - timedelta(hours=10), 3),
401 ]
403 with session_scope() as session:
404 # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
405 for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
406 user = aliased(User)
407 other_user = aliased(User)
408 # surfers needing to write a ref
409 q1 = (
410 select(literal(True), HostRequest, user, other_user)
411 .join(user, user.id == HostRequest.surfer_user_id)
412 .join(other_user, other_user.id == HostRequest.host_user_id)
413 .outerjoin(
414 Reference,
415 and_(
416 Reference.host_request_id == HostRequest.conversation_id,
417 # if no reference is found in this join, then the surfer has not written a ref
418 Reference.from_user_id == HostRequest.surfer_user_id,
419 ),
420 )
421 .where(user.is_visible)
422 .where(other_user.is_visible)
423 .where(Reference.id == None)
424 .where(HostRequest.can_write_reference)
425 .where(HostRequest.surfer_sent_reference_reminders < reminder_number)
426 .where(HostRequest.end_time_to_write_reference - reminder_time < now())
427 .where(HostRequest.surfer_reason_didnt_meetup == None)
428 )
430 # hosts needing to write a ref
431 q2 = (
432 select(literal(False), HostRequest, user, other_user)
433 .join(user, user.id == HostRequest.host_user_id)
434 .join(other_user, other_user.id == HostRequest.surfer_user_id)
435 .outerjoin(
436 Reference,
437 and_(
438 Reference.host_request_id == HostRequest.conversation_id,
439 # if no reference is found in this join, then the host has not written a ref
440 Reference.from_user_id == HostRequest.host_user_id,
441 ),
442 )
443 .where(user.is_visible)
444 .where(other_user.is_visible)
445 .where(Reference.id == None)
446 .where(HostRequest.can_write_reference)
447 .where(HostRequest.host_sent_reference_reminders < reminder_number)
448 .where(HostRequest.end_time_to_write_reference - reminder_time < now())
449 .where(HostRequest.host_reason_didnt_meetup == None)
450 )
452 union = union_all(q1, q2).subquery()
453 union = select(
454 union.c[0].label("surfed"),
455 aliased(HostRequest, union),
456 aliased(user, union),
457 aliased(other_user, union),
458 )
459 reference_reminders = session.execute(union).all()
461 for surfed, host_request, user, other_user in reference_reminders:
462 # checked in sql
463 assert user.is_visible
464 if not are_blocked(session, user.id, other_user.id):
465 context = SimpleNamespace(user_id=user.id)
466 notify(
467 session,
468 user_id=user.id,
469 topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
470 data=notification_data_pb2.ReferenceReminder(
471 host_request_id=host_request.conversation_id,
472 other_user=user_model_to_pb(other_user, session, context),
473 days_left=reminder_days_left,
474 ),
475 )
476 if surfed:
477 host_request.surfer_sent_reference_reminders = reminder_number
478 else:
479 host_request.host_sent_reference_reminders = reminder_number
480 session.commit()
483send_reference_reminders.PAYLOAD = empty_pb2.Empty
484send_reference_reminders.SCHEDULE = timedelta(hours=1)
487def add_users_to_email_list(payload):
488 if not config["LISTMONK_ENABLED"]:
489 logger.info("Not adding users to mailing list")
490 return
492 logger.info("Adding users to mailing list")
494 while True:
495 with session_scope() as session:
496 user = session.execute(
497 select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
498 ).scalar_one_or_none()
499 if not user:
500 logger.info("Finished adding users to mailing list")
501 return
503 if user.opt_out_of_newsletter:
504 user.in_sync_with_newsletter = True
505 session.commit()
506 continue
508 r = requests.post(
509 config["LISTMONK_BASE_URL"] + "/api/subscribers",
510 auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
511 json={
512 "email": user.email,
513 "name": user.name,
514 "lists": [config["LISTMONK_LIST_ID"]],
515 "preconfirm_subscriptions": True,
516 "attribs": {"couchers_user_id": user.id},
517 "status": "enabled",
518 },
519 timeout=10,
520 )
521 # the API returns if the user is already subscribed
522 if r.status_code == 200 or r.status_code == 409:
523 user.in_sync_with_newsletter = True
524 session.commit()
525 else:
526 raise Exception("Failed to add users to mailing list")
529add_users_to_email_list.PAYLOAD = empty_pb2.Empty
530add_users_to_email_list.SCHEDULE = timedelta(hours=1)
533def enforce_community_membership(payload):
534 tasks_enforce_community_memberships()
537enforce_community_membership.PAYLOAD = empty_pb2.Empty
538enforce_community_membership.SCHEDULE = timedelta(minutes=15)
541def update_recommendation_scores(payload):
542 text_fields = [
543 User.hometown,
544 User.occupation,
545 User.education,
546 User.about_me,
547 User.things_i_like,
548 User.about_place,
549 User.additional_information,
550 User.pet_details,
551 User.kid_details,
552 User.housemate_details,
553 User.other_host_info,
554 User.sleeping_details,
555 User.area,
556 User.house_rules,
557 ]
558 home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]
560 def poor_man_gaussian():
561 """
562 Produces an approximatley std normal random variate
563 """
564 trials = 5
565 return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)
567 def int_(stmt):
568 return func.coalesce(cast(stmt, Integer), 0)
570 def float_(stmt):
571 return func.coalesce(cast(stmt, Float), 0.0)
573 with session_scope() as session:
574 # profile
575 profile_text = ""
576 for field in text_fields:
577 profile_text += func.coalesce(field, "")
578 text_length = func.length(profile_text)
579 home_text = ""
580 for field in home_fields:
581 home_text += func.coalesce(field, "")
582 home_length = func.length(home_text)
584 has_text = int_(text_length > 500)
585 long_text = int_(text_length > 2000)
586 has_pic = int_(User.avatar_key != None)
587 can_host = int_(User.hosting_status == HostingStatus.can_host)
588 cant_host = int_(User.hosting_status == HostingStatus.cant_host)
589 filled_home = int_(User.last_minute != None) * int_(home_length > 200)
590 profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host
592 # references
593 left_ref_expr = int_(1).label("left_reference")
594 left_refs_subquery = (
595 select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
596 )
597 left_reference = int_(left_refs_subquery.c.left_reference)
598 has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
599 ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
600 ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
601 has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
602 has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
603 "has_bad_ref"
604 )
605 received_ref_subquery = (
606 select(
607 Reference.to_user_id.label("user_id"),
608 has_reference_expr,
609 has_multiple_types_expr,
610 has_bad_ref_expr,
611 ref_count_expr,
612 ref_avg_expr,
613 )
614 .group_by(Reference.to_user_id)
615 .subquery()
616 )
617 has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
618 has_reference = int_(received_ref_subquery.c.has_reference)
619 has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
620 rating_score = float_(
621 received_ref_subquery.c.ref_avg
622 * (
623 2 * func.least(received_ref_subquery.c.ref_count, 5)
624 + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
625 )
626 )
627 ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score
629 # activeness
630 recently_active = int_(User.last_active >= now() - timedelta(days=180))
631 very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
632 recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
633 messaged_lots = int_(func.count(Message.id) > 5)
634 messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
635 messaging_subquery = (
636 select(Message.author_id.label("user_id"), messaging_points_subquery)
637 .where(Message.message_type == MessageType.text)
638 .group_by(Message.author_id)
639 .subquery()
640 )
641 activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)
643 # verification
644 cb_subquery = (
645 select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
646 .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
647 .where(ClusterSubscription.role == ClusterRole.admin)
648 .where(Cluster.is_official_cluster)
649 .group_by(ClusterSubscription.user_id)
650 .subquery()
651 )
652 min_node_id = cb_subquery.c.min_node_id
653 cb = int_(min_node_id >= 1)
654 wcb = int_(min_node_id == 1)
655 badge_points = {
656 "founder": 100,
657 "board_member": 20,
658 "past_board_member": 5,
659 "strong_verification": 3,
660 "volunteer": 3,
661 "past_volunteer": 2,
662 "donor": 1,
663 "phone_verified": 1,
664 }
666 badge_subquery = (
667 select(
668 UserBadge.user_id.label("user_id"),
669 func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
670 )
671 .group_by(UserBadge.user_id)
672 .subquery()
673 )
675 other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)
677 # response rate
678 hr_subquery = select(
679 user_response_rates.c.user_id,
680 float_(extract("epoch", user_response_rates.c.response_time_33p) / 60.0).label("response_time_33p"),
681 float_(extract("epoch", user_response_rates.c.response_time_66p) / 60.0).label("response_time_66p"),
682 ).subquery()
683 response_time_33p = hr_subquery.c.response_time_33p
684 response_time_66p = hr_subquery.c.response_time_66p
685 # be careful with nulls
686 response_rate_points = -10 * int_(response_time_33p > 60 * 72.0) + 5 * int_(response_time_66p < 60 * 48.0)
688 recommendation_score = (
689 profile_points
690 + ref_score
691 + activeness_points
692 + other_points
693 + response_rate_points
694 + 2 * poor_man_gaussian()
695 )
697 scores = (
698 select(User.id.label("user_id"), recommendation_score.label("score"))
699 .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
700 .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
701 .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
702 .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
703 .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
704 .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
705 ).subquery()
707 session.execute(
708 User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
709 )
711 logger.info("Updated recommendation scores")
714update_recommendation_scores.PAYLOAD = empty_pb2.Empty
715update_recommendation_scores.SCHEDULE = timedelta(hours=24)
718def update_badges(payload):
719 with session_scope() as session:
721 def update_badge(badge_id: str, members: list[int]):
722 badge = get_badge_dict()[badge_id]
723 user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all()
724 # in case the user ids don't exist in the db
725 actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all()
726 # we should add the badge to these
727 add = set(actual_members) - set(user_ids)
728 # we should remove the badge from these
729 remove = set(user_ids) - set(actual_members)
730 for user_id in add:
731 user_add_badge(session, user_id, badge_id)
733 for user_id in remove:
734 user_remove_badge(session, user_id, badge_id)
736 update_badge("founder", get_static_badge_dict()["founder"])
737 update_badge("board_member", get_static_badge_dict()["board_member"])
738 update_badge("past_board_member", get_static_badge_dict()["past_board_member"])
739 update_badge(
740 "donor", session.execute(select(User.id).join(Invoice, Invoice.user_id == User.id)).scalars().all()
741 )
742 update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all())
743 update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all())
744 # strong verification requires passport on file + gender/sex correspondence and date of birth match
745 update_badge(
746 "strong_verification",
747 session.execute(
748 select(User.id)
749 .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
750 .where(StrongVerificationAttempt.has_strong_verification(User))
751 )
752 .scalars()
753 .all(),
754 )
757update_badges.PAYLOAD = empty_pb2.Empty
758update_badges.SCHEDULE = timedelta(minutes=15)
761def finalize_strong_verification(payload):
762 with session_scope() as session:
763 verification_attempt = session.execute(
764 select(StrongVerificationAttempt)
765 .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
766 .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
767 ).scalar_one()
768 response = requests.post(
769 "https://passportreader.app/api/v1/session.get",
770 auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
771 json={"id": verification_attempt.iris_session_id},
772 timeout=10,
773 )
774 if response.status_code != 200:
775 raise Exception(f"Iris didn't return 200: {response.text}")
776 json_data = response.json()
777 reference_payload = verification_pb2.VerificationReferencePayload.FromString(
778 simple_decrypt("iris_callback", b64decode(json_data["reference"]))
779 )
780 assert verification_attempt.user_id == reference_payload.user_id
781 assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token
782 assert verification_attempt.iris_session_id == json_data["id"]
783 assert json_data["state"] == "APPROVED"
785 if json_data["document_type"] != "PASSPORT":
786 verification_attempt.status = StrongVerificationAttemptStatus.failed
787 notify(
788 session,
789 user_id=verification_attempt.user_id,
790 topic_action="verification:sv_fail",
791 data=notification_data_pb2.VerificationSVFail(
792 reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
793 ),
794 )
795 return
797 assert json_data["document_type"] == "PASSPORT"
799 expiry_date = date.fromisoformat(json_data["expiry_date"])
800 nationality = json_data["nationality"]
801 last_three_document_chars = json_data["document_number"][-3:]
803 existing_attempt = session.execute(
804 select(StrongVerificationAttempt)
805 .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
806 .where(StrongVerificationAttempt.passport_nationality == nationality)
807 .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
808 .order_by(StrongVerificationAttempt.id)
809 .limit(1)
810 ).scalar_one_or_none()
812 verification_attempt.has_minimal_data = True
813 verification_attempt.passport_expiry_date = expiry_date
814 verification_attempt.passport_nationality = nationality
815 verification_attempt.passport_last_three_document_chars = last_three_document_chars
817 if existing_attempt:
818 verification_attempt.status = StrongVerificationAttemptStatus.duplicate
820 if existing_attempt.user_id != verification_attempt.user_id:
821 session.flush()
822 send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)
824 notify(
825 session,
826 user_id=verification_attempt.user_id,
827 topic_action="verification:sv_fail",
828 data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
829 )
830 return
832 verification_attempt.has_full_data = True
833 verification_attempt.passport_encrypted_data = asym_encrypt(
834 config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
835 )
836 verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
837 verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
838 verification_attempt.status = StrongVerificationAttemptStatus.succeeded
840 session.flush()
842 strong_verification_completions_counter.inc()
844 user = verification_attempt.user
845 if verification_attempt.has_strong_verification(user):
846 badge_id = "strong_verification"
847 if session.execute(
848 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
849 ).scalar_one_or_none():
850 return
852 user_add_badge(session, user.id, badge_id, do_notify=False)
853 notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success")
854 else:
855 notify(
856 session,
857 user_id=verification_attempt.user_id,
858 topic_action="verification:sv_fail",
859 data=notification_data_pb2.VerificationSVFail(
860 reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
861 ),
862 )
865finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload
868def send_activeness_probes(payload):
869 with session_scope() as session:
870 ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)
872 if config["ACTIVENESS_PROBES_ENABLED"]:
873 # current activeness probes
874 subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery()
876 # users who we should send an activeness probe to
877 new_probe_user_ids = (
878 session.execute(
879 select(User.id)
880 .where(User.is_visible)
881 .where(User.hosting_status == HostingStatus.can_host)
882 .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
883 .where(User.id.not_in(select(subquery.c.user_id)))
884 )
885 .scalars()
886 .all()
887 )
889 for user_id in new_probe_user_ids:
890 session.add(ActivenessProbe(user_id=user_id))
892 session.commit()
894 ## Step 2: actually send out probe notifications
895 for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
896 probes = (
897 session.execute(
898 select(ActivenessProbe)
899 .where(ActivenessProbe.notifications_sent == probe_number_minus_1)
900 .where(ActivenessProbe.probe_initiated + delay < func.now())
901 .where(ActivenessProbe.is_pending)
902 )
903 .scalars()
904 .all()
905 )
907 for probe in probes:
908 probe.notifications_sent = probe_number_minus_1 + 1
909 context = SimpleNamespace(user_id=probe.user.id)
910 notify(
911 session,
912 user_id=probe.user.id,
913 topic_action="activeness:probe",
914 key=probe.id,
915 data=notification_data_pb2.ActivenessProbe(
916 reminder_number=probe_number_minus_1 + 1,
917 deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
918 ),
919 )
920 session.commit()
922 ## Step 3: for those who haven't responded, mark them as failed
923 expired_probes = (
924 session.execute(
925 select(ActivenessProbe)
926 .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
927 .where(ActivenessProbe.is_pending)
928 .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
929 )
930 .scalars()
931 .all()
932 )
934 for probe in expired_probes:
935 probe.responded = now()
936 probe.response = ActivenessProbeStatus.expired
937 if probe.user.hosting_status == HostingStatus.can_host:
938 probe.user.hosting_status = HostingStatus.cant_host
939 if probe.user.meetup_status == MeetupStatus.wants_to_meetup:
940 probe.user.meetup_status = MeetupStatus.open_to_meetup
941 session.commit()
944send_activeness_probes.PAYLOAD = empty_pb2.Empty
945send_activeness_probes.SCHEDULE = timedelta(minutes=60)