Coverage for src/couchers/jobs/handlers.py: 99%

292 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-07-22 17:19 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from datetime import date, timedelta 

7from math import sqrt 

8from types import SimpleNamespace 

9from typing import List 

10 

11import requests 

12from google.protobuf import empty_pb2 

13from sqlalchemy import Integer 

14from sqlalchemy.orm import aliased 

15from sqlalchemy.sql import and_, case, cast, delete, distinct, extract, func, literal, not_, or_, select, union_all 

16from sqlalchemy.sql.functions import percentile_disc 

17 

18from couchers.config import config 

19from couchers.crypto import asym_encrypt, b64decode, simple_decrypt 

20from couchers.db import session_scope 

21from couchers.email.dev import print_dev_email 

22from couchers.email.smtp import send_smtp_email 

23from couchers.helpers.badges import user_add_badge, user_remove_badge 

24from couchers.materialized_views import refresh_materialized_views as mv_refresh_materialized_views 

25from couchers.models import ( 

26 AccountDeletionToken, 

27 Cluster, 

28 ClusterRole, 

29 ClusterSubscription, 

30 Float, 

31 GroupChat, 

32 GroupChatSubscription, 

33 HostingStatus, 

34 HostRequest, 

35 Invoice, 

36 LoginToken, 

37 Message, 

38 MessageType, 

39 PassportSex, 

40 PasswordResetToken, 

41 Reference, 

42 StrongVerificationAttempt, 

43 StrongVerificationAttemptStatus, 

44 User, 

45 UserBadge, 

46) 

47from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification 

48from couchers.notifications.notify import notify 

49from couchers.resources import get_badge_dict, get_static_badge_dict 

50from couchers.servicers.api import user_model_to_pb 

51from couchers.servicers.blocking import are_blocked 

52from couchers.servicers.conversations import generate_message_notifications 

53from couchers.servicers.events import ( 

54 generate_event_cancel_notifications, 

55 generate_event_create_notifications, 

56 generate_event_delete_notifications, 

57 generate_event_update_notifications, 

58) 

59from couchers.servicers.requests import host_request_to_pb 

60from couchers.sql import couchers_select as select 

61from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

62from couchers.utils import now 

63from proto import notification_data_pb2 

64from proto.internal import jobs_pb2, verification_pb2 

65 

66logger = logging.getLogger(__name__) 

67 

# The handlers below are implemented in other modules and imported above as-is,
# so their job metadata is attached here: PAYLOAD is the protobuf message class
# for the job and SCHEDULE (where set) is a timedelta.

# notifications
handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload
send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload

# email digests: the only imported handler that also gets a SCHEDULE
handle_email_digests.SCHEDULE = timedelta(minutes=15)
handle_email_digests.PAYLOAD = empty_pb2.Empty

# conversations
generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload

# events
generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload
generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload
generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload
generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload

85 

86 

def send_email(payload):
    """
    Delivers the email described by the payload and stores the resulting email record

    Goes through SMTP when config["ENABLE_EMAIL"] is truthy, otherwise through the dev email printer
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")

    # pick the delivery function: real SMTP, or the dev one that prints the email to the logger
    if config["ENABLE_EMAIL"]:
        deliver = send_smtp_email
    else:
        deliver = print_dev_email

    # whichever function was picked must hand back a models.Email object that can be added to the database
    email_record = deliver(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
        list_unsubscribe_header=payload.list_unsubscribe_header,
        source_data=payload.source_data,
    )

    with session_scope() as session:
        session.add(email_record)


send_email.PAYLOAD = jobs_pb2.SendEmailPayload

107 

108 

def purge_login_tokens(payload):
    """
    Deletes every login token that is no longer valid
    """
    logger.info("Purging login tokens")
    # bulk delete; in-session objects are not synchronized with it
    purge_statement = delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False)
    with session_scope() as session:
        session.execute(purge_statement)


purge_login_tokens.PAYLOAD = empty_pb2.Empty
purge_login_tokens.SCHEDULE = timedelta(hours=24)

117 

118 

def purge_password_reset_tokens(payload):
    """
    Deletes every password reset token that is no longer valid
    """
    # this message used to read "Purging login tokens" (copied from purge_login_tokens), which made the two
    # jobs indistinguishable in the logs
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        session.execute(
            # bulk delete; in-session objects are not synchronized with it
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )


purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)

129 

130 

def purge_account_deletion_tokens(payload):
    """
    Deletes every account deletion token that is no longer valid
    """
    logger.info("Purging account deletion tokens")
    # bulk delete; in-session objects are not synchronized with it
    purge_statement = (
        delete(AccountDeletionToken)
        .where(~AccountDeletionToken.is_valid)
        .execution_options(synchronize_session=False)
    )
    with session_scope() as session:
        session.execute(purge_statement)


purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)

143 

144 

def send_message_notifications(payload):
    """
    Sends out email notifications for messages that have been unseen for a long enough time

    A user is picked up when one of their unmuted group chats has a text message that is older than 5 minutes, was
    sent while they were subscribed, and is newer than both the last message they have seen in that chat and the last
    message they were notified about. Each such user gets one "chat:missed_messages" notification listing the newest
    unseen message of every affected chat, and their last_notified_message_id is moved forward.
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    def format_title(message, group_chat, count_unseen):
        # DMs are titled by the author, other chats by the chat title
        if group_chat.is_dm:
            return f"You missed {count_unseen} message(s) from {message.author.name}"
        else:
            return f"You missed {count_unseen} message(s) in {group_chat.title}"

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                select(User)
                .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for user in users:
            # now actually grab all the group chats, not just less than 5 min old
            # one row per chat: the newest unseen message id and the number of unseen messages
            subquery = (
                select(
                    GroupChatSubscription.group_chat_id.label("group_chat_id"),
                    func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                    func.max(Message.id).label("message_id"),
                    func.count(Message.id).label("count_unseen"),
                )
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(GroupChatSubscription.user_id == user.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(Message.id > user.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            unseen_messages = session.execute(
                select(GroupChat, Message, subquery.c.count_unseen)
                .join(subquery, subquery.c.message_id == Message.id)
                .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
                .order_by(subquery.c.message_id.desc())
            ).all()

            if not unseen_messages:
                # this second query can come back empty (e.g. if the data changed since the first query ran);
                # max() below would then raise ValueError and abort the job for every remaining user
                continue

            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            notify(
                user_id=user.id,
                topic_action="chat:missed_messages",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                SimpleNamespace(user_id=user.id),
                            ),
                            message=format_title(message, group_chat, count_unseen),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                        )
                        for group_chat, message, count_unseen in unseen_messages
                    ],
                ),
            )
            # commit per user so the updated last_notified_message_id is persisted with each notification
            session.commit()


send_message_notifications.PAYLOAD = empty_pb2.Empty
send_message_notifications.SCHEDULE = timedelta(minutes=3)

233 

234 

def send_request_notifications(payload):
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host)

    Both lookups run first; then one "host_request:missed_messages" notification is sent per (user, host request)
    row, surfer-side rows before host-side rows.
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    def unseen_request_messages(session, participant_id_column, last_seen_column):
        # rows of (user, host request, newest message id) for text messages older than 5 minutes that the
        # participant has neither seen nor been notified about
        statement = (
            select(User, HostRequest, func.max(Message.id))
            .where(User.is_visible)
            .join(HostRequest, participant_id_column == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(Message.id > last_seen_column)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
            .group_by(User, HostRequest)
        )
        return session.execute(statement).all()

    with session_scope() as session:
        # requests where this user is surfing
        rows_as_surfer = unseen_request_messages(
            session, HostRequest.surfer_user_id, HostRequest.surfer_last_seen_message_id
        )
        # where this user is hosting
        rows_as_host = unseen_request_messages(session, HostRequest.host_user_id, HostRequest.host_last_seen_message_id)

        for rows, am_host in ((rows_as_surfer, False), (rows_as_host, True)):
            for user, host_request, max_message_id in rows:
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                context = SimpleNamespace(user_id=user.id)
                notify(
                    user_id=user.id,
                    topic_action="host_request:missed_messages",
                    key=host_request.conversation_id,
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        # the "user" in the notification is the other party of the request
                        user=user_model_to_pb(
                            host_request.surfer if am_host else host_request.host, session, context
                        ),
                        am_host=am_host,
                    ),
                )


send_request_notifications.PAYLOAD = empty_pb2.Empty
send_request_notifications.SCHEDULE = timedelta(minutes=3)

303 

304 

def send_onboarding_emails(payload):
    """
    Sends out onboarding emails

    First reminder: visible users who have not been sent any onboarding email yet. Second reminder: visible users
    whose first reminder is more than 7 days old and who have not completed their profile.
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:

        def send_reminder(recipients, reminder_number):
            # notify each recipient, record which reminder they got and when, and commit per user
            for recipient in recipients:
                notify(
                    user_id=recipient.id,
                    topic_action="onboarding:reminder",
                    key=str(reminder_number),
                )
                recipient.onboarding_emails_sent = reminder_number
                recipient.last_onboarding_email_sent = now()
                session.commit()

        # first onboarding email
        never_emailed = select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)
        send_reminder(session.execute(never_emailed).scalars().all(), 1)

        # second onboarding email
        # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
        needs_second_email = (
            select(User)
            .where(User.is_visible)
            .where(User.onboarding_emails_sent == 1)
            .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
            .where(User.has_completed_profile == False)
        )
        send_reminder(session.execute(needs_second_email).scalars().all(), 2)


send_onboarding_emails.PAYLOAD = empty_pb2.Empty
send_onboarding_emails.SCHEDULE = timedelta(hours=1)

354 

355 

def send_reference_reminders(payload):
    """
    Sends out reminders to write references after hosting/staying

    For each reminder in the schedule (latest first), finds surfers and hosts who can still write a reference for a
    host request, have not written one, and have not yet been sent that reminder, then notifies them unless the two
    users have blocked each other.
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    def missing_reference_query(surfed, writer, counterpart, reminder_number, reminder_time):
        # selects (surfed flag, host request, user who should write the ref, the other user) for one side of the stay
        if surfed:
            writer_id = HostRequest.surfer_user_id
            counterpart_id = HostRequest.host_user_id
            reminders_sent = HostRequest.surfer_sent_reference_reminders
        else:
            writer_id = HostRequest.host_user_id
            counterpart_id = HostRequest.surfer_user_id
            reminders_sent = HostRequest.host_sent_reference_reminders
        return (
            select(literal(surfed), HostRequest, writer, counterpart)
            .join(writer, writer.id == writer_id)
            .join(counterpart, counterpart.id == counterpart_id)
            .outerjoin(
                Reference,
                and_(
                    Reference.host_request_id == HostRequest.conversation_id,
                    # if no reference is found in this join, then the writer has not written a ref
                    Reference.from_user_id == writer_id,
                ),
            )
            .where(writer.is_visible)
            .where(counterpart.is_visible)
            .where(Reference.id == None)
            .where(HostRequest.can_write_reference)
            .where(reminders_sent < reminder_number)
            .where(HostRequest.end_time_to_write_reference - reminder_time < now())
        )

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule):
            writer_alias = aliased(User)
            counterpart_alias = aliased(User)

            # surfers needing to write a ref
            surfers_query = missing_reference_query(True, writer_alias, counterpart_alias, reminder_number, reminder_time)
            # hosts needing to write a ref
            hosts_query = missing_reference_query(False, writer_alias, counterpart_alias, reminder_number, reminder_time)

            combined = union_all(surfers_query, hosts_query).subquery()
            # map the union's columns back onto ORM entities
            statement = select(
                combined.c[0].label("surfed"),
                aliased(HostRequest, combined),
                aliased(writer_alias, combined),
                aliased(counterpart_alias, combined),
            )
            reference_reminders = session.execute(statement).all()

            for surfed, host_request, writer, counterpart in reference_reminders:
                # checked in sql
                assert writer.is_visible
                if are_blocked(session, writer.id, counterpart.id):
                    continue

                context = SimpleNamespace(user_id=writer.id)
                notify(
                    user_id=writer.id,
                    topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted",
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(counterpart, session, context),
                        days_left=reminder_days_left,
                    ),
                )
                # remember the highest reminder sent for this side of the request
                if surfed:
                    host_request.surfer_sent_reference_reminders = reminder_number
                else:
                    host_request.host_sent_reference_reminders = reminder_number
                session.commit()


send_reference_reminders.PAYLOAD = empty_pb2.Empty
send_reference_reminders.SCHEDULE = timedelta(hours=1)

453 

454 

def add_users_to_email_list(payload):
    """
    Syncs visible users with the Listmonk mailing list, one user per database session

    Does nothing unless config["LISTMONK_ENABLED"] is truthy. Users who opted out of the newsletter are only marked
    as in sync; everyone else is posted to the Listmonk subscribers API first. Raises an Exception if Listmonk
    answers with anything other than 200 or 409.
    """
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    while True:
        with session_scope() as session:
            pending_user = session.execute(
                select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)
            ).scalar_one_or_none()
            if not pending_user:
                logger.info("Finished adding users to mailing list")
                return

            if not pending_user.opt_out_of_newsletter:
                response = requests.post(
                    config["LISTMONK_BASE_URL"] + "/api/subscribers",
                    auth=("listmonk", config["LISTMONK_API_KEY"]),
                    json={
                        "email": pending_user.email,
                        "name": pending_user.name,
                        "list_uuids": [config["LISTMONK_LIST_UUID"]],
                        "preconfirm_subscriptions": True,
                    },
                    timeout=10,
                )
                # 409 counts as success too (presumably what the API returns if the user is already subscribed)
                if response.status_code not in (200, 409):
                    raise Exception("Failed to add users to mailing list")

            # opted out or successfully subscribed: either way there is nothing left to sync for this user
            pending_user.in_sync_with_newsletter = True
            session.commit()


add_users_to_email_list.PAYLOAD = empty_pb2.Empty
add_users_to_email_list.SCHEDULE = timedelta(hours=1)

497 

498 

def enforce_community_membership(payload):
    """
    Job wrapper that runs couchers.tasks.enforce_community_memberships; the payload is not used
    """
    tasks_enforce_community_memberships()


enforce_community_membership.PAYLOAD = empty_pb2.Empty
enforce_community_membership.SCHEDULE = timedelta(minutes=15)

505 

506 

def update_recommendation_scores(payload):
    """
    Recomputes User.recommendation_score for users with a single SQL UPDATE

    The score is the sum of profile, reference, activeness, badge/community and response-time points plus a random
    term; all parts are built as SQL expressions and evaluated in the database. The payload is not used.
    """
    # every free-text profile field; their combined length feeds the has_text/long_text points
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.my_travels,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    # the subset of text fields describing the home; their combined length feeds filled_home
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian():
        """
        Produces an approximately std normal random variate
        """
        # sum of `trials` SQL random() calls, centered and rescaled
        # (assuming random() is uniform on [0, 1): mean 1/2 and variance 1/12 per call)
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt):
        # cast to integer in SQL, turning NULL into 0
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt):
        # cast to float in SQL, turning NULL into 0.0
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile
        # starting from "" and using += builds one SQL concatenation expression, not a Python string
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")
        home_length = func.length(home_text)

        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        has_pic = int_(User.avatar_key != None)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.last_minute != None) * int_(home_length > 200)
        profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host

        # references
        # left_reference is 1 for users who wrote at least one reference (NULL -> 0 after the outer join)
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        # aggregates over the references each user received
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # the first 5 references count double, each further one counts once
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        # per-author aggregate over text messages only
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        # smallest parent node id among the official clusters the user is an admin of
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        # NOTE(review): cb/wcb presumably stand for (world) community builder, node 1 being the root - confirm
        cb = int_(min_node_id >= 1)
        wcb = int_(min_node_id == 1)
        # points per badge id; badges not listed here score 0 (see else_=0 below)
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }

        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate
        # t: creation time of each conversation (its chat_created message)
        t = (
            select(Message.conversation_id, Message.time)
            .where(Message.message_type == MessageType.chat_created)
            .subquery()
        )
        # s: time of the first message of each author in each conversation
        s = (
            select(Message.conversation_id, Message.author_id, func.min(Message.time).label("time"))
            .group_by(Message.conversation_id, Message.author_id)
            .subquery()
        )
        # per host: response-time statistics over the host requests they received; requests the host never
        # replied to count as a 1000 day response time in the percentiles
        hr_subquery = (
            select(
                HostRequest.host_user_id.label("user_id"),
                func.avg(s.c.time - t.c.time).label("avg_response_time"),
                func.count(t.c.time).label("received"),
                func.count(s.c.time).label("responded"),
                float_(
                    extract(
                        "epoch",
                        percentile_disc(0.33).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
                    )
                    / 60.0
                ).label("response_time_33p"),
                float_(
                    extract(
                        "epoch",
                        percentile_disc(0.66).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
                    )
                    / 60.0
                ).label("response_time_66p"),
            )
            .join(t, t.c.conversation_id == HostRequest.conversation_id)
            .outerjoin(
                s, and_(s.c.conversation_id == HostRequest.conversation_id, s.c.author_id == HostRequest.host_user_id)
            )
            .group_by(HostRequest.host_user_id)
            .subquery()
        )
        # NOTE(review): avg_response_time_hr and response_rate (and the locals they are built from) are not used
        # in the score below; the "_hr" value is epoch seconds / 60, i.e. minutes
        avg_response_time = hr_subquery.c.avg_response_time
        avg_response_time_hr = float_(extract("epoch", avg_response_time) / 60.0)
        received = hr_subquery.c.received
        responded = hr_subquery.c.responded
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        response_rate = float_(responded / (1.0 * func.greatest(received, 1)))
        # be careful with nulls
        # percentiles are in minutes, so 60 * 48.0 is 48 hours
        response_rate_points = -10 * int_(response_time_33p > 60 * 48.0) + 5 * int_(response_time_66p < 60 * 48.0)

        recommendation_score = (
            profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        # outer joins throughout: users missing from a subquery get NULLs there, which int_/float_ turn into 0
        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        # write the computed scores back onto the users table in one statement
        session.execute(
            User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
        )

    logger.info("Updated recommendation scores")


update_recommendation_scores.PAYLOAD = empty_pb2.Empty
update_recommendation_scores.SCHEDULE = timedelta(hours=24)

721 

722 

def refresh_materialized_views(payload):
    """
    Job wrapper that runs couchers.materialized_views.refresh_materialized_views; the payload is not used
    """
    mv_refresh_materialized_views()


refresh_materialized_views.PAYLOAD = empty_pb2.Empty
refresh_materialized_views.SCHEDULE = timedelta(minutes=5)

729 

730 

def update_badges(payload):
    """
    Brings stored user badges in line with who should currently hold each badge

    Static badges take their member lists from get_static_badge_dict(); the donor, moderator, phone_verified and
    strong_verification badges are derived from database queries.
    """
    with session_scope() as session:

        def update_badge(badge_id: str, members: List[int]):
            # the looked-up badge is not used further; presumably kept so an unknown badge id fails here
            badge = get_badge_dict()[badge_id]
            current_holders = set(
                session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all()
            )
            # in case the user ids don't exist in the db
            eligible = set(session.execute(select(User.id).where(User.id.in_(members))).scalars().all())

            # eligible users who don't hold the badge yet get it
            for user_id in eligible - current_holders:
                user_add_badge(session, user_id, badge_id)

            # holders who are no longer eligible lose it
            for user_id in current_holders - eligible:
                user_remove_badge(session, user_id, badge_id)

        for static_badge_id in ("founder", "board_member", "past_board_member"):
            update_badge(static_badge_id, get_static_badge_dict()[static_badge_id])

        donor_ids = session.execute(select(User.id).join(Invoice, Invoice.user_id == User.id)).scalars().all()
        update_badge("donor", donor_ids)

        superuser_ids = session.execute(select(User.id).where(User.is_superuser)).scalars().all()
        update_badge("moderator", superuser_ids)

        phone_verified_ids = session.execute(select(User.id).where(User.phone_is_verified)).scalars().all()
        update_badge("phone_verified", phone_verified_ids)

        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        strongly_verified_ids = (
            session.execute(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            )
            .scalars()
            .all()
        )
        update_badge("strong_verification", strongly_verified_ids)


update_badges.PAYLOAD = empty_pb2.Empty
update_badges.SCHEDULE = timedelta(minutes=15)

772 

773 

def finalize_strong_verification(payload):
    """
    Completes a strong verification attempt that is waiting on the backend

    Loads the attempt given by payload.verification_attempt_id (it must be in the in_progress_waiting_on_backend
    state), fetches the session data from Iris, checks that it belongs to this attempt and describes an approved
    passport, stores the passport data on the attempt and marks it succeeded. If the user then has strong
    verification and doesn't hold the "strong_verification" badge yet, the badge is added.

    Raises an Exception if Iris doesn't return HTTP 200 and an AssertionError if any consistency check fails.
    """

    def require(condition, message):
        # These checks guard data coming from an external service. They used to be `assert` statements, which are
        # removed entirely when Python runs with -O; raising explicitly keeps them active. The exception type stays
        # AssertionError so failures look the same as before.
        if not condition:
            raise AssertionError(message)

    with session_scope() as session:
        verification_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()
        response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": verification_attempt.iris_session_id},
            timeout=10,
        )
        if response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {response.text}")
        json_data = response.json()
        # the "reference" field carries our own encrypted payload back to us
        reference_payload = verification_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["reference"]))
        )
        require(
            verification_attempt.user_id == reference_payload.user_id,
            "Iris reference payload is for a different user",
        )
        require(
            verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token,
            "Iris reference payload is for a different verification attempt",
        )
        require(verification_attempt.iris_session_id == json_data["id"], "Iris returned a different session")
        require(json_data["state"] == "APPROVED", "Iris session is not approved")
        require(json_data["document_type"] == "PASSPORT", "Iris document is not a passport")
        verification_attempt.has_full_data = True
        # the full Iris response is kept only in asymmetrically encrypted form
        verification_attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8")
        )
        verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"])
        verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()]
        verification_attempt.has_minimal_data = True
        verification_attempt.passport_expiry_date = date.fromisoformat(json_data["expiry_date"])
        verification_attempt.passport_nationality = json_data["nationality"]
        verification_attempt.passport_last_three_document_chars = json_data["document_number"][-3:]
        verification_attempt.status = StrongVerificationAttemptStatus.succeeded

        # push the changes to the database before evaluating has_strong_verification below
        session.flush()

        user = verification_attempt.user
        if verification_attempt.has_strong_verification(user):
            badge_id = "strong_verification"
            # nothing more to do if the user already holds the badge
            if session.execute(
                select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
            ).scalar_one_or_none():
                return

            user_add_badge(session, user.id, badge_id)


finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload