Coverage for src/couchers/jobs/handlers.py: 98%

311 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-22 06:42 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from datetime import date, timedelta 

7from math import sqrt 

8from types import SimpleNamespace 

9 

10import requests 

11from google.protobuf import empty_pb2 

12from sqlalchemy import Integer 

13from sqlalchemy.orm import aliased 

14from sqlalchemy.sql import and_, case, cast, delete, distinct, extract, func, literal, not_, or_, select, union_all 

15from sqlalchemy.sql.functions import percentile_disc 

16 

17from couchers.config import config 

18from couchers.crypto import asym_encrypt, b64decode, simple_decrypt 

19from couchers.db import session_scope 

20from couchers.email.dev import print_dev_email 

21from couchers.email.smtp import send_smtp_email 

22from couchers.helpers.badges import user_add_badge, user_remove_badge 

23from couchers.materialized_views import refresh_materialized_views, refresh_materialized_views_rapid 

24from couchers.metrics import strong_verification_completions_counter 

25from couchers.models import ( 

26 AccountDeletionToken, 

27 Cluster, 

28 ClusterRole, 

29 ClusterSubscription, 

30 Float, 

31 GroupChat, 

32 GroupChatSubscription, 

33 HostingStatus, 

34 HostRequest, 

35 Invoice, 

36 LoginToken, 

37 Message, 

38 MessageType, 

39 PassportSex, 

40 PasswordResetToken, 

41 Reference, 

42 StrongVerificationAttempt, 

43 StrongVerificationAttemptStatus, 

44 User, 

45 UserBadge, 

46) 

47from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification 

48from couchers.notifications.notify import notify 

49from couchers.resources import get_badge_dict, get_static_badge_dict 

50from couchers.servicers.api import user_model_to_pb 

51from couchers.servicers.blocking import are_blocked 

52from couchers.servicers.conversations import generate_message_notifications 

53from couchers.servicers.events import ( 

54 generate_event_cancel_notifications, 

55 generate_event_create_notifications, 

56 generate_event_delete_notifications, 

57 generate_event_update_notifications, 

58) 

59from couchers.servicers.requests import host_request_to_pb 

60from couchers.sql import couchers_select as select 

61from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

62from couchers.tasks import send_duplicate_strong_verification_email 

63from couchers.utils import now 

64from proto import notification_data_pb2 

65from proto.internal import jobs_pb2, verification_pb2 

66 

67logger = logging.getLogger(__name__) 

68 

69# these were straight up imported 

70handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload 

71 

72send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload 

73 

74handle_email_digests.PAYLOAD = empty_pb2.Empty 

75handle_email_digests.SCHEDULE = timedelta(minutes=15) 

76 

77generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload 

78 

79generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload 

80 

81generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload 

82 

83generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload 

84 

85generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload 

86 

87 

88refresh_materialized_views.PAYLOAD = empty_pb2.Empty 

89refresh_materialized_views.SCHEDULE = timedelta(minutes=5) 

90 

91refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty 

92refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30) 

93 

94 

95def send_email(payload): 

96 logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'") 

97 # selects a "sender", which either prints the email to the logger or sends it out with SMTP 

98 sender = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email 

99 # the sender must return a models.Email object that can be added to the database 

100 email = sender( 

101 sender_name=payload.sender_name, 

102 sender_email=payload.sender_email, 

103 recipient=payload.recipient, 

104 subject=payload.subject, 

105 plain=payload.plain, 

106 html=payload.html, 

107 list_unsubscribe_header=payload.list_unsubscribe_header, 

108 source_data=payload.source_data, 

109 ) 

110 with session_scope() as session: 

111 session.add(email) 

112 

113 

114send_email.PAYLOAD = jobs_pb2.SendEmailPayload 

115 

116 

117def purge_login_tokens(payload): 

118 logger.info("Purging login tokens") 

119 with session_scope() as session: 

120 session.execute(delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False)) 

121 

122 

123purge_login_tokens.PAYLOAD = empty_pb2.Empty 

124purge_login_tokens.SCHEDULE = timedelta(hours=24) 

125 

126 

127def purge_password_reset_tokens(payload): 

128 logger.info("Purging login tokens") 

129 with session_scope() as session: 

130 session.execute( 

131 delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False) 

132 ) 

133 

134 

135purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty 

136purge_password_reset_tokens.SCHEDULE = timedelta(hours=24) 

137 

138 

139def purge_account_deletion_tokens(payload): 

140 logger.info("Purging account deletion tokens") 

141 with session_scope() as session: 

142 session.execute( 

143 delete(AccountDeletionToken) 

144 .where(~AccountDeletionToken.is_valid) 

145 .execution_options(synchronize_session=False) 

146 ) 

147 

148 

149purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty 

150purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24) 

151 

152 

153def send_message_notifications(payload): 

154 """ 

155 Sends out email notifications for messages that have been unseen for a long enough time 

156 """ 

157 # very crude and dumb algorithm 

158 logger.info("Sending out email notifications for unseen messages") 

159 

160 with session_scope() as session: 

161 # users who have unnotified messages older than 5 minutes in any group chat 

162 users = ( 

163 session.execute( 

164 select(User) 

165 .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id) 

166 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

167 .where(not_(GroupChatSubscription.is_muted)) 

168 .where(User.is_visible) 

169 .where(Message.time >= GroupChatSubscription.joined) 

170 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

171 .where(Message.id > User.last_notified_message_id) 

172 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

173 .where(Message.time < now() - timedelta(minutes=5)) 

174 .where(Message.message_type == MessageType.text) # TODO: only text messages for now 

175 ) 

176 .scalars() 

177 .unique() 

178 ) 

179 

180 for user in users: 

181 # now actually grab all the group chats, not just less than 5 min old 

182 subquery = ( 

183 select( 

184 GroupChatSubscription.group_chat_id.label("group_chat_id"), 

185 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"), 

186 func.max(Message.id).label("message_id"), 

187 func.count(Message.id).label("count_unseen"), 

188 ) 

189 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

190 .where(GroupChatSubscription.user_id == user.id) 

191 .where(not_(GroupChatSubscription.is_muted)) 

192 .where(Message.id > user.last_notified_message_id) 

193 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

194 .where(Message.time >= GroupChatSubscription.joined) 

195 .where(Message.message_type == MessageType.text) # TODO: only text messages for now 

196 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

197 .group_by(GroupChatSubscription.group_chat_id) 

198 .order_by(func.max(Message.id).desc()) 

199 .subquery() 

200 ) 

201 

202 unseen_messages = session.execute( 

203 select(GroupChat, Message, subquery.c.count_unseen) 

204 .join(subquery, subquery.c.message_id == Message.id) 

205 .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id) 

206 .order_by(subquery.c.message_id.desc()) 

207 ).all() 

208 

209 user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages) 

210 

211 def format_title(message, group_chat, count_unseen): 

212 if group_chat.is_dm: 

213 return f"You missed {count_unseen} message(s) from {message.author.name}" 

214 else: 

215 return f"You missed {count_unseen} message(s) in {group_chat.title}" 

216 

217 notify( 

218 session, 

219 user_id=user.id, 

220 topic_action="chat:missed_messages", 

221 data=notification_data_pb2.ChatMissedMessages( 

222 messages=[ 

223 notification_data_pb2.ChatMessage( 

224 author=user_model_to_pb( 

225 message.author, 

226 session, 

227 SimpleNamespace(user_id=user.id), 

228 ), 

229 message=format_title(message, group_chat, count_unseen), 

230 text=message.text, 

231 group_chat_id=message.conversation_id, 

232 ) 

233 for group_chat, message, count_unseen in unseen_messages 

234 ], 

235 ), 

236 ) 

237 session.commit() 

238 

239 

240send_message_notifications.PAYLOAD = empty_pb2.Empty 

241send_message_notifications.SCHEDULE = timedelta(minutes=3) 

242 

243 

244def send_request_notifications(payload): 

245 """ 

246 Sends out email notifications for unseen messages in host requests (as surfer or host) 

247 """ 

248 logger.info("Sending out email notifications for unseen messages in host requests") 

249 

250 with session_scope() as session: 

251 # requests where this user is surfing 

252 surfing_reqs = session.execute( 

253 select(User, HostRequest, func.max(Message.id)) 

254 .where(User.is_visible) 

255 .join(HostRequest, HostRequest.surfer_user_id == User.id) 

256 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

257 .where(Message.id > HostRequest.surfer_last_seen_message_id) 

258 .where(Message.id > User.last_notified_request_message_id) 

259 .where(Message.time < now() - timedelta(minutes=5)) 

260 .where(Message.message_type == MessageType.text) 

261 .group_by(User, HostRequest) 

262 ).all() 

263 

264 # where this user is hosting 

265 hosting_reqs = session.execute( 

266 select(User, HostRequest, func.max(Message.id)) 

267 .where(User.is_visible) 

268 .join(HostRequest, HostRequest.host_user_id == User.id) 

269 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

270 .where(Message.id > HostRequest.host_last_seen_message_id) 

271 .where(Message.id > User.last_notified_request_message_id) 

272 .where(Message.time < now() - timedelta(minutes=5)) 

273 .where(Message.message_type == MessageType.text) 

274 .group_by(User, HostRequest) 

275 ).all() 

276 

277 for user, host_request, max_message_id in surfing_reqs: 

278 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id) 

279 session.flush() 

280 

281 context = SimpleNamespace(user_id=user.id) 

282 notify( 

283 session, 

284 user_id=user.id, 

285 topic_action="host_request:missed_messages", 

286 key=host_request.conversation_id, 

287 data=notification_data_pb2.HostRequestMissedMessages( 

288 host_request=host_request_to_pb(host_request, session, context), 

289 user=user_model_to_pb(host_request.host, session, context), 

290 am_host=False, 

291 ), 

292 ) 

293 

294 for user, host_request, max_message_id in hosting_reqs: 

295 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id) 

296 session.flush() 

297 

298 context = SimpleNamespace(user_id=user.id) 

299 notify( 

300 session, 

301 user_id=user.id, 

302 topic_action="host_request:missed_messages", 

303 key=host_request.conversation_id, 

304 data=notification_data_pb2.HostRequestMissedMessages( 

305 host_request=host_request_to_pb(host_request, session, context), 

306 user=user_model_to_pb(host_request.surfer, session, context), 

307 am_host=True, 

308 ), 

309 ) 

310 

311 

312send_request_notifications.PAYLOAD = empty_pb2.Empty 

313send_request_notifications.SCHEDULE = timedelta(minutes=3) 

314 

315 

316def send_onboarding_emails(payload): 

317 """ 

318 Sends out onboarding emails 

319 """ 

320 logger.info("Sending out onboarding emails") 

321 

322 with session_scope() as session: 

323 # first onboarding email 

324 users = ( 

325 session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all() 

326 ) 

327 

328 for user in users: 

329 notify( 

330 session, 

331 user_id=user.id, 

332 topic_action="onboarding:reminder", 

333 key="1", 

334 ) 

335 user.onboarding_emails_sent = 1 

336 user.last_onboarding_email_sent = now() 

337 session.commit() 

338 

339 # second onboarding email 

340 # sent after a week if the user has no profile or their "about me" section is less than 20 characters long 

341 users = ( 

342 session.execute( 

343 select(User) 

344 .where(User.is_visible) 

345 .where(User.onboarding_emails_sent == 1) 

346 .where(now() - User.last_onboarding_email_sent > timedelta(days=7)) 

347 .where(User.has_completed_profile == False) 

348 ) 

349 .scalars() 

350 .all() 

351 ) 

