Coverage for src/couchers/jobs/handlers.py: 99%

292 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-10-15 13:03 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from datetime import date, timedelta 

7from math import sqrt 

8from types import SimpleNamespace 

9from typing import List 

10 

11import requests 

12from google.protobuf import empty_pb2 

13from sqlalchemy import Integer 

14from sqlalchemy.orm import aliased 

15from sqlalchemy.sql import and_, case, cast, delete, distinct, extract, func, literal, not_, or_, select, union_all 

16from sqlalchemy.sql.functions import percentile_disc 

17 

18from couchers.config import config 

19from couchers.crypto import asym_encrypt, b64decode, simple_decrypt 

20from couchers.db import session_scope 

21from couchers.email.dev import print_dev_email 

22from couchers.email.smtp import send_smtp_email 

23from couchers.helpers.badges import user_add_badge, user_remove_badge 

24from couchers.materialized_views import refresh_materialized_views, refresh_materialized_views_rapid 

25from couchers.models import ( 

26 AccountDeletionToken, 

27 Cluster, 

28 ClusterRole, 

29 ClusterSubscription, 

30 Float, 

31 GroupChat, 

32 GroupChatSubscription, 

33 HostingStatus, 

34 HostRequest, 

35 Invoice, 

36 LoginToken, 

37 Message, 

38 MessageType, 

39 PassportSex, 

40 PasswordResetToken, 

41 Reference, 

42 StrongVerificationAttempt, 

43 StrongVerificationAttemptStatus, 

44 User, 

45 UserBadge, 

46) 

47from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification 

48from couchers.notifications.notify import notify 

49from couchers.resources import get_badge_dict, get_static_badge_dict 

50from couchers.servicers.api import user_model_to_pb 

51from couchers.servicers.blocking import are_blocked 

52from couchers.servicers.conversations import generate_message_notifications 

53from couchers.servicers.events import ( 

54 generate_event_cancel_notifications, 

55 generate_event_create_notifications, 

56 generate_event_delete_notifications, 

57 generate_event_update_notifications, 

58) 

59from couchers.servicers.requests import host_request_to_pb 

60from couchers.sql import couchers_select as select 

61from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

62from couchers.utils import now 

63from proto import notification_data_pb2 

64from proto.internal import jobs_pb2, verification_pb2 

65 

66logger = logging.getLogger(__name__) 

67 

68# these were straight up imported 

69handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload 

70 

71send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload 

72 

73handle_email_digests.PAYLOAD = empty_pb2.Empty 

74handle_email_digests.SCHEDULE = timedelta(minutes=15) 

75 

76generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload 

77 

78generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload 

79 

80generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload 

81 

82generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload 

83 

84generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload 

85 

86 

87refresh_materialized_views.PAYLOAD = empty_pb2.Empty 

88refresh_materialized_views.SCHEDULE = timedelta(minutes=5) 

89 

90refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty 

91refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30) 

92 

93 

94def send_email(payload): 

95 logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'") 

96 # selects a "sender", which either prints the email to the logger or sends it out with SMTP 

97 sender = send_smtp_email if config["ENABLE_EMAIL"] else print_dev_email 

98 # the sender must return a models.Email object that can be added to the database 

99 email = sender( 

100 sender_name=payload.sender_name, 

101 sender_email=payload.sender_email, 

102 recipient=payload.recipient, 

103 subject=payload.subject, 

104 plain=payload.plain, 

105 html=payload.html, 

106 list_unsubscribe_header=payload.list_unsubscribe_header, 

107 source_data=payload.source_data, 

108 ) 

109 with session_scope() as session: 

110 session.add(email) 

111 

112 

113send_email.PAYLOAD = jobs_pb2.SendEmailPayload 

114 

115 

116def purge_login_tokens(payload): 

117 logger.info("Purging login tokens") 

118 with session_scope() as session: 

119 session.execute(delete(LoginToken).where(~LoginToken.is_valid).execution_options(synchronize_session=False)) 

120 

121 

122purge_login_tokens.PAYLOAD = empty_pb2.Empty 

123purge_login_tokens.SCHEDULE = timedelta(hours=24) 

124 

125 

126def purge_password_reset_tokens(payload): 

127 logger.info("Purging login tokens") 

128 with session_scope() as session: 

129 session.execute( 

130 delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False) 

131 ) 

132 

133 

134purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty 

135purge_password_reset_tokens.SCHEDULE = timedelta(hours=24) 

136 

137 

138def purge_account_deletion_tokens(payload): 

139 logger.info("Purging account deletion tokens") 

140 with session_scope() as session: 

141 session.execute( 

142 delete(AccountDeletionToken) 

143 .where(~AccountDeletionToken.is_valid) 

144 .execution_options(synchronize_session=False) 

145 ) 

146 

147 

148purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty 

149purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24) 

150 

151 

152def send_message_notifications(payload): 

153 """ 

154 Sends out email notifications for messages that have been unseen for a long enough time 

155 """ 

156 # very crude and dumb algorithm 

157 logger.info("Sending out email notifications for unseen messages") 

158 

159 with session_scope() as session: 

160 # users who have unnotified messages older than 5 minutes in any group chat 

161 users = ( 

162 session.execute( 

163 select(User) 

164 .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id) 

165 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

166 .where(not_(GroupChatSubscription.is_muted)) 

167 .where(User.is_visible) 

168 .where(Message.time >= GroupChatSubscription.joined) 

169 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

170 .where(Message.id > User.last_notified_message_id) 

171 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

172 .where(Message.time < now() - timedelta(minutes=5)) 

173 .where(Message.message_type == MessageType.text) # TODO: only text messages for now 

174 ) 

175 .scalars() 

176 .unique() 

177 ) 

178 

179 for user in users: 

180 # now actually grab all the group chats, not just less than 5 min old 

181 subquery = ( 

182 select( 

183 GroupChatSubscription.group_chat_id.label("group_chat_id"), 

184 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"), 

185 func.max(Message.id).label("message_id"), 

186 func.count(Message.id).label("count_unseen"), 

187 ) 

188 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

189 .where(GroupChatSubscription.user_id == user.id) 

190 .where(not_(GroupChatSubscription.is_muted)) 

191 .where(Message.id > user.last_notified_message_id) 

192 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

193 .where(Message.time >= GroupChatSubscription.joined) 

194 .where(Message.message_type == MessageType.text) # TODO: only text messages for now 

195 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

196 .group_by(GroupChatSubscription.group_chat_id) 

197 .order_by(func.max(Message.id).desc()) 

198 .subquery() 

199 ) 

200 

201 unseen_messages = session.execute( 

202 select(GroupChat, Message, subquery.c.count_unseen) 

203 .join(subquery, subquery.c.message_id == Message.id) 

204 .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id) 

205 .order_by(subquery.c.message_id.desc()) 

206 ).all() 

207 

208 user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages) 

209 

210 def format_title(message, group_chat, count_unseen): 

211 if group_chat.is_dm: 

212 return f"You missed {count_unseen} message(s) from {message.author.name}" 

213 else: 

214 return f"You missed {count_unseen} message(s) in {group_chat.title}" 

215 

216 notify( 

217 session, 

218 user_id=user.id, 

219 topic_action="chat:missed_messages", 

220 data=notification_data_pb2.ChatMissedMessages( 

221 messages=[ 

222 notification_data_pb2.ChatMessage( 

223 author=user_model_to_pb( 

224 message.author, 

225 session, 

226 SimpleNamespace(user_id=user.id), 

227 ), 

228 message=format_title(message, group_chat, count_unseen), 

229 text=message.text, 

230 group_chat_id=message.conversation_id, 

231 ) 

232 for group_chat, message, count_unseen in unseen_messages 

233 ], 

234 ), 

235 ) 

236 session.commit() 

237 

238 

239send_message_notifications.PAYLOAD = empty_pb2.Empty 

240send_message_notifications.SCHEDULE = timedelta(minutes=3) 

241 

242 

243def send_request_notifications(payload): 

244 """ 

245 Sends out email notifications for unseen messages in host requests (as surfer or host) 

246 """ 

247 logger.info("Sending out email notifications for unseen messages in host requests") 

248 

249 with session_scope() as session: 

250 # requests where this user is surfing 

251 surfing_reqs = session.execute( 

252 select(User, HostRequest, func.max(Message.id)) 

253 .where(User.is_visible) 

254 .join(HostRequest, HostRequest.surfer_user_id == User.id) 

255 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

256 .where(Message.id > HostRequest.surfer_last_seen_message_id) 

257 .where(Message.id > User.last_notified_request_message_id) 

258 .where(Message.time < now() - timedelta(minutes=5)) 

259 .where(Message.message_type == MessageType.text) 

260 .group_by(User, HostRequest) 

261 ).all() 

262 

263 # where this user is hosting 

264 hosting_reqs = session.execute( 

265 select(User, HostRequest, func.max(Message.id)) 

266 .where(User.is_visible) 

267 .join(HostRequest, HostRequest.host_user_id == User.id) 

268 .join(Message, Message.conversation_id == HostRequest.conversation_id) 

269 .where(Message.id > HostRequest.host_last_seen_message_id) 

270 .where(Message.id > User.last_notified_request_message_id) 

271 .where(Message.time < now() - timedelta(minutes=5)) 

272 .where(Message.message_type == MessageType.text) 

273 .group_by(User, HostRequest) 

274 ).all() 

275 

276 for user, host_request, max_message_id in surfing_reqs: 

277 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id) 

278 session.flush() 

279 

280 context = SimpleNamespace(user_id=user.id) 

281 notify( 

282 session, 

283 user_id=user.id, 

284 topic_action="host_request:missed_messages", 

285 key=host_request.conversation_id, 

286 data=notification_data_pb2.HostRequestMissedMessages( 

287 host_request=host_request_to_pb(host_request, session, context), 

288 user=user_model_to_pb(host_request.host, session, context), 

289 am_host=False, 

290 ), 

291 ) 

292 

293 for user, host_request, max_message_id in hosting_reqs: 

294 user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id) 

295 session.flush() 

296 

297 context = SimpleNamespace(user_id=user.id) 

298 notify( 

299 session, 

300 user_id=user.id, 

301 topic_action="host_request:missed_messages", 

302 key=host_request.conversation_id, 

303 data=notification_data_pb2.HostRequestMissedMessages( 

304 host_request=host_request_to_pb(host_request, session, context), 

305 user=user_model_to_pb(host_request.surfer, session, context), 

306 am_host=True, 

307 ), 

308 ) 

309 

310 

311send_request_notifications.PAYLOAD = empty_pb2.Empty 

312send_request_notifications.SCHEDULE = timedelta(minutes=3) 

313 

314 

315def send_onboarding_emails(payload): 

316 """ 

317 Sends out onboarding emails 

318 """ 

319 logger.info("Sending out onboarding emails") 

320 

321 with session_scope() as session: 

322 # first onboarding email 

323 users = ( 

324 session.execute(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)).scalars().all() 

325 ) 

326 

327 for user in users: 

328 notify( 

329 session, 

330 user_id=user.id, 

331 topic_action="onboarding:reminder", 

332 key="1", 

333 ) 

334 user.onboarding_emails_sent = 1 

335 user.last_onboarding_email_sent = now() 

336 session.commit() 

337 

338 # second onboarding email 

339 # sent after a week if the user has no profile or their "about me" section is less than 20 characters long 

340 users = ( 

341 session.execute( 

342 select(User) 

343 .where(User.is_visible) 

344 .where(User.onboarding_emails_sent == 1) 

345 .where(now() - User.last_onboarding_email_sent > timedelta(days=7)) 

346 .where(User.has_completed_profile == False) 

347 ) 

348 .scalars() 

349 .all() 

350 ) 

351 

352 for user in users: 

353 notify( 

354 session, 

355 user_id=user.id, 

356 topic_action="onboarding:reminder", 

357 key="2", 

358 ) 

359 user.onboarding_emails_sent = 2 

360 user.last_onboarding_email_sent = now() 

361 session.commit() 

362 

363 

364send_onboarding_emails.PAYLOAD = empty_pb2.Empty 

365send_onboarding_emails.SCHEDULE = timedelta(hours=1) 

366 

367 

368def send_reference_reminders(payload): 

369 """ 

370 Sends out reminders to write references after hosting/staying 

371 """ 

372 logger.info("Sending out reference reminder emails") 

373 

374 # Keep this in chronological order! 

375 reference_reminder_schedule = [ 

376 # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref) 

377 # the end time to write a reference is supposed to be midnight in the host's timezone 

378 # 8 pm ish on the last day of the stay 

379 (1, timedelta(days=15) - timedelta(hours=20), 14), 

380 # 2 pm ish a week after stay 

381 (2, timedelta(days=8) - timedelta(hours=14), 7), 

382 # 10 am ish 3 days before end of time to write ref 

383 (3, timedelta(days=4) - timedelta(hours=10), 3), 

384 ] 

385 

386 with session_scope() as session: 

387 # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates 

388 for reminder_number, reminder_time, reminder_days_left in reversed(reference_reminder_schedule): 

389 user = aliased(User) 

390 other_user = aliased(User) 

391 # surfers needing to write a ref 

392 q1 = ( 

393 select(literal(True), HostRequest, user, other_user) 

394 .join(user, user.id == HostRequest.surfer_user_id) 

395 .join(other_user, other_user.id == HostRequest.host_user_id) 

396 .outerjoin( 

397 Reference, 

398 and_( 

399 Reference.host_request_id == HostRequest.conversation_id, 

400 # if no reference is found in this join, then the surfer has not written a ref 

401 Reference.from_user_id == HostRequest.surfer_user_id, 

402 ), 

403 ) 

404 .where(user.is_visible) 

405 .where(other_user.is_visible) 

406 .where(Reference.id == None) 

407 .where(HostRequest.can_write_reference) 

408 .where(HostRequest.surfer_sent_reference_reminders < reminder_number) 

409 .where(HostRequest.end_time_to_write_reference - reminder_time < now()) 

410 ) 

411 

412 # hosts needing to write a ref 

413 q2 = ( 

414 select(literal(False), HostRequest, user, other_user) 

415 .join(user, user.id == HostRequest.host_user_id) 

416 .join(other_user, other_user.id == HostRequest.surfer_user_id) 

417 .outerjoin( 

418 Reference, 

419 and_( 

420 Reference.host_request_id == HostRequest.conversation_id, 

421 # if no reference is found in this join, then the host has not written a ref 

422 Reference.from_user_id == HostRequest.host_user_id, 

423 ), 

424 ) 

425 .where(user.is_visible) 

426 .where(other_user.is_visible) 

427 .where(Reference.id == None) 

428 .where(HostRequest.can_write_reference) 

429 .where(HostRequest.host_sent_reference_reminders < reminder_number) 

430 .where(HostRequest.end_time_to_write_reference - reminder_time < now()) 

431 ) 

432 

433 union = union_all(q1, q2).subquery() 

434 union = select( 

435 union.c[0].label("surfed"), 

436 aliased(HostRequest, union), 

437 aliased(user, union), 

438 aliased(other_user, union), 

439 ) 

440 reference_reminders = session.execute(union).all() 

441 

442 for surfed, host_request, user, other_user in reference_reminders: 

443 # checked in sql 

444 assert user.is_visible 

445 if not are_blocked(session, user.id, other_user.id): 

446 context = SimpleNamespace(user_id=user.id) 

447 notify( 

448 session, 

449 user_id=user.id, 

450 topic_action="reference:reminder_surfed" if surfed else "reference:reminder_hosted", 

451 data=notification_data_pb2.ReferenceReminder( 

452 host_request_id=host_request.conversation_id, 

453 other_user=user_model_to_pb(other_user, session, context), 

454 days_left=reminder_days_left, 

455 ), 

456 ) 

457 if surfed: 

458 host_request.surfer_sent_reference_reminders = reminder_number 

459 else: 

460 host_request.host_sent_reference_reminders = reminder_number 

461 session.commit() 

462 

463 

464send_reference_reminders.PAYLOAD = empty_pb2.Empty 

465send_reference_reminders.SCHEDULE = timedelta(hours=1) 

466 

467 

468def add_users_to_email_list(payload): 

469 if not config["LISTMONK_ENABLED"]: 

470 logger.info("Not adding users to mailing list") 

471 return 

472 

473 logger.info("Adding users to mailing list") 

474 

475 while True: 

476 with session_scope() as session: 

477 user = session.execute( 

478 select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1) 

479 ).scalar_one_or_none() 

480 if not user: 

481 logger.info("Finished adding users to mailing list") 

482 return 

483 

484 if user.opt_out_of_newsletter: 

485 user.in_sync_with_newsletter = True 

486 session.commit() 

487 continue 

488 

489 r = requests.post( 

490 config["LISTMONK_BASE_URL"] + "/api/subscribers", 

491 auth=("listmonk", config["LISTMONK_API_KEY"]), 

492 json={ 

493 "email": user.email, 

494 "name": user.name, 

495 "list_uuids": [config["LISTMONK_LIST_UUID"]], 

496 "preconfirm_subscriptions": True, 

497 "attribs": {"couchers_user_id": user.id}, 

498 }, 

499 timeout=10, 

500 ) 

501 # the API returns if the user is already subscribed 

502 if r.status_code == 200 or r.status_code == 409: 

503 user.in_sync_with_newsletter = True 

504 session.commit() 

505 else: 

506 raise Exception("Failed to add users to mailing list") 

507 

508 

509add_users_to_email_list.PAYLOAD = empty_pb2.Empty 

510add_users_to_email_list.SCHEDULE = timedelta(hours=1) 

511 

512 

513def enforce_community_membership(payload): 

514 tasks_enforce_community_memberships() 

515 

516 

517enforce_community_membership.PAYLOAD = empty_pb2.Empty 

518enforce_community_membership.SCHEDULE = timedelta(minutes=15) 

519 

520 

521def update_recommendation_scores(payload): 

522 text_fields = [ 

523 User.hometown, 

524 User.occupation, 

525 User.education, 

526 User.about_me, 

527 User.my_travels, 

528 User.things_i_like, 

529 User.about_place, 

530 User.additional_information, 

531 User.pet_details, 

532 User.kid_details, 

533 User.housemate_details, 

534 User.other_host_info, 

535 User.sleeping_details, 

536 User.area, 

537 User.house_rules, 

538 ] 

539 home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules] 

540 

541 def poor_man_gaussian(): 

542 """ 

543 Produces an approximatley std normal random variate 

544 """ 

545 trials = 5 

546 return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12) 

547 

548 def int_(stmt): 

549 return func.coalesce(cast(stmt, Integer), 0) 

550 

551 def float_(stmt): 

552 return func.coalesce(cast(stmt, Float), 0.0) 

553 

554 with session_scope() as session: 

555 # profile 

556 profile_text = "" 

557 for field in text_fields: 

558 profile_text += func.coalesce(field, "") 

559 text_length = func.length(profile_text) 

560 home_text = "" 

561 for field in home_fields: 

562 home_text += func.coalesce(field, "") 

563 home_length = func.length(home_text) 

564 

565 has_text = int_(text_length > 500) 

566 long_text = int_(text_length > 2000) 

567 has_pic = int_(User.avatar_key != None) 

568 can_host = int_(User.hosting_status == HostingStatus.can_host) 

569 cant_host = int_(User.hosting_status == HostingStatus.cant_host) 

570 filled_home = int_(User.last_minute != None) * int_(home_length > 200) 

571 profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host 

572 

573 # references 

574 left_ref_expr = int_(1).label("left_reference") 

575 left_refs_subquery = ( 

576 select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery() 

577 ) 

578 left_reference = int_(left_refs_subquery.c.left_reference) 

579 has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference") 

580 ref_count_expr = int_(func.count(Reference.id)).label("ref_count") 

581 ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg") 

582 has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types") 

583 has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label( 

584 "has_bad_ref" 

585 ) 

586 received_ref_subquery = ( 

587 select( 

588 Reference.to_user_id.label("user_id"), 

589 has_reference_expr, 

590 has_multiple_types_expr, 

591 has_bad_ref_expr, 

592 ref_count_expr, 

593 ref_avg_expr, 

594 ) 

595 .group_by(Reference.to_user_id) 

596 .subquery() 

597 ) 

598 has_multiple_types = int_(received_ref_subquery.c.has_multiple_types) 

599 has_reference = int_(received_ref_subquery.c.has_reference) 

600 has_bad_reference = int_(received_ref_subquery.c.has_bad_ref) 

601 rating_score = float_( 

602 received_ref_subquery.c.ref_avg 

603 * ( 

604 2 * func.least(received_ref_subquery.c.ref_count, 5) 

605 + func.greatest(received_ref_subquery.c.ref_count - 5, 0) 

606 ) 

607 ) 

608 ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score 

609 

610 # activeness 

611 recently_active = int_(User.last_active >= now() - timedelta(days=180)) 

612 very_recently_active = int_(User.last_active >= now() - timedelta(days=14)) 

613 recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14)) 

614 messaged_lots = int_(func.count(Message.id) > 5) 

615 messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points") 

616 messaging_subquery = ( 

617 select(Message.author_id.label("user_id"), messaging_points_subquery) 

618 .where(Message.message_type == MessageType.text) 

619 .group_by(Message.author_id) 

620 .subquery() 

621 ) 

622 activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points) 

623 

624 # verification 

625 cb_subquery = ( 

626 select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id")) 

627 .join(Cluster, Cluster.id == ClusterSubscription.cluster_id) 

628 .where(ClusterSubscription.role == ClusterRole.admin) 

629 .where(Cluster.is_official_cluster) 

630 .group_by(ClusterSubscription.user_id) 

631 .subquery() 

632 ) 

633 min_node_id = cb_subquery.c.min_node_id 

634 cb = int_(min_node_id >= 1) 

635 wcb = int_(min_node_id == 1) 

636 badge_points = { 

637 "founder": 100, 

638 "board_member": 20, 

639 "past_board_member": 5, 

640 "strong_verification": 3, 

641 "volunteer": 3, 

642 "past_volunteer": 2, 

643 "donor": 1, 

644 "phone_verified": 1, 

645 } 

646 

647 badge_subquery = ( 

648 select( 

649 UserBadge.user_id.label("user_id"), 

650 func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"), 

651 ) 

652 .group_by(UserBadge.user_id) 

653 .subquery() 

654 ) 

655 

656 other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points) 

657 

658 # response rate 

659 t = ( 

660 select(Message.conversation_id, Message.time) 

661 .where(Message.message_type == MessageType.chat_created) 

662 .subquery() 

663 ) 

664 s = ( 

665 select(Message.conversation_id, Message.author_id, func.min(Message.time).label("time")) 

666 .group_by(Message.conversation_id, Message.author_id) 

667 .subquery() 

668 ) 

669 hr_subquery = ( 

670 select( 

671 HostRequest.host_user_id.label("user_id"), 

672 func.avg(s.c.time - t.c.time).label("avg_response_time"), 

673 func.count(t.c.time).label("received"), 

674 func.count(s.c.time).label("responded"), 

675 float_( 

676 extract( 

677 "epoch", 

678 percentile_disc(0.33).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))), 

679 ) 

680 / 60.0 

681 ).label("response_time_33p"), 

682 float_( 

683 extract( 

684 "epoch", 

685 percentile_disc(0.66).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))), 

686 ) 

687 / 60.0 

688 ).label("response_time_66p"), 

689 ) 

690 .join(t, t.c.conversation_id == HostRequest.conversation_id) 

691 .outerjoin( 

692 s, and_(s.c.conversation_id == HostRequest.conversation_id, s.c.author_id == HostRequest.host_user_id) 

693 ) 

694 .group_by(HostRequest.host_user_id) 

695 .subquery() 

696 ) 

697 avg_response_time = hr_subquery.c.avg_response_time 

698 avg_response_time_hr = float_(extract("epoch", avg_response_time) / 60.0) 

699 received = hr_subquery.c.received 

700 responded = hr_subquery.c.responded 

701 response_time_33p = hr_subquery.c.response_time_33p 

702 response_time_66p = hr_subquery.c.response_time_66p 

703 response_rate = float_(responded / (1.0 * func.greatest(received, 1))) 

704 # be careful with nulls 

705 response_rate_points = -10 * int_(response_time_33p > 60 * 48.0) + 5 * int_(response_time_66p < 60 * 48.0) 

706 

707 recommendation_score = ( 

708 profile_points 

709 + ref_score 

710 + activeness_points 

711 + other_points 

712 + response_rate_points 

713 + 2 * poor_man_gaussian() 

714 ) 

715 

716 scores = ( 

717 select(User.id.label("user_id"), recommendation_score.label("score")) 

718 .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id) 

719 .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id) 

720 .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id) 

721 .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id) 

722 .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id) 

723 .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id) 

724 ).subquery() 

725 

726 session.execute( 

727 User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id) 

728 ) 

729 

730 logger.info("Updated recommendation scores") 

731 

732 

733update_recommendation_scores.PAYLOAD = empty_pb2.Empty 

734update_recommendation_scores.SCHEDULE = timedelta(hours=24) 

735 

736 

737def update_badges(payload): 

738 with session_scope() as session: 

739 

740 def update_badge(badge_id: str, members: List[int]): 

741 badge = get_badge_dict()[badge_id] 

742 user_ids = session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all() 

743 # in case the user ids don't exist in the db 

744 actual_members = session.execute(select(User.id).where(User.id.in_(members))).scalars().all() 

745 # we should add the badge to these 

746 add = set(actual_members) - set(user_ids) 

747 # we should remove the badge from these 

748 remove = set(user_ids) - set(actual_members) 

749 for user_id in add: 

750 user_add_badge(session, user_id, badge_id) 

751 

752 for user_id in remove: 

753 user_remove_badge(session, user_id, badge_id) 

754 

755 update_badge("founder", get_static_badge_dict()["founder"]) 

756 update_badge("board_member", get_static_badge_dict()["board_member"]) 

757 update_badge("past_board_member", get_static_badge_dict()["past_board_member"]) 

758 update_badge( 

759 "donor", session.execute(select(User.id).join(Invoice, Invoice.user_id == User.id)).scalars().all() 

760 ) 

761 update_badge("moderator", session.execute(select(User.id).where(User.is_superuser)).scalars().all()) 

762 update_badge("phone_verified", session.execute(select(User.id).where(User.phone_is_verified)).scalars().all()) 

763 # strong verification requires passport on file + gender/sex correspondence and date of birth match 

764 update_badge( 

765 "strong_verification", 

766 session.execute( 

767 select(User.id) 

768 .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id) 

769 .where(StrongVerificationAttempt.has_strong_verification(User)) 

770 ) 

771 .scalars() 

772 .all(), 

773 ) 

774 

775 

776update_badges.PAYLOAD = empty_pb2.Empty 

777update_badges.SCHEDULE = timedelta(minutes=15) 

778 

779 

780def finalize_strong_verification(payload): 

781 with session_scope() as session: 

782 verification_attempt = session.execute( 

783 select(StrongVerificationAttempt) 

784 .where(StrongVerificationAttempt.id == payload.verification_attempt_id) 

785 .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend) 

786 ).scalar_one() 

787 response = requests.post( 

788 "https://passportreader.app/api/v1/session.get", 

789 auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]), 

790 json={"id": verification_attempt.iris_session_id}, 

791 timeout=10, 

792 ) 

793 if response.status_code != 200: 

794 raise Exception(f"Iris didn't return 200: {response.text}") 

795 json_data = response.json() 

796 reference_payload = verification_pb2.VerificationReferencePayload.FromString( 

797 simple_decrypt("iris_callback", b64decode(json_data["reference"])) 

798 ) 

799 assert verification_attempt.user_id == reference_payload.user_id 

800 assert verification_attempt.verification_attempt_token == reference_payload.verification_attempt_token 

801 assert verification_attempt.iris_session_id == json_data["id"] 

802 assert json_data["state"] == "APPROVED" 

803 assert json_data["document_type"] == "PASSPORT" 

804 verification_attempt.has_full_data = True 

805 verification_attempt.passport_encrypted_data = asym_encrypt( 

806 config["VERIFICATION_DATA_PUBLIC_KEY"], response.text.encode("utf8") 

807 ) 

808 verification_attempt.passport_date_of_birth = date.fromisoformat(json_data["date_of_birth"]) 

809 verification_attempt.passport_sex = PassportSex[json_data["sex"].lower()] 

810 verification_attempt.has_minimal_data = True 

811 verification_attempt.passport_expiry_date = date.fromisoformat(json_data["expiry_date"]) 

812 verification_attempt.passport_nationality = json_data["nationality"] 

813 verification_attempt.passport_last_three_document_chars = json_data["document_number"][-3:] 

814 verification_attempt.status = StrongVerificationAttemptStatus.succeeded 

815 

816 session.flush() 

817 

818 user = verification_attempt.user 

819 if verification_attempt.has_strong_verification(user): 

820 badge_id = "strong_verification" 

821 if session.execute( 

822 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id) 

823 ).scalar_one_or_none(): 

824 return 

825 

826 user_add_badge(session, user.id, badge_id) 

827 

828 

829finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload