Coverage for src/tests/test_bg_jobs.py: 100%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from datetime import timedelta
2from unittest.mock import patch
4import pytest
5import requests
6from google.protobuf import empty_pb2
7from sqlalchemy.sql import delete, func
9import couchers.jobs.worker
10from couchers.config import config
11from couchers.crypto import urlsafe_secure_token
12from couchers.db import session_scope
13from couchers.email import queue_email
14from couchers.email.dev import print_dev_email
15from couchers.jobs.enqueue import queue_job
16from couchers.jobs.handlers import (
17 process_add_users_to_email_list,
18 process_send_message_notifications,
19 process_send_onboarding_emails,
20 process_send_reference_reminders,
21 process_send_request_notifications,
22 process_update_recommendation_scores,
23)
24from couchers.jobs.worker import _run_job_and_schedule, process_job, run_scheduler, service_jobs
25from couchers.metrics import create_prometheus_server, job_process_registry
26from couchers.models import (
27 AccountDeletionToken,
28 BackgroundJob,
29 BackgroundJobState,
30 BackgroundJobType,
31 Email,
32 LoginToken,
33 PasswordResetToken,
34)
35from couchers.sql import couchers_select as select
36from couchers.tasks import send_login_email
37from couchers.utils import now, today
38from proto import conversations_pb2, requests_pb2
39from tests.test_fixtures import ( # noqa
40 auth_api_session,
41 conversations_session,
42 db,
43 generate_user,
44 make_friends,
45 make_user_block,
46 requests_session,
47 testconfig,
48)
49from tests.test_references import create_host_reference, create_host_request
def now_5_min_in_future():
    """Return the value of ``now()`` shifted five minutes forward.

    Used as a drop-in replacement when patching ``couchers.jobs.handlers.now``.
    """
    current = now()
    return current + timedelta(minutes=5)
