Coverage for src/tests/test_bg_jobs.py: 100%
418 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
1from datetime import timedelta
2from unittest.mock import call, patch
4import pytest
5import requests
6from google.protobuf import empty_pb2
7from sqlalchemy.sql import delete, func
9import couchers.jobs.worker
10from couchers.config import config
11from couchers.crypto import urlsafe_secure_token
12from couchers.db import session_scope
13from couchers.email import queue_email
14from couchers.email.dev import print_dev_email
15from couchers.jobs.enqueue import queue_job
16from couchers.jobs.handlers import (
17 add_users_to_email_list,
18 send_message_notifications,
19 send_onboarding_emails,
20 send_reference_reminders,
21 send_request_notifications,
22 update_badges,
23 update_recommendation_scores,
24)
25from couchers.jobs.worker import _run_job_and_schedule, process_job, run_scheduler, service_jobs
26from couchers.metrics import create_prometheus_server, job_process_registry
27from couchers.models import (
28 AccountDeletionToken,
29 BackgroundJob,
30 BackgroundJobState,
31 Email,
32 LoginToken,
33 PasswordResetToken,
34 UserBadge,
35)
36from couchers.sql import couchers_select as select
37from couchers.utils import now, today
38from proto import conversations_pb2, requests_pb2
39from tests.test_fixtures import ( # noqa
40 auth_api_session,
41 conversations_session,
42 db,
43 generate_user,
44 make_friends,
45 make_user_block,
46 process_jobs,
47 push_collector,
48 requests_session,
49 testconfig,
50)
51from tests.test_references import create_host_reference, create_host_request
def now_5_min_in_future():
    """Return the value of ``now()`` shifted five minutes forward.

    The tests in this module patch ``couchers.jobs.handlers.now`` with this
    function so that the handlers observe a clock five minutes ahead.
    """
    current = now()
    return current + timedelta(minutes=5)
@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture for every test in this module.

    The fixture does nothing by itself; being autouse, it only ensures that
    ``testconfig`` is set up before each test runs.
    """
def _check_job_counter(job, status, attempt, exception):
    """Assert that the metrics endpoint reports a sample with the given labels.

    Fetches the text served at ``http://localhost:8001`` and checks that it
    contains the label set built from the arguments.

    Args:
        job: expected value of the ``job`` label.
        status: expected value of the ``status`` label.
        attempt: expected value of the ``attempt`` label.
        exception: expected value of the ``exception`` label.

    Raises:
        AssertionError: if the label set is not present in the response body.
        requests.exceptions.RequestException: if the endpoint cannot be
            reached or does not answer within the timeout.
    """
    # Bound the request: without a timeout, a missing or wedged metrics server
    # would hang the whole test run instead of failing this test.
    metrics_string = requests.get("http://localhost:8001", timeout=10).text
    string_to_check = f'attempt="{attempt}",exception="{exception}",job="{job}",status="{status}"'
    assert string_to_check in metrics_string, f"{string_to_check!r} not found in metrics output"
def test_email_job(db):
    """Queue one email, process one job, and check the job ends up completed.

    ``print_dev_email`` in the handlers module is replaced with a wrapper that
    verifies the queued fields before delegating to the real function.
    """
    queued_fields = ("sender_name", "sender_email", "recipient", "subject", "plain", "html")

    with session_scope() as session:
        queue_email(session, *queued_fields)

    def mock_print_dev_email(
        sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data
    ):
        # the fields must arrive exactly as they were queued
        received = (sender_name, sender_email, recipient, subject, plain, html)
        assert received == queued_fields
        return print_dev_email(
            sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data
        )

    with patch("couchers.jobs.handlers.print_dev_email", mock_print_dev_email):
        process_job()

    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1

        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_purge_login_tokens(db):
    """Check that the ``purge_login_tokens`` job removes a login token expiring now."""
    user, api_token = generate_user()
    token_count = select(func.count()).select_from(LoginToken)
    job_count = select(func.count()).select_from(BackgroundJob)

    with session_scope() as session:
        session.add(LoginToken(token=urlsafe_secure_token(), user=user, expiry=now()))
        assert session.execute(token_count).scalar_one() == 1

        queue_job(session, "purge_login_tokens", empty_pb2.Empty())
    # the job is only processed once the session above has been closed
    process_job()

    with session_scope() as session:
        assert session.execute(token_count).scalar_one() == 0

    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1

        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_purge_password_reset_tokens(db):
    """Check that ``purge_password_reset_tokens`` removes a reset token expiring now."""
    user, api_token = generate_user()

    def count(session, model, *conditions):
        # number of rows of `model` matching all `conditions`
        return session.execute(select(func.count()).select_from(model).where(*conditions)).scalar_one()

    with session_scope() as session:
        session.add(PasswordResetToken(token=urlsafe_secure_token(), user=user, expiry=now()))
        assert count(session, PasswordResetToken) == 1

        queue_job(session, "purge_password_reset_tokens", empty_pb2.Empty())
    # the job is only processed once the session above has been closed
    process_job()

    with session_scope() as session:
        assert count(session, PasswordResetToken) == 0

    with session_scope() as session:
        assert count(session, BackgroundJob, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count(session, BackgroundJob, BackgroundJob.state != BackgroundJobState.completed) == 0
def test_purge_account_deletion_tokens(db):
    """Check that ``purge_account_deletion_tokens`` leaves exactly one of three tokens."""
    user, api_token = generate_user()
    user2, api_token2 = generate_user()
    user3, api_token3 = generate_user()

    # Three tokens with different expiries relative to now: two hours in the
    # past, exactly now, and five hours in the future.
    owners_and_offsets = [
        (user, timedelta(hours=-2)),
        (user2, timedelta(0)),
        (user3, timedelta(hours=5)),
    ]
    token_count = select(func.count()).select_from(AccountDeletionToken)

    with session_scope() as session:
        session.add_all(
            [
                AccountDeletionToken(token=urlsafe_secure_token(), user=owner, expiry=now() + offset)
                for owner, offset in owners_and_offsets
            ]
        )
        assert session.execute(token_count).scalar_one() == 3

        queue_job(session, "purge_account_deletion_tokens", empty_pb2.Empty())
    # the job is only processed once the session above has been closed
    process_job()

    with session_scope() as session:
        # exactly one token survives the purge (presumably the one that has
        # not expired yet -- confirm against the handler)
        assert session.execute(token_count).scalar_one() == 1

    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1

        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_enforce_community_memberships(db):
    """Queue the ``enforce_community_membership`` job and check it completes."""
    with session_scope() as session:
        queue_job(session, "enforce_community_membership", empty_pb2.Empty())
    process_job()

    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1

        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_refresh_materialized_views(db):
    """Queue the ``refresh_materialized_views`` job and check it completes."""

    def count_jobs(session, condition):
        # number of background jobs matching `condition`
        query = select(func.count()).select_from(BackgroundJob).where(condition)
        return session.execute(query).scalar_one()

    with session_scope() as session:
        queue_job(session, "refresh_materialized_views", empty_pb2.Empty())

    process_job()

    with session_scope() as session:
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.completed) == 0
def test_service_jobs(db):
    """Run ``service_jobs`` until it tries to sleep and check the queued email job completed."""
    with session_scope() as session:
        queue_email(session, "sender_name", "sender_email", "recipient", "subject", "plain", "html")

    # The worker's sleep is swapped for a function raising this exception, so
    # that the first attempt to sleep breaks out of the worker loop; reaching
    # that point tells us there were no jobs left to process.
    class HitSleep(Exception):
        """Raised by the patched ``sleep`` in place of actually sleeping."""

    def raising_sleep(seconds):
        raise HitSleep()

    with pytest.raises(HitSleep), patch("couchers.jobs.worker.sleep", raising_sleep):
        service_jobs()

    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1

        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_scheduler(db, monkeypatch):
    """Drive ``run_scheduler`` with a fake clock and check when each schedule entry fires.

    The worker's ``monotonic`` and ``sleep`` are replaced so that sleeping only
    advances a simulated clock; once the clock passes ``end_time`` the fake
    sleep raises to stop the scheduler.
    """
    MOCK_SCHEDULE = [
        ("purge_login_tokens", timedelta(seconds=7)),
        ("send_message_notifications", timedelta(seconds=11)),
    ]

    end_time = 70
    # simulated clock, kept in a mutable container so the fakes can advance it
    clock = {"now": 0}

    class EndOfTime(Exception):
        """Raised by the fake sleep once the simulated clock passes ``end_time``."""

    def mock_monotonic():
        return clock["now"]

    def mock_sleep(seconds):
        clock["now"] += seconds
        if clock["now"] > end_time:
            raise EndOfTime()

    realized_schedule = []

    def mock_run_job_and_schedule(sched, schedule_id):
        # record when each entry fires, then defer to the real implementation
        realized_schedule.append((clock["now"], schedule_id))
        _run_job_and_schedule(sched, schedule_id)

    replacements = {
        "_run_job_and_schedule": mock_run_job_and_schedule,
        "SCHEDULE": MOCK_SCHEDULE,
        "monotonic": mock_monotonic,
        "sleep": mock_sleep,
    }
    for attribute, replacement in replacements.items():
        monkeypatch.setattr(couchers.jobs.worker, attribute, replacement)

    with pytest.raises(EndOfTime):
        run_scheduler()

    assert realized_schedule == [
        (0.0, 0),
        (0.0, 1),
        (7.0, 0),
        (11.0, 1),
        (14.0, 0),
        (21.0, 0),
        (22.0, 1),
        (28.0, 0),
        (33.0, 1),
        (35.0, 0),
        (42.0, 0),
        (44.0, 1),
        (49.0, 0),
        (55.0, 1),
        (56.0, 0),
        (63.0, 0),
        (66.0, 1),
        (70.0, 0),
    ]

    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        pending = session.execute(job_count.where(BackgroundJob.state == BackgroundJobState.pending)).scalar_one()
        assert pending == 18

        not_pending = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.pending)
        ).scalar_one()
        assert not_pending == 0
def test_job_retry(db):
    """Check that a job which always raises is retried and finally marked failed.

    A ``mock_job`` handler that raises on every call is queued and processed
    five times in total. After the first attempt the job must be in the
    ``error`` state; after the fifth it must be ``failed``. The metrics served
    on port 8001 must contain the matching ``error``/``failed`` samples, and
    the handler must have been invoked once per attempt.
    """
    with session_scope() as session:
        queue_job(session, "mock_job", empty_pb2.Empty())

    called_count = 0

    def mock_job(payload):
        nonlocal called_count
        called_count += 1
        raise Exception()

    MOCK_JOBS = {
        "mock_job": (empty_pb2.Empty, mock_job),
    }
    create_prometheus_server(registry=job_process_registry, port=8001)

    # if IN_TEST is true, then the bg worker will raise on exceptions
    new_config = config.copy()
    new_config["IN_TEST"] = False

    job_count = select(func.count()).select_from(BackgroundJob)

    with patch("couchers.jobs.worker.config", new_config), patch("couchers.jobs.worker.JOBS", MOCK_JOBS):
        # first attempt
        process_job()
        with session_scope() as session:
            assert session.execute(job_count.where(BackgroundJob.state == BackgroundJobState.error)).scalar_one() == 1
            assert session.execute(job_count.where(BackgroundJob.state != BackgroundJobState.error)).scalar_one() == 0

            # make the job eligible for another attempt right away
            session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now()
        # second attempt
        process_job()

        # third, fourth and fifth attempts
        for _ in range(3):
            with session_scope() as session:
                session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now()
            process_job()

    # the handler must actually have run once per attempt
    assert called_count == 5

    with session_scope() as session:
        assert session.execute(job_count.where(BackgroundJob.state == BackgroundJobState.failed)).scalar_one() == 1
        assert session.execute(job_count.where(BackgroundJob.state != BackgroundJobState.failed)).scalar_one() == 0

    _check_job_counter("mock_job", "error", "4", "Exception")
    _check_job_counter("mock_job", "failed", "5", "Exception")
def test_no_jobs_no_problem(db):
    """Check that ``process_job`` returns a falsy value and adds nothing when the queue is empty."""
    all_jobs = select(func.count()).select_from(BackgroundJob)

    with session_scope() as session:
        jobs_before = session.execute(all_jobs).scalar_one()
        assert jobs_before == 0

    processed = process_job()
    assert not processed

    with session_scope() as session:
        jobs_after = session.execute(all_jobs).scalar_one()
        assert jobs_after == 0
def test_send_message_notifications_basic(db):
    """Check when ``send_message_notifications`` queues ``send_email`` jobs for unseen messages."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()
    user3, token3 = generate_user()

    for left, right in ((user1, user2), (user1, user3), (user2, user3)):
        make_friends(left, right)

    def email_job_count(session):
        # number of background jobs of type "send_email"
        query = select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        return session.execute(query).scalar_one()

    send_message_notifications(empty_pb2.Empty())
    process_jobs()

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert email_job_count(session) == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id])
        ).group_chat_id
        for text in ("Test message 1", "Test message 2", "Test message 3", "Test message 4"):
            c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=text))

    with conversations_session(token3) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for text in ("Test message 5", "Test message 6"):
            c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=text))

    send_message_notifications(empty_pb2.Empty())
    process_jobs()

    # no emails sent out
    with session_scope() as session:
        assert email_job_count(session) == 0

    # this should generate emails for both user2 and user3
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())
        process_jobs()

    with session_scope() as session:
        assert email_job_count(session) == 2
        # delete them all
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    # shouldn't generate any more emails
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())
        process_jobs()

    with session_scope() as session:
        assert email_job_count(session) == 0
def test_send_message_notifications_muted(db):
    """Check that a user who muted a group chat gets no notification email for it."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()
    user3, token3 = generate_user()

    for left, right in ((user1, user2), (user1, user3), (user2, user3)):
        make_friends(left, right)

    email_jobs = select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")

    send_message_notifications(empty_pb2.Empty())
    process_jobs()

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert session.execute(email_jobs).scalar_one() == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id])
        ).group_chat_id

    with conversations_session(token3) as c:
        # mute it for user 3
        c.MuteGroupChat(conversations_pb2.MuteGroupChatReq(group_chat_id=group_chat_id, forever=True))

    with conversations_session(token1) as c:
        for text in ("Test message 1", "Test message 2", "Test message 3", "Test message 4"):
            c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=text))

    with conversations_session(token3) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for text in ("Test message 5", "Test message 6"):
            c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=text))

    send_message_notifications(empty_pb2.Empty())
    process_jobs()

    # no emails sent out
    with session_scope() as session:
        assert session.execute(email_jobs).scalar_one() == 0

    # this should generate emails for both user2 and NOT user3
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())
        process_jobs()

    with session_scope() as session:
        assert session.execute(email_jobs).scalar_one() == 1
        # delete them all
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    # shouldn't generate any more emails
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())
        process_jobs()

    with session_scope() as session:
        assert session.execute(email_jobs).scalar_one() == 0
def test_send_request_notifications_host_request(db):
    """Check that ``send_request_notifications`` queues one email per unseen host request message.

    Covers both directions: the surfer creating a host request and the host
    responding to it. In each case a second run must not queue another email.
    """
    user1, token1 = generate_user()
    user2, token2 = generate_user()

    today_plus_2 = (today() + timedelta(days=2)).isoformat()
    today_plus_3 = (today() + timedelta(days=3)).isoformat()

    def email_job_count(session):
        # number of background jobs of type "send_email"
        query = select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        return session.execute(query).scalar_one()

    def clear_jobs(session):
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    def notify_five_minutes_later():
        # run the notification job with the handlers' clock moved 5 minutes ahead
        with patch("couchers.jobs.handlers.now", now_5_min_in_future):
            send_request_notifications(empty_pb2.Empty())
            process_jobs()

    send_request_notifications(empty_pb2.Empty())
    process_jobs()

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one() == 0

    # first test that sending host request creates email
    with requests_session(token1) as api:
        create_req = requests_pb2.CreateHostRequestReq(
            host_user_id=user2.id, from_date=today_plus_2, to_date=today_plus_3, text="Test request"
        )
        host_request_id = api.CreateHostRequest(create_req).host_request_id

    with session_scope() as session:
        # delete send_email BackgroundJob created by CreateHostRequest
        clear_jobs(session)

        # check send_request_notifications successfully creates background job
        notify_five_minutes_later()
        assert email_job_count(session) == 1

        # delete all BackgroundJobs
        clear_jobs(session)

        notify_five_minutes_later()
        # should find no messages since host has already been notified
        assert email_job_count(session) == 0

    # then test that responding to host request creates email
    with requests_session(token2) as api:
        api.RespondHostRequest(
            requests_pb2.RespondHostRequestReq(
                host_request_id=host_request_id,
                status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
                text="Test request",
            )
        )

    with session_scope() as session:
        # delete send_email BackgroundJob created by RespondHostRequest
        clear_jobs(session)

        # check send_request_notifications successfully creates background job
        notify_five_minutes_later()
        assert email_job_count(session) == 1

        # delete all BackgroundJobs
        clear_jobs(session)

        notify_five_minutes_later()
        # should find no messages since guest has already been notified
        assert email_job_count(session) == 0
def test_send_message_notifications_seen(db):
    """Check that no notification email is queued for messages already marked as seen."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()

    make_friends(user1, user2)

    def email_job_count():
        # number of background jobs of type "send_email", read in a fresh session
        query = select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        with session_scope() as session:
            return session.execute(query).scalar_one()

    send_message_notifications(empty_pb2.Empty())

    # should find no jobs, since there's no messages
    assert email_job_count() == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for text in ("Test message 1", "Test message 2", "Test message 3", "Test message 4"):
            c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=text))

    # user 2 now marks those messages as seen
    with conversations_session(token2) as c:
        chat = c.GetGroupChat(conversations_pb2.GetGroupChatReq(group_chat_id=group_chat_id))
        m_id = chat.latest_message.message_id
        c.MarkLastSeenGroupChat(
            conversations_pb2.MarkLastSeenGroupChatReq(group_chat_id=group_chat_id, last_seen_message_id=m_id)
        )

    send_message_notifications(empty_pb2.Empty())

    # no emails sent out
    assert email_job_count() == 0

    def now_30_min_in_future():
        return now() + timedelta(minutes=30)

    # still shouldn't generate emails as user2 has seen all messages
    with patch("couchers.jobs.handlers.now", now_30_min_in_future):
        send_message_notifications(empty_pb2.Empty())

    assert email_job_count() == 0
def test_send_onboarding_emails(db):
    """Check how many ``send_email`` jobs exist after each ``send_onboarding_emails`` run."""

    def email_job_count():
        # number of background jobs of type "send_email", read in a fresh session
        query = select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        with session_scope() as session:
            return session.execute(query).scalar_one()

    def run_onboarding():
        send_onboarding_emails(empty_pb2.Empty())
        process_jobs()

    # needs to get first onboarding email
    user1, token1 = generate_user(onboarding_emails_sent=0, last_onboarding_email_sent=None, complete_profile=False)

    run_onboarding()
    assert email_job_count() == 1

    # needs to get second onboarding email, but not yet
    user2, token2 = generate_user(
        onboarding_emails_sent=1, last_onboarding_email_sent=now() - timedelta(days=6), complete_profile=False
    )

    run_onboarding()
    assert email_job_count() == 1

    # needs to get second onboarding email
    user3, token3 = generate_user(
        onboarding_emails_sent=1, last_onboarding_email_sent=now() - timedelta(days=8), complete_profile=False
    )

    run_onboarding()
    assert email_job_count() == 2
def test_send_reference_reminders(db):
    """Check which users receive reference reminder emails, and with what content.

    Scenarios covered:
      1. both sides left a reference (no emails)
      2. host left a reference (surfer needs an email)
      3. surfer left a reference (host needs an email)
      4. neither left a reference (host & surfer need an email)
      5. neither left a reference, but host blocked surfer (no emails)
      6. neither left a reference, surfer indicated they didn't meet up
         (host still needs an email)
    """
    send_reference_reminders(empty_pb2.Empty())

    # scenario 1
    user1, _ = generate_user(email="user1@couchers.org.invalid", name="User 1")
    user2, _ = generate_user(email="user2@couchers.org.invalid", name="User 2")

    # scenario 2: user3 hosts, user4 surfs
    user3, _ = generate_user(email="user3@couchers.org.invalid", name="User 3")
    user4, _ = generate_user(email="user4@couchers.org.invalid", name="User 4")

    # scenario 3: user5 hosts, user6 surfs
    user5, _ = generate_user(email="user5@couchers.org.invalid", name="User 5")
    user6, _ = generate_user(email="user6@couchers.org.invalid", name="User 6")

    # scenario 4: user7 surfs, user8 hosts
    user7, _ = generate_user(email="user7@couchers.org.invalid", name="User 7")
    user8, _ = generate_user(email="user8@couchers.org.invalid", name="User 8")

    # scenario 5: user9 surfs, user10 hosts
    user9, _ = generate_user(email="user9@couchers.org.invalid", name="User 9")
    user10, _ = generate_user(email="user10@couchers.org.invalid", name="User 10")

    make_user_block(user9, user10)

    # scenario 6: user11 hosts, user12 surfs
    user11, _ = generate_user(email="user11@couchers.org.invalid", name="User 11")
    user12, _ = generate_user(email="user12@couchers.org.invalid", name="User 12")

    with session_scope() as session:
        # note that create_host_reference creates a host request whose age is one day older than the timedelta here

        # scenario 1
        _, first_request_id = create_host_reference(session, user2.id, user1.id, timedelta(days=7), surfing=True)
        create_host_reference(session, user1.id, user2.id, timedelta(days=7), host_request_id=first_request_id)

        # scenario 2
        create_host_reference(session, user3.id, user4.id, timedelta(days=11), surfing=False)

        # scenario 3
        create_host_reference(session, user6.id, user5.id, timedelta(days=9), surfing=True)

        # scenario 4
        create_host_request(session, user7.id, user8.id, timedelta(days=4))

        # scenario 5
        create_host_request(session, user9.id, user10.id, timedelta(days=7))

        # scenario 6
        create_host_request(session, user12.id, user11.id, timedelta(days=6), surfer_reason_didnt_meetup="")

    hosted_strings = ("from when you hosted them", "/leave-reference/hosted/")
    surfed_strings = ("from when you surfed with them", "/leave-reference/surfed/")

    # (recipient, subject, strings that must appear in both bodies), listed in
    # the same order the emails are fetched in below (ascending recipient)
    expected_emails = [
        (
            "user11@couchers.org.invalid",
            "[TEST] You have 14 days to write a reference for User 12!",
            hosted_strings,
        ),
        (
            "user4@couchers.org.invalid",
            "[TEST] You have 3 days to write a reference for User 3!",
            surfed_strings,
        ),
        (
            "user5@couchers.org.invalid",
            "[TEST] You have 7 days to write a reference for User 6!",
            hosted_strings,
        ),
        (
            "user7@couchers.org.invalid",
            "[TEST] You have 14 days to write a reference for User 8!",
            surfed_strings,
        ),
        (
            "user8@couchers.org.invalid",
            "[TEST] You have 14 days to write a reference for User 7!",
            hosted_strings,
        ),
    ]

    send_reference_reminders(empty_pb2.Empty())

    # drain the queue
    more_jobs = process_job()
    while more_jobs:
        more_jobs = process_job()

    with session_scope() as session:
        stored_emails = session.execute(select(Email).order_by(Email.recipient.asc())).scalars().all()
        emails = [(email.recipient, email.subject, email.plain, email.html) for email in stored_emails]

        actual_addresses_and_subjects = [(recipient, subject) for recipient, subject, _, _ in emails]
        expected_addresses_and_subjects = [(recipient, subject) for recipient, subject, _ in expected_emails]

        print(actual_addresses_and_subjects)
        print(expected_addresses_and_subjects)

        assert actual_addresses_and_subjects == expected_addresses_and_subjects

        for (address, subject, plain, html), (_, _, search_strings) in zip(emails, expected_emails):
            for find in search_strings:
                assert find in plain, f"Expected to find string {find} in PLAIN email {subject} to {address}, didn't"
                assert find in html, f"Expected to find string {find} in HTML email {subject} to {address}, didn't"
def test_add_users_to_email_list(db):
    """Check which users ``add_users_to_email_list`` posts to the newsletter API.

    ``requests.post`` in the handlers module is mocked throughout, so no HTTP
    request is actually made.
    """
    overrides = {
        "LISTMONK_ENABLED": True,
        "LISTMONK_BASE_URL": "https://example.com",
        "LISTMONK_API_USERNAME": "test_user",
        "LISTMONK_API_KEY": "dummy_api_key",
        "LISTMONK_LIST_ID": 6,
    }
    new_config = config.copy()
    for key, value in overrides.items():
        new_config[key] = value

    def subscriber_call(email, name, user_id):
        # the POST expected for one newly subscribed user
        return call(
            "https://example.com/api/subscribers",
            auth=("test_user", "dummy_api_key"),
            json={
                "email": email,
                "name": name,
                "lists": [6],
                "preconfirm_subscriptions": True,
                "attribs": {"couchers_user_id": user_id},
                "status": "enabled",
            },
            timeout=10,
        )

    with patch("couchers.jobs.handlers.config", new_config):
        # no users yet, so nothing should be posted
        with patch("couchers.jobs.handlers.requests.post") as mock:
            add_users_to_email_list(empty_pb2.Empty())
        mock.assert_not_called()

        generate_user(in_sync_with_newsletter=False, email="testing1@couchers.invalid", name="Tester1", id=15)
        generate_user(in_sync_with_newsletter=True, email="testing2@couchers.invalid", name="Tester2")
        generate_user(in_sync_with_newsletter=False, email="testing3@couchers.invalid", name="Tester3 von test", id=17)
        generate_user(
            in_sync_with_newsletter=False, email="testing4@couchers.invalid", name="Tester4", opt_out_of_newsletter=True
        )

        with patch("couchers.jobs.handlers.requests.post") as mock:
            mock.return_value.status_code = 200
            add_users_to_email_list(empty_pb2.Empty())
        mock.assert_has_calls(
            [
                subscriber_call("testing1@couchers.invalid", "Tester1", 15),
                subscriber_call("testing3@couchers.invalid", "Tester3 von test", 17),
            ],
            any_order=True,
        )

        # a second run must not post anything again
        with patch("couchers.jobs.handlers.requests.post") as mock:
            add_users_to_email_list(empty_pb2.Empty())
        mock.assert_not_called()
def test_update_recommendation_scores(db):
    """Smoke test: ``update_recommendation_scores`` runs on an empty payload without raising."""
    payload = empty_pb2.Empty()
    update_recommendation_scores(payload)
def test_update_badges(db, push_collector):
    """Check the badges stored and the push notifications sent after ``update_badges`` runs."""
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    user4, _ = generate_user(phone="+15555555555", phone_verification_verified=func.now())
    user5, _ = generate_user(phone="+15555555556", phone_verification_verified=func.now())
    user6, _ = generate_user()

    # user5 starts out with a badge that is expected to be taken away again
    with session_scope() as session:
        session.add(UserBadge(user_id=user5.id, badge_id="board_member"))

    update_badges(empty_pb2.Empty())
    process_jobs()

    badge_query = select(UserBadge.user_id, UserBadge.badge_id).order_by(UserBadge.user_id.asc(), UserBadge.id.asc())
    with session_scope() as session:
        badge_tuples = session.execute(badge_query).all()

    expected = [
        (user1.id, "founder"),
        (user1.id, "board_member"),
        (user2.id, "founder"),
        (user2.id, "board_member"),
        (user4.id, "phone_verified"),
        (user5.id, "phone_verified"),
    ]

    assert badge_tuples == expected

    print(push_collector.pushes)

    added_body = "Check out your profile to see the new badge!"
    removed_body = "You can see all your badges on your profile."

    # (user id, index of the push for that user, title, body)
    expected_pushes = [
        (user1.id, 0, "The Founder badge was added to your profile", added_body),
        (user1.id, 1, "The Board Member badge was added to your profile", added_body),
        (user2.id, 0, "The Founder badge was added to your profile", added_body),
        (user2.id, 1, "The Board Member badge was added to your profile", added_body),
        (user4.id, 0, "The Verified Phone badge was added to your profile", added_body),
        (user5.id, 0, "The Board Member badge was removed from your profile", removed_body),
        (user5.id, 1, "The Verified Phone badge was added to your profile", added_body),
    ]
    for user_id, index, title, body in expected_pushes:
        push_collector.assert_user_push_matches_fields(user_id, ix=index, title=title, body=body)