Coverage for src/tests/test_bg_jobs.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

389 statements  

1from datetime import timedelta 

2from unittest.mock import patch 

3 

4import pytest 

5import requests 

6from google.protobuf import empty_pb2 

7from sqlalchemy.sql import delete, func 

8 

9import couchers.jobs.worker 

10from couchers.config import config 

11from couchers.crypto import urlsafe_secure_token 

12from couchers.db import session_scope 

13from couchers.email import queue_email 

14from couchers.email.dev import print_dev_email 

15from couchers.jobs.enqueue import queue_job 

16from couchers.jobs.handlers import ( 

17 add_users_to_email_list, 

18 send_message_notifications, 

19 send_onboarding_emails, 

20 send_reference_reminders, 

21 send_request_notifications, 

22 update_badges, 

23 update_recommendation_scores, 

24) 

25from couchers.jobs.worker import _run_job_and_schedule, process_job, run_scheduler, service_jobs 

26from couchers.metrics import create_prometheus_server, job_process_registry 

27from couchers.models import ( 

28 AccountDeletionToken, 

29 BackgroundJob, 

30 BackgroundJobState, 

31 Email, 

32 LoginToken, 

33 PasswordResetToken, 

34 UserBadge, 

35) 

36from couchers.sql import couchers_select as select 

37from couchers.tasks import send_login_email 

38from couchers.utils import now, today 

39from proto import conversations_pb2, requests_pb2 

40from tests.test_fixtures import ( # noqa 

41 auth_api_session, 

42 conversations_session, 

43 db, 

44 generate_user, 

45 make_friends, 

46 make_user_block, 

47 requests_session, 

48 testconfig, 

49) 

50from tests.test_references import create_host_reference, create_host_request 

51 

52 

def now_5_min_in_future():
    """Return the value of ``now()`` shifted five minutes forward."""
    five_minutes = timedelta(minutes=5)
    return now() + five_minutes

55 

56 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture: requests ``testconfig`` for every test in this module."""
    return None

60 

61 

def _check_job_counter(job, status, attempt, exception):
    """Assert that the metrics endpoint exposes a sample with the given labels.

    Fetches the text served at ``http://localhost:8001`` and checks that it
    contains the label string built from ``attempt``, ``exception``, ``job``
    and ``status`` (in that order).
    """
    # Bounded wait: without a timeout a missing/stuck metrics server would
    # hang the whole test run instead of failing this test.
    metrics_string = requests.get("http://localhost:8001", timeout=10).text
    string_to_check = f'attempt="{attempt}",exception="{exception}",job="{job}",status="{status}"'
    assert string_to_check in metrics_string

66 

67 

def test_login_email_full(db):
    """Send a login email, run the queued job, and check the email and job states."""
    user, api_token = generate_user()
    # Read once up front; the mock below compares against this captured value.
    user_email = user.email

    with session_scope() as session:
        login_token = send_login_email(session, user)

    def mock_print_dev_email(sender_name, sender_email, recipient, subject, plain, html):
        # The email must be addressed to the user and carry the login token
        # in both the plain-text and HTML bodies.
        assert recipient == user_email
        assert "login" in subject.lower()
        assert login_token.token in plain
        assert login_token.token in html
        return print_dev_email(sender_name, sender_email, recipient, subject, plain, html)

    with patch("couchers.jobs.handlers.print_dev_email", mock_print_dev_email):
        process_job()

    with session_scope() as session:
        # Exactly one job exists and it is in the completed state.
        assert (
            session.execute(
                select(func.count())
                .select_from(BackgroundJob)
                .where(BackgroundJob.state == BackgroundJobState.completed)
            ).scalar_one()
            == 1
        )
        assert (
            session.execute(
                select(func.count())
                .select_from(BackgroundJob)
                .where(BackgroundJob.state != BackgroundJobState.completed)
            ).scalar_one()
            == 0
        )

102 

103 

def test_email_job(db):
    """Queue one email and verify the handler receives exactly the queued fields."""
    queue_email("sender_name", "sender_email", "recipient", "subject", "plain", "html")

    def mock_print_dev_email(sender_name, sender_email, recipient, subject, plain, html):
        # each argument must arrive unchanged from what was queued above
        assert sender_name == "sender_name"
        assert sender_email == "sender_email"
        assert recipient == "recipient"
        assert subject == "subject"
        assert plain == "plain"
        assert html == "html"
        received = (sender_name, sender_email, recipient, subject, plain, html)
        return print_dev_email(*received)

    with patch("couchers.jobs.handlers.print_dev_email", mock_print_dev_email):
        process_job()

    with session_scope() as session:
        finished = session.execute(
            select(func.count())
            .select_from(BackgroundJob)
            .where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert finished == 1

        unfinished = session.execute(
            select(func.count())
            .select_from(BackgroundJob)
            .where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert unfinished == 0

136 

137 

def test_purge_login_tokens(db):
    """A login token whose expiry is ``now()`` is gone after the purge job runs."""
    user, api_token = generate_user()

    def token_count(session):
        return session.execute(select(func.count()).select_from(LoginToken)).scalar_one()

    with session_scope() as session:
        session.add(LoginToken(token=urlsafe_secure_token(), user=user, expiry=now()))
        assert token_count(session) == 1

    queue_job("purge_login_tokens", empty_pb2.Empty())
    process_job()

    with session_scope() as session:
        assert token_count(session) == 0

    with session_scope() as session:
        # the purge job itself must be the only job, and it must have completed
        finished = session.execute(
            select(func.count())
            .select_from(BackgroundJob)
            .where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert finished == 1

        unfinished = session.execute(
            select(func.count())
            .select_from(BackgroundJob)
            .where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert unfinished == 0

169 

170 

def test_purge_password_reset_tokens(db):
    """A password reset token whose expiry is ``now()`` is gone after the purge job runs."""
    user, api_token = generate_user()

    def token_count(session):
        return session.execute(select(func.count()).select_from(PasswordResetToken)).scalar_one()

    with session_scope() as session:
        session.add(PasswordResetToken(token=urlsafe_secure_token(), user=user, expiry=now()))
        assert token_count(session) == 1

    queue_job("purge_password_reset_tokens", empty_pb2.Empty())
    process_job()

    with session_scope() as session:
        assert token_count(session) == 0

    with session_scope() as session:
        # the purge job itself must be the only job, and it must have completed
        finished = session.execute(
            select(func.count())
            .select_from(BackgroundJob)
            .where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert finished == 1

        unfinished = session.execute(
            select(func.count())
            .select_from(BackgroundJob)
            .where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert unfinished == 0

202 

203 

def test_purge_account_deletion_tokens(db):
    """Of three account deletion tokens with different expiries, one survives the purge job."""
    user, api_token = generate_user()
    user2, api_token2 = generate_user()
    user3, api_token3 = generate_user()

    with session_scope() as session:
        # Original author's notes on the three cases covered:
        #   1) Token is valid
        #   2) Token expired but account retrievable
        #   3) Account is irretrievable (and expired)
        # NOTE(review): the list below is ordered oldest expiry first, which
        # looks like the reverse of the notes above — confirm the mapping
        # against the purge handler.
        account_deletion_tokens = [
            # expired two hours ago
            AccountDeletionToken(token=urlsafe_secure_token(), user=user, expiry=now() - timedelta(hours=2)),
            # expires right now
            AccountDeletionToken(token=urlsafe_secure_token(), user=user2, expiry=now()),
            # expires five hours from now
            AccountDeletionToken(token=urlsafe_secure_token(), user=user3, expiry=now() + timedelta(hours=5)),
        ]
        for token in account_deletion_tokens:
            session.add(token)
        assert session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one() == 3

    queue_job("purge_account_deletion_tokens", empty_pb2.Empty())
    process_job()

    with session_scope() as session:
        # exactly one of the three tokens is left after the purge
        assert session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one() == 1

    with session_scope() as session:
        assert (
            session.execute(
                select(func.count())
                .select_from(BackgroundJob)
                .where(BackgroundJob.state == BackgroundJobState.completed)
            ).scalar_one()
            == 1
        )
        assert (
            session.execute(
                select(func.count())
                .select_from(BackgroundJob)
                .where(BackgroundJob.state != BackgroundJobState.completed)
            ).scalar_one()
            == 0
        )

248 

249 

def test_enforce_community_memberships(db):
    """The ``enforce_community_membership`` job runs to completion on an empty payload."""
    payload = empty_pb2.Empty()
    queue_job("enforce_community_membership", payload)
    process_job()

    with session_scope() as session:
        finished = session.execute(
            select(func.count())
            .select_from(BackgroundJob)
            .where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert finished == 1

        unfinished = session.execute(
            select(func.count())
            .select_from(BackgroundJob)
            .where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert unfinished == 0

271 

272 

def test_refresh_materialized_views(db):
    """The ``refresh_materialized_views`` job runs to completion on an empty payload."""
    payload = empty_pb2.Empty()
    queue_job("refresh_materialized_views", payload)
    process_job()

    with session_scope() as session:
        finished = session.execute(
            select(func.count())
            .select_from(BackgroundJob)
            .where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert finished == 1

        unfinished = session.execute(
            select(func.count())
            .select_from(BackgroundJob)
            .where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert unfinished == 0

294 

295 

def test_service_jobs(db):
    """``service_jobs`` works through the queued email job before it reaches ``sleep``."""
    queue_email("sender_name", "sender_email", "recipient", "subject", "plain", "html")

    # service_jobs loops forever; its sleep() call is replaced with a function
    # that raises this marker exception, which both breaks us out of the loop
    # and tells us the worker found no more jobs to run
    class HitSleep(Exception):
        pass

    def raising_sleep(seconds):
        raise HitSleep()

    with pytest.raises(HitSleep), patch("couchers.jobs.worker.sleep", raising_sleep):
        service_jobs()

    with session_scope() as session:
        finished = session.execute(
            select(func.count())
            .select_from(BackgroundJob)
            .where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert finished == 1

        unfinished = session.execute(
            select(func.count())
            .select_from(BackgroundJob)
            .where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert unfinished == 0

329 

330 

def test_scheduler(db, monkeypatch):
    """Drive ``run_scheduler`` on a fake clock and check when each schedule entry fires."""
    fake_schedule = [
        ("purge_login_tokens", timedelta(seconds=7)),
        ("send_message_notifications", timedelta(seconds=11)),
    ]

    # simulated clock: advanced only by the mocked sleep()
    clock = 0
    stop_after = 70

    class EndOfTime(Exception):
        pass

    def mock_monotonic():
        return clock

    def mock_sleep(seconds):
        nonlocal clock
        clock += seconds
        if clock > stop_after:
            raise EndOfTime()

    # (clock value, schedule index) for every job the scheduler triggers
    fired = []

    def mock_run_job_and_schedule(sched, schedule_id):
        fired.append((clock, schedule_id))
        _run_job_and_schedule(sched, schedule_id)

    replacements = (
        ("_run_job_and_schedule", mock_run_job_and_schedule),
        ("SCHEDULE", fake_schedule),
        ("monotonic", mock_monotonic),
        ("sleep", mock_sleep),
    )
    for attr_name, replacement in replacements:
        monkeypatch.setattr(couchers.jobs.worker, attr_name, replacement)

    with pytest.raises(EndOfTime):
        run_scheduler()

    assert fired == [
        (0.0, 0),
        (0.0, 1),
        (7.0, 0),
        (11.0, 1),
        (14.0, 0),
        (21.0, 0),
        (22.0, 1),
        (28.0, 0),
        (33.0, 1),
        (35.0, 0),
        (42.0, 0),
        (44.0, 1),
        (49.0, 0),
        (55.0, 1),
        (56.0, 0),
        (63.0, 0),
        (66.0, 1),
        (70.0, 0),
    ]

    with session_scope() as session:
        # every triggered entry left one pending job behind, and nothing else
        pending = session.execute(
            select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state == BackgroundJobState.pending)
        ).scalar_one()
        assert pending == 18

        not_pending = session.execute(
            select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state != BackgroundJobState.pending)
        ).scalar_one()
        assert not_pending == 0

402 

403 

def test_job_retry(db):
    """A job whose handler always raises is retried and ends up in the failed state.

    ``process_job`` is run five times in total; between runs the job's
    ``next_attempt_after`` is reset to the database's current time so that it
    is immediately eligible again.
    """
    queue_job("purge_login_tokens", empty_pb2.Empty())

    called_count = 0

    def mock_handler(payload):
        nonlocal called_count
        called_count += 1
        raise Exception()

    MOCK_JOBS = {
        "purge_login_tokens": (empty_pb2.Empty, mock_handler),
    }
    create_prometheus_server(registry=job_process_registry, port=8001)
    with patch("couchers.jobs.worker.JOBS", MOCK_JOBS):
        # first attempt: the job should land in the error state
        process_job()
        with session_scope() as session:
            assert (
                session.execute(
                    select(func.count())
                    .select_from(BackgroundJob)
                    .where(BackgroundJob.state == BackgroundJobState.error)
                ).scalar_one()
                == 1
            )
            assert (
                session.execute(
                    select(func.count())
                    .select_from(BackgroundJob)
                    .where(BackgroundJob.state != BackgroundJobState.error)
                ).scalar_one()
                == 0
            )

        # four more attempts, each made immediately eligible first
        for _ in range(4):
            with session_scope() as session:
                session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now()
            process_job()

    # the handler must have run once per process_job() call above
    assert called_count == 5

    with session_scope() as session:
        assert (
            session.execute(
                select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state == BackgroundJobState.failed)
            ).scalar_one()
            == 1
        )
        assert (
            session.execute(
                select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state != BackgroundJobState.failed)
            ).scalar_one()
            == 0
        )

    _check_job_counter("purge_login_tokens", "error", "4", "Exception")
    _check_job_counter("purge_login_tokens", "failed", "5", "Exception")

466 

467 

def test_no_jobs_no_problem(db):
    """``process_job`` returns a falsy value and creates no jobs when the queue is empty."""

    def total_jobs():
        with session_scope() as session:
            return session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one()

    assert total_jobs() == 0

    ran_something = process_job()
    assert not ran_something

    assert total_jobs() == 0

476 

477 

def test_send_message_notifications_basic(db):
    """Group chat messages lead to one email job per recipient, five minutes later, only once."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()
    user3, token3 = generate_user()

    for left, right in ((user1, user2), (user1, user3), (user2, user3)):
        make_friends(left, right)

    def email_job_count(session):
        return session.execute(
            select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        ).scalar_one()

    send_message_notifications(empty_pb2.Empty())

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert email_job_count(session) == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id])
        ).group_chat_id
        for text in ("Test message 1", "Test message 2", "Test message 3", "Test message 4"):
            c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=text))

    with conversations_session(token3) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for text in ("Test message 5", "Test message 6"):
            c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=text))

    send_message_notifications(empty_pb2.Empty())

    # no emails sent out
    with session_scope() as session:
        assert email_job_count(session) == 0

    # this should generate emails for both user2 and user3
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())

    with session_scope() as session:
        assert email_job_count(session) == 2
        # delete them all
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    # shouldn't generate any more emails
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())

    with session_scope() as session:
        assert email_job_count(session) == 0

550 

551 

def test_send_message_notifications_muted(db):
    """Messages in a chat that user3 muted only lead to a single email job."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()
    user3, token3 = generate_user()

    for left, right in ((user1, user2), (user1, user3), (user2, user3)):
        make_friends(left, right)

    def email_job_count(session):
        return session.execute(
            select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        ).scalar_one()

    send_message_notifications(empty_pb2.Empty())

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert email_job_count(session) == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id])
        ).group_chat_id

    with conversations_session(token3) as c:
        # mute it for user 3
        c.MuteGroupChat(conversations_pb2.MuteGroupChatReq(group_chat_id=group_chat_id, forever=True))

    with conversations_session(token1) as c:
        for text in ("Test message 1", "Test message 2", "Test message 3", "Test message 4"):
            c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=text))

    with conversations_session(token3) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for text in ("Test message 5", "Test message 6"):
            c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=text))

    send_message_notifications(empty_pb2.Empty())

    # no emails sent out
    with session_scope() as session:
        assert email_job_count(session) == 0

    # this should generate emails for both user2 and NOT user3
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())

    with session_scope() as session:
        assert email_job_count(session) == 1
        # delete them all
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    # shouldn't generate any more emails
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())

    with session_scope() as session:
        assert email_job_count(session) == 0

630 

631 

def test_send_request_notifications_host_request(db):
    """Check email jobs queued by ``send_request_notifications`` around a host request.

    One ``send_email`` job is expected after the request is created and one
    after it is answered; running the handler again straight afterwards must
    queue nothing.
    """
    user1, token1 = generate_user()
    user2, token2 = generate_user()

    today_plus_2 = (today() + timedelta(days=2)).isoformat()
    today_plus_3 = (today() + timedelta(days=3)).isoformat()

    send_request_notifications(empty_pb2.Empty())

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one() == 0

    # first test that sending host request creates email
    # (the API stub is named `api` so it does not shadow the imported `requests` module)
    with requests_session(token1) as api:
        host_request_id = api.CreateHostRequest(
            requests_pb2.CreateHostRequestReq(
                host_user_id=user2.id, from_date=today_plus_2, to_date=today_plus_3, text="Test request"
            )
        ).host_request_id

    with session_scope() as session:
        # delete send_email BackgroundJob created by CreateHostRequest
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

        # check send_request_notifications successfully creates background job
        with patch("couchers.jobs.handlers.now", now_5_min_in_future):
            send_request_notifications(empty_pb2.Empty())
            assert (
                session.execute(
                    select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
                ).scalar_one()
                == 1
            )

        # delete all BackgroundJobs
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

        with patch("couchers.jobs.handlers.now", now_5_min_in_future):
            send_request_notifications(empty_pb2.Empty())
            # should find no messages since host has already been notified
            assert (
                session.execute(
                    select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
                ).scalar_one()
                == 0
            )

    # then test that responding to host request creates email
    with requests_session(token2) as api:
        api.RespondHostRequest(
            requests_pb2.RespondHostRequestReq(
                host_request_id=host_request_id,
                status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
                text="Test request",
            )
        )

    with session_scope() as session:
        # delete send_email BackgroundJob created by RespondHostRequest
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

        # check send_request_notifications successfully creates background job
        with patch("couchers.jobs.handlers.now", now_5_min_in_future):
            send_request_notifications(empty_pb2.Empty())
            assert (
                session.execute(
                    select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
                ).scalar_one()
                == 1
            )

        # delete all BackgroundJobs
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

        with patch("couchers.jobs.handlers.now", now_5_min_in_future):
            send_request_notifications(empty_pb2.Empty())
            # should find no messages since guest has already been notified
            assert (
                session.execute(
                    select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
                ).scalar_one()
                == 0
            )

716 

717 

def test_send_message_notifications_seen(db):
    """No email job is queued for messages the recipient has already marked as seen."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()

    make_friends(user1, user2)

    def email_job_count():
        with session_scope() as session:
            return session.execute(
                select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
            ).scalar_one()

    send_message_notifications(empty_pb2.Empty())

    # should find no jobs, since there's no messages
    assert email_job_count() == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for text in ("Test message 1", "Test message 2", "Test message 3", "Test message 4"):
            c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=text))

    # user 2 now marks those messages as seen
    with conversations_session(token2) as c:
        chat = c.GetGroupChat(conversations_pb2.GetGroupChatReq(group_chat_id=group_chat_id))
        m_id = chat.latest_message.message_id
        c.MarkLastSeenGroupChat(
            conversations_pb2.MarkLastSeenGroupChatReq(group_chat_id=group_chat_id, last_seen_message_id=m_id)
        )

    send_message_notifications(empty_pb2.Empty())

    # no emails sent out
    assert email_job_count() == 0

    def now_30_min_in_future():
        return now() + timedelta(minutes=30)

    # still shouldn't generate emails as user2 has seen all messages
    with patch("couchers.jobs.handlers.now", now_30_min_in_future):
        send_message_notifications(empty_pb2.Empty())

    assert email_job_count() == 0

776 

777 

def test_send_onboarding_emails(db):
    """Track the cumulative number of email jobs as users at different onboarding stages are added."""

    def email_job_count():
        with session_scope() as session:
            return session.execute(
                select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
            ).scalar_one()

    # needs to get first onboarding email
    generate_user(onboarding_emails_sent=0, last_onboarding_email_sent=None)
    send_onboarding_emails(empty_pb2.Empty())
    assert email_job_count() == 1

    # needs to get second onboarding email, but not yet
    generate_user(onboarding_emails_sent=1, last_onboarding_email_sent=now() - timedelta(days=6))
    send_onboarding_emails(empty_pb2.Empty())
    assert email_job_count() == 1

    # needs to get second onboarding email
    generate_user(onboarding_emails_sent=1, last_onboarding_email_sent=now() - timedelta(days=8))
    send_onboarding_emails(empty_pb2.Empty())
    assert email_job_count() == 2

817 

818 

def test_send_reference_reminders(db):
    """Check which users receive reference reminder emails, and with which subject."""
    # need to test:
    # case 1: bidirectional (no emails)
    # case 2: host left ref (surfer needs an email)
    # case 3: surfer left ref (host needs an email)
    # case 4: neither left ref (host & surfer need an email)
    # case 5: neither left ref, but host blocked surfer, so neither should get an email

    send_reference_reminders(empty_pb2.Empty())

    # case 1: bidirectional (no emails)
    user1, _ = generate_user(email="user1@couchers.org.invalid", name="User 1")
    user2, _ = generate_user(email="user2@couchers.org.invalid", name="User 2")

    # case 2: host left ref (surfer needs an email)
    # host
    user3, _ = generate_user(email="user3@couchers.org.invalid", name="User 3")
    # surfer
    user4, _ = generate_user(email="user4@couchers.org.invalid", name="User 4")

    # case 3: surfer left ref (host needs an email)
    # host
    user5, _ = generate_user(email="user5@couchers.org.invalid", name="User 5")
    # surfer
    user6, _ = generate_user(email="user6@couchers.org.invalid", name="User 6")

    # case 4: neither left ref (host & surfer need an email)
    # host
    user7, _ = generate_user(email="user7@couchers.org.invalid", name="User 7")
    # surfer
    user8, _ = generate_user(email="user8@couchers.org.invalid", name="User 8")

    # case 5: neither left ref, but host blocked surfer, so neither should get an email
    # host
    user9, _ = generate_user(email="user9@couchers.org.invalid", name="User 9")
    # surfer
    user10, _ = generate_user(email="user10@couchers.org.invalid", name="User 10")

    make_user_block(user9, user10)

    with session_scope() as session:
        # note that create_host_reference creates a host request whose age is one day older than the timedelta here

        # case 1: bidirectional (no emails)
        # only the host request id is needed, to attach the second reference to it
        _, hr1 = create_host_reference(session, user2.id, user1.id, timedelta(days=7), surfing=True)
        create_host_reference(session, user1.id, user2.id, timedelta(days=7), host_request_id=hr1)

        # case 2: host left ref (surfer needs an email)
        create_host_reference(session, user3.id, user4.id, timedelta(days=11), surfing=False)

        # case 3: surfer left ref (host needs an email)
        create_host_reference(session, user6.id, user5.id, timedelta(days=9), surfing=True)

        # case 4: neither left ref (host & surfer need an email)
        create_host_request(session, user7.id, user8.id, timedelta(days=4))

        # case 5: neither left ref, but host blocked surfer, so neither should get an email
        create_host_request(session, user9.id, user10.id, timedelta(days=7))

    expected_emails = [
        ("user4@couchers.org.invalid", "[TEST] You have 3 days to write a reference for User 3!"),
        ("user5@couchers.org.invalid", "[TEST] You have 7 days to write a reference for User 6!"),
        ("user7@couchers.org.invalid", "[TEST] You have 14 days to write a reference for User 8!"),
        ("user8@couchers.org.invalid", "[TEST] You have 14 days to write a reference for User 7!"),
    ]

    send_reference_reminders(empty_pb2.Empty())

    # drain the queue so every queued email job is actually processed
    while process_job():
        pass

    with session_scope() as session:
        emails = [
            (email.recipient, email.subject)
            for email in session.execute(select(Email).order_by(Email.recipient.asc())).scalars().all()
        ]

    # pytest prints both lists on failure, so no explicit debug output is needed
    assert emails == expected_emails

899 

900 

def test_add_users_to_email_list(db):
    """``add_users_to_email_list`` posts only users not yet on the list, and only once."""
    new_config = config.copy()
    new_config["MAILCHIMP_ENABLED"] = True
    new_config["MAILCHIMP_API_KEY"] = "dummy_api_key"
    new_config["MAILCHIMP_DC"] = "dc99"
    new_config["MAILCHIMP_LIST_ID"] = "dummy_list_id"

    with patch("couchers.config.config", new_config):
        # no users yet: nothing should be posted
        with patch("couchers.jobs.handlers.requests.post") as post_mock:
            add_users_to_email_list(empty_pb2.Empty())
        post_mock.assert_not_called()

        generate_user(added_to_mailing_list=False, email="testing1@couchers.invalid", name="Tester1")
        generate_user(added_to_mailing_list=True, email="testing2@couchers.invalid", name="Tester2")
        generate_user(added_to_mailing_list=False, email="testing3@couchers.invalid", name="Tester3 von test")

        with patch("couchers.jobs.handlers.requests.post") as post_mock:
            post_mock.return_value.status_code = 200
            add_users_to_email_list(empty_pb2.Empty())

        # only the two users with added_to_mailing_list=False are expected in the payload
        expected_members = [
            {
                "email_address": "testing1@couchers.invalid",
                "status_if_new": "subscribed",
                "status": "subscribed",
                "merge_fields": {
                    "FNAME": "Tester1",
                },
            },
            {
                "email_address": "testing3@couchers.invalid",
                "status_if_new": "subscribed",
                "status": "subscribed",
                "merge_fields": {
                    "FNAME": "Tester3 von test",
                },
            },
        ]
        post_mock.assert_called_once_with(
            "https://dc99.api.mailchimp.com/3.0/lists/dummy_list_id",
            auth=("apikey", "dummy_api_key"),
            json={"members": expected_members},
        )

        # a second run must not post again
        with patch("couchers.jobs.handlers.requests.post") as post_mock:
            add_users_to_email_list(empty_pb2.Empty())
        post_mock.assert_not_called()

950 

951 

def test_update_recommendation_scores(db):
    """Smoke test: the handler accepts an empty payload without raising."""
    payload = empty_pb2.Empty()
    update_recommendation_scores(payload)

954 

955 

def test_update_badges(db):
    """After ``update_badges`` runs, the stored (user, badge) pairs match the expected list."""
    first, _token = generate_user()
    second, _token = generate_user()
    generate_user()  # third user: not expected to appear in the badge list
    fourth, _token = generate_user(phone="+15555555555", phone_verification_verified=func.now())
    fifth, _token = generate_user(phone="+15555555556", phone_verification_verified=func.now())
    generate_user()  # sixth user: not expected to appear in the badge list

    update_badges(empty_pb2.Empty())

    with session_scope() as session:
        badge_query = select(UserBadge.user_id, UserBadge.badge_id).order_by(
            UserBadge.user_id.asc(), UserBadge.id.asc()
        )
        badge_tuples = session.execute(badge_query).all()

    expected = [
        (first.id, "founder"),
        (first.id, "board_member"),
        (second.id, "founder"),
        (second.id, "board_member"),
        (fourth.id, "phone_verified"),
        (fifth.id, "phone_verified"),
    ]

    assert badge_tuples == expected