Coverage for src/couchers/jobs/handlers.py: 98%

311 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-12-20 18:03 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from datetime import date, timedelta 

7from math import sqrt 

8from types import SimpleNamespace 

9 

10import requests 

11from google.protobuf import empty_pb2 

12from sqlalchemy import Integer 

13from sqlalchemy.orm import aliased 

14from sqlalchemy.sql import and_, case, cast, delete, distinct, extract, func, literal, not_, or_, select, union_all 

15from sqlalchemy.sql.functions import percentile_disc 

16 

17from couchers.config import config 

18from couchers.crypto import asym_encrypt, b64decode, simple_decrypt 

19from couchers.db import session_scope 

20from couchers.email.dev import print_dev_email 

21from couchers.email.smtp import send_smtp_email 

22from couchers.helpers.badges import user_add_badge, user_remove_badge 

23from couchers.materialized_views import refresh_materialized_views, refresh_materialized_views_rapid 

24from couchers.metrics import strong_verification_completions_counter 

25from couchers.models import ( 

26 AccountDeletionToken, 

27 Cluster, 

28 ClusterRole, 

29 ClusterSubscription, 

30 Float, 

31 GroupChat, 

32 GroupChatSubscription, 

33 HostingStatus, 

34 HostRequest, 

35 Invoice, 

36 LoginToken, 

37 Message, 

38 MessageType, 

39 PassportSex, 

40 PasswordResetToken, 

41 Reference, 

42 StrongVerificationAttempt, 

43 StrongVerificationAttemptStatus, 

44 User, 

45 UserBadge, 

46) 

47from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification 

48from couchers.notifications.notify import notify 

49from couchers.resources import get_badge_dict, get_static_badge_dict 

50from couchers.servicers.api import user_model_to_pb 

51from couchers.servicers.blocking import are_blocked 

52from couchers.servicers.conversations import generate_message_notifications 

53from couchers.servicers.events import ( 

54 generate_event_cancel_notifications, 

55 generate_event_create_notifications, 

56 generate_event_delete_notifications, 

57 generate_event_update_notifications, 

58) 

59from couchers.servicers.requests import host_request_to_pb 

60from couchers.sql import couchers_select as select 

61from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

62from couchers.tasks import send_duplicate_strong_verification_email 

63from couchers.utils import now 

64from proto import notification_data_pb2 

65from proto.internal import jobs_pb2, verification_pb2 

66 

# module-level logger, named after this module
logger = logging.getLogger(__name__)

# these were straight up imported
# the handlers below are defined in other modules; here they only get their job attributes attached:
# PAYLOAD is set to a protobuf message class and, for some of them, SCHEDULE is set to a timedelta
# NOTE(review): presumably the job runner reads PAYLOAD to parse the job's payload and SCHEDULE as the
# repeat interval -- confirm against the scheduler/worker code
handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload

send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload

handle_email_digests.PAYLOAD = empty_pb2.Empty
handle_email_digests.SCHEDULE = timedelta(minutes=15)

generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload

generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload

generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload

generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload

generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload


refresh_materialized_views.PAYLOAD = empty_pb2.Empty
refresh_materialized_views.SCHEDULE = timedelta(minutes=5)

refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)

93 

94 

def send_email(payload):
    """
    Sends the email described by the payload and stores the resulting email object in the database

    The email goes out over SMTP when config["ENABLE_EMAIL"] is truthy, otherwise it is handed to the dev
    printer instead.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")

    # choose the backend: real SMTP delivery, or the dev printer
    if config["ENABLE_EMAIL"]:
        deliver = send_smtp_email
    else:
        deliver = print_dev_email

    email_fields = dict(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
        list_unsubscribe_header=payload.list_unsubscribe_header,
        source_data=payload.source_data,
    )
    # whichever backend was chosen must hand back a models.Email object that can be added to the database
    stored_email = deliver(**email_fields)

    with session_scope() as session:
        session.add(stored_email)


send_email.PAYLOAD = jobs_pb2.SendEmailPayload

115 

116 

def purge_login_tokens(payload):
    """
    Deletes all login tokens that are no longer valid
    """
    logger.info("Purging login tokens")
    with session_scope() as session:
        # bulk delete of every row where is_valid is false; synchronize_session is switched off for it
        invalid_tokens = delete(LoginToken).where(~LoginToken.is_valid)
        session.execute(invalid_tokens.execution_options(synchronize_session=False))


purge_login_tokens.PAYLOAD = empty_pb2.Empty
purge_login_tokens.SCHEDULE = timedelta(hours=24)

125 

126 

def purge_password_reset_tokens(payload):
    """
    Deletes all password reset tokens that are no longer valid
    """
    # this used to log "Purging login tokens" (copied from purge_login_tokens), which made the two jobs
    # indistinguishable in the logs
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        # bulk delete of every row where is_valid is false; synchronize_session is switched off for it
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )


purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)

137 

138 

def purge_account_deletion_tokens(payload):
    """
    Deletes all account deletion tokens that are no longer valid
    """
    logger.info("Purging account deletion tokens")
    with session_scope() as session:
        # bulk delete of every row where is_valid is false; synchronize_session is switched off for it
        invalid_tokens = delete(AccountDeletionToken).where(~AccountDeletionToken.is_valid)
        session.execute(invalid_tokens.execution_options(synchronize_session=False))


purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)

151 

152 

def send_message_notifications(payload):
    """
    Sends out email notifications for messages that have been unseen for a long enough time

    For every visible user with an unmuted group chat containing a text message that is older than 5 minutes,
    newer than the user's last notified message and newer than what they have seen in that chat, one
    "chat:missed_messages" notification is created. It lists the newest unseen text message of each such chat
    together with the number of unseen messages in it, and the user's last_notified_message_id is moved forward.
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    def format_title(message, group_chat, count_unseen):
        # DMs are titled by the author of the newest message, other chats by the chat title
        if group_chat.is_dm:
            return f"You missed {count_unseen} message(s) from {message.author.name}"
        return f"You missed {count_unseen} message(s) in {group_chat.title}"

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                select(User)
                .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for user in users:
            # now actually grab all the group chats, not just less than 5 min old
            # one row per group chat: the newest unseen message id and how many unseen messages there are
            subquery = (
                select(
                    GroupChatSubscription.group_chat_id.label("group_chat_id"),
                    func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                    func.max(Message.id).label("message_id"),
                    func.count(Message.id).label("count_unseen"),
                )
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(GroupChatSubscription.user_id == user.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(Message.id > user.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            unseen_messages = session.execute(
                select(GroupChat, Message, subquery.c.count_unseen)
                .join(subquery, subquery.c.message_id == Message.id)
                .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
                .order_by(subquery.c.message_id.desc())
            ).all()

            # this is a separate statement from the candidate query above, so it can come back empty (e.g. if
            # the messages were seen in the meantime); max() raises ValueError on an empty sequence, which
            # would abort the job for every remaining user, so there is simply nothing to send for this one
            if not unseen_messages:
                continue

            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            notify(
                session,
                user_id=user.id,
                topic_action="chat:missed_messages",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                SimpleNamespace(user_id=user.id),
                            ),
                            message=format_title(message, group_chat, count_unseen),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                        )
                        for group_chat, message, count_unseen in unseen_messages
                    ],
                ),
            )
            # commit per user, so the last_notified_message_id of users already handled is persisted
            session.commit()


send_message_notifications.PAYLOAD = empty_pb2.Empty
send_message_notifications.SCHEDULE = timedelta(minutes=3)

242 

243 

def send_request_notifications(payload):
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host)

    Both sides are handled the same way: every (user, host request) pair with a text message that is older than
    5 minutes, unseen by that user and newer than the user's last notified request message gets one
    "host_request:missed_messages" notification.
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    def missed_messages_query(participant_id_column, last_seen_column):
        # (user, host request, newest matching message id) for one side of the host request
        return (
            select(User, HostRequest, func.max(Message.id))
            .where(User.is_visible)
            .join(HostRequest, participant_id_column == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(Message.id > last_seen_column)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
            .group_by(User, HostRequest)
        )

    with session_scope() as session:
        # both result sets are fetched up front, before any notification is created
        # requests where this user is surfing
        surfing_reqs = session.execute(
            missed_messages_query(HostRequest.surfer_user_id, HostRequest.surfer_last_seen_message_id)
        ).all()
        # where this user is hosting
        hosting_reqs = session.execute(
            missed_messages_query(HostRequest.host_user_id, HostRequest.host_last_seen_message_id)
        ).all()

        # surfers are processed first, then hosts
        for am_host, rows in ((False, surfing_reqs), (True, hosting_reqs)):
            for user, host_request, max_message_id in rows:
                # a user can show up in several rows, so never move the marker backwards
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                context = SimpleNamespace(user_id=user.id)
                notify(
                    session,
                    user_id=user.id,
                    topic_action="host_request:missed_messages",
                    key=host_request.conversation_id,
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        # the user shown in the notification is the other party of the request
                        user=user_model_to_pb(
                            host_request.surfer if am_host else host_request.host, session, context
                        ),
                        am_host=am_host,
                    ),
                )


send_request_notifications.PAYLOAD = empty_pb2.Empty
send_request_notifications.SCHEDULE = timedelta(minutes=3)

314 

315 

def send_onboarding_emails(payload):
    """
    Sends out onboarding emails

    Two "onboarding:reminder" notifications exist, keyed "1" and "2"; User.onboarding_emails_sent records how
    many a user has been sent and User.last_onboarding_email_sent when the latest one went out.
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:
        # first onboarding email: every visible user who has not been sent one yet
        first_round = select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)
        for recipient in session.execute(first_round).scalars().all():
            notify(session, user_id=recipient.id, topic_action="onboarding:reminder", key="1")
            recipient.onboarding_emails_sent = 1
            recipient.last_onboarding_email_sent = now()
            # committed per user
            session.commit()

        # second onboarding email: more than 7 days after the first one, and only if the profile is still
        # not completed
        second_round = (
            select(User)
            .where(User.is_visible)
            .where(User.onboarding_emails_sent == 1)
            .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
            .where(User.has_completed_profile == False)
        )
        for recipient in session.execute(second_round).scalars().all():
            notify(session, user_id=recipient.id, topic_action="onboarding:reminder", key="2")
            recipient.onboarding_emails_sent = 2
            recipient.last_onboarding_email_sent = now()
            session.commit()


send_onboarding_emails.PAYLOAD = empty_pb2.Empty
send_onboarding_emails.SCHEDULE = timedelta(hours=1)

367 

368 

def send_reference_reminders(payload):
    """
    Sends out reminders to write references after hosting/staying

    Up to three reminders are sent per side of a host request; how many have gone out is tracked in
    HostRequest.surfer_sent_reference_reminders / HostRequest.host_sent_reference_reminders.
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    def missing_reference_query(surfed, writer, counterpart, reminder_number, reminder_time):
        # host requests where one side (the surfer if `surfed`, else the host) can still write a reference,
        # has not written one, and is due reminder number `reminder_number`
        if surfed:
            writer_id_column = HostRequest.surfer_user_id
            counterpart_id_column = HostRequest.host_user_id
            reminders_sent_column = HostRequest.surfer_sent_reference_reminders
        else:
            writer_id_column = HostRequest.host_user_id
            counterpart_id_column = HostRequest.surfer_user_id
            reminders_sent_column = HostRequest.host_sent_reference_reminders
        return (
            select(literal(surfed), HostRequest, writer, counterpart)
            .join(writer, writer.id == writer_id_column)
            .join(counterpart, counterpart.id == counterpart_id_column)
            .outerjoin(
                Reference,
                and_(
                    Reference.host_request_id == HostRequest.conversation_id,
                    # if no reference is found in this join, then this side has not written a ref
                    Reference.from_user_id == writer_id_column,
                ),
            )
            .where(writer.is_visible)
            .where(counterpart.is_visible)
            .where(Reference.id == None)
            .where(HostRequest.can_write_reference)
            .where(reminders_sent_column < reminder_number)
            .where(HostRequest.end_time_to_write_reference - reminder_time < now())
        )

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            writer_alias = aliased(User)
            counterpart_alias = aliased(User)

            # surfers needing to write a ref, followed by hosts needing to write a ref
            combined = union_all(
                missing_reference_query(True, writer_alias, counterpart_alias, reminder_number, reminder_time),
                missing_reference_query(False, writer_alias, counterpart_alias, reminder_number, reminder_time),
            ).subquery()
            # first column is the surfed flag, the rest are mapped back onto the entities
            rows = session.execute(
                select(
                    combined.c[0].label("surfed"),
                    aliased(HostRequest, combined),
                    aliased(writer_alias, combined),
                    aliased(counterpart_alias, combined),
                )
            ).all()

            for surfed, host_request, user, other_user in rows:
                # checked in sql
                assert user.is_visible
                if are_blocked(session, user.id, other_user.id):
                    # no reminder is sent and nothing is recorded for this pair
                    continue

                context = SimpleNamespace(user_id=user.id)
                notify(
                    session,
                    user_id=user.id,
                    topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(other_user, session, context),
                        days_left=reminder_days_left,
                    ),
                )
                # record the reminder on the matching side of the host request
                if surfed:
                    host_request.surfer_sent_reference_reminders = reminder_number
                else:
                    host_request.host_sent_reference_reminders = reminder_number
                session.commit()


send_reference_reminders.PAYLOAD = empty_pb2.Empty
send_reference_reminders.SCHEDULE = timedelta(hours=1)

467 

468 

def add_users_to_email_list(payload):
    """
    Syncs users to the Listmonk mailing list, one user per database session

    Does nothing unless config["LISTMONK_ENABLED"] is truthy. Users who opted out of the newsletter are marked
    as in sync without being sent to Listmonk. Raises if Listmonk answers with anything but 200 or 409.
    """
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    while True:
        with session_scope() as session:
            # next visible user that is not yet in sync, if any
            user = session.execute(
                select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
            ).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            if not user.opt_out_of_newsletter:
                response = requests.post(
                    config["LISTMONK_BASE_URL"] + "/api/subscribers",
                    auth=("listmonk", config["LISTMONK_API_KEY"]),
                    json={
                        "email": user.email,
                        "name": user.name,
                        "list_uuids": [config["LISTMONK_LIST_UUID"]],
                        "preconfirm_subscriptions": True,
                        "attribs": {"couchers_user_id": user.id},
                    },
                    timeout=10,
                )
                # 409 is accepted as well; presumably that is what the API returns if the user is already
                # subscribed -- TODO confirm against the Listmonk API docs
                if response.status_code not in (200, 409):
                    raise Exception("Failed to add users to mailing list")

            # reached for opted-out users and for successful subscriptions alike
            user.in_sync_with_newsletter = True
            session.commit()


add_users_to_email_list.PAYLOAD = empty_pb2.Empty
add_users_to_email_list.SCHEDULE = timedelta(hours=1)

512 

513 

def enforce_community_membership(payload):
    """
    Job wrapper that delegates to couchers.tasks.enforce_community_memberships

    The payload is not used.
    """
    tasks_enforce_community_memberships()


enforce_community_membership.PAYLOAD = empty_pb2.Empty
enforce_community_membership.SCHEDULE = timedelta(minutes=15)

520 

521 

def update_recommendation_scores(payload):
    """
    Recomputes User.recommendation_score for users with a single UPDATE statement

    The score is built entirely out of SQL expressions and is the sum of: profile points, reference points,
    activeness points, verification/badge points, response rate points, and a random term.

    The payload is not used.
    """
    # free-text profile fields whose combined length counts towards the profile points
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    # the subset of fields that describe the home
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian():
        """
        Produces an approximately std normal random variate
        """
        # sum of `trials` SQL random() calls, shifted by trials / 2 and scaled by sqrt(trials / 12)
        # assumes random() is uniform on [0, 1), for which these are the mean and std of the sum -- TODO confirm
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt):
        # cast to integer, mapping NULL to 0
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt):
        # cast to float, mapping NULL to 0.0
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile
        # the text fields are concatenated as one SQL expression (NULL treated as ""), then measured
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")
        home_length = func.length(home_text)

        # each of these is a 0/1 integer expression
        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        has_pic = int_(User.avatar_key != None)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        # 1 only if last_minute is set and the home text is longer than 200 characters
        filled_home = int_(User.last_minute != None) * int_(home_length > 200)
        profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host

        # references
        # one row per user that has written at least one reference
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        # aggregates over the references a user has received
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        # a reference counts as bad if its rating is <= 0.2 or it was marked as not appropriate
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # the average (rescaled) rating is weighted by the reference count: the first 5 references count double
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        # these two are aggregates over the text messages a user has authored (see messaging_subquery)
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        # per user: the smallest parent node id among the official clusters they are an admin of
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        # NOTE(review): cb/wcb presumably stand for "community builder" / "world community builder", with
        # node 1 being the top-level node -- confirm
        cb = int_(min_node_id >= 1)
        wcb = int_(min_node_id == 1)
        # points per badge id; badge ids not listed here score 0 (see else_=0 below)
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }

        # per user: the summed points of all their badges
        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate
        # t: creation time of each conversation, taken from its chat_created message
        t = (
            select(Message.conversation_id, Message.time)
            .where(Message.message_type == MessageType.chat_created)
            .subquery()
        )
        # s: time of the first message of each author in each conversation
        s = (
            select(Message.conversation_id, Message.author_id, func.min(Message.time).label("time"))
            .group_by(Message.conversation_id, Message.author_id)
            .subquery()
        )
        # per host: response statistics over the host requests they received; s is outer joined, so a request
        # the host never wrote in contributes NULL, which the percentiles treat as a 1000 day response time
        hr_subquery = (
            select(
                HostRequest.host_user_id.label("user_id"),
                func.avg(s.c.time - t.c.time).label("avg_response_time"),
                func.count(t.c.time).label("received"),
                func.count(s.c.time).label("responded"),
                float_(
                    extract(
                        "epoch",
                        percentile_disc(0.33).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
                    )
                    / 60.0
                ).label("response_time_33p"),
                float_(
                    extract(
                        "epoch",
                        percentile_disc(0.66).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
                    )
                    / 60.0
                ).label("response_time_66p"),
            )
            .join(t, t.c.conversation_id == HostRequest.conversation_id)
            .outerjoin(
                s, and_(s.c.conversation_id == HostRequest.conversation_id, s.c.author_id == HostRequest.host_user_id)
            )
            .group_by(HostRequest.host_user_id)
            .subquery()
        )
        # NOTE(review): avg_response_time, avg_response_time_hr, received, responded and response_rate are not
        # part of the final score below; also, epoch seconds / 60.0 yields minutes despite the "_hr" name
        avg_response_time = hr_subquery.c.avg_response_time
        avg_response_time_hr = float_(extract("epoch", avg_response_time) / 60.0)
        received = hr_subquery.c.received
        responded = hr_subquery.c.responded
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        response_rate = float_(responded / (1.0 * func.greatest(received, 1)))
        # be careful with nulls
        # the percentiles are in minutes, so 60 * 48.0 is 48 hours; for users without a row in hr_subquery the
        # comparisons are NULL, which int_ turns into 0
        response_rate_points = -10 * int_(response_time_33p > 60 * 48.0) + 5 * int_(response_time_66p < 60 * 48.0)

        recommendation_score = (
            profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        # one row per user; every subquery is outer joined so that users missing from it still get a score
        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        # write the scores back with a single UPDATE against the users table
        session.execute(
            User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
        )

    logger.info("Updated recommendation scores")


update_recommendation_scores.PAYLOAD = empty_pb2.Empty
update_recommendation_scores.SCHEDULE = timedelta(hours=24)

735 

736 

def update_badges(payload):
    """
    Brings the badge holders in the database in line with who should hold each badge

    Static badges take their members from get_static_badge_dict(), the others from queries on the database.
    """
    with session_scope() as session:

        def fetch_ids(stmt):
            # runs a single-column select and returns its values as a list
            return session.execute(stmt).scalars().all()

        def update_badge(badge_id: str, members: list[int]):
            # looked up only for its side effect: presumably this fails for a badge id that is not defined
            _ = get_badge_dict()[badge_id]
            current_holders = set(fetch_ids(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)))
            # in case the user ids don't exist in the db
            wanted_holders = set(fetch_ids(select(User.id).where(User.id.in_(members))))

            # we should add the badge to these
            for user_id in wanted_holders - current_holders:
                user_add_badge(session, user_id, badge_id)

            # we should remove the badge from these
            for user_id in current_holders - wanted_holders:
                user_remove_badge(session, user_id, badge_id)

        for static_badge_id in ("founder", "board_member", "past_board_member"):
            update_badge(static_badge_id, get_static_badge_dict()[static_badge_id])

        # anyone with at least one invoice
        update_badge("donor", fetch_ids(select(User.id).join(Invoice, Invoice.user_id == User.id)))
        update_badge("moderator", fetch_ids(select(User.id).where(User.is_superuser)))
        update_badge("phone_verified", fetch_ids(select(User.id).where(User.phone_is_verified)))
        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        update_badge(
            "strong_verification",
            fetch_ids(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            ),
        )


update_badges.PAYLOAD = empty_pb2.Empty
update_badges.SCHEDULE = timedelta(minutes=15)

778 

779 

def finalize_strong_verification(payload):
    """
    Finishes a strong verification attempt that is waiting on the backend

    Fetches the session from Iris, checks that it belongs to this attempt, and then either fails the attempt
    (document is not a passport), marks it as a duplicate (same expiry date, nationality and last three document
    characters as an earlier attempt), or stores the passport data and marks it as succeeded. The user is
    notified of the outcome, except when they already hold the strong verification badge.
    """
    with session_scope() as session:

        def notify_failure(reason):
            # tells the user of this attempt that strong verification failed, with the given reason
            notify(
                session,
                user_id=attempt.user_id,
                topic_action="verification:sv_fail",
                data=notification_data_pb2.VerificationSVFail(reason=reason),
            )

        # scalar_one() raises unless exactly one attempt with this id is waiting on the backend
        attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()

        iris_response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": attempt.iris_session_id},
            timeout=10,
        )
        if iris_response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {iris_response.text}")
        iris_data = iris_response.json()

        # the reference is an encrypted payload that ties the Iris session to a user and an attempt token
        reference_payload = verification_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(iris_data["reference"]))
        )
        # NOTE(review): these consistency checks are asserts, which python skips when run with -O
        assert attempt.user_id == reference_payload.user_id
        assert attempt.verification_attempt_token == reference_payload.verification_attempt_token
        assert attempt.iris_session_id == iris_data["id"]
        assert iris_data["state"] == "APPROVED"

        if iris_data["document_type"] != "PASSPORT":
            attempt.status = StrongVerificationAttemptStatus.failed
            notify_failure(notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT)
            return

        assert iris_data["document_type"] == "PASSPORT"

        passport_expiry = date.fromisoformat(iris_data["expiry_date"])
        passport_nationality = iris_data["nationality"]
        document_suffix = iris_data["document_number"][-3:]

        # earliest attempt, if any, that already has this passport's minimal data; this is queried before the
        # current attempt gets the same values assigned below
        earlier_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == passport_expiry)
            .where(StrongVerificationAttempt.passport_nationality == passport_nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == document_suffix)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        attempt.has_minimal_data = True
        attempt.passport_expiry_date = passport_expiry
        attempt.passport_nationality = passport_nationality
        attempt.passport_last_three_document_chars = document_suffix

        if earlier_attempt:
            attempt.status = StrongVerificationAttemptStatus.duplicate

            # the duplicate email is only sent when the passport was used by a different user
            if earlier_attempt.user_id != attempt.user_id:
                session.flush()
                send_duplicate_strong_verification_email(session, earlier_attempt, attempt)

            notify_failure(notification_data_pb2.SV_FAIL_REASON_DUPLICATE)
            return

        attempt.has_full_data = True
        # the raw response body is stored encrypted with the verification data public key
        attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], iris_response.text.encode("utf8")
        )
        attempt.passport_date_of_birth = date.fromisoformat(iris_data["date_of_birth"])
        attempt.passport_sex = PassportSex[iris_data["sex"].lower()]
        attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = attempt.user
        if not attempt.has_strong_verification(user):
            notify_failure(notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER)
            return

        badge_id = "strong_verification"
        existing_badge = session.execute(
            select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
        ).scalar_one_or_none()
        if existing_badge:
            # the user already has the badge: nothing to add, and no success notification is sent
            return

        user_add_badge(session, user.id, badge_id, do_notify=False)
        notify(session, user_id=attempt.user_id, topic_action="verification:sv_success")


finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload