Coverage for src/tests/test_bg_jobs.py: 99%
492 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
1from datetime import timedelta
2from unittest.mock import call, patch
4import pytest
5import requests
6from google.protobuf import empty_pb2
7from google.protobuf.empty_pb2 import Empty
8from sqlalchemy.sql import delete, func
10import couchers.jobs.worker
11from couchers.config import config
12from couchers.constants import HOST_REQUEST_MAX_REMINDERS, HOST_REQUEST_REMINDER_INTERVAL
13from couchers.crypto import urlsafe_secure_token
14from couchers.db import session_scope
15from couchers.email import queue_email
16from couchers.email.dev import print_dev_email
17from couchers.jobs import handlers
18from couchers.jobs.enqueue import queue_job
19from couchers.jobs.handlers import (
20 add_users_to_email_list,
21 send_host_request_reminders,
22 send_message_notifications,
23 send_onboarding_emails,
24 send_reference_reminders,
25 send_request_notifications,
26 update_badges,
27 update_recommendation_scores,
28)
29from couchers.jobs.worker import _run_job_and_schedule, process_job, run_scheduler, service_jobs
30from couchers.metrics import create_prometheus_server
31from couchers.models import (
32 AccountDeletionToken,
33 BackgroundJob,
34 BackgroundJobState,
35 Email,
36 HostRequestStatus,
37 LoginToken,
38 Message,
39 MessageType,
40 PasswordResetToken,
41 UserBadge,
42)
43from couchers.sql import couchers_select as select
44from couchers.utils import now, today
45from proto import conversations_pb2, requests_pb2
46from tests.test_fixtures import ( # noqa
47 auth_api_session,
48 conversations_session,
49 db,
50 generate_user,
51 make_friends,
52 make_user_block,
53 process_jobs,
54 push_collector,
55 requests_session,
56 testconfig,
57)
58from tests.test_references import create_host_reference, create_host_request, create_host_request_by_date
def now_5_min_in_future():
    """Return the time reported by ``now()`` shifted five minutes forward."""
    five_minutes = timedelta(minutes=5)
    return now() + five_minutes
@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture for every test in this module; nothing else is done here."""
def _check_job_counter(job, status, attempt, exception):
    """
    Assert that the metrics endpoint exposes a sample carrying the given job labels.

    Fetches the text served at http://localhost:8000 and checks that it contains the label string
    ``attempt="...",exception="...",job="...",status="..."`` built from the arguments.
    """
    # a bounded timeout makes a missing or wedged metrics server fail the test instead of hanging the whole run
    response = requests.get("http://localhost:8000", timeout=10)
    expected_labels = f'attempt="{attempt}",exception="{exception}",job="{job}",status="{status}"'
    assert expected_labels in response.text
def test_email_job(db):
    """Queue a single email, process one job with the dev printer wrapped, and check exactly one job completed."""
    queued_fields = ("sender_name", "sender_email", "recipient", "subject", "plain", "html")

    with session_scope() as session:
        queue_email(session, *queued_fields)

    def checking_print_dev_email(
        sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data
    ):
        # the handler has to hand over exactly the fields that were queued above
        assert (sender_name, sender_email, recipient, subject, plain, html) == queued_fields
        return print_dev_email(
            sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data
        )

    with patch("couchers.jobs.handlers.print_dev_email", checking_print_dev_email):
        process_job()

    job_count = select(func.count()).select_from(BackgroundJob)

    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1

        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_purge_login_tokens(db):
    """Store a login token expiring right now, run the purge job, and check the token is gone and the job completed."""
    user, _unused_api_token = generate_user()

    token_count = select(func.count()).select_from(LoginToken)
    job_count = select(func.count()).select_from(BackgroundJob)

    with session_scope() as session:
        session.add(LoginToken(token=urlsafe_secure_token(), user=user, expiry=now()))
        assert session.execute(token_count).scalar_one() == 1

        queue_job(session, "purge_login_tokens", empty_pb2.Empty())

    # the job is processed only after the queueing session has been closed
    process_job()

    with session_scope() as session:
        assert session.execute(token_count).scalar_one() == 0

    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1

        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_purge_password_reset_tokens(db):
    """Store a password reset token expiring right now, run the purge job, and check token and job state."""
    user, _unused_api_token = generate_user()

    def count_rows(session, model, *conditions):
        # number of rows of `model` matching all `conditions`
        return session.execute(select(func.count()).select_from(model).where(*conditions)).scalar_one()

    with session_scope() as session:
        session.add(PasswordResetToken(token=urlsafe_secure_token(), user=user, expiry=now()))
        assert count_rows(session, PasswordResetToken) == 1

        queue_job(session, "purge_password_reset_tokens", empty_pb2.Empty())

    # the job is processed only after the queueing session has been closed
    process_job()

    with session_scope() as session:
        assert count_rows(session, PasswordResetToken) == 0

    with session_scope() as session:
        assert count_rows(session, BackgroundJob, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_rows(session, BackgroundJob, BackgroundJob.state != BackgroundJobState.completed) == 0
def test_purge_account_deletion_tokens(db):
    """
    Store three account deletion tokens with different expiries, run the purge job, and check one token remains.

    Also checks that the purge job is the only background job and that it ended up completed.
    """
    user, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()

    token_count = select(func.count()).select_from(AccountDeletionToken)
    job_count = select(func.count()).select_from(BackgroundJob)

    with session_scope() as session:
        # three tokens: expired two hours ago, expiring right now, and expiring five hours from now.
        # exactly one is expected to survive the purge -- presumably the one still in the future
        # (TODO confirm against the purge_account_deletion_tokens handler)
        session.add_all(
            [
                AccountDeletionToken(token=urlsafe_secure_token(), user=user, expiry=now() - timedelta(hours=2)),
                AccountDeletionToken(token=urlsafe_secure_token(), user=user2, expiry=now()),
                AccountDeletionToken(token=urlsafe_secure_token(), user=user3, expiry=now() + timedelta(hours=5)),
            ]
        )
        assert session.execute(token_count).scalar_one() == 3

        queue_job(session, "purge_account_deletion_tokens", empty_pb2.Empty())

    # the job is processed only after the queueing session has been closed
    process_job()

    with session_scope() as session:
        assert session.execute(token_count).scalar_one() == 1

    with session_scope() as session:
        assert (
            session.execute(job_count.where(BackgroundJob.state == BackgroundJobState.completed)).scalar_one() == 1
        )
        assert (
            session.execute(job_count.where(BackgroundJob.state != BackgroundJobState.completed)).scalar_one() == 0
        )
def test_enforce_community_memberships(db):
    """Queue the ``enforce_community_membership`` job, process it, and check it is the only job and completed."""
    with session_scope() as session:
        queue_job(session, "enforce_community_membership", empty_pb2.Empty())

    process_job()

    job_count = select(func.count()).select_from(BackgroundJob)

    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1

        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_refresh_materialized_views(db):
    """Queue the ``refresh_materialized_views`` job, process it, and check it is the only job and completed."""

    def count_jobs(session, condition):
        # number of background jobs matching `condition`
        return session.execute(select(func.count()).select_from(BackgroundJob).where(condition)).scalar_one()

    with session_scope() as session:
        queue_job(session, "refresh_materialized_views", empty_pb2.Empty())

    process_job()

    with session_scope() as session:
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.completed) == 0
def test_service_jobs(db):
    """Run ``service_jobs`` with one queued email and check the job is completed once the loop reaches its sleep."""
    with session_scope() as session:
        queue_email(session, "sender_name", "sender_email", "recipient", "subject", "plain", "html")

    # service_jobs() loops forever; the worker's sleep is swapped for a function that raises this marker
    # exception, which both breaks us out of the loop and tells us the worker reached the point where it sleeps
    class HitSleep(Exception):
        pass

    def raise_instead_of_sleeping(seconds):
        raise HitSleep()

    with pytest.raises(HitSleep), patch("couchers.jobs.worker.sleep", raise_instead_of_sleeping):
        service_jobs()

    job_count = select(func.count()).select_from(BackgroundJob)

    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1

        not_completed = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert not_completed == 0
def test_scheduler(db, monkeypatch):
    """
    Drive ``run_scheduler`` with a fake clock and a two-entry schedule.

    Checks the (time, schedule index) pairs at which jobs were triggered and that every triggered job is left
    pending in the database.
    """
    MOCK_SCHEDULE = [
        ("purge_login_tokens", timedelta(seconds=7)),
        ("send_message_notifications", timedelta(seconds=11)),
    ]

    # fake clock: it only advances when the scheduler "sleeps", and the test stops once it passes end_time
    current_time = 0
    end_time = 70

    class EndOfTime(Exception):
        pass

    def fake_monotonic():
        return current_time

    def fake_sleep(seconds):
        nonlocal current_time
        current_time += seconds
        if current_time > end_time:
            raise EndOfTime()

    triggered = []

    def recording_run_job_and_schedule(sched, schedule_id):
        # note when each schedule entry fires, then defer to the real implementation
        triggered.append((current_time, schedule_id))
        _run_job_and_schedule(sched, schedule_id)

    worker = couchers.jobs.worker
    monkeypatch.setattr(worker, "_run_job_and_schedule", recording_run_job_and_schedule)
    monkeypatch.setattr(worker, "SCHEDULE", MOCK_SCHEDULE)
    monkeypatch.setattr(worker, "monotonic", fake_monotonic)
    monkeypatch.setattr(worker, "sleep", fake_sleep)

    with pytest.raises(EndOfTime):
        run_scheduler()

    assert triggered == [
        (0.0, 0),
        (0.0, 1),
        (7.0, 0),
        (11.0, 1),
        (14.0, 0),
        (21.0, 0),
        (22.0, 1),
        (28.0, 0),
        (33.0, 1),
        (35.0, 0),
        (42.0, 0),
        (44.0, 1),
        (49.0, 0),
        (55.0, 1),
        (56.0, 0),
        (63.0, 0),
        (66.0, 1),
        (70.0, 0),
    ]

    job_count = select(func.count()).select_from(BackgroundJob)

    with session_scope() as session:
        pending = session.execute(job_count.where(BackgroundJob.state == BackgroundJobState.pending)).scalar_one()
        assert pending == 18

        not_pending = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.pending)
        ).scalar_one()
        assert not_pending == 0
def test_job_retry(db):
    """
    Process an always-raising job five times and check it goes from ``error`` to ``failed``.

    Also checks the metrics endpoint for the ``error`` sample of attempt 4 and the ``failed`` sample of attempt 5.
    """
    with session_scope() as session:
        queue_job(session, "mock_job", empty_pb2.Empty())

    called_count = 0

    def mock_job(payload):
        nonlocal called_count
        called_count += 1
        raise Exception()

    MOCK_JOBS = {"mock_job": (empty_pb2.Empty, mock_job)}
    create_prometheus_server(port=8000)

    # if IN_TEST is true, then the bg worker will raise on exceptions
    new_config = config.copy()
    new_config["IN_TEST"] = False

    job_count = select(func.count()).select_from(BackgroundJob)

    with patch("couchers.jobs.worker.config", new_config), patch("couchers.jobs.worker.JOBS", MOCK_JOBS):
        # first attempt
        process_job()

        with session_scope() as session:
            errored = session.execute(job_count.where(BackgroundJob.state == BackgroundJobState.error)).scalar_one()
            assert errored == 1

            not_errored = session.execute(
                job_count.where(BackgroundJob.state != BackgroundJobState.error)
            ).scalar_one()
            assert not_errored == 0

            # make the job due again right away so the next process_job() picks it up
            session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now()
        process_job()

        # three more rounds of "make it due, then process", for five attempts in total
        for _round in range(3):
            with session_scope() as session:
                session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now()
            process_job()

    with session_scope() as session:
        failed = session.execute(job_count.where(BackgroundJob.state == BackgroundJobState.failed)).scalar_one()
        assert failed == 1

        not_failed = session.execute(job_count.where(BackgroundJob.state != BackgroundJobState.failed)).scalar_one()
        assert not_failed == 0

    _check_job_counter("mock_job", "error", "4", "Exception")
    _check_job_counter("mock_job", "failed", "5", "Exception")
def test_no_jobs_no_problem(db):
    """Check that ``process_job`` returns a falsy value on an empty queue and leaves the queue empty."""
    job_count = select(func.count()).select_from(BackgroundJob)

    with session_scope() as session:
        jobs_before = session.execute(job_count).scalar_one()
        assert jobs_before == 0

    processed = process_job()
    assert not processed

    with session_scope() as session:
        jobs_after = session.execute(job_count).scalar_one()
        assert jobs_after == 0
def test_send_message_notifications_basic(db):
    """
    Check when ``send_message_notifications`` queues ``send_email`` jobs for unseen group chat messages.

    No email jobs exist before messages are sent or right after sending; two exist once the handler's clock is
    five minutes ahead; none are queued again on a repeated run.
    """
    user1, token1 = generate_user()
    user2, _token2 = generate_user()
    user3, token3 = generate_user()

    make_friends(user1, user2)
    make_friends(user1, user3)
    make_friends(user2, user3)

    def email_job_count():
        # number of send_email background jobs currently stored
        with session_scope() as session:
            return session.execute(
                select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
            ).scalar_one()

    def notify_five_minutes_later():
        # run the handler (and the jobs it queues) with its clock five minutes ahead
        with patch("couchers.jobs.handlers.now", now_5_min_in_future):
            send_message_notifications(empty_pb2.Empty())
            process_jobs()

    send_message_notifications(empty_pb2.Empty())
    process_jobs()

    # should find no jobs, since there's no messages
    assert email_job_count() == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id])
        ).group_chat_id
        for number in range(1, 5):
            c.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {number}")
            )

    with conversations_session(token3) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for number in range(5, 7):
            c.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {number}")
            )

    send_message_notifications(empty_pb2.Empty())
    process_jobs()

    # no emails sent out
    assert email_job_count() == 0

    # this should generate emails for both user2 and user3
    notify_five_minutes_later()
    assert email_job_count() == 2

    # delete them all
    with session_scope() as session:
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    # shouldn't generate any more emails
    notify_five_minutes_later()
    assert email_job_count() == 0
def test_send_message_notifications_muted(db):
    """
    Check ``send_message_notifications`` when one recipient muted the group chat before messages arrived.

    With user3 having muted the first chat, a single ``send_email`` job is expected once the handler's clock is
    five minutes ahead, and none on a repeated run.
    """
    user1, token1 = generate_user()
    user2, _token2 = generate_user()
    user3, token3 = generate_user()

    make_friends(user1, user2)
    make_friends(user1, user3)
    make_friends(user2, user3)

    email_jobs = select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")

    def send_texts(client, chat_id, numbers):
        # send "Test message <n>" into the chat for each n
        for number in numbers:
            client.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=chat_id, text=f"Test message {number}"))

    send_message_notifications(empty_pb2.Empty())
    process_jobs()

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert session.execute(email_jobs).scalar_one() == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id])
        ).group_chat_id

    with conversations_session(token3) as c:
        # mute it for user 3
        c.MuteGroupChat(conversations_pb2.MuteGroupChatReq(group_chat_id=group_chat_id, forever=True))

    with conversations_session(token1) as c:
        send_texts(c, group_chat_id, range(1, 5))

    with conversations_session(token3) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        send_texts(c, group_chat_id, range(5, 7))

    send_message_notifications(empty_pb2.Empty())
    process_jobs()

    # no emails sent out
    with session_scope() as session:
        assert session.execute(email_jobs).scalar_one() == 0

    # this should generate emails for both user2 and NOT user3
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())
        process_jobs()

    with session_scope() as session:
        assert session.execute(email_jobs).scalar_one() == 1
        # delete them all
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    # shouldn't generate any more emails
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())
        process_jobs()

    with session_scope() as session:
        assert session.execute(email_jobs).scalar_one() == 0
def test_send_request_notifications_host_request(db):
    """
    Check that ``send_request_notifications`` queues one ``send_email`` job per unseen host request message.

    Exercised twice: after the surfer creates a host request and after the host responds to it. Each time a
    second run of the handler must not queue another email.
    """
    user1, token1 = generate_user()
    user2, token2 = generate_user()

    from_date = (today() + timedelta(days=2)).isoformat()
    to_date = (today() + timedelta(days=3)).isoformat()

    email_jobs = select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
    wipe_jobs = delete(BackgroundJob).execution_options(synchronize_session=False)

    def notify_five_minutes_later():
        # run the handler (and the jobs it queues) with its clock five minutes ahead
        with patch("couchers.jobs.handlers.now", now_5_min_in_future):
            send_request_notifications(empty_pb2.Empty())
            process_jobs()

    send_request_notifications(empty_pb2.Empty())
    process_jobs()

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one() == 0

    # first test that sending host request creates email
    with requests_session(token1) as api:
        host_request_id = api.CreateHostRequest(
            requests_pb2.CreateHostRequestReq(
                host_user_id=user2.id, from_date=from_date, to_date=to_date, text="Test request"
            )
        ).host_request_id

    with session_scope() as session:
        # delete send_email BackgroundJob created by CreateHostRequest
        session.execute(wipe_jobs)

        # check send_request_notifications successfully creates background job
        notify_five_minutes_later()
        assert session.execute(email_jobs).scalar_one() == 1

        # delete all BackgroundJobs
        session.execute(wipe_jobs)

        notify_five_minutes_later()
        # should find no messages since host has already been notified
        assert session.execute(email_jobs).scalar_one() == 0

    # then test that responding to host request creates email
    with requests_session(token2) as api:
        api.RespondHostRequest(
            requests_pb2.RespondHostRequestReq(
                host_request_id=host_request_id,
                status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
                text="Test request",
            )
        )

    with session_scope() as session:
        # delete send_email BackgroundJob created by RespondHostRequest
        session.execute(wipe_jobs)

        # check send_request_notifications successfully creates background job
        notify_five_minutes_later()
        assert session.execute(email_jobs).scalar_one() == 1

        # delete all BackgroundJobs
        session.execute(wipe_jobs)

        notify_five_minutes_later()
        # should find no messages since guest has already been notified
        assert session.execute(email_jobs).scalar_one() == 0
def test_send_message_notifications_seen(db):
    """Check that no ``send_email`` job is queued for messages the recipient has already marked as seen."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()

    make_friends(user1, user2)

    def email_job_count():
        # number of send_email background jobs currently stored
        with session_scope() as session:
            return session.execute(
                select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
            ).scalar_one()

    send_message_notifications(empty_pb2.Empty())

    # should find no jobs, since there's no messages
    assert email_job_count() == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for number in range(1, 5):
            c.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {number}")
            )

    # user 2 now marks those messages as seen
    with conversations_session(token2) as c:
        chat = c.GetGroupChat(conversations_pb2.GetGroupChatReq(group_chat_id=group_chat_id))
        latest_id = chat.latest_message.message_id
        c.MarkLastSeenGroupChat(
            conversations_pb2.MarkLastSeenGroupChatReq(group_chat_id=group_chat_id, last_seen_message_id=latest_id)
        )

    send_message_notifications(empty_pb2.Empty())

    # no emails sent out
    assert email_job_count() == 0

    def now_30_min_in_future():
        return now() + timedelta(minutes=30)

    # still shouldn't generate emails as user2 has seen all messages
    with patch("couchers.jobs.handlers.now", now_30_min_in_future):
        send_message_notifications(empty_pb2.Empty())

    assert email_job_count() == 0
