Coverage for src/couchers/jobs/handlers.py: 98%

315 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-03-11 15:27 +0000

1""" 

2Background job servicers 

3""" 

4 

5import logging 

6from datetime import date, timedelta 

7from math import sqrt 

8from types import SimpleNamespace 

9 

10import requests 

11from google.protobuf import empty_pb2 

12from sqlalchemy import Integer 

13from sqlalchemy.orm import aliased 

14from sqlalchemy.sql import and_, case, cast, delete, distinct, extract, func, literal, not_, or_, select, union_all 

15from sqlalchemy.sql.functions import percentile_disc 

16 

17from couchers.config import config 

18from couchers.crypto import asym_encrypt, b64decode, simple_decrypt 

19from couchers.db import session_scope 

20from couchers.email.dev import print_dev_email 

21from couchers.email.smtp import send_smtp_email 

22from couchers.helpers.badges import user_add_badge, user_remove_badge 

23from couchers.materialized_views import refresh_materialized_views, refresh_materialized_views_rapid 

24from couchers.metrics import strong_verification_completions_counter 

25from couchers.models import ( 

26 AccountDeletionToken, 

27 Cluster, 

28 ClusterRole, 

29 ClusterSubscription, 

30 Float, 

31 GroupChat, 

32 GroupChatSubscription, 

33 HostingStatus, 

34 HostRequest, 

35 Invoice, 

36 LoginToken, 

37 Message, 

38 MessageType, 

39 PassportSex, 

40 PasswordResetToken, 

41 Reference, 

42 StrongVerificationAttempt, 

43 StrongVerificationAttemptStatus, 

44 User, 

45 UserBadge, 

46) 

47from couchers.notifications.background import handle_email_digests, handle_notification, send_raw_push_notification 

48from couchers.notifications.notify import notify 

49from couchers.resources import get_badge_dict, get_static_badge_dict 

50from couchers.servicers.api import user_model_to_pb 

51from couchers.servicers.blocking import are_blocked 

52from couchers.servicers.conversations import generate_message_notifications 

53from couchers.servicers.discussions import generate_create_discussion_notifications 

54from couchers.servicers.events import ( 

55 generate_event_cancel_notifications, 

56 generate_event_create_notifications, 

57 generate_event_delete_notifications, 

58 generate_event_update_notifications, 

59) 

60from couchers.servicers.requests import host_request_to_pb 

61from couchers.servicers.threads import generate_reply_notifications 

62from couchers.sql import couchers_select as select 

63from couchers.tasks import enforce_community_memberships as tasks_enforce_community_memberships 

64from couchers.tasks import send_duplicate_strong_verification_email 

65from couchers.utils import now 

66from proto import notification_data_pb2 

67from proto.internal import jobs_pb2, verification_pb2 

68 

# Module-level logger used by the job handlers defined in this file.
logger = logging.getLogger(__name__)

# The handlers below are implemented in other modules and were straight up
# imported; only their job metadata is attached here.
#   PAYLOAD  - protobuf message class of the job's argument
#   SCHEDULE - timedelta attached to recurring jobs (presumably the interval
#              between runs; confirm against the scheduler)

# --- notifications ---
handle_notification.PAYLOAD = jobs_pb2.HandleNotificationPayload

send_raw_push_notification.PAYLOAD = jobs_pb2.SendRawPushNotificationPayload

handle_email_digests.PAYLOAD = empty_pb2.Empty
handle_email_digests.SCHEDULE = timedelta(minutes=15)

# --- messages, threads and discussions ---
generate_message_notifications.PAYLOAD = jobs_pb2.GenerateMessageNotificationsPayload

generate_reply_notifications.PAYLOAD = jobs_pb2.GenerateReplyNotificationsPayload

generate_create_discussion_notifications.PAYLOAD = jobs_pb2.GenerateCreateDiscussionNotificationsPayload

# --- events ---
generate_event_create_notifications.PAYLOAD = jobs_pb2.GenerateEventCreateNotificationsPayload

generate_event_update_notifications.PAYLOAD = jobs_pb2.GenerateEventUpdateNotificationsPayload

generate_event_cancel_notifications.PAYLOAD = jobs_pb2.GenerateEventCancelNotificationsPayload

generate_event_delete_notifications.PAYLOAD = jobs_pb2.GenerateEventDeleteNotificationsPayload


# --- materialized views ---
refresh_materialized_views.PAYLOAD = empty_pb2.Empty
refresh_materialized_views.SCHEDULE = timedelta(minutes=5)

refresh_materialized_views_rapid.PAYLOAD = empty_pb2.Empty
refresh_materialized_views_rapid.SCHEDULE = timedelta(seconds=30)

99 

100 

def send_email(payload):
    """
    Delivers one email described by ``payload`` and stores the resulting record.

    The payload supplies sender_name, sender_email, recipient, subject, plain,
    html, list_unsubscribe_header and source_data.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")

    # with email enabled we go out over SMTP, otherwise the dev sender is used instead
    if config["ENABLE_EMAIL"]:
        deliver = send_smtp_email
    else:
        deliver = print_dev_email

    message_fields = dict(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
        list_unsubscribe_header=payload.list_unsubscribe_header,
        source_data=payload.source_data,
    )

    # whichever sender was picked must hand back a models.Email object that can be added to the database
    email_record = deliver(**message_fields)

    with session_scope() as session:
        session.add(email_record)


send_email.PAYLOAD = jobs_pb2.SendEmailPayload

121 

122 

def purge_login_tokens(payload):
    """
    Deletes every login token that is no longer valid
    """
    logger.info("Purging login tokens")

    with session_scope() as session:
        stale_tokens = delete(LoginToken).where(~LoginToken.is_valid)
        # bulk delete: no need to synchronize objects loaded in this session
        session.execute(stale_tokens.execution_options(synchronize_session=False))


purge_login_tokens.PAYLOAD = empty_pb2.Empty
purge_login_tokens.SCHEDULE = timedelta(hours=24)

131 

132 

def purge_password_reset_tokens(payload):
    """
    Deletes every password reset token that is no longer valid
    """
    # the log line used to say "login tokens" (copy-paste from purge_login_tokens),
    # which made the two jobs indistinguishable in the logs
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        session.execute(
            # bulk delete: no need to synchronize objects loaded in this session
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )


purge_password_reset_tokens.PAYLOAD = empty_pb2.Empty
purge_password_reset_tokens.SCHEDULE = timedelta(hours=24)

143 

144 

def purge_account_deletion_tokens(payload):
    """
    Deletes every account deletion token that is no longer valid
    """
    logger.info("Purging account deletion tokens")

    with session_scope() as session:
        stale_tokens = delete(AccountDeletionToken).where(~AccountDeletionToken.is_valid)
        # bulk delete: no need to synchronize objects loaded in this session
        session.execute(stale_tokens.execution_options(synchronize_session=False))


purge_account_deletion_tokens.PAYLOAD = empty_pb2.Empty
purge_account_deletion_tokens.SCHEDULE = timedelta(hours=24)

157 

158 

def send_message_notifications(payload):
    """
    Sends out email notifications for messages that have been unseen for a long enough time

    For every visible user with at least one unseen, un-notified text message older than 5 minutes in a non-muted
    group chat, one "chat:missed_messages" notification is created that summarizes each affected chat. The user's
    last_notified_message_id is then advanced and the session committed, one user at a time.
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    def format_title(message, group_chat, count_unseen):
        # DMs are described by their author, other group chats by their title
        if group_chat.is_dm:
            return f"You missed {count_unseen} message(s) from {message.author.name}"
        else:
            return f"You missed {count_unseen} message(s) in {group_chat.title}"

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                select(User)
                .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(User.is_visible)
                .where(Message.time >= GroupChatSubscription.joined)
                # `== None` is intentional: SQLAlchemy turns it into IS NULL
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))  # noqa: E711
                .where(Message.id > User.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time < now() - timedelta(minutes=5))
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
            )
            .scalars()
            .unique()
        )

        for user in users:
            # now actually grab all the group chats, not just less than 5 min old
            # one row per group chat: its newest unseen message and how many unseen messages it has
            subquery = (
                select(
                    GroupChatSubscription.group_chat_id.label("group_chat_id"),
                    func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                    func.max(Message.id).label("message_id"),
                    func.count(Message.id).label("count_unseen"),
                )
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(GroupChatSubscription.user_id == user.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(Message.id > user.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))  # noqa: E711
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            unseen_messages = session.execute(
                select(GroupChat, Message, subquery.c.count_unseen)
                .join(subquery, subquery.c.message_id == Message.id)
                .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
                .order_by(subquery.c.message_id.desc())
            ).all()

            if not unseen_messages:
                # the state can change between the two queries (e.g. the user has since seen the messages);
                # max() on an empty sequence would raise ValueError and abort the job for everyone else
                continue

            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)

            notify(
                session,
                user_id=user.id,
                topic_action="chat:missed_messages",
                data=notification_data_pb2.ChatMissedMessages(
                    messages=[
                        notification_data_pb2.ChatMessage(
                            author=user_model_to_pb(
                                message.author,
                                session,
                                SimpleNamespace(user_id=user.id),
                            ),
                            message=format_title(message, group_chat, count_unseen),
                            text=message.text,
                            group_chat_id=message.conversation_id,
                        )
                        for group_chat, message, count_unseen in unseen_messages
                    ],
                ),
            )
            # commit per user so the notification and the updated marker are saved together
            session.commit()


send_message_notifications.PAYLOAD = empty_pb2.Empty
send_message_notifications.SCHEDULE = timedelta(minutes=3)

248 

249 

def send_request_notifications(payload):
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host)

    A "host_request:missed_messages" notification is created per (user, host request) that has unseen, un-notified
    text messages older than 5 minutes, and the user's last_notified_request_message_id is advanced.
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    def missed_messages_query(participant_id_column, last_seen_column):
        # (user, host request, newest qualifying message id) for one side of the host request
        return (
            select(User, HostRequest, func.max(Message.id))
            .where(User.is_visible)
            .join(HostRequest, participant_id_column == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(Message.id > last_seen_column)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
            .group_by(User, HostRequest)
        )

    with session_scope() as session:
        # both result sets are fetched up front, before any user row gets modified below
        # requests where this user is surfing
        as_surfer = session.execute(
            missed_messages_query(HostRequest.surfer_user_id, HostRequest.surfer_last_seen_message_id)
        ).all()
        # where this user is hosting
        as_host = session.execute(
            missed_messages_query(HostRequest.host_user_id, HostRequest.host_last_seen_message_id)
        ).all()

        # surfers are handled first, then hosts
        for am_host, rows in ((False, as_surfer), (True, as_host)):
            for user, host_request, max_message_id in rows:
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.flush()

                context = SimpleNamespace(user_id=user.id)
                notify(
                    session,
                    user_id=user.id,
                    topic_action="host_request:missed_messages",
                    key=host_request.conversation_id,
                    data=notification_data_pb2.HostRequestMissedMessages(
                        host_request=host_request_to_pb(host_request, session, context),
                        # the notification shows the other party of the request
                        user=user_model_to_pb(
                            host_request.surfer if am_host else host_request.host, session, context
                        ),
                        am_host=am_host,
                    ),
                )


send_request_notifications.PAYLOAD = empty_pb2.Empty
send_request_notifications.SCHEDULE = timedelta(minutes=3)

320 

321 

def send_onboarding_emails(payload):
    """
    Sends out onboarding emails

    Reminder 1 goes to visible users who have not received any onboarding email. Reminder 2 goes to visible users
    who got reminder 1 more than 7 days ago and still have not completed their profile.
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:

        def send_reminder(recipients, reminder_number):
            # notify each user, record which reminder they got and when, and commit per user
            for recipient in recipients:
                notify(
                    session,
                    user_id=recipient.id,
                    topic_action="onboarding:reminder",
                    key=str(reminder_number),
                )
                recipient.onboarding_emails_sent = reminder_number
                recipient.last_onboarding_email_sent = now()
                session.commit()

        # first onboarding email
        never_emailed = select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0)
        send_reminder(session.execute(never_emailed).scalars().all(), 1)

        # second onboarding email
        # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
        stalled_profiles = (
            select(User)
            .where(User.is_visible)
            .where(User.onboarding_emails_sent == 1)
            .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
            # SQL comparison, so `== False` rather than `not`
            .where(User.has_completed_profile == False)  # noqa: E712
        )
        send_reminder(session.execute(stalled_profiles).scalars().all(), 2)


send_onboarding_emails.PAYLOAD = empty_pb2.Empty
send_onboarding_emails.SCHEDULE = timedelta(hours=1)

373 

374 

def send_reference_reminders(payload):
    """
    Sends out reminders to write references after hosting/staying

    For each scheduled reminder (latest first), finds host requests where a participant can still write a
    reference, has not written one, has not said they didn't meet up, and has received fewer reminders than the
    current one. Unless the two users have blocked each other, the participant is notified and the reminder
    counter on the host request is updated.
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
    # the end time to write a reference is supposed to be midnight in the host's timezone
    schedule = [
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), 14),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), 7),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), 3),
    ]

    def pending_query(surfed, writer, counterpart, reminder_number, reminder_time):
        # `writer` is the user who should write the reference, `counterpart` the other party of the request
        if surfed:
            writer_id_col = HostRequest.surfer_user_id
            counterpart_id_col = HostRequest.host_user_id
            reminders_sent_col = HostRequest.surfer_sent_reference_reminders
            didnt_meetup_col = HostRequest.surfer_reason_didnt_meetup
        else:
            writer_id_col = HostRequest.host_user_id
            counterpart_id_col = HostRequest.surfer_user_id
            reminders_sent_col = HostRequest.host_sent_reference_reminders
            didnt_meetup_col = HostRequest.host_reason_didnt_meetup

        return (
            select(literal(surfed), HostRequest, writer, counterpart)
            .join(writer, writer.id == writer_id_col)
            .join(counterpart, counterpart.id == counterpart_id_col)
            .outerjoin(
                Reference,
                and_(
                    Reference.host_request_id == HostRequest.conversation_id,
                    # if no reference is found in this join, then the writer has not written a ref
                    Reference.from_user_id == writer_id_col,
                ),
            )
            .where(writer.is_visible)
            .where(counterpart.is_visible)
            .where(Reference.id == None)  # noqa: E711
            .where(HostRequest.can_write_reference)
            .where(reminders_sent_col < reminder_number)
            .where(HostRequest.end_time_to_write_reference - reminder_time < now())
            .where(didnt_meetup_col == None)  # noqa: E711
        )

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        for reminder_number, reminder_time, reminder_days_left in reversed(schedule):
            writer_alias = aliased(User)
            counterpart_alias = aliased(User)

            # surfers needing to write a ref
            surfer_query = pending_query(True, writer_alias, counterpart_alias, reminder_number, reminder_time)
            # hosts needing to write a ref
            host_query = pending_query(False, writer_alias, counterpart_alias, reminder_number, reminder_time)

            combined = union_all(surfer_query, host_query).subquery()
            # map the union's columns back onto ORM entities
            combined_select = select(
                combined.c[0].label("surfed"),
                aliased(HostRequest, combined),
                aliased(writer_alias, combined),
                aliased(counterpart_alias, combined),
            )
            reference_reminders = session.execute(combined_select).all()

            for surfed, host_request, user, other_user in reference_reminders:
                # checked in sql
                assert user.is_visible
                if are_blocked(session, user.id, other_user.id):
                    continue

                context = SimpleNamespace(user_id=user.id)
                topic_action = "reference:reminder_surfed" if surfed else "reference:reminder_hosted"
                notify(
                    session,
                    user_id=user.id,
                    topic_action=topic_action,
                    data=notification_data_pb2.ReferenceReminder(
                        host_request_id=host_request.conversation_id,
                        other_user=user_model_to_pb(other_user, session, context),
                        days_left=reminder_days_left,
                    ),
                )
                # remember which reminder this side of the request has now received
                if surfed:
                    host_request.surfer_sent_reference_reminders = reminder_number
                else:
                    host_request.host_sent_reference_reminders = reminder_number
                session.commit()


send_reference_reminders.PAYLOAD = empty_pb2.Empty
send_reference_reminders.SCHEDULE = timedelta(hours=1)

475 

476 

def add_users_to_email_list(payload):
    """
    Subscribes visible users that are not yet in sync with the newsletter to the Listmonk list

    Users are handled one at a time, each in its own session. Users who opted out are only marked as in sync.
    Does nothing unless LISTMONK_ENABLED is set.

    Raises:
        Exception: if Listmonk answers with a status other than 200 or 409.
    """
    if not config["LISTMONK_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    while True:
        with session_scope() as session:
            next_user_query = (
                select(User).where(User.is_visible).where(User.in_sync_with_newsletter == False).limit(1)  # noqa: E712
            )
            user = session.execute(next_user_query).scalar_one_or_none()
            if not user:
                logger.info("Finished adding users to mailing list")
                return

            if user.opt_out_of_newsletter:
                # nothing to send to Listmonk, just record that this user has been dealt with
                user.in_sync_with_newsletter = True
                session.commit()
                continue

            subscriber = {
                "email": user.email,
                "name": user.name,
                "lists": [config["LISTMONK_LIST_ID"]],
                "preconfirm_subscriptions": True,
                "attribs": {"couchers_user_id": user.id},
                "status": "enabled",
            }
            response = requests.post(
                config["LISTMONK_BASE_URL"] + "/api/subscribers",
                auth=(config["LISTMONK_API_USERNAME"], config["LISTMONK_API_KEY"]),
                json=subscriber,
                timeout=10,
            )
            # 409 is treated like success: per the original comment, that is what the API returns if the user is
            # already subscribed
            if response.status_code not in (200, 409):
                raise Exception("Failed to add users to mailing list")

            user.in_sync_with_newsletter = True
            session.commit()


add_users_to_email_list.PAYLOAD = empty_pb2.Empty
add_users_to_email_list.SCHEDULE = timedelta(hours=1)

521 

522 

def enforce_community_membership(payload):
    """
    Job wrapper that delegates to couchers.tasks.enforce_community_memberships; the payload is not used
    """
    tasks_enforce_community_memberships()


enforce_community_membership.PAYLOAD = empty_pb2.Empty
enforce_community_membership.SCHEDULE = timedelta(minutes=15)

529 

530 

def update_recommendation_scores(payload):
    """
    Recomputes User.recommendation_score for all users with a single SQL UPDATE

    The score is the sum of:
    - profile points (amount of profile text, avatar, hosting status, filled-in home info)
    - reference points (received/left references, their ratings, penalty for bad references)
    - activeness points (last active time, messaging)
    - other points (official cluster admin roles and badges)
    - response rate points (based on how fast the user first replies to host requests)
    - a random jitter of 2x an approximately standard normal variate

    Everything below builds SQL expressions; nothing is computed in Python per user.
    """
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian():
        """
        Produces an approximately std normal random variate

        Sums `trials` SQL random() values, then centers by trials / 2 and scales by sqrt(trials / 12)
        """
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    def int_(stmt):
        # cast to integer, treating NULL as 0
        return func.coalesce(cast(stmt, Integer), 0)

    def float_(stmt):
        # cast to float, treating NULL as 0.0
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile
        # concatenate the (NULL-safe) text columns in SQL and measure the total length
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")
        home_length = func.length(home_text)

        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        has_pic = int_(User.avatar_key != None)  # noqa: E711
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        filled_home = int_(User.last_minute != None) * int_(home_length > 200)  # noqa: E711
        profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host

        # references
        # users who have left at least one reference get a row with left_reference = 1
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        # per-recipient aggregates over the references a user has received
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # the first 5 references count double, any further ones count once
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        # smallest parent node id among the official clusters this user is an admin of
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        # cb: admin of any official cluster; wcb: admin of a cluster whose parent node has id 1
        # (presumably the top-level node - confirm against the node data)
        cb = int_(min_node_id >= 1)
        wcb = int_(min_node_id == 1)
        badge_points = {
            "founder": 100,
            "board_member": 20,
            "past_board_member": 5,
            "strong_verification": 3,
            "volunteer": 3,
            "past_volunteer": 2,
            "donor": 1,
            "phone_verified": 1,
        }

        # badges not listed in badge_points are worth 0
        badge_subquery = (
            select(
                UserBadge.user_id.label("user_id"),
                func.sum(case(badge_points, value=UserBadge.badge_id, else_=0)).label("badge_points"),
            )
            .group_by(UserBadge.user_id)
            .subquery()
        )

        other_points = 0.0 + 10 * wcb + 5 * cb + int_(badge_subquery.c.badge_points)

        # response rate
        # t: creation time of each conversation
        t = (
            select(Message.conversation_id, Message.time)
            .where(Message.message_type == MessageType.chat_created)
            .subquery()
        )
        # s: time of each author's first message in each conversation
        s = (
            select(Message.conversation_id, Message.author_id, func.min(Message.time).label("time"))
            .group_by(Message.conversation_id, Message.author_id)
            .subquery()
        )
        # per host: response statistics; requests the host never answered count as a 1000 day response time
        # in the percentiles, which are expressed in minutes (epoch seconds / 60)
        hr_subquery = (
            select(
                HostRequest.host_user_id.label("user_id"),
                func.avg(s.c.time - t.c.time).label("avg_response_time"),
                func.count(t.c.time).label("received"),
                func.count(s.c.time).label("responded"),
                float_(
                    extract(
                        "epoch",
                        percentile_disc(0.33).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
                    )
                    / 60.0
                ).label("response_time_33p"),
                float_(
                    extract(
                        "epoch",
                        percentile_disc(0.66).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
                    )
                    / 60.0
                ).label("response_time_66p"),
            )
            .join(t, t.c.conversation_id == HostRequest.conversation_id)
            .outerjoin(
                s, and_(s.c.conversation_id == HostRequest.conversation_id, s.c.author_id == HostRequest.host_user_id)
            )
            .group_by(HostRequest.host_user_id)
            .subquery()
        )
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        # be careful with nulls
        # thresholds are 48 hours in minutes: penalty if the 33rd percentile is slower, bonus if the 66th is faster
        response_rate_points = -10 * int_(response_time_33p > 60 * 48.0) + 5 * int_(response_time_66p < 60 * 48.0)

        recommendation_score = (
            profile_points
            + ref_score
            + activeness_points
            + other_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        # outer joins so that users missing from a subquery still get a score (int_/float_ turn NULLs into 0)
        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(badge_subquery, badge_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        session.execute(
            User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
        )

    logger.info("Updated recommendation scores")


update_recommendation_scores.PAYLOAD = empty_pb2.Empty
update_recommendation_scores.SCHEDULE = timedelta(hours=24)

744 

745 

def update_badges(payload):
    """
    Brings the badges stored on users in line with who should currently hold each badge

    Covers the statically configured badges (founder, board_member, past_board_member) as well as the badges
    derived from the database (donor, moderator, phone_verified, strong_verification).
    """
    with session_scope() as session:

        def update_badge(badge_id: str, members: list[int]):
            # looking the badge up fails with KeyError if the badge id is unknown
            badge = get_badge_dict()[badge_id]
            current_holders = set(
                session.execute(select(UserBadge.user_id).where(UserBadge.badge_id == badge_id)).scalars().all()
            )
            # in case the user ids don't exist in the db
            wanted_holders = set(session.execute(select(User.id).where(User.id.in_(members))).scalars().all())

            # we should add the badge to these
            for user_id in wanted_holders - current_holders:
                user_add_badge(session, user_id, badge_id)

            # we should remove the badge from these
            for user_id in current_holders - wanted_holders:
                user_remove_badge(session, user_id, badge_id)

        # badges whose members come from the static badge configuration
        for static_badge_id in ("founder", "board_member", "past_board_member"):
            update_badge(static_badge_id, get_static_badge_dict()[static_badge_id])

        # everyone with at least one invoice
        donor_ids = session.execute(select(User.id).join(Invoice, Invoice.user_id == User.id)).scalars().all()
        update_badge("donor", donor_ids)

        superuser_ids = session.execute(select(User.id).where(User.is_superuser)).scalars().all()
        update_badge("moderator", superuser_ids)

        phone_verified_ids = session.execute(select(User.id).where(User.phone_is_verified)).scalars().all()
        update_badge("phone_verified", phone_verified_ids)

        # strong verification requires passport on file + gender/sex correspondence and date of birth match
        strongly_verified_ids = (
            session.execute(
                select(User.id)
                .join(StrongVerificationAttempt, StrongVerificationAttempt.user_id == User.id)
                .where(StrongVerificationAttempt.has_strong_verification(User))
            )
            .scalars()
            .all()
        )
        update_badge("strong_verification", strongly_verified_ids)


update_badges.PAYLOAD = empty_pb2.Empty
update_badges.SCHEDULE = timedelta(minutes=15)

787 

788 

def finalize_strong_verification(payload):
    """
    Completes a strong verification attempt that is waiting on the backend

    Fetches the session result from the passport reader API, checks it against the stored attempt, records the
    passport data, and notifies the user of the outcome. The attempt fails if the document is not a passport, is
    marked as duplicate if the same passport details were already recorded, and otherwise succeeds.

    Raises:
        Exception: if the passport reader API does not answer with status 200.
        AssertionError: if the returned data does not match the stored attempt or is not in state APPROVED.
    """
    with session_scope() as session:
        # only attempts still waiting on the backend qualify; scalar_one() raises otherwise
        attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.id == payload.verification_attempt_id)
            .where(StrongVerificationAttempt.status == StrongVerificationAttemptStatus.in_progress_waiting_on_backend)
        ).scalar_one()

        iris_response = requests.post(
            "https://passportreader.app/api/v1/session.get",
            auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
            json={"id": attempt.iris_session_id},
            timeout=10,
        )
        if iris_response.status_code != 200:
            raise Exception(f"Iris didn't return 200: {iris_response.text}")
        iris_data = iris_response.json()

        # the reference round-trips our own encrypted payload, tying the Iris session to this attempt
        decrypted_reference = simple_decrypt("iris_callback", b64decode(iris_data["reference"]))
        reference_payload = verification_pb2.VerificationReferencePayload.FromString(decrypted_reference)
        assert attempt.user_id == reference_payload.user_id
        assert attempt.verification_attempt_token == reference_payload.verification_attempt_token
        assert attempt.iris_session_id == iris_data["id"]
        assert iris_data["state"] == "APPROVED"

        if iris_data["document_type"] != "PASSPORT":
            attempt.status = StrongVerificationAttemptStatus.failed
            not_a_passport = notification_data_pb2.VerificationSVFail(
                reason=notification_data_pb2.SV_FAIL_REASON_NOT_A_PASSPORT
            )
            notify(
                session,
                user_id=attempt.user_id,
                topic_action="verification:sv_fail",
                data=not_a_passport,
            )
            return

        assert iris_data["document_type"] == "PASSPORT"

        expiry_date = date.fromisoformat(iris_data["expiry_date"])
        nationality = iris_data["nationality"]
        last_three_document_chars = iris_data["document_number"][-3:]

        # look for an earlier attempt with the same passport details; this has to run before the details are
        # written onto the current attempt below
        earlier_attempt = session.execute(
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.passport_expiry_date == expiry_date)
            .where(StrongVerificationAttempt.passport_nationality == nationality)
            .where(StrongVerificationAttempt.passport_last_three_document_chars == last_three_document_chars)
            .order_by(StrongVerificationAttempt.id)
            .limit(1)
        ).scalar_one_or_none()

        attempt.has_minimal_data = True
        attempt.passport_expiry_date = expiry_date
        attempt.passport_nationality = nationality
        attempt.passport_last_three_document_chars = last_three_document_chars

        if earlier_attempt:
            attempt.status = StrongVerificationAttemptStatus.duplicate

            # only send the duplicate email if the passport was used by a different user
            if earlier_attempt.user_id != attempt.user_id:
                session.flush()
                send_duplicate_strong_verification_email(session, earlier_attempt, attempt)

            notify(
                session,
                user_id=attempt.user_id,
                topic_action="verification:sv_fail",
                data=notification_data_pb2.VerificationSVFail(reason=notification_data_pb2.SV_FAIL_REASON_DUPLICATE),
            )
            return

        # store the full response, encrypted with the verification data public key
        attempt.has_full_data = True
        attempt.passport_encrypted_data = asym_encrypt(
            config["VERIFICATION_DATA_PUBLIC_KEY"], iris_response.text.encode("utf8")
        )
        attempt.passport_date_of_birth = date.fromisoformat(iris_data["date_of_birth"])
        attempt.passport_sex = PassportSex[iris_data["sex"].lower()]
        attempt.status = StrongVerificationAttemptStatus.succeeded

        session.flush()

        strong_verification_completions_counter.inc()

        user = attempt.user
        if not attempt.has_strong_verification(user):
            wrong_details = notification_data_pb2.VerificationSVFail(
                reason=notification_data_pb2.SV_FAIL_REASON_WRONG_BIRTHDATE_OR_GENDER
            )
            notify(
                session,
                user_id=attempt.user_id,
                topic_action="verification:sv_fail",
                data=wrong_details,
            )
            return

        badge_id = "strong_verification"
        existing_badge = session.execute(
            select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge_id)
        ).scalar_one_or_none()
        if existing_badge:
            # the user already holds the badge: nothing to add and no success notification
            return

        user_add_badge(session, user.id, badge_id, do_notify=False)
        notify(session, user_id=attempt.user_id, topic_action="verification:sv_success")


finalize_strong_verification.PAYLOAD = jobs_pb2.FinalizeStrongVerificationPayload