Coverage for src/couchers/jobs/handlers.py: 92%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

208 statements  

1""" 

2Background job servicers 

3""" 

4 

5 

6import logging 

7from datetime import timedelta 

8from math import sqrt 

9 

10import requests 

11from sqlalchemy import Integer 

12from sqlalchemy.orm import aliased 

13from sqlalchemy.sql import and_, cast, delete, distinct, extract, func, literal, not_, or_, select, union_all 

14from sqlalchemy.sql.functions import percentile_disc 

15 

16from couchers import config, email, urls 

17from couchers.db import session_scope 

18from couchers.email.dev import print_dev_email 

19from couchers.email.smtp import send_smtp_email 

20from couchers.materialized_views import refresh_materialized_views 

21from couchers.models import ( 

22 AccountDeletionToken, 

23 Cluster, 

24 ClusterRole, 

25 ClusterSubscription, 

26 Float, 

27 GroupChat, 

28 GroupChatSubscription, 

29 HostingStatus, 

30 HostRequest, 

31 LoginToken, 

32 Message, 

33 MessageType, 

34 PasswordResetToken, 

35 Reference, 

36 User, 

37) 

38from couchers.notifications.background import handle_email_digests, handle_email_notifications, handle_notification 

39from couchers.notifications.notify import notify 

40from couchers.servicers.blocking import are_blocked 

41from couchers.sql import couchers_select as select 

42from couchers.tasks import enforce_community_memberships, send_onboarding_email, send_reference_reminder_email 

43from couchers.utils import now 

44 

# module-level logger, named after this module, used by the job handlers below
logger = logging.getLogger(__name__)

46 

47 

def process_send_email(payload):
    """
    Sends the email described by the payload and stores the resulting email record

    The payload supplies sender_name, sender_email, recipient, subject, plain and html.
    """
    logger.info(f"Sending email with subject '{payload.subject}' to '{payload.recipient}'")
    # pick the delivery function: real SMTP when email is enabled, otherwise the dev printer
    if config.config["ENABLE_EMAIL"]:
        deliver = send_smtp_email
    else:
        deliver = print_dev_email
    # whichever function is used must hand back a models.Email object that can be added to the database
    email_record = deliver(
        sender_name=payload.sender_name,
        sender_email=payload.sender_email,
        recipient=payload.recipient,
        subject=payload.subject,
        plain=payload.plain,
        html=payload.html,
    )
    with session_scope() as session:
        session.add(email_record)

63 

64 

def process_purge_login_tokens(payload):
    """
    Deletes every login token that is no longer valid

    The payload is not used.
    """
    logger.info("Purging login tokens")
    with session_scope() as session:
        # bulk delete; no need to synchronize in-memory objects as none are loaded in this session
        purge_stmt = delete(LoginToken).where(~LoginToken.is_valid)
        session.execute(purge_stmt.execution_options(synchronize_session=False))

69 

70 

def process_purge_password_reset_tokens(payload):
    """
    Deletes every password reset token that is no longer valid

    The payload is not used.
    """
    # this message used to read "Purging login tokens", which made the logs of this job
    # indistinguishable from those of the login token purge
    logger.info("Purging password reset tokens")
    with session_scope() as session:
        session.execute(
            delete(PasswordResetToken).where(~PasswordResetToken.is_valid).execution_options(synchronize_session=False)
        )

77 

78 

def process_purge_account_deletion_tokens(payload):
    """
    Deletes every account deletion token that is no longer valid

    The payload is not used.
    """
    logger.info("Purging account deletion tokens")
    with session_scope() as session:
        # bulk delete; no need to synchronize in-memory objects as none are loaded in this session
        purge_stmt = delete(AccountDeletionToken).where(~AccountDeletionToken.is_valid)
        session.execute(purge_stmt.execution_options(synchronize_session=False))

87 

88 

def process_generate_message_notifications(payload):
    """
    Creates a "chat" notification for each subscriber of the group chat that payload.message_id was sent to

    Nothing is generated for non-text messages. The author, users who are not visible, users who have left
    the chat and users who muted the chat are not notified.
    """
    logger.info(f"Sending out notifications for message_id = {payload.message_id}")

    with session_scope() as session:
        message_query = (
            select(Message, GroupChat)
            .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
            .where(Message.id == payload.message_id)
        )
        # .one() raises unless exactly one (message, group chat) row matches
        message, group_chat = session.execute(message_query).one()

        if message.message_type != MessageType.text:
            logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
            return

        recipients_query = (
            select(GroupChatSubscription)
            .join(User, User.id == GroupChatSubscription.user_id)
            .where(GroupChatSubscription.group_chat_id == message.conversation_id)
            .where(User.is_visible)
            .where(User.id != message.author_id)
            .where(GroupChatSubscription.left == None)
            .where(not_(GroupChatSubscription.is_muted))
        )
        recipients = session.execute(recipients_query).scalars().all()

        for recipient in recipients:
            logger.info(f"Notifying user_id = {recipient.user_id}")
            notify(
                user_id=recipient.user_id,
                topic="chat",
                key=str(message.conversation_id),
                action="message",
                icon="message",
                title=f"{message.author.name} sent a message in {group_chat.title}",
                content=message.text,
                link=urls.chat_link(chat_id=message.conversation_id),
            )

132 

133 

def process_send_message_notifications(payload):
    """
    Sends out email notifications for messages that have been unseen for a long enough time

    First finds the visible users that have at least one unseen, not-yet-notified text message older than
    5 minutes in a non-muted group chat. For each of them it then collects, per group chat, the latest
    unseen message and the number of unseen messages (this time regardless of age), advances
    user.last_notified_message_id, commits, and enqueues one "unseen_messages" email.

    The payload is not used.
    """
    # very crude and dumb algorithm
    logger.info("Sending out email notifications for unseen messages")

    with session_scope() as session:
        # users who have unnotified messages older than 5 minutes in any group chat
        users = (
            session.execute(
                (
                    select(User)
                    .join(GroupChatSubscription, GroupChatSubscription.user_id == User.id)
                    .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                    .where(not_(GroupChatSubscription.is_muted))
                    .where(User.is_visible)
                    .where(Message.time >= GroupChatSubscription.joined)
                    .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                    .where(Message.id > User.last_notified_message_id)
                    .where(Message.id > GroupChatSubscription.last_seen_message_id)
                    .where(Message.time < now() - timedelta(minutes=5))
                    .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                )
            )
            .scalars()
            .unique()
        )

        for user in users:
            # now actually grab all the group chats, not just less than 5 min old
            subquery = (
                select(
                    GroupChatSubscription.group_chat_id.label("group_chat_id"),
                    func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                    func.max(Message.id).label("message_id"),
                    func.count(Message.id).label("count_unseen"),
                )
                .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
                .where(GroupChatSubscription.user_id == user.id)
                .where(not_(GroupChatSubscription.is_muted))
                .where(Message.id > user.last_notified_message_id)
                .where(Message.id > GroupChatSubscription.last_seen_message_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(Message.message_type == MessageType.text)  # TODO: only text messages for now
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .group_by(GroupChatSubscription.group_chat_id)
                .order_by(func.max(Message.id).desc())
                .subquery()
            )

            unseen_messages = session.execute(
                select(GroupChat, Message, subquery.c.count_unseen)
                .join(subquery, subquery.c.message_id == Message.id)
                .join(GroupChat, GroupChat.conversation_id == subquery.c.group_chat_id)
                .order_by(subquery.c.message_id.desc())
            ).all()

            if not unseen_messages:
                # this query runs later than the one that selected the user (and after commits for earlier
                # users), so the messages may have been seen in the meantime; max() below would raise
                # ValueError on an empty sequence and abort the job for every remaining user
                continue

            # mark as notified and commit before enqueuing the email
            user.last_notified_message_id = max(message.id for _, message, _ in unseen_messages)
            session.commit()

            total_unseen_message_count = sum(count for _, _, count in unseen_messages)

            email.enqueue_email_from_template(
                user.email,
                "unseen_messages",
                template_args={
                    "user": user,
                    "total_unseen_message_count": total_unseen_message_count,
                    "unseen_messages": [
                        (group_chat, latest_message, count) for group_chat, latest_message, count in unseen_messages
                    ],
                    "group_chats_link": urls.messages_link(),
                },
            )

209 

210 

def process_send_request_notifications(payload):
    """
    Sends out email notifications for unseen messages in host requests (as surfer or host)

    Both the surfing and the hosting queries are run up front; afterwards one email is enqueued per
    (user, host request) row, surfers first and hosts second. Before each email the user's
    last_notified_request_message_id is advanced and committed.

    The payload is not used.
    """
    logger.info("Sending out email notifications for unseen messages in host requests")

    def unseen_request_messages(participant_id_column, last_seen_column):
        # (user, host request, newest matching message id) for visible users taking part in a host request
        # that has text messages older than 5 minutes which they have neither seen nor been notified about
        return (
            select(User, HostRequest, func.max(Message.id))
            .where(User.is_visible)
            .join(HostRequest, participant_id_column == User.id)
            .join(Message, Message.conversation_id == HostRequest.conversation_id)
            .where(Message.id > last_seen_column)
            .where(Message.id > User.last_notified_request_message_id)
            .where(Message.time < now() - timedelta(minutes=5))
            .where(Message.message_type == MessageType.text)
            .group_by(User, HostRequest)
        )

    with session_scope() as session:
        # requests where this user is surfing
        as_surfer = session.execute(
            unseen_request_messages(HostRequest.surfer_user_id, HostRequest.surfer_last_seen_message_id)
        ).all()

        # where this user is hosting
        as_host = session.execute(
            unseen_request_messages(HostRequest.host_user_id, HostRequest.host_last_seen_message_id)
        ).all()

        batches = (
            (as_surfer, "unseen_message_guest", urls.host_request_link_guest),
            (as_host, "unseen_message_host", urls.host_request_link_host),
        )

        for rows, template_name, make_link in batches:
            for user, host_request, max_message_id in rows:
                # never move the marker backwards: the same user can appear in several rows
                user.last_notified_request_message_id = max(user.last_notified_request_message_id, max_message_id)
                session.commit()

                email.enqueue_email_from_template(
                    user.email,
                    template_name,
                    template_args={
                        "user": user,
                        "host_request": host_request,
                        "host_request_link": make_link(),
                    },
                )

271 

272 

def process_send_onboarding_emails(payload):
    """
    Sends out onboarding emails

    Visible users that have not received any onboarding email get email number 1. Visible users that got
    email number 1 more than 7 days ago and still have no completed profile get email number 2.

    The payload is not used.
    """
    logger.info("Sending out onboarding emails")

    with session_scope() as session:

        def send_round(query, email_number):
            # send the given onboarding email to every matching user, recording progress one user at a time
            for recipient in session.execute(query).scalars().all():
                send_onboarding_email(recipient, email_number=email_number)
                recipient.onboarding_emails_sent = email_number
                recipient.last_onboarding_email_sent = now()
                session.commit()

        # first onboarding email
        send_round(select(User).where(User.is_visible).where(User.onboarding_emails_sent == 0), 1)

        # second onboarding email
        # sent after a week if the user has no profile or their "about me" section is less than 20 characters long
        # (built only after the first round is done, same as the order the queries originally ran in)
        second_round_query = (
            select(User)
            .where(User.is_visible)
            .where(User.onboarding_emails_sent == 1)
            .where(now() - User.last_onboarding_email_sent > timedelta(days=7))
            .where(User.has_completed_profile == False)
        )
        send_round(second_round_query, 2)

310 

311 

def process_send_reference_reminders(payload):
    """
    Sends out reminders to write references after hosting/staying

    For each reminder of the schedule (latest first) this finds the host requests where a reference can
    still be written, the surfer or host has not written one yet, and fewer than that many reminders were
    sent to them. A reminder email is sent unless the two users have blocked each other; the per-request
    reminder counter is updated and committed in either case.

    The payload is not used.
    """
    logger.info("Sending out reference reminder emails")

    # Keep this in chronological order!
    reference_reminder_schedule = [
        # (number, timedelta before we stop being able to write a ref, text for how long they have left to write the ref)
        # the end time to write a reference is supposed to be midnight in the host's timezone
        # 8 pm ish on the last day of the stay
        (1, timedelta(days=15) - timedelta(hours=20), "14 days"),
        # 2 pm ish a week after stay
        (2, timedelta(days=8) - timedelta(hours=14), "7 days"),
        # 10 am ish 3 days before end of time to write ref
        (3, timedelta(days=4) - timedelta(hours=10), "3 days"),
    ]

    with session_scope() as session:
        # iterate the reminders in backwards order, so if we missed out on one we don't send duplicates
        for reminder_no, reminder_time, reminder_text in reversed(reference_reminder_schedule):
            # "writer" is the user who still owes a reference, "counterpart" is the other party
            writer = aliased(User)
            counterpart = aliased(User)

            # surfers needing to write a ref
            surfer_query = (
                select(literal(True), HostRequest, writer, counterpart)
                .join(writer, writer.id == HostRequest.surfer_user_id)
                .join(counterpart, counterpart.id == HostRequest.host_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the surfer has not written a ref
                        Reference.from_user_id == HostRequest.surfer_user_id,
                    ),
                )
                .where(writer.is_visible)
                .where(counterpart.is_visible)
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.surfer_sent_reference_reminders < reminder_no)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
            )

            # hosts needing to write a ref
            host_query = (
                select(literal(False), HostRequest, writer, counterpart)
                .join(writer, writer.id == HostRequest.host_user_id)
                .join(counterpart, counterpart.id == HostRequest.surfer_user_id)
                .outerjoin(
                    Reference,
                    and_(
                        Reference.host_request_id == HostRequest.conversation_id,
                        # if no reference is found in this join, then the host has not written a ref
                        Reference.from_user_id == HostRequest.host_user_id,
                    ),
                )
                .where(writer.is_visible)
                .where(counterpart.is_visible)
                .where(Reference.id == None)
                .where(HostRequest.can_write_reference)
                .where(HostRequest.host_sent_reference_reminders < reminder_no)
                .where(HostRequest.end_time_to_write_reference - reminder_time < now())
            )

            # both halves go into one statement; the first column tells us which half a row came from
            combined = union_all(surfer_query, host_query).subquery()
            combined_query = select(
                combined.c[0].label("surfed"),
                aliased(HostRequest, combined),
                aliased(writer, combined),
                aliased(counterpart, combined),
            )
            pending = session.execute(combined_query).all()

            for surfed, host_request, from_user, to_user in pending:
                # checked in sql
                assert from_user.is_visible
                if not are_blocked(session, from_user.id, to_user.id):
                    send_reference_reminder_email(from_user, to_user, host_request, surfed, reminder_text)
                # the counter is bumped even when the email was skipped because of a block
                if surfed:
                    host_request.surfer_sent_reference_reminders = reminder_no
                else:
                    host_request.host_sent_reference_reminders = reminder_no
                session.commit()

396 

397 

def process_add_users_to_email_list(payload):
    """
    Subscribes up to 100 visible users that are not yet on the mailing list to the Mailchimp list

    Does nothing when MAILCHIMP_ENABLED is off or when no user is left to add. On a 200 response the users
    are marked with added_to_mailing_list and committed.

    The payload is not used.

    Raises:
        Exception: if Mailchimp answers with a status code other than 200
        requests.exceptions.RequestException: if the request fails or times out
    """
    if not config.config["MAILCHIMP_ENABLED"]:
        logger.info("Not adding users to mailing list")
        return

    logger.info("Adding users to mailing list")

    with session_scope() as session:
        users = (
            session.execute(select(User).where(User.is_visible).where(User.added_to_mailing_list == False).limit(100))
            .scalars()
            .all()
        )

        if not users:
            logger.info("No users to add to mailing list")
            return

        auth = ("apikey", config.config["MAILCHIMP_API_KEY"])

        body = {
            "members": [
                {
                    "email_address": user.email,
                    "status_if_new": "subscribed",
                    "status": "subscribed",
                    "merge_fields": {
                        "FNAME": user.name,
                    },
                }
                for user in users
            ]
        }

        dc = config.config["MAILCHIMP_DC"]
        list_id = config.config["MAILCHIMP_LIST_ID"]
        # requests never times out by default; without an explicit timeout a stalled connection would
        # block this job (and its open database session) indefinitely
        r = requests.post(
            f"https://{dc}.api.mailchimp.com/3.0/lists/{list_id}",
            auth=auth,
            json=body,
            timeout=30,
        )
        if r.status_code == 200:
            for user in users:
                user.added_to_mailing_list = True
            session.commit()
        else:
            raise Exception("Failed to add users to mailing list")