def test_send_onboarding_emails(db):
    """
    Check how many ``send_email`` jobs exist after ``send_onboarding_emails`` runs for three successive users.

    The running total is expected to be 1, then still 1, then 2 as each user is added.
    """

    def run_and_count_email_jobs():
        # run the handler, process what it queued, and report the total number of send_email jobs
        send_onboarding_emails(empty_pb2.Empty())
        process_jobs()
        with session_scope() as session:
            return session.execute(
                select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
            ).scalar_one()

    # needs to get first onboarding email
    generate_user(onboarding_emails_sent=0, last_onboarding_email_sent=None, complete_profile=False)
    assert run_and_count_email_jobs() == 1

    # needs to get second onboarding email, but not yet
    generate_user(
        onboarding_emails_sent=1, last_onboarding_email_sent=now() - timedelta(days=6), complete_profile=False
    )
    assert run_and_count_email_jobs() == 1

    # needs to get second onboarding email
    generate_user(
        onboarding_emails_sent=1, last_onboarding_email_sent=now() - timedelta(days=8), complete_profile=False
    )
    assert run_and_count_email_jobs() == 2
def test_send_reference_reminders(db):
    """
    Check which reference reminder emails ``send_reference_reminders`` produces for six host request scenarios.

    Cases (host/surfer user numbers in brackets):
      1. bidirectional references [host 1, surfer 2] => no emails
      2. host left ref [host 3, surfer 4] => surfer needs an email
      3. surfer left ref [host 5, surfer 6] => host needs an email
      4. neither left ref [surfer 7, host 8] => host & surfer need an email
      5. neither left ref, but the users are blocked [surfer 9, host 10] => neither should get an email
      6. neither left ref, surfer indicated they didn't meet up [host 11, surfer 12] => host still needs an email
    """
    send_reference_reminders(empty_pb2.Empty())

    # users 1..12, created in order; only the user objects are needed, not their tokens
    users = {}
    for number in range(1, 13):
        users[number], _token = generate_user(email=f"user{number}@couchers.org.invalid", name=f"User {number}")

    # case 5: block between surfer and host
    make_user_block(users[9], users[10])

    with session_scope() as session:
        # note that create_host_reference creates a host request whose age is one day older than the timedelta here

        # case 1: bidirectional (no emails)
        _ref1, hr1 = create_host_reference(session, users[2].id, users[1].id, timedelta(days=7), surfing=True)
        create_host_reference(session, users[1].id, users[2].id, timedelta(days=7), host_request_id=hr1)

        # case 2: host left ref (surfer needs an email)
        create_host_reference(session, users[3].id, users[4].id, timedelta(days=11), surfing=False)

        # case 3: surfer left ref (host needs an email)
        create_host_reference(session, users[6].id, users[5].id, timedelta(days=9), surfing=True)

        # case 4: neither left ref (host & surfer need an email)
        create_host_request(session, users[7].id, users[8].id, timedelta(days=4))

        # case 5: neither left ref, but host blocked surfer, so neither should get an email
        create_host_request(session, users[9].id, users[10].id, timedelta(days=7))

        # case 6: neither left ref, surfer indicated they didn't meet up, (host still needs an email)
        create_host_request(session, users[12].id, users[11].id, timedelta(days=6), surfer_reason_didnt_meetup="")

    hosted_strings = ("from when you hosted them", "/leave-reference/hosted/")
    surfed_strings = ("from when you surfed with them", "/leave-reference/surfed/")

    # listed in the order the emails are read back below (recipient address, ascending)
    expected_emails = [
        (
            "user11@couchers.org.invalid",
            "[TEST] You have 14 days to write a reference for User 12!",
            hosted_strings,
        ),
        (
            "user4@couchers.org.invalid",
            "[TEST] You have 3 days to write a reference for User 3!",
            surfed_strings,
        ),
        (
            "user5@couchers.org.invalid",
            "[TEST] You have 7 days to write a reference for User 6!",
            hosted_strings,
        ),
        (
            "user7@couchers.org.invalid",
            "[TEST] You have 14 days to write a reference for User 8!",
            surfed_strings,
        ),
        (
            "user8@couchers.org.invalid",
            "[TEST] You have 14 days to write a reference for User 7!",
            hosted_strings,
        ),
    ]

    send_reference_reminders(empty_pb2.Empty())

    # drain the queue
    while process_job():
        pass

    with session_scope() as session:
        stored = session.execute(select(Email).order_by(Email.recipient.asc())).scalars().all()
        emails = [(row.recipient, row.subject, row.plain, row.html) for row in stored]

    actual_addresses_and_subjects = [(address, subject) for address, subject, _plain, _html in emails]
    expected_addresses_and_subjects = [(address, subject) for address, subject, _strings in expected_emails]

    print(actual_addresses_and_subjects)
    print(expected_addresses_and_subjects)

    assert actual_addresses_and_subjects == expected_addresses_and_subjects

    for (address, subject, plain, html), expected in zip(emails, expected_emails):
        search_strings = expected[2]
        for find in search_strings:
            assert find in plain, f"Expected to find string {find} in PLAIN email {subject} to {address}, didn't"
            assert find in html, f"Expected to find string {find} in HTML email {subject} to {address}, didn't"
def test_send_host_request_reminders(db):
    """
    Seed seven host requests, one per scenario below, then run the reminder handler.

    Only scenario 1 is expected to produce an email, addressed to its host ("user2").
    """
    # Same creation order as user1..user14; only the user objects are kept, the tokens are not needed here.
    users = [generate_user(email=f"user{n}@couchers.org.invalid", name=f"User {n}")[0] for n in range(1, 15)]
    # (surfer, host) pairs: (user1, user2), (user3, user4), ... one pair per scenario.
    pairs = list(zip(users[0::2], users[1::2]))

    def beyond_interval(extra_days):
        # A stay date lying further out than the reminder interval.
        return today() + HOST_REQUEST_REMINDER_INTERVAL + timedelta(days=extra_days)

    def interval_ago():
        # A "last reminder" timestamp exactly one reminder interval in the past.
        return now() - HOST_REQUEST_REMINDER_INTERVAL

    with session_scope() as session:

        def add_request(pair, **overrides):
            # Defaults describe scenario 1; every other scenario overrides only what differs.
            surfer, host = pair
            fields = {
                "from_date": beyond_interval(1),
                "to_date": beyond_interval(2),
                "status": HostRequestStatus.pending,
                "host_sent_request_reminders": 0,
                "last_sent_request_reminder_time": interval_ago(),
            }
            fields.update(overrides)
            return create_host_request_by_date(
                session=session,
                surfer_user_id=surfer.id,
                host_user_id=host.id,
                **fields,
            )

        # case 1: pending, future, interval elapsed => notify
        add_request(pairs[0])

        # case 2: max reminders reached => do not notify
        add_request(pairs[1], host_sent_request_reminders=HOST_REQUEST_MAX_REMINDERS)

        # case 3: interval not yet elapsed => do not notify
        add_request(pairs[2], last_sent_request_reminder_time=interval_ago() + timedelta(hours=1))

        # case 4: start date is today => do not notify
        add_request(pairs[3], from_date=today(), to_date=today() + timedelta(days=2))

        # case 5: from_date in the past => do not notify
        add_request(pairs[4], from_date=today() - timedelta(days=1), to_date=today() + timedelta(days=1))

        # case 6: non-pending status => do not notify
        add_request(
            pairs[5],
            from_date=today() + timedelta(days=3),
            to_date=today() + timedelta(days=4),
            status=HostRequestStatus.accepted,
        )

        # case 7: host already sent a message => do not notify
        replied_conversation = add_request(pairs[6])
        host_who_replied = pairs[6][1]
        session.add(
            Message(
                time=now(),
                conversation_id=replied_conversation,
                author_id=host_who_replied.id,
                text="Looking forward to hosting you!",
                message_type=MessageType.text,
            )
        )

    send_host_request_reminders(empty_pb2.Empty())

    # Keep processing background jobs until process_job() reports there is nothing left.
    while process_job():
        pass

    with session_scope() as session:
        stored = session.execute(select(Email).order_by(Email.recipient.asc())).scalars().all()
        emails = [(row.recipient, row.subject, row.plain, row.html) for row in stored]

    # (recipient, subject, substrings that must appear in both the plain and the HTML body)
    expected_emails = [
        (
            "user2@couchers.org.invalid",
            "[TEST] You have a pending host request from User 1!",
            ("Please respond to the request!", "User 1"),
        )
    ]

    actual_addresses_and_subjects = [entry[:2] for entry in emails]
    expected_addresses_and_subjects = [entry[:2] for entry in expected_emails]

    # Printed so a failing run shows both lists in the captured output.
    print(actual_addresses_and_subjects)
    print(expected_addresses_and_subjects)

    assert actual_addresses_and_subjects == expected_addresses_and_subjects

    for (address, subject, plain, html), expectation in zip(emails, expected_emails):
        for find in expectation[2]:
            assert find in plain, f"Expected to find string {find} in PLAIN email {subject} to {address}, didn't"
            assert find in html, f"Expected to find string {find} in HTML email {subject} to {address}, didn't"
