Coverage for src/couchers/jobs/handlers.py: 96%

387 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-07-12 05:54 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from datetime import date, timedelta 

7from math import cos, pi, sin, sqrt 

8from random import sample 

9 

10import requests 

11from google.protobuf import empty_pb2 

12from sqlalchemy import Float, Integer 

13from sqlalchemy.orm import aliased 

14from sqlalchemy.sql import ( 

15 and_, 

16 case, 

17 cast, 

18 delete, 

19 distinct, 

20 exists, 

21 extract, 

22 func, 

23 literal, 

24 not_, 

25 or_, 

26 select, 

27 union_all, 

28 update, 

29) 

30 

31from couchers.config import config 

32from couchers.constants import ( 

33 ACTIVENESS_PROBE_EXPIRY_TIME, 

34 ACTIVENESS_PROBE_INACTIVITY_PERIOD, 

35 ACTIVENESS_PROBE_TIME_REMINDERS, 

36 EVENT_REMINDER_TIMEDELTA, 

37 HOST_REQUEST_MAX_REMINDERS, 

38 HOST_REQUEST_REMINDER_INTERVAL, 

39) 

40from couchers.crypto import ( 

41 USER_LOCATION_RANDOMIZATION_NAME, 

42 asym_encrypt, 

43 b64decode, 

44 get_secret, 

45 simple_decrypt, 

46 stable_secure_uniform, 

47) 

48from couchers.db import session_scope 

49from couchers.email.dev import print_dev_email 

50from couchers.email.smtp import send_smtp_email 

51from couchers.helpers.badges import user_add_badge, user_remove_badge 

52from couchers.materialized_views import ( 

53 refresh_materialized_views, 

54 refresh_materialized_views_rapid, 

55 user_response_rates, 

56) 

57from couchers.metrics import strong_verification_completions_counter 

58from couchers.models import ( 

59 AccountDeletionToken, 

60 ActivenessProbe, 

61 ActivenessProbeStatus, 

62 Cluster, 

63 ClusterRole, 

64 ClusterSubscription, 

65 EventOccurrence, 

66 EventOccurrenceAttendee, 

67 GroupChat, 

68 GroupChatSubscription, 

69 HostingStatus, 

70 HostRequest, 

71 HostRequestStatus, 

72 Invoice, 

73 LoginToken, 

74 MeetupStatus, 

75 Message, 

76 MessageType, 

77 PassportSex, 

78 PasswordResetToken, 

79 Reference, 

80 StrongVerificationAttempt, 

81 StrongVerificationAttemptStatus, 

82 User, 

83 UserBadge, 

84) 

85from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification 

86from couchers.notifications.notify import notify 

87from couchers.resources import get_badge_dict, get_static_badge_dict 

88from couchers.servicers.admin import generate_new_blog_post_notifications 

89from couchers.servicers.api import user_model_to_pb 

90from couchers.servicers.blocking import is_not_visible 

91from couchers.servicers.conversations import generate_message_notifications 

92from couchers.servicers.discussions import generate_create_discussion_notifications 

93from couchers.servicers.events import ( 

94 event_to_pb, 

95 generate_event_cancel_notifications, 

96 generate_event_create_notifications, 

97 generate_event_delete_notifications, 

98 generate_event_update_notifications, 

99) 

100from couchers.servicers.requests import host_request_to_pb 

101from couchers.servicers.threads import generate_reply_notifications 

102from couchers.sql import couchers_select as select 

103from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

104from couchers.tasks import send_duplicate_strong_verification_email 

105from couchers.utils import ( 

106 Timestamp_from_datetime, 

107 create_coordinate, 

108 get_coordinates, 

109 make_user_context, 

110 now, 

111) 

112from proto import notification_data_pb2 

113from proto.internal import jobs_pb2, verification_pb2 

114 

115logger = logging.getLogger(__name__) 

116 

# these were straight up imported
# Each imported handler gets a PAYLOAD attribute (a protobuf message class) and, for some of
# them, a SCHEDULE attribute (a timedelta).
# NOTE(review): presumably the job worker reads PAYLOAD to decode the job argument and SCHEDULE
# to decide how often to enqueue the job -- confirm against the worker code.
handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload

send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload

handle_email_digests.PAYLOAD = empty_pb2.Empty
handle_email_digests.SCHEDULE = timedelta(minutes=15)

generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload

generate_reply_notifications.PAYLOAD = jobs_pb2.GenerateReplyNotificationsPayload

generate_create_discussion_notifications.PAYLOAD = jobs_pb2.GenerateCreateDiscussionNotificationsPayload

generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload

generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload

generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload

generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload

generate_new_blog_post_notifications.PAYLOAD = jobs_pb2.GenerateNewBlogPostNotificationsPayload

refresh_materialized_views.PAYLOAD = empty_pb2.Empty
refresh_materialized_views.SCHEDULE = timedelta(minutes=5)

refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)

146 

147 

def send_email(payload):
    """
    Delivers the email described by the payload and stores the resulting email object
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
    # pick how to deliver: real SMTP when email is enabled, otherwise the dev sender
    if config["ENABLE_EMAIL"]:
        deliver = send_smtp_email
    else:
        deliver = print_dev_email
    # whichever sender is used must hand back a models.Email object that can be added to the database
    message_fields = {
        "sender_name": payload.sender_name,
        "sender_email": payload.sender_email,
        "recipient": payload.recipient,
        "subject": payload.subject,
        "plain": payload.plain,
        "html": payload.html,
        "list_unsubscribe_header": payload.list_unsubscribe_header,
        "source_data": payload.source_data,
    }
    sent_email = deliver(**message_fields)
    with session_scope() as session:
        session.add(sent_email)


send_email.PAYLOAD = jobs_pb2.SendEmailPayload

168 

169 

def purge_login_tokens(payload):
    """
    Deletes every login token that is no longer valid
    """
    logger.info("Purging login tokens")
    # bulk delete, no need to synchronize in-session objects
    stale_tokens = delete(LoginToken).where(~LoginToken.is_valid)
    stale_tokens = stale_tokens.execution_options(synchronize_session=False)
    with session_scope() as session:
        session.execute(stale_tokens)


purge_login_tokens.PAYLOAD = empty_pb2.Empty
purge_login_tokens.SCHEDULE = timedelta(hours=24)

178 

179 

def purge_password_reset_tokens(payload):
    """
    Deletes every password reset token that is no longer valid
    """
    # this used to log "Purging login tokens" (copy-paste), which made the two purge jobs
    # indistinguishable in the logs
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        # bulk delete, no need to synchronize in-session objects
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )


purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)

190 

191 

def purge_account_deletion_tokens(payload):
    """
    Deletes every account deletion token that is no longer valid
    """
    logger.info("Purging account deletion tokens")
    # bulk delete, no need to synchronize in-session objects
    stale_tokens = delete(AccountDeletionToken).where(~AccountDeletionToken.is_valid)
    stale_tokens = stale_tokens.execution_options(synchronize_session=False)
    with session_scope() as session:
        session.execute(stale_tokens)


purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)

204 

205 

def send_message_notifications(payload):
    """
    Sends out email notifications for messages that have been unseen for a long enough time

    For every visible user that has an unmuted, unseen, not yet notified text message older than 5 minutes, one
    "chat:missed_messages" notification is sent covering all of their group chats with unseen messages. The user's
    last_notified_message_id is then advanced and the change is committed per user.
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    def format_title(message, group_chat, count_unseen):
        # DMs are titled by the author, group chats by the chat title
        if group_chat.is_dm:
            return f"You missed {count_unseen} message(s) from {message.author.name}"
        return f"You missed {count_unseen} message(s) in {group_chat.title}"

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                select(User)
                .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for user in users:
            # now actually grab all the group chats, not just less than 5 min old
            # one row per group chat: the newest unseen message id and the number of unseen messages
            subquery = (
                select(
                    GroupChatSubscription.group_chat_id.label("group_chat_id"),
                    func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                    func.max(Message.id).label("message_id"),
                    func.count(Message.id).label("count_unseen"),
                )
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(GroupChatSubscription.user_id == user.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(Message.id > user.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            unseen_messages = session.execute(
                select(GroupChat, Message, subquery.c.count_unseen)
                .join(subquery, subquery.c.message_id == Message.id)
                .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
                .order_by(subquery.c.message_id.desc())
            ).all()

            if not unseen_messages:
                # the messages may have been seen between the two queries; max() below would raise
                # ValueError on an empty sequence and abort the job for every remaining user
                continue

            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            notify(
                session,
                user_id=user.id,
                topic_action="chat:missed_messages",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                make_user_context(user_id=user.id),
                            ),
                            message=format_title(message, group_chat, count_unseen),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                        )
                        for group_chat, message, count_unseen in unseen_messages
                    ],
                ),
            )
            session.commit()


send_message_notifications.PAYLOAD = empty_pb2.Empty
send_message_notifications.SCHEDULE = timedelta(minutes=3)

295 

296 

def send_request_notifications(payload):
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host)

    Each affected user gets a "host_request:missed_messages" notification per host request, and their
    last_notified_request_message_id is moved forward to the newest message covered.
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    def unseen_request_messages(participant_id_column, last_seen_column):
        # (user, host request, newest message id) for text messages older than 5 minutes that the
        # participant has neither seen nor been notified about
        return (
            select(User, HostRequest, func.max(Message.id))
            .where(User.is_visible)
            .join(HostRequest, participant_id_column == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(Message.id > last_seen_column)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
            .group_by(User, HostRequest)
        )

    with session_scope() as session:
        # requests where this user is surfing
        as_surfer = session.execute(
            unseen_request_messages(HostRequest.surfer_user_id, HostRequest.surfer_last_seen_message_id)
        ).all()

        # where this user is hosting
        as_host = session.execute(
            unseen_request_messages(HostRequest.host_user_id, HostRequest.host_last_seen_message_id)
        ).all()

        # surfers are handled first, then hosts
        for am_host, rows in ((False, as_surfer), (True, as_host)):
            for recipient, host_request, newest_message_id in rows:
                recipient.last_notified_request_message_id = max(
                    recipient.last_notified_request_message_id, newest_message_id
                )
                session.flush()

                context = make_user_context(user_id=recipient.id)
                notify(
                    session,
                    user_id=recipient.id,
                    topic_action="host_request:missed_messages",
                    key=host_request.conversation_id,
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        # the notification shows the other party of the request
                        user=user_model_to_pb(
                            host_request.surfer if am_host else host_request.host, session, context
                        ),
                        am_host=am_host,
                    ),
                )


send_request_notifications.PAYLOAD = empty_pb2.Empty
send_request_notifications.SCHEDULE = timedelta(minutes=3)

367 

368 

def send_onboarding_emails(payload):
    """
    Sends out onboarding emails

    Round 1 goes to visible users that have not received any onboarding email yet. Round 2 goes to visible users
    that got round 1 more than 7 days ago and still have not completed their profile.
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:

        def send_round(recipients, round_number):
            # notify each recipient and record the round, committing after every user
            for recipient in recipients:
                notify(
                    session,
                    user_id=recipient.id,
                    topic_action="onboarding:reminder",
                    key=str(round_number),
                )
                recipient.onboarding_emails_sent = round_number
                recipient.last_onboarding_email_sent = now()
                session.commit()

        # first onboarding email
        never_contacted = select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)
        send_round(session.execute(never_contacted).scalars().all(), 1)

        # second onboarding email
        # sent after a week if the user still has not completed their profile
        incomplete_after_a_week = (
            select(User)
            .where(User.is_visible)
            .where(User.onboarding_emails_sent == 1)
            .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
            .where(User.has_completed_profile == False)
        )
        send_round(session.execute(incomplete_after_a_week).scalars().all(), 2)


send_onboarding_emails.PAYLOAD = empty_pb2.Empty
send_onboarding_emails.SCHEDULE = timedelta(hours=1)

420 

421 

def send_reference_reminders(payload):
    """
    Sends out reminders to write references after hosting/staying

    Both the surfer and the host of a host request are reminded, as long as they can still write a reference,
    have not written one yet and have not stated a reason for not meeting up.
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    # entries are (number, timedelta before we stop being able to write a ref, days left shown in the reminder)
    # the end time to write a reference is supposed to be midnight in the host's timezone
    reference_reminder_schedule = [
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            writer_alias = aliased(User)
            counterpart_alias = aliased(User)

            # surfers needing to write a ref
            surfer_query = (
                select(literal(True), HostRequest, writer_alias, counterpart_alias)
                .join(writer_alias, writer_alias.id == HostRequest.surfer_user_id)
                .join(counterpart_alias, counterpart_alias.id == HostRequest.host_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the surfer has not written a ref
                        Reference.from_user_id == HostRequest.surfer_user_id,
                    ),
                )
                .where(writer_alias.is_visible)
                .where(counterpart_alias.is_visible)
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.surfer_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.surfer_reason_didnt_meetup == None)
            )

            # hosts needing to write a ref
            host_query = (
                select(literal(False), HostRequest, writer_alias, counterpart_alias)
                .join(writer_alias, writer_alias.id == HostRequest.host_user_id)
                .join(counterpart_alias, counterpart_alias.id == HostRequest.surfer_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the host has not written a ref
                        Reference.from_user_id == HostRequest.host_user_id,
                    ),
                )
                .where(writer_alias.is_visible)
                .where(counterpart_alias.is_visible)
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.host_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.host_reason_didnt_meetup == None)
            )

            # run both queries as one statement; the first column tells which side the row came from
            combined = union_all(surfer_query, host_query).subquery()
            pending = session.execute(
                select(
                    combined.c[0].label("surfed"),
                    aliased(HostRequest, combined),
                    aliased(writer_alias, combined),
                    aliased(counterpart_alias, combined),
                )
            ).all()

            for surfed, host_request, writer, counterpart in pending:
                # checked in sql
                assert writer.is_visible
                if is_not_visible(session, writer.id, counterpart.id):
                    continue

                context = make_user_context(user_id=writer.id)
                notify(
                    session,
                    user_id=writer.id,
                    topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(counterpart, session, context),
                        days_left=reminder_days_left,
                    ),
                )
                # remember the highest reminder sent for this side of the request
                if surfed:
                    host_request.surfer_sent_reference_reminders = reminder_number
                else:
                    host_request.host_sent_reference_reminders = reminder_number
                session.commit()


send_reference_reminders.PAYLOAD = empty_pb2.Empty
send_reference_reminders.SCHEDULE = timedelta(hours=1)

522 

523 

