Coverage for src/couchers/jobs/handlers.py: 96%

394 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-11 03:48 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from datetime import date, timedelta 

7from math import cos, pi, sin, sqrt 

8from random import sample 

9from typing import Any 

10 

11import requests 

12from google.protobuf import empty_pb2 

13from sqlalchemy import Float, Integer 

14from sqlalchemy.orm import aliased 

15from sqlalchemy.sql import ( 

16 and_, 

17 case, 

18 cast, 

19 delete, 

20 distinct, 

21 exists, 

22 extract, 

23 func, 

24 literal, 

25 not_, 

26 or_, 

27 union_all, 

28 update, 

29) 

30 

31from couchers.config import config 

32from couchers.constants import ( 

33 ACTIVENESS_PROBE_EXPIRY_TIME, 

34 ACTIVENESS_PROBE_INACTIVITY_PERIOD, 

35 ACTIVENESS_PROBE_TIME_REMINDERS, 

36 EVENT_REMINDER_TIMEDELTA, 

37 HOST_REQUEST_MAX_REMINDERS, 

38 HOST_REQUEST_REMINDER_INTERVAL, 

39) 

40from couchers.context import make_background_user_context 

41from couchers.crypto import ( 

42 USER_LOCATION_RANDOMIZATION_NAME, 

43 asym_encrypt, 

44 b64decode, 

45 get_secret, 

46 simple_decrypt, 

47 stable_secure_uniform, 

48) 

49from couchers.db import session_scope 

50from couchers.email.dev import print_dev_email 

51from couchers.email.smtp import send_smtp_email 

52from couchers.helpers.badges import user_add_badge, user_remove_badge 

53from couchers.materialized_views import ( 

54 UserResponseRate, 

55 refresh_materialized_views, 

56 refresh_materialized_views_rapid, 

57) 

58from couchers.metrics import strong_verification_completions_counter 

59from couchers.models import ( 

60 AccountDeletionToken, 

61 ActivenessProbe, 

62 ActivenessProbeStatus, 

63 Cluster, 

64 ClusterRole, 

65 ClusterSubscription, 

66 EventOccurrence, 

67 EventOccurrenceAttendee, 

68 GroupChat, 

69 GroupChatSubscription, 

70 HostingStatus, 

71 HostRequest, 

72 HostRequestStatus, 

73 Invoice, 

74 LoginToken, 

75 MeetupStatus, 

76 Message, 

77 MessageType, 

78 PassportSex, 

79 PasswordResetToken, 

80 Reference, 

81 StrongVerificationAttempt, 

82 StrongVerificationAttemptStatus, 

83 User, 

84 UserBadge, 

85) 

86from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification 

87from couchers.notifications.notify import notify 

88from couchers.resources import get_badge_dict, get_static_badge_dict 

89from couchers.servicers.admin import generate_new_blog_post_notifications 

90from couchers.servicers.api import user_model_to_pb 

91from couchers.servicers.blocking import is_not_visible 

92from couchers.servicers.conversations import generate_message_notifications 

93from couchers.servicers.discussions import generate_create_discussion_notifications 

94from couchers.servicers.events import ( 

95 event_to_pb, 

96 generate_event_cancel_notifications, 

97 generate_event_create_notifications, 

98 generate_event_delete_notifications, 

99 generate_event_update_notifications, 

100) 

101from couchers.servicers.requests import host_request_to_pb 

102from couchers.servicers.threads import generate_reply_notifications 

103from couchers.sql import couchers_select as select 

104from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

105from couchers.tasks import send_duplicate_strong_verification_email 

106from couchers.utils import ( 

107 Timestamp_from_datetime, 

108 create_coordinate, 

109 get_coordinates, 

110 now, 

111) 

112from proto import notification_data_pb2 

113from proto.internal import jobs_pb2, verification_pb2 

114 

logger = logging.getLogger(__name__)


# these were straight up imported
# The handlers below are defined in other modules; this section attaches their job metadata.
# PAYLOAD is the protobuf message class of the job's payload. SCHEDULE is a timedelta and is only set on
# some handlers -- presumably the interval at which the job runner repeats the job; confirm against the
# scheduler.
handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload

send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload

handle_email_digests.PAYLOAD = empty_pb2.Empty
handle_email_digests.SCHEDULE = timedelta(minutes=15)

generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload

generate_reply_notifications.PAYLOAD = jobs_pb2.GenerateReplyNotificationsPayload

generate_create_discussion_notifications.PAYLOAD = jobs_pb2.GenerateCreateDiscussionNotificationsPayload

generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload

generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload

generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload

generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload

generate_new_blog_post_notifications.PAYLOAD = jobs_pb2.GenerateNewBlogPostNotificationsPayload

refresh_materialized_views.PAYLOAD = empty_pb2.Empty
refresh_materialized_views.SCHEDULE = timedelta(minutes=5)

refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)

147 

148 

def send_email(payload: jobs_pb2.SendEmailPayload) -> None:
    """
    Delivers the email described by the payload and stores the resulting record

    With config["ENABLE_EMAIL"] set, the email is sent out with SMTP; otherwise it is handed to the dev printer.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")

    if config["ENABLE_EMAIL"]:
        deliver = send_smtp_email
    else:
        deliver = print_dev_email

    email_fields = {
        "sender_name": payload.sender_name,
        "sender_email": payload.sender_email,
        "recipient": payload.recipient,
        "subject": payload.subject,
        "plain": payload.plain,
        "html": payload.html,
        "list_unsubscribe_header": payload.list_unsubscribe_header,
        "source_data": payload.source_data,
    }
    # whichever sender was picked must hand back a models.Email object that can be added to the database
    sent_email = deliver(**email_fields)

    with session_scope() as session:
        session.add(sent_email)


send_email.PAYLOAD = jobs_pb2.SendEmailPayload

169 

170 

def purge_login_tokens(payload: empty_pb2.Empty) -> None:
    """
    Deletes all login tokens that are no longer valid
    """
    logger.info("Purging login tokens")
    # bulk delete; no need to sync in-session objects since the session is closed right after
    stale_tokens = delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False)
    with session_scope() as session:
        session.execute(stale_tokens)


purge_login_tokens.PAYLOAD = empty_pb2.Empty
purge_login_tokens.SCHEDULE = timedelta(hours=24)

179 

180 

def purge_password_reset_tokens(payload: empty_pb2.Empty) -> None:
    """
    Deletes all password reset tokens that are no longer valid
    """
    # the message used to read "Purging login tokens" (copied from purge_login_tokens), which made the two
    # jobs indistinguishable in the logs
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )


purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)

191 

192 

def purge_account_deletion_tokens(payload: empty_pb2.Empty) -> None:
    """
    Deletes all account deletion tokens that are no longer valid
    """
    logger.info("Purging account deletion tokens")
    invalid_tokens = delete(AccountDeletionToken).where(~AccountDeletionToken.is_valid)
    with session_scope() as session:
        session.execute(invalid_tokens.execution_options(synchronize_session=False))


purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)

205 

206 

def send_message_notifications(payload: empty_pb2.Empty) -> None:
    """
    Notifies users about group chat text messages that have been unseen for more than five minutes

    Each such user gets one "chat:missed_messages" notification with an entry per affected group chat, and
    their last_notified_message_id is moved up to the newest message included in it.
    """

    def describe_missed(message, group_chat, count_unseen):
        # DMs are attributed to the author, other chats to the chat title
        if group_chat.is_dm:
            return f"You missed {count_unseen} message(s) from {message.author.name}"
        return f"You missed {count_unseen} message(s) in {group_chat.title}"

    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        stale_unseen_users = (
            select(User)
            .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
            .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
            .where(not_(GroupChatSubscription.is_muted))
            .where(User.is_visible)
            .where(Message.time >= GroupChatSubscription.joined)
            .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
            .where(Message.id > User.last_notified_message_id)
            .where(Message.id > GroupChatSubscription.last_seen_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
        )
        recipients = session.execute(stale_unseen_users).scalars().unique()

        for user in recipients:
            # now actually grab all the group chats, not just less than 5 min old
            per_chat = (
                select(
                    GroupChatSubscription.group_chat_id.label("group_chat_id"),
                    func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                    func.max(Message.id).label("message_id"),
                    func.count(Message.id).label("count_unseen"),
                )
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(GroupChatSubscription.user_id == user.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(Message.id > user.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            # one row per chat: the chat, its newest unseen message, and how many are unseen
            latest_per_chat = session.execute(
                select(GroupChat, Message, per_chat.c.count_unseen)
                .join(per_chat, per_chat.c.message_id == Message.id)
                .join(GroupChat, GroupChat.conversation_id == per_chat.c.group_chat_id)
                .order_by(per_chat.c.message_id.desc())
            ).all()

            if not latest_per_chat:
                continue

            user.last_notified_message_id = max(message.id for _, message, _ in latest_per_chat)

            missed = []
            for group_chat, message, count_unseen in latest_per_chat:
                author_pb = user_model_to_pb(
                    message.author,
                    session,
                    make_background_user_context(user_id=user.id),
                )
                missed.append(
                    notification_data_pb2.ChatMessage(
                        author=author_pb,
                        message=describe_missed(message, group_chat, count_unseen),
                        text=message.text,
                        group_chat_id=message.conversation_id,
                    )
                )

            notify(
                session,
                user_id=user.id,
                topic_action="chat:missed_messages",
                data=notification_data_pb2.ChatMissedMessages(messages=missed),
            )
            session.commit()


send_message_notifications.PAYLOAD = empty_pb2.Empty
send_message_notifications.SCHEDULE = timedelta(minutes=3)

299 

300 

def send_request_notifications(payload: empty_pb2.Empty) -> None:
    """
    Notifies surfers and hosts about text messages in their host requests that have been unseen for five minutes

    Both sides are queried up front; each matching (user, host request) pair then gets a
    "host_request:missed_messages" notification and the user's last_notified_request_message_id is raised.
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    def unseen_query(participant_id_column, last_seen_column):
        # (user, host request, newest unseen message id) for one side of the host request
        return (
            select(User, HostRequest, func.max(Message.id))
            .where(User.is_visible)
            .join(HostRequest, participant_id_column == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(Message.id > last_seen_column)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
            .group_by(User, HostRequest)
        )

    with session_scope() as session:
        # requests where this user is surfing
        as_surfer = session.execute(
            unseen_query(HostRequest.surfer_user_id, HostRequest.surfer_last_seen_message_id)
        ).all()
        # where this user is hosting
        as_host = session.execute(unseen_query(HostRequest.host_user_id, HostRequest.host_last_seen_message_id)).all()

        # both result sets are loaded before anything is modified; surfers are handled first
        for am_host, rows in ((False, as_surfer), (True, as_host)):
            for user, host_request, max_message_id in rows:
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                context = make_background_user_context(user_id=user.id)
                notify(
                    session,
                    user_id=user.id,
                    topic_action="host_request:missed_messages",
                    key=host_request.conversation_id,
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        # the other party of the request
                        user=user_model_to_pb(
                            host_request.surfer if am_host else host_request.host, session, context
                        ),
                        am_host=am_host,
                    ),
                )


send_request_notifications.PAYLOAD = empty_pb2.Empty
send_request_notifications.SCHEDULE = timedelta(minutes=3)

371 

372 

def send_onboarding_emails(payload: empty_pb2.Empty) -> None:
    """
    Sends the first and second onboarding reminders to the users that are due for them
    """
    logger.info("Sending out onboarding emails")

    def remind(session, user, number):
        # notify, record which reminder was sent and when, and commit per user
        notify(
            session,
            user_id=user.id,
            topic_action="onboarding:reminder",
            key=str(number),
        )
        user.onboarding_emails_sent = number
        user.last_onboarding_email_sent = now()
        session.commit()

    with session_scope() as session:
        # first onboarding email
        never_reminded = select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)
        for user in session.execute(never_reminded).scalars().all():
            remind(session, user, 1)

        # second onboarding email
        # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
        due_for_second = (
            select(User)
            .where(User.is_visible)
            .where(User.onboarding_emails_sent == 1)
            .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
            .where(User.has_completed_profile == False)
        )
        for user in session.execute(due_for_second).scalars().all():
            remind(session, user, 2)


send_onboarding_emails.PAYLOAD = empty_pb2.Empty
send_onboarding_emails.SCHEDULE = timedelta(hours=1)

424 

425 

def send_reference_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends out reminders to write references after hosting/staying

    For each entry of the reminder schedule (latest reminder first), selects host requests where a participant
    has not written a reference, has been sent fewer reminders than the entry's number, and the entry's
    reminder time has been reached. If the pair is visible to each other, that participant is notified and the
    reminder number is recorded on the host request.
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            # `user` is the one being reminded, `other_user` the other party of the host request
            user = aliased(User)
            other_user = aliased(User)
            # surfers needing to write a ref
            q1 = (
                select(literal(True), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.surfer_user_id)
                .join(other_user, other_user.id == HostRequest.host_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the surfer has not written a ref
                        Reference.from_user_id == HostRequest.surfer_user_id,
                    ),
                )
                .where(user.is_visible)
                .where(other_user.is_visible)
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.surfer_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.surfer_reason_didnt_meetup == None)
            )

            # hosts needing to write a ref
            q2 = (
                select(literal(False), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.host_user_id)
                .join(other_user, other_user.id == HostRequest.surfer_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the host has not written a ref
                        Reference.from_user_id == HostRequest.host_user_id,
                    ),
                )
                .where(user.is_visible)
                .where(other_user.is_visible)
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.host_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.host_reason_didnt_meetup == None)
            )

            # combine both sides; the first column (the literal) tells which side a row came from, and the
            # entities are mapped back onto the union's columns
            union = union_all(q1, q2).subquery()
            union = select(
                union.c[0].label("surfed"),
                aliased(HostRequest, union),
                aliased(user, union),
                aliased(other_user, union),
            )
            reference_reminders = session.execute(union).all()

            # note: `user` and `other_user` are rebound to loaded rows here; the aliases are recreated at the
            # top of the next schedule iteration
            for surfed, host_request, user, other_user in reference_reminders:
                # checked in sql
                assert user.is_visible
                if not is_not_visible(session, user.id, other_user.id):
                    context = make_background_user_context(user_id=user.id)
                    notify(
                        session,
                        user_id=user.id,
                        topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
                        data=notification_data_pb2.ReferenceReminder(
                            host_request_id=host_request.conversation_id,
                            other_user=user_model_to_pb(other_user, session, context),
                            days_left=reminder_days_left,
                        ),
                    )
                    # remember which reminder this side has received so it is not sent again
                    if surfed:
                        host_request.surfer_sent_reference_reminders = reminder_number
                    else:
                        host_request.host_sent_reference_reminders = reminder_number
                    session.commit()


send_reference_reminders.PAYLOAD = empty_pb2.Empty
send_reference_reminders.SCHEDULE = timedelta(hours=1)

526 

527 

def send_host_request_reminders(payload: empty_pb2.Empty) -> None:
    """
    Reminds hosts about pending, not yet started host requests in which they have not written any message

    A request qualifies while it has had fewer than HOST_REQUEST_MAX_REMINDERS reminders and at least
    HOST_REQUEST_REMINDER_INTERVAL has passed since last_sent_request_reminder_time.
    """
    # correlated subquery: any message in the request's conversation authored by the host
    host_replied = select(1).where(
        Message.conversation_id == HostRequest.conversation_id, Message.author_id == HostRequest.host_user_id
    )
    due_for_reminder = (
        select(HostRequest)
        .where(HostRequest.status == HostRequestStatus.pending)
        .where(HostRequest.host_sent_request_reminders < HOST_REQUEST_MAX_REMINDERS)
        .where(HostRequest.start_time > func.now())
        .where((func.now() - HostRequest.last_sent_request_reminder_time) >= HOST_REQUEST_REMINDER_INTERVAL)
        .where(~exists(host_replied))
    )

    with session_scope() as session:
        pending_requests = session.execute(due_for_reminder).scalars().all()

        for host_request in pending_requests:
            # bookkeeping first, so the reminder count and timestamp are committed together with the notification
            host_request.host_sent_request_reminders += 1
            host_request.last_sent_request_reminder_time = now()

            host_id = host_request.host_user_id
            context = make_background_user_context(user_id=host_id)
            reminder = notification_data_pb2.HostRequestReminder(
                host_request=host_request_to_pb(host_request, session, context),
                surfer=user_model_to_pb(host_request.surfer, session, context),
            )
            notify(
                session,
                user_id=host_id,
                topic_action="host_request:reminder",
                data=reminder,
            )

            session.commit()


send_host_request_reminders.PAYLOAD = empty_pb2.Empty
send_host_request_reminders.SCHEDULE = timedelta(minutes=15)

567 

568 

def add_users_to_email_list(payload: empty_pb2.Empty) -> None:
    """
    Subscribes visible users that are not yet in sync with the newsletter to the Listmonk mailing list

    Users are handled one at a time, each in its own session. Users who opted out of the newsletter are only
    marked as in sync. Does nothing when config["LISTMONK_ENABLED"] is off.

    Raises an Exception if Listmonk answers with anything other than 200 or 409.
    """
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    next_unsynced = select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)

    while True:
        with session_scope() as session:
            user = session.execute(next_unsynced).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            if not user.opt_out_of_newsletter:
                response = requests.post(
                    config["LISTMONK_BASE_URL"] + "/api/subscribers",
                    auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
                    json={
                        "email": user.email,
                        "name": user.name,
                        "lists": [config["LISTMONK_LIST_ID"]],
                        "preconfirm_subscriptions": True,
                        "attribs": {"couchers_user_id": user.id},
                        "status": "enabled",
                    },
                    timeout=10,
                )
                # a 409 is what the API returns if the user is already subscribed, which counts as success
                if response.status_code not in (200, 409):
                    raise Exception("Failed to add users to mailing list")

            # reached for opted-out users and for successful (or already existing) subscriptions
            user.in_sync_with_newsletter = True
            session.commit()


add_users_to_email_list.PAYLOAD = empty_pb2.Empty
add_users_to_email_list.SCHEDULE = timedelta(hours=1)

613 

614 

def enforce_community_membership(payload: empty_pb2.Empty) -> None:
    """
    Job wrapper that delegates to couchers.tasks.enforce_community_memberships; the payload is unused
    """
    tasks_enforce_community_memberships()


enforce_community_membership.PAYLOAD = empty_pb2.Empty
enforce_community_membership.SCHEDULE = timedelta(minutes=15)

621 

622 

def update_recommendation_scores(payload: empty_pb2.Empty) -> None:
    """
    Recomputes User.recommendation_score with a single UPDATE built entirely from SQL expressions

    The score is the sum of points for hosting status, profile completeness, references, activeness, badges and
    community roles, host request response times, plus a random term.
    """
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian():
        """
        Produces an approximately std normal random variate
        """
        # sum of `trials` SQL random() values, shifted by trials/2 and scaled by sqrt(trials/12); this is the
        # mean and std of the sum assuming random() is uniform on [0, 1) -- TODO confirm for the database used
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt):
        # cast to integer, mapping NULL to 0
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt):
        # cast to float, mapping NULL to 0.0
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile
        # concatenation of all text fields (NULL treated as empty), used only for its length
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")
        home_length = func.length(home_text)

        filled_profile = int_(User.has_completed_profile)
        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        may_host = int_(User.hosting_status == HostingStatus.maybe)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.has_completed_my_home)
        filled_home_lots = int_(home_length > 200)
        hosting_status_points = 5 * can_host - 5 * may_host - 10 * cant_host
        profile_points = 5 * filled_profile + 2 * has_text + 3 * long_text + 5 * filled_home + 10 * filled_home_lots

        # references
        # one row per user who has written at least one reference
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        # aggregates over the references each user has received
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # the first five references are weighted double, any further ones single
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        # aggregated per author over text messages only
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        # smallest parent node id among the official clusters the user is an admin of
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        # NOTE(review): presumably "community builder" and, for node id 1, "world community builder" -- confirm
        cb = int_(min_node_id >= 1)
        wcb = int_(min_node_id == 1)
        # points awarded per badge id; badges not listed here give 0
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }

        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate
        # response time percentiles converted from seconds (epoch) to minutes
        hr_subquery = select(
            UserResponseRate.user_id,
            float_(extract("epoch", UserResponseRate.response_time_33p) / 60.0).label("response_time_33p"),
            float_(extract("epoch", UserResponseRate.response_time_66p) / 60.0).label("response_time_66p"),
        ).subquery()
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
        # the threshold 60 * 96.0 is 96 hours expressed in minutes
        response_rate_points = -10 * int_(response_time_33p > 60 * 96.0) + 5 * int_(response_time_66p < 60 * 96.0)

        recommendation_score = (
            hosting_status_points
            + profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        # outer joins so that users without rows in a subquery still get a score (their terms coalesce to 0)
        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        session.execute(
            User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
        )

    logger.info("Updated recommendation scores")


update_recommendation_scores.PAYLOAD = empty_pb2.Empty
update_recommendation_scores.SCHEDULE = timedelta(hours=24)

802 

803 

def update_badges(payload: empty_pb2.Empty) -> None:
    """
    Brings the automatically managed badges in line with who should currently hold them

    For each managed badge the intended holders are determined (static lists or a query), then the badge is
    added to users missing it and removed from users that should no longer have it.
    """
    with session_scope() as session:

        def sync_badge(badge_id: str, members: list[int]) -> None:
            # looked up only so an unknown badge id fails early (presumably with a KeyError)
            _ = get_badge_dict()[badge_id]
            current_holders = set(
                session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all()
            )
            # in case the user ids don't exist in the db
            intended_holders = set(session.execute(select(User.id).where(User.id.in_(members))).scalars().all())

            # we should add the badge to these
            for user_id in intended_holders - current_holders:
                user_add_badge(session, user_id, badge_id)

            # we should remove the badge from these
            for user_id in current_holders - intended_holders:
                user_remove_badge(session, user_id, badge_id)

        # badges whose holders come from the static badge lists
        for static_badge_id in ("founder", "board_member", "past_board_member"):
            sync_badge(static_badge_id, get_static_badge_dict()[static_badge_id])

        donor_ids = session.execute(select(User.id).join(Invoice, Invoice.user_id == User.id)).scalars().all()
        sync_badge("donor", donor_ids)

        superuser_ids = session.execute(select(User.id).where(User.is_superuser)).scalars().all()
        sync_badge("moderator", superuser_ids)

        phone_verified_ids = session.execute(select(User.id).where(User.phone_is_verified)).scalars().all()
        sync_badge("phone_verified", phone_verified_ids)

        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        strongly_verified_ids = (
            session.execute(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            )
            .scalars()
            .all()
        )
        sync_badge("strong_verification", strongly_verified_ids)


update_badges.PAYLOAD = empty_pb2.Empty
update_badges.SCHEDULE = timedelta(minutes=15)

845 

846 

def finalize_strong_verification(payload: "jobs_pb2.FinalizeStrongVerificationPayload") -> None:
    """
    Completes a strong verification attempt that is waiting on the backend.

    Fetches the session result from Iris ID, cross-checks it against the stored attempt, and then marks the
    attempt as failed (document is not a passport), duplicate (same passport details already on record) or
    succeeded, notifying the user of the outcome.

    Raises:
        AssertionError: if the Iris response does not match the stored attempt or is not in the APPROVED state.
        Exception: if Iris responds with a non-200 status code.

    The initial `scalar_one()` also raises if there is no attempt with the given id in the
    `in_progress_waiting_on_backend` state.
    """

    def require(condition: bool, message: str) -> None:
        # Explicit raise rather than an `assert` statement: these checks tie the Iris response to the stored
        # attempt and must not be stripped when Python runs with -O. The exception type is kept as
        # AssertionError so failures look the same as before.
        if not condition:
            raise AssertionError(message)

    with session_scope() as session:
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
            verify="/etc/ssl/certs/ca-certificates.crt",
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()
        # the reference is the encrypted payload that identifies which user/attempt this Iris session belongs to
        reference_payload = verification_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )
        require(
            verification_attempt.user_id == reference_payload.user_id,
            "Iris reference user_id does not match the verification attempt",
        )
        require(
            verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token,
            "Iris reference token does not match the verification attempt",
        )
        require(
            verification_attempt.iris_session_id == json_data["id"],
            "Iris session id does not match the verification attempt",
        )
        require(json_data["state"] == "APPROVED", "Iris session is not in the APPROVED state")

        if json_data["document_type"] != "PASSPORT":
            verification_attempt.status = StrongVerificationAttemptStatus.failed
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
                ),
            )
            return

        # from here on the document is known to be a passport

        expiry_date = date.fromisoformat(json_data["expiry_date"])
        nationality = json_data["nationality"]
        last_three_document_chars = json_data["document_number"][-3:]

        # look for an earlier attempt with the same minimal passport data; this runs before the fields are set
        # on the current attempt below
        existing_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
            .where(StrongVerificationAttempt.passport_nationality == nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = expiry_date
        verification_attempt.passport_nationality = nationality
        verification_attempt.passport_last_three_document_chars = last_three_document_chars

        if existing_attempt:
            verification_attempt.status = StrongVerificationAttemptStatus.duplicate

            # the duplicate email is only sent when the matching attempt belongs to a different user
            if existing_attempt.user_id != verification_attempt.user_id:
                session.flush()
                send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)

            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
            )
            return

        verification_attempt.has_full_data = True
        # the raw Iris response is stored encrypted with the verification data public key
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = verification_attempt.user
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            if session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none():
                # the user already holds the badge: nothing to add, and no notification is sent
                return

            user_add_badge(session, user.id, badge_id, do_notify=False)
            notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success")
        else:
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
                ),
            )


finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload

953 

954 

def send_activeness_probes(payload: empty_pb2.Empty) -> None:
    """
    Runs one round of the activeness probe cycle.

    1. If ACTIVENESS_PROBES_ENABLED is set, creates probes for visible, can-host users who have been inactive
       for longer than ACTIVENESS_PROBE_INACTIVITY_PERIOD and have no unanswered probe, subject to a rate limit.
    2. Sends the next due notification for every pending probe.
    3. Expires pending probes that received all reminders and are past ACTIVENESS_PROBE_EXPIRY_TIME,
       downgrading the user's hosting and meetup statuses.
    """
    with session_scope() as session:
        ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)

        if config["ACTIVENESS_PROBES_ENABLED"]:
            # users with a probe that hasn't been responded to yet
            unanswered = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery()

            # users who we should send an activeness probe to
            candidate_ids = (
                session.execute(
                    select(User.id)
                    .where(User.is_visible)
                    .where(User.hosting_status == HostingStatus.can_host)
                    .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
                    .where(User.id.not_in(select(unanswered.c.user_id)))
                )
                .scalars()
                .all()
            )

            total_users = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()
            probes_today = session.execute(
                select(func.count())
                .select_from(ActivenessProbe)
                .where(func.now() - ActivenessProbe.probe_initiated < timedelta(hours=24))
            ).scalar_one()

            # send probes to max 2% of users per day
            max_probes_per_day = 0.02 * total_users
            remaining_today = max_probes_per_day - probes_today
            per_run_share = max_probes_per_day / 24
            # the batch size never drops below one, even when the other two bounds are smaller
            max_probe_size = int(max(min(remaining_today, per_run_share), 1))

            if len(candidate_ids) > max_probe_size:
                candidate_ids = sample(candidate_ids, max_probe_size)

            session.add_all(ActivenessProbe(user_id=candidate_id) for candidate_id in candidate_ids)

            session.commit()

        ## Step 2: actually send out probe notifications
        for reminder_number, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS, start=1):
            # pending probes that have had exactly the previous reminders and whose next one is now due
            due_probes = (
                session.execute(
                    select(ActivenessProbe)
                    .where(ActivenessProbe.notifications_sent == reminder_number - 1)
                    .where(ActivenessProbe.probe_initiated + delay < func.now())
                    .where(ActivenessProbe.is_pending)
                )
                .scalars()
                .all()
            )

            for probe in due_probes:
                probe.notifications_sent = reminder_number
                recipient_id = probe.user.id
                # NOTE(review): this context is not used below; confirm whether the call can be dropped
                context = make_background_user_context(user_id=recipient_id)
                notify(
                    session,
                    user_id=recipient_id,
                    topic_action="activeness:probe",
                    key=probe.id,
                    data=notification_data_pb2.ActivenessProbe(
                        reminder_number=reminder_number,
                        deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
                    ),
                )
                session.commit()

        ## Step 3: for those who haven't responded, mark them as failed
        expired_probes = (
            session.execute(
                select(ActivenessProbe)
                .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
                .where(ActivenessProbe.is_pending)
                .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
            )
            .scalars()
            .all()
        )

        for probe in expired_probes:
            probe.responded = now()
            probe.response = ActivenessProbeStatus.expired
            probed_user = probe.user
            # downgrade only the most active statuses; any other status is left untouched
            if probed_user.hosting_status == HostingStatus.can_host:
                probed_user.hosting_status = HostingStatus.maybe
            if probed_user.meetup_status == MeetupStatus.wants_to_meetup:
                probed_user.meetup_status = MeetupStatus.open_to_meetup
            session.commit()