def test_add_users_to_email_list(db):
    """
    Exercise add_users_to_email_list with Listmonk enabled and requests.post mocked out.

    Three runs are checked: one before this test creates any user (no POST), one after creating four users
    (POSTs expected for the two created with in_sync_with_newsletter=False and no opt-out), and a final run
    that must not POST at all.
    """
    listmonk_config = config.copy()
    listmonk_config.update(
        {
            "LISTMONK_ENABLED": True,
            "LISTMONK_BASE_URL": "https://example.com",
            "LISTMONK_API_USERNAME": "test_user",
            "LISTMONK_API_KEY": "dummy_api_key",
            "LISTMONK_LIST_ID": 6,
        }
    )

    post_target = "couchers.jobs.handlers.requests.post"

    def subscriber_post(email, name, user_id):
        # The exact requests.post(...) invocation expected for one newly subscribed user.
        return call(
            "https://example.com/api/subscribers",
            auth=("test_user", "dummy_api_key"),
            json={
                "email": email,
                "name": name,
                "lists": [6],
                "preconfirm_subscriptions": True,
                "attribs": {"couchers_user_id": user_id},
                "status": "enabled",
            },
            timeout=10,
        )

    with patch("couchers.jobs.handlers.config", listmonk_config):
        # First run: nothing has been created by this test yet.
        with patch(post_target) as mock:
            add_users_to_email_list(empty_pb2.Empty())
            mock.assert_not_called()

        # Explicit ids are given to the two users whose ids appear in the expected payloads.
        generate_user(in_sync_with_newsletter=False, email="testing1@couchers.invalid", name="Tester1", id=15)
        generate_user(in_sync_with_newsletter=True, email="testing2@couchers.invalid", name="Tester2")
        generate_user(
            in_sync_with_newsletter=False, email="testing3@couchers.invalid", name="Tester3 von test", id=17
        )
        generate_user(
            in_sync_with_newsletter=False,
            email="testing4@couchers.invalid",
            name="Tester4",
            opt_out_of_newsletter=True,
        )

        # Second run: the mocked POST reports success.
        with patch(post_target) as mock:
            response = mock.return_value
            response.status_code = 200
            add_users_to_email_list(empty_pb2.Empty())
            mock.assert_has_calls(
                [
                    subscriber_post("testing1@couchers.invalid", "Tester1", 15),
                    subscriber_post("testing3@couchers.invalid", "Tester3 von test", 17),
                ],
                any_order=True,
            )

        # Third run: no further POSTs are expected.
        with patch(post_target) as mock:
            add_users_to_email_list(empty_pb2.Empty())
            mock.assert_not_called()
