Coverage for src/tests/test_bg_jobs.py: 100%
458 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-05 23:21 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-05 23:21 +0000
1from datetime import timedelta
2from unittest.mock import call, patch
4import pytest
5import requests
6from google.protobuf import empty_pb2
7from sqlalchemy.sql import delete, func
9import couchers.jobs.worker
10from couchers.config import config
11from couchers.constants import HOST_REQUEST_MAX_REMINDERS, HOST_REQUEST_REMINDER_INTERVAL
12from couchers.crypto import urlsafe_secure_token
13from couchers.db import session_scope
14from couchers.email import queue_email
15from couchers.email.dev import print_dev_email
16from couchers.jobs.enqueue import queue_job
17from couchers.jobs.handlers import (
18 add_users_to_email_list,
19 send_host_request_reminders,
20 send_message_notifications,
21 send_onboarding_emails,
22 send_reference_reminders,
23 send_request_notifications,
24 update_badges,
25 update_recommendation_scores,
26)
27from couchers.jobs.worker import _run_job_and_schedule, process_job, run_scheduler, service_jobs
28from couchers.metrics import create_prometheus_server
29from couchers.models import (
30 AccountDeletionToken,
31 BackgroundJob,
32 BackgroundJobState,
33 Email,
34 HostRequestStatus,
35 LoginToken,
36 Message,
37 MessageType,
38 PasswordResetToken,
39 UserBadge,
40)
41from couchers.sql import couchers_select as select
42from couchers.utils import now, today
43from proto import conversations_pb2, requests_pb2
44from tests.test_fixtures import ( # noqa
45 auth_api_session,
46 conversations_session,
47 db,
48 generate_user,
49 make_friends,
50 make_user_block,
51 process_jobs,
52 push_collector,
53 requests_session,
54 testconfig,
55)
56from tests.test_references import create_host_reference, create_host_request, create_host_request_by_date
def now_5_min_in_future():
    """Return the value of ``now()`` shifted five minutes forward."""
    five_minutes = timedelta(minutes=5)
    return now() + five_minutes
@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture that only requests ``testconfig``, so every test in this module gets it."""
def _check_job_counter(job, status, attempt, exception):
    """Assert that the metrics page served on localhost:8000 contains the given label set.

    The four arguments are interpolated into a label string of the form
    ``attempt="…",exception="…",job="…",status="…"`` which must appear verbatim
    in the response text.

    Raises:
        AssertionError: if the label string is not found in the response.
    """
    # bounded wait: without a timeout, requests blocks forever if the server never answers
    metrics_string = requests.get("http://localhost:8000", timeout=10).text
    string_to_check = f'attempt="{attempt}",exception="{exception}",job="{job}",status="{status}"'
    assert string_to_check in metrics_string, f"{string_to_check} not found in metrics output"
def test_email_job(db):
    """Queue one email, run a single job, and check the email reached the dev printer and the job completed."""
    queued_fields = ("sender_name", "sender_email", "recipient", "subject", "plain", "html")

    with session_scope() as session:
        queue_email(session, *queued_fields)

    def mock_print_dev_email(
        sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data
    ):
        # the six queued values must arrive unchanged before we hand over to the real printer
        received = (sender_name, sender_email, recipient, subject, plain, html)
        assert received == queued_fields
        return print_dev_email(
            sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data
        )

    with patch("couchers.jobs.handlers.print_dev_email", mock_print_dev_email):
        process_job()

    def count_jobs(session, condition):
        query = select(func.count()).select_from(BackgroundJob).where(condition)
        return session.execute(query).scalar_one()

    with session_scope() as session:
        # exactly one job exists and it is completed
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.completed) == 0
def test_purge_login_tokens(db):
    """A login token whose expiry is ``now()`` is gone after the purge_login_tokens job has run."""
    user, _ = generate_user()

    def token_count(session):
        return session.execute(select(func.count()).select_from(LoginToken)).scalar_one()

    def count_jobs(session, condition):
        query = select(func.count()).select_from(BackgroundJob).where(condition)
        return session.execute(query).scalar_one()

    with session_scope() as session:
        session.add(LoginToken(token=urlsafe_secure_token(), user=user, expiry=now()))
        assert token_count(session) == 1

        queue_job(session, "purge_login_tokens", empty_pb2.Empty())
    process_job()

    with session_scope() as session:
        assert token_count(session) == 0

        # exactly one job exists and it is completed
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.completed) == 0
def test_purge_password_reset_tokens(db):
    """A password reset token whose expiry is ``now()`` is gone after the purge job has run."""
    user, _ = generate_user()

    def token_count(session):
        return session.execute(select(func.count()).select_from(PasswordResetToken)).scalar_one()

    def count_jobs(session, condition):
        query = select(func.count()).select_from(BackgroundJob).where(condition)
        return session.execute(query).scalar_one()

    with session_scope() as session:
        session.add(PasswordResetToken(token=urlsafe_secure_token(), user=user, expiry=now()))
        assert token_count(session) == 1

        queue_job(session, "purge_password_reset_tokens", empty_pb2.Empty())
    process_job()

    with session_scope() as session:
        assert token_count(session) == 0

        # exactly one job exists and it is completed
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.completed) == 0
def test_purge_account_deletion_tokens(db):
    """Of three account deletion tokens (expiry two hours ago, now, five hours ahead) one survives the purge."""
    user, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()

    def token_count(session):
        return session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one()

    def count_jobs(session, condition):
        query = select(func.count()).select_from(BackgroundJob).where(condition)
        return session.execute(query).scalar_one()

    with session_scope() as session:
        # 3 cases:
        #  1) Token is valid
        #  2) Token expired but account retrievable
        #  3) Account is irretrievable (and expired)
        owners_and_offsets = [
            (user, -timedelta(hours=2)),
            (user2, timedelta(0)),
            (user3, timedelta(hours=5)),
        ]
        session.add_all(
            [
                AccountDeletionToken(token=urlsafe_secure_token(), user=owner, expiry=now() + offset)
                for owner, offset in owners_and_offsets
            ]
        )
        assert token_count(session) == 3

        queue_job(session, "purge_account_deletion_tokens", empty_pb2.Empty())
    process_job()

    with session_scope() as session:
        assert token_count(session) == 1

        # exactly one job exists and it is completed
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.completed) == 0
def test_enforce_community_memberships(db):
    """The enforce_community_membership job runs to completion on an otherwise empty job table."""
    with session_scope() as session:
        queue_job(session, "enforce_community_membership", empty_pb2.Empty())
    process_job()

    is_completed = BackgroundJob.state == BackgroundJobState.completed
    not_completed = BackgroundJob.state != BackgroundJobState.completed

    with session_scope() as session:
        base_query = select(func.count()).select_from(BackgroundJob)
        # exactly one job exists and it is completed
        assert session.execute(base_query.where(is_completed)).scalar_one() == 1
        assert session.execute(base_query.where(not_completed)).scalar_one() == 0
def test_refresh_materialized_views(db):
    """The refresh_materialized_views job runs to completion on an otherwise empty job table."""
    with session_scope() as session:
        queue_job(session, "refresh_materialized_views", empty_pb2.Empty())

    process_job()

    is_completed = BackgroundJob.state == BackgroundJobState.completed
    not_completed = BackgroundJob.state != BackgroundJobState.completed

    with session_scope() as session:
        base_query = select(func.count()).select_from(BackgroundJob)
        # exactly one job exists and it is completed
        assert session.execute(base_query.where(is_completed)).scalar_one() == 1
        assert session.execute(base_query.where(not_completed)).scalar_one() == 0
def test_service_jobs(db):
    """service_jobs() works through the one queued email job before it reaches its sleep call."""
    with session_scope() as session:
        queue_email(session, "sender_name", "sender_email", "recipient", "subject", "plain", "html")

    class HitSleep(Exception):
        """Raised by the patched sleep so the test can leave the worker's infinite loop."""

    def raising_sleep(seconds):
        # stands in for the worker's sleep; reaching it means there were no more jobs left
        raise HitSleep()

    with pytest.raises(HitSleep), patch("couchers.jobs.worker.sleep", raising_sleep):
        service_jobs()

    def count_jobs(session, condition):
        query = select(func.count()).select_from(BackgroundJob).where(condition)
        return session.execute(query).scalar_one()

    with session_scope() as session:
        # exactly one job exists and it is completed
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.completed) == 0
def test_scheduler(db, monkeypatch):
    """Drive run_scheduler() with a fake clock and check when each of two mock schedule entries fires.

    ``monotonic`` and ``sleep`` in ``couchers.jobs.worker`` are replaced so that
    sleeping simply advances a counter; the run is aborted once the counter
    passes 70.
    """
    MOCK_SCHEDULE = [
        ("purge_login_tokens", timedelta(seconds=7)),
        ("send_message_notifications", timedelta(seconds=11)),
    ]

    current_time = 0
    end_time = 70

    class EndOfTime(Exception):
        """Raised by the fake sleep once the fake clock passes ``end_time``."""

    def mock_monotonic():
        return current_time

    def mock_sleep(seconds):
        nonlocal current_time
        current_time += seconds
        if current_time > end_time:
            raise EndOfTime()

    realized_schedule = []

    def mock_run_job_and_schedule(sched, schedule_id):
        # record when the scheduler fired this entry, then defer to the real implementation
        realized_schedule.append((current_time, schedule_id))
        _run_job_and_schedule(sched, schedule_id)

    replacements = [
        ("_run_job_and_schedule", mock_run_job_and_schedule),
        ("SCHEDULE", MOCK_SCHEDULE),
        ("monotonic", mock_monotonic),
        ("sleep", mock_sleep),
    ]
    for attribute, replacement in replacements:
        monkeypatch.setattr(couchers.jobs.worker, attribute, replacement)

    with pytest.raises(EndOfTime):
        run_scheduler()

    expected_schedule = [
        (0.0, 0),
        (0.0, 1),
        (7.0, 0),
        (11.0, 1),
        (14.0, 0),
        (21.0, 0),
        (22.0, 1),
        (28.0, 0),
        (33.0, 1),
        (35.0, 0),
        (42.0, 0),
        (44.0, 1),
        (49.0, 0),
        (55.0, 1),
        (56.0, 0),
        (63.0, 0),
        (66.0, 1),
        (70.0, 0),
    ]
    assert realized_schedule == expected_schedule

    def count_jobs(session, condition):
        query = select(func.count()).select_from(BackgroundJob).where(condition)
        return session.execute(query).scalar_one()

    with session_scope() as session:
        # nothing was processed, so all 18 queued jobs are still pending
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.pending) == 18
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.pending) == 0
def test_job_retry(db):
    """A job that always raises is in the error state after one attempt and failed after four more.

    Also checks that the metrics page on port 8000 reports the fourth attempt
    as ``error`` and the fifth as ``failed``.
    """
    with session_scope() as session:
        queue_job(session, "mock_job", empty_pb2.Empty())

    called_count = 0

    def mock_job(payload):
        nonlocal called_count
        called_count += 1
        raise Exception()

    MOCK_JOBS = {"mock_job": (empty_pb2.Empty, mock_job)}
    create_prometheus_server(port=8000)

    # if IN_TEST is true, then the bg worker will raise on exceptions
    new_config = config.copy()
    new_config["IN_TEST"] = False

    def count_jobs(session, condition):
        query = select(func.count()).select_from(BackgroundJob).where(condition)
        return session.execute(query).scalar_one()

    def make_job_due_now():
        # pull the retry time forward so the next process_job() call picks the job up again
        with session_scope() as session:
            session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now()

    with patch("couchers.jobs.worker.config", new_config), patch("couchers.jobs.worker.JOBS", MOCK_JOBS):
        process_job()
        with session_scope() as session:
            # after the first attempt the single job is in the error state
            assert count_jobs(session, BackgroundJob.state == BackgroundJobState.error) == 1
            assert count_jobs(session, BackgroundJob.state != BackgroundJobState.error) == 0

        for _ in range(4):
            make_job_due_now()
            process_job()

    with session_scope() as session:
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.failed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.failed) == 0

    _check_job_counter("mock_job", "error", "4", "Exception")
    _check_job_counter("mock_job", "failed", "5", "Exception")
def test_no_jobs_no_problem(db):
    """process_job() returns a falsy value and creates nothing when the job table is empty."""
    total_jobs = select(func.count()).select_from(BackgroundJob)

    with session_scope() as session:
        assert session.execute(total_jobs).scalar_one() == 0

    assert not process_job()

    with session_scope() as session:
        assert session.execute(total_jobs).scalar_one() == 0
def test_send_message_notifications_basic(db):
    """Group chat messages yield two send_email jobs, once, and only with the handlers' clock moved ahead."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()
    user3, token3 = generate_user()

    make_friends(user1, user2)
    make_friends(user1, user3)
    make_friends(user2, user3)

    def notify_and_process():
        send_message_notifications(empty_pb2.Empty())
        process_jobs()

    def notify_and_process_later():
        # same as above, but with the handlers' now() moved five minutes ahead
        with patch("couchers.jobs.handlers.now", now_5_min_in_future):
            notify_and_process()

    def email_jobs(session):
        query = select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        return session.execute(query).scalar_one()

    def send_messages(client, chat_id, numbers):
        for number in numbers:
            client.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=chat_id, text=f"Test message {number}")
            )

    notify_and_process()

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert email_jobs(session) == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id])
        ).group_chat_id
        send_messages(c, group_chat_id, range(1, 5))

    with conversations_session(token3) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        send_messages(c, group_chat_id, range(5, 7))

    notify_and_process()

    # no emails sent out
    with session_scope() as session:
        assert email_jobs(session) == 0

    # this should generate emails for both user2 and user3
    notify_and_process_later()

    with session_scope() as session:
        assert email_jobs(session) == 2
        # delete them all
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    # shouldn't generate any more emails
    notify_and_process_later()

    with session_scope() as session:
        assert email_jobs(session) == 0
def test_send_message_notifications_muted(db):
    """With the three-person chat muted by user3, only a single send_email job is produced."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()
    user3, token3 = generate_user()

    make_friends(user1, user2)
    make_friends(user1, user3)
    make_friends(user2, user3)

    def notify_and_process():
        send_message_notifications(empty_pb2.Empty())
        process_jobs()

    def notify_and_process_later():
        # same as above, but with the handlers' now() moved five minutes ahead
        with patch("couchers.jobs.handlers.now", now_5_min_in_future):
            notify_and_process()

    def email_jobs(session):
        query = select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        return session.execute(query).scalar_one()

    def send_messages(client, chat_id, numbers):
        for number in numbers:
            client.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=chat_id, text=f"Test message {number}")
            )

    notify_and_process()

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert email_jobs(session) == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id])
        ).group_chat_id

    with conversations_session(token3) as c:
        # mute it for user 3
        c.MuteGroupChat(conversations_pb2.MuteGroupChatReq(group_chat_id=group_chat_id, forever=True))

    with conversations_session(token1) as c:
        send_messages(c, group_chat_id, range(1, 5))

    with conversations_session(token3) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        send_messages(c, group_chat_id, range(5, 7))

    notify_and_process()

    # no emails sent out
    with session_scope() as session:
        assert email_jobs(session) == 0

    # this should generate emails for both user2 and NOT user3
    notify_and_process_later()

    with session_scope() as session:
        assert email_jobs(session) == 1
        # delete them all
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    # shouldn't generate any more emails
    notify_and_process_later()

    with session_scope() as session:
        assert email_jobs(session) == 0
def test_send_request_notifications_host_request(db):
    """Creating and then accepting a host request each lead to exactly one send_email job from the handler."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()

    today_plus_2 = (today() + timedelta(days=2)).isoformat()
    today_plus_3 = (today() + timedelta(days=3)).isoformat()

    def wipe_jobs(session):
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    def email_jobs_after_notifying(session):
        # run the handler with its now() moved five minutes ahead, then count send_email jobs
        with patch("couchers.jobs.handlers.now", now_5_min_in_future):
            send_request_notifications(empty_pb2.Empty())
            process_jobs()
        query = select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        return session.execute(query).scalar_one()

    def assert_notified_exactly_once():
        # all four steps deliberately share one session, as the deletes are only committed on exit
        with session_scope() as session:
            # delete send_email BackgroundJob created by the preceding API call
            wipe_jobs(session)

            # check send_request_notifications successfully creates background job
            assert email_jobs_after_notifying(session) == 1

            # delete all BackgroundJobs
            wipe_jobs(session)

            # should find no messages since the recipient has already been notified
            assert email_jobs_after_notifying(session) == 0

    send_request_notifications(empty_pb2.Empty())
    process_jobs()

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one() == 0

    # first test that sending host request creates email
    with requests_session(token1) as api:
        host_request_id = api.CreateHostRequest(
            requests_pb2.CreateHostRequestReq(
                host_user_id=user2.id, from_date=today_plus_2, to_date=today_plus_3, text="Test request"
            )
        ).host_request_id

    assert_notified_exactly_once()

    # then test that responding to host request creates email
    with requests_session(token2) as api:
        api.RespondHostRequest(
            requests_pb2.RespondHostRequestReq(
                host_request_id=host_request_id,
                status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
                text="Test request",
            )
        )

    assert_notified_exactly_once()
def test_send_message_notifications_seen(db):
    """No send_email job is created for messages the recipient has already marked as seen."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()

    make_friends(user1, user2)

    def assert_no_email_jobs():
        query = select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        with session_scope() as session:
            assert session.execute(query).scalar_one() == 0

    send_message_notifications(empty_pb2.Empty())

    # should find no jobs, since there's no messages
    assert_no_email_jobs()

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for number in range(1, 5):
            c.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {number}")
            )

    # user 2 now marks those messages as seen
    with conversations_session(token2) as c:
        group_chat = c.GetGroupChat(conversations_pb2.GetGroupChatReq(group_chat_id=group_chat_id))
        m_id = group_chat.latest_message.message_id
        c.MarkLastSeenGroupChat(
            conversations_pb2.MarkLastSeenGroupChatReq(group_chat_id=group_chat_id, last_seen_message_id=m_id)
        )

    send_message_notifications(empty_pb2.Empty())

    # no emails sent out
    assert_no_email_jobs()

    def now_30_min_in_future():
        return now() + timedelta(minutes=30)

    # still shouldn't generate emails as user2 has seen all messages
    with patch("couchers.jobs.handlers.now", now_30_min_in_future):
        send_message_notifications(empty_pb2.Empty())

    assert_no_email_jobs()
def test_send_onboarding_emails(db):
    """Check the running send_email job count as three users at different onboarding stages are added."""

    def email_jobs_after_run():
        send_onboarding_emails(empty_pb2.Empty())
        process_jobs()
        query = select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        with session_scope() as session:
            return session.execute(query).scalar_one()

    # needs to get first onboarding email
    generate_user(onboarding_emails_sent=0, last_onboarding_email_sent=None, complete_profile=False)
    assert email_jobs_after_run() == 1

    # needs to get second onboarding email, but not yet
    six_days_ago = now() - timedelta(days=6)
    generate_user(onboarding_emails_sent=1, last_onboarding_email_sent=six_days_ago, complete_profile=False)
    assert email_jobs_after_run() == 1

    # needs to get second onboarding email
    eight_days_ago = now() - timedelta(days=8)
    generate_user(onboarding_emails_sent=1, last_onboarding_email_sent=eight_days_ago, complete_profile=False)
    assert email_jobs_after_run() == 2
def test_send_reference_reminders(db):
    """Set up six host-request scenarios and check which users receive a reference reminder email.

    Cases covered:
      1. bidirectional (no emails)
      2. host left ref (surfer needs an email)
      3. surfer left ref (host needs an email)
      4. neither left ref (host & surfer need an email)
      5. neither left ref, but host blocked surfer, so neither should get an email
      6. neither left ref, surfer indicated they didn't meet up, (host still needs an email)
    """
    send_reference_reminders(empty_pb2.Empty())

    # users 1-12, keyed by number; roles per case:
    #   case 1: users 1 and 2
    #   case 2: user 3 host, user 4 surfer
    #   case 3: user 5 host, user 6 surfer
    #   case 4: user 7 surfer, user 8 host
    #   case 5: user 9 surfer, user 10 host
    #   case 6: user 11 host, user 12 surfer
    users = {}
    for number in range(1, 13):
        users[number], _ = generate_user(email=f"user{number}@couchers.org.invalid", name=f"User {number}")

    # case 5 needs a block between its two users
    make_user_block(users[9], users[10])

    with session_scope() as session:
        # note that create_host_reference creates a host request whose age is one day older than the timedelta here

        # case 1: bidirectional (no emails)
        _, hr1 = create_host_reference(session, users[2].id, users[1].id, timedelta(days=7), surfing=True)
        create_host_reference(session, users[1].id, users[2].id, timedelta(days=7), host_request_id=hr1)

        # case 2: host left ref (surfer needs an email)
        create_host_reference(session, users[3].id, users[4].id, timedelta(days=11), surfing=False)

        # case 3: surfer left ref (host needs an email)
        create_host_reference(session, users[6].id, users[5].id, timedelta(days=9), surfing=True)

        # case 4: neither left ref (host & surfer need an email)
        create_host_request(session, users[7].id, users[8].id, timedelta(days=4))

        # case 5: neither left ref, but host blocked surfer, so neither should get an email
        create_host_request(session, users[9].id, users[10].id, timedelta(days=7))

        # case 6: neither left ref, surfer indicated they didn't meet up, (host still needs an email)
        create_host_request(
            session, users[12].id, users[11].id, timedelta(days=6), surfer_reason_didnt_meetup=""
        )

    # strings that must appear in both the plain and the HTML version of each reminder
    hosted_markers = ("from when you hosted them", "/leave-reference/hosted/")
    surfed_markers = ("from when you surfed with them", "/leave-reference/surfed/")

    # listed in ascending recipient order, matching the query below
    expected_emails = [
        ("user11@couchers.org.invalid", "[TEST] You have 14 days to write a reference for User 12!", hosted_markers),
        ("user4@couchers.org.invalid", "[TEST] You have 3 days to write a reference for User 3!", surfed_markers),
        ("user5@couchers.org.invalid", "[TEST] You have 7 days to write a reference for User 6!", hosted_markers),
        ("user7@couchers.org.invalid", "[TEST] You have 14 days to write a reference for User 8!", surfed_markers),
        ("user8@couchers.org.invalid", "[TEST] You have 14 days to write a reference for User 7!", hosted_markers),
    ]

    send_reference_reminders(empty_pb2.Empty())

    # keep going until process_job() reports nothing more to do
    while process_job():
        pass

    with session_scope() as session:
        sent = session.execute(select(Email).order_by(Email.recipient.asc())).scalars().all()
        emails = [(row.recipient, row.subject, row.plain, row.html) for row in sent]

    actual_addresses_and_subjects = [(address, subject) for address, subject, _, _ in emails]
    expected_addresses_and_subjects = [(address, subject) for address, subject, _ in expected_emails]

    print(actual_addresses_and_subjects)
    print(expected_addresses_and_subjects)

    assert actual_addresses_and_subjects == expected_addresses_and_subjects

    for (address, subject, plain, html), (_, _, search_strings) in zip(emails, expected_emails):
        for find in search_strings:
            assert find in plain, f"Expected to find string {find} in PLAIN email {subject} to {address}, didn't"
            assert find in html, f"Expected to find string {find} in HTML email {subject} to {address}, didn't"
def test_send_host_request_reminders(db):
    """
    Runs the host request reminder job against seven surfer/host pairs and expects exactly one reminder email.

    Each pair exercises one condition (see the per-case comments below); only the host of case 1 is expected to
    receive an email.
    """
    # users[n] is "User n": odd numbers act as surfers, the following even number is the matching host
    users = {n: generate_user(email=f"user{n}@couchers.org.invalid", name=f"User {n}")[0] for n in range(1, 15)}

    def beyond_interval(extra_days):
        # a date `extra_days` days further out than one reminder interval from today
        return today() + HOST_REQUEST_REMINDER_INTERVAL + timedelta(days=extra_days)

    with session_scope() as session:

        def add_request(
            surfer_no,
            *,
            from_date,
            to_date,
            status=HostRequestStatus.pending,
            reminders_sent=0,
            reminder_age=HOST_REQUEST_REMINDER_INTERVAL,
        ):
            # creates a request from users[surfer_no] to users[surfer_no + 1] whose last reminder was
            # `reminder_age` ago, and returns whatever create_host_request_by_date returns
            return create_host_request_by_date(
                session=session,
                surfer_user_id=users[surfer_no].id,
                host_user_id=users[surfer_no + 1].id,
                from_date=from_date,
                to_date=to_date,
                status=status,
                host_sent_request_reminders=reminders_sent,
                last_sent_request_reminder_time=now() - reminder_age,
            )

        # case 1: pending, future, interval elapsed => notify
        add_request(1, from_date=beyond_interval(1), to_date=beyond_interval(2))

        # case 2: max reminders reached => do not notify
        add_request(
            3,
            from_date=beyond_interval(1),
            to_date=beyond_interval(2),
            reminders_sent=HOST_REQUEST_MAX_REMINDERS,
        )

        # case 3: interval not yet elapsed (one hour short) => do not notify
        add_request(
            5,
            from_date=beyond_interval(1),
            to_date=beyond_interval(2),
            reminder_age=HOST_REQUEST_REMINDER_INTERVAL - timedelta(hours=1),
        )

        # case 4: start date is today => do not notify
        add_request(7, from_date=today(), to_date=today() + timedelta(days=2))

        # case 5: from_date in the past => do not notify
        add_request(9, from_date=today() - timedelta(days=1), to_date=today() + timedelta(days=1))

        # case 6: non-pending status => do not notify
        add_request(
            11,
            from_date=today() + timedelta(days=3),
            to_date=today() + timedelta(days=4),
            status=HostRequestStatus.accepted,
        )

        # case 7: host already sent a message => do not notify
        answered_conversation = add_request(13, from_date=beyond_interval(1), to_date=beyond_interval(2))
        host_reply = Message(
            time=now(),
            conversation_id=answered_conversation,
            author_id=users[14].id,
            text="Looking forward to hosting you!",
            message_type=MessageType.text,
        )
        session.add(host_reply)

    send_host_request_reminders(empty_pb2.Empty())

    # drain the job queue so that any queued emails actually get produced
    while process_job():
        pass

    with session_scope() as session:
        by_recipient = select(Email).order_by(Email.recipient.asc())
        stored = session.execute(by_recipient).scalars().all()
        emails = [(row.recipient, row.subject, row.plain, row.html) for row in stored]

    # (recipient, subject, strings that must appear in both the plain and the HTML body)
    expected_emails = [
        (
            "user2@couchers.org.invalid",
            "[TEST] You have a pending host request from User 1!",
            ("Please respond to the request!", "User 1"),
        )
    ]

    actual_addresses_and_subjects = [sent[:2] for sent in emails]
    expected_addresses_and_subjects = [wanted[:2] for wanted in expected_emails]

    print(actual_addresses_and_subjects)
    print(expected_addresses_and_subjects)

    assert actual_addresses_and_subjects == expected_addresses_and_subjects

    for sent, wanted in zip(emails, expected_emails):
        address, subject, plain, html = sent
        for find in wanted[2]:
            assert find in plain, f"Expected to find string {find} in PLAIN email {subject} to {address}, didn't"
            assert find in html, f"Expected to find string {find} in HTML email {subject} to {address}, didn't"
def test_add_users_to_email_list(db):
    """
    Exercises the newsletter sync job with Listmonk enabled in a patched config and `requests.post` mocked out.

    Expects no POST while there are no users, a subscriber POST for each of the two users created with
    `in_sync_with_newsletter=False` and no newsletter opt-out, and no POST when the job is run again.
    """
    post_target = "couchers.jobs.handlers.requests.post"

    new_config = config.copy()
    new_config.update(
        {
            "LISTMONK_ENABLED": True,
            "LISTMONK_BASE_URL": "https://example.com",
            "LISTMONK_API_USERNAME": "test_user",
            "LISTMONK_API_KEY": "dummy_api_key",
            "LISTMONK_LIST_ID": 6,
        }
    )

    def subscriber_post(email, name, user_id):
        # the POST expected for one newly subscribed user
        return call(
            "https://example.com/api/subscribers",
            auth=("test_user", "dummy_api_key"),
            json={
                "email": email,
                "name": name,
                "lists": [6],
                "preconfirm_subscriptions": True,
                "attribs": {"couchers_user_id": user_id},
                "status": "enabled",
            },
            timeout=10,
        )

    with patch("couchers.jobs.handlers.config", new_config):
        # first run: no users exist yet
        with patch(post_target) as mock:
            add_users_to_email_list(empty_pb2.Empty())
        mock.assert_not_called()

        # only Tester1 and Tester3 appear in the expected POSTs below
        generate_user(in_sync_with_newsletter=False, email="testing1@couchers.invalid", name="Tester1", id=15)
        generate_user(in_sync_with_newsletter=True, email="testing2@couchers.invalid", name="Tester2")
        generate_user(in_sync_with_newsletter=False, email="testing3@couchers.invalid", name="Tester3 von test", id=17)
        generate_user(
            in_sync_with_newsletter=False, email="testing4@couchers.invalid", name="Tester4", opt_out_of_newsletter=True
        )

        # second run: the mocked POST reports HTTP 200
        with patch(post_target) as mock:
            mock.return_value.status_code = 200
            add_users_to_email_list(empty_pb2.Empty())
        mock.assert_has_calls(
            [
                subscriber_post("testing1@couchers.invalid", "Tester1", 15),
                subscriber_post("testing3@couchers.invalid", "Tester3 von test", 17),
            ],
            any_order=True,
        )

        # third run: nothing should be posted again
        with patch(post_target) as mock:
            add_users_to_email_list(empty_pb2.Empty())
        mock.assert_not_called()
def test_update_recommendation_scores(db):
    """
    Smoke test: calls the recommendation score job with an empty payload; the test makes no assertions.
    """
    empty_payload = empty_pb2.Empty()
    update_recommendation_scores(empty_payload)
def test_update_badges(db, push_collector):
    """
    Runs the badge update job for six users and checks the resulting badge rows and push notifications.

    user4 and user5 are created with a verified phone, and user5 additionally starts out with a manually
    inserted `board_member` badge that is absent from the expected rows after the job has run.
    """
    user1 = generate_user()[0]
    user2 = generate_user()[0]
    user3 = generate_user()[0]
    user4 = generate_user(phone="+15555555555", phone_verification_verified=func.now())[0]
    user5 = generate_user(phone="+15555555556", phone_verification_verified=func.now())[0]
    user6 = generate_user()[0]

    with session_scope() as session:
        preexisting_badge = UserBadge(user_id=user5.id, badge_id="board_member")
        session.add(preexisting_badge)

    update_badges(empty_pb2.Empty())
    process_jobs()

    with session_scope() as session:
        badge_query = select(UserBadge.user_id, UserBadge.badge_id).order_by(
            UserBadge.user_id.asc(), UserBadge.id.asc()
        )
        badge_tuples = session.execute(badge_query).all()

    expected = [
        (user1.id, "founder"),
        (user1.id, "board_member"),
        (user2.id, "founder"),
        (user2.id, "board_member"),
        (user4.id, "phone_verified"),
        (user5.id, "phone_verified"),
    ]

    assert badge_tuples == expected

    print(push_collector.pushes)

    added_body = "Check out your profile to see the new badge!"
    removed_body = "You can see all your badges on your profile."

    # (user id, index of the push for that user, title, body), checked in this order
    expected_pushes = [
        (user1.id, 0, "The Founder badge was added to your profile", added_body),
        (user1.id, 1, "The Board Member badge was added to your profile", added_body),
        (user2.id, 0, "The Founder badge was added to your profile", added_body),
        (user2.id, 1, "The Board Member badge was added to your profile", added_body),
        (user4.id, 0, "The Verified Phone badge was added to your profile", added_body),
        (user5.id, 0, "The Board Member badge was removed from your profile", removed_body),
        (user5.id, 1, "The Verified Phone badge was added to your profile", added_body),
    ]

    for user_id, push_ix, push_title, push_body in expected_pushes:
        push_collector.assert_user_push_matches_fields(
            user_id,
            ix=push_ix,
            title=push_title,
            body=push_body,
        )