Coverage for src/couchers/jobs/handlers.py: 99%

334 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-04-16 15:13 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from datetime import date, timedelta 

7from math import sqrt 

8from types import SimpleNamespace 

9 

10import requests 

11from google.protobuf import empty_pb2 

12from sqlalchemy import Float, Integer 

13from sqlalchemy.orm import aliased 

14from sqlalchemy.sql import and_, case, cast, delete, distinct, extract, func, literal, not_, or_, select, union_all 

15 

16from couchers.config import config 

17from couchers.constants import ( 

18 ACTIVENESS_PROBE_EXPIRY_TIME, 

19 ACTIVENESS_PROBE_INACTIVITY_PERIOD, 

20 ACTIVENESS_PROBE_TIME_REMINDERS, 

21) 

22from couchers.crypto import asym_encrypt, b64decode, simple_decrypt 

23from couchers.db import session_scope 

24from couchers.email.dev import print_dev_email 

25from couchers.email.smtp import send_smtp_email 

26from couchers.helpers.badges import user_add_badge, user_remove_badge 

27from couchers.materialized_views import ( 

28 refresh_materialized_views, 

29 refresh_materialized_views_rapid, 

30 user_response_rates, 

31) 

32from couchers.metrics import strong_verification_completions_counter 

33from couchers.models import ( 

34 AccountDeletionToken, 

35 ActivenessProbe, 

36 ActivenessProbeStatus, 

37 Cluster, 

38 ClusterRole, 

39 ClusterSubscription, 

40 GroupChat, 

41 GroupChatSubscription, 

42 HostingStatus, 

43 HostRequest, 

44 Invoice, 

45 LoginToken, 

46 MeetupStatus, 

47 Message, 

48 MessageType, 

49 PassportSex, 

50 PasswordResetToken, 

51 Reference, 

52 StrongVerificationAttempt, 

53 StrongVerificationAttemptStatus, 

54 User, 

55 UserBadge, 

56) 

57from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification 

58from couchers.notifications.notify import notify 

59from couchers.resources import get_badge_dict, get_static_badge_dict 

60from couchers.servicers.api import user_model_to_pb 

61from couchers.servicers.blocking import are_blocked 

62from couchers.servicers.conversations import generate_message_notifications 

63from couchers.servicers.discussions import generate_create_discussion_notifications 

64from couchers.servicers.events import ( 

65 generate_event_cancel_notifications, 

66 generate_event_create_notifications, 

67 generate_event_delete_notifications, 

68 generate_event_update_notifications, 

69) 

70from couchers.servicers.requests import host_request_to_pb 

71from couchers.servicers.threads import generate_reply_notifications 

72from couchers.sql import couchers_select as select 

73from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

74from couchers.tasks import send_duplicate_strong_verification_email 

75from couchers.utils import Timestamp_from_datetime, now 

76from proto import notification_data_pb2 

77from proto.internal import jobs_pb2, verification_pb2 

78 

79logger = logging.getLogger(__name__) 

80 

81# these were straight up imported 

82handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload 

83 

84send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload 

85 

86handle_email_digests.PAYLOAD = empty_pb2.Empty 

87handle_email_digests.SCHEDULE = timedelta(minutes=15) 

88 

89generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload 

90 

91generate_reply_notifications.PAYLOAD = jobs_pb2.GenerateReplyNotificationsPayload 

92 

93generate_create_discussion_notifications.PAYLOAD = jobs_pb2.GenerateCreateDiscussionNotificationsPayload 

94 

95generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload 

96 

97generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload 

98 

99generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload 

100 

101generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload 

102 

103 

104refresh_materialized_views.PAYLOAD = empty_pb2.Empty 

105refresh_materialized_views.SCHEDULE = timedelta(minutes=5) 

106 

107refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty 

108refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30) 

109 

110 

111def send_email(payload): 

112 logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'") 

113 # selects a "sender", which either prints the email to the logger or sends it out with SMTP 

114 sender = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email 

115 # the sender must return a models.Email object that can be added to the database 

116 email = sender( 

117 sender_name=payload.sender_name, 

118 sender_email=payload.sender_email, 

119 recipient=payload.recipient, 

120 subject=payload.subject, 

121 plain=payload.plain, 

122 html=payload.html, 

123 list_unsubscribe_header=payload.list_unsubscribe_header, 

124 source_data=payload.source_data, 

125 ) 

126 with session_scope() as session: 

127 session.add(email) 

128 

129 

130send_email.PAYLOAD = jobs_pb2.SendEmailPayload 

131 

132 

133def purge_login_tokens(payload): 

134 logger.info("Purging login tokens") 

135 with session_scope() as session: 

136 session.execute(delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False)) 

137 

138 

139purge_login_tokens.PAYLOAD = empty_pb2.Empty 

140purge_login_tokens.SCHEDULE = timedelta(hours=24) 

141 

142 

143def purge_password_reset_tokens(payload): 

144 logger.info("Purging login tokens") 

145 with session_scope() as session: 

146 session.execute( 

147 delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False) 

148 ) 

149 

150 

151purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty 

152purge_password_reset_tokens.SCHEDULE = timedelta(hours=24) 

153 

154 

155def purge_account_deletion_tokens(payload): 

156 logger.info("Purging account deletion tokens") 

157 with session_scope() as session: 

158 session.execute( 

159 delete(AccountDeletionToken) 

160 .where(~AccountDeletionToken.is_valid) 

161 .execution_options(synchronize_session=False) 

162 ) 

163 

164 

165purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty 

166purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24) 

167 

168 

169def send_message_notifications(payload): 

170 """ 

171 Sends out email notifications for messages that have been unseen for a long enough time 

172 """ 

173 # very crude and dumb algorithm 

174 logger.info("Sending out email notifications for unseen messages") 

175 

176 with session_scope() as session: 

177 # users who have unnotified messages older than 5 minutes in any group chat 

178 users = ( 

179 session.execute( 

180 select(User) 

181 .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id) 

182 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

183 .where(not_(GroupChatSubscription.is_muted)) 

184 .where(User.is_visible) 

185 .where(Message.time >= GroupChatSubscription.joined) 

186 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

187 .where(Message.id > User.last_notified_message_id) 

188 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

189 .where(Message.time < now() - timedelta(minutes=5)) 

190 .where(Message.message_type == MessageType.text) # TODO: only text messages for now 

191 ) 

192 .scalars() 

193 .unique() 

194 ) 

195 

196 for user in users: 

197 # now actually grab all the group chats, not just less than 5 min old 

198 subquery = ( 

199 select( 

200 GroupChatSubscription.group_chat_id.label("group_chat_id"), 

201 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"), 

202 func.max(Message.id).label("message_id"), 

203 func.count(Message.id).label("count_unseen"), 

204 ) 

205 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

206 .where(GroupChatSubscription.user_id == user.id) 

207 .where(not_(GroupChatSubscription.is_muted)) 

208 .where(Message.id > user.last_notified_message_id) 

209 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

210 .where(Message.time >= GroupChatSubscription.joined) 

211 .where(Message.message_type == MessageType.text) # TODO: only text messages for now 

212 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

213 .group_by(GroupChatSubscription.group_chat_id) 

214 .order_by(func.max(Message.id).desc()) 

215 .subquery() 

216 ) 

217 

218 unseen_messages = session.execute( 

219 select(GroupChat, Message, subquery.c.count_unseen) 

220 .join(subquery, subquery.c.message_id == Message.id) 

221 .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id) 

222 .order_by(subquery.c.message_id.desc()) 

223 ).all() 

224 

225 user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages) 

226 

227 def format_title(message, group_chat, count_unseen): 

228 if group_chat.is_dm: 

229 return f"You missed {count_unseen} message(s) from {message.author.name}" 

230 else: 

231 return f"You missed {count_unseen} message(s) in {group_chat.title}" 

232 

233 notify( 

234 session, 

235 user_id=user.id, 

236 topic_action="chat:missed_messages", 

237 data=notification_data_pb2.ChatMissedMessages( 

238 messages=[ 

239 notification_data_pb2.ChatMessage( 

240 author=user_model_to_pb( 

241 message.author, 

242 session, 

243 SimpleNamespace(user_id=user.id), 

244 ), 

245 message=format_title(message, group_chat, count_unseen), 

246 text=message.text, 

247 group_chat_id=message.conversation_id, 

248 ) 

249 for group_chat, message, count_unseen in unseen_messages 

250 ], 

251 ), 

252 ) 

253 session.commit() 

254 

255 

256send_message_notifications.PAYLOAD = empty_pb2.Empty 

257send_message_notifications.SCHEDULE = timedelta(minutes=3) 

258 

259 

260def send_request_notifications(payload): 

261 """ 

262 Sends out email notifications for unseen messages in host requests (as surfer or host) 

263 """ 

264 logger.info("Sending out email notifications for unseen messages in host requests") 

265 

266 with session_scope() as session: 

267 # requests where this user is surfing 

268 surfing_reqs = session.execute( 

269 select(User, HostRequest, func.max(Message.id)) 

270 .where(User.is_visible) 

271 .join(HostRequest, HostRequest.surfer_user_id == User.id) 

272 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

273 .where(Message.id > HostRequest.surfer_last_seen_message_id) 

274 .where(Message.id > User.last_notified_request_message_id) 

275 .where(Message.time < now() - timedelta(minutes=5)) 

276 .where(Message.message_type == MessageType.text) 

277 .group_by(User, HostRequest) 

278 ).all() 

279 

280 # where this user is hosting 

281 hosting_reqs = session.execute( 

282 select(User, HostRequest, func.max(Message.id)) 

283 .where(User.is_visible) 

284 .join(HostRequest, HostRequest.host_user_id == User.id) 

285 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

286 .where(Message.id > HostRequest.host_last_seen_message_id) 

287 .where(Message.id > User.last_notified_request_message_id) 

288 .where(Message.time < now() - timedelta(minutes=5)) 

289 .where(Message.message_type == MessageType.text) 

290 .group_by(User, HostRequest) 

291 ).all() 

292 

293 for user, host_request, max_message_id in surfing_reqs: 

294 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id) 

295 session.flush() 

296 

297 context = SimpleNamespace(user_id=user.id) 

298 notify( 

299 session, 

300 user_id=user.id, 

301 topic_action="host_request:missed_messages", 

302 key=host_request.conversation_id, 

303 data=notification_data_pb2.HostRequestMissedMessages( 

304 host_request=host_request_to_pb(host_request, session, context), 

305 user=user_model_to_pb(host_request.host, session, context), 

306 am_host=False, 

307 ), 

308 ) 

309 

310 for user, host_request, max_message_id in hosting_reqs: 

311 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id) 

312 session.flush() 

313 

314 context = SimpleNamespace(user_id=user.id) 

315 notify( 

316 session, 

317 user_id=user.id, 

318 topic_action="host_request:missed_messages", 

319 key=host_request.conversation_id, 

320 data=notification_data_pb2.HostRequestMissedMessages( 

321 host_request=host_request_to_pb(host_request, session, context), 

322 user=user_model_to_pb(host_request.surfer, session, context), 

323 am_host=True, 

324 ), 

325 ) 

326 

327 

328send_request_notifications.PAYLOAD = empty_pb2.Empty 

329send_request_notifications.SCHEDULE = timedelta(minutes=3) 

330 

331 

332def send_onboarding_emails(payload): 

333 """ 

334 Sends out onboarding emails 

335 """ 

336 logger.info("Sending out onboarding emails") 

337 

338 with session_scope() as session: 

339 # first onboarding email 

340 users = ( 

341 session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all() 

342 ) 

343 

344 for user in users: 

345 notify( 

346 session, 

347 user_id=user.id, 

348 topic_action="onboarding:reminder", 

349 key="1", 

350 ) 

351 user.onboarding_emails_sent = 1 

352 user.last_onboarding_email_sent = now() 

353 session.commit() 

354 

355 # second onboarding email 

356 # sent after a week if the user has no profile or their "about me" section is less than 20 characters long 

357 users = ( 

358 session.execute( 

359 select(User) 

360 .where(User.is_visible) 

361 .where(User.onboarding_emails_sent == 1) 

362 .where(now() - User.last_onboarding_email_sent > timedelta(days=7)) 

363 .where(User.has_completed_profile == False) 

364 ) 

365 .scalars() 

366 .all() 

367 ) 

368 

369 for user in users: 

370 notify( 

371 session, 

372 user_id=user.id, 

373 topic_action="onboarding:reminder", 

374 key="2", 

375 ) 

376 user.onboarding_emails_sent = 2 

377 user.last_onboarding_email_sent = now() 

378 session.commit() 

379 

380 

381send_onboarding_emails.PAYLOAD = empty_pb2.Empty 

382send_onboarding_emails.SCHEDULE = timedelta(hours=1) 

383 

384 

385def send_reference_reminders(payload): 

386 """ 

387 Sends out reminders to write references after hosting/staying 

388 """ 

389 logger.info("Sending out reference reminder emails") 

390 

391 # Keep this in chronological order! 

392 reference_reminder_schedule = [ 

393 # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref) 

394 # the end time to write a reference is supposed to be midnight in the host's timezone 

395 # 8 pm ish on the last day of the stay 

396 (1, timedelta(days=15) - timedelta(hours=20), 14), 

397 # 2 pm ish a week after stay 

398 (2, timedelta(days=8) - timedelta(hours=14), 7), 

399 # 10 am ish 3 days before end of time to write ref 

400 (3, timedelta(days=4) - timedelta(hours=10), 3), 

401 ] 

402 

403 with session_scope() as session: 

404 # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates 

405 for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule): 

406 user = aliased(User) 

407 other_user = aliased(User) 

408 # surfers needing to write a ref 

409 q1 = ( 

410 select(literal(True), HostRequest, user, other_user) 

411 .join(user, user.id == HostRequest.surfer_user_id) 

412 .join(other_user, other_user.id == HostRequest.host_user_id) 

413 .outerjoin( 

414 Reference, 

415 and_( 

416 Reference.host_request_id == HostRequest.conversation_id, 

417 # if no reference is found in this join, then the surfer has not written a ref 

418 Reference.from_user_id == HostRequest.surfer_user_id, 

419 ), 

420 ) 

421 .where(user.is_visible) 

422 .where(other_user.is_visible) 

423 .where(Reference.id == None) 

424 .where(HostRequest.can_write_reference) 

425 .where(HostRequest.surfer_sent_reference_reminders < reminder_number) 

426 .where(HostRequest.end_time_to_write_reference - reminder_time < now()) 

427 .where(HostRequest.surfer_reason_didnt_meetup == None) 

428 ) 

429 

430 # hosts needing to write a ref 

431 q2 = ( 

432 select(literal(False), HostRequest, user, other_user) 

433 .join(user, user.id == HostRequest.host_user_id) 

434 .join(other_user, other_user.id == HostRequest.surfer_user_id) 

435 .outerjoin( 

436 Reference, 

437 and_( 

438 Reference.host_request_id == HostRequest.conversation_id, 

439 # if no reference is found in this join, then the host has not written a ref 

440 Reference.from_user_id == HostRequest.host_user_id, 

441 ), 

442 ) 

443 .where(user.is_visible) 

444 .where(other_user.is_visible) 

445 .where(Reference.id == None) 

446 .where(HostRequest.can_write_reference) 

447 .where(HostRequest.host_sent_reference_reminders < reminder_number) 

448 .where(HostRequest.end_time_to_write_reference - reminder_time < now()) 

449 .where(HostRequest.host_reason_didnt_meetup == None) 

450 ) 

451 

452 union = union_all(q1, q2).subquery() 

453 union = select( 

454 union.c[0].label("surfed"), 

455 aliased(HostRequest, union), 

456 aliased(user, union), 

457 aliased(other_user, union), 

458 ) 

459 reference_reminders = session.execute(union).all() 

460 

461 for surfed, host_request, user, other_user in reference_reminders: 

462 # checked in sql 

463 assert user.is_visible 

464 if not are_blocked(session, user.id, other_user.id): 

465 context = SimpleNamespace(user_id=user.id) 

466 notify( 

467 session, 

468 user_id=user.id, 

469 topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted", 

470 data=notification_data_pb2.ReferenceReminder( 

471 host_request_id=host_request.conversation_id, 

472 other_user=user_model_to_pb(other_user, session, context), 

473 days_left=reminder_days_left, 

474 ), 

475 ) 