def send_host_request_reminders(payload):
    """
    Reminds hosts about pending host requests they have not written any message in

    A request qualifies while it has not started yet, fewer than HOST_REQUEST_MAX_REMINDERS reminders were sent
    for it and at least HOST_REQUEST_REMINDER_INTERVAL has passed since the last reminder time.
    """
    with session_scope() as session:
        # correlated check: is there any message in the request's conversation written by the host?
        host_has_sent_message = select(1).where(
            Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.host_user_id
        )

        unanswered_query = (
            select(HostRequest)
            .where(HostRequest.status == HostRequestStatus.pending)
            .where(HostRequest.host_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS)
            .where(HostRequest.start_time > func.now())
            .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL)
            .where(~exists(host_has_sent_message))
        )
        unanswered_requests = session.execute(unanswered_query).scalars().all()

        for host_request in unanswered_requests:
            host_request.host_sent_request_reminders += 1
            host_request.last_sent_request_reminder_time = now()

            host_id = host_request.host_user_id
            context = make_user_context(user_id=host_id)
            notify(
                session,
                user_id=host_id,
                topic_action="host_request:reminder",
                data=notification_data_pb2.HostRequestReminder(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
            )

            session.commit()


send_host_request_reminders.PAYLOAD = empty_pb2.Empty
send_host_request_reminders.SCHEDULE = timedelta(minutes=15)

563 

564 

def add_users_to_email_list(payload):
    """
    Subscribes visible users that are not yet in sync with the newsletter to the Listmonk mailing list

    Does nothing unless LISTMONK_ENABLED is set. Users are handled one at a time, each in its own session: users
    that opted out are only marked as in sync, everyone else is posted to the Listmonk subscribers API first.

    Raises:
        Exception: if Listmonk answers with a status code other than 200 or 409
    """
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    while True:
        with session_scope() as session:
            user = session.execute(
                select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
            ).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            if user.opt_out_of_newsletter:
                # nothing to subscribe, just stop picking this user up again
                user.in_sync_with_newsletter = True
                session.commit()
                continue

            r = requests.post(
                config["LISTMONK_BASE_URL"] + "/api/subscribers",
                auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
                json={
                    "email": user.email,
                    "name": user.name,
                    "lists": [config["LISTMONK_LIST_ID"]],
                    "preconfirm_subscriptions": True,
                    "attribs": {"couchers_user_id": user.id},
                    "status": "enabled",
                },
                timeout=10,
            )
            # the API returns 409 if the user is already subscribed, which is fine for us
            if r.status_code == 200 or r.status_code == 409:
                user.in_sync_with_newsletter = True
                session.commit()
            else:
                # keep the original message as a prefix and add the status code so failures can be diagnosed
                raise Exception(f"Failed to add users to mailing list: Listmonk returned status {r.status_code}")


add_users_to_email_list.PAYLOAD = empty_pb2.Empty
add_users_to_email_list.SCHEDULE = timedelta(hours=1)

609 

610 

def enforce_community_membership(payload):
    """
    Job wrapper that runs couchers.tasks.enforce_community_memberships; the payload is not used
    """
    tasks_enforce_community_memberships()


enforce_community_membership.PAYLOAD = empty_pb2.Empty
enforce_community_membership.SCHEDULE = timedelta(minutes=15)

617 

618 

def update_recommendation_scores(payload):
    """
    Recomputes User.recommendation_score for all users with a single UPDATE statement

    The score is assembled purely from SQL expressions and is the sum of profile points, reference points,
    activeness points, community builder/badge points, response rate points and a random term.
    """
    # free-text profile fields whose combined length counts towards the profile points
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    # the subset of text fields that describe the user's home
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian():
        """
        Produces an approximately std normal random variate
        """
        # sum of `trials` draws of the database's random(), shifted by trials / 2 and scaled by sqrt(trials / 12)
        # NOTE(review): this is mean/std normalization assuming random() is uniform on [0, 1) -- confirm for the db
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt):
        """
        Casts the expression to an SQL integer, mapping NULL to 0
        """
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt):
        """
        Casts the expression to an SQL float, mapping NULL to 0.0
        """
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile
        # starting from "" and adding SQL expressions builds one SQL concatenation of all the fields
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")
        home_length = func.length(home_text)

        # each of these is a 0/1 indicator
        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        has_pic = int_(User.avatar_key != None)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        # needs both last_minute to be set and more than 200 characters of home text
        filled_home = int_(User.last_minute != None) * int_(home_length > 200)
        profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host

        # references
        # one row per user that has written at least one reference
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        # aggregates over the references a user has received
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        # a "bad" reference has a rating of at most 0.2 or was flagged as not appropriate
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # the scaled average rating is weighted by the number of references: the first 5 count double
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        # the two aggregates below are evaluated per message author inside messaging_subquery
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        # smallest parent node id among the official clusters the user is an admin of
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        # NOTE(review): presumably "cb" is a community builder and "wcb" one for node 1 (the world node?) -- confirm
        cb = int_(min_node_id >= 1)
        wcb = int_(min_node_id == 1)
        # points per badge id, any badge not listed here is worth 0
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }

        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate
        # extract("epoch") yields seconds, dividing by 60 gives the response times in minutes
        hr_subquery = select(
            user_response_rates.c.user_id,
            float_(extract("epoch", user_response_rates.c.response_time_33p) / 60.0).label("response_time_33p"),
            float_(extract("epoch", user_response_rates.c.response_time_66p) / 60.0).label("response_time_66p"),
        ).subquery()
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
        # thresholds are in minutes: penalty above 72 hours, bonus below 48 hours
        response_rate_points = -10 * int_(response_time_33p > 60 * 72.0) + 5 * int_(response_time_66p < 60 * 48.0)

        recommendation_score = (
            profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        # outer joins so that users without a row in a subquery still get a score
        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        # write every computed score back in one statement
        session.execute(
            User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
        )

    logger.info("Updated recommendation scores")


update_recommendation_scores.PAYLOAD = empty_pb2.Empty
update_recommendation_scores.SCHEDULE = timedelta(hours=24)

794 

795 

def update_badges(payload):
    """
    Brings the badges stored for users in line with who should currently hold each badge

    Static badges come from the static badge resource, the rest are derived from the database (invoices,
    superuser flag, verified phone, strong verification).
    """
    with session_scope() as session:

        def update_badge(badge_id: str, members: list[int]):
            # looked up only so that a badge id missing from the badge dict fails here, before any change is made
            _ = get_badge_dict()[badge_id]
            current_holders = set(
                session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all()
            )
            # in case the user ids don't exist in the db
            wanted_holders = set(session.execute(select(User.id).where(User.id.in_(members))).scalars().all())

            # grant the badge to users that should have it but don't
            for user_id in wanted_holders - current_holders:
                user_add_badge(session, user_id, badge_id)

            # and take it away from users that have it but shouldn't
            for user_id in current_holders - wanted_holders:
                user_remove_badge(session, user_id, badge_id)

        for static_badge_id in ("founder", "board_member", "past_board_member"):
            update_badge(static_badge_id, get_static_badge_dict()[static_badge_id])

        donor_ids = session.execute(select(User.id).join(Invoice, Invoice.user_id == User.id)).scalars().all()
        update_badge("donor", donor_ids)

        moderator_ids = session.execute(select(User.id).where(User.is_superuser)).scalars().all()
        update_badge("moderator", moderator_ids)

        phone_verified_ids = session.execute(select(User.id).where(User.phone_is_verified)).scalars().all()
        update_badge("phone_verified", phone_verified_ids)

        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        strongly_verified_ids = (
            session.execute(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            )
            .scalars()
            .all()
        )
        update_badge("strong_verification", strongly_verified_ids)


update_badges.PAYLOAD = empty_pb2.Empty
update_badges.SCHEDULE = timedelta(minutes=15)

837 

838 

def finalize_strong_verification(payload):
    """
    Finalizes a strong verification attempt that is waiting on the backend.

    Loads the StrongVerificationAttempt with id `payload.verification_attempt_id` (it must be in the
    `in_progress_waiting_on_backend` state, otherwise `scalar_one()` raises), fetches the corresponding session from
    the passportreader.app API, and checks the returned data against the stored attempt. The attempt then ends up in
    one of these states:

    - `failed` if the document is not a passport
    - `duplicate` if another attempt already has the same expiry date, nationality and last three document characters
    - `succeeded` otherwise, in which case the encrypted passport data, date of birth and sex are stored

    The user is sent a `verification:sv_fail` or `verification:sv_success` notification as appropriate, and is given
    the `strong_verification` badge on success if they don't have it yet.

    Raises an Exception if the API doesn't return HTTP 200, and an AssertionError if the returned session doesn't
    match the stored attempt or is not in the APPROVED state.
    """
    with session_scope() as session:
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()
        reference_payload = verification_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )

        # These checks guard the integrity of the verification, so they are explicit raises rather than `assert`
        # statements (which are stripped when python runs with -O). AssertionError is kept as the exception type so
        # that callers see the same failure as before.
        if verification_attempt.user_id != reference_payload.user_id:
            raise AssertionError("Iris reference payload user_id doesn't match the verification attempt")
        if verification_attempt.verification_attempt_token != reference_payload.verification_attempt_token:
            raise AssertionError("Iris reference payload token doesn't match the verification attempt")
        if verification_attempt.iris_session_id != json_data["id"]:
            raise AssertionError("Iris session id doesn't match the verification attempt")
        if json_data["state"] != "APPROVED":
            raise AssertionError("Iris session is not in the APPROVED state")

        if json_data["document_type"] != "PASSPORT":
            verification_attempt.status = StrongVerificationAttemptStatus.failed
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
                ),
            )
            return

        # from here on the document is known to be a passport

        expiry_date = date.fromisoformat(json_data["expiry_date"])
        nationality = json_data["nationality"]
        last_three_document_chars = json_data["document_number"][-3:]

        # look for the earliest attempt with the same minimal passport data; this runs before the minimal data is
        # written onto the current attempt below
        existing_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
            .where(StrongVerificationAttempt.passport_nationality == nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = expiry_date
        verification_attempt.passport_nationality = nationality
        verification_attempt.passport_last_three_document_chars = last_three_document_chars

        if existing_attempt:
            verification_attempt.status = StrongVerificationAttemptStatus.duplicate

            # only send the duplicate email when the earlier attempt belongs to a different user
            if existing_attempt.user_id != verification_attempt.user_id:
                session.flush()
                send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)

            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
            )
            return

        verification_attempt.has_full_data = True
        # the full API response is stored encrypted with the verification data public key
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = verification_attempt.user
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            # the user already has the badge: nothing more to do, and no notification is sent
            if session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none():
                return

            user_add_badge(session, user.id, badge_id, do_notify=False)
            notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success")
        else:
            # the attempt succeeded but `has_strong_verification` is false for this user
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
                ),
            )


finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload

944 

945 

def send_activeness_probes(payload):
    """
    Runs the activeness probe lifecycle in three stages within one session:

    1. (only if ACTIVENESS_PROBES_ENABLED) creates probes for visible, can-host users who have been inactive for
       longer than ACTIVENESS_PROBE_INACTIVITY_PERIOD and have no unanswered probe, subject to a rate limit
    2. sends the `activeness:probe` notification for every pending probe whose next reminder is due
    3. marks pending probes that received all reminders and passed ACTIVENESS_PROBE_EXPIRY_TIME as expired, and
       downgrades the user's hosting/meetup status
    """
    with session_scope() as session:
        ## Stage 1: create probes for users who need one (feature-flagged)
        if config["ACTIVENESS_PROBES_ENABLED"]:
            # user ids that have a probe which has not been responded to
            unanswered = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery()

            candidates_query = (
                select(User.id)
                .where(User.is_visible)
                .where(User.hosting_status == HostingStatus.can_host)
                .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
                .where(User.id.not_in(select(unanswered.c.user_id)))
            )
            candidate_ids = session.execute(candidates_query).scalars().all()

            visible_user_count = session.execute(
                select(func.count()).select_from(User).where(User.is_visible)
            ).scalar_one()
            recent_probe_count = session.execute(
                select(func.count())
                .select_from(ActivenessProbe)
                .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24))
            ).scalar_one()

            # rate limit: at most 2% of visible users per day, at most 1/24 of that per run, but always at least 1
            daily_cap = 0.02 * visible_user_count
            remaining_today = daily_cap - recent_probe_count
            per_run_cap = daily_cap / 24
            batch_limit = int(max(min(remaining_today, per_run_cap), 1))

            # too many candidates: pick a random subset
            if len(candidate_ids) > batch_limit:
                candidate_ids = sample(candidate_ids, batch_limit)

            session.add_all([ActivenessProbe(user_id=candidate_id) for candidate_id in candidate_ids])

            session.commit()

        ## Stage 2: send out whichever probe notification is due next
        for already_sent, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
            reminder_number = already_sent + 1

            due_query = (
                select(ActivenessProbe)
                .where(ActivenessProbe.notifications_sent == already_sent)
                .where(ActivenessProbe.probe_initiated + delay < func.now())
                .where(ActivenessProbe.is_pending)
            )

            for due_probe in session.execute(due_query).scalars().all():
                due_probe.notifications_sent = reminder_number
                # NOTE(review): the context built here is not used by anything below
                user_context = make_user_context(user_id=due_probe.user.id)
                probe_data = notification_data_pb2.ActivenessProbe(
                    reminder_number=reminder_number,
                    deadline=Timestamp_from_datetime(due_probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
                )
                notify(
                    session,
                    user_id=due_probe.user.id,
                    topic_action="activeness:probe",
                    key=due_probe.id,
                    data=probe_data,
                )
                # commit per probe so that each sent notification is recorded straight away
                session.commit()

        ## Stage 3: expire probes that got every reminder and still weren't answered in time
        expired_query = (
            select(ActivenessProbe)
            .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
            .where(ActivenessProbe.is_pending)
            .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
        )

        for stale_probe in session.execute(expired_query).scalars().all():
            stale_probe.responded = now()
            stale_probe.response = ActivenessProbeStatus.expired
            # only the most "available" statuses get downgraded, anything else is left as is
            if stale_probe.user.hosting_status == HostingStatus.can_host:
                stale_probe.user.hosting_status = HostingStatus.cant_host
            if stale_probe.user.meetup_status == MeetupStatus.wants_to_meetup:
                stale_probe.user.meetup_status = MeetupStatus.open_to_meetup
            session.commit()


send_activeness_probes.PAYLOAD = empty_pb2.Empty
send_activeness_probes.SCHEDULE = timedelta(minutes=60)

1038 

1039 

def update_randomized_locations(payload):
    """
    Fills in `randomized_geom` for every user that doesn't have one yet.

    We generate for each user a randomized location as follows:
    - Start from a strong random seed (based on the SECRET env var and our key derivation function)
    - For each user, mix in the user_id for randomness
    - Generate a radius from [0.02, 0.1] degrees (about 2-10km)
    - Generate an angle from [0, 360]
    - Randomized location is then a distance `radius` away at an angle `angle` from `geom`

    The same user_id always gives the same offset, since the only inputs are the secret and the user_id.
    """
    randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)

    def gen_randomized_coords(user_id, lat, lng):
        """
        Returns `(lat, lng)` moved by the offset derived for `user_id`
        """
        # presumably both of these are uniform in [0, 1], given how they are scaled below; verify against
        # stable_secure_uniform
        radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii"))
        angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii"))
        radius = 0.02 + 0.08 * radius_u
        angle_rad = 2 * pi * angle_u
        offset_lng = radius * cos(angle_rad)
        offset_lat = radius * sin(angle_rad)
        return lat + offset_lat, lng + offset_lng

    with session_scope() as session:
        users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all()

        # one parameter dict per user, keyed by primary key for the bulk update below
        user_updates = [
            {"id": user_id, "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, *get_coordinates(geom)))}
            for user_id, geom in users_to_update
        ]

    if not user_updates:
        # nothing to randomize: don't open a second transaction just to issue an empty bulk UPDATE
        return

    with session_scope() as session:
        session.execute(update(User), user_updates)


update_randomized_locations.PAYLOAD = empty_pb2.Empty
update_randomized_locations.SCHEDULE = timedelta(hours=1)

1077 

1078 

def send_event_reminders(payload: empty_pb2.Empty):
    """
    Sends reminders for upcoming events to their attendees.

    Considers every event occurrence that starts between now and EVENT_REMINDER_TIMEDELTA from now, and sends an
    `event:reminder` notification to each attendee whose `reminder_sent` flag is not set yet. The flag is then set
    so that the attendee isn't reminded again for that occurrence.
    """
    logger.info("Sending event reminder emails")

    with session_scope() as session:
        upcoming_query = (
            select(EventOccurrence)
            .where(EventOccurrence.start_time <= now() + EVENT_REMINDER_TIMEDELTA)
            .where(EventOccurrence.start_time >= now())
        )

        for occurrence in session.execute(upcoming_query).scalars().all():
            # attendees of this occurrence that haven't been reminded yet, together with their user rows
            unreminded_query = (
                select(User, EventOccurrenceAttendee)
                .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id)
                .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
                .where(EventOccurrenceAttendee.reminder_sent == False)
            )

            for user, attendee in session.execute(unreminded_query).all():
                context = make_user_context(user_id=user.id)

                reminder_data = notification_data_pb2.EventReminder(
                    event=event_to_pb(session, occurrence, context),
                    user=user_model_to_pb(user, session, context),
                )
                notify(session, user_id=user.id, topic_action="event:reminder", data=reminder_data)

                # commit per attendee so that the flag is saved together with the notification
                attendee.reminder_sent = True
                session.commit()


send_event_reminders.PAYLOAD = empty_pb2.Empty
send_event_reminders.SCHEDULE = timedelta(hours=1)