def test_update_recommendation_scores(db):
    """Smoke test: calling the handler with an empty payload must not raise."""
    empty_payload = Empty()
    update_recommendation_scores(empty_payload)
def test_update_badges(db, push_collector):
    """
    Run update_badges and check both the resulting UserBadge rows and the push notifications sent.

    Six users are created. The third and sixth are never referenced again and do not appear in the expected
    badge rows. The fifth starts with a manually inserted "board_member" badge, which the expectations below
    treat as removed.
    """
    user1 = generate_user()[0]
    user2 = generate_user()[0]
    generate_user()  # third user
    user4 = generate_user(phone="+15555555555", phone_verification_verified=func.now())[0]
    user5 = generate_user(phone="+15555555556", phone_verification_verified=func.now())[0]
    generate_user()  # sixth user

    with session_scope() as session:
        session.add(UserBadge(user_id=user5.id, badge_id="board_member"))

    update_badges(empty_pb2.Empty())
    process_jobs()

    with session_scope() as session:
        badge_query = select(UserBadge.user_id, UserBadge.badge_id).order_by(
            UserBadge.user_id.asc(), UserBadge.id.asc()
        )
        badge_tuples = session.execute(badge_query).all()

    expected = [
        (user1.id, "founder"),
        (user1.id, "board_member"),
        (user2.id, "founder"),
        (user2.id, "board_member"),
        (user4.id, "phone_verified"),
        (user5.id, "phone_verified"),
    ]

    assert badge_tuples == expected

    print(push_collector.pushes)

    added_body = "Check out your profile to see the new badge!"
    removed_body = "You can see all your badges on your profile."

    # (user id, index of the push for that user, title, body), checked in this order
    expected_pushes = [
        (user1.id, 0, "The Founder badge was added to your profile", added_body),
        (user1.id, 1, "The Board Member badge was added to your profile", added_body),
        (user2.id, 0, "The Founder badge was added to your profile", added_body),
        (user2.id, 1, "The Board Member badge was added to your profile", added_body),
        (user4.id, 0, "The Verified Phone badge was added to your profile", added_body),
        (user5.id, 0, "The Board Member badge was removed from your profile", removed_body),
        (user5.id, 1, "The Verified Phone badge was added to your profile", added_body),
    ]

    for user_id, push_index, title, body in expected_pushes:
        push_collector.assert_user_push_matches_fields(
            user_id,
            ix=push_index,
            title=title,
            body=body,
        )
def test_send_message_notifications_empty_unseen_simple(monkeypatch):
    """
    Run send_message_notifications against a stubbed session_scope instead of a real database.

    On each stub session the first execute() returns a result whose scalars().unique() yields a single stub
    user; every later execute() returns a result whose all() is an empty list. The test passes if the handler
    returns without raising.
    """

    class StubUser:
        id = 1
        is_visible = True
        last_notified_message_id = 0

    class UsersResult:
        # Result of the first query: supports the .scalars().unique() chain.
        def scalars(self):
            return self

        def unique(self):
            return [StubUser()]

    class NoRowsResult:
        # Result of every later query: .all() yields nothing.
        def all(self):
            return []

    class StubSession:
        def __init__(self):
            self.calls = 0

        def execute(self, *args, **kwargs):
            self.calls += 1
            if self.calls == 1:
                return UsersResult()
            return NoRowsResult()

        def commit(self):
            return None

        def flush(self):
            return None

    class StubScope:
        # Stand-in for session_scope(): each `with` block receives a brand-new StubSession.
        def __enter__(self):
            return StubSession()

        def __exit__(self, exc_type, exc, tb):
            # Returning None lets any exception raised inside the block propagate.
            return None

    monkeypatch.setattr(handlers, "session_scope", StubScope)

    handlers.send_message_notifications(Empty())