Coverage for src/couchers/jobs/handlers.py: 99%
292 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-11-21 04:21 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-11-21 04:21 +0000
1"""
2Background job servicers
3"""
5import logging
6from datetime import date, timedelta
7from math import sqrt
8from types import SimpleNamespace
9from typing import List
11import requests
12from google.protobuf import empty_pb2
13from sqlalchemy import Integer
14from sqlalchemy.orm import aliased
15from sqlalchemy.sql import and_, case, cast, delete, distinct, extract, func, literal, not_, or_, select, union_all
16from sqlalchemy.sql.functions import percentile_disc
18from couchers.config import config
19from couchers.crypto import asym_encrypt, b64decode, simple_decrypt
20from couchers.db import session_scope
21from couchers.email.dev import print_dev_email
22from couchers.email.smtp import send_smtp_email
23from couchers.helpers.badges import user_add_badge, user_remove_badge
24from couchers.materialized_views import refresh_materialized_views, refresh_materialized_views_rapid
25from couchers.models import (
26 AccountDeletionToken,
27 Cluster,
28 ClusterRole,
29 ClusterSubscription,
30 Float,
31 GroupChat,
32 GroupChatSubscription,
33 HostingStatus,
34 HostRequest,
35 Invoice,
36 LoginToken,
37 Message,
38 MessageType,
39 PassportSex,
40 PasswordResetToken,
41 Reference,
42 StrongVerificationAttempt,
43 StrongVerificationAttemptStatus,
44 User,
45 UserBadge,
46)
47from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification
48from couchers.notifications.notify import notify
49from couchers.resources import get_badge_dict, get_static_badge_dict
50from couchers.servicers.api import user_model_to_pb
51from couchers.servicers.blocking import are_blocked
52from couchers.servicers.conversations import generate_message_notifications
53from couchers.servicers.events import (
54 generate_event_cancel_notifications,
55 generate_event_create_notifications,
56 generate_event_delete_notifications,
57 generate_event_update_notifications,
58)
59from couchers.servicers.requests import host_request_to_pb
60from couchers.sql import couchers_select as select
61from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships
62from couchers.utils import now
63from proto import notification_data_pb2
64from proto.internal import jobs_pb2, verification_pb2
logger = logging.getLogger(__name__)

# Handlers implemented in other modules get their job metadata attached here as
# function attributes: PAYLOAD is set to a protobuf message class and, for some
# handlers, SCHEDULE is set to a timedelta.
# NOTE(review): presumably the job runner reads these attributes; confirm there.

# handlers that only get a PAYLOAD
handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload
send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload
generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload
generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload
generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload
generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload
generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload

# handlers that take an empty payload and also carry a SCHEDULE
handle_email_digests.PAYLOAD = empty_pb2.Empty
handle_email_digests.SCHEDULE = timedelta(minutes=15)

refresh_materialized_views.PAYLOAD = empty_pb2.Empty
refresh_materialized_views.SCHEDULE = timedelta(minutes=5)

refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)
def send_email(payload):
    """
    Delivers the email described by the payload and stores the returned email object

    The email goes through SMTP when config["ENABLE_EMAIL"] is truthy, otherwise it is handed to the dev printer.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")

    # pick the delivery backend; whichever is used must hand back an object that can be added to the session
    if config["ENABLE_EMAIL"]:
        deliver = send_smtp_email
    else:
        deliver = print_dev_email

    # payload fields forwarded verbatim as keyword arguments of the same name
    forwarded_fields = (
        "sender_name",
        "sender_email",
        "recipient",
        "subject",
        "plain",
        "html",
        "list_unsubscribe_header",
        "source_data",
    )
    delivered = deliver(**{field: getattr(payload, field) for field in forwarded_fields})

    with session_scope() as session:
        session.add(delivered)


send_email.PAYLOAD = jobs_pb2.SendEmailPayload
def purge_login_tokens(payload):
    """
    Deletes all login tokens that are no longer valid
    """
    logger.info("Purging login tokens")
    # bulk delete straight in the database, without syncing objects already loaded in the session
    invalid_tokens = delete(LoginToken).where(~LoginToken.is_valid)
    with session_scope() as session:
        session.execute(invalid_tokens.execution_options(synchronize_session=False))


purge_login_tokens.PAYLOAD = empty_pb2.Empty
purge_login_tokens.SCHEDULE = timedelta(hours=24)
def purge_password_reset_tokens(payload):
    """
    Deletes all password reset tokens that are no longer valid
    """
    # the message used to say "login tokens" (copied from purge_login_tokens), which made the logs misleading
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        # bulk delete straight in the database, without syncing objects already loaded in the session
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )


purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)
def purge_account_deletion_tokens(payload):
    """
    Deletes all account deletion tokens that are no longer valid
    """
    logger.info("Purging account deletion tokens")
    # bulk delete straight in the database, without syncing objects already loaded in the session
    invalid_tokens = delete(AccountDeletionToken).where(~AccountDeletionToken.is_valid)
    with session_scope() as session:
        session.execute(invalid_tokens.execution_options(synchronize_session=False))


purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)
def send_message_notifications(payload):
    """
    Sends out email notifications for messages that have been unseen for a long enough time

    For every visible user with an unseen, not-yet-notified text message older than 5 minutes in an unmuted group
    chat, one "chat:missed_messages" notification is created listing the latest unseen message of each such chat,
    and the user's last_notified_message_id is advanced.
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    def format_title(message, group_chat, count_unseen):
        # DMs are titled by the author, other chats by the chat title
        if group_chat.is_dm:
            return f"You missed {count_unseen} message(s) from {message.author.name}"
        return f"You missed {count_unseen} message(s) in {group_chat.title}"

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                select(User)
                .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for user in users:
            # now actually grab all the group chats, not just less than 5 min old
            subquery = (
                select(
                    GroupChatSubscription.group_chat_id.label("group_chat_id"),
                    func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                    func.max(Message.id).label("message_id"),
                    func.count(Message.id).label("count_unseen"),
                )
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(GroupChatSubscription.user_id == user.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(Message.id > user.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            unseen_messages = session.execute(
                select(GroupChat, Message, subquery.c.count_unseen)
                .join(subquery, subquery.c.message_id == Message.id)
                .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
                .order_by(subquery.c.message_id.desc())
            ).all()

            if not unseen_messages:
                # this runs after the user list was fetched, so the user may have read everything in the meantime;
                # without this guard max() below raises ValueError on the empty sequence and kills the whole job
                continue

            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            notify(
                session,
                user_id=user.id,
                topic_action="chat:missed_messages",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                SimpleNamespace(user_id=user.id),
                            ),
                            message=format_title(message, group_chat, count_unseen),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                        )
                        for group_chat, message, count_unseen in unseen_messages
                    ],
                ),
            )
            # commit per user so progress is kept even if a later user fails
            session.commit()


send_message_notifications.PAYLOAD = empty_pb2.Empty
send_message_notifications.SCHEDULE = timedelta(minutes=3)
def send_request_notifications(payload):
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host)

    Both the surfer-side and the host-side rows are fetched before any notification is created, then the surfer
    rows are handled first and the host rows second.
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    def missed_messages_query(participant_id, last_seen_message_id):
        # one row per (user, host request) with the newest unseen, unnotified text message older than 5 minutes
        return (
            select(User, HostRequest, func.max(Message.id))
            .where(User.is_visible)
            .join(HostRequest, participant_id == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(Message.id > last_seen_message_id)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
            .group_by(User, HostRequest)
        )

    def notify_participants(session, rows, am_host):
        for recipient, host_request, newest_message_id in rows:
            # never move the marker backwards: the same user can show up in several rows
            recipient.last_notified_request_message_id = max(
                recipient.last_notified_request_message_id, newest_message_id
            )
            session.flush()

            context = SimpleNamespace(user_id=recipient.id)
            notify(
                session,
                user_id=recipient.id,
                topic_action="host_request:missed_messages",
                key=host_request.conversation_id,
                data=notification_data_pb2.HostRequestMissedMessages(
                    host_request=host_request_to_pb(host_request, session, context),
                    # the notification shows the other party of the request
                    user=user_model_to_pb(
                        host_request.surfer if am_host else host_request.host, session, context
                    ),
                    am_host=am_host,
                ),
            )

    with session_scope() as session:
        # requests where this user is surfing
        surfing_reqs = session.execute(
            missed_messages_query(HostRequest.surfer_user_id, HostRequest.surfer_last_seen_message_id)
        ).all()

        # where this user is hosting
        hosting_reqs = session.execute(
            missed_messages_query(HostRequest.host_user_id, HostRequest.host_last_seen_message_id)
        ).all()

        notify_participants(session, surfing_reqs, am_host=False)
        notify_participants(session, hosting_reqs, am_host=True)


send_request_notifications.PAYLOAD = empty_pb2.Empty
send_request_notifications.SCHEDULE = timedelta(minutes=3)
def send_onboarding_emails(payload):
    """
    Sends out onboarding emails

    The first reminder goes to visible users who have received none yet; the second goes to visible users whose
    first reminder is more than 7 days old and whose profile is still not completed.
    """
    logger.info("Sending out onboarding emails")

    def send_reminder(session, statement, email_number):
        recipients = session.execute(statement).scalars().all()
        for recipient in recipients:
            notify(
                session,
                user_id=recipient.id,
                topic_action="onboarding:reminder",
                key=str(email_number),
            )
            recipient.onboarding_emails_sent = email_number
            recipient.last_onboarding_email_sent = now()
            # commit after each user
            session.commit()

    with session_scope() as session:
        # first onboarding email
        never_reminded = select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)
        send_reminder(session, never_reminded, 1)

        # second onboarding email
        # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
        still_incomplete = (
            select(User)
            .where(User.is_visible)
            .where(User.onboarding_emails_sent == 1)
            .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
            .where(User.has_completed_profile == False)
        )
        send_reminder(session, still_incomplete, 2)


send_onboarding_emails.PAYLOAD = empty_pb2.Empty
send_onboarding_emails.SCHEDULE = timedelta(hours=1)
def send_reference_reminders(payload):
    """
    Sends out reminders to write references after hosting/staying

    Each host request can trigger up to three numbered reminders per participant; the counters
    surfer_sent_reference_reminders / host_sent_reference_reminders record the highest one already sent.
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (number, timedelta before we stop being able to write a ref, days left to write the ref)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    def missing_reference_query(surfed, writer, counterpart, reminder_number, reminder_time):
        # participants of host requests (surfers if `surfed`, hosts otherwise) who can still write a reference,
        # have not written one, and have not yet been sent reminder `reminder_number`
        if surfed:
            writer_id = HostRequest.surfer_user_id
            counterpart_id = HostRequest.host_user_id
            reminders_sent = HostRequest.surfer_sent_reference_reminders
        else:
            writer_id = HostRequest.host_user_id
            counterpart_id = HostRequest.surfer_user_id
            reminders_sent = HostRequest.host_sent_reference_reminders

        return (
            select(literal(surfed), HostRequest, writer, counterpart)
            .join(writer, writer.id == writer_id)
            .join(counterpart, counterpart.id == counterpart_id)
            .outerjoin(
                Reference,
                and_(
                    Reference.host_request_id == HostRequest.conversation_id,
                    # if no reference is found in this join, then the writer has not written a ref
                    Reference.from_user_id == writer_id,
                ),
            )
            .where(writer.is_visible)
            .where(counterpart.is_visible)
            .where(Reference.id == None)
            .where(HostRequest.can_write_reference)
            .where(reminders_sent < reminder_number)
            .where(HostRequest.end_time_to_write_reference - reminder_time < now())
        )

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            writer = aliased(User)
            counterpart = aliased(User)

            # surfers needing to write a ref
            surfers = missing_reference_query(True, writer, counterpart, reminder_number, reminder_time)
            # hosts needing to write a ref
            hosts = missing_reference_query(False, writer, counterpart, reminder_number, reminder_time)

            combined = union_all(surfers, hosts).subquery()
            statement = select(
                combined.c[0].label("surfed"),
                aliased(HostRequest, combined),
                aliased(writer, combined),
                aliased(counterpart, combined),
            )
            reference_reminders = session.execute(statement).all()

            for surfed, host_request, user, other_user in reference_reminders:
                # checked in sql
                assert user.is_visible
                if are_blocked(session, user.id, other_user.id):
                    # no reminder and no counter update for blocked pairs
                    continue

                context = SimpleNamespace(user_id=user.id)
                notify(
                    session,
                    user_id=user.id,
                    topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(other_user, session, context),
                        days_left=reminder_days_left,
                    ),
                )
                if surfed:
                    host_request.surfer_sent_reference_reminders = reminder_number
                else:
                    host_request.host_sent_reference_reminders = reminder_number
                session.commit()


send_reference_reminders.PAYLOAD = empty_pb2.Empty
send_reference_reminders.SCHEDULE = timedelta(hours=1)
def add_users_to_email_list(payload):
    """
    Subscribes visible users that are not yet in sync with the newsletter to the Listmonk mailing list

    Users are handled one at a time, each in its own session. Users who opted out are only marked as in sync.
    Does nothing when config["LISTMONK_ENABLED"] is falsy. Raises an Exception when Listmonk answers with
    anything other than 200 or 409.
    """
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    while True:
        with session_scope() as session:
            next_unsynced = (
                select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
            )
            user = session.execute(next_unsynced).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            if not user.opt_out_of_newsletter:
                response = requests.post(
                    config["LISTMONK_BASE_URL"] + "/api/subscribers",
                    auth=("listmonk", config["LISTMONK_API_KEY"]),
                    json={
                        "email": user.email,
                        "name": user.name,
                        "list_uuids": [config["LISTMONK_LIST_UUID"]],
                        "preconfirm_subscriptions": True,
                        "attribs": {"couchers_user_id": user.id},
                    },
                    timeout=10,
                )
                # 409 is treated like success: the API returns it if the user is already subscribed
                if response.status_code not in (200, 409):
                    raise Exception("Failed to add users to mailing list")

            # reached for opted-out users and for successful subscriptions alike
            user.in_sync_with_newsletter = True
            session.commit()


add_users_to_email_list.PAYLOAD = empty_pb2.Empty
add_users_to_email_list.SCHEDULE = timedelta(hours=1)
def enforce_community_membership(payload):
    """
    Job wrapper around couchers.tasks.enforce_community_memberships; the payload is not used
    """
    tasks_enforce_community_memberships()


enforce_community_membership.PAYLOAD = empty_pb2.Empty
enforce_community_membership.SCHEDULE = timedelta(minutes=15)
# Job handler: builds one big SQL expression for a per-user recommendation score and writes it to
# User.recommendation_score with a single UPDATE. The score is the sum of profile points, reference points,
# activeness points, cluster-admin/badge points, response-time points and a random term. The payload is not used.
521def update_recommendation_scores(payload):
# free-text profile columns whose combined length feeds the has_text / long_text points
522 text_fields = [
523 User.hometown,
524 User.occupation,
525 User.education,
526 User.about_me,
527 User.my_travels,
528 User.things_i_like,
529 User.about_place,
530 User.additional_information,
531 User.pet_details,
532 User.kid_details,
533 User.housemate_details,
534 User.other_host_info,
535 User.sleeping_details,
536 User.area,
537 User.house_rules,
538 ]
# subset describing the home; its combined length feeds the filled_home points
539 home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]
# Sums `trials` SQL random() calls, subtracts trials / 2 and divides by sqrt(trials / 12).
# Assuming random() is uniform on [0, 1) (mean 1/2, variance 1/12) this gives mean 0 and variance 1.
541 def poor_man_gaussian():
542 """
543 Produces an approximatley std normal random variate
544 """
545 trials = 5
546 return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)
# SQL helper: CAST to Integer, with NULL replaced by 0 (used to turn boolean conditions into 0/1 points)
548 def int_(stmt):
549 return func.coalesce(cast(stmt, Integer), 0)
# SQL helper: CAST to Float, with NULL replaced by 0.0
551 def float_(stmt):
552 return func.coalesce(cast(stmt, Float), 0.0)
554 with session_scope() as session:
555 # profile
# the += below builds a SQL expression, not a Python string: each step appends COALESCE(field, '')
556 profile_text = ""
557 for field in text_fields:
558 profile_text += func.coalesce(field, "")
559 text_length = func.length(profile_text)
560 home_text = ""
561 for field in home_fields:
562 home_text += func.coalesce(field, "")
563 home_length = func.length(home_text)
565 has_text = int_(text_length > 500)
566 long_text = int_(text_length > 2000)
567 has_pic = int_(User.avatar_key != None)
568 can_host = int_(User.hosting_status == HostingStatus.can_host)
569 cant_host = int_(User.hosting_status == HostingStatus.cant_host)
# filled_home is 1 only when last_minute is set AND the home text is longer than 200 characters
570 filled_home = int_(User.last_minute != None) * int_(home_length > 200)
571 profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host
573 # references
# left_refs_subquery has one row per user who wrote at least one reference, with left_reference = 1
574 left_ref_expr = int_(1).label("left_reference")
575 left_refs_subquery = (
576 select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
577 )
578 left_reference = int_(left_refs_subquery.c.left_reference)
# aggregates over the references a user received (grouped by Reference.to_user_id below)
579 has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
580 ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
581 ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
582 has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
# a "bad" reference is one with rating <= 0.2 or with was_appropriate false
583 has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
584 "has_bad_ref"
585 )
586 received_ref_subquery = (
587 select(
588 Reference.to_user_id.label("user_id"),
589 has_reference_expr,
590 has_multiple_types_expr,
591 has_bad_ref_expr,
592 ref_count_expr,
593 ref_avg_expr,
594 )
595 .group_by(Reference.to_user_id)
596 .subquery()
597 )
598 has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
599 has_reference = int_(received_ref_subquery.c.has_reference)
600 has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
# average adjusted rating times a weight: the first 5 references count double, each further one counts once
601 rating_score = float_(
602 received_ref_subquery.c.ref_avg
603 * (
604 2 * func.least(received_ref_subquery.c.ref_count, 5)
605 + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
606 )
607 )
608 ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score
610 # activeness
611 recently_active = int_(User.last_active >= now() - timedelta(days=180))
612 very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
# messaging points are aggregated per author over text messages only (see messaging_subquery)
613 recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
614 messaged_lots = int_(func.count(Message.id) > 5)
615 messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
616 messaging_subquery = (
617 select(Message.author_id.label("user_id"), messaging_points_subquery)
618 .where(Message.message_type == MessageType.text)
619 .group_by(Message.author_id)
620 .subquery()
621 )
622 activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)
624 # verification
# per user: the smallest parent_node_id among official clusters where the user has the admin role
625 cb_subquery = (
626 select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
627 .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
628 .where(ClusterSubscription.role == ClusterRole.admin)
629 .where(Cluster.is_official_cluster)
630 .group_by(ClusterSubscription.user_id)
631 .subquery()
632 )
633 min_node_id = cb_subquery.c.min_node_id
# NOTE(review): cb/wcb presumably stand for "community builder" / "world community builder" - confirm
634 cb = int_(min_node_id >= 1)
635 wcb = int_(min_node_id == 1)
# points per badge id; badges not listed here contribute 0 (else_=0 in the CASE below)
636 badge_points = {
637 "founder": 100,
638 "board_member": 20,
639 "past_board_member": 5,
640 "strong_verification": 3,
641 "volunteer": 3,
642 "past_volunteer": 2,
643 "donor": 1,
644 "phone_verified": 1,
645 }
647 badge_subquery = (
648 select(
649 UserBadge.user_id.label("user_id"),
650 func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
651 )
652 .group_by(UserBadge.user_id)
653 .subquery()
654 )
656 other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)
658 # response rate
# t: time of the chat_created message of each conversation
659 t = (
660 select(Message.conversation_id, Message.time)
661 .where(Message.message_type == MessageType.chat_created)
662 .subquery()
663 )
# s: time of the first message of each author in each conversation (any message type)
664 s = (
665 select(Message.conversation_id, Message.author_id, func.min(Message.time).label("time"))
666 .group_by(Message.conversation_id, Message.author_id)
667 .subquery()
668 )
# per host: statistics of (host's first message time - chat creation time) over their host requests.
# Requests without any message from the host count as 1000 days in the percentiles.
# The percentiles are epoch seconds / 60, i.e. minutes.
669 hr_subquery = (
670 select(
671 HostRequest.host_user_id.label("user_id"),
672 func.avg(s.c.time - t.c.time).label("avg_response_time"),
673 func.count(t.c.time).label("received"),
674 func.count(s.c.time).label("responded"),
675 float_(
676 extract(
677 "epoch",
678 percentile_disc(0.33).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
679 )
680 / 60.0
681 ).label("response_time_33p"),
682 float_(
683 extract(
684 "epoch",
685 percentile_disc(0.66).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
686 )
687 / 60.0
688 ).label("response_time_66p"),
689 )
690 .join(t, t.c.conversation_id == HostRequest.conversation_id)
691 .outerjoin(
692 s, and_(s.c.conversation_id == HostRequest.conversation_id, s.c.author_id == HostRequest.host_user_id)
693 )
694 .group_by(HostRequest.host_user_id)
695 .subquery()
696 )
# NOTE(review): avg_response_time_hr and response_rate below are built but never used in recommendation_score;
# also avg_response_time_hr divides epoch seconds by 60, which yields minutes despite the "_hr" name
697 avg_response_time = hr_subquery.c.avg_response_time
698 avg_response_time_hr = float_(extract("epoch", avg_response_time) / 60.0)
699 received = hr_subquery.c.received
700 responded = hr_subquery.c.responded
701 response_time_33p = hr_subquery.c.response_time_33p
702 response_time_66p = hr_subquery.c.response_time_66p
703 response_rate = float_(responded / (1.0 * func.greatest(received, 1)))
704 # be careful with nulls
# thresholds are in minutes: 60 * 48.0 minutes = 48 hours
705 response_rate_points = -10 * int_(response_time_33p > 60 * 48.0) + 5 * int_(response_time_66p < 60 * 48.0)
707 recommendation_score = (
708 profile_points
709 + ref_score
710 + activeness_points
711 + other_points
712 + response_rate_points
713 + 2 * poor_man_gaussian()
714 )
# outer joins keep users that have no row in a subquery; int_/float_ then turn the resulting NULLs into 0
716 scores = (
717 select(User.id.label("user_id"), recommendation_score.label("score"))
718 .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
719 .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
720 .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
721 .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
722 .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
723 .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
724 ).subquery()
# single UPDATE on the users table, correlated with the scores subquery by user id
726 session.execute(
727 User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
728 )
730 logger.info("Updated recommendation scores")
# job metadata: empty payload, SCHEDULE attribute set to 24 hours
733update_recommendation_scores.PAYLOAD = empty_pb2.Empty
734update_recommendation_scores.SCHEDULE = timedelta(hours=24)
def update_badges(payload):
    """
    Brings badge assignments in line with who should currently hold each badge

    Covers the statically configured badges (founder, board_member, past_board_member) and the badges derived
    from the database (donor, moderator, phone_verified, strong_verification).
    """
    with session_scope() as session:

        def update_badge(badge_id: str, members: List[int]):
            # lookup kept for its side effect: it raises for a badge id missing from the badge dict
            get_badge_dict()[badge_id]

            current_holders = (
                session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all()
            )
            # in case the user ids don't exist in the db
            existing_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all()

            # members without the badge get it, holders that are no longer members lose it
            to_add = set(existing_members) - set(current_holders)
            to_remove = set(current_holders) - set(existing_members)

            for user_id in to_add:
                user_add_badge(session, user_id, badge_id)

            for user_id in to_remove:
                user_remove_badge(session, user_id, badge_id)

        # badges whose members are listed in the static badge dict
        for static_badge_id in ("founder", "board_member", "past_board_member"):
            update_badge(static_badge_id, get_static_badge_dict()[static_badge_id])

        # badges derived from the database
        donor_ids = session.execute(select(User.id).join(Invoice, Invoice.user_id == User.id)).scalars().all()
        update_badge("donor", donor_ids)

        moderator_ids = session.execute(select(User.id).where(User.is_superuser)).scalars().all()
        update_badge("moderator", moderator_ids)

        phone_verified_ids = session.execute(select(User.id).where(User.phone_is_verified)).scalars().all()
        update_badge("phone_verified", phone_verified_ids)

        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        strongly_verified_ids = (
            session.execute(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            )
            .scalars()
            .all()
        )
        update_badge("strong_verification", strongly_verified_ids)


update_badges.PAYLOAD = empty_pb2.Empty
update_badges.SCHEDULE = timedelta(minutes=15)
def finalize_strong_verification(payload):
    """
    Fetches the passport data for a strong verification attempt from Iris ID and stores it on the attempt

    The attempt is looked up by payload.verification_attempt_id and must be in the in_progress_waiting_on_backend
    status. The session data is fetched from passportreader.app and checked against the attempt. The full
    response is stored encrypted, a few passport fields are stored in the clear, and the attempt is marked as
    succeeded. If the attempt then counts as strong verification for the user, the "strong_verification" badge
    is added unless the user already has it.

    Raises:
        Exception: if Iris does not answer with HTTP 200
        AssertionError: if the response does not match the attempt, is not approved, or is not a passport
    """

    def require(condition, problem):
        # Explicit raise rather than an `assert` statement: these checks guard data from an external service
        # and must not disappear when Python runs with -O. AssertionError is kept so the failure type is
        # the same as before.
        if not condition:
            raise AssertionError(problem)

    with session_scope() as session:
        # scalar_one() fails unless exactly one matching attempt exists
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()
        # the reference is our own encrypted payload, handed back to us by Iris
        reference_payload = verification_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )
        require(
            verification_attempt.user_id == reference_payload.user_id,
            "user id in the Iris reference does not match the verification attempt",
        )
        require(
            verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token,
            "verification attempt token in the Iris reference does not match the verification attempt",
        )
        require(
            verification_attempt.iris_session_id == json_data["id"],
            "Iris session id does not match the verification attempt",
        )
        require(json_data["state"] == "APPROVED", "Iris session is not in the APPROVED state")
        require(json_data["document_type"] == "PASSPORT", "Iris document type is not PASSPORT")

        # the raw response is only kept encrypted with the verification data public key
        verification_attempt.has_full_data = True
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = date.fromisoformat(json_data["expiry_date"])
        verification_attempt.passport_nationality = json_data["nationality"]
        verification_attempt.passport_last_three_document_chars = json_data["document_number"][-3:]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        user = verification_attempt.user
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            already_has_badge = session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none()
            if not already_has_badge:
                user_add_badge(session, user.id, badge_id)


finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload