Coverage for src/tests/test_bg_jobs.py: 100%

407 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-11-21 04:21 +0000

1from datetime import timedelta 

2from unittest.mock import call, patch 

3 

4import pytest 

5import requests 

6from google.protobuf import empty_pb2 

7from sqlalchemy.sql import delete, func 

8 

9import couchers.jobs.worker 

10from couchers.config import config 

11from couchers.crypto import urlsafe_secure_token 

12from couchers.db import session_scope 

13from couchers.email import queue_email 

14from couchers.email.dev import print_dev_email 

15from couchers.jobs.enqueue import queue_job 

16from couchers.jobs.handlers import ( 

17 add_users_to_email_list, 

18 send_message_notifications, 

19 send_onboarding_emails, 

20 send_reference_reminders, 

21 send_request_notifications, 

22 update_badges, 

23 update_recommendation_scores, 

24) 

25from couchers.jobs.worker import _run_job_and_schedule, process_job, run_scheduler, service_jobs 

26from couchers.metrics import create_prometheus_server, job_process_registry 

27from couchers.models import ( 

28 AccountDeletionToken, 

29 BackgroundJob, 

30 BackgroundJobState, 

31 Email, 

32 LoginToken, 

33 PasswordResetToken, 

34 UserBadge, 

35) 

36from couchers.sql import couchers_select as select 

37from couchers.utils import now, today 

38from proto import conversations_pb2, requests_pb2 

39from tests.test_fixtures import ( # noqa 

40 auth_api_session, 

41 conversations_session, 

42 db, 

43 generate_user, 

44 make_friends, 

45 make_user_block, 

46 process_jobs, 

47 push_collector, 

48 requests_session, 

49 testconfig, 

50) 

51from tests.test_references import create_host_reference, create_host_request 

52 

53 

54def now_5_min_in_future(): 

55 return now() + timedelta(minutes=5) 

56 

57 

58@pytest.fixture(autouse=True) 

59def _(testconfig): 

60 pass 

61 

62 

63def _check_job_counter(job, status, attempt, exception): 

64 metrics_string = requests.get("http://localhost:8001").text 

65 string_to_check = f'attempt="{attempt}",exception="{exception}",job="{job}",status="{status}"' 

66 assert string_to_check in metrics_string 

67 

68 

69def test_email_job(db): 

70 with session_scope() as session: 

71 queue_email(session, "sender_name", "sender_email", "recipient", "subject", "plain", "html") 

72 

73 def mock_print_dev_email( 

74 sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data 

75 ): 

76 assert sender_name == "sender_name" 

77 assert sender_email == "sender_email" 

78 assert recipient == "recipient" 

79 assert subject == "subject" 

80 assert plain == "plain" 

81 assert html == "html" 

82 return print_dev_email( 

83 sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data 

84 ) 

85 

86 with patch("couchers.jobs.handlers.print_dev_email", mock_print_dev_email): 

87 process_job() 

88 

89 with session_scope() as session: 

90 assert ( 

91 session.execute( 

92 select(func.count()) 

93 .select_from(BackgroundJob) 

94 .where(BackgroundJob.state == BackgroundJobState.completed) 

95 ).scalar_one() 

96 == 1 

97 ) 

98 assert ( 

99 session.execute( 

100 select(func.count()) 

101 .select_from(BackgroundJob) 

102 .where(BackgroundJob.state != BackgroundJobState.completed) 

103 ).scalar_one() 

104 == 0 

105 ) 

106 

107 

108def test_purge_login_tokens(db): 

109 user, api_token = generate_user() 

110 

111 with session_scope() as session: 

112 login_token = LoginToken(token=urlsafe_secure_token(), user=user, expiry=now()) 

113 session.add(login_token) 

114 assert session.execute(select(func.count()).select_from(LoginToken)).scalar_one() == 1 

115 

116 queue_job(session, "purge_login_tokens", empty_pb2.Empty()) 

117 process_job() 

118 

119 with session_scope() as session: 

120 assert session.execute(select(func.count()).select_from(LoginToken)).scalar_one() == 0 

121 

122 with session_scope() as session: 

123 assert ( 

124 session.execute( 

125 select(func.count()) 

126 .select_from(BackgroundJob) 

127 .where(BackgroundJob.state == BackgroundJobState.completed) 

128 ).scalar_one() 

129 == 1 

130 ) 

131 assert ( 

132 session.execute( 

133 select(func.count()) 

134 .select_from(BackgroundJob) 

135 .where(BackgroundJob.state != BackgroundJobState.completed) 

136 ).scalar_one() 

137 == 0 

138 ) 

139 

140 

141def test_purge_password_reset_tokens(db): 

142 user, api_token = generate_user() 

143 

144 with session_scope() as session: 

145 password_reset_token = PasswordResetToken(token=urlsafe_secure_token(), user=user, expiry=now()) 

146 session.add(password_reset_token) 

147 assert session.execute(select(func.count()).select_from(PasswordResetToken)).scalar_one() == 1 

148 

149 queue_job(session, "purge_password_reset_tokens", empty_pb2.Empty()) 

150 process_job() 

151 

152 with session_scope() as session: 

153 assert session.execute(select(func.count()).select_from(PasswordResetToken)).scalar_one() == 0 

154 

155 with session_scope() as session: 

156 assert ( 

157 session.execute( 

158 select(func.count()) 

159 .select_from(BackgroundJob) 

160 .where(BackgroundJob.state == BackgroundJobState.completed) 

161 ).scalar_one() 

162 == 1 

163 ) 

164 assert ( 

165 session.execute( 

166 select(func.count()) 

167 .select_from(BackgroundJob) 

168 .where(BackgroundJob.state != BackgroundJobState.completed) 

169 ).scalar_one() 

170 == 0 

171 ) 

172 

173 

174def test_purge_account_deletion_tokens(db): 

175 user, api_token = generate_user() 

176 user2, api_token2 = generate_user() 

177 user3, api_token3 = generate_user() 

178 

179 with session_scope() as session: 

180 """ 

181 3 cases: 

182 1) Token is valid 

183 2) Token expired but account retrievable 

184 3) Account is irretrievable (and expired) 

185 """ 

186 account_deletion_tokens = [ 

187 AccountDeletionToken(token=urlsafe_secure_token(), user=user, expiry=now() - timedelta(hours=2)), 

188 AccountDeletionToken(token=urlsafe_secure_token(), user=user2, expiry=now()), 

189 AccountDeletionToken(token=urlsafe_secure_token(), user=user3, expiry=now() + timedelta(hours=5)), 

190 ] 

191 for token in account_deletion_tokens: 

192 session.add(token) 

193 assert session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one() == 3 

194 

195 queue_job(session, "purge_account_deletion_tokens", empty_pb2.Empty()) 

196 process_job() 

197 

198 with session_scope() as session: 

199 assert session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one() == 1 

200 

201 with session_scope() as session: 

202 assert ( 

203 session.execute( 

204 select(func.count()) 

205 .select_from(BackgroundJob) 

206 .where(BackgroundJob.state == BackgroundJobState.completed) 

207 ).scalar_one() 

208 == 1 

209 ) 

210 assert ( 

211 session.execute( 

212 select(func.count()) 

213 .select_from(BackgroundJob) 

214 .where(BackgroundJob.state != BackgroundJobState.completed) 

215 ).scalar_one() 

216 == 0 

217 ) 

218 

219 

220def test_enforce_community_memberships(db): 

221 with session_scope() as session: 

222 queue_job(session, "enforce_community_membership", empty_pb2.Empty()) 

223 process_job() 

224 

225 with session_scope() as session: 

226 assert ( 

227 session.execute( 

228 select(func.count()) 

229 .select_from(BackgroundJob) 

230 .where(BackgroundJob.state == BackgroundJobState.completed) 

231 ).scalar_one() 

232 == 1 

233 ) 

234 assert ( 

235 session.execute( 

236 select(func.count()) 

237 .select_from(BackgroundJob) 

238 .where(BackgroundJob.state != BackgroundJobState.completed) 

239 ).scalar_one() 

240 == 0 

241 ) 

242 

243 

244def test_refresh_materialized_views(db): 

245 with session_scope() as session: 

246 queue_job(session, "refresh_materialized_views", empty_pb2.Empty()) 

247 

248 process_job() 

249 

250 with session_scope() as session: 

251 assert ( 

252 session.execute( 

253 select(func.count()) 

254 .select_from(BackgroundJob) 

255 .where(BackgroundJob.state == BackgroundJobState.completed) 

256 ).scalar_one() 

257 == 1 

258 ) 

259 assert ( 

260 session.execute( 

261 select(func.count()) 

262 .select_from(BackgroundJob) 

263 .where(BackgroundJob.state != BackgroundJobState.completed) 

264 ).scalar_one() 

265 == 0 

266 ) 

267 

268 

269def test_service_jobs(db): 

270 with session_scope() as session: 

271 queue_email(session, "sender_name", "sender_email", "recipient", "subject", "plain", "html") 

272 

273 # we create this HitSleep exception here, and mock out the normal sleep(1) in the infinite loop to instead raise 

274 # this. that allows us to conveniently get out of the infinite loop and know we had no more jobs left 

275 class HitSleep(Exception): 

276 pass 

277 

278 # the mock `sleep` function that instead raises the aforementioned exception 

279 def raising_sleep(seconds): 

280 raise HitSleep() 

281 

282 with pytest.raises(HitSleep): 

283 with patch("couchers.jobs.worker.sleep", raising_sleep): 

284 service_jobs() 

285 

286 with session_scope() as session: 

287 assert ( 

288 session.execute( 

289 select(func.count()) 

290 .select_from(BackgroundJob) 

291 .where(BackgroundJob.state == BackgroundJobState.completed) 

292 ).scalar_one() 

293 == 1 

294 ) 

295 assert ( 

296 session.execute( 

297 select(func.count()) 

298 .select_from(BackgroundJob) 

299 .where(BackgroundJob.state != BackgroundJobState.completed) 

300 ).scalar_one() 

301 == 0 

302 ) 

303 

304 

305def test_scheduler(db, monkeypatch): 

306 MOCK_SCHEDULE = [ 

307 ("purge_login_tokens", timedelta(seconds=7)), 

308 ("send_message_notifications", timedelta(seconds=11)), 

309 ] 

310 

311 current_time = 0 

312 end_time = 70 

313 

314 class EndOfTime(Exception): 

315 pass 

316 

317 def mock_monotonic(): 

318 nonlocal current_time 

319 return current_time 

320 

321 def mock_sleep(seconds): 

322 nonlocal current_time 

323 current_time += seconds 

324 if current_time > end_time: 

325 raise EndOfTime() 

326 

327 realized_schedule = [] 

328 

329 def mock_run_job_and_schedule(sched, schedule_id): 

330 nonlocal current_time 

331 realized_schedule.append((current_time, schedule_id)) 

332 _run_job_and_schedule(sched, schedule_id) 

333 

334 monkeypatch.setattr(couchers.jobs.worker, "_run_job_and_schedule", mock_run_job_and_schedule) 

335 monkeypatch.setattr(couchers.jobs.worker, "SCHEDULE", MOCK_SCHEDULE) 

336 monkeypatch.setattr(couchers.jobs.worker, "monotonic", mock_monotonic) 

337 monkeypatch.setattr(couchers.jobs.worker, "sleep", mock_sleep) 

338 

339 with pytest.raises(EndOfTime): 

340 run_scheduler() 

341 

342 assert realized_schedule == [ 

343 (0.0, 0), 

344 (0.0, 1), 

345 (7.0, 0), 

346 (11.0, 1), 

347 (14.0, 0), 

348 (21.0, 0), 

349 (22.0, 1), 

350 (28.0, 0), 

351 (33.0, 1), 

352 (35.0, 0), 

353 (42.0, 0), 

354 (44.0, 1), 

355 (49.0, 0), 

356 (55.0, 1), 

357 (56.0, 0), 

358 (63.0, 0), 

359 (66.0, 1), 

360 (70.0, 0), 

361 ] 

362 

363 with session_scope() as session: 

364 assert ( 

365 session.execute( 

366 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state == BackgroundJobState.pending) 

367 ).scalar_one() 

368 == 18 

369 ) 

370 assert ( 

371 session.execute( 

372 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state != BackgroundJobState.pending) 

373 ).scalar_one() 

374 == 0 

375 ) 

376 

377 

378def test_job_retry(db): 

379 with session_scope() as session: 

380 queue_job(session, "mock_job", empty_pb2.Empty()) 

381 

382 called_count = 0 

383 

384 def mock_job(payload): 

385 nonlocal called_count 

386 called_count += 1 

387 raise Exception() 

388 

389 MOCK_JOBS = { 

390 "mock_job": (empty_pb2.Empty, mock_job), 

391 } 

392 create_prometheus_server(registry=job_process_registry, port=8001) 

393 

394 # if IN_TEST is true, then the bg worker will raise on exceptions 

395 new_config = config.copy() 

396 new_config["IN_TEST"] = False 

397 

398 with patch("couchers.jobs.worker.config", new_config), patch("couchers.jobs.worker.JOBS", MOCK_JOBS): 

399 process_job() 

400 with session_scope() as session: 

401 assert ( 

402 session.execute( 

403 select(func.count()) 

404 .select_from(BackgroundJob) 

405 .where(BackgroundJob.state == BackgroundJobState.error) 

406 ).scalar_one() 

407 == 1 

408 ) 

409 assert ( 

410 session.execute( 

411 select(func.count()) 

412 .select_from(BackgroundJob) 

413 .where(BackgroundJob.state != BackgroundJobState.error) 

414 ).scalar_one() 

415 == 0 

416 ) 

417 

418 session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now() 

419 process_job() 

420 with session_scope() as session: 

421 session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now() 

422 process_job() 

423 with session_scope() as session: 

424 session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now() 

425 process_job() 

426 with session_scope() as session: 

427 session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now() 

428 process_job() 

429 

430 with session_scope() as session: 

431 assert ( 

432 session.execute( 

433 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state == BackgroundJobState.failed) 

434 ).scalar_one() 

435 == 1 

436 ) 

437 assert ( 

438 session.execute( 

439 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state != BackgroundJobState.failed) 

440 ).scalar_one() 

441 == 0 

442 ) 

443 

444 _check_job_counter("mock_job", "error", "4", "Exception") 

445 _check_job_counter("mock_job", "failed", "5", "Exception") 

446 

447 

448def test_no_jobs_no_problem(db): 

449 with session_scope() as session: 

450 assert session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one() == 0 

451 

452 assert not process_job() 

453 

454 with session_scope() as session: 

455 assert session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one() == 0 

456 

457 

458def test_send_message_notifications_basic(db): 

459 user1, token1 = generate_user() 

460 user2, token2 = generate_user() 

461 user3, token3 = generate_user() 

462 

463 make_friends(user1, user2) 

464 make_friends(user1, user3) 

465 make_friends(user2, user3) 

466 

467 send_message_notifications(empty_pb2.Empty()) 

468 process_jobs() 

469 

470 # should find no jobs, since there's no messages 

471 with session_scope() as session: 

472 assert ( 

473 session.execute( 

474 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

475 ).scalar_one() 

476 == 0 

477 ) 

478 

479 with conversations_session(token1) as c: 

480 group_chat_id = c.CreateGroupChat( 

481 conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id]) 

482 ).group_chat_id 

483 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 1")) 

484 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 2")) 

485 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 3")) 

486 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 4")) 

487 

488 with conversations_session(token3) as c: 

489 group_chat_id = c.CreateGroupChat( 

490 conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id]) 

491 ).group_chat_id 

492 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 5")) 

493 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 6")) 

494 

495 send_message_notifications(empty_pb2.Empty()) 

496 process_jobs() 

497 

498 # no emails sent out 

499 with session_scope() as session: 

500 assert ( 

501 session.execute( 

502 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

503 ).scalar_one() 

504 == 0 

505 ) 

506 

507 # this should generate emails for both user2 and user3 

508 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

509 send_message_notifications(empty_pb2.Empty()) 

510 process_jobs() 

511 

512 with session_scope() as session: 

513 assert ( 

514 session.execute( 

515 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

516 ).scalar_one() 

517 == 2 

518 ) 

519 # delete them all 

520 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

521 

522 # shouldn't generate any more emails 

523 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

524 send_message_notifications(empty_pb2.Empty()) 

525 process_jobs() 

526 

527 with session_scope() as session: 

528 assert ( 

529 session.execute( 

530 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

531 ).scalar_one() 

532 == 0 

533 ) 

534 

535 

536def test_send_message_notifications_muted(db): 

537 user1, token1 = generate_user() 

538 user2, token2 = generate_user() 

539 user3, token3 = generate_user() 

540 

541 make_friends(user1, user2) 

542 make_friends(user1, user3) 

543 make_friends(user2, user3) 

544 

545 send_message_notifications(empty_pb2.Empty()) 

546 process_jobs() 

547 

548 # should find no jobs, since there's no messages 

549 with session_scope() as session: 

550 assert ( 

551 session.execute( 

552 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

553 ).scalar_one() 

554 == 0 

555 ) 

556 

557 with conversations_session(token1) as c: 

558 group_chat_id = c.CreateGroupChat( 

559 conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id]) 

560 ).group_chat_id 

561 

562 with conversations_session(token3) as c: 

563 # mute it for user 3 

564 c.MuteGroupChat(conversations_pb2.MuteGroupChatReq(group_chat_id=group_chat_id, forever=True)) 

565 

566 with conversations_session(token1) as c: 

567 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 1")) 

568 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 2")) 

569 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 3")) 

570 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 4")) 

571 

572 with conversations_session(token3) as c: 

573 group_chat_id = c.CreateGroupChat( 

574 conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id]) 

575 ).group_chat_id 

576 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 5")) 

577 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 6")) 

578 

579 send_message_notifications(empty_pb2.Empty()) 

580 process_jobs() 

581 

582 # no emails sent out 

583 with session_scope() as session: 

584 assert ( 

585 session.execute( 

586 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

587 ).scalar_one() 

588 == 0 

589 ) 

590 

591 # this should generate emails for both user2 and NOT user3 

592 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

593 send_message_notifications(empty_pb2.Empty()) 

594 process_jobs() 

595 

596 with session_scope() as session: 

597 assert ( 

598 session.execute( 

599 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

600 ).scalar_one() 

601 == 1 

602 ) 

603 # delete them all 

604 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

605 

606 # shouldn't generate any more emails 

607 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

608 send_message_notifications(empty_pb2.Empty()) 

609 process_jobs() 

610 

611 with session_scope() as session: 

612 assert ( 

613 session.execute( 

614 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

615 ).scalar_one() 

616 == 0 

617 ) 

618 

619 

620def test_send_request_notifications_host_request(db): 

621 user1, token1 = generate_user() 

622 user2, token2 = generate_user() 

623 

624 today_plus_2 = (today() + timedelta(days=2)).isoformat() 

625 today_plus_3 = (today() + timedelta(days=3)).isoformat() 

626 

627 send_request_notifications(empty_pb2.Empty()) 

628 process_jobs() 

629 

630 # should find no jobs, since there's no messages 

631 with session_scope() as session: 

632 assert session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one() == 0 

633 

634 # first test that sending host request creates email 

635 with requests_session(token1) as requests: 

636 host_request_id = requests.CreateHostRequest( 

637 requests_pb2.CreateHostRequestReq( 

638 host_user_id=user2.id, from_date=today_plus_2, to_date=today_plus_3, text="Test request" 

639 ) 

640 ).host_request_id 

641 

642 with session_scope() as session: 

643 # delete send_email BackgroundJob created by CreateHostRequest 

644 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

645 

646 # check send_request_notifications successfully creates background job 

647 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

648 send_request_notifications(empty_pb2.Empty()) 

649 process_jobs() 

650 assert ( 

651 session.execute( 

652 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

653 ).scalar_one() 

654 == 1 

655 ) 

656 

657 # delete all BackgroundJobs 

658 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

659 

660 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

661 send_request_notifications(empty_pb2.Empty()) 

662 process_jobs() 

663 # should find no messages since host has already been notified 

664 assert ( 

665 session.execute( 

666 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

667 ).scalar_one() 

668 == 0 

669 ) 

670 

671 # then test that responding to host request creates email 

672 with requests_session(token2) as requests: 

673 requests.RespondHostRequest( 

674 requests_pb2.RespondHostRequestReq( 

675 host_request_id=host_request_id, 

676 status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED, 

677 text="Test request", 

678 ) 

679 ) 

680 

681 with session_scope() as session: 

682 # delete send_email BackgroundJob created by RespondHostRequest 

683 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

684 

685 # check send_request_notifications successfully creates background job 

686 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

687 send_request_notifications(empty_pb2.Empty()) 

688 process_jobs() 

689 assert ( 

690 session.execute( 

691 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

692 ).scalar_one() 

693 == 1 

694 ) 

695 

696 # delete all BackgroundJobs 

697 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

698 

699 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

700 send_request_notifications(empty_pb2.Empty()) 

701 process_jobs() 

702 # should find no messages since guest has already been notified 

703 assert ( 

704 session.execute( 

705 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

706 ).scalar_one() 

707 == 0 

708 ) 

709 

710 

711def test_send_message_notifications_seen(db): 

712 user1, token1 = generate_user() 

713 user2, token2 = generate_user() 

714 

715 make_friends(user1, user2) 

716 

717 send_message_notifications(empty_pb2.Empty()) 

718 

719 # should find no jobs, since there's no messages 

720 with session_scope() as session: 

721 assert ( 

722 session.execute( 

723 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

724 ).scalar_one() 

725 == 0 

726 ) 

727 

728 with conversations_session(token1) as c: 

729 group_chat_id = c.CreateGroupChat( 

730 conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id]) 

731 ).group_chat_id 

732 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 1")) 

733 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 2")) 

734 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 3")) 

735 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 4")) 

736 

737 # user 2 now marks those messages as seen 

738 with conversations_session(token2) as c: 

739 m_id = c.GetGroupChat(conversations_pb2.GetGroupChatReq(group_chat_id=group_chat_id)).latest_message.message_id 

740 c.MarkLastSeenGroupChat( 

741 conversations_pb2.MarkLastSeenGroupChatReq(group_chat_id=group_chat_id, last_seen_message_id=m_id) 

742 ) 

743 

744 send_message_notifications(empty_pb2.Empty()) 

745 

746 # no emails sent out 

747 with session_scope() as session: 

748 assert ( 

749 session.execute( 

750 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

751 ).scalar_one() 

752 == 0 

753 ) 

754 

755 def now_30_min_in_future(): 

756 return now() + timedelta(minutes=30) 

757 

758 # still shouldn't generate emails as user2 has seen all messages 

759 with patch("couchers.jobs.handlers.now", now_30_min_in_future): 

760 send_message_notifications(empty_pb2.Empty()) 

761 

762 with session_scope() as session: 

763 assert ( 

764 session.execute( 

765 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

766 ).scalar_one() 

767 == 0 

768 ) 

769 

770 

771def test_send_onboarding_emails(db): 

772 # needs to get first onboarding email 

773 user1, token1 = generate_user(onboarding_emails_sent=0, last_onboarding_email_sent=None, complete_profile=False) 

774 

775 send_onboarding_emails(empty_pb2.Empty()) 

776 process_jobs() 

777 

778 with session_scope() as session: 

779 assert ( 

780 session.execute( 

781 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

782 ).scalar_one() 

783 == 1 

784 ) 

785 

786 # needs to get second onboarding email, but not yet 

787 user2, token2 = generate_user( 

788 onboarding_emails_sent=1, last_onboarding_email_sent=now() - timedelta(days=6), complete_profile=False 

789 ) 

790 

791 send_onboarding_emails(empty_pb2.Empty()) 

792 process_jobs() 

793 

794 with session_scope() as session: 

795 assert ( 

796 session.execute( 

797 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

798 ).scalar_one() 

799 == 1 

800 ) 

801 

802 # needs to get second onboarding email 

803 user3, token3 = generate_user( 

804 onboarding_emails_sent=1, last_onboarding_email_sent=now() - timedelta(days=8), complete_profile=False 

805 ) 

806 

807 send_onboarding_emails(empty_pb2.Empty()) 

808 process_jobs() 

809 

810 with session_scope() as session: 

811 assert ( 

812 session.execute( 

813 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email") 

814 ).scalar_one() 

815 == 2 

816 ) 

817 

818 

819def test_send_reference_reminders(db): 

820 # need to test: 

821 # case 1: bidirectional (no emails) 

822 # case 2: host left ref (surfer needs an email) 

823 # case 3: surfer left ref (host needs an email) 

824 # case 4: neither left ref (host & surfer need an email) 

825 

826 send_reference_reminders(empty_pb2.Empty()) 

827 

828 # case 1: bidirectional (no emails) 

829 user1, token1 = generate_user(email="user1@couchers.org.invalid", name="User 1") 

830 user2, token2 = generate_user(email="user2@couchers.org.invalid", name="User 2") 

831 

832 # case 2: host left ref (surfer needs an email) 

833 # host 

834 user3, token3 = generate_user(email="user3@couchers.org.invalid", name="User 3") 

835 # surfer 

836 user4, token4 = generate_user(email="user4@couchers.org.invalid", name="User 4") 

837 

838 # case 3: surfer left ref (host needs an email) 

839 # host 

840 user5, token5 = generate_user(email="user5@couchers.org.invalid", name="User 5") 

841 # surfer 

842 user6, token6 = generate_user(email="user6@couchers.org.invalid", name="User 6") 

843 

844 # case 4: neither left ref (host & surfer need an email) 

845 # host 

846 user7, token7 = generate_user(email="user7@couchers.org.invalid", name="User 7") 

847 # surfer 

848 user8, token8 = generate_user(email="user8@couchers.org.invalid", name="User 8") 

849 

850 # case 5: neither left ref, but host blocked surfer, so neither should get an email 

851 # host 

852 user9, token9 = generate_user(email="user9@couchers.org.invalid", name="User 9") 

853 # surfer 

854 user10, token10 = generate_user(email="user10@couchers.org.invalid", name="User 10") 

855 

856 make_user_block(user9, user10) 

857 

858 with session_scope() as session: 

859 # note that create_host_reference creates a host request whose age is one day older than the timedelta here 

860 

861 # case 1: bidirectional (no emails) 

862 ref1, hr1 = create_host_reference(session, user2.id, user1.id, timedelta(days=7), surfing=True) 

863 create_host_reference(session, user1.id, user2.id, timedelta(days=7), host_request_id=hr1) 

864 

865 # case 2: host left ref (surfer needs an email) 

866 ref2, hr2 = create_host_reference(session, user3.id, user4.id, timedelta(days=11), surfing=False) 

867 

868 # case 3: surfer left ref (host needs an email) 

869 ref3, hr3 = create_host_reference(session, user6.id, user5.id, timedelta(days=9), surfing=True) 

870 

871 # case 4: neither left ref (host & surfer need an email) 

872 hr4 = create_host_request(session, user7.id, user8.id, timedelta(days=4)) 

873 

874 # case 5: neither left ref, but host blocked surfer, so neither should get an email 

875 hr5 = create_host_request(session, user9.id, user10.id, timedelta(days=7)) 

876 

877 expected_emails = [ 

878 ("user4@couchers.org.invalid", "[TEST] You have 3 days to write a reference for User 3!"), 

879 ("user5@couchers.org.invalid", "[TEST] You have 7 days to write a reference for User 6!"), 

880 ("user7@couchers.org.invalid", "[TEST] You have 14 days to write a reference for User 8!"), 

881 ("user8@couchers.org.invalid", "[TEST] You have 14 days to write a reference for User 7!"), 

882 ] 

883 

884 send_reference_reminders(empty_pb2.Empty()) 

885 

886 while process_job(): 

887 pass 

888 

889 with session_scope() as session: 

890 emails = [ 

891 (email.recipient, email.subject) 

892 for email in session.execute(select(Email).order_by(Email.recipient.asc())).scalars().all() 

893 ] 

894 

895 print(emails) 

896 print(expected_emails) 

897 

898 assert emails == expected_emails 

899 

900 

901def test_add_users_to_email_list(db): 

902 new_config = config.copy() 

903 new_config["LISTMONK_ENABLED"] = True 

904 new_config["LISTMONK_BASE_URL"] = "https://example.com" 

905 new_config["LISTMONK_API_KEY"] = "dummy_api_key" 

906 new_config["LISTMONK_LIST_UUID"] = "baf96eaa-5e70-409d-b776-f5c16fb091b9" 

907 

908 with patch("couchers.jobs.handlers.config", new_config): 

909 with patch("couchers.jobs.handlers.requests.post") as mock: 

910 add_users_to_email_list(empty_pb2.Empty()) 

911 mock.assert_not_called() 

912 

913 generate_user(in_sync_with_newsletter=False, email="testing1@couchers.invalid", name="Tester1", id=15) 

914 generate_user(in_sync_with_newsletter=True, email="testing2@couchers.invalid", name="Tester2") 

915 generate_user(in_sync_with_newsletter=False, email="testing3@couchers.invalid", name="Tester3 von test", id=17) 

916 generate_user( 

917 in_sync_with_newsletter=False, email="testing4@couchers.invalid", name="Tester4", opt_out_of_newsletter=True 

918 ) 

919 

920 with patch("couchers.jobs.handlers.requests.post") as mock: 

921 ret = mock.return_value 

922 ret.status_code = 200 

923 add_users_to_email_list(empty_pb2.Empty()) 

924 mock.assert_has_calls( 

925 [ 

926 call( 

927 "https://example.com/api/subscribers", 

928 auth=("listmonk", "dummy_api_key"), 

929 json={ 

930 "email": "testing1@couchers.invalid", 

931 "name": "Tester1", 

932 "list_uuids": ["baf96eaa-5e70-409d-b776-f5c16fb091b9"], 

933 "preconfirm_subscriptions": True, 

934 "attribs": {"couchers_user_id": 15}, 

935 }, 

936 timeout=10, 

937 ), 

938 call( 

939 "https://example.com/api/subscribers", 

940 auth=("listmonk", "dummy_api_key"), 

941 json={ 

942 "email": "testing3@couchers.invalid", 

943 "name": "Tester3 von test", 

944 "list_uuids": ["baf96eaa-5e70-409d-b776-f5c16fb091b9"], 

945 "preconfirm_subscriptions": True, 

946 "attribs": {"couchers_user_id": 17}, 

947 }, 

948 timeout=10, 

949 ), 

950 ], 

951 any_order=True, 

952 ) 

953 

954 with patch("couchers.jobs.handlers.requests.post") as mock: 

955 add_users_to_email_list(empty_pb2.Empty()) 

956 mock.assert_not_called() 

957 

958 

959def test_update_recommendation_scores(db): 

960 update_recommendation_scores(empty_pb2.Empty()) 

961 

962 

963def test_update_badges(db, push_collector): 

964 user1, _ = generate_user() 

965 user2, _ = generate_user() 

966 user3, _ = generate_user() 

967 user4, _ = generate_user(phone="+15555555555", phone_verification_verified=func.now()) 

968 user5, _ = generate_user(phone="+15555555556", phone_verification_verified=func.now()) 

969 user6, _ = generate_user() 

970 

971 with session_scope() as session: 

972 session.add(UserBadge(user_id=user5.id, badge_id="board_member")) 

973 

974 update_badges(empty_pb2.Empty()) 

975 process_jobs() 

976 

977 with session_scope() as session: 

978 badge_tuples = session.execute( 

979 select(UserBadge.user_id, UserBadge.badge_id).order_by(UserBadge.user_id.asc(), UserBadge.id.asc()) 

980 ).all() 

981 

982 expected = [ 

983 (user1.id, "founder"), 

984 (user1.id, "board_member"), 

985 (user2.id, "founder"), 

986 (user2.id, "board_member"), 

987 (user4.id, "phone_verified"), 

988 (user5.id, "phone_verified"), 

989 ] 

990 

991 assert badge_tuples == expected 

992 

993 print(push_collector.pushes) 

994 

995 push_collector.assert_user_push_matches_fields( 

996 user1.id, 

997 ix=0, 

998 title="The Founder badge was added to your profile", 

999 body="Check out your profile to see the new badge!", 

1000 ) 

1001 push_collector.assert_user_push_matches_fields( 

1002 user1.id, 

1003 ix=1, 

1004 title="The Board Member badge was added to your profile", 

1005 body="Check out your profile to see the new badge!", 

1006 ) 

1007 push_collector.assert_user_push_matches_fields( 

1008 user2.id, 

1009 ix=0, 

1010 title="The Founder badge was added to your profile", 

1011 body="Check out your profile to see the new badge!", 

1012 ) 

1013 push_collector.assert_user_push_matches_fields( 

1014 user2.id, 

1015 ix=1, 

1016 title="The Board Member badge was added to your profile", 

1017 body="Check out your profile to see the new badge!", 

1018 ) 

1019 push_collector.assert_user_push_matches_fields( 

1020 user4.id, 

1021 ix=0, 

1022 title="The Verified Phone badge was added to your profile", 

1023 body="Check out your profile to see the new badge!", 

1024 ) 

1025 push_collector.assert_user_push_matches_fields( 

1026 user5.id, 

1027 ix=0, 

1028 title="The Board Member badge was removed from your profile", 

1029 body="You can see all your badges on your profile.", 

1030 ) 

1031 push_collector.assert_user_push_matches_fields( 

1032 user5.id, 

1033 ix=1, 

1034 title="The Verified Phone badge was added to your profile", 

1035 body="Check out your profile to see the new badge!", 

1036 )