send_activeness_probes.PAYLOAD = empty_pb2.Empty
send_activeness_probes.SCHEDULE = timedelta(minutes=60)

1047 

1048 

def update_randomized_locations(payload: empty_pb2.Empty) -> None:
    """
    Fills in `randomized_geom` for every user that does not have one yet.

    We generate for each user a randomized location as follows:
    - Start from a strong random seed (based on the SECRET env var and our key derivation function)
    - For each user, mix in the user_id for randomness
    - Generate a radius from [0.02, 0.1] degrees (about 2-10km)
    - Generate an angle from [0, 360]
    - Randomized location is then a distance `radius` away at an angle `angle` from `geom`

    The offset depends only on the secret and the user id, so it is stable across runs for a given user.
    """
    randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)

    def gen_randomized_coords(user_id: int, lat: float, lng: float) -> tuple[float, float]:
        """Returns (lat, lng) shifted by this user's offset, which is applied directly in degrees."""
        # separate seeds so that radius and angle are drawn independently
        radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii"))
        angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii"))
        radius = 0.02 + 0.08 * radius_u
        angle_rad = 2 * pi * angle_u
        offset_lng = radius * cos(angle_rad)
        offset_lat = radius * sin(angle_rad)
        return lat + offset_lat, lng + offset_lng

    user_updates: list[dict[str, Any]] = []

    with session_scope() as session:
        users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all()

        for user_id, geom in users_to_update:
            lat, lng = get_coordinates(geom)
            user_updates.append(
                {"id": user_id, "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, lat, lng))}
            )

    if not user_updates:
        # nothing to do: skip opening a second session and issuing a bulk UPDATE with an empty parameter list
        return

    with session_scope() as session:
        # bulk UPDATE by primary key: each dict carries the user's "id" plus the new value
        session.execute(update(User), user_updates)


update_randomized_locations.PAYLOAD = empty_pb2.Empty
update_randomized_locations.SCHEDULE = timedelta(hours=1)

1086 

1087 

def send_event_reminders(payload: empty_pb2.Empty) -> None:
    """
    Sends an "event:reminder" notification to attendees of upcoming event occurrences.

    An occurrence qualifies when it starts between now and EVENT_REMINDER_TIMEDELTA from now. Only attendees
    whose `reminder_sent` flag is still unset are notified, and the flag is set once their reminder is queued.
    """
    logger.info("Sending event reminder emails")

    with session_scope() as session:
        # bounds of the reminder window (evaluated in the same order as they appear in the query)
        latest_start = now() + EVENT_REMINDER_TIMEDELTA
        earliest_start = now()

        upcoming_query = (
            select(EventOccurrence)
            .where(EventOccurrence.start_time <= latest_start)
            .where(EventOccurrence.start_time >= earliest_start)
        )
        occurrences = session.execute(upcoming_query).scalars().all()

        for occurrence in occurrences:
            # attendees of this occurrence that haven't been reminded yet, together with their user rows
            unreminded_query = (
                select(User, EventOccurrenceAttendee)
                .join(EventOccurrenceAttendee, EventOccurrenceAttendee.user_id == User.id)
                .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
                .where(EventOccurrenceAttendee.reminder_sent == False)
            )

            for user, attendee in session.execute(unreminded_query).all():
                context = make_background_user_context(user_id=user.id)
                reminder = notification_data_pb2.EventReminder(
                    event=event_to_pb(session, occurrence, context),
                    user=user_model_to_pb(user, session, context),
                )

                notify(session, user_id=user.id, topic_action="event:reminder", data=reminder)

                # record and commit per attendee
                attendee.reminder_sent = True
                session.commit()


send_event_reminders.PAYLOAD = empty_pb2.Empty
send_event_reminders.SCHEDULE = timedelta(hours=1)