476 if surfed: 

477 host_request.surfer_sent_reference_reminders = reminder_number 

478 else: 

479 host_request.host_sent_reference_reminders = reminder_number 

480 session.commit() 

481 

482 

483send_reference_reminders.PAYLOAD = empty_pb2.Empty 

484send_reference_reminders.SCHEDULE = timedelta(hours=1) 

485 

486 

487def add_users_to_email_list(payload): 

488 if not config["LISTMONK_ENABLED"]: 

489 logger.info("Not adding users to mailing list") 

490 return 

491 

492 logger.info("Adding users to mailing list") 

493 

494 while True: 

495 with session_scope() as session: 

496 user = session.execute( 

497 select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1) 

498 ).scalar_one_or_none() 

499 if not user: 

500 logger.info("Finished adding users to mailing list") 

501 return 

502 

503 if user.opt_out_of_newsletter: 

504 user.in_sync_with_newsletter = True 

505 session.commit() 

506 continue 

507 

508 r = requests.post( 

509 config["LISTMONK_BASE_URL"] + "/api/subscribers", 

510 auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]), 

511 json={ 

512 "email": user.email, 

513 "name": user.name, 

514 "lists": [config["LISTMONK_LIST_ID"]], 

515 "preconfirm_subscriptions": True, 

516 "attribs": {"couchers_user_id": user.id}, 

517 "status": "enabled", 

518 }, 

519 timeout=10, 

520 ) 

521 # the API returns if the user is already subscribed 

522 if r.status_code == 200 or r.status_code == 409: 

523 user.in_sync_with_newsletter = True 

524 session.commit() 

525 else: 

526 raise Exception("Failed to add users to mailing list") 

527 

528 

529add_users_to_email_list.PAYLOAD = empty_pb2.Empty 

530add_users_to_email_list.SCHEDULE = timedelta(hours=1) 

531 

532 

533def enforce_community_membership(payload): 

534 tasks_enforce_community_memberships() 

535 

536 

537enforce_community_membership.PAYLOAD = empty_pb2.Empty 

538enforce_community_membership.SCHEDULE = timedelta(minutes=15) 

539 

540 

541def update_recommendation_scores(payload): 

542 text_fields = [ 

543 User.hometown, 

544 User.occupation, 

545 User.education, 

546 User.about_me, 

547 User.things_i_like, 

548 User.about_place, 

549 User.additional_information, 

550 User.pet_details, 

551 User.kid_details, 

552 User.housemate_details, 

553 User.other_host_info, 

554 User.sleeping_details, 

555 User.area, 

556 User.house_rules, 

557 ] 

558 home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules] 

559 

560 def poor_man_gaussian(): 

561 """ 

562 Produces an approximatley std normal random variate 

563 """ 

564 trials = 5 

565 return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12) 

566 

567 def int_(stmt): 

568 return func.coalesce(cast(stmt, Integer), 0) 

569 

570 def float_(stmt): 

571 return func.coalesce(cast(stmt, Float), 0.0) 

572 

573 with session_scope() as session: 

574 # profile 

575 profile_text = "" 

576 for field in text_fields: 

577 profile_text += func.coalesce(field, "") 

578 text_length = func.length(profile_text) 

579 home_text = "" 

580 for field in home_fields: 

581 home_text += func.coalesce(field, "") 

582 home_length = func.length(home_text) 

583 

584 has_text = int_(text_length > 500) 

585 long_text = int_(text_length > 2000) 

586 has_pic = int_(User.avatar_key != None) 

587 can_host = int_(User.hosting_status == HostingStatus.can_host) 

588 cant_host = int_(User.hosting_status == HostingStatus.cant_host) 

589 filled_home = int_(User.last_minute != None) * int_(home_length > 200) 

590 profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host 

591 

592 # references 

593 left_ref_expr = int_(1).label("left_reference") 

594 left_refs_subquery = ( 

595 select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery() 

596 ) 

597 left_reference = int_(left_refs_subquery.c.left_reference) 

598 has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference") 

599 ref_count_expr = int_(func.count(Reference.id)).label("ref_count") 

600 ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg") 

601 has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types") 

602 has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label( 

603 "has_bad_ref" 

604 ) 

605 received_ref_subquery = ( 

606 select( 

607 Reference.to_user_id.label("user_id"), 

608 has_reference_expr, 

609 has_multiple_types_expr, 

610 has_bad_ref_expr, 

611 ref_count_expr, 

612 ref_avg_expr, 

613 ) 

614 .group_by(Reference.to_user_id) 

615 .subquery() 

616 ) 

617 has_multiple_types = int_(received_ref_subquery.c.has_multiple_types) 

618 has_reference = int_(received_ref_subquery.c.has_reference) 

619 has_bad_reference = int_(received_ref_subquery.c.has_bad_ref) 

620 rating_score = float_( 

621 received_ref_subquery.c.ref_avg 

622 * ( 

623 2 * func.least(received_ref_subquery.c.ref_count, 5) 

624 + func.greatest(received_ref_subquery.c.ref_count - 5, 0) 

625 ) 

626 ) 

627 ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score 

628 

629 # activeness 

630 recently_active = int_(User.last_active >= now() - timedelta(days=180)) 

631 very_recently_active = int_(User.last_active >= now() - timedelta(days=14)) 

632 recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14)) 

633 messaged_lots = int_(func.count(Message.id) > 5) 

634 messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points") 

635 messaging_subquery = ( 

636 select(Message.author_id.label("user_id"), messaging_points_subquery) 

637 .where(Message.message_type == MessageType.text) 

638 .group_by(Message.author_id) 

639 .subquery() 

640 ) 

641 activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points) 

642 

643 # verification 

644 cb_subquery = ( 

645 select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id")) 

646 .join(Cluster, Cluster.id == ClusterSubscription.cluster_id) 

647 .where(ClusterSubscription.role == ClusterRole.admin) 

648 .where(Cluster.is_official_cluster) 

649 .group_by(ClusterSubscription.user_id) 

650 .subquery() 

651 ) 

652 min_node_id = cb_subquery.c.min_node_id 

653 cb = int_(min_node_id >= 1) 

654 wcb = int_(min_node_id == 1) 

655 badge_points = { 

656 "founder": 100, 

657 "board_member": 20, 

658 "past_board_member": 5, 

659 "strong_verification": 3, 

660 "volunteer": 3, 

661 "past_volunteer": 2, 

662 "donor": 1, 

663 "phone_verified": 1, 

664 } 

665 

666 badge_subquery = ( 

667 select( 

668 UserBadge.user_id.label("user_id"), 

669 func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"), 

670 ) 

671 .group_by(UserBadge.user_id) 

672 .subquery() 

673 ) 

674 

675 other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points) 

676 

677 # response rate 

678 hr_subquery = select( 

679 user_response_rates.c.user_id, 

680 float_(extract("epoch", user_response_rates.c.response_time_33p) / 60.0).label("response_time_33p"), 

681 float_(extract("epoch", user_response_rates.c.response_time_66p) / 60.0).label("response_time_66p"), 

682 ).subquery() 

683 response_time_33p = hr_subquery.c.response_time_33p 

684 response_time_66p = hr_subquery.c.response_time_66p 

685 # be careful with nulls 

686 response_rate_points = -10 * int_(response_time_33p > 60 * 72.0) + 5 * int_(response_time_66p < 60 * 48.0) 

687 

688 recommendation_score = ( 

689 profile_points 

690 + ref_score 

691 + activeness_points 

692 + other_points 

693 + response_rate_points 

694 + 2 * poor_man_gaussian() 

695 ) 

696 

697 scores = ( 

698 select(User.id.label("user_id"), recommendation_score.label("score")) 

699 .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id) 

700 .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id) 

701 .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id) 

702 .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id) 

703 .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id) 

704 .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id) 

705 ).subquery() 

706 

707 session.execute( 

708 User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id) 

709 ) 

710 

711 logger.info("Updated recommendation scores") 

712 

713 

714update_recommendation_scores.PAYLOAD = empty_pb2.Empty 

715update_recommendation_scores.SCHEDULE = timedelta(hours=24) 

716 

717 

718def update_badges(payload): 

719 with session_scope() as session: 

720 

721 def update_badge(badge_id: str, members: list[int]): 

722 badge = get_badge_dict()[badge_id] 

723 user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all() 

724 # in case the user ids don't exist in the db 

725 actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all() 

726 # we should add the badge to these 

727 add = set(actual_members) - set(user_ids) 

728 # we should remove the badge from these 

729 remove = set(user_ids) - set(actual_members) 

730 for user_id in add: 

731 user_add_badge(session, user_id, badge_id) 

732 

733 for user_id in remove: 

734 user_remove_badge(session, user_id, badge_id) 

735 

736 update_badge("founder", get_static_badge_dict()["founder"]) 

737 update_badge("board_member", get_static_badge_dict()["board_member"]) 

738 update_badge("past_board_member", get_static_badge_dict()["past_board_member"]) 

739 update_badge( 

740 "donor", session.execute(select(User.id).join(Invoice, Invoice.user_id == User.id)).scalars().all() 

741 ) 

742 update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all()) 

743 update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all()) 

744 # strong verification requires passport on file + gender/sex correspondence and date of birth match 

745 update_badge( 

746 "strong_verification", 

747 session.execute( 

748 select(User.id) 

749 .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id) 

750 .where(StrongVerificationAttempt.has_strong_verification(User)) 

751 ) 

752 .scalars() 

753 .all(), 

754 ) 

755 

756 

757update_badges.PAYLOAD = empty_pb2.Empty 

758update_badges.SCHEDULE = timedelta(minutes=15) 

759 

760 

761def finalize_strong_verification(payload): 

762 with session_scope() as session: 

763 verification_attempt = session.execute( 

764 select(StrongVerificationAttempt) 

765 .where(StrongVerificationAttempt.id == payload.verification_attempt_id) 

766 .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend) 

767 ).scalar_one() 

768 response = requests.post( 

769 "https://passportreader.app/api/v1/session.get", 

770 auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]), 

771 json={"id": verification_attempt.iris_session_id}, 

772 timeout=10, 

773 ) 

774 if response.status_code != 200: 

775 raise Exception(f"Iris didn't return 200: {response.text}") 

776 json_data = response.json() 

777 reference_payload = verification_pb2.VerificationReferencePayload.FromString( 

778 simple_decrypt("iris_callback", b64decode(json_data["reference"])) 

779 ) 

780 assert verification_attempt.user_id == reference_payload.user_id 

781 assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token 

782 assert verification_attempt.iris_session_id == json_data["id"] 

783 assert json_data["state"] == "APPROVED" 

784 

785 if json_data["document_type"] != "PASSPORT": 

786 verification_attempt.status = StrongVerificationAttemptStatus.failed 

787 notify( 

788 session, 

789 user_id=verification_attempt.user_id, 

790 topic_action="verification:sv_fail", 

791 data=notification_data_pb2.VerificationSVFail( 

792 reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT 

793 ), 

794 ) 

795 return 

796 

797 assert json_data["document_type"] == "PASSPORT" 

798 

799 expiry_date = date.fromisoformat(json_data["expiry_date"]) 

800 nationality = json_data["nationality"] 

801 last_three_document_chars = json_data["document_number"][-3:] 

802 

803 existing_attempt = session.execute( 

804 select(StrongVerificationAttempt) 

805 .where(StrongVerificationAttempt.passport_expiry_date == expiry_date) 

806 .where(StrongVerificationAttempt.passport_nationality == nationality) 

807 .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars) 

808 .order_by(StrongVerificationAttempt.id) 

809 .limit(1) 

810 ).scalar_one_or_none() 

811 

812 verification_attempt.has_minimal_data = True 

813 verification_attempt.passport_expiry_date = expiry_date 

814 verification_attempt.passport_nationality = nationality 

815 verification_attempt.passport_last_three_document_chars = last_three_document_chars 

816 

817 if existing_attempt: 

818 verification_attempt.status = StrongVerificationAttemptStatus.duplicate 

819 

820 if existing_attempt.user_id != verification_attempt.user_id: 

821 session.flush() 

822 send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt) 

823 

824 notify( 

825 session, 

826 user_id=verification_attempt.user_id, 

827 topic_action="verification:sv_fail", 

828 data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE), 

829 ) 

830 return 

831 

832 verification_attempt.has_full_data = True 

833 verification_attempt.passport_encrypted_data = asym_encrypt( 

834 config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8") 

835 ) 

836 verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"]) 

837 verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()] 

838 verification_attempt.status = StrongVerificationAttemptStatus.succeeded 

839 

840 session.flush() 

841 

842 strong_verification_completions_counter.inc() 

843 

844 user = verification_attempt.user 

845 if verification_attempt.has_strong_verification(user): 

846 badge_id = "strong_verification" 

847 if session.execute( 

848 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id) 

849 ).scalar_one_or_none(): 

850 return 

851 

852 user_add_badge(session, user.id, badge_id, do_notify=False) 

853 notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success") 

854 else: 

855 notify( 

856 session, 

857 user_id=verification_attempt.user_id, 

858 topic_action="verification:sv_fail", 

859 data=notification_data_pb2.VerificationSVFail( 

860 reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER 

861 ), 

862 ) 

863 

864 

865finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload 

866 

867 

868def send_activeness_probes(payload): 

869 with session_scope() as session: 

870 ## Step 1: create new activeness probes for those who need it and don't have one (if enabled) 

871 

872 if config["ACTIVENESS_PROBES_ENABLED"]: 

873 # current activeness probes 

874 subquery = select(ActivenessProbe.user_id).where(ActivenessProbe.responded == None).subquery() 

875 

876 # users who we should send an activeness probe to 

877 new_probe_user_ids = ( 

878 session.execute( 

879 select(User.id) 

880 .where(User.is_visible) 

881 .where(User.hosting_status == HostingStatus.can_host) 

882 .where(User.last_active < func.now() - ACTIVENESS_PROBE_INACTIVITY_PERIOD) 

883 .where(User.id.not_in(select(subquery.c.user_id))) 

884 ) 

885 .scalars() 

886 .all() 

887 ) 

888 

889 for user_id in new_probe_user_ids: 

890 session.add(ActivenessProbe(user_id=user_id)) 

891 

892 session.commit() 

893 

894 ## Step 2: actually send out probe notifications 

895 for probe_number_minus_1, delay in enumerate(ACTIVENESS_PROBE_TIME_REMINDERS): 

896 probes = ( 

897 session.execute( 

898 select(ActivenessProbe) 

899 .where(ActivenessProbe.notifications_sent == probe_number_minus_1) 

900 .where(ActivenessProbe.probe_initiated + delay < func.now()) 

901 .where(ActivenessProbe.is_pending) 

902 ) 

903 .scalars() 

904 .all() 

905 ) 

906 

907 for probe in probes: 

908 probe.notifications_sent = probe_number_minus_1 + 1 

909 context = SimpleNamespace(user_id=probe.user.id) 

910 notify( 

911 session, 

912 user_id=probe.user.id, 

913 topic_action="activeness:probe", 

914 key=probe.id, 

915 data=notification_data_pb2.ActivenessProbe( 

916 reminder_number=probe_number_minus_1 + 1, 

917 deadline=Timestamp_from_datetime(probe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME), 

918 ), 

919 ) 

920 session.commit() 

921 

922 ## Step 3: for those who haven't responded, mark them as failed 

923 expired_probes = ( 

924 session.execute( 

925 select(ActivenessProbe) 

926 .where(ActivenessProbe.notifications_sent == len(ACTIVENESS_PROBE_TIME_REMINDERS)) 

927 .where(ActivenessProbe.is_pending) 

928 .where(ActivenessProbe.probe_initiated + ACTIVENESS_PROBE_EXPIRY_TIME < func.now()) 

929 ) 

930 .scalars() 

931 .all() 

932 ) 

933 

934 for probe in expired_probes: 

935 probe.responded = now() 

936 probe.response = ActivenessProbeStatus.expired 

937 if probe.user.hosting_status == HostingStatus.can_host: 

938 probe.user.hosting_status = HostingStatus.cant_host 

939 if probe.user.meetup_status == MeetupStatus.wants_to_meetup: 

940 probe.user.meetup_status = MeetupStatus.open_to_meetup 

941 session.commit() 

942 

943 

944send_activeness_probes.PAYLOAD = empty_pb2.Empty 

945send_activeness_probes.SCHEDULE = timedelta(minutes=60)