Coverage for src/tests/test_bg_jobs.py: 100%

402 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-07-22 17:19 +0000

1from datetime import timedelta 

2from unittest.mock import call, patch 

3 

4import pytest 

5import requests 

6from google.protobuf import empty_pb2 

7from sqlalchemy.sql import delete, func 

8 

9import couchers.jobs.worker 

10from couchers.config import config 

11from couchers.crypto import urlsafe_secure_token 

12from couchers.db import session_scope 

13from couchers.email import queue_email 

14from couchers.email.dev import print_dev_email 

15from couchers.jobs.enqueue import queue_job 

16from couchers.jobs.handlers import ( 

17 add_users_to_email_list, 

18 send_message_notifications, 

19 send_onboarding_emails, 

20 send_reference_reminders, 

21 send_request_notifications, 

22 update_badges, 

23 update_recommendation_scores, 

24) 

25from couchers.jobs.worker import _run_job_and_schedule, process_job, run_scheduler, service_jobs 

26from couchers.metrics import create_prometheus_server, job_process_registry 

27from couchers.models import ( 

28 AccountDeletionToken, 

29 BackgroundJob, 

30 BackgroundJobState, 

31 Email, 

32 LoginToken, 

33 PasswordResetToken, 

34 UserBadge, 

35) 

36from couchers.sql import couchers_select as select 

37from couchers.utils import now, today 

38from proto import conversations_pb2, requests_pb2 

39from tests.test_fixtures import ( # noqa 

40 auth_api_session, 

41 conversations_session, 

42 db, 

43 generate_user, 

44 make_friends, 

45 make_user_block, 

46 process_jobs, 

47 push_collector, 

48 requests_session, 

49 testconfig, 

50) 

51from tests.test_references import create_host_reference, create_host_request 

52 

53 

def now_5_min_in_future():
    """Return the value of ``now()`` shifted five minutes ahead; used as a stand-in clock in the tests below."""
    five_minutes = timedelta(minutes=5)
    return now() + five_minutes

56 

57 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture that does nothing itself; it only requests ``testconfig`` for every test in this module."""

61 

62 

def _check_job_counter(job, status, attempt, exception):
    """Assert that the metrics page served on localhost:8001 contains a counter with the given labels.

    Args:
        job: value expected in the ``job`` label.
        status: value expected in the ``status`` label.
        attempt: value expected in the ``attempt`` label.
        exception: value expected in the ``exception`` label.

    Raises:
        AssertionError: if the label string is not present in the response body.
    """
    # requests never times out by default; bound the wait so a missing or wedged
    # metrics server fails the test instead of hanging the whole run
    metrics_string = requests.get("http://localhost:8001", timeout=10).text
    string_to_check = f'attempt="{attempt}",exception="{exception}",job="{job}",status="{status}"'
    assert string_to_check in metrics_string

67 

68 

def test_email_job(db):
    """Queue a single email and check that processing it leaves exactly one completed background job.

    The dev email printer used by the job handlers is swapped for a wrapper that first verifies the queued
    fields arrive unchanged and then delegates to the real ``print_dev_email``.
    """
    queue_email("sender_name", "sender_email", "recipient", "subject", "plain", "html")

    def mock_print_dev_email(
        sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data
    ):
        # every field was queued with its own name as the value, so compare them pairwise in order
        received = (sender_name, sender_email, recipient, subject, plain, html)
        queued = ("sender_name", "sender_email", "recipient", "subject", "plain", "html")
        for got, want in zip(received, queued):
            assert got == want
        return print_dev_email(
            sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data
        )

    with patch("couchers.jobs.handlers.print_dev_email", mock_print_dev_email):
        process_job()

    def count_jobs(session, condition):
        # number of BackgroundJob rows matching the given filter
        return session.execute(select(func.count()).select_from(BackgroundJob).where(condition)).scalar_one()

    with session_scope() as session:
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.completed) == 0

105 

106 

def test_purge_login_tokens(db):
    """Create a login token expiring now, run the purge job, and check the token is gone and the job completed."""
    user, api_token = generate_user()

    def count_login_tokens(session):
        return session.execute(select(func.count()).select_from(LoginToken)).scalar_one()

    def count_jobs(session, condition):
        return session.execute(select(func.count()).select_from(BackgroundJob).where(condition)).scalar_one()

    with session_scope() as session:
        session.add(LoginToken(token=urlsafe_secure_token(), user=user, expiry=now()))
        assert count_login_tokens(session) == 1

    queue_job("purge_login_tokens", empty_pb2.Empty())
    process_job()

    # the token must have been removed by the job
    with session_scope() as session:
        assert count_login_tokens(session) == 0

    # exactly one job exists and it finished
    with session_scope() as session:
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.completed) == 0

138 

139 

def test_purge_password_reset_tokens(db):
    """Create a password reset token expiring now, run the purge job, and check it is deleted and the job completed."""
    user, api_token = generate_user()

    def count_reset_tokens(session):
        return session.execute(select(func.count()).select_from(PasswordResetToken)).scalar_one()

    def count_jobs(session, condition):
        return session.execute(select(func.count()).select_from(BackgroundJob).where(condition)).scalar_one()

    with session_scope() as session:
        session.add(PasswordResetToken(token=urlsafe_secure_token(), user=user, expiry=now()))
        assert count_reset_tokens(session) == 1

    queue_job("purge_password_reset_tokens", empty_pb2.Empty())
    process_job()

    # the token must have been removed by the job
    with session_scope() as session:
        assert count_reset_tokens(session) == 0

    # exactly one job exists and it finished
    with session_scope() as session:
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.completed) == 0

171 

172 

def test_purge_account_deletion_tokens(db):
    """Insert three account deletion tokens with different expiries and check the purge job leaves one behind."""
    user, api_token = generate_user()
    user2, api_token2 = generate_user()
    user3, api_token3 = generate_user()

    def count_deletion_tokens(session):
        return session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one()

    def count_jobs(session, condition):
        return session.execute(select(func.count()).select_from(BackgroundJob).where(condition)).scalar_one()

    with session_scope() as session:
        # 3 cases:
        # 1) Token is valid
        # 2) Token expired but account retrievable
        # 3) Account is irretrievable (and expired)
        expiries_by_user = [
            (user, now() - timedelta(hours=2)),
            (user2, now()),
            (user3, now() + timedelta(hours=5)),
        ]
        session.add_all(
            [
                AccountDeletionToken(token=urlsafe_secure_token(), user=owner, expiry=expiry)
                for owner, expiry in expiries_by_user
            ]
        )
        assert count_deletion_tokens(session) == 3

    queue_job("purge_account_deletion_tokens", empty_pb2.Empty())
    process_job()

    # only one of the three tokens is expected to survive the purge
    with session_scope() as session:
        assert count_deletion_tokens(session) == 1

    # exactly one job exists and it finished
    with session_scope() as session:
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.completed) == 0

217 

218 

def test_enforce_community_memberships(db):
    """Queue and process the ``enforce_community_membership`` job and check it ends up completed."""
    queue_job("enforce_community_membership", empty_pb2.Empty())
    process_job()

    def count_jobs(session, condition):
        return session.execute(select(func.count()).select_from(BackgroundJob).where(condition)).scalar_one()

    with session_scope() as session:
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.completed) == 0

240 

241 

def test_refresh_materialized_views(db):
    """Queue and process the ``refresh_materialized_views`` job and check it ends up completed."""
    queue_job("refresh_materialized_views", empty_pb2.Empty())
    process_job()

    def count_jobs(session, condition):
        return session.execute(select(func.count()).select_from(BackgroundJob).where(condition)).scalar_one()

    with session_scope() as session:
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.completed) == 0

263 

264 

def test_service_jobs(db):
    """Run the ``service_jobs`` loop until it tries to sleep and check the queued email job was completed."""
    queue_email("sender_name", "sender_email", "recipient", "subject", "plain", "html")

    # service_jobs loops forever; its sleep is replaced by a function raising this marker exception,
    # which both breaks out of the loop and tells us the worker tried to sleep
    class HitSleep(Exception):
        pass

    def raising_sleep(seconds):
        raise HitSleep()

    with pytest.raises(HitSleep), patch("couchers.jobs.worker.sleep", raising_sleep):
        service_jobs()

    def count_jobs(session, condition):
        return session.execute(select(func.count()).select_from(BackgroundJob).where(condition)).scalar_one()

    with session_scope() as session:
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.completed) == 0

298 

299 

def test_scheduler(db, monkeypatch):
    """Drive ``run_scheduler`` with a fake clock and check when each schedule entry fires.

    The worker module's ``monotonic`` and ``sleep`` are replaced so that sleeping just advances a counter;
    once the counter passes ``end_time`` the fake sleep raises to stop the scheduler loop.
    """
    MOCK_SCHEDULE = [
        ("purge_login_tokens", timedelta(seconds=7)),
        ("send_message_notifications", timedelta(seconds=11)),
    ]

    current_time = 0
    end_time = 70

    class EndOfTime(Exception):
        pass

    def mock_monotonic():
        # read-only access to the enclosing variable, so no nonlocal declaration is needed
        return current_time

    def mock_sleep(seconds):
        nonlocal current_time
        current_time += seconds
        if current_time > end_time:
            raise EndOfTime()

    realized_schedule = []

    def mock_run_job_and_schedule(sched, schedule_id):
        # record the fake time at which this entry fired, then run the real implementation
        realized_schedule.append((current_time, schedule_id))
        _run_job_and_schedule(sched, schedule_id)

    worker_overrides = {
        "_run_job_and_schedule": mock_run_job_and_schedule,
        "SCHEDULE": MOCK_SCHEDULE,
        "monotonic": mock_monotonic,
        "sleep": mock_sleep,
    }
    for attribute, replacement in worker_overrides.items():
        monkeypatch.setattr(couchers.jobs.worker, attribute, replacement)

    with pytest.raises(EndOfTime):
        run_scheduler()

    assert realized_schedule == [
        (0.0, 0),
        (0.0, 1),
        (7.0, 0),
        (11.0, 1),
        (14.0, 0),
        (21.0, 0),
        (22.0, 1),
        (28.0, 0),
        (33.0, 1),
        (35.0, 0),
        (42.0, 0),
        (44.0, 1),
        (49.0, 0),
        (55.0, 1),
        (56.0, 0),
        (63.0, 0),
        (66.0, 1),
        (70.0, 0),
    ]

    def count_jobs(session, condition):
        return session.execute(select(func.count()).select_from(BackgroundJob).where(condition)).scalar_one()

    # every firing recorded above should have left one pending job, and nothing else
    with session_scope() as session:
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.pending) == 18
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.pending) == 0

371 

372 

def test_job_retry(db):
    """Check that a job which always raises is retried and finally marked as failed.

    A mock job that raises on every call is queued and processed five times with ``IN_TEST`` switched off.
    After the first attempt the job must be in the ``error`` state; after the fifth it must be ``failed``.
    The job counters exposed on port 8001 are checked for the corresponding ``error``/``failed`` entries.
    """
    queue_job("mock_job", empty_pb2.Empty())

    called_count = 0

    def mock_job(payload):
        nonlocal called_count
        called_count += 1
        raise Exception()

    MOCK_JOBS = {
        "mock_job": (empty_pb2.Empty, mock_job),
    }
    create_prometheus_server(registry=job_process_registry, port=8001)

    # if IN_TEST is true, then the bg worker will raise on exceptions
    new_config = config.copy()
    new_config["IN_TEST"] = False

    def count_jobs(session, condition):
        return session.execute(select(func.count()).select_from(BackgroundJob).where(condition)).scalar_one()

    def make_job_due_now():
        # pull the retry time forward so the next process_job() call picks the job up immediately
        with session_scope() as session:
            session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now()

    with patch("couchers.jobs.worker.config", new_config), patch("couchers.jobs.worker.JOBS", MOCK_JOBS):
        # first attempt: the job raises and must be parked in the error state
        process_job()
        with session_scope() as session:
            assert count_jobs(session, BackgroundJob.state == BackgroundJobState.error) == 1
            assert count_jobs(session, BackgroundJob.state != BackgroundJobState.error) == 0

        # four more attempts, five in total
        for _retry in range(4):
            make_job_due_now()
            process_job()

    with session_scope() as session:
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.failed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.failed) == 0

    # the handler must have been invoked once per process_job() call above
    assert called_count == 5

    _check_job_counter("mock_job", "error", "4", "Exception")
    _check_job_counter("mock_job", "failed", "5", "Exception")

440 

441 

def test_no_jobs_no_problem(db):
    """With an empty job table, ``process_job`` must return a falsy value and leave the table empty."""

    def count_all_jobs():
        with session_scope() as session:
            return session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one()

    assert count_all_jobs() == 0

    processed = process_job()
    assert not processed

    assert count_all_jobs() == 0

450 

451 

def test_send_message_notifications_basic(db):
    """Check that unseen group chat messages produce notification emails only after a delay, and only once."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()
    user3, token3 = generate_user()

    for friend_a, friend_b in ((user1, user2), (user1, user3), (user2, user3)):
        make_friends(friend_a, friend_b)

    def count_send_email_jobs(session):
        return session.execute(
            select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        ).scalar_one()

    send_message_notifications(empty_pb2.Empty())
    process_jobs()

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert count_send_email_jobs(session) == 0

    # user1 starts a chat with user2 and user3 and sends messages 1-4
    with conversations_session(token1) as c:
        three_way_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id])
        ).group_chat_id
        for n in (1, 2, 3, 4):
            c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=three_way_chat_id, text=f"Test message {n}"))

    # user3 starts a chat with user2 and sends messages 5-6
    with conversations_session(token3) as c:
        two_way_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for n in (5, 6):
            c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=two_way_chat_id, text=f"Test message {n}"))

    send_message_notifications(empty_pb2.Empty())
    process_jobs()

    # no emails sent out
    with session_scope() as session:
        assert count_send_email_jobs(session) == 0

    # this should generate emails for both user2 and user3
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())
        process_jobs()

    with session_scope() as session:
        assert count_send_email_jobs(session) == 2
        # delete them all
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    # shouldn't generate any more emails
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())
        process_jobs()

    with session_scope() as session:
        assert count_send_email_jobs(session) == 0

528 

529 

def test_send_message_notifications_muted(db):
    """Check that a user who muted a group chat gets no notification email for messages in that chat."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()
    user3, token3 = generate_user()

    for friend_a, friend_b in ((user1, user2), (user1, user3), (user2, user3)):
        make_friends(friend_a, friend_b)

    def count_send_email_jobs(session):
        return session.execute(
            select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        ).scalar_one()

    send_message_notifications(empty_pb2.Empty())
    process_jobs()

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert count_send_email_jobs(session) == 0

    with conversations_session(token1) as c:
        muted_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id])
        ).group_chat_id

    with conversations_session(token3) as c:
        # mute it for user 3
        c.MuteGroupChat(conversations_pb2.MuteGroupChatReq(group_chat_id=muted_chat_id, forever=True))

    with conversations_session(token1) as c:
        for n in (1, 2, 3, 4):
            c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=muted_chat_id, text=f"Test message {n}"))

    # user3 starts a second chat with user2 and sends messages 5-6
    with conversations_session(token3) as c:
        other_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for n in (5, 6):
            c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=other_chat_id, text=f"Test message {n}"))

    send_message_notifications(empty_pb2.Empty())
    process_jobs()

    # no emails sent out
    with session_scope() as session:
        assert count_send_email_jobs(session) == 0

    # this should generate emails for user2 and NOT user3
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())
        process_jobs()

    with session_scope() as session:
        assert count_send_email_jobs(session) == 1
        # delete them all
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    # shouldn't generate any more emails
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())
        process_jobs()

    with session_scope() as session:
        assert count_send_email_jobs(session) == 0

612 

613 

def test_send_request_notifications_host_request(db):
    """Check that creating and answering a host request each trigger exactly one notification email."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()

    today_plus_2 = (today() + timedelta(days=2)).isoformat()
    today_plus_3 = (today() + timedelta(days=3)).isoformat()

    def count_send_email_jobs(session):
        return session.execute(
            select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        ).scalar_one()

    def delete_all_jobs(session):
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    def notify_five_minutes_later():
        # run the notification handler with its clock moved five minutes ahead, then work off the queue
        with patch("couchers.jobs.handlers.now", now_5_min_in_future):
            send_request_notifications(empty_pb2.Empty())
            process_jobs()

    send_request_notifications(empty_pb2.Empty())
    process_jobs()

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one() == 0

    # first test that sending host request creates email
    with requests_session(token1) as requests:
        host_request_id = requests.CreateHostRequest(
            requests_pb2.CreateHostRequestReq(
                host_user_id=user2.id, from_date=today_plus_2, to_date=today_plus_3, text="Test request"
            )
        ).host_request_id

    with session_scope() as session:
        # delete send_email BackgroundJob created by CreateHostRequest
        delete_all_jobs(session)

        # check send_request_notifications successfully creates background job
        notify_five_minutes_later()
        assert count_send_email_jobs(session) == 1

        # delete all BackgroundJobs
        delete_all_jobs(session)

        notify_five_minutes_later()
        # should find no messages since host has already been notified
        assert count_send_email_jobs(session) == 0

    # then test that responding to host request creates email
    with requests_session(token2) as requests:
        requests.RespondHostRequest(
            requests_pb2.RespondHostRequestReq(
                host_request_id=host_request_id,
                status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
                text="Test request",
            )
        )

    with session_scope() as session:
        # delete send_email BackgroundJob created by RespondHostRequest
        delete_all_jobs(session)

        # check send_request_notifications successfully creates background job
        notify_five_minutes_later()
        assert count_send_email_jobs(session) == 1

        # delete all BackgroundJobs
        delete_all_jobs(session)

        notify_five_minutes_later()
        # should find no messages since guest has already been notified
        assert count_send_email_jobs(session) == 0

703 

704 

def test_send_message_notifications_seen(db):
    """Check that messages already marked as seen never produce a notification email."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()

    make_friends(user1, user2)

    def count_send_email_jobs():
        with session_scope() as session:
            return session.execute(
                select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
            ).scalar_one()

    send_message_notifications(empty_pb2.Empty())

    # should find no jobs, since there's no messages
    assert count_send_email_jobs() == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for n in (1, 2, 3, 4):
            c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {n}"))

    # user 2 now marks those messages as seen
    with conversations_session(token2) as c:
        chat = c.GetGroupChat(conversations_pb2.GetGroupChatReq(group_chat_id=group_chat_id))
        m_id = chat.latest_message.message_id
        c.MarkLastSeenGroupChat(
            conversations_pb2.MarkLastSeenGroupChatReq(group_chat_id=group_chat_id, last_seen_message_id=m_id)
        )

    send_message_notifications(empty_pb2.Empty())

    # no emails sent out
    assert count_send_email_jobs() == 0

    def now_30_min_in_future():
        return now() + timedelta(minutes=30)

    # still shouldn't generate emails as user2 has seen all messages
    with patch("couchers.jobs.handlers.now", now_30_min_in_future):
        send_message_notifications(empty_pb2.Empty())

    assert count_send_email_jobs() == 0

763 

764 

def test_send_onboarding_emails(db):
    """Check how many emails are queued as users at different onboarding stages are added one by one."""

    def run_and_count_send_email_jobs():
        # run the onboarding handler, work off the queue, and report the total number of send_email jobs so far
        send_onboarding_emails(empty_pb2.Empty())
        process_jobs()
        with session_scope() as session:
            return session.execute(
                select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
            ).scalar_one()

    # needs to get first onboarding email
    user1, token1 = generate_user(onboarding_emails_sent=0, last_onboarding_email_sent=None)
    assert run_and_count_send_email_jobs() == 1

    # needs to get second onboarding email, but not yet
    user2, token2 = generate_user(onboarding_emails_sent=1, last_onboarding_email_sent=now() - timedelta(days=6))
    assert run_and_count_send_email_jobs() == 1

    # needs to get second onboarding email
    user3, token3 = generate_user(onboarding_emails_sent=1, last_onboarding_email_sent=now() - timedelta(days=8))
    assert run_and_count_send_email_jobs() == 2

807 

808 

def test_send_reference_reminders(db):
    """Check which users receive a reference reminder email, and with which subject line.

    Five host/surfer pairs are set up:
      case 1: bidirectional (no emails)
      case 2: host left ref (surfer needs an email)
      case 3: surfer left ref (host needs an email)
      case 4: neither left ref (host & surfer need an email)
      case 5: neither left ref, but host blocked surfer, so neither should get an email
    """
    # running the handler with no data present must not blow up
    send_reference_reminders(empty_pb2.Empty())

    # case 1: bidirectional (no emails)
    user1, token1 = generate_user(email="user1@couchers.org.invalid", name="User 1")
    user2, token2 = generate_user(email="user2@couchers.org.invalid", name="User 2")

    # case 2: host left ref (surfer needs an email)
    # host
    user3, token3 = generate_user(email="user3@couchers.org.invalid", name="User 3")
    # surfer
    user4, token4 = generate_user(email="user4@couchers.org.invalid", name="User 4")

    # case 3: surfer left ref (host needs an email)
    # host
    user5, token5 = generate_user(email="user5@couchers.org.invalid", name="User 5")
    # surfer
    user6, token6 = generate_user(email="user6@couchers.org.invalid", name="User 6")

    # case 4: neither left ref (host & surfer need an email)
    # host
    user7, token7 = generate_user(email="user7@couchers.org.invalid", name="User 7")
    # surfer
    user8, token8 = generate_user(email="user8@couchers.org.invalid", name="User 8")

    # case 5: neither left ref, but host blocked surfer, so neither should get an email
    # host
    user9, token9 = generate_user(email="user9@couchers.org.invalid", name="User 9")
    # surfer
    user10, token10 = generate_user(email="user10@couchers.org.invalid", name="User 10")

    make_user_block(user9, user10)

    with session_scope() as session:
        # note that create_host_reference creates a host request whose age is one day older than the timedelta here

        # case 1: bidirectional (no emails); only the host request id is needed for the second reference
        _, hr1 = create_host_reference(session, user2.id, user1.id, timedelta(days=7), surfing=True)
        create_host_reference(session, user1.id, user2.id, timedelta(days=7), host_request_id=hr1)

        # case 2: host left ref (surfer needs an email)
        create_host_reference(session, user3.id, user4.id, timedelta(days=11), surfing=False)

        # case 3: surfer left ref (host needs an email)
        create_host_reference(session, user6.id, user5.id, timedelta(days=9), surfing=True)

        # case 4: neither left ref (host & surfer need an email)
        create_host_request(session, user7.id, user8.id, timedelta(days=4))

        # case 5: neither left ref, but host blocked surfer, so neither should get an email
        create_host_request(session, user9.id, user10.id, timedelta(days=7))

    expected_emails = [
        ("user4@couchers.org.invalid", "[TEST] You have 3 days to write a reference for User 3!"),
        ("user5@couchers.org.invalid", "[TEST] You have 7 days to write a reference for User 6!"),
        ("user7@couchers.org.invalid", "[TEST] You have 14 days to write a reference for User 8!"),
        ("user8@couchers.org.invalid", "[TEST] You have 14 days to write a reference for User 7!"),
    ]

    send_reference_reminders(empty_pb2.Empty())

    # work off the queue until process_job() reports nothing left
    while process_job():
        pass

    with session_scope() as session:
        emails = [
            (email.recipient, email.subject)
            for email in session.execute(select(Email).order_by(Email.recipient.asc())).scalars().all()
        ]

    assert emails == expected_emails

889 

890 

def test_add_users_to_email_list(db):
    """Check that only users who are out of sync and have not opted out are posted to the mailing list API."""
    listmonk_settings = {
        "LISTMONK_ENABLED": True,
        "LISTMONK_BASE_URL": "https://example.com",
        "LISTMONK_API_KEY": "dummy_api_key",
        "LISTMONK_LIST_UUID": "baf96eaa-5e70-409d-b776-f5c16fb091b9",
    }
    new_config = config.copy()
    for key, value in listmonk_settings.items():
        new_config[key] = value

    def subscriber_call(email, name):
        # the request expected for a single subscriber
        return call(
            "https://example.com/api/subscribers",
            auth=("listmonk", "dummy_api_key"),
            json={
                "email": email,
                "name": name,
                "list_uuids": ["baf96eaa-5e70-409d-b776-f5c16fb091b9"],
                "preconfirm_subscriptions": True,
            },
            timeout=10,
        )

    with patch("couchers.jobs.handlers.config", new_config):
        # no users yet, so nothing is posted
        with patch("couchers.jobs.handlers.requests.post") as mock:
            add_users_to_email_list(empty_pb2.Empty())
            mock.assert_not_called()

        generate_user(in_sync_with_newsletter=False, email="testing1@couchers.invalid", name="Tester1")
        generate_user(in_sync_with_newsletter=True, email="testing2@couchers.invalid", name="Tester2")
        generate_user(in_sync_with_newsletter=False, email="testing3@couchers.invalid", name="Tester3 von test")
        generate_user(
            in_sync_with_newsletter=False, email="testing4@couchers.invalid", name="Tester4", opt_out_of_newsletter=True
        )

        # Tester1 and Tester3 are expected to be posted; Tester2 and Tester4 are not
        with patch("couchers.jobs.handlers.requests.post") as mock:
            response = mock.return_value
            response.status_code = 200
            add_users_to_email_list(empty_pb2.Empty())
            mock.assert_has_calls(
                [
                    subscriber_call("testing1@couchers.invalid", "Tester1"),
                    subscriber_call("testing3@couchers.invalid", "Tester3 von test"),
                ],
                any_order=True,
            )

        # a second run must not post anything again
        with patch("couchers.jobs.handlers.requests.post") as mock:
            add_users_to_email_list(empty_pb2.Empty())
            mock.assert_not_called()

945 

946 

def test_update_recommendation_scores(db):
    """Smoke test: ``update_recommendation_scores`` runs on an empty payload without raising."""
    payload = empty_pb2.Empty()
    update_recommendation_scores(payload)

949 

950 

def test_update_badges(db, push_collector):
    """Run ``update_badges`` for six users and check the resulting badge rows and push notifications."""
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    user4, _ = generate_user(phone="+15555555555", phone_verification_verified=func.now())
    user5, _ = generate_user(phone="+15555555556", phone_verification_verified=func.now())
    user6, _ = generate_user()

    # give user5 a badge up front; the expectations below have it removed again
    with session_scope() as session:
        session.add(UserBadge(user_id=user5.id, badge_id="board_member"))

    update_badges(empty_pb2.Empty())
    process_jobs()

    with session_scope() as session:
        badge_query = select(UserBadge.user_id, UserBadge.badge_id).order_by(
            UserBadge.user_id.asc(), UserBadge.id.asc()
        )
        badge_tuples = session.execute(badge_query).all()

    expected = [
        (user1.id, "founder"),
        (user1.id, "board_member"),
        (user2.id, "founder"),
        (user2.id, "board_member"),
        (user4.id, "phone_verified"),
        (user5.id, "phone_verified"),
    ]

    assert badge_tuples == expected

    print(push_collector.pushes)

    added_body = "Check out your profile to see the new badge!"
    removed_body = "You can see all your badges on your profile."

    # (user id, index of the push for that user, title, body), checked in this order
    expected_pushes = [
        (user1.id, 0, "The Founder badge was added to your profile", added_body),
        (user1.id, 1, "The Board Member badge was added to your profile", added_body),
        (user2.id, 0, "The Founder badge was added to your profile", added_body),
        (user2.id, 1, "The Board Member badge was added to your profile", added_body),
        (user4.id, 0, "The Verified Phone badge was added to your profile", added_body),
        (user5.id, 0, "The Board Member badge was removed from your profile", removed_body),
        (user5.id, 1, "The Verified Phone badge was added to your profile", added_body),
    ]
    for user_id, push_index, title, body in expected_pushes:
        push_collector.assert_user_push_matches_fields(
            user_id,
            ix=push_index,
            title=title,
            body=body,
        )