Coverage for src/tests/test_bg_jobs.py: 100%

458 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-07-05 23:21 +0000

1from datetime import timedelta 

2from unittest.mock import call, patch 

3 

4import pytest 

5import requests 

6from google.protobuf import empty_pb2 

7from sqlalchemy.sql import delete, func 

8 

9import couchers.jobs.worker 

10from couchers.config import config 

11from couchers.constants import HOST_REQUEST_MAX_REMINDERS, HOST_REQUEST_REMINDER_INTERVAL 

12from couchers.crypto import urlsafe_secure_token 

13from couchers.db import session_scope 

14from couchers.email import queue_email 

15from couchers.email.dev import print_dev_email 

16from couchers.jobs.enqueue import queue_job 

17from couchers.jobs.handlers import ( 

18 add_users_to_email_list, 

19 send_host_request_reminders, 

20 send_message_notifications, 

21 send_onboarding_emails, 

22 send_reference_reminders, 

23 send_request_notifications, 

24 update_badges, 

25 update_recommendation_scores, 

26) 

27from couchers.jobs.worker import _run_job_and_schedule, process_job, run_scheduler, service_jobs 

28from couchers.metrics import create_prometheus_server 

29from couchers.models import ( 

30 AccountDeletionToken, 

31 BackgroundJob, 

32 BackgroundJobState, 

33 Email, 

34 HostRequestStatus, 

35 LoginToken, 

36 Message, 

37 MessageType, 

38 PasswordResetToken, 

39 UserBadge, 

40) 

41from couchers.sql import couchers_select as select 

42from couchers.utils import now, today 

43from proto import conversations_pb2, requests_pb2 

44from tests.test_fixtures import ( # noqa 

45 auth_api_session, 

46 conversations_session, 

47 db, 

48 generate_user, 

49 make_friends, 

50 make_user_block, 

51 process_jobs, 

52 push_collector, 

53 requests_session, 

54 testconfig, 

55) 

56from tests.test_references import create_host_reference, create_host_request, create_host_request_by_date 

57 

58 

def now_5_min_in_future():
    """Return the value of ``now()`` shifted five minutes forward."""
    five_minutes = timedelta(minutes=5)
    return now() + five_minutes

61 

62 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture that only requests ``testconfig``, so every test in this module runs with it."""

66 

67 

def _check_job_counter(job, status, attempt, exception):
    """Assert that the metrics page contains a sample carrying the given label values.

    Fetches the text served at ``http://localhost:8000`` and checks that it contains the label
    string built from the arguments.

    Args:
        job: expected value of the ``job`` label.
        status: expected value of the ``status`` label.
        attempt: expected value of the ``attempt`` label.
        exception: expected value of the ``exception`` label.

    Raises:
        AssertionError: if the label string is not present in the fetched text.
    """
    # without a timeout an unresponsive metrics server would hang the whole test run
    metrics_string = requests.get("http://localhost:8000", timeout=10).text
    expected_labels = f'attempt="{attempt}",exception="{exception}",job="{job}",status="{status}"'
    assert expected_labels in metrics_string

72 

73 

def test_email_job(db):
    """Queue one email, process it with a checking stand-in for ``print_dev_email``, expect one completed job."""

    def count_jobs(session, condition):
        return session.execute(select(func.count()).select_from(BackgroundJob).where(condition)).scalar_one()

    with session_scope() as session:
        queue_email(session, "sender_name", "sender_email", "recipient", "subject", "plain", "html")

    def mock_print_dev_email(
        sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data
    ):
        # the fields must arrive exactly as they were queued above
        assert (sender_name, sender_email, recipient) == ("sender_name", "sender_email", "recipient")
        assert (subject, plain, html) == ("subject", "plain", "html")
        # hand over to the real implementation once the arguments have been checked
        return print_dev_email(
            sender_name, sender_email, recipient, subject, plain, html, list_unsubscribe_header, source_data
        )

    with patch("couchers.jobs.handlers.print_dev_email", mock_print_dev_email):
        process_job()

    with session_scope() as session:
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.completed) == 0

111 

112 

def test_purge_login_tokens(db):
    """A login token whose expiry is ``now()`` is gone after the ``purge_login_tokens`` job has run."""

    def count_rows(session, model, *conditions):
        query = select(func.count()).select_from(model)
        for condition in conditions:
            query = query.where(condition)
        return session.execute(query).scalar_one()

    user, _api_token = generate_user()

    with session_scope() as session:
        session.add(LoginToken(token=urlsafe_secure_token(), user=user, expiry=now()))
        assert count_rows(session, LoginToken) == 1

        queue_job(session, "purge_login_tokens", empty_pb2.Empty())
    process_job()

    with session_scope() as session:
        assert count_rows(session, LoginToken) == 0

    # exactly one job exists and it has completed
    with session_scope() as session:
        assert count_rows(session, BackgroundJob, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_rows(session, BackgroundJob, BackgroundJob.state != BackgroundJobState.completed) == 0

144 

145 

def test_purge_password_reset_tokens(db):
    """A password reset token whose expiry is ``now()`` is gone after its purge job has run."""
    user, _api_token = generate_user()

    token_count = select(func.count()).select_from(PasswordResetToken)
    completed_count = (
        select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state == BackgroundJobState.completed)
    )
    not_completed_count = (
        select(func.count()).select_from(BackgroundJob).where(BackgroundJob.state != BackgroundJobState.completed)
    )

    with session_scope() as session:
        session.add(PasswordResetToken(token=urlsafe_secure_token(), user=user, expiry=now()))
        assert session.execute(token_count).scalar_one() == 1

        queue_job(session, "purge_password_reset_tokens", empty_pb2.Empty())
    process_job()

    with session_scope() as session:
        assert session.execute(token_count).scalar_one() == 0

    # exactly one job exists and it has completed
    with session_scope() as session:
        assert session.execute(completed_count).scalar_one() == 1
        assert session.execute(not_completed_count).scalar_one() == 0

177 

178 

def test_purge_account_deletion_tokens(db):
    """Of three account deletion tokens, only one is left after ``purge_account_deletion_tokens`` has run."""

    def count_rows(session, model, condition=None):
        query = select(func.count()).select_from(model)
        if condition is not None:
            query = query.where(condition)
        return session.execute(query).scalar_one()

    user, _api_token = generate_user()
    user2, _api_token2 = generate_user()
    user3, _api_token3 = generate_user()

    with session_scope() as session:
        # 3 cases:
        # 1) Token is valid
        # 2) Token expired but account retrievable
        # 3) Account is irretrievable (and expired)
        #
        # the expiries used are: two hours in the past, right at now(), and five hours in the future
        owners_and_expiries = (
            (user, now() - timedelta(hours=2)),
            (user2, now()),
            (user3, now() + timedelta(hours=5)),
        )
        for owner, expiry in owners_and_expiries:
            session.add(AccountDeletionToken(token=urlsafe_secure_token(), user=owner, expiry=expiry))
        assert count_rows(session, AccountDeletionToken) == 3

        queue_job(session, "purge_account_deletion_tokens", empty_pb2.Empty())
    process_job()

    with session_scope() as session:
        assert count_rows(session, AccountDeletionToken) == 1

    # exactly one job exists and it has completed
    with session_scope() as session:
        assert count_rows(session, BackgroundJob, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_rows(session, BackgroundJob, BackgroundJob.state != BackgroundJobState.completed) == 0

223 

224 

def test_enforce_community_memberships(db):
    """The ``enforce_community_membership`` job runs to completion and leaves no unfinished jobs."""
    with session_scope() as session:
        queue_job(session, "enforce_community_membership", empty_pb2.Empty())
    process_job()

    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1
        unfinished = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert unfinished == 0

247 

248 

def test_refresh_materialized_views(db):
    """The ``refresh_materialized_views`` job runs to completion and leaves no unfinished jobs."""

    def count_jobs(session, condition):
        return session.execute(select(func.count()).select_from(BackgroundJob).where(condition)).scalar_one()

    with session_scope() as session:
        queue_job(session, "refresh_materialized_views", empty_pb2.Empty())

    process_job()

    with session_scope() as session:
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.completed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.completed) == 0

272 

273 

def test_service_jobs(db):
    """``service_jobs`` works through the queued email job before it reaches its ``sleep`` call."""
    with session_scope() as session:
        queue_email(session, "sender_name", "sender_email", "recipient", "subject", "plain", "html")

    # service_jobs loops forever, sleeping when it finds nothing to do. replacing that sleep with a function
    # that raises this exception gets us out of the loop, and tells us there were no more jobs left
    class HitSleep(Exception):
        pass

    def raising_sleep(seconds):
        # stand-in for `sleep` that raises instead of sleeping
        raise HitSleep()

    with pytest.raises(HitSleep), patch("couchers.jobs.worker.sleep", raising_sleep):
        service_jobs()

    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        completed = session.execute(
            job_count.where(BackgroundJob.state == BackgroundJobState.completed)
        ).scalar_one()
        assert completed == 1
        unfinished = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.completed)
        ).scalar_one()
        assert unfinished == 0

308 

309 

def test_scheduler(db, monkeypatch):
    """Run ``run_scheduler`` against a fake clock and check when each schedule entry gets triggered.

    ``monotonic`` and ``sleep`` in ``couchers.jobs.worker`` are replaced so that time only moves forward when
    the scheduler sleeps; once the fake clock passes ``end_time`` the fake sleep raises ``EndOfTime`` to stop
    ``run_scheduler``.
    """
    MOCK_SCHEDULE = [
        ("purge_login_tokens", timedelta(seconds=7)),
        ("send_message_notifications", timedelta(seconds=11)),
    ]

    end_time = 70
    current_time = 0

    class EndOfTime(Exception):
        pass

    def mock_monotonic():
        return current_time

    def mock_sleep(seconds):
        nonlocal current_time
        current_time += seconds
        if current_time > end_time:
            raise EndOfTime()

    realized_schedule = []

    def mock_run_job_and_schedule(sched, schedule_id):
        # record when the entry fired, then defer to the real implementation
        realized_schedule.append((current_time, schedule_id))
        _run_job_and_schedule(sched, schedule_id)

    replacements = {
        "_run_job_and_schedule": mock_run_job_and_schedule,
        "SCHEDULE": MOCK_SCHEDULE,
        "monotonic": mock_monotonic,
        "sleep": mock_sleep,
    }
    for attribute, replacement in replacements.items():
        monkeypatch.setattr(couchers.jobs.worker, attribute, replacement)

    with pytest.raises(EndOfTime):
        run_scheduler()

    # entry 0 fires every 7 seconds and entry 1 every 11 seconds, both starting at time 0
    expected_schedule = [
        (0.0, 0), (0.0, 1), (7.0, 0), (11.0, 1), (14.0, 0), (21.0, 0),
        (22.0, 1), (28.0, 0), (33.0, 1), (35.0, 0), (42.0, 0), (44.0, 1),
        (49.0, 0), (55.0, 1), (56.0, 0), (63.0, 0), (66.0, 1), (70.0, 0),
    ]  # fmt: skip
    assert realized_schedule == expected_schedule

    # nothing has been processed, so each of the 18 triggers is a pending job
    job_count = select(func.count()).select_from(BackgroundJob)
    with session_scope() as session:
        pending = session.execute(job_count.where(BackgroundJob.state == BackgroundJobState.pending)).scalar_one()
        assert pending == 18
        not_pending = session.execute(
            job_count.where(BackgroundJob.state != BackgroundJobState.pending)
        ).scalar_one()
        assert not_pending == 0

381 

382 

def test_job_retry(db):
    """A job that always raises goes to ``error`` after the first attempt and to ``failed`` after the fifth.

    A ``mock_job`` that raises on every call is registered in place of the worker's ``JOBS``. The worker is
    run five times; between runs ``next_attempt_after`` is set to ``func.now()`` before the next
    ``process_job()`` call. Finally the metrics page is checked for the ``error`` sample of attempt 4 and the
    ``failed`` sample of attempt 5.
    """
    with session_scope() as session:
        queue_job(session, "mock_job", empty_pb2.Empty())

    called_count = 0

    def mock_job(payload):
        nonlocal called_count
        called_count += 1
        raise Exception()

    MOCK_JOBS = {
        "mock_job": (empty_pb2.Empty, mock_job),
    }
    # serves the metrics page that _check_job_counter reads at the end of the test
    create_prometheus_server(port=8000)

    # if IN_TEST is true, then the bg worker will raise on exceptions
    new_config = config.copy()
    new_config["IN_TEST"] = False

    def count_jobs(session, condition):
        return session.execute(select(func.count()).select_from(BackgroundJob).where(condition)).scalar_one()

    def make_job_due_now(session):
        session.execute(select(BackgroundJob)).scalar_one().next_attempt_after = func.now()

    with patch("couchers.jobs.worker.config", new_config), patch("couchers.jobs.worker.JOBS", MOCK_JOBS):
        # first attempt: the job errors but is not yet failed
        process_job()
        with session_scope() as session:
            assert count_jobs(session, BackgroundJob.state == BackgroundJobState.error) == 1
            assert count_jobs(session, BackgroundJob.state != BackgroundJobState.error) == 0

            make_job_due_now(session)
        process_job()

        # three more retries, five attempts in total
        for _attempt in range(3):
            with session_scope() as session:
                make_job_due_now(session)
            process_job()

    # each of the five process_job() calls above must have invoked the job exactly once
    assert called_count == 5

    with session_scope() as session:
        assert count_jobs(session, BackgroundJob.state == BackgroundJobState.failed) == 1
        assert count_jobs(session, BackgroundJob.state != BackgroundJobState.failed) == 0

    _check_job_counter("mock_job", "error", "4", "Exception")
    _check_job_counter("mock_job", "failed", "5", "Exception")

451 

452 

def test_no_jobs_no_problem(db):
    """With an empty job table, ``process_job`` returns a falsy value and creates no jobs."""
    total_jobs = select(func.count()).select_from(BackgroundJob)

    with session_scope() as session:
        jobs_before = session.execute(total_jobs).scalar_one()
        assert jobs_before == 0

    assert not process_job()

    with session_scope() as session:
        jobs_after = session.execute(total_jobs).scalar_one()
        assert jobs_after == 0

461 

462 

def test_send_message_notifications_basic(db):
    """Unseen group chat messages produce one ``send_email`` job per recipient, and only once.

    The notification handler is first run with the real clock and then with ``now`` in the handlers
    module replaced by a clock five minutes ahead.
    """

    def count_email_jobs(session):
        query = select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        return session.execute(query).scalar_one()

    user1, token1 = generate_user()
    user2, _token2 = generate_user()
    user3, token3 = generate_user()

    for left, right in ((user1, user2), (user1, user3), (user2, user3)):
        make_friends(left, right)

    send_message_notifications(empty_pb2.Empty())
    process_jobs()

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert count_email_jobs(session) == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id])
        ).group_chat_id
        for number in range(1, 5):
            c.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {number}")
            )

    with conversations_session(token3) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for number in (5, 6):
            c.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {number}")
            )

    send_message_notifications(empty_pb2.Empty())
    process_jobs()

    # no emails sent out
    with session_scope() as session:
        assert count_email_jobs(session) == 0

    # this should generate emails for both user2 and user3
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())
        process_jobs()

    with session_scope() as session:
        assert count_email_jobs(session) == 2
        # delete them all
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    # shouldn't generate any more emails
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())
        process_jobs()

    with session_scope() as session:
        assert count_email_jobs(session) == 0

539 

540 

def test_send_message_notifications_muted(db):
    """Like the basic notification test, but user3 mutes the first group chat before messages are sent.

    With the chat muted, only one ``send_email`` job is expected once the handler runs with a clock
    five minutes ahead.
    """
    send_email_jobs = select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")

    user1, token1 = generate_user()
    user2, _token2 = generate_user()
    user3, token3 = generate_user()

    for left, right in ((user1, user2), (user1, user3), (user2, user3)):
        make_friends(left, right)

    send_message_notifications(empty_pb2.Empty())
    process_jobs()

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert session.execute(send_email_jobs).scalar_one() == 0

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id, user3.id])
        ).group_chat_id

    with conversations_session(token3) as c:
        # mute it for user 3
        c.MuteGroupChat(conversations_pb2.MuteGroupChatReq(group_chat_id=group_chat_id, forever=True))

    with conversations_session(token1) as c:
        for number in range(1, 5):
            c.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {number}")
            )

    with conversations_session(token3) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for number in (5, 6):
            c.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {number}")
            )

    send_message_notifications(empty_pb2.Empty())
    process_jobs()

    # no emails sent out
    with session_scope() as session:
        assert session.execute(send_email_jobs).scalar_one() == 0

    # this should generate emails for both user2 and NOT user3
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())
        process_jobs()

    with session_scope() as session:
        assert session.execute(send_email_jobs).scalar_one() == 1
        # delete them all
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    # shouldn't generate any more emails
    with patch("couchers.jobs.handlers.now", now_5_min_in_future):
        send_message_notifications(empty_pb2.Empty())
        process_jobs()

    with session_scope() as session:
        assert session.execute(send_email_jobs).scalar_one() == 0

623 

624 

def test_send_request_notifications_host_request(db):
    """``send_request_notifications`` queues one email per unseen host request message, and only once.

    Checked twice: after the surfer creates the host request, and after the host responds to it.
    """

    def count_email_jobs(session):
        query = select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        return session.execute(query).scalar_one()

    def delete_all_jobs(session):
        session.execute(delete(BackgroundJob).execution_options(synchronize_session=False))

    def notify_with_clock_ahead():
        # run the handler and the worker with the handlers' `now` five minutes ahead
        with patch("couchers.jobs.handlers.now", now_5_min_in_future):
            send_request_notifications(empty_pb2.Empty())
            process_jobs()

    user1, token1 = generate_user()
    user2, token2 = generate_user()

    today_plus_2 = (today() + timedelta(days=2)).isoformat()
    today_plus_3 = (today() + timedelta(days=3)).isoformat()

    send_request_notifications(empty_pb2.Empty())
    process_jobs()

    # should find no jobs, since there's no messages
    with session_scope() as session:
        assert session.execute(select(func.count()).select_from(BackgroundJob)).scalar_one() == 0

    # first test that sending host request creates email
    with requests_session(token1) as api:
        host_request_id = api.CreateHostRequest(
            requests_pb2.CreateHostRequestReq(
                host_user_id=user2.id, from_date=today_plus_2, to_date=today_plus_3, text="Test request"
            )
        ).host_request_id

    with session_scope() as session:
        # delete send_email BackgroundJob created by CreateHostRequest
        delete_all_jobs(session)

        # check send_request_notifications successfully creates background job
        notify_with_clock_ahead()
        assert count_email_jobs(session) == 1

        # delete all BackgroundJobs
        delete_all_jobs(session)

        notify_with_clock_ahead()
        # should find no messages since host has already been notified
        assert count_email_jobs(session) == 0

    # then test that responding to host request creates email
    with requests_session(token2) as api:
        api.RespondHostRequest(
            requests_pb2.RespondHostRequestReq(
                host_request_id=host_request_id,
                status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
                text="Test request",
            )
        )

    with session_scope() as session:
        # delete send_email BackgroundJob created by RespondHostRequest
        delete_all_jobs(session)

        # check send_request_notifications successfully creates background job
        notify_with_clock_ahead()
        assert count_email_jobs(session) == 1

        # delete all BackgroundJobs
        delete_all_jobs(session)

        notify_with_clock_ahead()
        # should find no messages since guest has already been notified
        assert count_email_jobs(session) == 0

714 

715 

def test_send_message_notifications_seen(db):
    """No ``send_email`` job is queued for messages the recipient has already marked as seen."""

    def assert_no_email_jobs():
        with session_scope() as session:
            queued = session.execute(
                select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
            ).scalar_one()
            assert queued == 0

    user1, token1 = generate_user()
    user2, token2 = generate_user()

    make_friends(user1, user2)

    send_message_notifications(empty_pb2.Empty())

    # should find no jobs, since there's no messages
    assert_no_email_jobs()

    with conversations_session(token1) as c:
        group_chat_id = c.CreateGroupChat(
            conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user2.id])
        ).group_chat_id
        for number in range(1, 5):
            c.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {number}")
            )

    # user 2 now marks those messages as seen
    with conversations_session(token2) as c:
        group_chat = c.GetGroupChat(conversations_pb2.GetGroupChatReq(group_chat_id=group_chat_id))
        m_id = group_chat.latest_message.message_id
        c.MarkLastSeenGroupChat(
            conversations_pb2.MarkLastSeenGroupChatReq(group_chat_id=group_chat_id, last_seen_message_id=m_id)
        )

    send_message_notifications(empty_pb2.Empty())

    # no emails sent out
    assert_no_email_jobs()

    def now_30_min_in_future():
        half_an_hour = timedelta(minutes=30)
        return now() + half_an_hour

    # still shouldn't generate emails as user2 has seen all messages
    with patch("couchers.jobs.handlers.now", now_30_min_in_future):
        send_message_notifications(empty_pb2.Empty())

    assert_no_email_jobs()

774 

775 

def test_send_onboarding_emails(db):
    """Track the total number of ``send_email`` jobs as users in different onboarding states are added."""

    def run_and_count_email_jobs():
        # run the handler and the worker, then report how many send_email jobs exist in total
        send_onboarding_emails(empty_pb2.Empty())
        process_jobs()
        with session_scope() as session:
            return session.execute(
                select(func.count()).select_from(BackgroundJob).where(BackgroundJob.job_type == "send_email")
            ).scalar_one()

    # needs to get first onboarding email
    user1, token1 = generate_user(onboarding_emails_sent=0, last_onboarding_email_sent=None, complete_profile=False)
    assert run_and_count_email_jobs() == 1

    # needs to get second onboarding email, but not yet
    six_days_ago = now() - timedelta(days=6)
    user2, token2 = generate_user(
        onboarding_emails_sent=1, last_onboarding_email_sent=six_days_ago, complete_profile=False
    )
    assert run_and_count_email_jobs() == 1

    # needs to get second onboarding email
    eight_days_ago = now() - timedelta(days=8)
    user3, token3 = generate_user(
        onboarding_emails_sent=1, last_onboarding_email_sent=eight_days_ago, complete_profile=False
    )
    assert run_and_count_email_jobs() == 2

822 

823 

def test_send_reference_reminders(db):
    """Check who receives a reference reminder email, with which subject, and that the body has the right links.

    Scenarios set up below:
      case 1: bidirectional (no emails)
      case 2: host left ref (surfer needs an email)
      case 3: surfer left ref (host needs an email)
      case 4: neither left ref (host & surfer need an email)
      case 5: neither left ref, but host blocked surfer, so neither should get an email
      case 6: neither left ref, surfer indicated they didn't meet up, (host still needs an email)
    """
    # first run happens before any users or requests exist
    send_reference_reminders(empty_pb2.Empty())

    # users 1..12, keyed by number:
    #   case 1: users 1 and 2
    #   case 2: host 3, surfer 4
    #   case 3: host 5, surfer 6
    #   case 4: surfer 7, host 8
    #   case 5: surfer 9, host 10
    #   case 6: host 11, surfer 12
    users = {}
    for number in range(1, 13):
        users[number], _token = generate_user(email=f"user{number}@couchers.org.invalid", name=f"User {number}")

    make_user_block(users[9], users[10])

    with session_scope() as session:
        # note that create_host_reference creates a host request whose age is one day older than the timedelta here

        # case 1: bidirectional (no emails)
        _ref1, hr1 = create_host_reference(session, users[2].id, users[1].id, timedelta(days=7), surfing=True)
        create_host_reference(session, users[1].id, users[2].id, timedelta(days=7), host_request_id=hr1)

        # case 2: host left ref (surfer needs an email)
        create_host_reference(session, users[3].id, users[4].id, timedelta(days=11), surfing=False)

        # case 3: surfer left ref (host needs an email)
        create_host_reference(session, users[6].id, users[5].id, timedelta(days=9), surfing=True)

        # case 4: neither left ref (host & surfer need an email)
        create_host_request(session, users[7].id, users[8].id, timedelta(days=4))

        # case 5: neither left ref, but host blocked surfer, so neither should get an email
        create_host_request(session, users[9].id, users[10].id, timedelta(days=7))

        # case 6: neither left ref, surfer indicated they didn't meet up, (host still needs an email)
        create_host_request(session, users[12].id, users[11].id, timedelta(days=6), surfer_reason_didnt_meetup="")

    # strings that must show up in both the plain and the HTML body, depending on the recipient's role
    hosted_strings = ("from when you hosted them", "/leave-reference/hosted/")
    surfed_strings = ("from when you surfed with them", "/leave-reference/surfed/")

    # listed in the same order the emails are read back below (ascending recipient)
    expected_emails = [
        ("user11@couchers.org.invalid", "[TEST] You have 14 days to write a reference for User 12!", hosted_strings),
        ("user4@couchers.org.invalid", "[TEST] You have 3 days to write a reference for User 3!", surfed_strings),
        ("user5@couchers.org.invalid", "[TEST] You have 7 days to write a reference for User 6!", hosted_strings),
        ("user7@couchers.org.invalid", "[TEST] You have 14 days to write a reference for User 8!", surfed_strings),
        ("user8@couchers.org.invalid", "[TEST] You have 14 days to write a reference for User 7!", hosted_strings),
    ]

    send_reference_reminders(empty_pb2.Empty())

    # drain the job queue
    while process_job():
        pass

    with session_scope() as session:
        stored_emails = session.execute(select(Email).order_by(Email.recipient.asc())).scalars().all()
        emails = [(stored.recipient, stored.subject, stored.plain, stored.html) for stored in stored_emails]

    actual_addresses_and_subjects = [(recipient, subject) for recipient, subject, _plain, _html in emails]
    expected_addresses_and_subjects = [(recipient, subject) for recipient, subject, _strings in expected_emails]

    print(actual_addresses_and_subjects)
    print(expected_addresses_and_subjects)

    assert actual_addresses_and_subjects == expected_addresses_and_subjects

    for (address, subject, plain, html), expected in zip(emails, expected_emails):
        search_strings = expected[2]
        for find in search_strings:
            assert find in plain, f"Expected to find string {find} in PLAIN email {subject} to {address}, didn't"
            assert find in html, f"Expected to find string {find} in HTML email {subject} to {address}, didn't"

944 

945 

def test_send_host_request_reminders(db):
    """Check which host requests make the reminder job email the host.

    Seven host requests are created, each between its own pair of users. Per
    the expectations at the end, only case 1 (pending, starting in the
    future, reminder interval elapsed) should yield an email, sent to its
    host (User 2).
    """
    # Create "User 1" .. "User 14" in order; only the user half of each
    # (user, token) pair is used by this test.
    users = {}
    for number in range(1, 15):
        user, _token = generate_user(email=f"user{number}@couchers.org.invalid", name=f"User {number}")
        users[number] = user

    with session_scope() as session:

        def make_request(surfer_no, host_no, **overrides):
            # The defaults describe the "should notify" request of case 1;
            # every other case overrides only the fields that make it differ.
            params = {
                "session": session,
                "surfer_user_id": users[surfer_no].id,
                "host_user_id": users[host_no].id,
                "from_date": today() + HOST_REQUEST_REMINDER_INTERVAL + timedelta(days=1),
                "to_date": today() + HOST_REQUEST_REMINDER_INTERVAL + timedelta(days=2),
                "status": HostRequestStatus.pending,
                "host_sent_request_reminders": 0,
                "last_sent_request_reminder_time": now() - HOST_REQUEST_REMINDER_INTERVAL,
            }
            params.update(overrides)
            return create_host_request_by_date(**params)

        # case 1: pending, future, interval elapsed => notify
        make_request(1, 2)

        # case 2: max reminders reached => do not notify
        make_request(3, 4, host_sent_request_reminders=HOST_REQUEST_MAX_REMINDERS)

        # case 3: interval not yet elapsed => do not notify
        make_request(
            5,
            6,
            last_sent_request_reminder_time=now() - HOST_REQUEST_REMINDER_INTERVAL + timedelta(hours=1),
        )

        # case 4: start date is today => do not notify
        make_request(7, 8, from_date=today(), to_date=today() + timedelta(days=2))

        # case 5: from_date in the past => do not notify
        make_request(9, 10, from_date=today() - timedelta(days=1), to_date=today() + timedelta(days=1))

        # case 6: non-pending status => do not notify
        make_request(
            11,
            12,
            from_date=today() + timedelta(days=3),
            to_date=today() + timedelta(days=4),
            status=HostRequestStatus.accepted,
        )

        # case 7: host already sent a message => do not notify
        answered_request = make_request(13, 14)

        # The value returned for case 7 is used as the conversation id of a
        # message authored by that request's host (User 14).
        host_reply = Message(
            time=now(),
            conversation_id=answered_request,
            author_id=users[14].id,
            text="Looking forward to hosting you!",
            message_type=MessageType.text,
        )
        session.add(host_reply)

    send_host_request_reminders(empty_pb2.Empty())

    # Keep processing background jobs until process_job() returns a falsy value.
    while process_job():
        pass

    with session_scope() as session:
        stored_emails = session.execute(select(Email).order_by(Email.recipient.asc())).scalars().all()
        emails = [(row.recipient, row.subject, row.plain, row.html) for row in stored_emails]

    # (recipient, subject, strings that must appear in both the plain and HTML bodies)
    expected_emails = [
        (
            "user2@couchers.org.invalid",
            "[TEST] You have a pending host request from User 1!",
            ("Please respond to the request!", "User 1"),
        )
    ]

    actual_addresses_and_subjects = [sent[:2] for sent in emails]
    expected_addresses_and_subjects = [wanted[:2] for wanted in expected_emails]

    print(actual_addresses_and_subjects)
    print(expected_addresses_and_subjects)

    assert actual_addresses_and_subjects == expected_addresses_and_subjects

    for sent, wanted in zip(emails, expected_emails):
        address, subject, plain, html = sent
        for needle in wanted[2]:
            assert needle in plain, f"Expected to find string {needle} in PLAIN email {subject} to {address}, didn't"
            assert needle in html, f"Expected to find string {needle} in HTML email {subject} to {address}, didn't"

1088 

1089 

def test_add_users_to_email_list(db):
    """Exercise the newsletter sync handler against a mocked ``requests.post``.

    The handler's config is patched with Listmonk enabled. Three runs are made:
    with no users (no POST expected), after creating four users (a subscriber
    POST is expected for Tester1 and Tester3), and once more afterwards (no
    POST expected).
    """
    listmonk_settings = {
        "LISTMONK_ENABLED": True,
        "LISTMONK_BASE_URL": "https://example.com",
        "LISTMONK_API_USERNAME": "test_user",
        "LISTMONK_API_KEY": "dummy_api_key",
        "LISTMONK_LIST_ID": 6,
    }
    new_config = config.copy()
    for setting, value in listmonk_settings.items():
        new_config[setting] = value

    post_target = "couchers.jobs.handlers.requests.post"

    def subscriber_post(email, name, user_id):
        # The exact requests.post(...) invocation expected for one subscriber.
        return call(
            "https://example.com/api/subscribers",
            auth=("test_user", "dummy_api_key"),
            json={
                "email": email,
                "name": name,
                "lists": [6],
                "preconfirm_subscriptions": True,
                "attribs": {"couchers_user_id": user_id},
                "status": "enabled",
            },
            timeout=10,
        )

    with patch("couchers.jobs.handlers.config", new_config):
        # Run 1: no users have been created by this test yet.
        with patch(post_target) as mock:
            add_users_to_email_list(empty_pb2.Empty())
        mock.assert_not_called()

        # Tester1 and Tester3 are out of sync; Tester2 is already in sync and
        # Tester4 is out of sync but has opted out of the newsletter.
        generate_user(in_sync_with_newsletter=False, email="testing1@couchers.invalid", name="Tester1", id=15)
        generate_user(in_sync_with_newsletter=True, email="testing2@couchers.invalid", name="Tester2")
        generate_user(in_sync_with_newsletter=False, email="testing3@couchers.invalid", name="Tester3 von test", id=17)
        generate_user(
            in_sync_with_newsletter=False, email="testing4@couchers.invalid", name="Tester4", opt_out_of_newsletter=True
        )

        # Run 2: the mocked response reports HTTP 200.
        with patch(post_target) as mock:
            response = mock.return_value
            response.status_code = 200
            add_users_to_email_list(empty_pb2.Empty())
        expected_posts = [
            subscriber_post("testing1@couchers.invalid", "Tester1", 15),
            subscriber_post("testing3@couchers.invalid", "Tester3 von test", 17),
        ]
        mock.assert_has_calls(expected_posts, any_order=True)

        # Run 3: a further run is expected to make no POST at all.
        with patch(post_target) as mock:
            add_users_to_email_list(empty_pb2.Empty())
        mock.assert_not_called()

1149 

1150 

def test_update_recommendation_scores(db):
    """Smoke test: run the handler with an empty payload; it passes if nothing raises."""
    payload = empty_pb2.Empty()
    update_recommendation_scores(payload)

1153 

1154 

def test_update_badges(db, push_collector):
    """Run the badge update job and check the stored badges and push notifications.

    Six users are created; user4 and user5 have a verified phone, and user5 is
    given a "board_member" badge up front. After the job, the expected badge
    rows and the per-user push notifications (badge added / removed) are
    compared against what was recorded.
    """
    user1, _ = generate_user()
    user2, _ = generate_user()
    # user3 and user6 are expected to end up with no badges and no checked pushes.
    user3, _ = generate_user()
    user4, _ = generate_user(phone="+15555555555", phone_verification_verified=func.now())
    user5, _ = generate_user(phone="+15555555556", phone_verification_verified=func.now())
    user6, _ = generate_user()

    # Pre-existing badge for user5; the expectations below say it gets removed.
    with session_scope() as session:
        preexisting_badge = UserBadge(user_id=user5.id, badge_id="board_member")
        session.add(preexisting_badge)

    update_badges(empty_pb2.Empty())
    process_jobs()

    with session_scope() as session:
        badge_query = select(UserBadge.user_id, UserBadge.badge_id).order_by(
            UserBadge.user_id.asc(), UserBadge.id.asc()
        )
        badge_tuples = session.execute(badge_query).all()

    expected = [
        (user1.id, "founder"),
        (user1.id, "board_member"),
        (user2.id, "founder"),
        (user2.id, "board_member"),
        (user4.id, "phone_verified"),
        (user5.id, "phone_verified"),
    ]

    assert badge_tuples == expected

    print(push_collector.pushes)

    added_body = "Check out your profile to see the new badge!"
    removed_body = "You can see all your badges on your profile."

    # (user id, index of the push for that user, expected title, expected body)
    expected_pushes = [
        (user1.id, 0, "The Founder badge was added to your profile", added_body),
        (user1.id, 1, "The Board Member badge was added to your profile", added_body),
        (user2.id, 0, "The Founder badge was added to your profile", added_body),
        (user2.id, 1, "The Board Member badge was added to your profile", added_body),
        (user4.id, 0, "The Verified Phone badge was added to your profile", added_body),
        (user5.id, 0, "The Board Member badge was removed from your profile", removed_body),
        (user5.id, 1, "The Verified Phone badge was added to your profile", added_body),
    ]
    for user_id, push_index, title, body in expected_pushes:
        push_collector.assert_user_push_matches_fields(user_id, ix=push_index, title=title, body=body)

1228 )