Coverage for src/couchers/jobs/handlers.py: 92%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2Background job servicers
3"""
6import logging
7from datetime import timedelta
8from math import sqrt
10import requests
11from sqlalchemy import Integer
12from sqlalchemy.orm import aliased
13from sqlalchemy.sql import and_, cast, delete, distinct, extract, func, literal, not_, or_, select, union_all
14from sqlalchemy.sql.functions import percentile_disc
16from couchers import config, email, urls
17from couchers.db import session_scope
18from couchers.email.dev import print_dev_email
19from couchers.email.smtp import send_smtp_email
20from couchers.materialized_views import refresh_materialized_views
21from couchers.models import (
22 AccountDeletionToken,
23 Cluster,
24 ClusterRole,
25 ClusterSubscription,
26 Float,
27 GroupChat,
28 GroupChatSubscription,
29 HostingStatus,
30 HostRequest,
31 LoginToken,
32 Message,
33 MessageType,
34 PasswordResetToken,
35 Reference,
36 User,
37)
38from couchers.notifications.background import handle_email_digests, handle_email_notifications, handle_notification
39from couchers.notifications.notify import notify
40from couchers.servicers.blocking import are_blocked
41from couchers.sql import couchers_select as select
42from couchers.tasks import enforce_community_memberships, send_onboarding_email, send_reference_reminder_email
43from couchers.utils import now
45logger = logging.getLogger(__name__)
def process_send_email(payload):
    """
    Sends the email described by the payload and stores the resulting email object

    Reads sender_name, sender_email, recipient, subject, plain and html from the payload.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
    # pick the delivery function: real SMTP when email is enabled, otherwise the dev printer
    if config.config["ENABLE_EMAIL"]:
        deliver = send_smtp_email
    else:
        deliver = print_dev_email
    # whatever the delivery function returns is added to the database below
    sent_email = deliver(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
    )
    with session_scope() as session:
        session.add(sent_email)
def process_purge_login_tokens(payload):
    """
    Deletes all login tokens that are no longer valid (the payload is not used)
    """
    logger.info(f"Purging login tokens")
    # bulk delete; no need to synchronize in-session objects
    purge = delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False)
    with session_scope() as session:
        session.execute(purge)
def process_purge_password_reset_tokens(payload):
    """
    Deletes all password reset tokens that are no longer valid (the payload is not used)
    """
    # the message used to read "Purging login tokens", copy-pasted from the login token job
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )
def process_purge_account_deletion_tokens(payload):
    """
    Deletes all account deletion tokens that are no longer valid (the payload is not used)
    """
    logger.info(f"Purging account deletion tokens")
    # bulk delete; no need to synchronize in-session objects
    purge = delete(AccountDeletionToken).where(~AccountDeletionToken.is_valid)
    purge = purge.execution_options(synchronize_session=False)
    with session_scope() as session:
        session.execute(purge)
def process_generate_message_notifications(payload):
    """
    Creates a notification for each other subscriber of a group chat when a text message is sent to it

    The message is identified by payload.message_id. Nothing is generated for non-text messages.
    """
    logger.info(f"Sending out notifications for message_id = {payload.message_id}")

    with session_scope() as session:
        message_query = (
            select(Message, GroupChat)
            .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
            .where(Message.id == payload.message_id)
        )
        message, group_chat = session.execute(message_query).one()

        if message.message_type != MessageType.text:
            logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
            return

        # visible users other than the author who are still in the chat and have not muted it
        recipients_query = (
            select(GroupChatSubscription)
            .join(User, User.id == GroupChatSubscription.user_id)
            .where(GroupChatSubscription.group_chat_id == message.conversation_id)
            .where(User.is_visible)
            .where(User.id != message.author_id)
            .where(GroupChatSubscription.left.is_(None))
            .where(~GroupChatSubscription.is_muted)
        )

        for subscription in session.execute(recipients_query).scalars().all():
            recipient_id = subscription.user_id
            logger.info(f"Notifying user_id = {recipient_id}")
            notify(
                user_id=recipient_id,
                topic="chat",
                key=str(message.conversation_id),
                action="message",
                icon="message",
                title=f"{message.author.name} sent a message in {group_chat.title}",
                content=message.text,
                link=urls.chat_link(chat_id=message.conversation_id),
            )
def process_send_message_notifications(payload):
    """
    Sends out email notifications for messages that have been unseen for a long enough time

    A user is picked up when one of their unmuted group chats contains a text message older than 5 minutes that is
    newer than both the last message they have seen and the last message they were notified about. Each such user
    gets one "unseen_messages" email covering all their group chats with unseen text messages, and their
    last_notified_message_id is advanced before the email is enqueued. The payload is not used.
    """
    # very crude and dumb algorithm
    logger.info(f"Sending out email notifications for unseen messages")

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                select(User)
                .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for user in users:
            # now actually grab all the group chats, not just less than 5 min old
            # one row per group chat: the newest unseen message id and the number of unseen messages
            subquery = (
                select(
                    GroupChatSubscription.group_chat_id.label("group_chat_id"),
                    func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                    func.max(Message.id).label("message_id"),
                    func.count(Message.id).label("count_unseen"),
                )
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(GroupChatSubscription.user_id == user.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(Message.id > user.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            unseen_messages = session.execute(
                select(GroupChat, Message, subquery.c.count_unseen)
                .join(subquery, subquery.c.message_id == Message.id)
                .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
                .order_by(subquery.c.message_id.desc())
            ).all()

            if not unseen_messages:
                # nothing left to report (e.g. the messages were seen between the two queries); without this guard
                # max() below raises ValueError on an empty sequence and aborts the job for all remaining users
                continue

            # advance the marker first so the same messages are not picked up again on the next run
            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)
            session.commit()

            total_unseen_message_count = sum(count for _, _, count in unseen_messages)

            email.enqueue_email_from_template(
                user.email,
                "unseen_messages",
                template_args={
                    "user": user,
                    "total_unseen_message_count": total_unseen_message_count,
                    "unseen_messages": [
                        (group_chat, latest_message, count) for group_chat, latest_message, count in unseen_messages
                    ],
                    "group_chats_link": urls.messages_link(),
                },
            )
def process_send_request_notifications(payload):
    """
    Emails users about unseen text messages in their host requests, both as surfer and as host

    Only messages older than 5 minutes that the user has neither seen nor been notified about are considered.
    """
    logger.info(f"Sending out email notifications for unseen messages in host requests")

    def unseen_request_messages(session, participant_id, last_seen_message_id):
        # rows of (user, host request, newest unseen message id) for one side of the host request
        query = (
            select(User, HostRequest, func.max(Message.id))
            .where(User.is_visible)
            .join(HostRequest, participant_id == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(Message.id > last_seen_message_id)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
            .group_by(User, HostRequest)
        )
        return session.execute(query).all()

    with session_scope() as session:
        # both queries run before any notification marker is updated
        as_surfer = unseen_request_messages(
            session, HostRequest.surfer_user_id, HostRequest.surfer_last_seen_message_id
        )
        as_host = unseen_request_messages(session, HostRequest.host_user_id, HostRequest.host_last_seen_message_id)

        batches = (
            (as_surfer, "unseen_message_guest", urls.host_request_link_guest),
            (as_host, "unseen_message_host", urls.host_request_link_host),
        )

        for rows, template_name, make_link in batches:
            for user, host_request, max_message_id in rows:
                # never move the marker backwards
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.commit()

                email.enqueue_email_from_template(
                    user.email,
                    template_name,
                    template_args={
                        "user": user,
                        "host_request": host_request,
                        "host_request_link": make_link(),
                    },
                )
def process_send_onboarding_emails(payload):
    """
    Sends the first and, where due, the second onboarding email to visible users
    """
    logger.info(f"Sending out onboarding emails")

    with session_scope() as session:
        # first email: visible users who have not been sent any onboarding email yet
        first_round = select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)
        for user in session.execute(first_round).scalars().all():
            send_onboarding_email(user, email_number=1)
            user.onboarding_emails_sent = 1
            user.last_onboarding_email_sent = now()
            session.commit()

        # second email: more than a week after the first one, only for users whose profile is not completed
        second_round = (
            select(User)
            .where(User.is_visible)
            .where(User.onboarding_emails_sent == 1)
            .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
            .where(User.has_completed_profile == False)
        )
        for user in session.execute(second_round).scalars().all():
            send_onboarding_email(user, email_number=2)
            user.onboarding_emails_sent = 2
            user.last_onboarding_email_sent = now()
            session.commit()
def process_send_reference_reminders(payload):
    """
    Emails reminders to surfers and hosts who have not yet written a reference for a host request
    """
    logger.info(f"Sending out reference reminder emails")

    # Must stay in chronological order!
    # each entry: (reminder number, how long before the reference deadline it is due, wording for the time left)
    # the end time to write a reference is supposed to be midnight in the host's timezone
    reference_reminder_schedule = [
        (1, timedelta(days=15) - timedelta(hours=20), "14 days"),  # 8 pm ish on the last day of the stay
        (2, timedelta(days=8) - timedelta(hours=14), "7 days"),  # 2 pm ish a week after stay
        (3, timedelta(days=4) - timedelta(hours=10), "3 days"),  # 10 am ish 3 days before end of time to write ref
    ]

    def missing_reference_query(surfed, user, other_user, writer_id, recipient_id, reminders_sent, number, lead_time):
        # host requests where the writer has no reference yet and this reminder is due but not sent
        return (
            select(literal(surfed), HostRequest, user, other_user)
            .join(user, user.id == writer_id)
            .join(other_user, other_user.id == recipient_id)
            .outerjoin(
                Reference,
                and_(
                    Reference.host_request_id == HostRequest.conversation_id,
                    # no row found by this join means the writer has not written a ref
                    Reference.from_user_id == writer_id,
                ),
            )
            .where(user.is_visible)
            .where(other_user.is_visible)
            .where(Reference.id == None)
            .where(HostRequest.can_write_reference)
            .where(reminders_sent < number)
            .where(HostRequest.end_time_to_write_reference - lead_time < now())
        )

    with session_scope() as session:
        # latest reminder first, so a missed earlier reminder does not produce duplicates
        for reminder_no, reminder_time, reminder_text in reference_reminder_schedule[::-1]:
            user = aliased(User)
            other_user = aliased(User)

            # surfers needing to write a ref
            surfer_query = missing_reference_query(
                True,
                user,
                other_user,
                HostRequest.surfer_user_id,
                HostRequest.host_user_id,
                HostRequest.surfer_sent_reference_reminders,
                reminder_no,
                reminder_time,
            )
            # hosts needing to write a ref
            host_query = missing_reference_query(
                False,
                user,
                other_user,
                HostRequest.host_user_id,
                HostRequest.surfer_user_id,
                HostRequest.host_sent_reference_reminders,
                reminder_no,
                reminder_time,
            )

            combined = union_all(surfer_query, host_query).subquery()
            rows = session.execute(
                select(
                    combined.c[0].label("surfed"),
                    aliased(HostRequest, combined),
                    aliased(user, combined),
                    aliased(other_user, combined),
                )
            ).all()

            for surfed, host_request, writer, recipient in rows:
                # checked in sql
                assert writer.is_visible
                if not are_blocked(session, writer.id, recipient.id):
                    send_reference_reminder_email(writer, recipient, host_request, surfed, reminder_text)
                    if surfed:
                        host_request.surfer_sent_reference_reminders = reminder_no
                    else:
                        host_request.host_sent_reference_reminders = reminder_no
                    session.commit()
def process_add_users_to_email_list(payload):
    """
    Subscribes up to 100 visible users who are not yet on the mailing list via the Mailchimp API

    Does nothing unless the MAILCHIMP_ENABLED config option is set. Users are only marked as added once Mailchimp
    has answered the batch request with status 200; any other status raises an Exception, and a request that gets
    no response within the timeout raises the corresponding requests exception. The payload is not used.
    """
    if not config.config["MAILCHIMP_ENABLED"]:
        logger.info(f"Not adding users to mailing list")
        return

    logger.info(f"Adding users to mailing list")

    with session_scope() as session:
        users = (
            session.execute(select(User).where(User.is_visible).where(User.added_to_mailing_list == False).limit(100))
            .scalars()
            .all()
        )

        if not users:
            logger.info(f"No users to add to mailing list")
            return

        auth = ("apikey", config.config["MAILCHIMP_API_KEY"])

        body = {
            "members": [
                {
                    "email_address": user.email,
                    "status_if_new": "subscribed",
                    "status": "subscribed",
                    "merge_fields": {
                        "FNAME": user.name,
                    },
                }
                for user in users
            ]
        }

        dc = config.config["MAILCHIMP_DC"]
        list_id = config.config["MAILCHIMP_LIST_ID"]
        # requests never times out by default; bound the wait so a stalled connection cannot hang the worker
        r = requests.post(f"https://{dc}.api.mailchimp.com/3.0/lists/{list_id}", auth=auth, json=body, timeout=60)
        if r.status_code == 200:
            for user in users:
                user.added_to_mailing_list = True
            session.commit()
        else:
            logger.error(f"Mailchimp responded with status code {r.status_code}")
            raise Exception("Failed to add users to mailing list")
def process_enforce_community_membership(payload):
    """
    Runs enforce_community_memberships (the payload is not used)
    """
    enforce_community_memberships()
def process_handle_notification(payload):
    """
    Hands the notification identified by payload.notification_id over to handle_notification
    """
    notification_id = payload.notification_id
    handle_notification(notification_id)
def process_handle_email_notifications(payload):
    """
    Runs handle_email_notifications (the payload is not used)
    """
    handle_email_notifications()
def process_handle_email_digests(payload):
    """
    Runs handle_email_digests (the payload is not used)
    """
    handle_email_digests()
def process_update_recommendation_scores(payload):
    """
    Recomputes users' recommendation_score with one SQL UPDATE

    The score is the sum of profile, reference, activeness, verification and response-rate points plus a small
    random term. Everything is built as SQL expressions, so the computation runs in the database. The payload is
    not used.
    """
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.my_travels,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian():
        """
        Produces an approximately std normal random variate
        """
        # sum of `trials` uniform(0, 1) draws, shifted by its mean and scaled by its standard deviation
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt):
        # cast to integer, treating NULL as 0
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt):
        # cast to float, treating NULL as 0.0
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile
        # "+" on these expressions builds a SQL concatenation of all the (NULL-safe) text columns
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")
        home_length = func.length(home_text)

        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        has_pic = int_(User.avatar_key != None)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.last_minute != None) * int_(home_length > 200)
        profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host

        # references
        # one row per user who has written at least one reference
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        # aggregates over the references each user has received
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # the first 5 references count double, every further one counts once
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        phone_verified = int_(User.phone_is_verified)
        # smallest parent node id among the official clusters the user is an admin of
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        cb = int_(min_node_id >= 1)
        f = int_(User.id <= 2)
        wcb = int_(min_node_id == 1)
        # NOTE(review): phone_verified is computed above but does not enter the points below - confirm intent
        verification_points = 0.0 + 100 * f + 10 * wcb + 5 * cb

        # response rate
        # t: time each conversation was created
        t = (
            select(Message.conversation_id, Message.time)
            .where(Message.message_type == MessageType.chat_created)
            .subquery()
        )
        # s: time of each author's first message in each conversation
        s = (
            select(Message.conversation_id, Message.author_id, func.min(Message.time).label("time"))
            .group_by(Message.conversation_id, Message.author_id)
            .subquery()
        )
        # per host: response time statistics in minutes; requests without a host message count as 1000 days
        hr_subquery = (
            select(
                HostRequest.host_user_id.label("user_id"),
                func.avg(s.c.time - t.c.time).label("avg_response_time"),
                func.count(t.c.time).label("received"),
                func.count(s.c.time).label("responded"),
                float_(
                    extract(
                        "epoch",
                        percentile_disc(0.33).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
                    )
                    / 60.0
                ).label("response_time_33p"),
                float_(
                    extract(
                        "epoch",
                        percentile_disc(0.66).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
                    )
                    / 60.0
                ).label("response_time_66p"),
            )
            .join(t, t.c.conversation_id == HostRequest.conversation_id)
            .outerjoin(
                s, and_(s.c.conversation_id == HostRequest.conversation_id, s.c.author_id == HostRequest.host_user_id)
            )
            .group_by(HostRequest.host_user_id)
            .subquery()
        )
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
        # the thresholds are 48 hours expressed in minutes
        response_rate_points = -10 * int_(response_time_33p > 60 * 48.0) + 5 * int_(response_time_66p < 60 * 48.0)

        recommendation_score = (
            profile_points
            + ref_score
            + activeness_points
            + verification_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        # outer joins keep users without a row in a subquery; int_/float_ turn their NULLs into zeros
        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        session.execute(
            User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
        )

    logger.info("Updated recommendation scores")
def process_refresh_materialized_views(payload):
    """
    Runs refresh_materialized_views (the payload is not used)
    """
    refresh_materialized_views()