Coverage for src/couchers/jobs/handlers.py: 98%
311 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
1"""
2Background job servicers
3"""
5import logging
6from datetime import date, timedelta
7from math import sqrt
8from types import SimpleNamespace
10import requests
11from google.protobuf import empty_pb2
12from sqlalchemy import Integer
13from sqlalchemy.orm import aliased
14from sqlalchemy.sql import and_, case, cast, delete, distinct, extract, func, literal, not_, or_, select, union_all
15from sqlalchemy.sql.functions import percentile_disc
17from couchers.config import config
18from couchers.crypto import asym_encrypt, b64decode, simple_decrypt
19from couchers.db import session_scope
20from couchers.email.dev import print_dev_email
21from couchers.email.smtp import send_smtp_email
22from couchers.helpers.badges import user_add_badge, user_remove_badge
23from couchers.materialized_views import refresh_materialized_views, refresh_materialized_views_rapid
24from couchers.metrics import strong_verification_completions_counter
25from couchers.models import (
26 AccountDeletionToken,
27 Cluster,
28 ClusterRole,
29 ClusterSubscription,
30 Float,
31 GroupChat,
32 GroupChatSubscription,
33 HostingStatus,
34 HostRequest,
35 Invoice,
36 LoginToken,
37 Message,
38 MessageType,
39 PassportSex,
40 PasswordResetToken,
41 Reference,
42 StrongVerificationAttempt,
43 StrongVerificationAttemptStatus,
44 User,
45 UserBadge,
46)
47from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification
48from couchers.notifications.notify import notify
49from couchers.resources import get_badge_dict, get_static_badge_dict
50from couchers.servicers.api import user_model_to_pb
51from couchers.servicers.blocking import are_blocked
52from couchers.servicers.conversations import generate_message_notifications
53from couchers.servicers.events import (
54 generate_event_cancel_notifications,
55 generate_event_create_notifications,
56 generate_event_delete_notifications,
57 generate_event_update_notifications,
58)
59from couchers.servicers.requests import host_request_to_pb
60from couchers.sql import couchers_select as select
61from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships
62from couchers.tasks import send_duplicate_strong_verification_email
63from couchers.utils import now
64from proto import notification_data_pb2
65from proto.internal import jobs_pb2, verification_pb2
67logger = logging.getLogger(__name__)
69# these were straight up imported
70handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload
72send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload
74handle_email_digests.PAYLOAD = empty_pb2.Empty
75handle_email_digests.SCHEDULE = timedelta(minutes=15)
77generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload
79generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload
81generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload
83generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload
85generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload
88refresh_materialized_views.PAYLOAD = empty_pb2.Empty
89refresh_materialized_views.SCHEDULE = timedelta(minutes=5)
91refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
92refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)
95def send_email(payload):
96 logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
97 # selects a "sender", which either prints the email to the logger or sends it out with SMTP
98 sender = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email
99 # the sender must return a models.Email object that can be added to the database
100 email = sender(
101 sender_name=payload.sender_name,
102 sender_email=payload.sender_email,
103 recipient=payload.recipient,
104 subject=payload.subject,
105 plain=payload.plain,
106 html=payload.html,
107 list_unsubscribe_header=payload.list_unsubscribe_header,
108 source_data=payload.source_data,
109 )
110 with session_scope() as session:
111 session.add(email)
114send_email.PAYLOAD = jobs_pb2.SendEmailPayload
117def purge_login_tokens(payload):
118 logger.info("Purging login tokens")
119 with session_scope() as session:
120 session.execute(delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False))
123purge_login_tokens.PAYLOAD = empty_pb2.Empty
124purge_login_tokens.SCHEDULE = timedelta(hours=24)
127def purge_password_reset_tokens(payload):
128 logger.info("Purging login tokens")
129 with session_scope() as session:
130 session.execute(
131 delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
132 )
135purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
136purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)
139def purge_account_deletion_tokens(payload):
140 logger.info("Purging account deletion tokens")
141 with session_scope() as session:
142 session.execute(
143 delete(AccountDeletionToken)
144 .where(~AccountDeletionToken.is_valid)
145 .execution_options(synchronize_session=False)
146 )
149purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
150purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)
153def send_message_notifications(payload):
154 """
155 Sends out email notifications for messages that have been unseen for a long enough time
156 """
157 # very crude and dumb algorithm
158 logger.info("Sending out email notifications for unseen messages")
160 with session_scope() as session:
161 # users who have unnotified messages older than 5 minutes in any group chat
162 users = (
163 session.execute(
164 select(User)
165 .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
166 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
167 .where(not_(GroupChatSubscription.is_muted))
168 .where(User.is_visible)
169 .where(Message.time >= GroupChatSubscription.joined)
170 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
171 .where(Message.id > User.last_notified_message_id)
172 .where(Message.id > GroupChatSubscription.last_seen_message_id)
173 .where(Message.time < now() - timedelta(minutes=5))
174 .where(Message.message_type == MessageType.text) # TODO: only text messages for now
175 )
176 .scalars()
177 .unique()
178 )
180 for user in users:
181 # now actually grab all the group chats, not just less than 5 min old
182 subquery = (
183 select(
184 GroupChatSubscription.group_chat_id.label("group_chat_id"),
185 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
186 func.max(Message.id).label("message_id"),
187 func.count(Message.id).label("count_unseen"),
188 )
189 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
190 .where(GroupChatSubscription.user_id == user.id)
191 .where(not_(GroupChatSubscription.is_muted))
192 .where(Message.id > user.last_notified_message_id)
193 .where(Message.id > GroupChatSubscription.last_seen_message_id)
194 .where(Message.time >= GroupChatSubscription.joined)
195 .where(Message.message_type == MessageType.text) # TODO: only text messages for now
196 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
197 .group_by(GroupChatSubscription.group_chat_id)
198 .order_by(func.max(Message.id).desc())
199 .subquery()
200 )
202 unseen_messages = session.execute(
203 select(GroupChat, Message, subquery.c.count_unseen)
204 .join(subquery, subquery.c.message_id == Message.id)
205 .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
206 .order_by(subquery.c.message_id.desc())
207 ).all()
209 user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)
211 def format_title(message, group_chat, count_unseen):
212 if group_chat.is_dm:
213 return f"You missed {count_unseen} message(s) from {message.author.name}"
214 else:
215 return f"You missed {count_unseen} message(s) in {group_chat.title}"
217 notify(
218 session,
219 user_id=user.id,
220 topic_action="chat:missed_messages",
221 data=notification_data_pb2.ChatMissedMessages(
222 messages=[
223 notification_data_pb2.ChatMessage(
224 author=user_model_to_pb(
225 message.author,
226 session,
227 SimpleNamespace(user_id=user.id),
228 ),
229 message=format_title(message, group_chat, count_unseen),
230 text=message.text,
231 group_chat_id=message.conversation_id,
232 )
233 for group_chat, message, count_unseen in unseen_messages
234 ],
235 ),
236 )
237 session.commit()
240send_message_notifications.PAYLOAD = empty_pb2.Empty
241send_message_notifications.SCHEDULE = timedelta(minutes=3)
244def send_request_notifications(payload):
245 """
246 Sends out email notifications for unseen messages in host requests (as surfer or host)
247 """
248 logger.info("Sending out email notifications for unseen messages in host requests")
250 with session_scope() as session:
251 # requests where this user is surfing
252 surfing_reqs = session.execute(
253 select(User, HostRequest, func.max(Message.id))
254 .where(User.is_visible)
255 .join(HostRequest, HostRequest.surfer_user_id == User.id)
256 .join(Message, Message.conversation_id == HostRequest.conversation_id)
257 .where(Message.id > HostRequest.surfer_last_seen_message_id)
258 .where(Message.id > User.last_notified_request_message_id)
259 .where(Message.time < now() - timedelta(minutes=5))
260 .where(Message.message_type == MessageType.text)
261 .group_by(User, HostRequest)
262 ).all()
264 # where this user is hosting
265 hosting_reqs = session.execute(
266 select(User, HostRequest, func.max(Message.id))
267 .where(User.is_visible)
268 .join(HostRequest, HostRequest.host_user_id == User.id)
269 .join(Message, Message.conversation_id == HostRequest.conversation_id)
270 .where(Message.id > HostRequest.host_last_seen_message_id)
271 .where(Message.id > User.last_notified_request_message_id)
272 .where(Message.time < now() - timedelta(minutes=5))
273 .where(Message.message_type == MessageType.text)
274 .group_by(User, HostRequest)
275 ).all()
277 for user, host_request, max_message_id in surfing_reqs:
278 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
279 session.flush()
281 context = SimpleNamespace(user_id=user.id)
282 notify(
283 session,
284 user_id=user.id,
285 topic_action="host_request:missed_messages",
286 key=host_request.conversation_id,
287 data=notification_data_pb2.HostRequestMissedMessages(
288 host_request=host_request_to_pb(host_request, session, context),
289 user=user_model_to_pb(host_request.host, session, context),
290 am_host=False,
291 ),
292 )
294 for user, host_request, max_message_id in hosting_reqs:
295 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
296 session.flush()
298 context = SimpleNamespace(user_id=user.id)
299 notify(
300 session,
301 user_id=user.id,
302 topic_action="host_request:missed_messages",
303 key=host_request.conversation_id,
304 data=notification_data_pb2.HostRequestMissedMessages(
305 host_request=host_request_to_pb(host_request, session, context),
306 user=user_model_to_pb(host_request.surfer, session, context),
307 am_host=True,
308 ),
309 )
312send_request_notifications.PAYLOAD = empty_pb2.Empty
313send_request_notifications.SCHEDULE = timedelta(minutes=3)
316def send_onboarding_emails(payload):
317 """
318 Sends out onboarding emails
319 """
320 logger.info("Sending out onboarding emails")
322 with session_scope() as session:
323 # first onboarding email
324 users = (
325 session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all()
326 )
328 for user in users:
329 notify(
330 session,
331 user_id=user.id,
332 topic_action="onboarding:reminder",
333 key="1",
334 )
335 user.onboarding_emails_sent = 1
336 user.last_onboarding_email_sent = now()
337 session.commit()
339 # second onboarding email
340 # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
341 users = (
342 session.execute(
343 select(User)
344 .where(User.is_visible)
345 .where(User.onboarding_emails_sent == 1)
346 .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
347 .where(User.has_completed_profile == False)
348 )
349 .scalars()
350 .all()
351 )
353 for user in users:
354 notify(
355 session,
356 user_id=user.id,
357 topic_action="onboarding:reminder",
358 key="2",
359 )
360 user.onboarding_emails_sent = 2
361 user.last_onboarding_email_sent = now()
362 session.commit()
365send_onboarding_emails.PAYLOAD = empty_pb2.Empty
366send_onboarding_emails.SCHEDULE = timedelta(hours=1)
369def send_reference_reminders(payload):
370 """
371 Sends out reminders to write references after hosting/staying
372 """
373 logger.info("Sending out reference reminder emails")
375 # Keep this in chronological order!
376 reference_reminder_schedule = [
377 # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
378 # the end time to write a reference is supposed to be midnight in the host's timezone
379 # 8 pm ish on the last day of the stay
380 (1, timedelta(days=15) - timedelta(hours=20), 14),
381 # 2 pm ish a week after stay
382 (2, timedelta(days=8) - timedelta(hours=14), 7),
383 # 10 am ish 3 days before end of time to write ref
384 (3, timedelta(days=4) - timedelta(hours=10), 3),
385 ]
387 with session_scope() as session:
388 # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
389 for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
390 user = aliased(User)
391 other_user = aliased(User)
392 # surfers needing to write a ref
393 q1 = (
394 select(literal(True), HostRequest, user, other_user)
395 .join(user, user.id == HostRequest.surfer_user_id)
396 .join(other_user, other_user.id == HostRequest.host_user_id)
397 .outerjoin(
398 Reference,
399 and_(
400 Reference.host_request_id == HostRequest.conversation_id,
401 # if no reference is found in this join, then the surfer has not written a ref
402 Reference.from_user_id == HostRequest.surfer_user_id,
403 ),
404 )
405 .where(user.is_visible)
406 .where(other_user.is_visible)
407 .where(Reference.id == None)
408 .where(HostRequest.can_write_reference)
409 .where(HostRequest.surfer_sent_reference_reminders < reminder_number)
410 .where(HostRequest.end_time_to_write_reference - reminder_time < now())
411 )
413 # hosts needing to write a ref
414 q2 = (
415 select(literal(False), HostRequest, user, other_user)
416 .join(user, user.id == HostRequest.host_user_id)
417 .join(other_user, other_user.id == HostRequest.surfer_user_id)
418 .outerjoin(
419 Reference,
420 and_(
421 Reference.host_request_id == HostRequest.conversation_id,
422 # if no reference is found in this join, then the host has not written a ref
423 Reference.from_user_id == HostRequest.host_user_id,
424 ),
425 )
426 .where(user.is_visible)
427 .where(other_user.is_visible)
428 .where(Reference.id == None)
429 .where(HostRequest.can_write_reference)
430 .where(HostRequest.host_sent_reference_reminders < reminder_number)
431 .where(HostRequest.end_time_to_write_reference - reminder_time < now())
432 )
434 union = union_all(q1, q2).subquery()
435 union = select(
436 union.c[0].label("surfed"),
437 aliased(HostRequest, union),
438 aliased(user, union),
439 aliased(other_user, union),
440 )
441 reference_reminders = session.execute(union).all()
443 for surfed, host_request, user, other_user in reference_reminders:
444 # checked in sql
445 assert user.is_visible
446 if not are_blocked(session, user.id, other_user.id):
447 context = SimpleNamespace(user_id=user.id)
448 notify(
449 session,
450 user_id=user.id,
451 topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
452 data=notification_data_pb2.ReferenceReminder(
453 host_request_id=host_request.conversation_id,
454 other_user=user_model_to_pb(other_user, session, context),
455 days_left=reminder_days_left,
456 ),
457 )
458 if surfed:
459 host_request.surfer_sent_reference_reminders = reminder_number
460 else:
461 host_request.host_sent_reference_reminders = reminder_number
462 session.commit()
465send_reference_reminders.PAYLOAD = empty_pb2.Empty
466send_reference_reminders.SCHEDULE = timedelta(hours=1)
469def add_users_to_email_list(payload):
470 if not config["LISTMONK_ENABLED"]:
471 logger.info("Not adding users to mailing list")
472 return
474 logger.info("Adding users to mailing list")
476 while True:
477 with session_scope() as session:
478 user = session.execute(
479 select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
480 ).scalar_one_or_none()
481 if not user:
482 logger.info("Finished adding users to mailing list")
483 return
485 if user.opt_out_of_newsletter:
486 user.in_sync_with_newsletter = True
487 session.commit()
488 continue
490 r = requests.post(
491 config["LISTMONK_BASE_URL"] + "/api/subscribers",
492 auth=("listmonk", config["LISTMONK_API_KEY"]),
493 json={
494 "email": user.email,
495 "name": user.name,
496 "list_uuids": [config["LISTMONK_LIST_UUID"]],
497 "preconfirm_subscriptions": True,
498 "attribs": {"couchers_user_id": user.id},
499 },
500 timeout=10,
501 )
502 # the API returns if the user is already subscribed
503 if r.status_code == 200 or r.status_code == 409:
504 user.in_sync_with_newsletter = True
505 session.commit()
506 else:
507 raise Exception("Failed to add users to mailing list")
510add_users_to_email_list.PAYLOAD = empty_pb2.Empty
511add_users_to_email_list.SCHEDULE = timedelta(hours=1)
514def enforce_community_membership(payload):
515 tasks_enforce_community_memberships()
518enforce_community_membership.PAYLOAD = empty_pb2.Empty
519enforce_community_membership.SCHEDULE = timedelta(minutes=15)
522def update_recommendation_scores(payload):
523 text_fields = [
524 User.hometown,
525 User.occupation,
526 User.education,
527 User.about_me,
528 User.things_i_like,
529 User.about_place,
530 User.additional_information,
531 User.pet_details,
532 User.kid_details,
533 User.housemate_details,
534 User.other_host_info,
535 User.sleeping_details,
536 User.area,
537 User.house_rules,
538 ]
539 home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]
541 def poor_man_gaussian():
542 """
543 Produces an approximatley std normal random variate
544 """
545 trials = 5
546 return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)
548 def int_(stmt):
549 return func.coalesce(cast(stmt, Integer), 0)
551 def float_(stmt):
552 return func.coalesce(cast(stmt, Float), 0.0)
554 with session_scope() as session:
555 # profile
556 profile_text = ""
557 for field in text_fields:
558 profile_text += func.coalesce(field, "")
559 text_length = func.length(profile_text)
560 home_text = ""
561 for field in home_fields:
562 home_text += func.coalesce(field, "")
563 home_length = func.length(home_text)
565 has_text = int_(text_length > 500)
566 long_text = int_(text_length > 2000)
567 has_pic = int_(User.avatar_key != None)
568 can_host = int_(User.hosting_status == HostingStatus.can_host)
569 cant_host = int_(User.hosting_status == HostingStatus.cant_host)
570 filled_home = int_(User.last_minute != None) * int_(home_length > 200)
571 profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host
573 # references
574 left_ref_expr = int_(1).label("left_reference")
575 left_refs_subquery = (
576 select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
577 )
578 left_reference = int_(left_refs_subquery.c.left_reference)
579 has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
580 ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
581 ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
582 has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
583 has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
584 "has_bad_ref"
585 )
586 received_ref_subquery = (
587 select(
588 Reference.to_user_id.label("user_id"),
589 has_reference_expr,
590 has_multiple_types_expr,
591 has_bad_ref_expr,
592 ref_count_expr,
593 ref_avg_expr,
594 )
595 .group_by(Reference.to_user_id)
596 .subquery()
597 )
598 has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
599 has_reference = int_(received_ref_subquery.c.has_reference)
600 has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
601 rating_score = float_(
602 received_ref_subquery.c.ref_avg
603 * (
604 2 * func.least(received_ref_subquery.c.ref_count, 5)
605 + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
606 )
607 )
608 ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score
610 # activeness
611 recently_active = int_(User.last_active >= now() - timedelta(days=180))
612 very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
613 recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
614 messaged_lots = int_(func.count(Message.id) > 5)
615 messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
616 messaging_subquery = (
617 select(Message.author_id.label("user_id"), messaging_points_subquery)
618 .where(Message.message_type == MessageType.text)
619 .group_by(Message.author_id)
620 .subquery()
621 )
622 activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)
624 # verification
625 cb_subquery = (
626 select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
627 .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
628 .where(ClusterSubscription.role == ClusterRole.admin)
629 .where(Cluster.is_official_cluster)
630 .group_by(ClusterSubscription.user_id)
631 .subquery()
632 )
633 min_node_id = cb_subquery.c.min_node_id
634 cb = int_(min_node_id >= 1)
635 wcb = int_(min_node_id == 1)
636 badge_points = {
637 "founder": 100,
638 "board_member": 20,
639 "past_board_member": 5,
640 "strong_verification": 3,
641 "volunteer": 3,
642 "past_volunteer": 2,
643 "donor": 1,
644 "phone_verified": 1,
645 }
647 badge_subquery = (
648 select(
649 UserBadge.user_id.label("user_id"),
650 func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
651 )
652 .group_by(UserBadge.user_id)
653 .subquery()
654 )
656 other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)
658 # response rate
659 t = (
660 select(Message.conversation_id, Message.time)
661 .where(Message.message_type == MessageType.chat_created)
662 .subquery()
663 )
664 s = (
665 select(Message.conversation_id, Message.author_id, func.min(Message.time).label("time"))
666 .group_by(Message.conversation_id, Message.author_id)
667 .subquery()
668 )
669 hr_subquery = (
670 select(
671 HostRequest.host_user_id.label("user_id"),
672 func.avg(s.c.time - t.c.time).label("avg_response_time"),
673 func.count(t.c.time).label("received"),
674 func.count(s.c.time).label("responded"),
675 float_(
676 extract(
677 "epoch",
678 percentile_disc(0.33).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
679 )
680 / 60.0
681 ).label("response_time_33p"),
682 float_(
683 extract(
684 "epoch",
685 percentile_disc(0.66).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
686 )
687 / 60.0
688 ).label("response_time_66p"),
689 )
690 .join(t, t.c.conversation_id == HostRequest.conversation_id)
691 .outerjoin(
692 s, and_(s.c.conversation_id == HostRequest.conversation_id, s.c.author_id == HostRequest.host_user_id)
693 )
694 .group_by(HostRequest.host_user_id)
695 .subquery()
696 )
697 avg_response_time = hr_subquery.c.avg_response_time
698 avg_response_time_hr = float_(extract("epoch", avg_response_time) / 60.0)
699 received = hr_subquery.c.received
700 responded = hr_subquery.c.responded
701 response_time_33p = hr_subquery.c.response_time_33p
702 response_time_66p = hr_subquery.c.response_time_66p
703 response_rate = float_(responded / (1.0 * func.greatest(received, 1)))
704 # be careful with nulls
705 response_rate_points = -10 * int_(response_time_33p > 60 * 48.0) + 5 * int_(response_time_66p < 60 * 48.0)
707 recommendation_score = (
708 profile_points
709 + ref_score
710 + activeness_points
711 + other_points
712 + response_rate_points
713 + 2 * poor_man_gaussian()
714 )
716 scores = (
717 select(User.id.label("user_id"), recommendation_score.label("score"))
718 .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
719 .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
720 .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
721 .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
722 .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
723 .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
724 ).subquery()
726 session.execute(
727 User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
728 )
730 logger.info("Updated recommendation scores")
733update_recommendation_scores.PAYLOAD = empty_pb2.Empty
734update_recommendation_scores.SCHEDULE = timedelta(hours=24)
737def update_badges(payload):
738 with session_scope() as session:
740 def update_badge(badge_id: str, members: list[int]):
741 badge = get_badge_dict()[badge_id]
742 user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all()
743 # in case the user ids don't exist in the db
744 actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all()
745 # we should add the badge to these
746 add = set(actual_members) - set(user_ids)
747 # we should remove the badge from these
748 remove = set(user_ids) - set(actual_members)
749 for user_id in add:
750 user_add_badge(session, user_id, badge_id)
752 for user_id in remove:
753 user_remove_badge(session, user_id, badge_id)
755 update_badge("founder", get_static_badge_dict()["founder"])
756 update_badge("board_member", get_static_badge_dict()["board_member"])
757 update_badge("past_board_member", get_static_badge_dict()["past_board_member"])
758 update_badge(
759 "donor", session.execute(select(User.id).join(Invoice, Invoice.user_id == User.id)).scalars().all()
760 )
761 update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all())
762 update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all())
763 # strong verification requires passport on file + gender/sex correspondence and date of birth match
764 update_badge(
765 "strong_verification",
766 session.execute(
767 select(User.id)
768 .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
769 .where(StrongVerificationAttempt.has_strong_verification(User))
770 )
771 .scalars()
772 .all(),
773 )
776update_badges.PAYLOAD = empty_pb2.Empty
777update_badges.SCHEDULE = timedelta(minutes=15)
780def finalize_strong_verification(payload):
781 with session_scope() as session:
782 verification_attempt = session.execute(
783 select(StrongVerificationAttempt)
784 .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
785 .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
786 ).scalar_one()
787 response = requests.post(
788 "https://passportreader.app/api/v1/session.get",
789 auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
790 json={"id": verification_attempt.iris_session_id},
791 timeout=10,
792 )
793 if response.status_code != 200:
794 raise Exception(f"Iris didn't return 200: {response.text}")
795 json_data = response.json()
796 reference_payload = verification_pb2.VerificationReferencePayload.FromString(
797 simple_decrypt("iris_callback", b64decode(json_data["reference"]))
798 )
799 assert verification_attempt.user_id == reference_payload.user_id
800 assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token
801 assert verification_attempt.iris_session_id == json_data["id"]
802 assert json_data["state"] == "APPROVED"
804 if json_data["document_type"] != "PASSPORT":
805 verification_attempt.status = StrongVerificationAttemptStatus.failed
806 notify(
807 session,
808 user_id=verification_attempt.user_id,
809 topic_action="verification:sv_fail",
810 data=notification_data_pb2.VerificationSVFail(
811 reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
812 ),
813 )
814 return
816 assert json_data["document_type"] == "PASSPORT"
818 expiry_date = date.fromisoformat(json_data["expiry_date"])
819 nationality = json_data["nationality"]
820 last_three_document_chars = json_data["document_number"][-3:]
822 existing_attempt = session.execute(
823 select(StrongVerificationAttempt)
824 .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
825 .where(StrongVerificationAttempt.passport_nationality == nationality)
826 .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
827 .order_by(StrongVerificationAttempt.id)
828 .limit(1)
829 ).scalar_one_or_none()
831 verification_attempt.has_minimal_data = True
832 verification_attempt.passport_expiry_date = expiry_date
833 verification_attempt.passport_nationality = nationality
834 verification_attempt.passport_last_three_document_chars = last_three_document_chars
836 if existing_attempt:
837 verification_attempt.status = StrongVerificationAttemptStatus.duplicate
839 if existing_attempt.user_id != verification_attempt.user_id:
840 session.flush()
841 send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)
843 notify(
844 session,
845 user_id=verification_attempt.user_id,
846 topic_action="verification:sv_fail",
847 data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
848 )
849 return
851 verification_attempt.has_full_data = True
852 verification_attempt.passport_encrypted_data = asym_encrypt(
853 config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
854 )
855 verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
856 verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
857 verification_attempt.status = StrongVerificationAttemptStatus.succeeded
859 session.flush()
861 strong_verification_completions_counter.inc()
863 user = verification_attempt.user
864 if verification_attempt.has_strong_verification(user):
865 badge_id = "strong_verification"
866 if session.execute(
867 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
868 ).scalar_one_or_none():
869 return
871 user_add_badge(session, user.id, badge_id, do_notify=False)
872 notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success")
873 else:
874 notify(
875 session,
876 user_id=verification_attempt.user_id,
877 topic_action="verification:sv_fail",
878 data=notification_data_pb2.VerificationSVFail(
879 reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
880 ),
881 )
884finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload