Coverage for src/tests/test_bg_jobs.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

377 statements  

1from datetime import timedelta 

2from unittest.mock import patch 

3 

4import pytest 

5import requests 

6from google.protobuf import empty_pb2 

7from sqlalchemy.sql import delete, func 

8 

9import couchers.jobs.worker 

10from couchers.config import config 

11from couchers.crypto import urlsafe_secure_token 

12from couchers.db import session_scope 

13from couchers.email import queue_email 

14from couchers.email.dev import print_dev_email 

15from couchers.jobs.enqueue import queue_job 

16from couchers.jobs.handlers import ( 

17 process_add_users_to_email_list, 

18 process_send_message_notifications, 

19 process_send_onboarding_emails, 

20 process_send_reference_reminders, 

21 process_send_request_notifications, 

22 process_update_recommendation_scores, 

23) 

24from couchers.jobs.worker import _run_job_and_schedule, process_job, run_scheduler, service_jobs 

25from couchers.metrics import create_prometheus_server, job_process_registry 

26from couchers.models import ( 

27 AccountDeletionToken, 

28 BackgroundJob, 

29 BackgroundJobState, 

30 BackgroundJobType, 

31 Email, 

32 LoginToken, 

33 PasswordResetToken, 

34) 

35from couchers.sql import couchers_select as select 

36from couchers.tasks import send_login_email 

37from couchers.utils import now, today 

38from proto import conversations_pb2, requests_pb2 

39from tests.test_fixtures import ( # noqa 

40 auth_api_session, 

41 conversations_session, 

42 db, 

43 generate_user, 

44 make_friends, 

45 make_user_block, 

46 requests_session, 

47 testconfig, 

48) 

49from tests.test_references import create_host_reference, create_host_request 

50 

51 

52def now_5_min_in_future(): 

53 return now() + timedelta(minutes=5) 

54 

55 

56@pytest.fixture(autouse=True) 

57def _(testconfig): 

58 pass 

59 

60 

61def _check_job_counter(job, status, attempt, exception): 

62 metrics_string = requests.get("http://localhost:8001").text 

63 string_to_check = f'attempt="{attempt}",exception="{exception}",job="{job}",status="{status}"' 

64 assert string_to_check in metrics_string 

65 

66 

67def test_login_email_full(db): 

68 user, api_token = generate_user() 

69 user_email = user.email 

70 

71 with session_scope() as session: 

72 login_token = send_login_email(session, user) 

73 

74 def mock_print_dev_email(sender_name, sender_email, recipient, subject, plain, html): 

75 assert recipient == user.email 

76 assert "login" in subject.lower() 

77 assert login_token.token in plain 

78 assert login_token.token in html 

79 return print_dev_email(sender_name, sender_email, recipient, subject, plain, html) 

80 

81 with patch("couchers.jobs.handlers.print_dev_email", mock_print_dev_email): 

82 process_job() 

83 

84 with session_scope() as session: 

85 assert ( 

86 session.execute( 

87 select(func.count()) 

88 .select_from(BackgroundJob) 

89 .where(BackgroundJob.state == BackgroundJobState.completed) 

90 ).scalar_one() 

91 == 1 

92 ) 

93 assert ( 

94 session.execute( 

95 select(func.count()) 

96 .select_from(BackgroundJob) 

97 .where(BackgroundJob.state != BackgroundJobState.completed) 

98 ).scalar_one() 

99 == 0 

100 ) 

101 

102 

103def test_email_job(db): 

104 queue_email("sender_name", "sender_email", "recipient", "subject", "plain", "html") 

105 

106 def mock_print_dev_email(sender_name, sender_email, recipient, subject, plain, html): 

107 assert sender_name == "sender_name" 

108 assert sender_email == "sender_email" 

109 assert recipient == "recipient" 

110 assert subject == "subject" 

111 assert plain == "plain" 

112 assert html == "html" 

113 return print_dev_email(sender_name, sender_email, recipient, subject, plain, html) 

114 

115 with patch("couchers.jobs.handlers.print_dev_email", mock_print_dev_email): 

116 process_job() 

117 

118 with session_scope() as session: 

119 assert ( 

120 session.execute( 

121 select(func.count()) 

122 .select_from(BackgroundJob) 

123 .where(BackgroundJob.state == BackgroundJobState.completed) 

124 ).scalar_one() 

125 == 1 

126 ) 

127 assert ( 

128 session.execute( 

129 select(func.count()) 

130 .select_from(BackgroundJob) 

131 .where(BackgroundJob.state != BackgroundJobState.completed) 

132 ).scalar_one() 

133 == 0 

134 ) 

135 

136 

137def test_purge_login_tokens(db): 

138 user, api_token = generate_user() 

139 

140 with session_scope() as session: 

141 login_token = LoginToken(token=urlsafe_secure_token(), user=user, expiry=now()) 

142 session.add(login_token) 

143 assert session.execute(select(func.count()).select_from(LoginToken)).scalar_one() == 1 

144 

145 queue_job(BackgroundJobType.purge_login_tokens, empty_pb2.Empty()) 

146 process_job() 

147 

148 with session_scope() as session: 

149 assert session.execute(select(func.count()).select_from(LoginToken)).scalar_one() == 0 

150 

151 with session_scope() as session: 

152 assert ( 

153 session.execute( 

154 select(func.count()) 

155 .select_from(BackgroundJob) 

156 .where(BackgroundJob.state == BackgroundJobState.completed) 

157 ).scalar_one() 

158 == 1 

159 ) 

160 assert ( 

161 session.execute( 

162 select(func.count()) 

163 .select_from(BackgroundJob) 

164 .where(BackgroundJob.state != BackgroundJobState.completed) 

165 ).scalar_one() 

166 == 0 

167 ) 

168 

169 

170def test_purge_password_reset_tokens(db): 

171 user, api_token = generate_user() 

172 

173 with session_scope() as session: 

174 password_reset_token = PasswordResetToken(token=urlsafe_secure_token(), user=user, expiry=now()) 

175 session.add(password_reset_token) 

176 assert session.execute(select(func.count()).select_from(PasswordResetToken)).scalar_one() == 1 

177 

178 queue_job(BackgroundJobType.purge_password_reset_tokens, empty_pb2.Empty()) 

179 process_job() 

180 

181 with session_scope() as session: 

182 assert session.execute(select(func.count()).select_from(PasswordResetToken)).scalar_one() == 0 

183 

184 with session_scope() as session: 

185 assert ( 

186 session.execute( 

187 select(func.count()) 

188 .select_from(BackgroundJob) 

189 .where(BackgroundJob.state == BackgroundJobState.completed) 

190 ).scalar_one() 

191 == 1 

192 ) 

193 assert ( 

194 session.execute( 

195 select(func.count()) 

196 .select_from(BackgroundJob) 

197 .where(BackgroundJob.state != BackgroundJobState.completed) 

198 ).scalar_one() 

199 == 0 

200 ) 

201 

202 

203def test_purge_account_deletion_tokens(db): 

204 user, api_token = generate_user() 

205 user2, api_token2 = generate_user() 

206 user3, api_token3 = generate_user() 

207 

208 with session_scope() as session: 

209 """ 

210 3 cases: 

211 1) Token is valid 

212 2) Token expired but account retrievable 

213 3) Account is irretrievable (and expired) 

214 """ 

215 account_deletion_tokens = [ 

216 AccountDeletionToken(token=urlsafe_secure_token(), user=user, expiry=now() - timedelta(hours=2)), 

217 AccountDeletionToken(token=urlsafe_secure_token(), user=user2, expiry=now()), 

218 AccountDeletionToken(token=urlsafe_secure_token(), user=user3, expiry=now() + timedelta(hours=5)), 

219 ] 

220 for token in account_deletion_tokens: 

221 session.add(token) 

222 assert session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one() == 3 

223 

224 queue_job(BackgroundJobType.purge_account_deletion_tokens, empty_pb2.Empty()) 

225 process_job() 

226 

227 with session_scope() as session: 

228 assert session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one() == 1 

229 

230 with session_scope() as session: 

231 assert ( 

232 session.execute( 

233 select(func.count()) 

234 .select_from(BackgroundJob) 

235 .where(BackgroundJob.state == BackgroundJobState.completed) 

236 ).scalar_one() 

237 == 1 

238 ) 

239 assert ( 

240 session.execute( 

241 select(func.count()) 

242 .select_from(BackgroundJob) 

243 .where(BackgroundJob.state != BackgroundJobState.completed) 

244 ).scalar_one() 

245 == 0 

246 ) 

247 

248 

249def test_enforce_community_memberships(db): 

250 queue_job(BackgroundJobType.enforce_community_membership, empty_pb2.Empty()) 

251 process_job() 

252 

253 with session_scope() as session: 

254 assert ( 

255 session.execute( 

256 select(func.count()) 

257 .select_from(BackgroundJob) 

258 .where(BackgroundJob.state == BackgroundJobState.completed) 

259 ).scalar_one() 

260 == 1 

261 ) 

262 assert ( 

263 session.execute( 

264 select(func.count()) 

265 .select_from(BackgroundJob) 

266 .where(BackgroundJob.state != BackgroundJobState.completed) 

267 ).scalar_one() 

268 == 0 

269 ) 

270 

271 

272def test_refresh_materialized_views(db): 

273 queue_job(BackgroundJobType.refresh_materialized_views, empty_pb2.Empty()) 

274 process_job() 

275 

276 with session_scope() as session: 

277 assert ( 

278 session.execute( 

279 select(func.count()) 

280 .select_from(BackgroundJob) 

281 .where(BackgroundJob.state == BackgroundJobState.completed) 

282 ).scalar_one() 

283 == 1 

284 ) 

285 assert ( 

286 session.execute( 

287 select(func.count()) 

288 .select_from(BackgroundJob) 

289 .where(BackgroundJob.state != BackgroundJobState.completed) 

290 ).scalar_one() 

291 == 0 

292 ) 

293 

294 

295def test_service_jobs(db): 

296 queue_email("sender_name", "sender_email", "recipient", "subject", "plain", "html") 

297 

298 # we create this HitSleep exception here, and mock out the normal sleep(1) in the infinite loop to instead raise 

299 # this. that allows us to conveniently get out of the infinite loop and know we had no more jobs left 

300 class HitSleep(Exception): 

301 pass 

302 

303 # the mock `sleep` function that instead raises the aforementioned exception 

304 def raising_sleep(seconds): 

305 raise HitSleep() 

306 

307 with pytest.raises(HitSleep): 

308 with patch("couchers.jobs.worker.sleep", raising_sleep): 

309 service_jobs() 

310 

311 with session_scope() as session: 

312 assert ( 

313 session.execute( 

314 select(func.count()) 

315 .select_from(BackgroundJob) 

316 .where(BackgroundJob.state == BackgroundJobState.completed) 

317 ).scalar_one() 

318 == 1 

319 ) 

320 assert ( 

321 session.execute( 

322 select(func.count()) 

323 .select_from(BackgroundJob) 

324 .where(BackgroundJob.state != BackgroundJobState.completed) 

325 ).scalar_one() 

326 == 0 

327 ) 

328 

329 

330def test_scheduler(db, monkeypatch): 

331 MOCK_SCHEDULE = [ 

332 (BackgroundJobType.purge_login_tokens, timedelta(seconds=7)), 

333 (BackgroundJobType.send_message_notifications, timedelta(seconds=11)), 

334 ] 

335 

336 current_time = 0 

337 end_time = 70 

338 

339 class EndOfTime(Exception): 

340 pass 

341 

342 def mock_monotonic(): 

343 nonlocal current_time 

344 return current_time 

345 

346 def mock_sleep(seconds): 

347 nonlocal current_time 

348 current_time += seconds 

349 if current_time > end_time: 

350 raise EndOfTime() 

351 

352 realized_schedule = [] 

353 

354 def mock_run_job_and_schedule(sched, schedule_id): 

355 nonlocal current_time 

356 realized_schedule.append((current_time, schedule_id)) 

357 _run_job_and_schedule(sched, schedule_id) 

358 

359 monkeypatch.setattr(couchers.jobs.worker, "_run_job_and_schedule", mock_run_job_and_schedule) 

360 monkeypatch.setattr(couchers.jobs.worker, "SCHEDULE", MOCK_SCHEDULE) 

361 monkeypatch.setattr(couchers.jobs.worker, "monotonic", mock_monotonic) 

362 monkeypatch.setattr(couchers.jobs.worker, "sleep", mock_sleep) 

363 

364 with pytest.raises(EndOfTime): 

365 run_scheduler() 

366 

367 assert realized_schedule == [ 

368 (0.0, 0), 

369 (0.0, 1), 

370 (7.0, 0), 

371 (11.0, 1), 

372 (14.0, 0), 

373 (21.0, 0), 

374 (22.0, 1), 

375 (28.0, 0), 

376 (33.0, 1), 

377 (35.0, 0), 

378 (42.0, 0), 

379 (44.0, 1), 

380 (49.0, 0), 

381 (55.0, 1), 

382 (56.0, 0), 

383 (63.0, 0), 

384 (66.0, 1), 

385 (70.0, 0), 

386 ] 

387 

388 with session_scope() as session: 

389 assert ( 

390 session.execute( 

391 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state == BackgroundJobState.pending) 

392 ).scalar_one() 

393 == 18 

394 ) 

395 assert ( 

396 session.execute( 

397 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state != BackgroundJobState.pending) 

398 ).scalar_one() 

399 == 0 

400 ) 

401 

402 

403def test_job_retry(db): 

404 queue_job(BackgroundJobType.purge_login_tokens, empty_pb2.Empty()) 

405 

406 called_count = 0 

407 

408 def mock_handler(payload): 

409 nonlocal called_count 

410 called_count += 1 

411 raise Exception() 

412 

413 MOCK_JOBS = { 

414 BackgroundJobType.purge_login_tokens: (empty_pb2.Empty, mock_handler), 

415 } 

416 create_prometheus_server(registry=job_process_registry, port=8001) 

417 with patch("couchers.jobs.worker.JOBS", MOCK_JOBS): 

418 process_job() 

419 with session_scope() as session: 

420 assert ( 

421 session.execute( 

422 select(func.count()) 

423 .select_from(BackgroundJob) 

424 .where(BackgroundJob.state == BackgroundJobState.error) 

425 ).scalar_one() 

426 == 1 

427 ) 

428 assert ( 

429 session.execute( 

430 select(func.count()) 

431 .select_from(BackgroundJob) 

432 .where(BackgroundJob.state != BackgroundJobState.error) 

433 ).scalar_one() 

434 == 0 

435 ) 

436 

437 session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now() 

438 process_job() 

439 with session_scope() as session: 

440 session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now() 

441 process_job() 

442 with session_scope() as session: 

443 session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now() 

444 process_job() 

445 with session_scope() as session: 

446 session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now() 

447 process_job() 

448 

449 with session_scope() as session: 

450 assert ( 

451 session.execute( 

452 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state == BackgroundJobState.failed) 

453 ).scalar_one() 

454 == 1 

455 ) 

456 assert ( 

457 session.execute( 

458 select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state != BackgroundJobState.failed) 

459 ).scalar_one() 

460 == 0 

461 ) 

462 

463 _check_job_counter("purge_login_tokens", "error", "4", "Exception") 

464 _check_job_counter("purge_login_tokens", "failed", "5", "Exception") 

465 

466 

467def test_no_jobs_no_problem(db): 

468 with session_scope() as session: 

469 assert session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one() == 0 

470 

471 assert not process_job() 

472 

473 with session_scope() as session: 

474 assert session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one() == 0 

475 

476 

477def test_process_send_message_notifications_basic(db): 

478 user1, token1 = generate_user() 

479 user2, token2 = generate_user() 

480 user3, token3 = generate_user() 

481 

482 make_friends(user1, user2) 

483 make_friends(user1, user3) 

484 make_friends(user2, user3) 

485 

486 process_send_message_notifications(empty_pb2.Empty()) 

487 

488 # should find no jobs, since there's no messages 

489 with session_scope() as session: 

490 assert ( 

491 session.execute( 

492 select(func.count()) 

493 .select_from(BackgroundJob) 

494 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

495 ).scalar_one() 

496 == 0 

497 ) 

498 

499 with conversations_session(token1) as c: 

500 group_chat_id = c.CreateGroupChat( 

501 conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id]) 

502 ).group_chat_id 

503 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 1")) 

504 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 2")) 

505 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 3")) 

506 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 4")) 

507 

508 with conversations_session(token3) as c: 

509 group_chat_id = c.CreateGroupChat( 

510 conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id]) 

511 ).group_chat_id 

512 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 5")) 

513 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 6")) 

514 

515 process_send_message_notifications(empty_pb2.Empty()) 

516 

517 # no emails sent out 

518 with session_scope() as session: 

519 assert ( 

520 session.execute( 

521 select(func.count()) 

522 .select_from(BackgroundJob) 

523 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

524 ).scalar_one() 

525 == 0 

526 ) 

527 

528 # this should generate emails for both user2 and user3 

529 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

530 process_send_message_notifications(empty_pb2.Empty()) 

531 

532 with session_scope() as session: 

533 assert ( 

534 session.execute( 

535 select(func.count()) 

536 .select_from(BackgroundJob) 

537 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

538 ).scalar_one() 

539 == 2 

540 ) 

541 # delete them all 

542 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

543 

544 # shouldn't generate any more emails 

545 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

546 process_send_message_notifications(empty_pb2.Empty()) 

547 

548 with session_scope() as session: 

549 assert ( 

550 session.execute( 

551 select(func.count()) 

552 .select_from(BackgroundJob) 

553 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

554 ).scalar_one() 

555 == 0 

556 ) 

557 

558 

559def test_process_send_message_notifications_muted(db): 

560 user1, token1 = generate_user() 

561 user2, token2 = generate_user() 

562 user3, token3 = generate_user() 

563 

564 make_friends(user1, user2) 

565 make_friends(user1, user3) 

566 make_friends(user2, user3) 

567 

568 process_send_message_notifications(empty_pb2.Empty()) 

569 

570 # should find no jobs, since there's no messages 

571 with session_scope() as session: 

572 assert ( 

573 session.execute( 

574 select(func.count()) 

575 .select_from(BackgroundJob) 

576 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

577 ).scalar_one() 

578 == 0 

579 ) 

580 

581 with conversations_session(token1) as c: 

582 group_chat_id = c.CreateGroupChat( 

583 conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id]) 

584 ).group_chat_id 

585 

586 with conversations_session(token3) as c: 

587 # mute it for user 3 

588 c.MuteGroupChat(conversations_pb2.MuteGroupChatReq(group_chat_id=group_chat_id, forever=True)) 

589 

590 with conversations_session(token1) as c: 

591 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 1")) 

592 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 2")) 

593 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 3")) 

594 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 4")) 

595 

596 with conversations_session(token3) as c: 

597 group_chat_id = c.CreateGroupChat( 

598 conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id]) 

599 ).group_chat_id 

600 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 5")) 

601 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 6")) 

602 

603 process_send_message_notifications(empty_pb2.Empty()) 

604 

605 # no emails sent out 

606 with session_scope() as session: 

607 assert ( 

608 session.execute( 

609 select(func.count()) 

610 .select_from(BackgroundJob) 

611 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

612 ).scalar_one() 

613 == 0 

614 ) 

615 

616 # this should generate emails for both user2 and NOT user3 

617 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

618 process_send_message_notifications(empty_pb2.Empty()) 

619 

620 with session_scope() as session: 

621 assert ( 

622 session.execute( 

623 select(func.count()) 

624 .select_from(BackgroundJob) 

625 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

626 ).scalar_one() 

627 == 1 

628 ) 

629 # delete them all 

630 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

631 

632 # shouldn't generate any more emails 

633 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

634 process_send_message_notifications(empty_pb2.Empty()) 

635 

636 with session_scope() as session: 

637 assert ( 

638 session.execute( 

639 select(func.count()) 

640 .select_from(BackgroundJob) 

641 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

642 ).scalar_one() 

643 == 0 

644 ) 

645 

646 

647def test_process_send_request_notifications_host_request(db): 

648 user1, token1 = generate_user() 

649 user2, token2 = generate_user() 

650 

651 today_plus_2 = (today() + timedelta(days=2)).isoformat() 

652 today_plus_3 = (today() + timedelta(days=3)).isoformat() 

653 

654 process_send_request_notifications(empty_pb2.Empty()) 

655 

656 # should find no jobs, since there's no messages 

657 with session_scope() as session: 

658 assert session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one() == 0 

659 

660 # first test that sending host request creates email 

661 with requests_session(token1) as requests: 

662 host_request_id = requests.CreateHostRequest( 

663 requests_pb2.CreateHostRequestReq( 

664 host_user_id=user2.id, from_date=today_plus_2, to_date=today_plus_3, text="Test request" 

665 ) 

666 ).host_request_id 

667 

668 with session_scope() as session: 

669 # delete send_email BackgroundJob created by CreateHostRequest 

670 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

671 

672 # check process_send_request_notifications successfully creates background job 

673 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

674 process_send_request_notifications(empty_pb2.Empty()) 

675 assert ( 

676 session.execute( 

677 select(func.count()) 

678 .select_from(BackgroundJob) 

679 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

680 ).scalar_one() 

681 == 1 

682 ) 

683 

684 # delete all BackgroundJobs 

685 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

686 

687 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

688 process_send_request_notifications(empty_pb2.Empty()) 

689 # should find no messages since host has already been notified 

690 assert ( 

691 session.execute( 

692 select(func.count()) 

693 .select_from(BackgroundJob) 

694 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

695 ).scalar_one() 

696 == 0 

697 ) 

698 

699 # then test that responding to host request creates email 

700 with requests_session(token2) as requests: 

701 requests.RespondHostRequest( 

702 requests_pb2.RespondHostRequestReq( 

703 host_request_id=host_request_id, 

704 status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED, 

705 text="Test request", 

706 ) 

707 ) 

708 

709 with session_scope() as session: 

710 # delete send_email BackgroundJob created by RespondHostRequest 

711 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

712 

713 # check process_send_request_notifications successfully creates background job 

714 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

715 process_send_request_notifications(empty_pb2.Empty()) 

716 assert ( 

717 session.execute( 

718 select(func.count()) 

719 .select_from(BackgroundJob) 

720 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

721 ).scalar_one() 

722 == 1 

723 ) 

724 

725 # delete all BackgroundJobs 

726 session.execute(delete(BackgroundJob).execution_options(synchronize_session=False)) 

727 

728 with patch("couchers.jobs.handlers.now", now_5_min_in_future): 

729 process_send_request_notifications(empty_pb2.Empty()) 

730 # should find no messages since guest has already been notified 

731 assert ( 

732 session.execute( 

733 select(func.count()) 

734 .select_from(BackgroundJob) 

735 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

736 ).scalar_one() 

737 == 0 

738 ) 

739 

740 

741def test_process_send_message_notifications_seen(db): 

742 user1, token1 = generate_user() 

743 user2, token2 = generate_user() 

744 

745 make_friends(user1, user2) 

746 

747 process_send_message_notifications(empty_pb2.Empty()) 

748 

749 # should find no jobs, since there's no messages 

750 with session_scope() as session: 

751 assert ( 

752 session.execute( 

753 select(func.count()) 

754 .select_from(BackgroundJob) 

755 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

756 ).scalar_one() 

757 == 0 

758 ) 

759 

760 with conversations_session(token1) as c: 

761 group_chat_id = c.CreateGroupChat( 

762 conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id]) 

763 ).group_chat_id 

764 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 1")) 

765 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 2")) 

766 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 3")) 

767 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text="Test message 4")) 

768 

769 # user 2 now marks those messages as seen 

770 with conversations_session(token2) as c: 

771 m_id = c.GetGroupChat(conversations_pb2.GetGroupChatReq(group_chat_id=group_chat_id)).latest_message.message_id 

772 c.MarkLastSeenGroupChat( 

773 conversations_pb2.MarkLastSeenGroupChatReq(group_chat_id=group_chat_id, last_seen_message_id=m_id) 

774 ) 

775 

776 process_send_message_notifications(empty_pb2.Empty()) 

777 

778 # no emails sent out 

779 with session_scope() as session: 

780 assert ( 

781 session.execute( 

782 select(func.count()) 

783 .select_from(BackgroundJob) 

784 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

785 ).scalar_one() 

786 == 0 

787 ) 

788 

789 def now_30_min_in_future(): 

790 return now() + timedelta(minutes=30) 

791 

792 # still shouldn't generate emails as user2 has seen all messages 

793 with patch("couchers.jobs.handlers.now", now_30_min_in_future): 

794 process_send_message_notifications(empty_pb2.Empty()) 

795 

796 with session_scope() as session: 

797 assert ( 

798 session.execute( 

799 select(func.count()) 

800 .select_from(BackgroundJob) 

801 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

802 ).scalar_one() 

803 == 0 

804 ) 

805 

806 

807def test_process_send_onboarding_emails(db): 

808 # needs to get first onboarding email 

809 user1, token1 = generate_user(onboarding_emails_sent=0, last_onboarding_email_sent=None) 

810 

811 process_send_onboarding_emails(empty_pb2.Empty()) 

812 

813 with session_scope() as session: 

814 assert ( 

815 session.execute( 

816 select(func.count()) 

817 .select_from(BackgroundJob) 

818 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

819 ).scalar_one() 

820 == 1 

821 ) 

822 

823 # needs to get second onboarding email, but not yet 

824 user2, token2 = generate_user(onboarding_emails_sent=1, last_onboarding_email_sent=now() - timedelta(days=6)) 

825 

826 process_send_onboarding_emails(empty_pb2.Empty()) 

827 

828 with session_scope() as session: 

829 assert ( 

830 session.execute( 

831 select(func.count()) 

832 .select_from(BackgroundJob) 

833 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

834 ).scalar_one() 

835 == 1 

836 ) 

837 

838 # needs to get second onboarding email 

839 user3, token3 = generate_user(onboarding_emails_sent=1, last_onboarding_email_sent=now() - timedelta(days=8)) 

840 

841 process_send_onboarding_emails(empty_pb2.Empty()) 

842 

843 with session_scope() as session: 

844 assert ( 

845 session.execute( 

846 select(func.count()) 

847 .select_from(BackgroundJob) 

848 .where(BackgroundJob.job_type == BackgroundJobType.send_email) 

849 ).scalar_one() 

850 == 2 

851 ) 

852 

853 

854def test_process_send_reference_reminders(db): 

855 # need to test: 

856 # case 1: bidirectional (no emails) 

857 # case 2: host left ref (surfer needs an email) 

858 # case 3: surfer left ref (host needs an email) 

859 # case 4: neither left ref (host & surfer need an email) 

860 

861 process_send_reference_reminders(empty_pb2.Empty()) 

862 

863 # case 1: bidirectional (no emails) 

864 user1, token1 = generate_user(email="user1@couchers.org.invalid", name="User 1") 

865 user2, token2 = generate_user(email="user2@couchers.org.invalid", name="User 2") 

866 

867 # case 2: host left ref (surfer needs an email) 

868 # host 

869 user3, token3 = generate_user(email="user3@couchers.org.invalid", name="User 3") 

870 # surfer 

871 user4, token4 = generate_user(email="user4@couchers.org.invalid", name="User 4") 

872 

873 # case 3: surfer left ref (host needs an email) 

874 # host 

875 user5, token5 = generate_user(email="user5@couchers.org.invalid", name="User 5") 

876 # surfer 

877 user6, token6 = generate_user(email="user6@couchers.org.invalid", name="User 6") 

878 

879 # case 4: neither left ref (host & surfer need an email) 

880 # host 

881 user7, token7 = generate_user(email="user7@couchers.org.invalid", name="User 7") 

882 # surfer 

883 user8, token8 = generate_user(email="user8@couchers.org.invalid", name="User 8") 

884 

885 # case 5: neither left ref, but host blocked surfer, so neither should get an email 

886 # host 

887 user9, token9 = generate_user(email="user9@couchers.org.invalid", name="User 9") 

888 # surfer 

889 user10, token10 = generate_user(email="user10@couchers.org.invalid", name="User 10") 

890 

891 make_user_block(user9, user10) 

892 

893 with session_scope() as session: 

894 # note that create_host_reference creates a host request whose age is one day older than the timedelta here 

895 

896 # case 1: bidirectional (no emails) 

897 ref1, hr1 = create_host_reference(session, user2.id, user1.id, timedelta(days=7), surfing=True) 

898 create_host_reference(session, user1.id, user2.id, timedelta(days=7), host_request_id=hr1) 

899 

900 # case 2: host left ref (surfer needs an email) 

901 ref2, hr2 = create_host_reference(session, user3.id, user4.id, timedelta(days=11), surfing=False) 

902 

903 # case 3: surfer left ref (host needs an email) 

904 ref3, hr3 = create_host_reference(session, user6.id, user5.id, timedelta(days=9), surfing=True) 

905 

906 # case 4: neither left ref (host & surfer need an email) 

907 hr4 = create_host_request(session, user7.id, user8.id, timedelta(days=4)) 

908 

909 # case 5: neither left ref, but host blocked surfer, so neither should get an email 

910 hr5 = create_host_request(session, user9.id, user10.id, timedelta(days=7)) 

911 

912 expected_emails = [ 

913 ("user4@couchers.org.invalid", "[TEST] You have 3 days to write a reference for User 3!"), 

914 ("user5@couchers.org.invalid", "[TEST] You have 7 days to write a reference for User 6!"), 

915 ("user7@couchers.org.invalid", "[TEST] You have 14 days to write a reference for User 8!"), 

916 ("user8@couchers.org.invalid", "[TEST] You have 14 days to write a reference for User 7!"), 

917 ] 

918 

919 process_send_reference_reminders(empty_pb2.Empty()) 

920 

921 while process_job(): 

922 pass 

923 

924 with session_scope() as session: 

925 emails = [ 

926 (email.recipient, email.subject) 

927 for email in session.execute(select(Email).order_by(Email.recipient.asc())).scalars().all() 

928 ] 

929 

930 print(emails) 

931 print(expected_emails) 

932 

933 assert emails == expected_emails 

934 

935 

936def test_process_add_users_to_email_list(db): 

937 new_config = config.copy() 

938 new_config["MAILCHIMP_ENABLED"] = True 

939 new_config["MAILCHIMP_API_KEY"] = "dummy_api_key" 

940 new_config["MAILCHIMP_DC"] = "dc99" 

941 new_config["MAILCHIMP_LIST_ID"] = "dummy_list_id" 

942 

943 with patch("couchers.config.config", new_config): 

944 with patch("couchers.jobs.handlers.requests.post") as mock: 

945 process_add_users_to_email_list(empty_pb2.Empty()) 

946 mock.assert_not_called() 

947 

948 generate_user(added_to_mailing_list=False, email="testing1@couchers.invalid", name="Tester1") 

949 generate_user(added_to_mailing_list=True, email="testing2@couchers.invalid", name="Tester2") 

950 generate_user(added_to_mailing_list=False, email="testing3@couchers.invalid", name="Tester3 von test") 

951 

952 with patch("couchers.jobs.handlers.requests.post") as mock: 

953 ret = mock.return_value 

954 ret.status_code = 200 

955 process_add_users_to_email_list(empty_pb2.Empty()) 

956 

957 mock.assert_called_once_with( 

958 "https://dc99.api.mailchimp.com/3.0/lists/dummy_list_id", 

959 auth=("apikey", "dummy_api_key"), 

960 json={ 

961 "members": [ 

962 { 

963 "email_address": "testing1@couchers.invalid", 

964 "status_if_new": "subscribed", 

965 "status": "subscribed", 

966 "merge_fields": { 

967 "FNAME": "Tester1", 

968 }, 

969 }, 

970 { 

971 "email_address": "testing3@couchers.invalid", 

972 "status_if_new": "subscribed", 

973 "status": "subscribed", 

974 "merge_fields": { 

975 "FNAME": "Tester3 von test", 

976 }, 

977 }, 

978 ] 

979 }, 

980 ) 

981 

982 with patch("couchers.jobs.handlers.requests.post") as mock: 

983 process_add_users_to_email_list(empty_pb2.Empty()) 

984 mock.assert_not_called() 

985 

986 

987def test_process_update_recommendation_scores(db): 

988 process_update_recommendation_scores(empty_pb2.Empty())