@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture that requests ``testconfig`` for every test in this module."""
    return None
def _check_job_counter(job, status, attempt, exception):
    """Assert that the metrics endpoint reports a sample with the given labels.

    Fetches the text served at ``http://localhost:8001`` and checks that the
    label string built from the arguments occurs in it.

    Args:
        job: value expected for the ``job`` label.
        status: value expected for the ``status`` label.
        attempt: value expected for the ``attempt`` label.
        exception: value expected for the ``exception`` label.

    Raises:
        AssertionError: if the label string is absent from the response body.
        requests.exceptions.RequestException: if the endpoint cannot be
            reached or does not answer within the timeout.
    """
    expected_labels = f'attempt="{attempt}",exception="{exception}",job="{job}",status="{status}"'
    # requests never times out on its own; bound the wait so a missing metrics
    # server fails the test instead of hanging the whole run
    response = requests.get("http://localhost:8001", timeout=10)
    assert expected_labels in response.text
def test_login_email_full(db):
    """Send a login email and check the background job that delivers it.

    ``send_login_email`` is called for a freshly generated user, then
    ``process_job`` is run once with the dev email printer wrapped so the
    outgoing message can be inspected. Afterwards exactly one
    ``BackgroundJob`` must exist and it must be in the ``completed`` state.
    """
    user, _token = generate_user()
    # captured once so the closure below compares against a plain value
    user_email = user.email

    with session_scope() as session:
        login_token = send_login_email(session, user)

        def mock_print_dev_email(sender_name, sender_email, recipient, subject, plain, html):
            # the message must go to the user and carry the token in both bodies
            assert recipient == user_email
            assert "login" in subject.lower()
            assert login_token.token in plain
            assert login_token.token in html
            return print_dev_email(sender_name, sender_email, recipient, subject, plain, html)

        with patch("couchers.jobs.handlers.print_dev_email", mock_print_dev_email):
            process_job()

    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1
        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_email_job(db):
    """Queue one email and check that processing the job passes it on unchanged.

    The dev email printer is wrapped so every field it receives can be
    compared with what was queued. Afterwards exactly one ``BackgroundJob``
    must exist and it must be ``completed``.
    """
    fields = ("sender_name", "sender_email", "recipient", "subject", "plain", "html")
    queue_email(*fields)

    def mock_print_dev_email(sender_name, sender_email, recipient, subject, plain, html):
        assert sender_name == fields[0]
        assert sender_email == fields[1]
        assert recipient == fields[2]
        assert subject == fields[3]
        assert plain == fields[4]
        assert html == fields[5]
        return print_dev_email(sender_name, sender_email, recipient, subject, plain, html)

    with patch("couchers.jobs.handlers.print_dev_email", mock_print_dev_email):
        process_job()

    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1
        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_purge_login_tokens(db):
    """A ``purge_login_tokens`` job removes a login token whose expiry is ``now()``.

    One token is inserted, the job is queued and processed, and the test then
    expects zero ``LoginToken`` rows and a single ``completed`` job.
    """
    user, _token = generate_user()
    token_count = select(func.count()).select_from(LoginToken)

    with session_scope() as session:
        session.add(LoginToken(token=urlsafe_secure_token(), user=user, expiry=now()))
        assert session.execute(token_count).scalar_one() == 1

    queue_job(BackgroundJobType.purge_login_tokens, empty_pb2.Empty())
    process_job()

    with session_scope() as session:
        assert session.execute(token_count).scalar_one() == 0

    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1
        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_purge_password_reset_tokens(db):
    """A ``purge_password_reset_tokens`` job removes a token whose expiry is ``now()``.

    One token is inserted, the job is queued and processed, and the test then
    expects zero ``PasswordResetToken`` rows and a single ``completed`` job.
    """
    user, _token = generate_user()
    token_count = select(func.count()).select_from(PasswordResetToken)

    with session_scope() as session:
        session.add(PasswordResetToken(token=urlsafe_secure_token(), user=user, expiry=now()))
        assert session.execute(token_count).scalar_one() == 1

    queue_job(BackgroundJobType.purge_password_reset_tokens, empty_pb2.Empty())
    process_job()

    with session_scope() as session:
        assert session.execute(token_count).scalar_one() == 0

    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1
        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_purge_account_deletion_tokens(db):
    """A ``purge_account_deletion_tokens`` job leaves one of three tokens behind.

    Three tokens with different expiries are inserted; after the job runs the
    test expects a single ``AccountDeletionToken`` row and a single
    ``completed`` job.
    """
    user, _token = generate_user()
    user2, _token2 = generate_user()
    user3, _token3 = generate_user()

    token_count = select(func.count()).select_from(AccountDeletionToken)

    with session_scope() as session:
        # three tokens: one that expired two hours ago, one expiring right now,
        # and one with five hours left (presumably the one that survives the
        # purge -- verify against the handler)
        session.add_all(
            [
                AccountDeletionToken(token=urlsafe_secure_token(), user=user, expiry=now() - timedelta(hours=2)),
                AccountDeletionToken(token=urlsafe_secure_token(), user=user2, expiry=now()),
                AccountDeletionToken(token=urlsafe_secure_token(), user=user3, expiry=now() + timedelta(hours=5)),
            ]
        )
        assert session.execute(token_count).scalar_one() == 3

    queue_job(BackgroundJobType.purge_account_deletion_tokens, empty_pb2.Empty())
    process_job()

    with session_scope() as session:
        assert session.execute(token_count).scalar_one() == 1

    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1
        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_enforce_community_memberships(db):
    """Queue an ``enforce_community_membership`` job and check it completes.

    After one ``process_job`` call exactly one ``BackgroundJob`` must exist
    and it must be in the ``completed`` state.
    """
    payload = empty_pb2.Empty()
    queue_job(BackgroundJobType.enforce_community_membership, payload)
    process_job()

    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1
        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_refresh_materialized_views(db):
    """Queue a ``refresh_materialized_views`` job and check it completes.

    After one ``process_job`` call exactly one ``BackgroundJob`` must exist
    and it must be in the ``completed`` state.
    """
    payload = empty_pb2.Empty()
    queue_job(BackgroundJobType.refresh_materialized_views, payload)
    process_job()

    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1
        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_service_jobs(db):
    """Run ``service_jobs`` until it tries to sleep and check the queued email job completed."""
    queue_email("sender_name", "sender_email", "recipient", "subject", "plain", "html")

    # service_jobs loops forever; its sleep is swapped for a function that
    # raises this marker exception, which both breaks out of the loop and
    # shows that the worker reached the point where it sleeps
    class HitSleep(Exception):
        pass

    def raising_sleep(seconds):
        raise HitSleep()

    with pytest.raises(HitSleep), patch("couchers.jobs.worker.sleep", raising_sleep):
        service_jobs()

    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1
        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_scheduler(db, monkeypatch):
    """Drive ``run_scheduler`` with a fake clock and check when each entry fires.

    The worker module's ``SCHEDULE``, ``monotonic``, ``sleep`` and
    ``_run_job_and_schedule`` are replaced. The fake ``sleep`` advances the
    fake time and raises once it passes 70; the wrapper around
    ``_run_job_and_schedule`` records ``(fake time, schedule id)`` for every
    call before delegating to the real function.
    """
    schedule = [
        (BackgroundJobType.purge_login_tokens, timedelta(seconds=7)),
        (BackgroundJobType.send_message_notifications, timedelta(seconds=11)),
    ]
    end_time = 70
    # one-element list so the fakes below can advance the clock in place
    clock = [0]

    class EndOfTime(Exception):
        pass

    def fake_monotonic():
        return clock[0]

    def fake_sleep(seconds):
        clock[0] += seconds
        if clock[0] > end_time:
            raise EndOfTime()

    observed = []

    def recording_run(sched, schedule_id):
        observed.append((clock[0], schedule_id))
        _run_job_and_schedule(sched, schedule_id)

    replacements = {
        "_run_job_and_schedule": recording_run,
        "SCHEDULE": schedule,
        "monotonic": fake_monotonic,
        "sleep": fake_sleep,
    }
    for attribute, replacement in replacements.items():
        monkeypatch.setattr(couchers.jobs.worker, attribute, replacement)

    with pytest.raises(EndOfTime):
        run_scheduler()

    expected = [
        (0.0, 0),
        (0.0, 1),
        (7.0, 0),
        (11.0, 1),
        (14.0, 0),
        (21.0, 0),
        (22.0, 1),
        (28.0, 0),
        (33.0, 1),
        (35.0, 0),
        (42.0, 0),
        (44.0, 1),
        (49.0, 0),
        (55.0, 1),
        (56.0, 0),
        (63.0, 0),
        (66.0, 1),
        (70.0, 0),
    ]
    assert observed == expected

    # every scheduler run is expected to leave one pending job and nothing else
    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        pending = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.pending)
        ).scalar_one()
        assert pending == 18
        not_pending = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.pending)
        ).scalar_one()
        assert not_pending == 0
def test_job_retry(db):
    """A job whose handler always raises goes ``error`` and finally ``failed``.

    The handler registered for ``purge_login_tokens`` is replaced with one
    that counts its calls and raises. ``process_job`` is run five times,
    with ``next_attempt_after`` reset to the database's current time between
    runs so the job is picked up again. The test then checks the job's final
    state, the number of handler calls, and the counters exposed by the
    metrics server started on port 8001.
    """
    queue_job(BackgroundJobType.purge_login_tokens, empty_pb2.Empty())

    called_count = 0

    def mock_handler(payload):
        nonlocal called_count
        called_count += 1
        raise Exception()

    MOCK_JOBS = {
        BackgroundJobType.purge_login_tokens: (empty_pb2.Empty, mock_handler),
    }
    create_prometheus_server(registry=job_process_registry, port=8001)

    job_count = select(func.count()).select_from(BackgroundJob)

    with patch("couchers.jobs.worker.JOBS", MOCK_JOBS):
        # first attempt: the job must end up in the error state, not failed yet
        process_job()
        with session_scope() as session:
            errored = session.execute(
                job_count.where(BackgroundJob.state == BackgroundJobState.error)
            ).scalar_one()
            assert errored == 1
            not_errored = session.execute(
                job_count.where(BackgroundJob.state != BackgroundJobState.error)
            ).scalar_one()
            assert not_errored == 0

        # four further attempts, each made eligible by resetting the retry time
        for _attempt in range(4):
            with session_scope() as session:
                session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now()
            process_job()

    # each of the five process_job calls must have reached the handler
    assert called_count == 5

    with session_scope() as session:
        failed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.failed)
        ).scalar_one()
        assert failed == 1
        not_failed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.failed)
        ).scalar_one()
        assert not_failed == 0

    _check_job_counter("purge_login_tokens", "error", "4", "Exception")
    _check_job_counter("purge_login_tokens", "failed", "5", "Exception")
def test_no_jobs_no_problem(db):
    """``process_job`` returns a falsy value and creates nothing when no job is queued."""
    job_count = select(func.count()).select_from(BackgroundJob)

    with session_scope() as session:
        before = session.execute(job_count).scalar_one()
        assert before == 0

    processed = process_job()
    assert not processed

    with session_scope() as session:
        after = session.execute(job_count).scalar_one()
        assert after == 0
def test_process_send_message_notifications_basic(db):
    """Unseen group chat messages produce one ``send_email`` job per recipient, once.

    Three mutual friends exchange messages in two group chats. The handler
    is run immediately (expecting no email jobs) and then with the handlers'
    ``now`` patched five minutes ahead (expecting two jobs). A further run
    after the jobs are deleted is expected to queue nothing new.
    """
    user1, token1 = generate_user()
    user2, token2 = generate_user()
    user3, token3 = generate_user()

    for left, right in ((user1, user2), (user1, user3), (user2, user3)):
        make_friends(left, right)

    email_jobs = (
        select(func.count())
        .select_from(BackgroundJob)
        .where(BackgroundJob.job_type == BackgroundJobType.send_email)
    )

    def queued_email_jobs():
        with session_scope() as session:
            return session.execute(email_jobs).scalar_one()

    process_send_message_notifications(empty_pb2.Empty())

    # nothing has been sent yet, so no email job may exist
    assert queued_email_jobs() == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id])
        ).group_chat_id
        for number in range(1, 5):
            c.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {number}")
            )

    with conversations_session(token3) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for number in range(5, 7):
            c.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {number}")
            )

    process_send_message_notifications(empty_pb2.Empty())

    # run at the real current time: still no email jobs expected
    assert queued_email_jobs() == 0

    # five minutes later both user2 and user3 should get an email
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        process_send_message_notifications(empty_pb2.Empty())

    with session_scope() as session:
        assert session.execute(email_jobs).scalar_one() == 2
        # clear the queue so the next check only sees newly created jobs
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    # a second run must not notify anyone again
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        process_send_message_notifications(empty_pb2.Empty())

    assert queued_email_jobs() == 0
def test_process_send_message_notifications_muted(db):
    """A user who muted a group chat gets no notification email for it.

    Same scenario as the basic notification test, except that user3 mutes
    the first chat before user1 writes to it. Only one ``send_email`` job is
    expected once the handlers' ``now`` is patched five minutes ahead.
    """
    user1, token1 = generate_user()
    user2, token2 = generate_user()
    user3, token3 = generate_user()

    for left, right in ((user1, user2), (user1, user3), (user2, user3)):
        make_friends(left, right)

    email_jobs = (
        select(func.count())
        .select_from(BackgroundJob)
        .where(BackgroundJob.job_type == BackgroundJobType.send_email)
    )

    def queued_email_jobs():
        with session_scope() as session:
            return session.execute(email_jobs).scalar_one()

    process_send_message_notifications(empty_pb2.Empty())

    # nothing has been sent yet, so no email job may exist
    assert queued_email_jobs() == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id])
        ).group_chat_id

    with conversations_session(token3) as c:
        # user3 mutes the chat before any message arrives
        c.MuteGroupChat(conversations_pb2.MuteGroupChatReq(group_chat_id=group_chat_id, forever=True))

    with conversations_session(token1) as c:
        for number in range(1, 5):
            c.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {number}")
            )

    with conversations_session(token3) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for number in range(5, 7):
            c.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {number}")
            )

    process_send_message_notifications(empty_pb2.Empty())

    # run at the real current time: still no email jobs expected
    assert queued_email_jobs() == 0

    # five minutes later user2 should get an email, user3 should NOT
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        process_send_message_notifications(empty_pb2.Empty())

    with session_scope() as session:
        assert session.execute(email_jobs).scalar_one() == 1
        # clear the queue so the next check only sees newly created jobs
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    # a second run must not notify anyone again
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        process_send_message_notifications(empty_pb2.Empty())

    assert queued_email_jobs() == 0
def test_process_send_request_notifications_host_request(db):
    """Host request messages trigger exactly one notification email each way.

    First user1 sends a host request to user2, then user2 accepts it. After
    each step the jobs created by the API call itself are deleted, the
    handler is run with the handlers' ``now`` patched five minutes ahead
    (one ``send_email`` job expected), the jobs are deleted again, and a
    second run is expected to queue nothing.
    """
    user1, token1 = generate_user()
    user2, token2 = generate_user()

    today_plus_2 = (today() + timedelta(days=2)).isoformat()
    today_plus_3 = (today() + timedelta(days=3)).isoformat()

    email_jobs = (
        select(func.count())
        .select_from(BackgroundJob)
        .where(BackgroundJob.job_type == BackgroundJobType.send_email)
    )
    delete_all_jobs = delete(BackgroundJob).execution_options(synchronize_session=False)

    process_send_request_notifications(empty_pb2.Empty())

    # no host request exists yet, so the handler must not have queued anything
    with session_scope() as session:
        assert session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one() == 0

    # first test that sending a host request creates an email
    # (the client is not called `requests`, to avoid shadowing the imported module)
    with requests_session(token1) as api:
        host_request_id = api.CreateHostRequest(
            requests_pb2.CreateHostRequestReq(
                host_user_id=user2.id, from_date=today_plus_2, to_date=today_plus_3, text="Test request"
            )
        ).host_request_id

    with session_scope() as session:
        # delete the send_email BackgroundJob created by CreateHostRequest
        session.execute(delete_all_jobs)

        # check process_send_request_notifications successfully creates a background job
        with patch("couchers.jobs.handlers.now", now_5_min_in_future):
            process_send_request_notifications(empty_pb2.Empty())
            assert session.execute(email_jobs).scalar_one() == 1

        # delete all BackgroundJobs
        session.execute(delete_all_jobs)

        with patch("couchers.jobs.handlers.now", now_5_min_in_future):
            process_send_request_notifications(empty_pb2.Empty())
            # should find no messages since the host has already been notified
            assert session.execute(email_jobs).scalar_one() == 0

    # then test that responding to the host request creates an email
    with requests_session(token2) as api:
        api.RespondHostRequest(
            requests_pb2.RespondHostRequestReq(
                host_request_id=host_request_id,
                status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
                text="Test request",
            )
        )

    with session_scope() as session:
        # delete the send_email BackgroundJob created by RespondHostRequest
        session.execute(delete_all_jobs)

        # check process_send_request_notifications successfully creates a background job
        with patch("couchers.jobs.handlers.now", now_5_min_in_future):
            process_send_request_notifications(empty_pb2.Empty())
            assert session.execute(email_jobs).scalar_one() == 1

        # delete all BackgroundJobs
        session.execute(delete_all_jobs)

        with patch("couchers.jobs.handlers.now", now_5_min_in_future):
            process_send_request_notifications(empty_pb2.Empty())
            # should find no messages since the guest has already been notified
            assert session.execute(email_jobs).scalar_one() == 0
def test_process_send_message_notifications_seen(db):
    """Messages the recipient has already marked as seen never produce an email.

    user1 sends four messages and user2 marks the latest one as seen. The
    handler is then run at the real time and with the handlers' ``now``
    patched thirty minutes ahead; both runs must leave zero ``send_email``
    jobs.
    """
    user1, token1 = generate_user()
    user2, token2 = generate_user()

    make_friends(user1, user2)

    email_jobs = (
        select(func.count())
        .select_from(BackgroundJob)
        .where(BackgroundJob.job_type == BackgroundJobType.send_email)
    )

    def queued_email_jobs():
        with session_scope() as session:
            return session.execute(email_jobs).scalar_one()

    process_send_message_notifications(empty_pb2.Empty())

    # nothing has been sent yet, so no email job may exist
    assert queued_email_jobs() == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for number in range(1, 5):
            c.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {number}")
            )

    # user2 marks everything up to the latest message as seen
    with conversations_session(token2) as c:
        chat = c.GetGroupChat(conversations_pb2.GetGroupChatReq(group_chat_id=group_chat_id))
        latest_id = chat.latest_message.message_id
        c.MarkLastSeenGroupChat(
            conversations_pb2.MarkLastSeenGroupChatReq(group_chat_id=group_chat_id, last_seen_message_id=latest_id)
        )

    process_send_message_notifications(empty_pb2.Empty())

    # run at the real current time: no email jobs expected
    assert queued_email_jobs() == 0

    def now_30_min_in_future():
        return now() + timedelta(minutes=30)

    # even half an hour later nothing is due, since user2 has seen all messages
    with patch("couchers.jobs.handlers.now", now_30_min_in_future):
        process_send_message_notifications(empty_pb2.Empty())

    assert queued_email_jobs() == 0
def test_process_send_onboarding_emails(db):
    """Onboarding emails are queued according to how many were sent and how long ago.

    Users are added one at a time and the handler is run after each one; the
    running total of ``send_email`` jobs is expected to be 1, 1 and 2.
    """
    email_jobs = (
        select(func.count())
        .select_from(BackgroundJob)
        .where(BackgroundJob.job_type == BackgroundJobType.send_email)
    )

    def run_and_count():
        process_send_onboarding_emails(empty_pb2.Empty())
        with session_scope() as session:
            return session.execute(email_jobs).scalar_one()

    # has received nothing so far: gets the first onboarding email
    generate_user(onboarding_emails_sent=0, last_onboarding_email_sent=None)
    assert run_and_count() == 1

    # got the first email six days ago: the second one is not due yet
    generate_user(onboarding_emails_sent=1, last_onboarding_email_sent=now() - timedelta(days=6))
    assert run_and_count() == 1

    # got the first email eight days ago: the second one is due
    generate_user(onboarding_emails_sent=1, last_onboarding_email_sent=now() - timedelta(days=8))
    assert run_and_count() == 2
def test_process_send_reference_reminders(db):
    """Reference reminder emails go only to the people who still owe a reference.

    Cases covered:
      1. both sides left a reference (no emails)
      2. host left a reference (surfer needs an email)
      3. surfer left a reference (host needs an email)
      4. neither left a reference (host and surfer need an email)
      5. neither left a reference, but the host blocked the surfer (no emails)

    After the handler runs, all queued jobs are processed and the resulting
    ``Email`` rows, ordered by recipient, are compared with the expected
    recipient/subject pairs.
    """
    # run once with no host requests or references present
    process_send_reference_reminders(empty_pb2.Empty())

    # users in case order:
    #   case 1: user1, user2
    #   case 2: user3 (host), user4 (surfer)
    #   case 3: user5 (host), user6 (surfer)
    #   case 4: user7 (host), user8 (surfer)
    #   case 5: user9 (host), user10 (surfer)
    user1, user2, user3, user4, user5, user6, user7, user8, user9, user10 = (
        generate_user(email=f"user{number}@couchers.org.invalid", name=f"User {number}")[0]
        for number in range(1, 11)
    )

    make_user_block(user9, user10)

    with session_scope() as session:
        # note that create_host_reference creates a host request whose age is one day older than the timedelta here

        # case 1: bidirectional (no emails)
        _, hr1 = create_host_reference(session, user2.id, user1.id, timedelta(days=7), surfing=True)
        create_host_reference(session, user1.id, user2.id, timedelta(days=7), host_request_id=hr1)

        # case 2: host left ref (surfer needs an email)
        create_host_reference(session, user3.id, user4.id, timedelta(days=11), surfing=False)

        # case 3: surfer left ref (host needs an email)
        create_host_reference(session, user6.id, user5.id, timedelta(days=9), surfing=True)

        # case 4: neither left ref (host & surfer need an email)
        create_host_request(session, user7.id, user8.id, timedelta(days=4))

        # case 5: neither left ref, but host blocked surfer, so neither should get an email
        create_host_request(session, user9.id, user10.id, timedelta(days=7))

    expected_emails = [
        ("user4@couchers.org.invalid", "[TEST] You have 3 days to write a reference for User 3!"),
        ("user5@couchers.org.invalid", "[TEST] You have 7 days to write a reference for User 6!"),
        ("user7@couchers.org.invalid", "[TEST] You have 14 days to write a reference for User 8!"),
        ("user8@couchers.org.invalid", "[TEST] You have 14 days to write a reference for User 7!"),
    ]

    process_send_reference_reminders(empty_pb2.Empty())

    # drain the queue so every queued email job has been processed
    while process_job():
        pass

    with session_scope() as session:
        emails = [
            (email.recipient, email.subject)
            for email in session.execute(select(Email).order_by(Email.recipient.asc())).scalars().all()
        ]

    assert emails == expected_emails
def test_process_add_users_to_email_list(db):
    """Users not yet on the mailing list are posted in one batch, and only once.

    ``couchers.config.config`` is patched with a copy that enables Mailchimp
    with dummy credentials, and ``requests.post`` in the handlers module is
    mocked. The handler is run with no users (no call expected), with three
    users of whom two are not yet on the list (one call with exactly those
    two), and once more (no call expected).
    """
    mailchimp_settings = (
        ("MAILCHIMP_ENABLED", True),
        ("MAILCHIMP_API_KEY", "dummy_api_key"),
        ("MAILCHIMP_DC", "dc99"),
        ("MAILCHIMP_LIST_ID", "dummy_list_id"),
    )
    new_config = config.copy()
    for key, value in mailchimp_settings:
        new_config[key] = value

    post_target = "couchers.jobs.handlers.requests.post"

    with patch("couchers.config.config", new_config):
        # no users at all: nothing to send
        with patch(post_target) as post_mock:
            process_add_users_to_email_list(empty_pb2.Empty())
        post_mock.assert_not_called()

        generate_user(added_to_mailing_list=False, email="testing1@couchers.invalid", name="Tester1")
        generate_user(added_to_mailing_list=True, email="testing2@couchers.invalid", name="Tester2")
        generate_user(added_to_mailing_list=False, email="testing3@couchers.invalid", name="Tester3 von test")

        with patch(post_target) as post_mock:
            post_mock.return_value.status_code = 200
            process_add_users_to_email_list(empty_pb2.Empty())

        expected_members = [
            {
                "email_address": "testing1@couchers.invalid",
                "status_if_new": "subscribed",
                "status": "subscribed",
                "merge_fields": {
                    "FNAME": "Tester1",
                },
            },
            {
                "email_address": "testing3@couchers.invalid",
                "status_if_new": "subscribed",
                "status": "subscribed",
                "merge_fields": {
                    "FNAME": "Tester3 von test",
                },
            },
        ]
        post_mock.assert_called_once_with(
            "https://dc99.api.mailchimp.com/3.0/lists/dummy_list_id",
            auth=("apikey", "dummy_api_key"),
            json={"members": expected_members},
        )

        # a further run must not post anything
        with patch(post_target) as post_mock:
            process_add_users_to_email_list(empty_pb2.Empty())
        post_mock.assert_not_called()
def test_process_update_recommendation_scores(db):
    """Smoke test: the handler runs on an empty payload without raising."""
    payload = empty_pb2.Empty()
    process_update_recommendation_scores(payload)