Coverage for src/couchers/jobs/handlers.py: 98%
315 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-11 15:27 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-11 15:27 +0000
1"""
2Background job servicers
3"""
5import logging
6from datetime import date, timedelta
7from math import sqrt
8from types import SimpleNamespace
10import requests
11from google.protobuf import empty_pb2
12from sqlalchemy import Integer
13from sqlalchemy.orm import aliased
14from sqlalchemy.sql import and_, case, cast, delete, distinct, extract, func, literal, not_, or_, select, union_all
15from sqlalchemy.sql.functions import percentile_disc
17from couchers.config import config
18from couchers.crypto import asym_encrypt, b64decode, simple_decrypt
19from couchers.db import session_scope
20from couchers.email.dev import print_dev_email
21from couchers.email.smtp import send_smtp_email
22from couchers.helpers.badges import user_add_badge, user_remove_badge
23from couchers.materialized_views import refresh_materialized_views, refresh_materialized_views_rapid
24from couchers.metrics import strong_verification_completions_counter
25from couchers.models import (
26 AccountDeletionToken,
27 Cluster,
28 ClusterRole,
29 ClusterSubscription,
30 Float,
31 GroupChat,
32 GroupChatSubscription,
33 HostingStatus,
34 HostRequest,
35 Invoice,
36 LoginToken,
37 Message,
38 MessageType,
39 PassportSex,
40 PasswordResetToken,
41 Reference,
42 StrongVerificationAttempt,
43 StrongVerificationAttemptStatus,
44 User,
45 UserBadge,
46)
47from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification
48from couchers.notifications.notify import notify
49from couchers.resources import get_badge_dict, get_static_badge_dict
50from couchers.servicers.api import user_model_to_pb
51from couchers.servicers.blocking import are_blocked
52from couchers.servicers.conversations import generate_message_notifications
53from couchers.servicers.discussions import generate_create_discussion_notifications
54from couchers.servicers.events import (
55 generate_event_cancel_notifications,
56 generate_event_create_notifications,
57 generate_event_delete_notifications,
58 generate_event_update_notifications,
59)
60from couchers.servicers.requests import host_request_to_pb
61from couchers.servicers.threads import generate_reply_notifications
62from couchers.sql import couchers_select as select
63from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships
64from couchers.tasks import send_duplicate_strong_verification_email
65from couchers.utils import now
66from proto import notification_data_pb2
67from proto.internal import jobs_pb2, verification_pb2
69logger = logging.getLogger(__name__)
71# these were straight up imported
72handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload
74send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload
76handle_email_digests.PAYLOAD = empty_pb2.Empty
77handle_email_digests.SCHEDULE = timedelta(minutes=15)
79generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload
81generate_reply_notifications.PAYLOAD = jobs_pb2.GenerateReplyNotificationsPayload
83generate_create_discussion_notifications.PAYLOAD = jobs_pb2.GenerateCreateDiscussionNotificationsPayload
85generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload
87generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload
89generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload
91generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload
94refresh_materialized_views.PAYLOAD = empty_pb2.Empty
95refresh_materialized_views.SCHEDULE = timedelta(minutes=5)
97refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
98refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)
101def send_email(payload):
102 logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
103 # selects a "sender", which either prints the email to the logger or sends it out with SMTP
104 sender = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email
105 # the sender must return a models.Email object that can be added to the database
106 email = sender(
107 sender_name=payload.sender_name,
108 sender_email=payload.sender_email,
109 recipient=payload.recipient,
110 subject=payload.subject,
111 plain=payload.plain,
112 html=payload.html,
113 list_unsubscribe_header=payload.list_unsubscribe_header,
114 source_data=payload.source_data,
115 )
116 with session_scope() as session:
117 session.add(email)
120send_email.PAYLOAD = jobs_pb2.SendEmailPayload
123def purge_login_tokens(payload):
124 logger.info("Purging login tokens")
125 with session_scope() as session:
126 session.execute(delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False))
129purge_login_tokens.PAYLOAD = empty_pb2.Empty
130purge_login_tokens.SCHEDULE = timedelta(hours=24)
133def purge_password_reset_tokens(payload):
134 logger.info("Purging login tokens")
135 with session_scope() as session:
136 session.execute(
137 delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
138 )
141purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
142purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)
145def purge_account_deletion_tokens(payload):
146 logger.info("Purging account deletion tokens")
147 with session_scope() as session:
148 session.execute(
149 delete(AccountDeletionToken)
150 .where(~AccountDeletionToken.is_valid)
151 .execution_options(synchronize_session=False)
152 )
155purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
156purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)
159def send_message_notifications(payload):
160 """
161 Sends out email notifications for messages that have been unseen for a long enough time
162 """
163 # very crude and dumb algorithm
164 logger.info("Sending out email notifications for unseen messages")
166 with session_scope() as session:
167 # users who have unnotified messages older than 5 minutes in any group chat
168 users = (
169 session.execute(
170 select(User)
171 .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
172 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
173 .where(not_(GroupChatSubscription.is_muted))
174 .where(User.is_visible)
175 .where(Message.time >= GroupChatSubscription.joined)
176 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
177 .where(Message.id > User.last_notified_message_id)
178 .where(Message.id > GroupChatSubscription.last_seen_message_id)
179 .where(Message.time < now() - timedelta(minutes=5))
180 .where(Message.message_type == MessageType.text) # TODO: only text messages for now
181 )
182 .scalars()
183 .unique()
184 )
186 for user in users:
187 # now actually grab all the group chats, not just less than 5 min old
188 subquery = (
189 select(
190 GroupChatSubscription.group_chat_id.label("group_chat_id"),
191 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
192 func.max(Message.id).label("message_id"),
193 func.count(Message.id).label("count_unseen"),
194 )
195 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
196 .where(GroupChatSubscription.user_id == user.id)
197 .where(not_(GroupChatSubscription.is_muted))
198 .where(Message.id > user.last_notified_message_id)
199 .where(Message.id > GroupChatSubscription.last_seen_message_id)
200 .where(Message.time >= GroupChatSubscription.joined)
201 .where(Message.message_type == MessageType.text) # TODO: only text messages for now
202 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
203 .group_by(GroupChatSubscription.group_chat_id)
204 .order_by(func.max(Message.id).desc())
205 .subquery()
206 )
208 unseen_messages = session.execute(
209 select(GroupChat, Message, subquery.c.count_unseen)
210 .join(subquery, subquery.c.message_id == Message.id)
211 .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
212 .order_by(subquery.c.message_id.desc())
213 ).all()
215 user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)
217 def format_title(message, group_chat, count_unseen):
218 if group_chat.is_dm:
219 return f"You missed {count_unseen} message(s) from {message.author.name}"
220 else:
221 return f"You missed {count_unseen} message(s) in {group_chat.title}"
223 notify(
224 session,
225 user_id=user.id,
226 topic_action="chat:missed_messages",
227 data=notification_data_pb2.ChatMissedMessages(
228 messages=[
229 notification_data_pb2.ChatMessage(
230 author=user_model_to_pb(
231 message.author,
232 session,
233 SimpleNamespace(user_id=user.id),
234 ),
235 message=format_title(message, group_chat, count_unseen),
236 text=message.text,
237 group_chat_id=message.conversation_id,
238 )
239 for group_chat, message, count_unseen in unseen_messages
240 ],
241 ),
242 )
243 session.commit()
246send_message_notifications.PAYLOAD = empty_pb2.Empty
247send_message_notifications.SCHEDULE = timedelta(minutes=3)
250def send_request_notifications(payload):
251 """
252 Sends out email notifications for unseen messages in host requests (as surfer or host)
253 """
254 logger.info("Sending out email notifications for unseen messages in host requests")
256 with session_scope() as session:
257 # requests where this user is surfing
258 surfing_reqs = session.execute(
259 select(User, HostRequest, func.max(Message.id))
260 .where(User.is_visible)
261 .join(HostRequest, HostRequest.surfer_user_id == User.id)
262 .join(Message, Message.conversation_id == HostRequest.conversation_id)
263 .where(Message.id > HostRequest.surfer_last_seen_message_id)
264 .where(Message.id > User.last_notified_request_message_id)
265 .where(Message.time < now() - timedelta(minutes=5))
266 .where(Message.message_type == MessageType.text)
267 .group_by(User, HostRequest)
268 ).all()
270 # where this user is hosting
271 hosting_reqs = session.execute(
272 select(User, HostRequest, func.max(Message.id))
273 .where(User.is_visible)
274 .join(HostRequest, HostRequest.host_user_id == User.id)
275 .join(Message, Message.conversation_id == HostRequest.conversation_id)
276 .where(Message.id > HostRequest.host_last_seen_message_id)
277 .where(Message.id > User.last_notified_request_message_id)
278 .where(Message.time < now() - timedelta(minutes=5))
279 .where(Message.message_type == MessageType.text)
280 .group_by(User, HostRequest)
281 ).all()
283 for user, host_request, max_message_id in surfing_reqs:
284 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
285 session.flush()
287 context = SimpleNamespace(user_id=user.id)
288 notify(
289 session,
290 user_id=user.id,
291 topic_action="host_request:missed_messages",
292 key=host_request.conversation_id,
293 data=notification_data_pb2.HostRequestMissedMessages(
294 host_request=host_request_to_pb(host_request, session, context),
295 user=user_model_to_pb(host_request.host, session, context),
296 am_host=False,
297 ),
298 )
300 for user, host_request, max_message_id in hosting_reqs:
301 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
302 session.flush()
304 context = SimpleNamespace(user_id=user.id)
305 notify(
306 session,
307 user_id=user.id,
308 topic_action="host_request:missed_messages",
309 key=host_request.conversation_id,
310 data=notification_data_pb2.HostRequestMissedMessages(
311 host_request=host_request_to_pb(host_request, session, context),
312 user=user_model_to_pb(host_request.surfer, session, context),
313 am_host=True,
314 ),
315 )
318send_request_notifications.PAYLOAD = empty_pb2.Empty
319send_request_notifications.SCHEDULE = timedelta(minutes=3)
322def send_onboarding_emails(payload):
323 """
324 Sends out onboarding emails
325 """
326 logger.info("Sending out onboarding emails")
328 with session_scope() as session:
329 # first onboarding email
330 users = (
331 session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all()
332 )
334 for user in users:
335 notify(
336 session,
337 user_id=user.id,
338 topic_action="onboarding:reminder",
339 key="1",
340 )
341 user.onboarding_emails_sent = 1
342 user.last_onboarding_email_sent = now()
343 session.commit()
345 # second onboarding email
346 # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
347 users = (
348 session.execute(
349 select(User)
350 .where(User.is_visible)
351 .where(User.onboarding_emails_sent == 1)
352 .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
353 .where(User.has_completed_profile == False)
354 )
355 .scalars()
356 .all()
357 )
359 for user in users:
360 notify(
361 session,
362 user_id=user.id,
363 topic_action="onboarding:reminder",
364 key="2",
365 )
366 user.onboarding_emails_sent = 2
367 user.last_onboarding_email_sent = now()
368 session.commit()
371send_onboarding_emails.PAYLOAD = empty_pb2.Empty
372send_onboarding_emails.SCHEDULE = timedelta(hours=1)
375def send_reference_reminders(payload):
376 """
377 Sends out reminders to write references after hosting/staying
378 """
379 logger.info("Sending out reference reminder emails")
381 # Keep this in chronological order!
382 reference_reminder_schedule = [
383 # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
384 # the end time to write a reference is supposed to be midnight in the host's timezone
385 # 8 pm ish on the last day of the stay
386 (1, timedelta(days=15) - timedelta(hours=20), 14),
387 # 2 pm ish a week after stay
388 (2, timedelta(days=8) - timedelta(hours=14), 7),
389 # 10 am ish 3 days before end of time to write ref
390 (3, timedelta(days=4) - timedelta(hours=10), 3),
391 ]
393 with session_scope() as session:
394 # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
395 for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
396 user = aliased(User)
397 other_user = aliased(User)
398 # surfers needing to write a ref
399 q1 = (
400 select(literal(True), HostRequest, user, other_user)
401 .join(user, user.id == HostRequest.surfer_user_id)
402 .join(other_user, other_user.id == HostRequest.host_user_id)
403 .outerjoin(
404 Reference,
405 and_(
406 Reference.host_request_id == HostRequest.conversation_id,
407 # if no reference is found in this join, then the surfer has not written a ref
408 Reference.from_user_id == HostRequest.surfer_user_id,
409 ),
410 )
411 .where(user.is_visible)
412 .where(other_user.is_visible)
413 .where(Reference.id == None)
414 .where(HostRequest.can_write_reference)
415 .where(HostRequest.surfer_sent_reference_reminders < reminder_number)
416 .where(HostRequest.end_time_to_write_reference - reminder_time < now())
417 .where(HostRequest.surfer_reason_didnt_meetup == None)
418 )
420 # hosts needing to write a ref
421 q2 = (
422 select(literal(False), HostRequest, user, other_user)
423 .join(user, user.id == HostRequest.host_user_id)
424 .join(other_user, other_user.id == HostRequest.surfer_user_id)
425 .outerjoin(
426 Reference,
427 and_(
428 Reference.host_request_id == HostRequest.conversation_id,
429 # if no reference is found in this join, then the host has not written a ref
430 Reference.from_user_id == HostRequest.host_user_id,
431 ),
432 )
433 .where(user.is_visible)
434 .where(other_user.is_visible)
435 .where(Reference.id == None)
436 .where(HostRequest.can_write_reference)
437 .where(HostRequest.host_sent_reference_reminders < reminder_number)
438 .where(HostRequest.end_time_to_write_reference - reminder_time < now())
439 .where(HostRequest.host_reason_didnt_meetup == None)
440 )
442 union = union_all(q1, q2).subquery()
443 union = select(
444 union.c[0].label("surfed"),
445 aliased(HostRequest, union),
446 aliased(user, union),
447 aliased(other_user, union),
448 )
449 reference_reminders = session.execute(union).all()
451 for surfed, host_request, user, other_user in reference_reminders:
452 # checked in sql
453 assert user.is_visible
454 if not are_blocked(session, user.id, other_user.id):
455 context = SimpleNamespace(user_id=user.id)
456 notify(
457 session,
458 user_id=user.id,
459 topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
460 data=notification_data_pb2.ReferenceReminder(
461 host_request_id=host_request.conversation_id,
462 other_user=user_model_to_pb(other_user, session, context),
463 days_left=reminder_days_left,
464 ),
465 )
466 if surfed:
467 host_request.surfer_sent_reference_reminders = reminder_number
468 else:
469 host_request.host_sent_reference_reminders = reminder_number
470 session.commit()
473send_reference_reminders.PAYLOAD = empty_pb2.Empty
474send_reference_reminders.SCHEDULE = timedelta(hours=1)
477def add_users_to_email_list(payload):
478 if not config["LISTMONK_ENABLED"]:
479 logger.info("Not adding users to mailing list")
480 return
482 logger.info("Adding users to mailing list")
484 while True:
485 with session_scope() as session:
486 user = session.execute(
487 select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
488 ).scalar_one_or_none()
489 if not user:
490 logger.info("Finished adding users to mailing list")
491 return
493 if user.opt_out_of_newsletter:
494 user.in_sync_with_newsletter = True
495 session.commit()
496 continue
498 r = requests.post(
499 config["LISTMONK_BASE_URL"] + "/api/subscribers",
500 auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
501 json={
502 "email": user.email,
503 "name": user.name,
504 "lists": [config["LISTMONK_LIST_ID"]],
505 "preconfirm_subscriptions": True,
506 "attribs": {"couchers_user_id": user.id},
507 "status": "enabled",
508 },
509 timeout=10,
510 )
511 # the API returns if the user is already subscribed
512 if r.status_code == 200 or r.status_code == 409:
513 user.in_sync_with_newsletter = True
514 session.commit()
515 else:
516 raise Exception("Failed to add users to mailing list")
519add_users_to_email_list.PAYLOAD = empty_pb2.Empty
520add_users_to_email_list.SCHEDULE = timedelta(hours=1)
523def enforce_community_membership(payload):
524 tasks_enforce_community_memberships()
527enforce_community_membership.PAYLOAD = empty_pb2.Empty
528enforce_community_membership.SCHEDULE = timedelta(minutes=15)
531def update_recommendation_scores(payload):
532 text_fields = [
533 User.hometown,
534 User.occupation,
535 User.education,
536 User.about_me,
537 User.things_i_like,
538 User.about_place,
539 User.additional_information,
540 User.pet_details,
541 User.kid_details,
542 User.housemate_details,
543 User.other_host_info,
544 User.sleeping_details,
545 User.area,
546 User.house_rules,
547 ]
548 home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]
550 def poor_man_gaussian():
551 """
552 Produces an approximatley std normal random variate
553 """
554 trials = 5
555 return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)
557 def int_(stmt):
558 return func.coalesce(cast(stmt, Integer), 0)
560 def float_(stmt):
561 return func.coalesce(cast(stmt, Float), 0.0)
563 with session_scope() as session:
564 # profile
565 profile_text = ""
566 for field in text_fields:
567 profile_text += func.coalesce(field, "")
568 text_length = func.length(profile_text)
569 home_text = ""
570 for field in home_fields:
571 home_text += func.coalesce(field, "")
572 home_length = func.length(home_text)
574 has_text = int_(text_length > 500)
575 long_text = int_(text_length > 2000)
576 has_pic = int_(User.avatar_key != None)
577 can_host = int_(User.hosting_status == HostingStatus.can_host)
578 cant_host = int_(User.hosting_status == HostingStatus.cant_host)
579 filled_home = int_(User.last_minute != None) * int_(home_length > 200)
580 profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host
582 # references
583 left_ref_expr = int_(1).label("left_reference")
584 left_refs_subquery = (
585 select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
586 )
587 left_reference = int_(left_refs_subquery.c.left_reference)
588 has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
589 ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
590 ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
591 has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
592 has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
593 "has_bad_ref"
594 )
595 received_ref_subquery = (
596 select(
597 Reference.to_user_id.label("user_id"),
598 has_reference_expr,
599 has_multiple_types_expr,
600 has_bad_ref_expr,
601 ref_count_expr,
602 ref_avg_expr,
603 )
604 .group_by(Reference.to_user_id)
605 .subquery()
606 )
607 has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
608 has_reference = int_(received_ref_subquery.c.has_reference)
609 has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
610 rating_score = float_(
611 received_ref_subquery.c.ref_avg
612 * (
613 2 * func.least(received_ref_subquery.c.ref_count, 5)
614 + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
615 )
616 )
617 ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score
619 # activeness
620 recently_active = int_(User.last_active >= now() - timedelta(days=180))
621 very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
622 recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
623 messaged_lots = int_(func.count(Message.id) > 5)
624 messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
625 messaging_subquery = (
626 select(Message.author_id.label("user_id"), messaging_points_subquery)
627 .where(Message.message_type == MessageType.text)
628 .group_by(Message.author_id)
629 .subquery()
630 )
631 activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)
633 # verification
634 cb_subquery = (
635 select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
636 .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
637 .where(ClusterSubscription.role == ClusterRole.admin)
638 .where(Cluster.is_official_cluster)
639 .group_by(ClusterSubscription.user_id)
640 .subquery()
641 )
642 min_node_id = cb_subquery.c.min_node_id
643 cb = int_(min_node_id >= 1)
644 wcb = int_(min_node_id == 1)
645 badge_points = {
646 "founder": 100,
647 "board_member": 20,
648 "past_board_member": 5,
649 "strong_verification": 3,
650 "volunteer": 3,
651 "past_volunteer": 2,
652 "donor": 1,
653 "phone_verified": 1,
654 }
656 badge_subquery = (
657 select(
658 UserBadge.user_id.label("user_id"),
659 func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
660 )
661 .group_by(UserBadge.user_id)
662 .subquery()
663 )
665 other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)
667 # response rate
668 t = (
669 select(Message.conversation_id, Message.time)
670 .where(Message.message_type == MessageType.chat_created)
671 .subquery()
672 )
673 s = (
674 select(Message.conversation_id, Message.author_id, func.min(Message.time).label("time"))
675 .group_by(Message.conversation_id, Message.author_id)
676 .subquery()
677 )
678 hr_subquery = (
679 select(
680 HostRequest.host_user_id.label("user_id"),
681 func.avg(s.c.time - t.c.time).label("avg_response_time"),
682 func.count(t.c.time).label("received"),
683 func.count(s.c.time).label("responded"),
684 float_(
685 extract(
686 "epoch",
687 percentile_disc(0.33).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
688 )
689 / 60.0
690 ).label("response_time_33p"),
691 float_(
692 extract(
693 "epoch",
694 percentile_disc(0.66).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
695 )
696 / 60.0
697 ).label("response_time_66p"),
698 )
699 .join(t, t.c.conversation_id == HostRequest.conversation_id)
700 .outerjoin(
701 s, and_(s.c.conversation_id == HostRequest.conversation_id, s.c.author_id == HostRequest.host_user_id)
702 )
703 .group_by(HostRequest.host_user_id)
704 .subquery()
705 )
706 avg_response_time = hr_subquery.c.avg_response_time
707 avg_response_time_hr = float_(extract("epoch", avg_response_time) / 60.0)
708 received = hr_subquery.c.received
709 responded = hr_subquery.c.responded
710 response_time_33p = hr_subquery.c.response_time_33p
711 response_time_66p = hr_subquery.c.response_time_66p
712 response_rate = float_(responded / (1.0 * func.greatest(received, 1)))
713 # be careful with nulls
714 response_rate_points = -10 * int_(response_time_33p > 60 * 48.0) + 5 * int_(response_time_66p < 60 * 48.0)
716 recommendation_score = (
717 profile_points
718 + ref_score
719 + activeness_points
720 + other_points
721 + response_rate_points
722 + 2 * poor_man_gaussian()
723 )
725 scores = (
726 select(User.id.label("user_id"), recommendation_score.label("score"))
727 .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
728 .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
729 .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
730 .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
731 .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
732 .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
733 ).subquery()
735 session.execute(
736 User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
737 )
739 logger.info("Updated recommendation scores")
742update_recommendation_scores.PAYLOAD = empty_pb2.Empty
743update_recommendation_scores.SCHEDULE = timedelta(hours=24)
746def update_badges(payload):
747 with session_scope() as session:
749 def update_badge(badge_id: str, members: list[int]):
750 badge = get_badge_dict()[badge_id]
751 user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all()
752 # in case the user ids don't exist in the db
753 actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all()
754 # we should add the badge to these
755 add = set(actual_members) - set(user_ids)
756 # we should remove the badge from these
757 remove = set(user_ids) - set(actual_members)
758 for user_id in add:
759 user_add_badge(session, user_id, badge_id)
761 for user_id in remove:
762 user_remove_badge(session, user_id, badge_id)
764 update_badge("founder", get_static_badge_dict()["founder"])
765 update_badge("board_member", get_static_badge_dict()["board_member"])
766 update_badge("past_board_member", get_static_badge_dict()["past_board_member"])
767 update_badge(
768 "donor", session.execute(select(User.id).join(Invoice, Invoice.user_id == User.id)).scalars().all()
769 )
770 update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all())
771 update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all())
772 # strong verification requires passport on file + gender/sex correspondence and date of birth match
773 update_badge(
774 "strong_verification",
775 session.execute(
776 select(User.id)
777 .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
778 .where(StrongVerificationAttempt.has_strong_verification(User))
779 )
780 .scalars()
781 .all(),
782 )
785update_badges.PAYLOAD = empty_pb2.Empty
786update_badges.SCHEDULE = timedelta(minutes=15)
789def finalize_strong_verification(payload):
790 with session_scope() as session:
791 verification_attempt = session.execute(
792 select(StrongVerificationAttempt)
793 .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
794 .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
795 ).scalar_one()
796 response = requests.post(
797 "https://passportreader.app/api/v1/session.get",
798 auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
799 json={"id": verification_attempt.iris_session_id},
800 timeout=10,
801 )
802 if response.status_code != 200:
803 raise Exception(f"Iris didn't return 200: {response.text}")
804 json_data = response.json()
805 reference_payload = verification_pb2.VerificationReferencePayload.FromString(
806 simple_decrypt("iris_callback", b64decode(json_data["reference"]))
807 )
808 assert verification_attempt.user_id == reference_payload.user_id
809 assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token
810 assert verification_attempt.iris_session_id == json_data["id"]
811 assert json_data["state"] == "APPROVED"
813 if json_data["document_type"] != "PASSPORT":
814 verification_attempt.status = StrongVerificationAttemptStatus.failed
815 notify(
816 session,
817 user_id=verification_attempt.user_id,
818 topic_action="verification:sv_fail",
819 data=notification_data_pb2.VerificationSVFail(
820 reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
821 ),
822 )
823 return
825 assert json_data["document_type"] == "PASSPORT"
827 expiry_date = date.fromisoformat(json_data["expiry_date"])
828 nationality = json_data["nationality"]
829 last_three_document_chars = json_data["document_number"][-3:]
831 existing_attempt = session.execute(
832 select(StrongVerificationAttempt)
833 .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
834 .where(StrongVerificationAttempt.passport_nationality == nationality)
835 .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
836 .order_by(StrongVerificationAttempt.id)
837 .limit(1)
838 ).scalar_one_or_none()
840 verification_attempt.has_minimal_data = True
841 verification_attempt.passport_expiry_date = expiry_date
842 verification_attempt.passport_nationality = nationality
843 verification_attempt.passport_last_three_document_chars = last_three_document_chars
845 if existing_attempt:
846 verification_attempt.status = StrongVerificationAttemptStatus.duplicate
848 if existing_attempt.user_id != verification_attempt.user_id:
849 session.flush()
850 send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)
852 notify(
853 session,
854 user_id=verification_attempt.user_id,
855 topic_action="verification:sv_fail",
856 data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
857 )
858 return
860 verification_attempt.has_full_data = True
861 verification_attempt.passport_encrypted_data = asym_encrypt(
862 config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
863 )
864 verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
865 verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
866 verification_attempt.status = StrongVerificationAttemptStatus.succeeded
868 session.flush()
870 strong_verification_completions_counter.inc()
872 user = verification_attempt.user
873 if verification_attempt.has_strong_verification(user):
874 badge_id = "strong_verification"
875 if session.execute(
876 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
877 ).scalar_one_or_none():
878 return
880 user_add_badge(session, user.id, badge_id, do_notify=False)
881 notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success")
882 else:
883 notify(
884 session,
885 user_id=verification_attempt.user_id,
886 topic_action="verification:sv_fail",
887 data=notification_data_pb2.VerificationSVFail(
888 reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
889 ),
890 )
893finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload