Coverage for src/couchers/jobs/handlers.py: 99%

355 statements  

coverage.py v7.6.10, created at 2025-06-01 15:07 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from datetime import date, timedelta 

7from math import cos, pi, sin, sqrt 

8 

9import requests 

10from google.protobuf import empty_pb2 

11from sqlalchemy import Float, Integer 

12from sqlalchemy.orm import aliased 

13from sqlalchemy.sql import ( 

14 and_, 

15 case, 

16 cast, 

17 delete, 

18 distinct, 

19 extract, 

20 func, 

21 literal, 

22 not_, 

23 or_, 

24 select, 

25 union_all, 

26 update, 

27) 

28 

29from couchers.config import config 

30from couchers.constants import ( 

31 ACTIVENESS_PROBE_EXPIRY_TIME, 

32 ACTIVENESS_PROBE_INACTIVITY_PERIOD, 

33 ACTIVENESS_PROBE_TIME_REMINDERS, 

34) 

35from couchers.crypto import ( 

36 USER_LOCATION_RANDOMIZATION_NAME, 

37 asym_encrypt, 

38 b64decode, 

39 get_secret, 

40 simple_decrypt, 

41 stable_secure_uniform, 

42) 

43from couchers.db import session_scope 

44from couchers.email.dev import print_dev_email 

45from couchers.email.smtp import send_smtp_email 

46from couchers.helpers.badges import user_add_badge, user_remove_badge 

47from couchers.materialized_views import ( 

48 refresh_materialized_views, 

49 refresh_materialized_views_rapid, 

50 user_response_rates, 

51) 

52from couchers.metrics import strong_verification_completions_counter 

53from couchers.models import ( 

54 AccountDeletionToken, 

55 ActivenessProbe, 

56 ActivenessProbeStatus, 

57 Cluster, 

58 ClusterRole, 

59 ClusterSubscription, 

60 GroupChat, 

61 GroupChatSubscription, 

62 HostingStatus, 

63 HostRequest, 

64 Invoice, 

65 LoginToken, 

66 MeetupStatus, 

67 Message, 

68 MessageType, 

69 PassportSex, 

70 PasswordResetToken, 

71 Reference, 

72 StrongVerificationAttempt, 

73 StrongVerificationAttemptStatus, 

74 User, 

75 UserBadge, 

76) 

77from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification 

78from couchers.notifications.notify import notify 

79from couchers.resources import get_badge_dict, get_static_badge_dict 

80from couchers.servicers.admin import generate_new_blog_post_notifications 

81from couchers.servicers.api import user_model_to_pb 

82from couchers.servicers.blocking import are_blocked 

83from couchers.servicers.conversations import generate_message_notifications 

84from couchers.servicers.discussions import generate_create_discussion_notifications 

85from couchers.servicers.events import ( 

86 generate_event_cancel_notifications, 

87 generate_event_create_notifications, 

88 generate_event_delete_notifications, 

89 generate_event_update_notifications, 

90) 

91from couchers.servicers.requests import host_request_to_pb 

92from couchers.servicers.threads import generate_reply_notifications 

93from couchers.sql import couchers_select as select 

94from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

95from couchers.tasks import send_duplicate_strong_verification_email 

96from couchers.utils import Timestamp_from_datetime, create_coordinate, get_coordinates, make_user_context, now 

97from proto import notification_data_pb2 

98from proto.internal import jobs_pb2, verification_pb2 

99 

100logger = logging.getLogger(__name__) 

101 
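# Each job handler below carries a PAYLOAD attribute (the protobuf message type its queue payload
# is deserialized from) and, for recurring jobs, a SCHEDULE timedelta saying how often the
# scheduler should enqueue it.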

# these were straight up imported
handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload

send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload

handle_email_digests.PAYLOAD = empty_pb2.Empty
handle_email_digests.SCHEDULE = timedelta(minutes=15)

generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload

generate_reply_notifications.PAYLOAD = jobs_pb2.GenerateReplyNotificationsPayload

generate_create_discussion_notifications.PAYLOAD = jobs_pb2.GenerateCreateDiscussionNotificationsPayload

generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload

generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload

generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload

generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload

generate_new_blog_post_notifications.PAYLOAD = jobs_pb2.GenerateNewBlogPostNotificationsPayload

refresh_materialized_views.PAYLOAD = empty_pb2.Empty
refresh_materialized_views.SCHEDULE = timedelta(minutes=5)

refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)


def send_email(payload):
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
    # selects a "sender", which either prints the email to the logger or sends it out with SMTP
    sender = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email
    # the sender must return a models.Email object that can be added to the database
    email = sender(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
        list_unsubscribe_header=payload.list_unsubscribe_header,
        source_data=payload.source_data,
    )
    with session_scope() as session:
        session.add(email)


send_email.PAYLOAD = jobs_pb2.SendEmailPayload


def purge_login_tokens(payload):
    logger.info("Purging login tokens")
    with session_scope() as session:
        session.execute(delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False))


purge_login_tokens.PAYLOAD = empty_pb2.Empty
purge_login_tokens.SCHEDULE = timedelta(hours=24)


def purge_password_reset_tokens(payload):
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )


purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)


def purge_account_deletion_tokens(payload):
    logger.info("Purging account deletion tokens")
    with session_scope() as session:
        session.execute(
            delete(AccountDeletionToken)
            .where(~AccountDeletionToken.is_valid)
            .execution_options(synchronize_session=False)
        )


purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)


def send_message_notifications(payload):
    """
    Sends out email notifications for messages that have been unseen for a long enough time
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                select(User)
                .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for user in users:
            # now actually grab all the group chats, not just less than 5 min old
            subquery = (
                select(
                    GroupChatSubscription.group_chat_id.label("group_chat_id"),
                    func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                    func.max(Message.id).label("message_id"),
                    func.count(Message.id).label("count_unseen"),
                )
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(GroupChatSubscription.user_id == user.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(Message.id > user.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            unseen_messages = session.execute(
                select(GroupChat, Message, subquery.c.count_unseen)
                .join(subquery, subquery.c.message_id == Message.id)
                .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
                .order_by(subquery.c.message_id.desc())
            ).all()

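            # advance the watermark so the same messages aren't notified about again on the next run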

            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            def format_title(message, group_chat, count_unseen):
                if group_chat.is_dm:
                    return f"You missed {count_unseen} message(s) from {message.author.name}"
                else:
                    return f"You missed {count_unseen} message(s) in {group_chat.title}"

            notify(
                session,
                user_id=user.id,
                topic_action="chat:missed_messages",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                make_user_context(user_id=user.id),
                            ),
                            message=format_title(message, group_chat, count_unseen),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                        )
                        for group_chat, message, count_unseen in unseen_messages
                    ],
                ),
            )
            session.commit()


send_message_notifications.PAYLOAD = empty_pb2.Empty
send_message_notifications.SCHEDULE = timedelta(minutes=3)


def send_request_notifications(payload):
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host)
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    with session_scope() as session:
        # requests where this user is surfing
        surfing_reqs = session.execute(
            select(User, HostRequest, func.max(Message.id))
            .where(User.is_visible)
            .join(HostRequest, HostRequest.surfer_user_id == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(Message.id > HostRequest.surfer_last_seen_message_id)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
            .group_by(User, HostRequest)
        ).all()

        # where this user is hosting
        hosting_reqs = session.execute(
            select(User, HostRequest, func.max(Message.id))
            .where(User.is_visible)
            .join(HostRequest, HostRequest.host_user_id == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(Message.id > HostRequest.host_last_seen_message_id)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
            .group_by(User, HostRequest)
        ).all()

        for user, host_request, max_message_id in surfing_reqs:
            user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
            session.flush()

            context = make_user_context(user_id=user.id)
            notify(
                session,
                user_id=user.id,
                topic_action="host_request:missed_messages",
                key=host_request.conversation_id,
                data=notification_data_pb2.HostRequestMissedMessages(
                    host_request=host_request_to_pb(host_request, session, context),
                    user=user_model_to_pb(host_request.host, session, context),
                    am_host=False,
                ),
            )

        for user, host_request, max_message_id in hosting_reqs:
            user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
            session.flush()

            context = make_user_context(user_id=user.id)
            notify(
                session,
                user_id=user.id,
                topic_action="host_request:missed_messages",
                key=host_request.conversation_id,
                data=notification_data_pb2.HostRequestMissedMessages(
                    host_request=host_request_to_pb(host_request, session, context),
                    user=user_model_to_pb(host_request.surfer, session, context),
                    am_host=True,
                ),
            )


send_request_notifications.PAYLOAD = empty_pb2.Empty
send_request_notifications.SCHEDULE = timedelta(minutes=3)


def send_onboarding_emails(payload):
    """
    Sends out onboarding emails
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:
        # first onboarding email
        users = (
            session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all()
        )

        for user in users:
            notify(
                session,
                user_id=user.id,
                topic_action="onboarding:reminder",
                key="1",
            )
            user.onboarding_emails_sent = 1
            user.last_onboarding_email_sent = now()
            session.commit()

        # second onboarding email
        # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
        users = (
            session.execute(
                select(User)
                .where(User.is_visible)
                .where(User.onboarding_emails_sent == 1)
                .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
                .where(User.has_completed_profile == False)
            )
            .scalars()
            .all()
        )

        for user in users:
            notify(
                session,
                user_id=user.id,
                topic_action="onboarding:reminder",
                key="2",
            )
            user.onboarding_emails_sent = 2
            user.last_onboarding_email_sent = now()
            session.commit()


send_onboarding_emails.PAYLOAD = empty_pb2.Empty
send_onboarding_emails.SCHEDULE = timedelta(hours=1)


def send_reference_reminders(payload):
    """
    Sends out reminders to write references after hosting/staying
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (reminder number, timedelta before we stop being able to write a ref, number of days they have left to write the ref)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            user = aliased(User)
            other_user = aliased(User)
            # surfers needing to write a ref
            q1 = (
                select(literal(True), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.surfer_user_id)
                .join(other_user, other_user.id == HostRequest.host_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the surfer has not written a ref
                        Reference.from_user_id == HostRequest.surfer_user_id,
                    ),
                )
                .where(user.is_visible)
                .where(other_user.is_visible)
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.surfer_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.surfer_reason_didnt_meetup == None)
            )

            # hosts needing to write a ref
            q2 = (
                select(literal(False), HostRequest, user, other_user)
                .join(user, user.id == HostRequest.host_user_id)
                .join(other_user, other_user.id == HostRequest.surfer_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the host has not written a ref
                        Reference.from_user_id == HostRequest.host_user_id,
                    ),
                )
                .where(user.is_visible)
                .where(other_user.is_visible)
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.host_sent_reference_reminders < reminder_number)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
                .where(HostRequest.host_reason_didnt_meetup == None)
            )

            union = union_all(q1, q2).subquery()
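            # re-select through aliased() so each row of the union maps back onto ORM entities
            # and unpacks as (surfed, host_request, user, other_user) below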

            union = select(
                union.c[0].label("surfed"),
                aliased(HostRequest, union),
                aliased(user, union),
                aliased(other_user, union),
            )
            reference_reminders = session.execute(union).all()

            for surfed, host_request, user, other_user in reference_reminders:
                # checked in sql
                assert user.is_visible
                if not are_blocked(session, user.id, other_user.id):
                    context = make_user_context(user_id=user.id)
                    notify(
                        session,
                        user_id=user.id,
                        topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
                        data=notification_data_pb2.ReferenceReminder(
                            host_request_id=host_request.conversation_id,
                            other_user=user_model_to_pb(other_user, session, context),
                            days_left=reminder_days_left,
                        ),
                    )
                    if surfed:
                        host_request.surfer_sent_reference_reminders = reminder_number
                    else:
                        host_request.host_sent_reference_reminders = reminder_number
                    session.commit()


send_reference_reminders.PAYLOAD = empty_pb2.Empty
send_reference_reminders.SCHEDULE = timedelta(hours=1)


def add_users_to_email_list(payload):
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

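    # sync one user per transaction so every successful subscription is committed immediately
    # and a failure partway through doesn't roll back earlier work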

    while True:
        with session_scope() as session:
            user = session.execute(
                select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
            ).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            if user.opt_out_of_newsletter:
                user.in_sync_with_newsletter = True
                session.commit()
                continue

            r = requests.post(
                config["LISTMONK_BASE_URL"] + "/api/subscribers",
                auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
                json={
                    "email": user.email,
                    "name": user.name,
                    "lists": [config["LISTMONK_LIST_ID"]],
                    "preconfirm_subscriptions": True,
                    "attribs": {"couchers_user_id": user.id},
                    "status": "enabled",
                },
                timeout=10,
            )
            # the API returns a 409 if the user is already subscribed
            if r.status_code == 200 or r.status_code == 409:
                user.in_sync_with_newsletter = True
                session.commit()
            else:
                raise Exception("Failed to add users to mailing list")


add_users_to_email_list.PAYLOAD = empty_pb2.Empty
add_users_to_email_list.SCHEDULE = timedelta(hours=1)


def enforce_community_membership(payload):
    tasks_enforce_community_memberships()


enforce_community_membership.PAYLOAD = empty_pb2.Empty
enforce_community_membership.SCHEDULE = timedelta(minutes=15)


def update_recommendation_scores(payload):
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian():
        """
        Produces an approximately standard normal random variate
585 """ 

        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt):
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt):
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")
        home_length = func.length(home_text)

        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        has_pic = int_(User.avatar_key != None)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.last_minute != None) * int_(home_length > 200)
        profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host

        # references
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
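        # ratings are stored on a 0-1 scale, so 1.4 * (rating - 0.3) makes ratings below 0.3 count negatively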

        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        cb = int_(min_node_id >= 1)
        wcb = int_(min_node_id == 1)
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }

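        # case() maps each badge_id to its score above; badges not listed contribute 0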

        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate
        hr_subquery = select(
            user_response_rates.c.user_id,
            float_(extract("epoch", user_response_rates.c.response_time_33p) / 60.0).label("response_time_33p"),
            float_(extract("epoch", user_response_rates.c.response_time_66p) / 60.0).label("response_time_66p"),
        ).subquery()
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
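        # response times are in minutes (epoch seconds / 60), so 60 * 72.0 is 72 hours; users with no
        # response-rate row come through the outer join as NULL, which int_()/float_() coalesce to 0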

        response_rate_points = -10 * int_(response_time_33p > 60 * 72.0) + 5 * int_(response_time_66p < 60 * 48.0)

        recommendation_score = (
            profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        session.execute(
            User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
        )

    logger.info("Updated recommendation scores")


update_recommendation_scores.PAYLOAD = empty_pb2.Empty
update_recommendation_scores.SCHEDULE = timedelta(hours=24)


def update_badges(payload):
    with session_scope() as session:

        def update_badge(badge_id: str, members: list[int]):
            badge = get_badge_dict()[badge_id]
            user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all()
            # in case the user ids don't exist in the db
            actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all()
            # we should add the badge to these
            add = set(actual_members) - set(user_ids)
            # we should remove the badge from these
            remove = set(user_ids) - set(actual_members)
            for user_id in add:
                user_add_badge(session, user_id, badge_id)

            for user_id in remove:
                user_remove_badge(session, user_id, badge_id)

        update_badge("founder", get_static_badge_dict()["founder"])
        update_badge("board_member", get_static_badge_dict()["board_member"])
        update_badge("past_board_member", get_static_badge_dict()["past_board_member"])
        update_badge(
            "donor", session.execute(select(User.id).join(Invoice, Invoice.user_id == User.id)).scalars().all()
        )
        update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all())
        update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all())
        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        update_badge(
            "strong_verification",
            session.execute(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            )
            .scalars()
            .all(),
        )


update_badges.PAYLOAD = empty_pb2.Empty
update_badges.SCHEDULE = timedelta(minutes=15)


def finalize_strong_verification(payload):
    with session_scope() as session:
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()
        reference_payload = verification_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )
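        # sanity checks: the decrypted reference payload must match this verification attempt and
        # the session Iris reports, and the session must have been approved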

        assert verification_attempt.user_id == reference_payload.user_id
        assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token
        assert verification_attempt.iris_session_id == json_data["id"]
        assert json_data["state"] == "APPROVED"

        if json_data["document_type"] != "PASSPORT":
            verification_attempt.status = StrongVerificationAttemptStatus.failed
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
                ),
            )
            return

        assert json_data["document_type"] == "PASSPORT"

        expiry_date = date.fromisoformat(json_data["expiry_date"])
        nationality = json_data["nationality"]
        last_three_document_chars = json_data["document_number"][-3:]

        existing_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
            .where(StrongVerificationAttempt.passport_nationality == nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = expiry_date
        verification_attempt.passport_nationality = nationality
        verification_attempt.passport_last_three_document_chars = last_three_document_chars

        if existing_attempt:
            verification_attempt.status = StrongVerificationAttemptStatus.duplicate

            if existing_attempt.user_id != verification_attempt.user_id:
                session.flush()
                send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt)

            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
            )
            return

        verification_attempt.has_full_data = True
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = verification_attempt.user
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            if session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none():
                return

            user_add_badge(session, user.id, badge_id, do_notify=False)
            notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success")
        else:
            notify(
                session,
                user_id=verification_attempt.user_id,
                topic_action="verification:sv_fail",
                data=notification_data_pb2.VerificationSVFail(
                    reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
                ),
            )


finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload


def send_activeness_probes(payload):
    with session_scope() as session:
        ## Step 1: create new activeness probes for those who need it and don't have one (if enabled)

        if config["ACTIVENESS_PROBES_ENABLED"]:
            # current activeness probes
            subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery()

            # users who we should send an activeness probe to
            new_probe_user_ids = (
                session.execute(
                    select(User.id)
                    .where(User.is_visible)
                    .where(User.hosting_status == HostingStatus.can_host)
                    .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD)
                    .where(User.id.not_in(select(subquery.c.user_id)))
                )
                .scalars()
                .all()
            )

            for user_id in new_probe_user_ids:
                session.add(ActivenessProbe(user_id=user_id))

            session.commit()

        ## Step 2: actually send out probe notifications
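        # notifications_sent counts how many reminders this probe has already received; each entry in
        # ACTIVENESS_PROBE_TIME_REMINDERS is the delay after probe creation at which the next one goes out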

        for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS):
            probes = (
                session.execute(
                    select(ActivenessProbe)
                    .where(ActivenessProbe.notifications_sent == probe_number_minus_1)
                    .where(ActivenessProbe.probe_initiated + delay < func.now())
                    .where(ActivenessProbe.is_pending)
                )
                .scalars()
                .all()
            )

            for probe in probes:
                probe.notifications_sent = probe_number_minus_1 + 1
                context = make_user_context(user_id=probe.user.id)
                notify(
                    session,
                    user_id=probe.user.id,
                    topic_action="activeness:probe",
                    key=probe.id,
                    data=notification_data_pb2.ActivenessProbe(
                        reminder_number=probe_number_minus_1 + 1,
                        deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME),
                    ),
                )
                session.commit()

        ## Step 3: for those who haven't responded, mark them as failed
        expired_probes = (
            session.execute(
                select(ActivenessProbe)
                .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS))
                .where(ActivenessProbe.is_pending)
                .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now())
            )
            .scalars()
            .all()
        )

        for probe in expired_probes:
            probe.responded = now()
            probe.response = ActivenessProbeStatus.expired
            if probe.user.hosting_status == HostingStatus.can_host:
                probe.user.hosting_status = HostingStatus.cant_host
            if probe.user.meetup_status == MeetupStatus.wants_to_meetup:
                probe.user.meetup_status = MeetupStatus.open_to_meetup
            session.commit()


send_activeness_probes.PAYLOAD = empty_pb2.Empty
send_activeness_probes.SCHEDULE = timedelta(minutes=60)


def update_randomized_locations(payload):
    """
    We generate for each user a randomized location as follows:
    - Start from a strong random seed (based on the SECRET env var and our key derivation function)
    - For each user, mix in the user_id for randomness
    - Generate a radius from [0.02, 0.1] degrees (about 2-10km)
    - Generate an angle from [0, 360]
    - Randomized location is then a distance `radius` away at an angle `angle` from `geom`
    """
    randomization_secret = get_secret(USER_LOCATION_RANDOMIZATION_NAME)

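    # stable_secure_uniform is deterministic for a given secret and seed, so each user keeps the
    # same randomized offset across runs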

    def gen_randomized_coords(user_id, lat, lng):
        radius_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|radius", "ascii"))
        angle_u = stable_secure_uniform(randomization_secret, seed=bytes(f"{user_id}|angle", "ascii"))
        radius = 0.02 + 0.08 * radius_u
        angle_rad = 2 * pi * angle_u
        offset_lng = radius * cos(angle_rad)
        offset_lat = radius * sin(angle_rad)
        return lat + offset_lat, lng + offset_lng

    user_updates = []

    with session_scope() as session:
        users_to_update = session.execute(select(User.id, User.geom).where(User.randomized_geom == None)).all()

        for user_id, geom in users_to_update:
            lat, lng = get_coordinates(geom)
            user_updates.append(
                {"id": user_id, "randomized_geom": create_coordinate(*gen_randomized_coords(user_id, lat, lng))}
            )

    with session_scope() as session:
        session.execute(update(User), user_updates)


update_randomized_locations.PAYLOAD = empty_pb2.Empty
update_randomized_locations.SCHEDULE = timedelta(hours=1)