441 

442 

def process_enforce_community_membership(payload):
    """
    Job entry point that runs enforce_community_memberships; the payload is not used
    """
    enforce_community_memberships()

445 

446 

def process_handle_notification(payload):
    """
    Job entry point that passes payload.notification_id on to handle_notification
    """
    notification_id = payload.notification_id
    handle_notification(notification_id)

449 

450 

def process_handle_email_notifications(payload):
    """
    Job entry point that runs handle_email_notifications; the payload is not used
    """
    handle_email_notifications()

453 

454 

def process_handle_email_digests(payload):
    """
    Job entry point that runs handle_email_digests; the payload is not used
    """
    handle_email_digests()

457 

458 

def process_update_recommendation_scores(payload):
    """
    Recomputes User.recommendation_score for users with a single UPDATE statement

    The score is built entirely out of SQL expressions and is the sum of profile points, reference points,
    activeness points, verification points, response rate points and a random noise term.

    The payload is not used.
    """
    # all free-text profile fields; their combined length is used to judge how filled-in a profile is
    text_fields = [
        User.hometown,
        User.occupation,
        User.education,
        User.about_me,
        User.my_travels,
        User.things_i_like,
        User.about_place,
        User.additional_information,
        User.pet_details,
        User.kid_details,
        User.housemate_details,
        User.other_host_info,
        User.sleeping_details,
        User.area,
        User.house_rules,
    ]
    # the subset of text fields used for the "filled_home" points below
    home_fields = [User.about_place, User.other_host_info, User.sleeping_details, User.area, User.house_rules]

    def poor_man_gaussian():
        """
        Produces an approximately std normal random variate
        """
        # sum of `trials` func.random() values, shifted by trials / 2 and scaled by sqrt(trials / 12)
        # (mean and standard deviation of such a sum, assuming func.random() is uniform on [0, 1) — TODO confirm)
        trials = 5
        return (sum([func.random() for _ in range(trials)]) - trials / 2) / sqrt(trials / 12)

    # cast to Integer in SQL, with NULL turned into 0
    def int_(stmt):
        return func.coalesce(cast(stmt, Integer), 0)

    # cast to Float in SQL, with NULL turned into 0.0
    def float_(stmt):
        return func.coalesce(cast(stmt, Float), 0.0)

    with session_scope() as session:
        # profile
        # starting from "" and adding SQL expressions yields a SQL expression, not a Python string
        profile_text = ""
        for field in text_fields:
            profile_text += func.coalesce(field, "")
        text_length = func.length(profile_text)
        home_text = ""
        for field in home_fields:
            home_text += func.coalesce(field, "")
        home_length = func.length(home_text)

        has_text = int_(text_length > 500)
        long_text = int_(text_length > 2000)
        has_pic = int_(User.avatar_key != None)
        can_host = int_(User.hosting_status == HostingStatus.can_host)
        cant_host = int_(User.hosting_status == HostingStatus.cant_host)
        # needs both last_minute to be set and more than 200 characters of home text
        filled_home = int_(User.last_minute != None) * int_(home_length > 200)
        profile_points = 2 * has_text + 3 * long_text + 2 * has_pic + 3 * can_host + 2 * filled_home - 5 * cant_host

        # references
        # one row per user who wrote at least one reference, with left_reference = 1
        left_ref_expr = int_(1).label("left_reference")
        left_refs_subquery = (
            select(Reference.from_user_id.label("user_id"), left_ref_expr).group_by(Reference.from_user_id).subquery()
        )
        left_reference = int_(left_refs_subquery.c.left_reference)
        # aggregates over the references a user received
        has_reference_expr = int_(func.count(Reference.id) >= 1).label("has_reference")
        ref_count_expr = int_(func.count(Reference.id)).label("ref_count")
        ref_avg_expr = func.avg(1.4 * (Reference.rating - 0.3)).label("ref_avg")
        has_multiple_types_expr = int_(func.count(distinct(Reference.reference_type)) >= 2).label("has_multiple_types")
        # a "bad" reference has rating <= 0.2 or was not appropriate
        has_bad_ref_expr = int_(func.sum(int_((Reference.rating <= 0.2) | (~Reference.was_appropriate))) >= 1).label(
            "has_bad_ref"
        )
        received_ref_subquery = (
            select(
                Reference.to_user_id.label("user_id"),
                has_reference_expr,
                has_multiple_types_expr,
                has_bad_ref_expr,
                ref_count_expr,
                ref_avg_expr,
            )
            .group_by(Reference.to_user_id)
            .subquery()
        )
        # the outer int_/float_ wrappers turn the NULLs of the outer joins below into 0
        has_multiple_types = int_(received_ref_subquery.c.has_multiple_types)
        has_reference = int_(received_ref_subquery.c.has_reference)
        has_bad_reference = int_(received_ref_subquery.c.has_bad_ref)
        # average rating term weighted by reference count: the first 5 references count double
        rating_score = float_(
            received_ref_subquery.c.ref_avg
            * (
                2 * func.least(received_ref_subquery.c.ref_count, 5)
                + func.greatest(received_ref_subquery.c.ref_count - 5, 0)
            )
        )
        ref_score = 2 * has_reference + has_multiple_types + left_reference - 5 * has_bad_reference + rating_score

        # activeness
        recently_active = int_(User.last_active >= now() - timedelta(days=180))
        very_recently_active = int_(User.last_active >= now() - timedelta(days=14))
        # per author, over text messages only: sent one in the last 14 days / sent more than 5 in total
        recently_messaged = int_(func.max(Message.time) > now() - timedelta(days=14))
        messaged_lots = int_(func.count(Message.id) > 5)
        messaging_points_subquery = (recently_messaged + messaged_lots).label("messaging_points")
        messaging_subquery = (
            select(Message.author_id.label("user_id"), messaging_points_subquery)
            .where(Message.message_type == MessageType.text)
            .group_by(Message.author_id)
            .subquery()
        )
        activeness_points = recently_active + 2 * very_recently_active + int_(messaging_subquery.c.messaging_points)

        # verification
        # NOTE(review): phone_verified is computed but not part of verification_points — confirm this is intended
        phone_verified = int_(User.phone_is_verified)
        # smallest parent_node_id among the official clusters a user is admin of
        cb_subquery = (
            select(ClusterSubscription.user_id.label("user_id"), func.min(Cluster.parent_node_id).label("min_node_id"))
            .join(Cluster, Cluster.id == ClusterSubscription.cluster_id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(Cluster.is_official_cluster)
            .group_by(ClusterSubscription.user_id)
            .subquery()
        )
        min_node_id = cb_subquery.c.min_node_id
        # cb: admin of any official cluster; wcb: admin of an official cluster whose parent node is node 1;
        # f: user id 1 or 2 — NOTE(review): the meaning of these abbreviations is not stated here
        cb = int_(min_node_id >= 1)
        f = int_(User.id <= 2)
        wcb = int_(min_node_id == 1)
        verification_points = 0.0 + 100 * f + 10 * wcb + 5 * cb

        # response rate
        # t: time of the chat_created message of each conversation
        t = (
            select(Message.conversation_id, Message.time)
            .where(Message.message_type == MessageType.chat_created)
            .subquery()
        )
        # s: time of the first message of each author in each conversation
        s = (
            select(Message.conversation_id, Message.author_id, func.min(Message.time).label("time"))
            .group_by(Message.conversation_id, Message.author_id)
            .subquery()
        )
        # per host: response time is the host's first message time minus the conversation creation time;
        # requests without any host message count as 1000 days in the percentiles
        hr_subquery = (
            select(
                HostRequest.host_user_id.label("user_id"),
                func.avg(s.c.time - t.c.time).label("avg_response_time"),
                func.count(t.c.time).label("received"),
                func.count(s.c.time).label("responded"),
                float_(
                    extract(
                        "epoch",
                        percentile_disc(0.33).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
                    )
                    / 60.0
                ).label("response_time_33p"),
                float_(
                    extract(
                        "epoch",
                        percentile_disc(0.66).within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000))),
                    )
                    / 60.0
                ).label("response_time_66p"),
            )
            .join(t, t.c.conversation_id == HostRequest.conversation_id)
            .outerjoin(
                s, and_(s.c.conversation_id == HostRequest.conversation_id, s.c.author_id == HostRequest.host_user_id)
            )
            .group_by(HostRequest.host_user_id)
            .subquery()
        )
        # NOTE(review): avg_response_time_hr and response_rate are computed but not used in the score
        avg_response_time = hr_subquery.c.avg_response_time
        avg_response_time_hr = float_(extract("epoch", avg_response_time) / 60.0)
        received = hr_subquery.c.received
        responded = hr_subquery.c.responded
        response_time_33p = hr_subquery.c.response_time_33p
        response_time_66p = hr_subquery.c.response_time_66p
        response_rate = float_(responded / (1.0 * func.greatest(received, 1)))
        # be careful with nulls
        # percentiles are epoch seconds / 60, i.e. minutes, so 60 * 48.0 is 48 hours
        response_rate_points = -10 * int_(response_time_33p > 60 * 48.0) + 5 * int_(response_time_66p < 60 * 48.0)

        recommendation_score = (
            profile_points
            + ref_score
            + activeness_points
            + verification_points
            + response_rate_points
            + 2 * poor_man_gaussian()
        )

        # outer joins keep users that have no row in a given subquery
        scores = (
            select(User.id.label("user_id"), recommendation_score.label("score"))
            .outerjoin(messaging_subquery, messaging_subquery.c.user_id == User.id)
            .outerjoin(left_refs_subquery, left_refs_subquery.c.user_id == User.id)
            .outerjoin(received_ref_subquery, received_ref_subquery.c.user_id == User.id)
            .outerjoin(cb_subquery, cb_subquery.c.user_id == User.id)
            .outerjoin(hr_subquery, hr_subquery.c.user_id == User.id)
        ).subquery()

        # write the computed scores back onto the users table, matching rows by user id
        session.execute(
            User.__table__.update().values(recommendation_score=scores.c.score).where(User.id == scores.c.user_id)
        )

    logger.info("Updated recommendation scores")

650 

651 

def process_refresh_materialized_views(payload):
    """
    Job entry point that runs refresh_materialized_views; the payload is not used
    """
    refresh_materialized_views()