Coverage for src/couchers/jobs/handlers.py: 98%
311 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
1"""
2Background job servicers
3"""
5import logging
6from datetime import date, timedelta
7from math import sqrt
8from types import SimpleNamespace
10import requests
11from google.protobuf import empty_pb2
12from sqlalchemy import Integer
13from sqlalchemy.orm import aliased
14from sqlalchemy.sql import and_, case, cast, delete, distinct, extract, func, literal, not_, or_, select, union_all
15from sqlalchemy.sql.functions import percentile_disc
17from couchers.config import config
18from couchers.crypto import asym_encrypt, b64decode, simple_decrypt
19from couchers.db import session_scope
20from couchers.email.dev import print_dev_email
21from couchers.email.smtp import send_smtp_email
22from couchers.helpers.badges import user_add_badge, user_remove_badge
23from couchers.materialized_views import refresh_materialized_views, refresh_materialized_views_rapid
24from couchers.metrics import strong_verification_completions_counter
25from couchers.models import (
26 AccountDeletionToken,
27 Cluster,
28 ClusterRole,
29 ClusterSubscription,
30 Float,
31 GroupChat,
32 GroupChatSubscription,
33 HostingStatus,
34 HostRequest,
35 Invoice,
36 LoginToken,
37 Message,
38 MessageType,
39 PassportSex,
40 PasswordResetToken,
41 Reference,
42 StrongVerificationAttempt,
43 StrongVerificationAttemptStatus,
44 User,
45 UserBadge,
46)
47from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification
48from couchers.notifications.notify import notify
49from couchers.resources import get_badge_dict, get_static_badge_dict
50from couchers.servicers.api import user_model_to_pb
51from couchers.servicers.blocking import are_blocked
52from couchers.servicers.conversations import generate_message_notifications
53from couchers.servicers.events import (
54 generate_event_cancel_notifications,
55 generate_event_create_notifications,
56 generate_event_delete_notifications,
57 generate_event_update_notifications,
58)
59from couchers.servicers.requests import host_request_to_pb
60from couchers.sql import couchers_select as select
61from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships
62from couchers.tasks import send_duplicate_strong_verification_email
63from couchers.utils import now
64from proto import notification_data_pb2
65from proto.internal import jobs_pb2, verification_pb2
67logger = logging.getLogger(__name__)
69# these were straight up imported
70handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload
72send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload
74handle_email_digests.PAYLOAD = empty_pb2.Empty
75handle_email_digests.SCHEDULE = timedelta(minutes=15)
77generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload
79generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload
81generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload
83generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload
85generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload
88refresh_materialized_views.PAYLOAD = empty_pb2.Empty
89refresh_materialized_views.SCHEDULE = timedelta(minutes=5)
91refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
92refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)
95def send_email(payload):
96 logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
97 # selects a "sender", which either prints the email to the logger or sends it out with SMTP
98 sender = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email
99 # the sender must return a models.Email object that can be added to the database
100 email = sender(
101 sender_name=payload.sender_name,
102 sender_email=payload.sender_email,
103 recipient=payload.recipient,
104 subject=payload.subject,
105 plain=payload.plain,
106 html=payload.html,
107 list_unsubscribe_header=payload.list_unsubscribe_header,
108 source_data=payload.source_data,
109 )
110 with session_scope() as session:
111 session.add(email)
114send_email.PAYLOAD = jobs_pb2.SendEmailPayload
117def purge_login_tokens(payload):
118 logger.info("Purging login tokens")
119 with session_scope() as session:
120 session.execute(delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False))
123purge_login_tokens.PAYLOAD = empty_pb2.Empty
124purge_login_tokens.SCHEDULE = timedelta(hours=24)
127def purge_password_reset_tokens(payload):
128 logger.info("Purging login tokens")
129 with session_scope() as session:
130 session.execute(
131 delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
132 )
135purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
136purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)
139def purge_account_deletion_tokens(payload):
140 logger.info("Purging account deletion tokens")
141 with session_scope() as session:
142 session.execute(
143 delete(AccountDeletionToken)
144 .where(~AccountDeletionToken.is_valid)
145 .execution_options(synchronize_session=False)
146 )
149purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
150purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)
153def send_message_notifications(payload):
154 """
155 Sends out email notifications for messages that have been unseen for a long enough time
156 """
157 # very crude and dumb algorithm
158 logger.info("Sending out email notifications for unseen messages")
160 with session_scope() as session:
161 # users who have unnotified messages older than 5 minutes in any group chat
162 users = (
163 session.execute(
164 select(User)
165 .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
166 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
167 .where(not_(GroupChatSubscription.is_muted))
168 .where(User.is_visible)
169 .where(Message.time >= GroupChatSubscription.joined)
170 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
171 .where(Message.id > User.last_notified_message_id)
172 .where(Message.id > GroupChatSubscription.last_seen_message_id)
173 .where(Message.time < now() - timedelta(minutes=5))
174 .where(Message.message_type == MessageType.text) # TODO: only text messages for now
175 )
176 .scalars()
177 .unique()
178 )
180 for user in users:
181 # now actually grab all the group chats, not just less than 5 min old
182 subquery = (
183 select(
184 GroupChatSubscription.group_chat_id.label("group_chat_id"),
185 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
186 func.max(Message.id).label("message_id"),
187 func.count(Message.id).label("count_unseen"),
188 )
189 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
190 .where(GroupChatSubscription.user_id == user.id)
191 .where(not_(GroupChatSubscription.is_muted))
192 .where(Message.id > user.last_notified_message_id)
193 .where(Message.id > GroupChatSubscription.last_seen_message_id)
194 .where(Message.time >= GroupChatSubscription.joined)
195 .where(Message.message_type == MessageType.text) # TODO: only text messages for now
196 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
197 .group_by(GroupChatSubscription.group_chat_id)
198 .order_by(func.max(Message.id).desc())
199 .subquery()
200 )
202 unseen_messages = session.execute(
203 select(GroupChat, Message, subquery.c.count_unseen)
204 .join(subquery, subquery.c.message_id == Message.id)
205 .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
206 .order_by(subquery.c.message_id.desc())
207 ).all()
209 user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)
211 def format_title(message, group_chat, count_unseen):
212 if group_chat.is_dm:
213 return f"You missed {count_unseen} message(s) from {message.author.name}"
214 else:
215 return f"You missed {count_unseen} message(s) in {group_chat.title}"
217 notify(
218 session,
219 user_id=user.id,
220 topic_action="chat:missed_messages",
221 data=notification_data_pb2.ChatMissedMessages(
222 messages=[
223 notification_data_pb2.ChatMessage(
224 author=user_model_to_pb(
225 message.author,
226 session,
227 SimpleNamespace(user_id=user.id),
228 ),
229 message=format_title(message, group_chat, count_unseen),
230 text=message.text,
231 group_chat_id=message.conversation_id,
232 )
233 for group_chat, message, count_unseen in unseen_messages
234 ],
235 ),
236 )
237 session.commit()
240send_message_notifications.PAYLOAD = empty_pb2.Empty
241send_message_notifications.SCHEDULE = timedelta(minutes=3)
244def send_request_notifications(payload):
245 """
246 Sends out email notifications for unseen messages in host requests (as surfer or host)
247 """
248 logger.info("Sending out email notifications for unseen messages in host requests")
250 with session_scope() as session:
251 # requests where this user is surfing
252 surfing_reqs = session.execute(
253 select(User, HostRequest, func.max(Message.id))
254 .where(User.is_visible)
255 .join(HostRequest, HostRequest.surfer_user_id == User.id)
256 .join(Message, Message.conversation_id == HostRequest.conversation_id)
257 .where(Message.id > HostRequest.surfer_last_seen_message_id)
258 .where(Message.id > User.last_notified_request_message_id)
259 .where(Message.time < now() - timedelta(minutes=5))
260 .where(Message.message_type == MessageType.text)
261 .group_by(User, HostRequest)
262 ).all()
264 # where this user is hosting
265 hosting_reqs = session.execute(
266 select(User, HostRequest, func.max(Message.id))
267 .where(User.is_visible)
268 .join(HostRequest, HostRequest.host_user_id == User.id)
269 .join(Message, Message.conversation_id == HostRequest.conversation_id)
270 .where(Message.id > HostRequest.host_last_seen_message_id)
271 .where(Message.id > User.last_notified_request_message_id)
272 .where(Message.time < now() - timedelta(minutes=5))
273 .where(Message.message_type == MessageType.text)
274 .group_by(User, HostRequest)
275 ).all()
277 for user, host_request, max_message_id in surfing_reqs:
278 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
279 session.flush()
281 context = SimpleNamespace(user_id=user.id)
282 notify(
283 session,
284 user_id=user.id,
285 topic_action="host_request:missed_messages",
286 key=host_request.conversation_id,
287 data=notification_data_pb2.HostRequestMissedMessages(
288 host_request=host_request_to_pb(host_request, session, context),
289 user=user_model_to_pb(host_request.host, session, context),
290 am_host=False,
291 ),
292 )
294 for user, host_request, max_message_id in hosting_reqs:
295 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
296 session.flush()
298 context = SimpleNamespace(user_id=user.id)
299 notify(
300 session,
301 user_id=user.id,
302 topic_action="host_request:missed_messages",
303 key=host_request.conversation_id,
304 data=notification_data_pb2.HostRequestMissedMessages(
305 host_request=host_request_to_pb(host_request, session, context),
306 user=user_model_to_pb(host_request.surfer, session, context),
307 am_host=True,
308 ),
309 )
312send_request_notifications.PAYLOAD = empty_pb2.Empty
313send_request_notifications.SCHEDULE = timedelta(minutes=3)
316def send_onboarding_emails(payload):
317 """
318 Sends out onboarding emails
319 """
320 logger.info("Sending out onboarding emails")
322 with session_scope() as session:
323 # first onboarding email
324 users = (
325 session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all()
326 )
328 for user in users:
329 notify(
330 session,
331 user_id=user.id,
332 topic_action="onboarding:reminder",
333 key="1",
334 )
335 user.onboarding_emails_sent = 1
336 user.last_onboarding_email_sent = now()
337 session.commit()
339 # second onboarding email
340 # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
341 users = (
342 session.execute(
343 select(User)
344 .where(User.is_visible)
345 .where(User.onboarding_emails_sent == 1)
346 .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
347 .where(User.has_completed_profile == False)
348 )
349 .scalars()
350 .all()
351 )
353 for user in users:
354 notify(
355 session,
356 user_id=user.id,
357 topic_action="onboarding:reminder",
358 key="2",
359 )
360 user.onboarding_emails_sent = 2
361 user.last_onboarding_email_sent = now()
362 session.commit()
365send_onboarding_emails.PAYLOAD = empty_pb2.Empty
366send_onboarding_emails.SCHEDULE = timedelta(hours=1)
369def send_reference_reminders(payload):
370 """
371 Sends out reminders to write references after hosting/staying
372 """
373 logger.info("Sending out reference reminder emails")
375 # Keep this in chronological order!
376 reference_reminder_schedule = [
377 # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
378 # the end time to write a reference is supposed to be midnight in the host's timezone
379 # 8 pm ish on the last day of the stay
380 (1, timedelta(days=15) - timedelta(hours=20), 14),
381 # 2 pm ish a week after stay
382 (2, timedelta(days=8) - timedelta(hours=14), 7),
383 # 10 am ish 3 days before end of time to write ref
384 (3, timedelta(days=4) - timedelta(hours=10), 3),
385 ]
387 with session_scope() as session:
388 # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
389 for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
390 user = aliased(User)
391 other_user = aliased(User)
392 # surfers needing to write a ref
393 q1 = (
394 select(literal(True), HostRequest, user, other_user)
395 .join(user, user.id == HostRequest.surfer_user_id)
396 .join(other_user, other_user.id == HostRequest.host_user_id)
397 .outerjoin(
398 Reference,
399 and_(
400 Reference.host_request_id == HostRequest.conversation_id,
401 # if no reference is found in this join, then the surfer has not written a ref
402 Reference.from_user_id == HostRequest.surfer_user_id,
403 ),
404 )
405 .where(user.is_visible)
406 .where(other_user.is_visible)
407 .where(Reference.id == None)
408 .where(HostRequest.can_write_reference)
409 .where(HostRequest.surfer_sent_reference_reminders < reminder_number)
410 .where(HostRequest.end_time_to_write_reference - reminder_time < now())
411 .where(HostRequest.surfer_reason_didnt_meetup == None)
412 )
414 # hosts needing to write a ref
415 q2 = (
416 select(literal(False), HostRequest, user, other_user)
417 .join(user, user.id == HostRequest.host_user_id)
418 .join(other_user, other_user.id == HostRequest.surfer_user_id)
419 .outerjoin(
420 Reference,
421 and_(
422 Reference.host_request_id == HostRequest.conversation_id,
423 # if no reference is found in this join, then the host has not written a ref
424 Reference.from_user_id == HostRequest.host_user_id,
425 ),
426 )
427 .where(user.is_visible)
428 .where(other_user.is_visible)
429 .where(Reference.id == None)
430 .where(HostRequest.can_write_reference)
431 .where(HostRequest.host_sent_reference_reminders < reminder_number)
432 .where(HostRequest.end_time_to_write_reference - reminder_time < now())
433 .where(HostRequest.host_reason_didnt_meetup == None)
434 )
436 union = union_all(q1, q2).subquery()
437 union = select(
438 union.c[0].label("surfed"),
439 aliased(HostRequest, union),
440 aliased(user, union),
441 aliased(other_user, union),
442 )
443 reference_reminders = session.execute(union).all()
445 for surfed, host_request, user, other_user in reference_reminders:
446 # checked in sql
447 assert user.is_visible
448 if not are_blocked(session, user.id, other_user.id):
449 context = SimpleNamespace(user_id=user.id)
450 notify(
451 session,
452 user_id=user.id,
453 topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
454 data=notification_data_pb2.ReferenceReminder(
455 host_request_id=host_request.conversation_id,
456 other_user=user_model_to_pb(other_user, session, context),
457 days_left=reminder_days_left,
458 ),
459 )
460 if surfed:
461 host_request.surfer_sent_reference_reminders = reminder_number
462 else:
463 host_request.host_sent_reference_reminders = reminder_number
464 session.commit()
467send_reference_reminders.PAYLOAD = empty_pb2.Empty
468send_reference_reminders.SCHEDULE = timedelta(hours=1)
471def add_users_to_email_list(payload):
472 if not config["LISTMONK_ENABLED"]:
473 logger.info("Not adding users to mailing list")
474 return
476 logger.info("Adding users to mailing list")
478 while True:
479 with session_scope() as session:
480 user = session.execute(
481 select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
482 ).scalar_one_or_none()
483 if not user:
484 logger.info("Finished adding users to mailing list")
485 return
487 if user.opt_out_of_newsletter:
488 user.in_sync_with_newsletter = True
489 session.commit()
490 continue
492 r = requests.post(
493 config["LISTMONK_BASE_URL"] + "/api/subscribers",
494 auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
495 json={
496 "email": user.email,
497 "name": user.name,
498 "lists": [config["LISTMONK_LIST_ID"]],
499 "preconfirm_subscriptions": True,
500 "attribs": {"couchers_user_id": user.id},
501 "status": "enabled",
502 },
503 timeout=10,
504 )
505 # the API returns if the user is already subscribed
506 if r.status_code == 200 or r.status_code == 409:
507 user.in_sync_with_newsletter = True
508 session.commit()
509 else:
510 raise Exception("Failed to add users to mailing list")
513add_users_to_email_list.PAYLOAD = empty_pb2.Empty
514add_users_to_email_list.SCHEDULE = timedelta(hours=1)
517def enforce_community_membership(payload):
518 tasks_enforce_community_memberships()
521enforce_community_membership.PAYLOAD = empty_pb2.Empty
522enforce_community_membership.SCHEDULE = timedelta(minutes=15)
525def update_recommendation_scores(payload):
526 text_fields = [
527 User.hometown,
528 User.occupation,
529 User.education,
530 User.about_me,
531 User.things_i_like,
532 User.about_place,
533 User.additional_information,
534 User.pet_details,
535 User.kid_details,
536 User.housemate_details,
537 User.other_host_info,
538 User.sleeping_details,
539 User.area,
540 User.house_rules,
541 ]
542 home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]
544 def poor_man_gaussian():
545 """
546 Produces an approximatley std normal random variate
547 """
548 trials = 5
549 return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)
551 def int_(stmt):
552 return func.coalesce(cast(stmt, Integer), 0)
554 def float_(stmt):
555 return func.coalesce(cast(stmt, Float), 0.0)
557 with session_scope() as session:
558 # profile
559 profile_text = ""
560 for field in text_fields:
561 profile_text += func.coalesce(field, "")
562 text_length = func.length(profile_text)
563 home_text = ""
564 for field in home_fields:
565 home_text += func.coalesce(field, "")
566 home_length = func.length(home_text)
568 has_text = int_(text_length > 500)
569 long_text = int_(text_length > 2000)
570 has_pic = int_(User.avatar_key != None)
571 can_host = int_(User.hosting_status == HostingStatus.can_host)
572 cant_host = int_(User.hosting_status == HostingStatus.cant_host)
573 filled_home = int_(User.last_minute != None) * int_(home_length > 200)
574 profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host
576 # references
577 left_ref_expr = int_(1).label("left_reference")
578 left_refs_subquery = (
579 select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
580 )
581 left_reference = int_(left_refs_subquery.c.left_reference)
582 has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
583 ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
584 ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
585 has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
586 has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
587 "has_bad_ref"
588 )
589 received_ref_subquery = (
590 select(
591 Reference.to_user_id.label("user_id"),
592 has_reference_expr,
593 has_multiple_types_expr,
594 has_bad_ref_expr,
595 ref_count_expr,
596 ref_avg_expr,
597 )
598 .group_by(Reference.to_user_id)
599 .subquery()
600 )
601 has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
602 has_reference = int_(received_ref_subquery.c.has_reference)
603 has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
604 rating_score = float_(
605 received_ref_subquery.c.ref_avg
606 * (
607 2 * func.least(received_ref_subquery.c.ref_count, 5)
608 + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
609 )
610 )
611 ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score
613 # activeness
614 recently_active = int_(User.last_active >= now() - timedelta(days=180))
615 very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
616 recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
617 messaged_lots = int_(func.count(Message.id) > 5)
618 messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
619 messaging_subquery = (
620 select(Message.author_id.label("user_id"), messaging_points_subquery)
621 .where(Message.message_type == MessageType.text)
622 .group_by(Message.author_id)
623 .subquery()
624 )
625 activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)
627 # verification
628 cb_subquery = (
629 select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
630 .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
631 .where(ClusterSubscription.role == ClusterRole.admin)
632 .where(Cluster.is_official_cluster)
633 .group_by(ClusterSubscription.user_id)
634 .subquery()
635 )
636 min_node_id = cb_subquery.c.min_node_id
637 cb = int_(min_node_id >= 1)
638 wcb = int_(min_node_id == 1)
639 badge_points = {
640 "founder": 100,
641 "board_member": 20,
642 "past_board_member": 5,
643 "strong_verification": 3,
644 "volunteer": 3,
645 "past_volunteer": 2,
646 "donor": 1,
647 "phone_verified": 1,
648 }
650 badge_subquery = (
651 select(
652 UserBadge.user_id.label("user_id"),
653 func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
654 )
655 .group_by(UserBadge.user_id)
656 .subquery()
657 )
659 other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)
661 # response rate
662 t = (
663 select(Message.conversation_id, Message.time)
664 .where(Message.message_type == MessageType.chat_created)
665 .subquery()
666 )
667 s = (
668 select(Message.conversation_id, Message.author_id, func.min(Message.time).label("time"))
669 .group_by(Message.conversation_id, Message.author_id)
670 .subquery()
671 )
672 hr_subquery = (
673 select(
674 HostRequest.host_user_id.label("user_id"),
675 func.avg(s.c.time - t.c.time).label("avg_response_time"),
676 func.count(t.c.time).label("received"),
677 func.count(s.c.time).label("responded"),
678 float_(
679 extract(
680 "epoch",
681 percentile_disc(0.33).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
682 )
683 / 60.0
684 ).label("response_time_33p"),
685 float_(
686 extract(
687 "epoch",
688 percentile_disc(0.66).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
689 )
690 / 60.0
691 ).label("response_time_66p"),
692 )
693 .join(t, t.c.conversation_id == HostRequest.conversation_id)
694 .outerjoin(
695 s, and_(s.c.conversation_id == HostRequest.conversation_id, s.c.author_id == HostRequest.host_user_id)
696 )
697 .group_by(HostRequest.host_user_id)
698 .subquery()
699 )
700 avg_response_time = hr_subquery.c.avg_response_time
701 avg_response_time_hr = float_(extract("epoch", avg_response_time) / 60.0)
702 received = hr_subquery.c.received
703 responded = hr_subquery.c.responded
704 response_time_33p = hr_subquery.c.response_time_33p
705 response_time_66p = hr_subquery.c.response_time_66p
706 response_rate = float_(responded / (1.0 * func.greatest(received, 1)))
707 # be careful with nulls
708 response_rate_points = -10 * int_(response_time_33p > 60 * 48.0) + 5 * int_(response_time_66p < 60 * 48.0)
710 recommendation_score = (
711 profile_points
712 + ref_score
713 + activeness_points
714 + other_points
715 + response_rate_points
716 + 2 * poor_man_gaussian()
717 )
719 scores = (
720 select(User.id.label("user_id"), recommendation_score.label("score"))
721 .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
722 .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
723 .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
724 .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
725 .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
726 .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
727 ).subquery()
729 session.execute(
730 User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
731 )
733 logger.info("Updated recommendation scores")
736update_recommendation_scores.PAYLOAD = empty_pb2.Empty
737update_recommendation_scores.SCHEDULE = timedelta(hours=24)
740def update_badges(payload):
741 with session_scope() as session:
743 def update_badge(badge_id: str, members: list[int]):
744 badge = get_badge_dict()[badge_id]
745 user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all()
746 # in case the user ids don't exist in the db
747 actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all()
748 # we should add the badge to these
749 add = set(actual_members) - set(user_ids)
750 # we should remove the badge from these
751 remove = set(user_ids) - set(actual_members)
752 for user_id in add:
753 user_add_badge(session, user_id, badge_id)
755 for user_id in remove:
756 user_remove_badge(session, user_id, badge_id)
758 update_badge("founder", get_static_badge_dict()["founder"])
759 update_badge("board_member", get_static_badge_dict()["board_member"])
760 update_badge("past_board_member", get_static_badge_dict()["past_board_member"])
761 update_badge(
762 "donor", session.execute(select(User.id).join(Invoice, Invoice.user_id == User.id)).scalars().all()
763 )
764 update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all())
765 update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all())
766 # strong verification requires passport on file + gender/sex correspondence and date of birth match
767 update_badge(
768 "strong_verification",
769 session.execute(
770 select(User.id)
771 .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
772 .where(StrongVerificationAttempt.has_strong_verification(User))
773 )
774 .scalars()
775 .all(),
776 )
779update_badges.PAYLOAD = empty_pb2.Empty
780update_badges.SCHEDULE = timedelta(minutes=15)
783def finalize_strong_verification(payload):
784 with session_scope() as session:
785 verification_attempt = session.execute(
786 select(StrongVerificationAttempt)
787 .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
788 .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
789 ).scalar_one()
790 response = requests.post(
791 "https://passportreader.app/api/v1/session.get",
792 auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
793 json={"id": verification_attempt.iris_session_id},
794 timeout=10,
795 )
796 if response.status_code != 200:
797 raise Exception(f"Iris didn't return 200: {response.text}")
798 json_data = response.json()
799 reference_payload = verification_pb2.VerificationReferencePayload.FromString(
800 simple_decrypt("iris_callback", b64decode(json_data["reference"]))
801 )
802 assert verification_attempt.user_id == reference_payload.user_id
803 assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token
804 assert verification_attempt.iris_session_id == json_data["id"]
805 assert json_data["state"] == "APPROVED"
807 if json_data["document_type"] != "PASSPORT":
808 verification_attempt.status = StrongVerificationAttemptStatus.failed
809 notify(
810 session,
811 user_id=verification_attempt.user_id,
812 topic_action="verification:sv_fail",
813 data=notification_data_pb2.VerificationSVFail(
814 reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
815 ),
816 )
817 return
819 assert json_data["document_type"] == "PASSPORT"
821 expiry_date = date.fromisoformat(json_data["expiry_date"])
822 nationality = json_data["nationality"]
823 last_three_document_chars = json_data["document_number"][-3:]
825 existing_attempt = session.execute(
826 select(StrongVerificationAttempt)
827 .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
828 .where(StrongVerificationAttempt.passport_nationality == nationality)
829 .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
830 .order_by(StrongVerificationAttempt.id)
831 .limit(1)
832 ).scalar_one_or_none()
834 verification_attempt.has_minimal_data = True
835 verification_attempt.passport_expiry_date = expiry_date
836 verification_attempt.passport_nationality = nationality
837 verification_attempt.passport_last_three_document_chars = last_three_document_chars
839 if existing_attempt:
840 verification_attempt.status = StrongVerificationAttemptStatus.duplicate
842 if existing_attempt.user_id != verification_attempt.user_id:
843 session.flush()
844 send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)
846 notify(
847 session,
848 user_id=verification_attempt.user_id,
849 topic_action="verification:sv_fail",
850 data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
851 )
852 return
854 verification_attempt.has_full_data = True
855 verification_attempt.passport_encrypted_data = asym_encrypt(
856 config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
857 )
858 verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
859 verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
860 verification_attempt.status = StrongVerificationAttemptStatus.succeeded
862 session.flush()
864 strong_verification_completions_counter.inc()
866 user = verification_attempt.user
867 if verification_attempt.has_strong_verification(user):
868 badge_id = "strong_verification"
869 if session.execute(
870 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
871 ).scalar_one_or_none():
872 return
874 user_add_badge(session, user.id, badge_id, do_notify=False)
875 notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success")
876 else:
877 notify(
878 session,
879 user_id=verification_attempt.user_id,
880 topic_action="verification:sv_fail",
881 data=notification_data_pb2.VerificationSVFail(
882 reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
883 ),
884 )
887finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload