Coverage for src/couchers/jobs/handlers.py: 96%

393 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-08-28 14:55 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from datetime import date, timedelta 

7from math import cos, pi, sin, sqrt 

8from random import sample 

9 

10import requests 

11from google.protobuf import empty_pb2 

12from sqlalchemy import Float, Integer 

13from sqlalchemy.orm import aliased 

14from sqlalchemy.sql import ( 

15 and_, 

16 case, 

17 cast, 

18 delete, 

19 distinct, 

20 exists, 

21 extract, 

22 func, 

23 literal, 

24 not_, 

25 or_, 

26 select, 

27 union_all, 

28 update, 

29) 

30 

31from couchers.config import config 

32from couchers.constants import ( 

33 ACTIVENESS_PROBE_EXPIRY_TIME, 

34 ACTIVENESS_PROBE_INACTIVITY_PERIOD, 

35 ACTIVENESS_PROBE_TIME_REMINDERS, 

36 EVENT_REMINDER_TIMEDELTA, 

37 HOST_REQUEST_MAX_REMINDERS, 

38 HOST_REQUEST_REMINDER_INTERVAL, 

39) 

40from couchers.context import make_background_user_context 

41from couchers.crypto import ( 

42 USER_LOCATION_RANDOMIZATION_NAME, 

43 asym_encrypt, 

44 b64decode, 

45 get_secret, 

46 simple_decrypt, 

47 stable_secure_uniform, 

48) 

49from couchers.db import session_scope 

50from couchers.email.dev import print_dev_email 

51from couchers.email.smtp import send_smtp_email 

52from couchers.helpers.badges import user_add_badge, user_remove_badge 

53from couchers.materialized_views import ( 

54 UserResponseRate, 

55 refresh_materialized_views, 

56 refresh_materialized_views_rapid, 

57) 

58from couchers.metrics import strong_verification_completions_counter 

59from couchers.models import ( 

60 AccountDeletionToken, 

61 ActivenessProbe, 

62 ActivenessProbeStatus, 

63 Cluster, 

64 ClusterRole, 

65 ClusterSubscription, 

66 EventOccurrence, 

67 EventOccurrenceAttendee, 

68 GroupChat, 

69 GroupChatSubscription, 

70 HostingStatus, 

71 HostRequest, 

72 HostRequestStatus, 

73 Invoice, 

74 LoginToken, 

75 MeetupStatus, 

76 Message, 

77 MessageType, 

78 PassportSex, 

79 PasswordResetToken, 

80 Reference, 

81 StrongVerificationAttempt, 

82 StrongVerificationAttemptStatus, 

83 User, 

84 UserBadge, 

85) 

86from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification 

87from couchers.notifications.notify import notify 

88from couchers.resources import get_badge_dict, get_static_badge_dict 

89from couchers.servicers.admin import generate_new_blog_post_notifications 

90from couchers.servicers.api import user_model_to_pb 

91from couchers.servicers.blocking import is_not_visible 

92from couchers.servicers.conversations import generate_message_notifications 

93from couchers.servicers.discussions import generate_create_discussion_notifications 

94from couchers.servicers.events import ( 

95 event_to_pb, 

96 generate_event_cancel_notifications, 

97 generate_event_create_notifications, 

98 generate_event_delete_notifications, 

99 generate_event_update_notifications, 

100) 

101from couchers.servicers.requests import host_request_to_pb 

102from couchers.servicers.threads import generate_reply_notifications 

103from couchers.sql import couchers_select as select 

104from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

105from couchers.tasks import send_duplicate_strong_verification_email 

106from couchers.utils import ( 

107 Timestamp_from_datetime, 

108 create_coordinate, 

109 get_coordinates, 

110 now, 

111) 

112from proto import notification_data_pb2 

113from proto.internal import jobs_pb2, verification_pb2 

114 

logger = logging.getLogger(__name__)

# these were straight up imported
# The handlers below are defined in other modules; this module only attaches job
# metadata to them as function attributes:
#   PAYLOAD  - a protobuf message class (empty_pb2.Empty or a jobs_pb2 payload message)
#   SCHEDULE - a timedelta, only set on some of the handlers
# NOTE(review): presumably the job worker parses a job's payload with PAYLOAD and
# re-runs handlers that have a SCHEDULE at that interval -- confirm against the worker.
handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload

send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload

handle_email_digests.PAYLOAD = empty_pb2.Empty
handle_email_digests.SCHEDULE = timedelta(minutes=15)

generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload

generate_reply_notifications.PAYLOAD = jobs_pb2.GenerateReplyNotificationsPayload

generate_create_discussion_notifications.PAYLOAD = jobs_pb2.GenerateCreateDiscussionNotificationsPayload

generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload

generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload

generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload

generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload

generate_new_blog_post_notifications.PAYLOAD = jobs_pb2.GenerateNewBlogPostNotificationsPayload

refresh_materialized_views.PAYLOAD = empty_pb2.Empty
refresh_materialized_views.SCHEDULE = timedelta(minutes=5)

refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)

146 

147 

def send_email(payload):
    """
    Delivers the email described by the payload and stores the resulting email object

    When config["ENABLE_EMAIL"] is set the message goes through the SMTP sender, otherwise through the dev sender.
    Whatever the chosen sender returns is added to the database session.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")

    if config["ENABLE_EMAIL"]:
        deliver = send_smtp_email
    else:
        deliver = print_dev_email

    message_fields = dict(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
        list_unsubscribe_header=payload.list_unsubscribe_header,
        source_data=payload.source_data,
    )
    # both senders are expected to hand back an object that can be added to the database
    sent_email = deliver(**message_fields)

    with session_scope() as session:
        session.add(sent_email)


send_email.PAYLOAD = jobs_pb2.SendEmailPayload

168 

169 

def purge_login_tokens(payload):
    """
    Deletes all login tokens that are no longer valid
    """
    logger.info("Purging login tokens")
    stale_tokens = delete(LoginToken).where(not_(LoginToken.is_valid))
    with session_scope() as session:
        # bulk delete, no need to reconcile objects already loaded in the session
        session.execute(stale_tokens.execution_options(synchronize_session=False))


purge_login_tokens.PAYLOAD = empty_pb2.Empty
purge_login_tokens.SCHEDULE = timedelta(hours=24)

178 

179 

def purge_password_reset_tokens(payload):
    """
    Deletes all password reset tokens that are no longer valid
    """
    # the message used to say "login tokens", copy-pasted from purge_login_tokens
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        # bulk delete, no need to reconcile objects already loaded in the session
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )


purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)

190 

191 

def purge_account_deletion_tokens(payload):
    """
    Deletes all account deletion tokens that are no longer valid
    """
    logger.info("Purging account deletion tokens")
    stale_tokens = delete(AccountDeletionToken).where(not_(AccountDeletionToken.is_valid))
    with session_scope() as session:
        # bulk delete, no need to reconcile objects already loaded in the session
        session.execute(stale_tokens.execution_options(synchronize_session=False))


purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)

204 

205 

def _format_missed_messages_title(message, group_chat, count_unseen):
    """
    Returns the summary line for one chat in a missed messages notification

    Direct messages are attributed to the author of the message, other chats to the chat title.
    """
    if group_chat.is_dm:
        return f"You missed {count_unseen} message(s) from {message.author.name}"
    return f"You missed {count_unseen} message(s) in {group_chat.title}"


def send_message_notifications(payload):
    """
    Notifies users about group chat text messages they have not seen

    A user is picked up once they have an unseen, not yet notified text message older than 5 minutes in a chat they
    have not muted. They then get one "chat:missed_messages" notification listing, per chat, the latest unseen
    message and the number of unseen messages, and their last_notified_message_id is moved forward.
    """
    logger.info("Sending out email notifications for unseen messages")

    with session_scope() as session:
        # candidates: users with at least one qualifying message that is older than 5 minutes
        users_to_notify = (
            session.execute(
                select(User)
                .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left.is_(None)))
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for recipient in users_to_notify:
            # this time without the 5 minute cutoff: one row per chat with the newest unseen message and a count
            latest_per_chat = (
                select(
                    GroupChatSubscription.group_chat_id.label("group_chat_id"),
                    func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                    func.max(Message.id).label("message_id"),
                    func.count(Message.id).label("count_unseen"),
                )
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(GroupChatSubscription.user_id == recipient.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(Message.id > recipient.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left.is_(None)))
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            missed = session.execute(
                select(GroupChat, Message, latest_per_chat.c.count_unseen)
                .join(latest_per_chat, latest_per_chat.c.message_id == Message.id)
                .join(GroupChat, GroupChat.conversation_id == latest_per_chat.c.group_chat_id)
                .order_by(latest_per_chat.c.message_id.desc())
            ).all()

            if not missed:
                continue

            # remember how far we notified so the same messages are not picked up again
            recipient.last_notified_message_id = max(row[1].id for row in missed)

            chat_messages = []
            for group_chat, message, count_unseen in missed:
                chat_messages.append(
                    notification_data_pb2.ChatMessage(
                        author=user_model_to_pb(
                            message.author,
                            session,
                            make_background_user_context(user_id=recipient.id),
                        ),
                        message=_format_missed_messages_title(message, group_chat, count_unseen),
                        text=message.text,
                        group_chat_id=message.conversation_id,
                    )
                )

            notify(
                session,
                user_id=recipient.id,
                topic_action="chat:missed_messages",
                data=notification_data_pb2.ChatMissedMessages(messages=chat_messages),
            )
            session.commit()


send_message_notifications.PAYLOAD = empty_pb2.Empty
send_message_notifications.SCHEDULE = timedelta(minutes=3)

298 

299 

def send_request_notifications(payload):
    """
    Notifies surfers and hosts about unseen text messages in their host requests

    Both sides are handled the same way: a visible user with a text message older than 5 minutes that they have
    neither seen nor been notified about gets a "host_request:missed_messages" notification per host request, and
    their last_notified_request_message_id is moved forward.
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    def unseen_messages_query(participant_id_column, last_seen_column):
        # one row per (user, host request) together with the newest qualifying message id
        return (
            select(User, HostRequest, func.max(Message.id))
            .where(User.is_visible)
            .join(HostRequest, participant_id_column == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(Message.id > last_seen_column)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
            .group_by(User, HostRequest)
        )

    with session_scope() as session:
        # both result sets are fetched up front, before any notification goes out
        as_surfer = session.execute(
            unseen_messages_query(HostRequest.surfer_user_id, HostRequest.surfer_last_seen_message_id)
        ).all()
        as_host = session.execute(
            unseen_messages_query(HostRequest.host_user_id, HostRequest.host_last_seen_message_id)
        ).all()

        for am_host, rows in ((False, as_surfer), (True, as_host)):
            for user, host_request, max_message_id in rows:
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                context = make_background_user_context(user_id=user.id)
                notify(
                    session,
                    user_id=user.id,
                    topic_action="host_request:missed_messages",
                    key=host_request.conversation_id,
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        # the notification shows the other party of the request
                        user=user_model_to_pb(
                            host_request.surfer if am_host else host_request.host, session, context
                        ),
                        am_host=am_host,
                    ),
                )


send_request_notifications.PAYLOAD = empty_pb2.Empty
send_request_notifications.SCHEDULE = timedelta(minutes=3)

370 

371 

def send_onboarding_emails(payload):
    """
    Sends the first and second onboarding reminders

    The first reminder goes to visible users who have not received any onboarding email yet. The second goes to
    visible users whose first reminder is more than 7 days old and whose profile is still not completed.
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:

        def send_reminder(user, reminder_number):
            notify(
                session,
                user_id=user.id,
                topic_action="onboarding:reminder",
                key=str(reminder_number),
            )
            user.onboarding_emails_sent = reminder_number
            user.last_onboarding_email_sent = now()
            # committed per user so progress is kept if a later user fails
            session.commit()

        # first onboarding email
        never_contacted = select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)
        for user in session.execute(never_contacted).scalars().all():
            send_reminder(user, 1)

        # second onboarding email, a week after the first one if the profile is still incomplete
        incomplete_after_a_week = (
            select(User)
            .where(User.is_visible)
            .where(User.onboarding_emails_sent == 1)
            .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
            .where(User.has_completed_profile == False)
        )
        for user in session.execute(incomplete_after_a_week).scalars().all():
            send_reminder(user, 2)


send_onboarding_emails.PAYLOAD = empty_pb2.Empty
send_onboarding_emails.SCHEDULE = timedelta(hours=1)

423 

424 

def send_reference_reminders(payload):
    """
    Reminds surfers and hosts to write a reference for a host request

    Each side of a host request receives up to three reminders as the deadline for writing the reference approaches.
    A reminder is only sent while the reference can still be written, has not been written yet, and the person has
    not given a reason for not having met up.
    """
    logger.info("Sending out reference reminder emails")

    # (reminder number, how long before the end of the reference window it becomes due, days left to report)
    # Must be kept in chronological order. The end of the window is meant to be midnight in the host's timezone,
    # so the offsets land at roughly 8 pm on the last day of the stay, 2 pm a week later and 10 am three days
    # before the window closes.
    schedule = [
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    with session_scope() as session:
        # latest reminder first: once it is recorded, the earlier ones no longer match, so a missed reminder does
        # not lead to several being sent in one run
        for number, lead_time, days_left in schedule[::-1]:
            writer = aliased(User)
            counterpart = aliased(User)

            # surfers who still have to write a reference
            surfer_pending = (
                select(literal(True), HostRequest, writer, counterpart)
                .join(writer, writer.id == HostRequest.surfer_user_id)
                .join(counterpart, counterpart.id == HostRequest.host_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # no match in this join means the surfer has not written a reference
                        Reference.from_user_id == HostRequest.surfer_user_id,
                    ),
                )
                .where(writer.is_visible)
                .where(counterpart.is_visible)
                .where(Reference.id.is_(None))
                .where(HostRequest.can_write_reference)
                .where(HostRequest.surfer_sent_reference_reminders < number)
                .where(HostRequest.end_time_to_write_reference - lead_time < now())
                .where(HostRequest.surfer_reason_didnt_meetup.is_(None))
            )

            # hosts who still have to write a reference
            host_pending = (
                select(literal(False), HostRequest, writer, counterpart)
                .join(writer, writer.id == HostRequest.host_user_id)
                .join(counterpart, counterpart.id == HostRequest.surfer_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # no match in this join means the host has not written a reference
                        Reference.from_user_id == HostRequest.host_user_id,
                    ),
                )
                .where(writer.is_visible)
                .where(counterpart.is_visible)
                .where(Reference.id.is_(None))
                .where(HostRequest.can_write_reference)
                .where(HostRequest.host_sent_reference_reminders < number)
                .where(HostRequest.end_time_to_write_reference - lead_time < now())
                .where(HostRequest.host_reason_didnt_meetup.is_(None))
            )

            combined = union_all(surfer_pending, host_pending).subquery()
            # map the columns of the union back onto entities; the first column is the surfed/hosted flag
            pending = session.execute(
                select(
                    combined.c[0].label("surfed"),
                    aliased(HostRequest, combined),
                    aliased(writer, combined),
                    aliased(counterpart, combined),
                )
            ).all()

            for surfed, host_request, author, about in pending:
                # checked in sql
                assert author.is_visible
                if is_not_visible(session, author.id, about.id):
                    continue

                context = make_background_user_context(user_id=author.id)
                notify(
                    session,
                    user_id=author.id,
                    topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(about, session, context),
                        days_left=days_left,
                    ),
                )
                if surfed:
                    host_request.surfer_sent_reference_reminders = number
                else:
                    host_request.host_sent_reference_reminders = number
                session.commit()


send_reference_reminders.PAYLOAD = empty_pb2.Empty
send_reference_reminders.SCHEDULE = timedelta(hours=1)

525 

526 

def send_host_request_reminders(payload):
    """
    Reminds hosts about pending host requests they have not written a message in

    A request qualifies while it is pending, starts in the future, has had fewer than HOST_REQUEST_MAX_REMINDERS
    reminders, and the last reminder is at least HOST_REQUEST_REMINDER_INTERVAL old.
    """
    with session_scope() as session:
        # correlated against HostRequest: any message in the conversation authored by the host
        host_reply = select(1).where(
            Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.host_user_id
        )

        unanswered = (
            select(HostRequest)
            .where(HostRequest.status == HostRequestStatus.pending)
            .where(HostRequest.host_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS)
            .where(HostRequest.start_time > func.now())
            .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL)
            .where(not_(exists(host_reply)))
        )
        # named so that it does not shadow the imported requests module
        pending_requests = session.execute(unanswered).scalars().all()

        for host_request in pending_requests:
            host_request.host_sent_request_reminders += 1
            host_request.last_sent_request_reminder_time = now()

            host_id = host_request.host_user_id
            context = make_background_user_context(user_id=host_id)
            reminder = notification_data_pb2.HostRequestReminder(
                host_request=host_request_to_pb(host_request, session, context),
                surfer=user_model_to_pb(host_request.surfer, session, context),
            )
            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action="host_request:reminder",
                data=reminder,
            )

            session.commit()


send_host_request_reminders.PAYLOAD = empty_pb2.Empty
send_host_request_reminders.SCHEDULE = timedelta(minutes=15)

566 

567 

def add_users_to_email_list(payload):
    """
    Subscribes visible users that are not yet in sync with the newsletter to the Listmonk list

    Does nothing unless config["LISTMONK_ENABLED"] is set. Users are handled one at a time, each in its own
    session: users who opted out are only marked as in sync, everyone else is posted to the Listmonk subscribers
    API first.

    Raises:
        Exception: if Listmonk answers with a status other than 200 or 409; the user stays out of sync
    """
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    while True:
        with session_scope() as session:
            user = session.execute(
                select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
            ).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            if user.opt_out_of_newsletter:
                # nothing to send, just record that there is nothing left to sync
                user.in_sync_with_newsletter = True
                session.commit()
                continue

            r = requests.post(
                config["LISTMONK_BASE_URL"] + "/api/subscribers",
                auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
                json={
                    "email": user.email,
                    "name": user.name,
                    "lists": [config["LISTMONK_LIST_ID"]],
                    "preconfirm_subscriptions": True,
                    "attribs": {"couchers_user_id": user.id},
                    "status": "enabled",
                },
                timeout=10,
            )
            # 409 is treated like success: the API returns it if the user is already subscribed
            if r.status_code not in (200, 409):
                # keep the status in the message so a failing sync can be diagnosed from the job error
                raise Exception(f"Failed to add users to mailing list: Listmonk returned status {r.status_code}")
            user.in_sync_with_newsletter = True
            session.commit()


add_users_to_email_list.PAYLOAD = empty_pb2.Empty
add_users_to_email_list.SCHEDULE = timedelta(hours=1)

612 

613 

def enforce_community_membership(payload):
    """
    Job entry point that delegates to couchers.tasks.enforce_community_memberships

    The payload is not used.
    """
    tasks_enforce_community_memberships()


enforce_community_membership.PAYLOAD = empty_pb2.Empty
enforce_community_membership.SCHEDULE = timedelta(minutes=15)

620 

621 

def update_recommendation_scores(payload):
    """
    Recomputes User.recommendation_score in the database

    The score is assembled as one SQL expression and written with a single UPDATE. It adds up points for hosting
    status, profile and home text, references, activeness, community builder roles and badges, and response time,
    plus a random noise term.
    """
    # text columns whose combined length counts towards the profile points
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    # subset describing the home, used for the "filled_home_lots" points
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian():
        """
        Produces an approximately std normal random variate
        """
        # sum of `trials` SQL random() values, centred and scaled by the mean (trials / 2) and standard deviation
        # (sqrt(trials / 12)) of such a sum -- assumes random() is uniform on [0, 1)
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt):
        """
        Casts the expression to an SQL integer, with NULL becoming 0
        """
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt):
        """
        Casts the expression to an SQL float, with NULL becoming 0.0
        """
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile
        # starting from "" and adding SQL expressions builds one concatenation of all (NULL-safe) text columns
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")
        home_length = func.length(home_text)

        # each of these is a 0/1 flag
        filled_profile = int_(User.has_completed_profile)
        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        may_host = int_(User.hosting_status == HostingStatus.maybe)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.has_completed_my_home)
        filled_home_lots = int_(home_length > 200)
        hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host
        profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots

        # references
        # one row per user who has written at least one reference
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        # aggregates over the references a user has received
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        # a reference counts as bad if its rating is at most 0.2 or it was marked as not appropriate
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # the first 5 references weigh double, every further one weighs once
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        # aggregated per author over text messages only
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        # per user: smallest parent node id among the official clusters they are an admin of
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        # NOTE(review): presumably node id 1 is the root (world) node, making `wcb` "admin of the world community"
        # and `cb` "admin of any official community" -- confirm
        cb = int_(min_node_id >= 1)
        wcb = int_(min_node_id == 1)
        # points per badge id; badges not listed here contribute 0
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }

        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate
        # percentile response times converted from seconds to minutes
        hr_subquery = select(
            UserResponseRate.user_id,
            float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"),
            float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"),
        ).subquery()
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
        # the threshold 60 * 96.0 minutes is 96 hours
        response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0)

        recommendation_score = (
            hosting_status_points
            + profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        # outer joins keep users without rows in a subquery; their missing values are coalesced to 0 above
        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        session.execute(
            User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
        )

    logger.info("Updated recommendation scores")


update_recommendation_scores.PAYLOAD = empty_pb2.Empty
update_recommendation_scores.SCHEDULE = timedelta(hours=24)

801 

802 

def update_badges(payload):
    """
    Brings the automatically managed badges in line with who should currently hold them

    For each managed badge the intended holders are determined (from the static badge dict or from a query), the
    badge is added to intended holders who lack it and removed from holders who are no longer intended.
    """
    with session_scope() as session:

        def fetch_ids(query):
            return session.execute(query).scalars().all()

        def update_badge(badge_id: str, members: list[int]):
            # kept for its effect: the lookup fails if the badge id is missing from the badge dict
            _ = get_badge_dict()[badge_id]
            current_holders = set(fetch_ids(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)))
            # in case the user ids don't exist in the db
            intended_holders = set(fetch_ids(select(User.id).where(User.id.in_(members))))

            for user_id in intended_holders - current_holders:
                user_add_badge(session, user_id, badge_id)

            for user_id in current_holders - intended_holders:
                user_remove_badge(session, user_id, badge_id)

        # badges whose holders are listed in the static badge dict
        for static_badge_id in ("founder", "board_member", "past_board_member"):
            update_badge(static_badge_id, get_static_badge_dict()[static_badge_id])

        # badges whose holders follow from the database
        update_badge("donor", fetch_ids(select(User.id).join(Invoice, Invoice.user_id == User.id)))
        update_badge("moderator", fetch_ids(select(User.id).where(User.is_superuser)))
        update_badge("phone_verified", fetch_ids(select(User.id).where(User.phone_is_verified)))
        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        strongly_verified = (
            select(User.id)
            .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
            .where(StrongVerificationAttempt.has_strong_verification(User))
        )
        update_badge("strong_verification", fetch_ids(strongly_verified))


update_badges.PAYLOAD = empty_pb2.Empty
update_badges.SCHEDULE = timedelta(minutes=15)

844 

845 

def finalize_strong_verification(payload):
    """
    Completes a strong verification attempt that is waiting on the backend.

    Loads the attempt named by `payload.verification_attempt_id`, fetches the matching session from the Iris ID API
    and cross-checks it against the stored attempt. The attempt then ends in one of these states:

    - failed: the scanned document is not a passport
    - duplicate: an earlier attempt already has the same expiry date, nationality and last three document chars
    - succeeded: the passport data is stored (full response encrypted) on the attempt

    The user is notified of the outcome. After a successful attempt the "strong_verification" badge is added if
    `has_strong_verification` holds for the user and they don't have the badge yet.

    Raises an `Exception` if Iris doesn't respond with HTTP 200, and `AssertionError` if the Iris session doesn't
    correspond to the stored attempt or isn't approved.
    """
    with session_scope() as session:
        attempt_query = (
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        )
        attempt = session.execute(attempt_query).scalar_one()

        def notify_failure(reason):
            # all failure paths share the same notification, only the reason differs
            notify(
                session,
                user_id=attempt.user_id,
                topic_action="verification:sv_fail",
                data=notification_data_pb2.VerificationSVFail(reason=reason),
            )

        iris_response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": attempt.iris_session_id},
            timeout=10,
        )
        if iris_response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {iris_response.text}")
        iris_data = iris_response.json()

        # the reference is our own encrypted payload, echoed back by Iris
        decrypted_reference = simple_decrypt("iris_callback", b64decode(iris_data["reference"]))
        reference = verification_pb2.VerificationReferencePayload.FromString(decrypted_reference)

        # make sure the Iris session really belongs to this attempt and was approved
        assert attempt.user_id == reference.user_id
        assert attempt.verification_attempt_token == reference.verification_attempt_token
        assert attempt.iris_session_id == iris_data["id"]
        assert iris_data["state"] == "APPROVED"

        if iris_data["document_type"] != "PASSPORT":
            attempt.status = StrongVerificationAttemptStatus.failed
            notify_failure(notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT)
            return

        assert iris_data["document_type"] == "PASSPORT"

        passport_expiry = date.fromisoformat(iris_data["expiry_date"])
        passport_nationality = iris_data["nationality"]
        document_number_suffix = iris_data["document_number"][-3:]

        # look for an earlier attempt with the same passport *before* storing this data on the current attempt
        earlier_attempt_query = (
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == passport_expiry)
            .where(StrongVerificationAttempt.passport_nationality == passport_nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == document_number_suffix)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        )
        earlier_attempt = session.execute(earlier_attempt_query).scalar_one_or_none()

        attempt.has_minimal_data = True
        attempt.passport_expiry_date = passport_expiry
        attempt.passport_nationality = passport_nationality
        attempt.passport_last_three_document_chars = document_number_suffix

        if earlier_attempt:
            attempt.status = StrongVerificationAttemptStatus.duplicate

            # only send the duplicate email when the passport was used by a different user
            if earlier_attempt.user_id != attempt.user_id:
                session.flush()
                send_duplicate_strong_verification_email(session, earlier_attempt, attempt)

            notify_failure(notification_data_pb2.SV_FAIL_REASON_DUPLICATE)
            return

        attempt.has_full_data = True
        attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], iris_response.text.encode("utf8")
        )
        attempt.passport_date_of_birth = date.fromisoformat(iris_data["date_of_birth"])
        attempt.passport_sex = PassportSex[iris_data["sex"].lower()]
        attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = attempt.user
        if not attempt.has_strong_verification(user):
            notify_failure(notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER)
            return

        badge_id = "strong_verification"
        existing_badge = session.execute(
            select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
        ).scalar_one_or_none()
        if existing_badge:
            # badge already present: nothing to add and no success notification
            return

        user_add_badge(session, user.id, badge_id, do_notify=False)
        notify(session, user_id=attempt.user_id, topic_action="verification:sv_success")


finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload

951 

952 

def send_activeness_probes(payload):
    """
    Runs one round of the activeness probe lifecycle.

    1. If the ACTIVENESS_PROBES_ENABLED config flag is set, creates probes for visible users who can host, have
       been inactive for longer than ACTIVENESS_PROBE_INACTIVITY_PERIOD and have no unanswered probe. The number
       of new probes per run is capped (see below).
    2. Sends the next due notification for every pending probe, following ACTIVENESS_PROBE_TIME_REMINDERS.
    3. Marks pending probes that received all reminders and are older than ACTIVENESS_PROBE_EXPIRY_TIME as
       expired, and downgrades the user's hosting/meetup status.

    `payload` is unused.
    """
    with session_scope() as session:
        ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)

        if config["ACTIVENESS_PROBES_ENABLED"]:
            # current activeness probes
            subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery()

            # users who we should send an activeness probe to
            new_probe_user_ids = (
                session.execute(
                    select(User.id)
                    .where(User.is_visible)
                    .where(User.hosting_status == HostingStatus.can_host)
                    .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
                    .where(User.id.not_in(select(subquery.c.user_id)))
                )
                .scalars()
                .all()
            )

            total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()
            probes_today = session.execute(
                select(func.count())
                .select_from(ActivenessProbe)
                .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24))
            ).scalar_one()

            # send probes to max 2% of users per day
            max_probes_per_day = 0.02 * total_users
            # per run: whatever is left of the daily budget, but no more than 1/24 of it, and never less than one
            max_probe_size = int(max(min(max_probes_per_day - probes_today, max_probes_per_day / 24), 1))

            if len(new_probe_user_ids) > max_probe_size:
                new_probe_user_ids = sample(new_probe_user_ids, max_probe_size)

            for user_id in new_probe_user_ids:
                session.add(ActivenessProbe(user_id=user_id))

            session.commit()

        ## Step 2: actually send out probe notifications
        for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
            # pending probes that have had exactly `probe_number_minus_1` notifications and are due the next one
            probes = (
                session.execute(
                    select(ActivenessProbe)
                    .where(ActivenessProbe.notifications_sent == probe_number_minus_1)
                    .where(ActivenessProbe.probe_initiated + delay < func.now())
                    .where(ActivenessProbe.is_pending)
                )
                .scalars()
                .all()
            )

            for probe in probes:
                probe.notifications_sent = probe_number_minus_1 + 1
                notify(
                    session,
                    user_id=probe.user.id,
                    topic_action="activeness:probe",
                    key=probe.id,
                    data=notification_data_pb2.ActivenessProbe(
                        reminder_number=probe_number_minus_1 + 1,
                        deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
                    ),
                )
                # commit per probe so the updated counter is persisted together with its notification
                session.commit()

        ## Step 3: for those who haven't responded, mark them as failed
        expired_probes = (
            session.execute(
                select(ActivenessProbe)
                .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
                .where(ActivenessProbe.is_pending)
                .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
            )
            .scalars()
            .all()
        )

        for probe in expired_probes:
            probe.responded = now()
            probe.response = ActivenessProbeStatus.expired
            # only downgrade users still on the most active status; other statuses are left untouched
            if probe.user.hosting_status == HostingStatus.can_host:
                probe.user.hosting_status = HostingStatus.maybe
            if probe.user.meetup_status == MeetupStatus.wants_to_meetup:
                probe.user.meetup_status = MeetupStatus.open_to_meetup
        session.commit()


send_activeness_probes.PAYLOAD = empty_pb2.Empty
send_activeness_probes.SCHEDULE = timedelta(minutes=60)

1045 

1046 

def update_randomized_locations(payload):
    """
    We generate for each user a randomized location as follows:
    - Start from a strong random seed (based on the SECRET env var and our key derivation function)
    - For each user, mix in the user_id for randomness
    - Generate a radius from [0.02, 0.1] degrees (about 2-10km)
    - Generate an angle from [0, 360]
    - Randomized location is then a distance `radius` away at an angle `angle` from `geom`

    Only users whose `randomized_geom` is not set yet are processed; if there are none, nothing is written.
    `payload` is unused.
    """
    randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)

    def gen_randomized_coords(user_id, lat, lng):
        # separate seeds per user for radius and angle, so the two values are drawn independently
        radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii"))
        angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii"))
        radius = 0.02 + 0.08 * radius_u
        angle_rad = 2 * pi * angle_u
        offset_lng = radius * cos(angle_rad)
        offset_lat = radius * sin(angle_rad)
        return lat + offset_lat, lng + offset_lng

    with session_scope() as session:
        users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all()

        user_updates = [
            {"id": user_id, "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, *get_coordinates(geom)))}
            for user_id, geom in users_to_update
        ]

    if not user_updates:
        # nothing to backfill: don't open a second transaction or issue an UPDATE with no parameter sets
        return

    with session_scope() as session:
        # one parameter dict per user, each carrying the primary key and the new value
        session.execute(update(User), user_updates)


update_randomized_locations.PAYLOAD = empty_pb2.Empty
update_randomized_locations.SCHEDULE = timedelta(hours=1)

1084 

1085 

def send_event_reminders(payload: empty_pb2.Empty):
    """
    Sends "event:reminder" notifications for event occurrences that are about to start.

    An occurrence qualifies if it starts between now and EVENT_REMINDER_TIMEDELTA from now. Each attendee of such
    an occurrence whose `reminder_sent` flag is still unset is notified once; the flag is set and committed right
    after the notification so they aren't reminded again.
    """
    logger.info("Sending event reminder emails")

    with session_scope() as session:
        upcoming_occurrences_query = (
            select(EventOccurrence)
            .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA)
            .where(EventOccurrence.start_time >= now())
        )

        for occurrence in session.execute(upcoming_occurrences_query).scalars().all():
            # attendees of this occurrence who haven't been reminded yet
            unreminded_query = (
                select(User, EventOccurrenceAttendee)
                .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id)
                .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
                .where(EventOccurrenceAttendee.reminder_sent == False)
            )

            for user, attendee in session.execute(unreminded_query).all():
                # the event and user are rendered from the point of view of the user being reminded
                context = make_background_user_context(user_id=user.id)
                reminder = notification_data_pb2.EventReminder(
                    event=event_to_pb(session, occurrence, context),
                    user=user_model_to_pb(user, session, context),
                )

                notify(session, user_id=user.id, topic_action="event:reminder", data=reminder)

                attendee.reminder_sent = True
                session.commit()


send_event_reminders.PAYLOAD = empty_pb2.Empty
send_event_reminders.SCHEDULE = timedelta(hours=1)