352 

353 for user in users: 

354 notify( 

355 session, 

356 user_id=user.id, 

357 topic_action="onboarding:reminder", 

358 key="2", 

359 ) 

360 user.onboarding_emails_sent = 2 

361 user.last_onboarding_email_sent = now() 

362 session.commit() 

363 

364 

365send_onboarding_emails.PAYLOAD = empty_pb2.Empty 

366send_onboarding_emails.SCHEDULE = timedelta(hours=1) 

367 

368 

369def send_reference_reminders(payload): 

370 """ 

371 Sends out reminders to write references after hosting/staying 

372 """ 

373 logger.info("Sending out reference reminder emails") 

374 

375 # Keep this in chronological order! 

376 reference_reminder_schedule = [ 

377 # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref) 

378 # the end time to write a reference is supposed to be midnight in the host's timezone 

379 # 8 pm ish on the last day of the stay 

380 (1, timedelta(days=15) - timedelta(hours=20), 14), 

381 # 2 pm ish a week after stay 

382 (2, timedelta(days=8) - timedelta(hours=14), 7), 

383 # 10 am ish 3 days before end of time to write ref 

384 (3, timedelta(days=4) - timedelta(hours=10), 3), 

385 ] 

386 

387 with session_scope() as session: 

388 # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates 

389 for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule): 

390 user = aliased(User) 

391 other_user = aliased(User) 

392 # surfers needing to write a ref 

393 q1 = ( 

394 select(literal(True), HostRequest, user, other_user) 

395 .join(user, user.id == HostRequest.surfer_user_id) 

396 .join(other_user, other_user.id == HostRequest.host_user_id) 

397 .outerjoin( 

398 Reference, 

399 and_( 

400 Reference.host_request_id == HostRequest.conversation_id, 

401 # if no reference is found in this join, then the surfer has not written a ref 

402 Reference.from_user_id == HostRequest.surfer_user_id, 

403 ), 

404 ) 

405 .where(user.is_visible) 

406 .where(other_user.is_visible) 

407 .where(Reference.id == None) 

408 .where(HostRequest.can_write_reference) 

409 .where(HostRequest.surfer_sent_reference_reminders < reminder_number) 

410 .where(HostRequest.end_time_to_write_reference - reminder_time < now()) 

411 .where(HostRequest.surfer_reason_didnt_meetup == None) 

412 ) 

413 

414 # hosts needing to write a ref 

415 q2 = ( 

416 select(literal(False), HostRequest, user, other_user) 

417 .join(user, user.id == HostRequest.host_user_id) 

418 .join(other_user, other_user.id == HostRequest.surfer_user_id) 

419 .outerjoin( 

420 Reference, 

421 and_( 

422 Reference.host_request_id == HostRequest.conversation_id, 

423 # if no reference is found in this join, then the host has not written a ref 

424 Reference.from_user_id == HostRequest.host_user_id, 

425 ), 

426 ) 

427 .where(user.is_visible) 

428 .where(other_user.is_visible) 

429 .where(Reference.id == None) 

430 .where(HostRequest.can_write_reference) 

431 .where(HostRequest.host_sent_reference_reminders < reminder_number) 

432 .where(HostRequest.end_time_to_write_reference - reminder_time < now()) 

433 .where(HostRequest.host_reason_didnt_meetup == None) 

434 ) 

435 

436 union = union_all(q1, q2).subquery() 

437 union = select( 

438 union.c[0].label("surfed"), 

439 aliased(HostRequest, union), 

440 aliased(user, union), 

441 aliased(other_user, union), 

442 ) 

443 reference_reminders = session.execute(union).all() 

444 

445 for surfed, host_request, user, other_user in reference_reminders: 

446 # checked in sql 

447 assert user.is_visible 

448 if not are_blocked(session, user.id, other_user.id): 

449 context = SimpleNamespace(user_id=user.id) 

450 notify( 

451 session, 

452 user_id=user.id, 

453 topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted", 

454 data=notification_data_pb2.ReferenceReminder( 

455 host_request_id=host_request.conversation_id, 

456 other_user=user_model_to_pb(other_user, session, context), 

457 days_left=reminder_days_left, 

458 ), 

459 ) 

460 if surfed: 

461 host_request.surfer_sent_reference_reminders = reminder_number 

462 else: 

463 host_request.host_sent_reference_reminders = reminder_number 

464 session.commit() 

465 

466 

467send_reference_reminders.PAYLOAD = empty_pb2.Empty 

468send_reference_reminders.SCHEDULE = timedelta(hours=1) 

469 

470 

471def add_users_to_email_list(payload): 

472 if not config["LISTMONK_ENABLED"]: 

473 logger.info("Not adding users to mailing list") 

474 return 

475 

476 logger.info("Adding users to mailing list") 

477 

478 while True: 

479 with session_scope() as session: 

480 user = session.execute( 

481 select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1) 

482 ).scalar_one_or_none() 

483 if not user: 

484 logger.info("Finished adding users to mailing list") 

485 return 

486 

487 if user.opt_out_of_newsletter: 

488 user.in_sync_with_newsletter = True 

489 session.commit() 

490 continue 

491 

492 r = requests.post( 

493 config["LISTMONK_BASE_URL"] + "/api/subscribers", 

494 auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]), 

495 json={ 

496 "email": user.email, 

497 "name": user.name, 

498 "lists": [config["LISTMONK_LIST_ID"]], 

499 "preconfirm_subscriptions": True, 

500 "attribs": {"couchers_user_id": user.id}, 

501 "status": "enabled", 

502 }, 

503 timeout=10, 

504 ) 

505 # the API returns if the user is already subscribed 

506 if r.status_code == 200 or r.status_code == 409: 

507 user.in_sync_with_newsletter = True 

508 session.commit() 

509 else: 

510 raise Exception("Failed to add users to mailing list") 

511 

512 

513add_users_to_email_list.PAYLOAD = empty_pb2.Empty 

514add_users_to_email_list.SCHEDULE = timedelta(hours=1) 

515 

516 

517def enforce_community_membership(payload): 

518 tasks_enforce_community_memberships() 

519 

520 

521enforce_community_membership.PAYLOAD = empty_pb2.Empty 

522enforce_community_membership.SCHEDULE = timedelta(minutes=15) 

523 

524 

525def update_recommendation_scores(payload): 

526 text_fields = [ 

527 User.hometown, 

528 User.occupation, 

529 User.education, 

530 User.about_me, 

531 User.things_i_like, 

532 User.about_place, 

533 User.additional_information, 

534 User.pet_details, 

535 User.kid_details, 

536 User.housemate_details, 

537 User.other_host_info, 

538 User.sleeping_details, 

539 User.area, 

540 User.house_rules, 

541 ] 

542 home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules] 

543 

544 def poor_man_gaussian(): 

545 """ 

546 Produces an approximatley std normal random variate 

547 """ 

548 trials = 5 

549 return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12) 

550 

551 def int_(stmt): 

552 return func.coalesce(cast(stmt, Integer), 0) 

553 

554 def float_(stmt): 

555 return func.coalesce(cast(stmt, Float), 0.0) 

556 

557 with session_scope() as session: 

558 # profile 

559 profile_text = "" 

560 for field in text_fields: 

561 profile_text += func.coalesce(field, "") 

562 text_length = func.length(profile_text) 

563 home_text = "" 

564 for field in home_fields: 

565 home_text += func.coalesce(field, "") 

566 home_length = func.length(home_text) 

567 

568 has_text = int_(text_length > 500) 

569 long_text = int_(text_length > 2000) 

570 has_pic = int_(User.avatar_key != None) 

571 can_host = int_(User.hosting_status == HostingStatus.can_host) 

572 cant_host = int_(User.hosting_status == HostingStatus.cant_host) 

573 filled_home = int_(User.last_minute != None) * int_(home_length > 200) 

574 profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host 

575 

576 # references 

577 left_ref_expr = int_(1).label("left_reference") 

578 left_refs_subquery = ( 

579 select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery() 

580 ) 

581 left_reference = int_(left_refs_subquery.c.left_reference) 

582 has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference") 

583 ref_count_expr = int_(func.count(Reference.id)).label("ref_count") 

584 ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg") 

585 has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types") 

586 has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label( 

587 "has_bad_ref" 

588 ) 

589 received_ref_subquery = ( 

590 select( 

591 Reference.to_user_id.label("user_id"), 

592 has_reference_expr, 

593 has_multiple_types_expr, 

594 has_bad_ref_expr, 

595 ref_count_expr, 

596 ref_avg_expr, 

597 ) 

598 .group_by(Reference.to_user_id) 

599 .subquery() 

600 ) 

601 has_multiple_types = int_(received_ref_subquery.c.has_multiple_types) 

602 has_reference = int_(received_ref_subquery.c.has_reference) 

603 has_bad_reference = int_(received_ref_subquery.c.has_bad_ref) 

604 rating_score = float_( 

605 received_ref_subquery.c.ref_avg 

606 * ( 

607 2 * func.least(received_ref_subquery.c.ref_count, 5) 

608 + func.greatest(received_ref_subquery.c.ref_count - 5, 0) 

609 ) 

610 ) 

611 ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score 

612 

613 # activeness 

614 recently_active = int_(User.last_active >= now() - timedelta(days=180)) 

615 very_recently_active = int_(User.last_active >= now() - timedelta(days=14)) 

616 recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14)) 

617 messaged_lots = int_(func.count(Message.id) > 5) 

618 messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points") 

619 messaging_subquery = ( 

620 select(Message.author_id.label("user_id"), messaging_points_subquery) 

621 .where(Message.message_type == MessageType.text) 

622 .group_by(Message.author_id) 

623 .subquery() 

624 ) 

625 activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points) 

626 

627 # verification 

628 cb_subquery = ( 

629 select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id")) 

630 .join(Cluster, Cluster.id == ClusterSubscription.cluster_id) 

631 .where(ClusterSubscription.role == ClusterRole.admin) 

632 .where(Cluster.is_official_cluster) 

633 .group_by(ClusterSubscription.user_id) 

634 .subquery() 

635 ) 

636 min_node_id = cb_subquery.c.min_node_id 

637 cb = int_(min_node_id >= 1) 

638 wcb = int_(min_node_id == 1) 

639 badge_points = { 

640 "founder": 100, 

641 "board_member": 20, 

642 "past_board_member": 5, 

643 "strong_verification": 3, 

644 "volunteer": 3, 

645 "past_volunteer": 2, 

646 "donor": 1, 

647 "phone_verified": 1, 

648 } 

649 

650 badge_subquery = ( 

651 select( 

652 UserBadge.user_id.label("user_id"), 

653 func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"), 

654 ) 

655 .group_by(UserBadge.user_id) 

656 .subquery() 

657 ) 

658 

659 other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points) 

660 

661 # response rate 

662 t = ( 

663 select(Message.conversation_id, Message.time) 

664 .where(Message.message_type == MessageType.chat_created) 

665 .subquery() 

666 ) 

667 s = ( 

668 select(Message.conversation_id, Message.author_id, func.min(Message.time).label("time")) 

669 .group_by(Message.conversation_id, Message.author_id) 

670 .subquery() 

671 ) 

672 hr_subquery = ( 

673 select( 

674 HostRequest.host_user_id.label("user_id"), 

675 func.avg(s.c.time - t.c.time).label("avg_response_time"), 

676 func.count(t.c.time).label("received"), 

677 func.count(s.c.time).label("responded"), 

678 float_( 

679 extract( 

680 "epoch", 

681 percentile_disc(0.33).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))), 

682 ) 

683 / 60.0 

684 ).label("response_time_33p"), 

685 float_( 

686 extract( 

687 "epoch", 

688 percentile_disc(0.66).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))), 

689 ) 

690 / 60.0 

691 ).label("response_time_66p"), 

692 ) 

693 .join(t, t.c.conversation_id == HostRequest.conversation_id) 

694 .outerjoin( 

695 s, and_(s.c.conversation_id == HostRequest.conversation_id, s.c.author_id == HostRequest.host_user_id) 

696 ) 

697 .group_by(HostRequest.host_user_id) 

698 .subquery() 

699 ) 

700 avg_response_time = hr_subquery.c.avg_response_time 

701 avg_response_time_hr = float_(extract("epoch", avg_response_time) / 60.0) 

702 received = hr_subquery.c.received 

703 responded = hr_subquery.c.responded 

704 response_time_33p = hr_subquery.c.response_time_33p 

705 response_time_66p = hr_subquery.c.response_time_66p 

706 response_rate = float_(responded / (1.0 * func.greatest(received, 1))) 

707 # be careful with nulls 

708 response_rate_points = -10 * int_(response_time_33p > 60 * 48.0) + 5 * int_(response_time_66p < 60 * 48.0) 

709 

710 recommendation_score = ( 

711 profile_points 

712 + ref_score 

713 + activeness_points 

714 + other_points 

715 + response_rate_points 

716 + 2 * poor_man_gaussian() 

717 ) 

718 

719 scores = ( 

720 select(User.id.label("user_id"), recommendation_score.label("score")) 

721 .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id) 

722 .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id) 

723 .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id) 

724 .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id) 

725 .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id) 

726 .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id) 

727 ).subquery() 

728 

729 session.execute( 

730 User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id) 

731 ) 

732 

733 logger.info("Updated recommendation scores") 

734 

735 

736update_recommendation_scores.PAYLOAD = empty_pb2.Empty 

737update_recommendation_scores.SCHEDULE = timedelta(hours=24) 

738 

739 

740def update_badges(payload): 

741 with session_scope() as session: 

742 

743 def update_badge(badge_id: str, members: list[int]): 

744 badge = get_badge_dict()[badge_id] 

745 user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all() 

746 # in case the user ids don't exist in the db 

747 actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all() 

748 # we should add the badge to these 

749 add = set(actual_members) - set(user_ids) 

750 # we should remove the badge from these 

751 remove = set(user_ids) - set(actual_members) 

752 for user_id in add: 

753 user_add_badge(session, user_id, badge_id) 

754 

755 for user_id in remove: 

756 user_remove_badge(session, user_id, badge_id) 

757 

758 update_badge("founder", get_static_badge_dict()["founder"]) 

759 update_badge("board_member", get_static_badge_dict()["board_member"]) 

760 update_badge("past_board_member", get_static_badge_dict()["past_board_member"]) 

761 update_badge( 

762 "donor", session.execute(select(User.id).join(Invoice, Invoice.user_id == User.id)).scalars().all() 

763 ) 

764 update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all()) 

765 update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all()) 

766 # strong verification requires passport on file + gender/sex correspondence and date of birth match 

767 update_badge( 

768 "strong_verification", 

769 session.execute( 

770 select(User.id) 

771 .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id) 

772 .where(StrongVerificationAttempt.has_strong_verification(User)) 

773 ) 

774 .scalars() 

775 .all(), 

776 ) 

777 

778 

779update_badges.PAYLOAD = empty_pb2.Empty 

780update_badges.SCHEDULE = timedelta(minutes=15) 

781 

782 

783def finalize_strong_verification(payload): 

784 with session_scope() as session: 

785 verification_attempt = session.execute( 

786 select(StrongVerificationAttempt) 

787 .where(StrongVerificationAttempt.id == payload.verification_attempt_id) 

788 .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend) 

789 ).scalar_one() 

790 response = requests.post( 

791 "https://passportreader.app/api/v1/session.get", 

792 auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]), 

793 json={"id": verification_attempt.iris_session_id}, 

794 timeout=10, 

795 ) 

796 if response.status_code != 200: 

797 raise Exception(f"Iris didn't return 200: {response.text}") 

798 json_data = response.json() 

799 reference_payload = verification_pb2.VerificationReferencePayload.FromString( 

800 simple_decrypt("iris_callback", b64decode(json_data["reference"])) 

801 ) 

802 assert verification_attempt.user_id == reference_payload.user_id 

803 assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token 

804 assert verification_attempt.iris_session_id == json_data["id"] 

805 assert json_data["state"] == "APPROVED" 

806 

807 if json_data["document_type"] != "PASSPORT": 

808 verification_attempt.status = StrongVerificationAttemptStatus.failed 

809 notify( 

810 session, 

811 user_id=verification_attempt.user_id, 

812 topic_action="verification:sv_fail", 

813 data=notification_data_pb2.VerificationSVFail( 

814 reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT 

815 ), 

816 ) 

817 return 

818 

819 assert json_data["document_type"] == "PASSPORT" 

820 

821 expiry_date = date.fromisoformat(json_data["expiry_date"]) 

822 nationality = json_data["nationality"] 

823 last_three_document_chars = json_data["document_number"][-3:] 

824 

825 existing_attempt = session.execute( 

826 select(StrongVerificationAttempt) 

827 .where(StrongVerificationAttempt.passport_expiry_date == expiry_date) 

828 .where(StrongVerificationAttempt.passport_nationality == nationality) 

829 .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars) 

830 .order_by(StrongVerificationAttempt.id) 

831 .limit(1) 

832 ).scalar_one_or_none() 

833 

834 verification_attempt.has_minimal_data = True 

835 verification_attempt.passport_expiry_date = expiry_date 

836 verification_attempt.passport_nationality = nationality 

837 verification_attempt.passport_last_three_document_chars = last_three_document_chars 

838 

839 if existing_attempt: 

840 verification_attempt.status = StrongVerificationAttemptStatus.duplicate 

841 

842 if existing_attempt.user_id != verification_attempt.user_id: 

843 session.flush() 

844 send_duplicate_strong_verification_email(session, existing_attempt, verification_attempt) 

845 

846 notify( 

847 session, 

848 user_id=verification_attempt.user_id, 

849 topic_action="verification:sv_fail", 

850 data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE), 

851 ) 

852 return 

853 

854 verification_attempt.has_full_data = True 

855 verification_attempt.passport_encrypted_data = asym_encrypt( 

856 config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8") 

857 ) 

858 verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"]) 

859 verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()] 

860 verification_attempt.status = StrongVerificationAttemptStatus.succeeded 

861 

862 session.flush() 

863 

864 strong_verification_completions_counter.inc() 

865 

866 user = verification_attempt.user 

867 if verification_attempt.has_strong_verification(user): 

868 badge_id = "strong_verification" 

869 if session.execute( 

870 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id) 

871 ).scalar_one_or_none(): 

872 return 

873 

874 user_add_badge(session, user.id, badge_id, do_notify=False) 

875 notify(session, user_id=verification_attempt.user_id, topic_action="verification:sv_success") 

876 else: 

877 notify( 

878 session, 

879 user_id=verification_attempt.user_id, 

880 topic_action="verification:sv_fail", 

881 data=notification_data_pb2.VerificationSVFail( 

882 reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER 

883 ), 

884 ) 

885 

886 

887finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload