Coverage for src/tests/test_bg_jobs.py: 100%

418 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-22 06:42 +0000

1from datetime import timedelta 

2from unittest.mock import call, patch 

3 

4import pytest 

5import requests 

6from google.protobuf import empty_pb2 

7from sqlalchemy.sql import delete, func 

8 

9import couchers.jobs.worker 

10from couchers.config import config 

11from couchers.crypto import urlsafe_secure_token 

12from couchers.db import session_scope 

13from couchers.email import queue_email 

14from couchers.email.dev import print_dev_email 

15from couchers.jobs.enqueue import queue_job 

16from couchers.jobs.handlers import ( 

17 add_users_to_email_list, 

18 send_message_notifications, 

19 send_onboarding_emails, 

20 send_reference_reminders, 

21 send_request_notifications, 

22 update_badges, 

23 update_recommendation_scores, 

24) 

25from couchers.jobs.worker import _run_job_and_schedule, process_job, run_scheduler, service_jobs 

26from couchers.metrics import create_prometheus_server, job_process_registry 

27from couchers.models import ( 

28 AccountDeletionToken, 

29 BackgroundJob, 

30 BackgroundJobState, 

31 Email, 

32 LoginToken, 

33 PasswordResetToken, 

34 UserBadge, 

35) 

36from couchers.sql import couchers_select as select 

37from couchers.utils import now, today 

38from proto import conversations_pb2, requests_pb2 

39from tests.test_fixtures import ( # noqa 

40 auth_api_session, 

41 conversations_session, 

42 db, 

43 generate_user, 

44 make_friends, 

45 make_user_block, 

46 process_jobs, 

47 push_collector, 

48 requests_session, 

49 testconfig, 

50) 

51from tests.test_references import create_host_reference, create_host_request 

52 

53 

54def now_5_min_in_future(): 

55 return now() + timedelta(minutes=5) 

56 

57 

58@pytest.fixture(autouse=True) 

59def _(testconfig): 

60 pass 

61 

62 

63def _check_job_counter(job, status, attempt, exception): 

64 metrics_string = requests.get("http://localhost:8001").text 

65 string_to_check = f'attempt="{attempt}",exception="{exception}",job="{job}",status="{status}"' 

66 assert string_to_check in metrics_string 

67 

68 

69def test_email_job(db): 

70 with session_scope() as session: 

71 queue_email(session, "sender_name", "sender_email", "recipient", "subject", "plain", "html") 

72 

73 def mock_print_dev_email( 

74 sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data 

75 ): 

76 assert sender_name == "sender_name" 

77 assert sender_email == "sender_email" 

78 assert recipient == "recipient" 

79 assert subject == "subject" 

80 assert plain == "plain" 

81 assert html == "html" 

82 return print_dev_email( 

83 sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data 

84 ) 

85 

86 with patch("couchers.jobs.handlers.print_dev_email", mock_print_dev_email): 

87 process_job() 

88 

89 with session_scope() as session: 

90 assert ( 

91 session.execute( 

92 select(func.count()) 

93 .select_from(BackgroundJob) 

94 .where(BackgroundJob.state == BackgroundJobState.completed) 

95 ).scalar_one() 

96 == 1 

97 ) 

98 assert ( 

99 session.execute( 

100 select(func.count()) 

101 .select_from(BackgroundJob) 

102 .where(BackgroundJob.state != BackgroundJobState.completed) 

103 ).scalar_one() 

104 == 0 

105 ) 

106 

107 

108def test_purge_login_tokens(db): 

109 user, api_token = generate_user() 

110 

111 with session_scope() as session: 

112 login_token = LoginToken(token=urlsafe_secure_token(), user=user, expiry=now()) 

113 session.add(login_token) 

114 assert session.execute(select(func.count()).select_from(LoginToken)).scalar_one() == 1 

115 

116 queue_job(session, "purge_login_tokens", empty_pb2.Empty()) 

117 process_job() 

118 

119 with session_scope() as session: 

120 assert session.execute(select(func.count()).select_from(LoginToken)).scalar_one() == 0 

121 

122 with session_scope() as session: 

123 assert ( 

124 session.execute( 

125 select(func.count()) 

126 .select_from(BackgroundJob) 

127 .where(BackgroundJob.state == BackgroundJobState.completed) 

128 ).scalar_one() 

129 == 1 

130 ) 

131 assert ( 

132 session.execute( 

133 select(func.count()) 

134 .select_from(BackgroundJob) 

135 .where(BackgroundJob.state != BackgroundJobState.completed) 

136 ).scalar_one() 

137 == 0 

138 ) 

139 

140 

141def test_purge_password_reset_tokens(db): 

142 user, api_token = generate_user() 

143 

144 with session_scope() as session: 

145 password_reset_token = PasswordResetToken(token=urlsafe_secure_token(), user=user, expiry=now()) 

146 session.add(password_reset_token) 

147 assert session.execute(select(func.count()).select_from(PasswordResetToken)).scalar_one() == 1 

148 

149 queue_job(session, "purge_password_reset_tokens", empty_pb2.Empty()) 

150 process_job() 

151 

152 with session_scope() as session: 

153 assert session.execute(select(func.count()).select_from(PasswordResetToken)).scalar_one() == 0 

154 

155 with session_scope() as session: 

156 assert ( 

157 session.execute( 

158 select(func.count()) 

159 .select_from(BackgroundJob) 

160 .where(BackgroundJob.state == BackgroundJobState.completed) 

161 ).scalar_one() 

162 == 1 

163 ) 

164 assert ( 

165 session.execute( 

166 select(func.count()) 

167 .select_from(BackgroundJob) 

168 .where(BackgroundJob.state != BackgroundJobState.completed) 

169 ).scalar_one() 

170 == 0 

171 ) 

172 

173 

174def test_purge_account_deletion_tokens(db): 

175 user, api_token = generate_user() 

176 user2, api_token2 = generate_user() 

177 user3, api_token3 = generate_user() 

178 

179 with session_scope() as session: 

180 """ 

181 3 cases: 

182 1) Token is valid 

183 2) Token expired but account retrievable 

184 3) Account is irretrievable (and expired) 

185 """ 

186 account_deletion_tokens = [ 

187 AccountDeletionToken(token=urlsafe_secure_token(), user=user, expiry=now() - timedelta(hours=2)), 

188 AccountDeletionToken(token=urlsafe_secure_token(), user=user2, expiry=now()), 

189 AccountDeletionToken(token=urlsafe_secure_token(), user=user3, expiry=now() + timedelta(hours=5)), 

190 ] 

191 for token in account_deletion_tokens: 

192 session.add(token) 

193 assert session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one() == 3 

194 

195 queue_job(session, "purge_account_deletion_tokens", empty_pb2.Empty()) 

196 process_job() 

197 

198 with session_scope() as session: 

199 assert session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one() == 1 

200 

201 with session_scope() as session: 

202 assert ( 

203 session.execute( 

204 select(func.count()) 

205 .select_from(BackgroundJob) 

206 .where(BackgroundJob.state == BackgroundJobState.completed) 

207 ).scalar_one() 

208 == 1 

209 ) 

210 assert ( 

211 session.execute( 

212 select(func.count()) 

213 .select_from(BackgroundJob) 

214 .where(BackgroundJob.state != BackgroundJobState.completed) 

215 ).scalar_one() 

216 == 0 

217 ) 

218 

219 

220def test_enforce_community_memberships(db): 

221 with session_scope() as session: 

222 queue_job(session, "enforce_community_membership", empty_pb2.Empty()) 

223 process_job() 

224 

225 with session_scope() as session: 

226 assert ( 

227 session.execute( 

228 select(func.count()) 

229 .select_from(BackgroundJob) 

230 .where(BackgroundJob.state == BackgroundJobState.completed) 

231 ).scalar_one() 

232 == 1 

233 ) 

234 assert ( 

235 session.execute( 

236 select(func.count()) 

237 .select_from(BackgroundJob) 

238 .where(BackgroundJob.state != BackgroundJobState.completed) 

239 ).scalar_one() 

240 == 0 

241 ) 

242 

243 

244def test_refresh_materialized_views(db): 

245 with session_scope() as session: 

246 queue_job(session, "refresh_materialized_views", empty_pb2.Empty()) 

247 

248 process_job() 

249 

250 with session_scope() as session: 

251 assert ( 

252 session.execute( 

253 select(func.count()) 

254 .select_from(BackgroundJob) 

255 .where(BackgroundJob.state == BackgroundJobState.completed) 

256 ).scalar_one() 

257 == 1 

258 ) 

259 assert ( 

260 session.execute( 

261 select(func.count()) 

262 .select_from(BackgroundJob) 

263 .where(BackgroundJob.state != BackgroundJobState.completed) 

264 ).scalar_one() 

265 == 0 

266 ) 

267 

268 

269def test_service_jobs(db): 

270 with session_scope() as session: 

271 queue_email(session, "sender_name", "sender_email", "recipient", "subject", "plain", "html") 

272 

273 # we create this HitSleep exception here, and mock out the normal sleep(1) in the infinite loop to instead raise 

274 # this. that allows us to conveniently get out of the infinite loop and know we had no more jobs left 

275 class HitSleep(Exception): 

276 pass 

277 

278 # the mock `sleep` function that instead raises the aforementioned exception 

279 def raising_sleep(seconds): 

280 raise HitSleep() 

281 

282 with pytest.raises(HitSleep): 

283 with patch("couchers.jobs.worker.sleep", raising_sleep): 

284 service_jobs() 

285 

286 with session_scope() as session: 

287 assert ( 

288 session.execute( 

289 select(func.count()) 

290 .select_from(BackgroundJob) 

291 .where(BackgroundJob.state == BackgroundJobState.completed) 

292 ).scalar_one() 

293 == 1 

294 ) 

295 assert ( 

296 session.execute( 

297 select(func.count()) 

298 .select_from(BackgroundJob) 

299 .where(BackgroundJob.state != BackgroundJobState.completed) 

300 ).scalar_one() 

301 == 0 

302 ) 

303 

304 

305def test_scheduler(db, monkeypatch): 

306 MOCK_SCHEDULE = [ 

307 ("purge_login_tokens", timedelta(seconds=7)), 

308 ("send_message_notifications", timedelta(seconds=11)), 

309 ] 

310 

311 current_time = 0 

312 end_time = 70 

313 

314 class EndOfTime(Exception): 

315 pass 

316 

317 def mock_monotonic(): 

318 nonlocal current_time 

319 return current_time 

320 

321 def mock_sleep(seconds): 

322 nonlocal current_time 

323 current_time += seconds 

324 if current_time > end_time: 

325 raise EndOfTime() 

326 

327 realized_schedule = [] 

328 

329 def mock_run_job_and_schedule(sched, schedule_id): 

330 nonlocal current_time 

331 realized_schedule.append((current_time, schedule_id)) 

332 _run_job_and_schedule(sched, schedule_id) 

333 

334 monkeypatch.setattr(couchers.jobs.worker, "_run_job_and_schedule", mock_run_job_and_schedule) 

335 monkeypatch.setattr(couchers.jobs.worker, "SCHEDULE", MOCK_SCHEDULE) 

336 monkeypatch.setattr(couchers.jobs.worker, "monotonic", mock_monotonic) 

337 monkeypatch.setattr(couchers.jobs.worker, "sleep", mock_sleep) 

338 

339 with pytest.raises(EndOfTime): 

340 run_scheduler() 

341 

342 assert realized_schedule == [ 

343 (0.0, 0), 

344 (0.0, 1), 

345 (7.0, 0), 

346 (11.0, 1), 

347 (14.0, 0), 

348 (21.0, 0), 

349 (22.0, 1), 

350 (28.0, 0), 

351 (33.0, 1), 

352 (35.0, 0), 

353 (42.0, 0), 

354 (44.0, 1), 

355 (49.0, 0), 

356 (55.0, 1), 

357 (56.0, 0), 

358 (63.0, 0), 

359 (66.0, 1), 

360 (70.0, 0), 

361 ] 

362 

363 with session_scope() as session: 

364 assert ( 

365 session.execute( 

366 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state == BackgroundJobState.pending) 

367 ).scalar_one() 

368 == 18 

369 ) 

370 assert ( 

371 session.execute( 

372 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state != BackgroundJobState.pending) 

373 ).scalar_one() 

374 == 0 

375 ) 

376 

377 

378def test_job_retry(db): 

379 with session_scope() as session: 

380 queue_job(session, "mock_job", empty_pb2.Empty()) 

381 

382 called_count = 0 

383 

384 def mock_job(payload): 

385 nonlocal called_count 

386 called_count += 1 

387 raise Exception() 

388 

389 MOCK_JOBS = { 

390 "mock_job": (empty_pb2.Empty, mock_job), 

391 } 

392 create_prometheus_server(registry=job_process_registry, port=8001) 

393 

394 # if IN_TEST is true, then the bg worker will raise on exceptions 

395 new_config = config.copy() 

396 new_config["IN_TEST"] = False 

397 

398 with patch("couchers.jobs.worker.config", new_config), patch("couchers.jobs.worker.JOBS", MOCK_JOBS): 

399 process_job() 

400 with session_scope() as session: 

401 assert ( 

402 session.execute( 

403 select(func.count()) 

404 .select_from(BackgroundJob) 

405 .where(BackgroundJob.state == BackgroundJobState.error) 

406 ).scalar_one() 

407 == 1 

408 ) 

409 assert ( 

410 session.execute( 

411 select(func.count()) 

412 .select_from(BackgroundJob) 

413 .where(BackgroundJob.state != BackgroundJobState.error) 

414 ).scalar_one() 

415 == 0 

416 ) 

417 

418 session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now() 

419 process_job() 

420 with session_scope() as session: 

421 session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now() 

422 process_job() 

423 with session_scope() as session: 

424 session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now() 

425 process_job() 

426 with session_scope() as session: 

427 session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now() 

428 process_job() 

429 

430 with session_scope() as session: 

431 assert ( 

432 session.execute( 

433 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state == BackgroundJobState.failed) 

434 ).scalar_one() 

435 == 1 

436 ) 

437 assert ( 

438 session.execute( 

439 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state != BackgroundJobState.failed) 

440 ).scalar_one() 

441 == 0 

442 ) 

443 

444 _check_job_counter("mock_job", "error", "4", "Exception") 

445 _check_job_counter("mock_job", "failed", "5", "Exception") 

446 

447 

448def test_no_jobs_no_problem(db): 

449 with session_scope() as session: 

450 assert session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one() == 0 

451 

452 assert not process_job() 

453 

454 with session_scope() as session: 

455 assert session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one() == 0 

456 

457 

458def test_send_message_notifications_basic(db): 

459 user1, token1 = generate_user() 

460 user2, token2 = generate_user() 

461 user3, token3 = generate_user() 

462 

463 make_friends(user1, user2) 

464 make_friends(user1, user3) 

465 make_friends(user2, user3) 

466 

467 send_message_notifications(empty_pb2.Empty()) 

468 process_jobs() 

469 

470 # should find no jobs, since there's no messages 

471 with session_scope() as session: 

472 assert ( 

473 session.execute( 

474 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

475 ).scalar_one() 

476 == 0 

477 ) 

478 

479 with conversations_session(token1) as c: 

480 group_chat_id = c.CreateGroupChat( 

481 conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id]) 

482 ).group_chat_id 

483 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 1")) 

484 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 2")) 

485 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 3")) 

486 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 4")) 

487 

488 with conversations_session(token3) as c: 

489 group_chat_id = c.CreateGroupChat( 

490 conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id]) 

491 ).group_chat_id 

492 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 5")) 

493 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 6")) 

494 

495 send_message_notifications(empty_pb2.Empty()) 

496 process_jobs() 

497 

498 # no emails sent out 

499 with session_scope() as session: 

500 assert ( 

501 session.execute( 

502 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

503 ).scalar_one() 

504 == 0 

505 ) 

506 

507 # this should generate emails for both user2 and user3 

508 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

509 send_message_notifications(empty_pb2.Empty()) 

510 process_jobs() 

511 

512 with session_scope() as session: 

513 assert ( 

514 session.execute( 

515 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

516 ).scalar_one() 

517 == 2 

518 ) 

519 # delete them all 

520 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

521 

522 # shouldn't generate any more emails 

523 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

524 send_message_notifications(empty_pb2.Empty()) 

525 process_jobs() 

526 

527 with session_scope() as session: 

528 assert ( 

529 session.execute( 

530 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

531 ).scalar_one() 

532 == 0 

533 ) 

534 

535 

536def test_send_message_notifications_muted(db): 

537 user1, token1 = generate_user() 

538 user2, token2 = generate_user() 

539 user3, token3 = generate_user() 

540 

541 make_friends(user1, user2) 

542 make_friends(user1, user3) 

543 make_friends(user2, user3) 

544 

545 send_message_notifications(empty_pb2.Empty()) 

546 process_jobs() 

547 

548 # should find no jobs, since there's no messages 

549 with session_scope() as session: 

550 assert ( 

551 session.execute( 

552 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

553 ).scalar_one() 

554 == 0 

555 ) 

556 

557 with conversations_session(token1) as c: 

558 group_chat_id = c.CreateGroupChat( 

559 conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id]) 

560 ).group_chat_id 

561 

562 with conversations_session(token3) as c: 

563 # mute it for user 3 

564 c.MuteGroupChat(conversations_pb2.MuteGroupChatReq(group_chat_id=group_chat_id, forever=True)) 

565 

566 with conversations_session(token1) as c: 

567 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 1")) 

568 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 2")) 

569 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 3")) 

570 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 4")) 

571 

572 with conversations_session(token3) as c: 

573 group_chat_id = c.CreateGroupChat( 

574 conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id]) 

575 ).group_chat_id 

576 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 5")) 

577 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 6")) 

578 

579 send_message_notifications(empty_pb2.Empty()) 

580 process_jobs() 

581 

582 # no emails sent out 

583 with session_scope() as session: 

584 assert ( 

585 session.execute( 

586 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

587 ).scalar_one() 

588 == 0 

589 ) 

590 

591 # this should generate emails for both user2 and NOT user3 

592 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

593 send_message_notifications(empty_pb2.Empty()) 

594 process_jobs() 

595 

596 with session_scope() as session: 

597 assert ( 

598 session.execute( 

599 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

600 ).scalar_one() 

601 == 1 

602 ) 

603 # delete them all 

604 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

605 

606 # shouldn't generate any more emails 

607 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

608 send_message_notifications(empty_pb2.Empty()) 

609 process_jobs() 

610 

611 with session_scope() as session: 

612 assert ( 

613 session.execute( 

614 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

615 ).scalar_one() 

616 == 0 

617 ) 

618 

619 

620def test_send_request_notifications_host_request(db): 

621 user1, token1 = generate_user() 

622 user2, token2 = generate_user() 

623 

624 today_plus_2 = (today() + timedelta(days=2)).isoformat() 

625 today_plus_3 = (today() + timedelta(days=3)).isoformat() 

626 

627 send_request_notifications(empty_pb2.Empty()) 

628 process_jobs() 

629 

630 # should find no jobs, since there's no messages 

631 with session_scope() as session: 

632 assert session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one() == 0 

633 

634 # first test that sending host request creates email 

635 with requests_session(token1) as requests: 

636 host_request_id = requests.CreateHostRequest( 

637 requests_pb2.CreateHostRequestReq( 

638 host_user_id=user2.id, from_date=today_plus_2, to_date=today_plus_3, text="Test request" 

639 ) 

640 ).host_request_id 

641 

642 with session_scope() as session: 

643 # delete send_email BackgroundJob created by CreateHostRequest 

644 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

645 

646 # check send_request_notifications successfully creates background job 

647 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

648 send_request_notifications(empty_pb2.Empty()) 

649 process_jobs() 

650 assert ( 

651 session.execute( 

652 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

653 ).scalar_one() 

654 == 1 

655 ) 

656 

657 # delete all BackgroundJobs 

658 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

659 

660 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

661 send_request_notifications(empty_pb2.Empty()) 

662 process_jobs() 

663 # should find no messages since host has already been notified 

664 assert ( 

665 session.execute( 

666 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

667 ).scalar_one() 

668 == 0 

669 ) 

670 

671 # then test that responding to host request creates email 

672 with requests_session(token2) as requests: 

673 requests.RespondHostRequest( 

674 requests_pb2.RespondHostRequestReq( 

675 host_request_id=host_request_id, 

676 status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED, 

677 text="Test request", 

678 ) 

679 ) 

680 

681 with session_scope() as session: 

682 # delete send_email BackgroundJob created by RespondHostRequest 

683 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

684 

685 # check send_request_notifications successfully creates background job 

686 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

687 send_request_notifications(empty_pb2.Empty()) 

688 process_jobs() 

689 assert ( 

690 session.execute( 

691 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

692 ).scalar_one() 

693 == 1 

694 ) 

695 

696 # delete all BackgroundJobs 

697 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

698 

699 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

700 send_request_notifications(empty_pb2.Empty()) 

701 process_jobs() 

702 # should find no messages since guest has already been notified 

703 assert ( 

704 session.execute( 

705 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

706 ).scalar_one() 

707 == 0 

708 ) 

709 

710 

711def test_send_message_notifications_seen(db): 

712 user1, token1 = generate_user() 

713 user2, token2 = generate_user() 

714 

715 make_friends(user1, user2) 

716 

717 send_message_notifications(empty_pb2.Empty()) 

718 

719 # should find no jobs, since there's no messages 

720 with session_scope() as session: 

721 assert ( 

722 session.execute( 

723 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

724 ).scalar_one() 

725 == 0 

726 ) 

727 

728 with conversations_session(token1) as c: 

729 group_chat_id = c.CreateGroupChat( 

730 conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id]) 

731 ).group_chat_id 

732 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 1")) 

733 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 2")) 

734 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 3")) 

735 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 4")) 

736 

737 # user 2 now marks those messages as seen 

738 with conversations_session(token2) as c: 

739 m_id = c.GetGroupChat(conversations_pb2.GetGroupChatReq(group_chat_id=group_chat_id)).latest_message.message_id 

740 c.MarkLastSeenGroupChat( 

741 conversations_pb2.MarkLastSeenGroupChatReq(group_chat_id=group_chat_id, last_seen_message_id=m_id) 

742 ) 

743 

744 send_message_notifications(empty_pb2.Empty()) 

745 

746 # no emails sent out 

747 with session_scope() as session: 

748 assert ( 

749 session.execute( 

750 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

751 ).scalar_one() 

752 == 0 

753 ) 

754 

755 def now_30_min_in_future(): 

756 return now() + timedelta(minutes=30) 

757 

758 # still shouldn't generate emails as user2 has seen all messages 

759 with patch("couchers.jobs.handlers.now", now_30_min_in_future): 

760 send_message_notifications(empty_pb2.Empty()) 

761 

762 with session_scope() as session: 

763 assert ( 

764 session.execute( 

765 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

766 ).scalar_one() 

767 == 0 

768 ) 

769 

770 

771def test_send_onboarding_emails(db): 

772 # needs to get first onboarding email 

773 user1, token1 = generate_user(onboarding_emails_sent=0, last_onboarding_email_sent=None, complete_profile=False) 

774 

775 send_onboarding_emails(empty_pb2.Empty()) 

776 process_jobs() 

777 

778 with session_scope() as session: 

779 assert ( 

780 session.execute( 

781 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

782 ).scalar_one() 

783 == 1 

784 ) 

785 

786 # needs to get second onboarding email, but not yet 

787 user2, token2 = generate_user( 

788 onboarding_emails_sent=1, last_onboarding_email_sent=now() - timedelta(days=6), complete_profile=False 

789 ) 

790 

791 send_onboarding_emails(empty_pb2.Empty()) 

792 process_jobs() 

793 

794 with session_scope() as session: 

795 assert ( 

796 session.execute( 

797 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

798 ).scalar_one() 

799 == 1 

800 ) 

801 

802 # needs to get second onboarding email 

803 user3, token3 = generate_user( 

804 onboarding_emails_sent=1, last_onboarding_email_sent=now() - timedelta(days=8), complete_profile=False 

805 ) 

806 

807 send_onboarding_emails(empty_pb2.Empty()) 

808 process_jobs() 

809 

810 with session_scope() as session: 

811 assert ( 

812 session.execute( 

813 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

814 ).scalar_one() 

815 == 2 

816 ) 

817 

818 

819def test_send_reference_reminders(db): 

820 # need to test: 

821 # case 1: bidirectional (no emails) 

822 # case 2: host left ref (surfer needs an email) 

823 # case 3: surfer left ref (host needs an email) 

824 # case 4: neither left ref (host & surfer need an email) 

825 # case 5: neither left ref, but host blocked surfer, so neither should get an email 

826 # case 6: neither left ref, surfer indicated they didn't meet up, (host still needs an email) 

827 

828 send_reference_reminders(empty_pb2.Empty()) 

829 

830 # case 1: bidirectional (no emails) 

831 user1, token1 = generate_user(email="user1@couchers.org.invalid", name="User 1") 

832 user2, token2 = generate_user(email="user2@couchers.org.invalid", name="User 2") 

833 

834 # case 2: host left ref (surfer needs an email) 

835 # host 

836 user3, token3 = generate_user(email="user3@couchers.org.invalid", name="User 3") 

837 # surfer 

838 user4, token4 = generate_user(email="user4@couchers.org.invalid", name="User 4") 

839 

840 # case 3: surfer left ref (host needs an email) 

841 # host 

842 user5, token5 = generate_user(email="user5@couchers.org.invalid", name="User 5") 

843 # surfer 

844 user6, token6 = generate_user(email="user6@couchers.org.invalid", name="User 6") 

845 

846 # case 4: neither left ref (host & surfer need an email) 

847 # surfer 

848 user7, token7 = generate_user(email="user7@couchers.org.invalid", name="User 7") 

849 # host 

850 user8, token8 = generate_user(email="user8@couchers.org.invalid", name="User 8") 

851 

852 # case 5: neither left ref, but host blocked surfer, so neither should get an email 

853 # surfer 

854 user9, token9 = generate_user(email="user9@couchers.org.invalid", name="User 9") 

855 # host 

856 user10, token10 = generate_user(email="user10@couchers.org.invalid", name="User 10") 

857 

858 make_user_block(user9, user10) 

859 

860 # case 6: neither left ref, surfer indicated they didn't meet up, (host still needs an email) 

861 # host 

862 user11, token11 = generate_user(email="user11@couchers.org.invalid", name="User 11") 

863 # surfer 

864 user12, token12 = generate_user(email="user12@couchers.org.invalid", name="User 12") 

865 

866 with session_scope() as session: 

867 # note that create_host_reference creates a host request whose age is one day older than the timedelta here 

868 

869 # case 1: bidirectional (no emails) 

870 ref1, hr1 = create_host_reference(session, user2.id, user1.id, timedelta(days=7), surfing=True) 

871 create_host_reference(session, user1.id, user2.id, timedelta(days=7), host_request_id=hr1) 

872 

873 # case 2: host left ref (surfer needs an email) 

874 ref2, hr2 = create_host_reference(session, user3.id, user4.id, timedelta(days=11), surfing=False) 

875 

876 # case 3: surfer left ref (host needs an email) 

877 ref3, hr3 = create_host_reference(session, user6.id, user5.id, timedelta(days=9), surfing=True) 

878 

879 # case 4: neither left ref (host & surfer need an email) 

880 hr4 = create_host_request(session, user7.id, user8.id, timedelta(days=4)) 

881 

882 # case 5: neither left ref, but host blocked surfer, so neither should get an email 

883 hr5 = create_host_request(session, user9.id, user10.id, timedelta(days=7)) 

884 

885 # case 6: neither left ref, surfer indicated they didn't meet up, (host still needs an email) 

886 hr6 = create_host_request(session, user12.id, user11.id, timedelta(days=6), surfer_reason_didnt_meetup="") 

887 

888 expected_emails = [ 

889 ( 

890 "user11@couchers.org.invalid", 

891 "[TEST] You have 14 days to write a reference for User 12!", 

892 ("from when you hosted them", "/leave-reference/hosted/"), 

893 ), 

894 ( 

895 "user4@couchers.org.invalid", 

896 "[TEST] You have 3 days to write a reference for User 3!", 

897 ("from when you surfed with them", "/leave-reference/surfed/"), 

898 ), 

899 ( 

900 "user5@couchers.org.invalid", 

901 "[TEST] You have 7 days to write a reference for User 6!", 

902 ("from when you hosted them", "/leave-reference/hosted/"), 

903 ), 

904 ( 

905 "user7@couchers.org.invalid", 

906 "[TEST] You have 14 days to write a reference for User 8!", 

907 ("from when you surfed with them", "/leave-reference/surfed/"), 

908 ), 

909 ( 

910 "user8@couchers.org.invalid", 

911 "[TEST] You have 14 days to write a reference for User 7!", 

912 ("from when you hosted them", "/leave-reference/hosted/"), 

913 ), 

914 ] 

915 

916 send_reference_reminders(empty_pb2.Empty()) 

917 

918 while process_job(): 

919 pass 

920 

921 with session_scope() as session: 

922 emails = [ 

923 (email.recipient, email.subject, email.plain, email.html) 

924 for email in session.execute(select(Email).order_by(Email.recipient.asc())).scalars().all() 

925 ] 

926 

927 actual_addresses_and_subjects = [email[:2] for email in emails] 

928 expected_addresses_and_subjects = [email[:2] for email in expected_emails] 

929 

930 print(actual_addresses_and_subjects) 

931 print(expected_addresses_and_subjects) 

932 

933 assert actual_addresses_and_subjects == expected_addresses_and_subjects 

934 

935 for (address, subject, plain, html), (_, _, search_strings) in zip(emails, expected_emails): 

936 for find in search_strings: 

937 assert find in plain, f"Expected to find string {find} in PLAIN email {subject} to {address}, didn't" 

938 assert find in html, f"Expected to find string {find} in HTML email {subject} to {address}, didn't" 

939 

940 

941def test_add_users_to_email_list(db): 

942 new_config = config.copy() 

943 new_config["LISTMONK_ENABLED"] = True 

944 new_config["LISTMONK_BASE_URL"] = "https://example.com" 

945 new_config["LISTMONK_API_USERNAME"] = "test_user" 

946 new_config["LISTMONK_API_KEY"] = "dummy_api_key" 

947 new_config["LISTMONK_LIST_ID"] = 6 

948 

949 with patch("couchers.jobs.handlers.config", new_config): 

950 with patch("couchers.jobs.handlers.requests.post") as mock: 

951 add_users_to_email_list(empty_pb2.Empty()) 

952 mock.assert_not_called() 

953 

954 generate_user(in_sync_with_newsletter=False, email="testing1@couchers.invalid", name="Tester1", id=15) 

955 generate_user(in_sync_with_newsletter=True, email="testing2@couchers.invalid", name="Tester2") 

956 generate_user(in_sync_with_newsletter=False, email="testing3@couchers.invalid", name="Tester3 von test", id=17) 

957 generate_user( 

958 in_sync_with_newsletter=False, email="testing4@couchers.invalid", name="Tester4", opt_out_of_newsletter=True 

959 ) 

960 

961 with patch("couchers.jobs.handlers.requests.post") as mock: 

962 ret = mock.return_value 

963 ret.status_code = 200 

964 add_users_to_email_list(empty_pb2.Empty()) 

965 mock.assert_has_calls( 

966 [ 

967 call( 

968 "https://example.com/api/subscribers", 

969 auth=("test_user", "dummy_api_key"), 

970 json={ 

971 "email": "testing1@couchers.invalid", 

972 "name": "Tester1", 

973 "lists": [6], 

974 "preconfirm_subscriptions": True, 

975 "attribs": {"couchers_user_id": 15}, 

976 "status": "enabled", 

977 }, 

978 timeout=10, 

979 ), 

980 call( 

981 "https://example.com/api/subscribers", 

982 auth=("test_user", "dummy_api_key"), 

983 json={ 

984 "email": "testing3@couchers.invalid", 

985 "name": "Tester3 von test", 

986 "lists": [6], 

987 "preconfirm_subscriptions": True, 

988 "attribs": {"couchers_user_id": 17}, 

989 "status": "enabled", 

990 }, 

991 timeout=10, 

992 ), 

993 ], 

994 any_order=True, 

995 ) 

996 

997 with patch("couchers.jobs.handlers.requests.post") as mock: 

998 add_users_to_email_list(empty_pb2.Empty()) 

999 mock.assert_not_called() 

1000 

1001 

1002def test_update_recommendation_scores(db): 

1003 update_recommendation_scores(empty_pb2.Empty()) 

1004 

1005 

1006def test_update_badges(db, push_collector): 

1007 user1, _ = generate_user() 

1008 user2, _ = generate_user() 

1009 user3, _ = generate_user() 

1010 user4, _ = generate_user(phone="+15555555555", phone_verification_verified=func.now()) 

1011 user5, _ = generate_user(phone="+15555555556", phone_verification_verified=func.now()) 

1012 user6, _ = generate_user() 

1013 

1014 with session_scope() as session: 

1015 session.add(UserBadge(user_id=user5.id, badge_id="board_member")) 

1016 

1017 update_badges(empty_pb2.Empty()) 

1018 process_jobs() 

1019 

1020 with session_scope() as session: 

1021 badge_tuples = session.execute( 

1022 select(UserBadge.user_id, UserBadge.badge_id).order_by(UserBadge.user_id.asc(), UserBadge.id.asc()) 

1023 ).all() 

1024 

1025 expected = [ 

1026 (user1.id, "founder"), 

1027 (user1.id, "board_member"), 

1028 (user2.id, "founder"), 

1029 (user2.id, "board_member"), 

1030 (user4.id, "phone_verified"), 

1031 (user5.id, "phone_verified"), 

1032 ] 

1033 

1034 assert badge_tuples == expected 

1035 

1036 print(push_collector.pushes) 

1037 

1038 push_collector.assert_user_push_matches_fields( 

1039 user1.id, 

1040 ix=0, 

1041 title="The Founder badge was added to your profile", 

1042 body="Check out your profile to see the new badge!", 

1043 ) 

1044 push_collector.assert_user_push_matches_fields( 

1045 user1.id, 

1046 ix=1, 

1047 title="The Board Member badge was added to your profile", 

1048 body="Check out your profile to see the new badge!", 

1049 ) 

1050 push_collector.assert_user_push_matches_fields( 

1051 user2.id, 

1052 ix=0, 

1053 title="The Founder badge was added to your profile", 

1054 body="Check out your profile to see the new badge!", 

1055 ) 

1056 push_collector.assert_user_push_matches_fields( 

1057 user2.id, 

1058 ix=1, 

1059 title="The Board Member badge was added to your profile", 

1060 body="Check out your profile to see the new badge!", 

1061 ) 

1062 push_collector.assert_user_push_matches_fields( 

1063 user4.id, 

1064 ix=0, 

1065 title="The Verified Phone badge was added to your profile", 

1066 body="Check out your profile to see the new badge!", 

1067 ) 

1068 push_collector.assert_user_push_matches_fields( 

1069 user5.id, 

1070 ix=0, 

1071 title="The Board Member badge was removed from your profile", 

1072 body="You can see all your badges on your profile.", 

1073 ) 

1074 push_collector.assert_user_push_matches_fields( 

1075 user5.id, 

1076 ix=1, 

1077 title="The Verified Phone badge was added to your profile", 

1078 body="Check out your profile to see the new badge!", 

1079 )