Coverage for src / couchers / servicers / conversations.py: 88%

309 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-26 03:52 +0000

1import logging 

2from collections.abc import Sequence 

3from datetime import timedelta 

4from typing import Any, cast 

5 

6import grpc 

7from google.protobuf import empty_pb2 

8from sqlalchemy import select 

9from sqlalchemy.orm import Session 

10from sqlalchemy.sql import func, not_, or_ 

11 

12from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY 

13from couchers.context import CouchersContext, make_background_user_context 

14from couchers.db import session_scope 

15from couchers.helpers.completed_profile import has_completed_profile 

16from couchers.jobs.enqueue import queue_job 

17from couchers.metrics import sent_messages_counter 

18from couchers.models import ( 

19 Conversation, 

20 GroupChat, 

21 GroupChatRole, 

22 GroupChatSubscription, 

23 Message, 

24 MessageType, 

25 ModerationObjectType, 

26 RateLimitAction, 

27 User, 

28) 

29from couchers.models.notifications import NotificationTopicAction 

30from couchers.moderation.utils import create_moderation 

31from couchers.notifications.notify import notify 

32from couchers.proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2 

33from couchers.proto.internal import jobs_pb2 

34from couchers.rate_limits.check import process_rate_limits_and_check_abort 

35from couchers.rate_limits.definitions import RATE_LIMIT_HOURS 

36from couchers.servicers.api import user_model_to_pb 

37from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible 

38from couchers.utils import Timestamp_from_datetime, now 

39 

40logger = logging.getLogger(__name__) 

41 

42# TODO: Still needs custom pagination: GetUpdates 

43DEFAULT_PAGINATION_LENGTH = 20 

44MAX_PAGE_SIZE = 50 

45 

46 

47def _message_to_pb(message: Message) -> conversations_pb2.Message: 

48 """ 

49 Turns the given message to a protocol buffer 

50 """ 

51 if message.is_normal_message: 

52 return conversations_pb2.Message( 

53 message_id=message.id, 

54 author_user_id=message.author_id, 

55 time=Timestamp_from_datetime(message.time), 

56 text=conversations_pb2.MessageContentText(text=message.text), 

57 ) 

58 else: 

59 return conversations_pb2.Message( 

60 message_id=message.id, 

61 author_user_id=message.author_id, 

62 time=Timestamp_from_datetime(message.time), 

63 chat_created=( 

64 conversations_pb2.MessageContentChatCreated() 

65 if message.message_type == MessageType.chat_created 

66 else None 

67 ), 

68 chat_edited=( 

69 conversations_pb2.MessageContentChatEdited() 

70 if message.message_type == MessageType.chat_edited 

71 else None 

72 ), 

73 user_invited=( 

74 conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id) 

75 if message.message_type == MessageType.user_invited 

76 else None 

77 ), 

78 user_left=( 

79 conversations_pb2.MessageContentUserLeft() if message.message_type == MessageType.user_left else None 

80 ), 

81 user_made_admin=( 

82 conversations_pb2.MessageContentUserMadeAdmin(target_user_id=message.target_id) 

83 if message.message_type == MessageType.user_made_admin 

84 else None 

85 ), 

86 user_removed_admin=( 

87 conversations_pb2.MessageContentUserRemovedAdmin(target_user_id=message.target_id) 

88 if message.message_type == MessageType.user_removed_admin 

89 else None 

90 ), 

91 group_chat_user_removed=( 

92 conversations_pb2.MessageContentUserRemoved(target_user_id=message.target_id) 

93 if message.message_type == MessageType.user_removed 

94 else None 

95 ), 

96 ) 

97 

98 

99def _get_visible_members_for_subscription(subscription: GroupChatSubscription) -> list[int]: 

100 """ 

101 If a user leaves a group chat, they shouldn't be able to see who's added 

102 after they left 

103 """ 

104 if not subscription.left: 

105 # still in the chat, we see everyone with a current subscription 

106 return [sub.user_id for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None)] 

107 else: 

108 # not in chat anymore, see everyone who was in chat when we left 

109 return [ 

110 sub.user_id 

111 for sub in subscription.group_chat.subscriptions.where( 

112 GroupChatSubscription.joined <= subscription.left 

113 ).where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None)) 

114 ] 

115 

116 

117def _get_visible_admins_for_subscription(subscription: GroupChatSubscription) -> list[int]: 

118 """ 

119 If a user leaves a group chat, they shouldn't be able to see who's added 

120 after they left 

121 """ 

122 if not subscription.left: 

123 # still in the chat, we see everyone with a current subscription 

124 return [ 

125 sub.user_id 

126 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None).where( 

127 GroupChatSubscription.role == GroupChatRole.admin 

128 ) 

129 ] 

130 else: 

131 # not in chat anymore, see everyone who was in chat when we left 

132 return [ 

133 sub.user_id 

134 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin) 

135 .where(GroupChatSubscription.joined <= subscription.left) 

136 .where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None)) 

137 ] 

138 

139 

140def _user_can_message(session: Session, context: CouchersContext, group_chat: GroupChat) -> bool: 

141 """ 

142 If it is a true group chat (not a DM), user can always message. For a DM, user can message if the other participant 

143 - Is not deleted/banned 

144 - Has not been blocked by the user or is blocking the user 

145 - Has not left the chat 

146 """ 

147 if not group_chat.is_dm: 

148 return True 

149 

150 query = select( 

151 where_users_column_visible( 

152 select(GroupChatSubscription) 

153 .where(GroupChatSubscription.user_id != context.user_id) 

154 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id) 

155 .where(GroupChatSubscription.left == None), 

156 context=context, 

157 column=GroupChatSubscription.user_id, 

158 ).exists() 

159 ) 

160 return session.execute(query).scalar_one() 

161 

162 

163def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload) -> None: 

164 """ 

165 Background job to generate notifications for a message sent to a group chat 

166 """ 

167 logger.info(f"Fanning notifications for message_id = {payload.message_id}") 

168 

169 with session_scope() as session: 

170 message, group_chat = session.execute( 

171 select(Message, GroupChat) 

172 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id) 

173 .where(Message.id == payload.message_id) 

174 ).one() 

175 

176 if message.message_type != MessageType.text: 

177 logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}") 

178 return 

179 

180 context = make_background_user_context(user_id=message.author_id) 

181 user_ids_to_notify = ( 

182 session.execute( 

183 where_users_column_visible( 

184 select(GroupChatSubscription.user_id) 

185 .where(GroupChatSubscription.group_chat_id == message.conversation_id) 

186 .where(GroupChatSubscription.user_id != message.author_id) 

187 .where(GroupChatSubscription.joined <= message.time) 

188 .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time)) 

189 .where(not_(GroupChatSubscription.is_muted)), 

190 context=context, 

191 column=GroupChatSubscription.user_id, 

192 ) 

193 ) 

194 .scalars() 

195 .all() 

196 ) 

197 

198 if group_chat.is_dm: 

199 msg = f"{message.author.name} sent you a message" 

200 else: 

201 msg = f"{message.author.name} sent a message in {group_chat.title}" 

202 

203 for user_id in user_ids_to_notify: 

204 notify( 

205 session, 

206 user_id=user_id, 

207 topic_action=NotificationTopicAction.chat__message, 

208 key=str(message.conversation_id), 

209 data=notification_data_pb2.ChatMessage( 

210 author=user_model_to_pb( 

211 message.author, 

212 session, 

213 make_background_user_context(user_id=user_id), 

214 ), 

215 message=msg, 

216 text=message.text, 

217 group_chat_id=message.conversation_id, 

218 ), 

219 moderation_state_id=group_chat.moderation_state_id, 

220 ) 

221 

222 

223def _add_message_to_subscription(session: Session, subscription: GroupChatSubscription, **kwargs: Any) -> Message: 

224 """ 

225 Creates a new message for a subscription, from the user whose subscription that is. Updates last seen message id 

226 

227 Specify the keyword args for Message 

228 """ 

229 message = Message(conversation_id=subscription.group_chat.conversation.id, author_id=subscription.user_id, **kwargs) 

230 

231 session.add(message) 

232 session.flush() 

233 

234 subscription.last_seen_message_id = message.id 

235 

236 queue_job( 

237 session, 

238 job=generate_message_notifications, 

239 payload=jobs_pb2.GenerateMessageNotificationsPayload( 

240 message_id=message.id, 

241 ), 

242 ) 

243 

244 return message 

245 

246 

247def _create_chat( 

248 session: Session, 

249 creator_id: int, 

250 recipient_ids: Sequence[int], 

251 title: str | None = None, 

252 only_admins_invite: bool = True, 

253) -> GroupChat: 

254 conversation = Conversation() 

255 session.add(conversation) 

256 session.flush() 

257 

258 # Create moderation state for UMS (starts as SHADOWED) 

259 moderation_state = create_moderation( 

260 session=session, 

261 object_type=ModerationObjectType.GROUP_CHAT, 

262 object_id=conversation.id, 

263 creator_user_id=creator_id, 

264 ) 

265 

266 chat = GroupChat( 

267 conversation_id=conversation.id, 

268 title=title, 

269 creator_id=creator_id, 

270 is_dm=True if len(recipient_ids) == 1 else False, 

271 only_admins_invite=only_admins_invite, 

272 moderation_state_id=moderation_state.id, 

273 ) 

274 session.add(chat) 

275 session.flush() 

276 

277 creator_subscription = GroupChatSubscription( 

278 user_id=creator_id, 

279 group_chat_id=chat.conversation_id, 

280 role=GroupChatRole.admin, 

281 ) 

282 session.add(creator_subscription) 

283 

284 for uid in recipient_ids: 

285 session.add( 

286 GroupChatSubscription( 

287 user_id=uid, 

288 group_chat_id=chat.conversation_id, 

289 role=GroupChatRole.participant, 

290 ) 

291 ) 

292 

293 return chat 

294 

295 

296def _get_message_subscription(session: Session, user_id: int, conversation_id: int) -> GroupChatSubscription: 

297 subscription = session.execute( 

298 select(GroupChatSubscription) 

299 .where(GroupChatSubscription.group_chat_id == conversation_id) 

300 .where(GroupChatSubscription.user_id == user_id) 

301 .where(GroupChatSubscription.left == None) 

302 ).scalar_one_or_none() 

303 

304 return cast(GroupChatSubscription, subscription) 

305 

306 

307def _get_visible_message_subscription( 

308 session: Session, context: CouchersContext, conversation_id: int 

309) -> GroupChatSubscription: 

310 """Get subscription with visibility filtering""" 

311 subscription = session.execute( 

312 where_moderated_content_visible( 

313 select(GroupChatSubscription) 

314 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

315 .where(GroupChatSubscription.group_chat_id == conversation_id) 

316 .where(GroupChatSubscription.user_id == context.user_id) 

317 .where(GroupChatSubscription.left == None), 

318 context, 

319 GroupChat, 

320 is_list_operation=False, 

321 ) 

322 ).scalar_one_or_none() 

323 

324 return cast(GroupChatSubscription, subscription) 

325 

326 

327def _unseen_message_count(session: Session, subscription_id: int) -> int: 

328 query = ( 

329 select(func.count()) 

330 .select_from(Message) 

331 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

332 .where(GroupChatSubscription.id == subscription_id) 

333 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

334 ) 

335 return session.execute(query).scalar_one() 

336 

337 

338def _mute_info(subscription: GroupChatSubscription) -> conversations_pb2.MuteInfo: 

339 (muted, muted_until) = subscription.muted_display() 

340 return conversations_pb2.MuteInfo( 

341 muted=muted, 

342 muted_until=Timestamp_from_datetime(muted_until) if muted_until else None, 

343 ) 

344 

345 

346class Conversations(conversations_pb2_grpc.ConversationsServicer): 

347 def ListGroupChats( 

348 self, request: conversations_pb2.ListGroupChatsReq, context: CouchersContext, session: Session 

349 ) -> conversations_pb2.ListGroupChatsRes: 

350 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH 

351 page_size = min(page_size, MAX_PAGE_SIZE) 

352 

353 # select group chats where you have a subscription, and for each of 

354 # these, the latest message from them 

355 

356 t = ( 

357 select( 

358 GroupChatSubscription.group_chat_id.label("group_chat_id"), 

359 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"), 

360 func.max(Message.id).label("message_id"), 

361 ) 

362 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

363 .where(GroupChatSubscription.user_id == context.user_id) 

364 .where(Message.time >= GroupChatSubscription.joined) 

365 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

366 .where( 

367 or_( 

368 to_bool(request.HasField("only_archived") == False), 

369 GroupChatSubscription.is_archived == request.only_archived, 

370 ) 

371 ) 

372 .group_by(GroupChatSubscription.group_chat_id) 

373 .order_by(func.max(Message.id).desc()) 

374 .subquery() 

375 ) 

376 

377 results = session.execute( 

378 where_moderated_content_visible( 

379 select(t, GroupChat, GroupChatSubscription, Message) 

380 .join(Message, Message.id == t.c.message_id) 

381 .join(GroupChatSubscription, GroupChatSubscription.id == t.c.group_chat_subscriptions_id) 

382 .join(GroupChat, GroupChat.conversation_id == t.c.group_chat_id) 

383 .where(or_(t.c.message_id < request.last_message_id, to_bool(request.last_message_id == 0))) 

384 .order_by(t.c.message_id.desc()) 

385 .limit(page_size + 1), 

386 context, 

387 GroupChat, 

388 is_list_operation=True, 

389 ) 

390 ).all() 

391 

392 return conversations_pb2.ListGroupChatsRes( 

393 group_chats=[ 

394 conversations_pb2.GroupChat( 

395 group_chat_id=result.GroupChat.conversation_id, 

396 title=result.GroupChat.title, # TODO: proper title for DMs, etc 

397 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription), 

398 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription), 

399 only_admins_invite=result.GroupChat.only_admins_invite, 

400 is_dm=result.GroupChat.is_dm, 

401 created=Timestamp_from_datetime(result.GroupChat.conversation.created), 

402 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id), 

403 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id, 

404 latest_message=_message_to_pb(result.Message) if result.Message else None, 

405 mute_info=_mute_info(result.GroupChatSubscription), 

406 can_message=_user_can_message(session, context, result.GroupChat), 

407 is_archived=result.GroupChatSubscription.is_archived, 

408 ) 

409 for result in results[:page_size] 

410 ], 

411 last_message_id=( 

412 min(g.Message.id if g.Message else 1 for g in results[:page_size]) if len(results) > 0 else 0 

413 ), # TODO 

414 no_more=len(results) <= page_size, 

415 ) 

416 

417 def GetGroupChat( 

418 self, request: conversations_pb2.GetGroupChatReq, context: CouchersContext, session: Session 

419 ) -> conversations_pb2.GroupChat: 

420 result = session.execute( 

421 where_moderated_content_visible( 

422 select(GroupChat, GroupChatSubscription, Message) 

423 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

424 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

425 .where(GroupChatSubscription.user_id == context.user_id) 

426 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

427 .where(Message.time >= GroupChatSubscription.joined) 

428 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

429 .order_by(Message.id.desc()) 

430 .limit(1), 

431 context, 

432 GroupChat, 

433 is_list_operation=False, 

434 ) 

435 ).one_or_none() 

436 

437 if not result: 

438 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

439 

440 return conversations_pb2.GroupChat( 

441 group_chat_id=result.GroupChat.conversation_id, 

442 title=result.GroupChat.title, 

443 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription), 

444 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription), 

445 only_admins_invite=result.GroupChat.only_admins_invite, 

446 is_dm=result.GroupChat.is_dm, 

447 created=Timestamp_from_datetime(result.GroupChat.conversation.created), 

448 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id), 

449 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id, 

450 latest_message=_message_to_pb(result.Message) if result.Message else None, 

451 mute_info=_mute_info(result.GroupChatSubscription), 

452 can_message=_user_can_message(session, context, result.GroupChat), 

453 is_archived=result.GroupChatSubscription.is_archived, 

454 ) 

455 

456 def GetDirectMessage( 

457 self, request: conversations_pb2.GetDirectMessageReq, context: CouchersContext, session: Session 

458 ) -> conversations_pb2.GroupChat: 

459 count = func.count(GroupChatSubscription.id).label("count") 

460 subquery = ( 

461 select(GroupChatSubscription.group_chat_id) 

462 .where( 

463 or_( 

464 GroupChatSubscription.user_id == context.user_id, 

465 GroupChatSubscription.user_id == request.user_id, 

466 ) 

467 ) 

468 .where(GroupChatSubscription.left == None) 

469 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

470 .where(GroupChat.is_dm == True) 

471 .group_by(GroupChatSubscription.group_chat_id) 

472 .having(count == 2) 

473 .subquery() 

474 ) 

475 

476 result = session.execute( 

477 where_moderated_content_visible( 

478 select(subquery, GroupChat, GroupChatSubscription, Message) 

479 .join(subquery, subquery.c.group_chat_id == GroupChat.conversation_id) 

480 .join(Message, Message.conversation_id == GroupChat.conversation_id) 

481 .where(GroupChatSubscription.user_id == context.user_id) 

482 .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id) 

483 .where(Message.time >= GroupChatSubscription.joined) 

484 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

485 .order_by(Message.id.desc()) 

486 .limit(1), 

487 context, 

488 GroupChat, 

489 is_list_operation=False, 

490 ) 

491 ).one_or_none() 

492 

493 if not result: 

494 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

495 

496 return conversations_pb2.GroupChat( 

497 group_chat_id=result.GroupChat.conversation_id, 

498 title=result.GroupChat.title, 

499 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription), 

500 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription), 

501 only_admins_invite=result.GroupChat.only_admins_invite, 

502 is_dm=result.GroupChat.is_dm, 

503 created=Timestamp_from_datetime(result.GroupChat.conversation.created), 

504 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id), 

505 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id, 

506 latest_message=_message_to_pb(result.Message) if result.Message else None, 

507 mute_info=_mute_info(result.GroupChatSubscription), 

508 can_message=_user_can_message(session, context, result.GroupChat), 

509 is_archived=result.GroupChatSubscription.is_archived, 

510 ) 

511 

512 def GetUpdates( 

513 self, request: conversations_pb2.GetUpdatesReq, context: CouchersContext, session: Session 

514 ) -> conversations_pb2.GetUpdatesRes: 

515 results = ( 

516 session.execute( 

517 where_moderated_content_visible( 

518 select(Message) 

519 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

520 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id) 

521 .where(GroupChatSubscription.user_id == context.user_id) 

522 .where(Message.time >= GroupChatSubscription.joined) 

523 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

524 .where(Message.id > request.newest_message_id) 

525 .order_by(Message.id.asc()) 

526 .limit(DEFAULT_PAGINATION_LENGTH + 1), 

527 context, 

528 GroupChat, 

529 is_list_operation=False, 

530 ) 

531 ) 

532 .scalars() 

533 .all() 

534 ) 

535 

536 return conversations_pb2.GetUpdatesRes( 

537 updates=[ 

538 conversations_pb2.Update( 

539 group_chat_id=message.conversation_id, 

540 message=_message_to_pb(message), 

541 ) 

542 for message in sorted(results, key=lambda message: message.id)[:DEFAULT_PAGINATION_LENGTH] 

543 ], 

544 no_more=len(results) <= DEFAULT_PAGINATION_LENGTH, 

545 ) 

546 

547 def GetGroupChatMessages( 

548 self, request: conversations_pb2.GetGroupChatMessagesReq, context: CouchersContext, session: Session 

549 ) -> conversations_pb2.GetGroupChatMessagesRes: 

550 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH 

551 page_size = min(page_size, MAX_PAGE_SIZE) 

552 

553 results = ( 

554 session.execute( 

555 where_moderated_content_visible( 

556 select(Message) 

557 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

558 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id) 

559 .where(GroupChatSubscription.user_id == context.user_id) 

560 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

561 .where(Message.time >= GroupChatSubscription.joined) 

562 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

563 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0))) 

564 .where( 

565 or_(Message.id > GroupChatSubscription.last_seen_message_id, to_bool(request.only_unseen == 0)) 

566 ) 

567 .order_by(Message.id.desc()) 

568 .limit(page_size + 1), 

569 context, 

570 GroupChat, 

571 is_list_operation=False, 

572 ) 

573 ) 

574 .scalars() 

575 .all() 

576 ) 

577 

578 return conversations_pb2.GetGroupChatMessagesRes( 

579 messages=[_message_to_pb(message) for message in results[:page_size]], 

580 last_message_id=results[-2].id if len(results) > 1 else 0, # TODO 

581 no_more=len(results) <= page_size, 

582 ) 

583 

584 def MarkLastSeenGroupChat( 

585 self, request: conversations_pb2.MarkLastSeenGroupChatReq, context: CouchersContext, session: Session 

586 ) -> empty_pb2.Empty: 

587 subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

588 

589 if not subscription: 589 ↛ 590line 589 didn't jump to line 590 because the condition on line 589 was never true

590 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

591 

592 if not subscription.last_seen_message_id <= request.last_seen_message_id: 592 ↛ 593line 592 didn't jump to line 593 because the condition on line 592 was never true

593 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages") 

594 

595 subscription.last_seen_message_id = request.last_seen_message_id 

596 

597 return empty_pb2.Empty() 

598 

599 def MuteGroupChat( 

600 self, request: conversations_pb2.MuteGroupChatReq, context: CouchersContext, session: Session 

601 ) -> empty_pb2.Empty: 

602 subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

603 

604 if not subscription: 604 ↛ 605line 604 didn't jump to line 605 because the condition on line 604 was never true

605 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

606 

607 if request.unmute: 

608 subscription.muted_until = DATETIME_MINUS_INFINITY 

609 elif request.forever: 

610 subscription.muted_until = DATETIME_INFINITY 

611 elif request.for_duration: 611 ↛ 617line 611 didn't jump to line 617 because the condition on line 611 was always true

612 duration = request.for_duration.ToTimedelta() 

613 if duration < timedelta(seconds=0): 613 ↛ 614line 613 didn't jump to line 614 because the condition on line 613 was never true

614 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_mute_past") 

615 subscription.muted_until = now() + duration 

616 

617 return empty_pb2.Empty() 

618 

619 def SetGroupChatArchiveStatus( 

620 self, request: conversations_pb2.SetGroupChatArchiveStatusReq, context: CouchersContext, session: Session 

621 ) -> conversations_pb2.SetGroupChatArchiveStatusRes: 

622 subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

623 

624 if not subscription: 

625 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

626 

627 subscription.is_archived = request.is_archived 

628 

629 return conversations_pb2.SetGroupChatArchiveStatusRes( 

630 group_chat_id=request.group_chat_id, 

631 is_archived=request.is_archived, 

632 ) 

633 

634 def SearchMessages( 

635 self, request: conversations_pb2.SearchMessagesReq, context: CouchersContext, session: Session 

636 ) -> conversations_pb2.SearchMessagesRes: 

637 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH 

638 page_size = min(page_size, MAX_PAGE_SIZE) 

639 

640 results = ( 

641 session.execute( 

642 where_moderated_content_visible( 

643 select(Message) 

644 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

645 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id) 

646 .where(GroupChatSubscription.user_id == context.user_id) 

647 .where(Message.time >= GroupChatSubscription.joined) 

648 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

649 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0))) 

650 .where(Message.text.ilike(f"%{request.query}%")) 

651 .order_by(Message.id.desc()) 

652 .limit(page_size + 1), 

653 context, 

654 GroupChat, 

655 is_list_operation=True, 

656 ) 

657 ) 

658 .scalars() 

659 .all() 

660 ) 

661 

662 return conversations_pb2.SearchMessagesRes( 

663 results=[ 

664 conversations_pb2.MessageSearchResult( 

665 group_chat_id=message.conversation_id, 

666 message=_message_to_pb(message), 

667 ) 

668 for message in results[:page_size] 

669 ], 

670 last_message_id=results[-2].id if len(results) > 1 else 0, 

671 no_more=len(results) <= page_size, 

672 ) 

673 

674 def CreateGroupChat( 

675 self, request: conversations_pb2.CreateGroupChatReq, context: CouchersContext, session: Session 

676 ) -> conversations_pb2.GroupChat: 

677 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

678 if not has_completed_profile(session, user): 

679 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message") 

680 

681 recipient_user_ids = list( 

682 session.execute( 

683 select(User.id).where(users_visible(context)).where(User.id.in_(request.recipient_user_ids)) 

684 ) 

685 .scalars() 

686 .all() 

687 ) 

688 

689 # make sure all requested users are visible 

690 if len(recipient_user_ids) != len(request.recipient_user_ids): 690 ↛ 691line 690 didn't jump to line 691 because the condition on line 690 was never true

691 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found") 

692 

693 if not recipient_user_ids: 693 ↛ 694line 693 didn't jump to line 694 because the condition on line 693 was never true

694 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients") 

695 

696 if len(recipient_user_ids) != len(set(recipient_user_ids)): 696 ↛ 698line 696 didn't jump to line 698 because the condition on line 696 was never true

697 # make sure there's no duplicate users 

698 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_recipients") 

699 

700 if context.user_id in recipient_user_ids: 700 ↛ 701line 700 didn't jump to line 701 because the condition on line 700 was never true

701 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self") 

702 

703 if len(recipient_user_ids) == 1: 

704 # can only have one DM at a time between any two users 

705 other_user_id = recipient_user_ids[0] 

706 

707 # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have 

708 # user_id either this user or the recipient user. If you find two subscriptions to the same DM group 

709 # chat, you know they already have a shared group chat 

710 count = func.count(GroupChatSubscription.id).label("count") 

711 if session.execute( 

712 where_moderated_content_visible( 

713 select(count) 

714 .where( 

715 or_( 

716 GroupChatSubscription.user_id == context.user_id, 

717 GroupChatSubscription.user_id == other_user_id, 

718 ) 

719 ) 

720 .where(GroupChatSubscription.left == None) 

721 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

722 .where(GroupChat.is_dm == True) 

723 .group_by(GroupChatSubscription.group_chat_id) 

724 .having(count == 2), 

725 context, 

726 GroupChat, 

727 is_list_operation=False, 

728 ) 

729 ).scalar_one_or_none(): 

730 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_have_dm") 

731 

732 # Check if user has been initiating chats excessively 

733 if process_rate_limits_and_check_abort( 

734 session=session, user_id=context.user_id, action=RateLimitAction.chat_initiation 

735 ): 

736 context.abort_with_error_code( 

737 grpc.StatusCode.RESOURCE_EXHAUSTED, 

738 "chat_initiation_rate_limit", 

739 substitutions={"hours": str(RATE_LIMIT_HOURS)}, 

740 ) 

741 

742 group_chat = _create_chat( 

743 session, 

744 creator_id=context.user_id, 

745 recipient_ids=request.recipient_user_ids, 

746 title=request.title.value, 

747 ) 

748 

749 your_subscription = _get_message_subscription(session, context.user_id, group_chat.conversation_id) 

750 

751 _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created) 

752 

753 session.flush() 

754 

755 return conversations_pb2.GroupChat( 

756 group_chat_id=group_chat.conversation_id, 

757 title=group_chat.title, 

758 member_user_ids=_get_visible_members_for_subscription(your_subscription), 

759 admin_user_ids=_get_visible_admins_for_subscription(your_subscription), 

760 only_admins_invite=group_chat.only_admins_invite, 

761 is_dm=group_chat.is_dm, 

762 created=Timestamp_from_datetime(group_chat.conversation.created), 

763 mute_info=_mute_info(your_subscription), 

764 can_message=True, 

765 ) 

766 

767 def SendMessage( 

768 self, request: conversations_pb2.SendMessageReq, context: CouchersContext, session: Session 

769 ) -> empty_pb2.Empty: 

770 if request.text == "": 770 ↛ 771line 770 didn't jump to line 771 because the condition on line 770 was never true

771 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

772 

773 result = session.execute( 

774 where_moderated_content_visible( 

775 select(GroupChatSubscription, GroupChat) 

776 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

777 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

778 .where(GroupChatSubscription.user_id == context.user_id) 

779 .where(GroupChatSubscription.left == None), 

780 context, 

781 GroupChat, 

782 is_list_operation=False, 

783 ) 

784 ).one_or_none() 

785 if not result: 

786 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

787 

788 subscription, group_chat = result._tuple() 

789 if not _user_can_message(session, context, group_chat): 

790 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_message_in_chat") 

791 

792 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text) 

793 

794 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

795 sent_messages_counter.labels( 

796 user_gender, "direct message" if subscription.group_chat.is_dm else "group chat" 

797 ).inc() 

798 

799 return empty_pb2.Empty() 

800 

801 def SendDirectMessage( 

802 self, request: conversations_pb2.SendDirectMessageReq, context: CouchersContext, session: Session 

803 ) -> conversations_pb2.SendDirectMessageRes: 

804 user_id = context.user_id 

805 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

806 

807 recipient_id = request.recipient_user_id 

808 

809 if not has_completed_profile(session, user): 809 ↛ 810line 809 didn't jump to line 810 because the condition on line 809 was never true

810 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message") 

811 

812 if not recipient_id: 812 ↛ 813line 812 didn't jump to line 813 because the condition on line 812 was never true

813 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients") 

814 

815 recipient_user_id = session.execute( 

816 select(User.id).where(users_visible(context)).where(User.id == recipient_id) 

817 ).scalar_one_or_none() 

818 

819 if not recipient_user_id: 819 ↛ 820line 819 didn't jump to line 820 because the condition on line 819 was never true

820 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found") 

821 

822 if user_id == recipient_id: 822 ↛ 823line 822 didn't jump to line 823 because the condition on line 822 was never true

823 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self") 

824 

825 if request.text == "": 825 ↛ 826line 825 didn't jump to line 826 because the condition on line 825 was never true

826 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

827 

828 # Look for an existing direct message (DM) chat between the two users 

829 dm_chat_ids = ( 

830 select(GroupChatSubscription.group_chat_id) 

831 .where(GroupChatSubscription.user_id.in_([user_id, recipient_id])) 

832 .group_by(GroupChatSubscription.group_chat_id) 

833 .having(func.count(GroupChatSubscription.user_id) == 2) 

834 ) 

835 

836 chat = session.execute( 

837 where_moderated_content_visible( 

838 select(GroupChat) 

839 .where(GroupChat.is_dm == True) 

840 .where(GroupChat.conversation_id.in_(dm_chat_ids)) 

841 .limit(1), 

842 context, 

843 GroupChat, 

844 is_list_operation=False, 

845 ) 

846 ).scalar_one_or_none() 

847 

848 if not chat: 

849 chat = _create_chat(session, user_id, [recipient_id]) 

850 

851 # Retrieve the sender's active subscription to the chat 

852 subscription = _get_message_subscription(session, user_id, chat.conversation_id) 

853 

854 # Add the message to the conversation 

855 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text) 

856 

857 user_gender = session.execute(select(User.gender).where(User.id == user_id)).scalar_one() 

858 sent_messages_counter.labels(user_gender, "direct message").inc() 

859 

860 session.flush() 

861 

862 return conversations_pb2.SendDirectMessageRes(group_chat_id=chat.conversation_id) 

863 

864 def EditGroupChat( 

865 self, request: conversations_pb2.EditGroupChatReq, context: CouchersContext, session: Session 

866 ) -> empty_pb2.Empty: 

867 subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

868 

869 if not subscription: 

870 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

871 

872 if subscription.role != GroupChatRole.admin: 

873 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_edit") 

874 

875 if request.HasField("title"): 

876 subscription.group_chat.title = request.title.value 

877 

878 if request.HasField("only_admins_invite"): 878 ↛ 881line 878 didn't jump to line 881 because the condition on line 878 was always true

879 subscription.group_chat.only_admins_invite = request.only_admins_invite.value 

880 

881 _add_message_to_subscription(session, subscription, message_type=MessageType.chat_edited) 

882 

883 return empty_pb2.Empty() 

884 

885 def MakeGroupChatAdmin( 

886 self, request: conversations_pb2.MakeGroupChatAdminReq, context: CouchersContext, session: Session 

887 ) -> empty_pb2.Empty: 

888 if not session.execute( 

889 select(User).where(users_visible(context)).where(User.id == request.user_id) 

890 ).scalar_one_or_none(): 

891 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

892 

893 your_subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

894 

895 if not your_subscription: 895 ↛ 896line 895 didn't jump to line 896 because the condition on line 895 was never true

896 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

897 

898 if your_subscription.role != GroupChatRole.admin: 

899 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_make_admin") 

900 

901 if request.user_id == context.user_id: 901 ↛ 902line 901 didn't jump to line 902 because the condition on line 901 was never true

902 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_make_self_admin") 

903 

904 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id) 

905 

906 if not their_subscription: 906 ↛ 907line 906 didn't jump to line 907 because the condition on line 906 was never true

907 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat") 

908 

909 if their_subscription.role != GroupChatRole.participant: 

910 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_admin") 

911 

912 their_subscription.role = GroupChatRole.admin 

913 

914 _add_message_to_subscription( 

915 session, your_subscription, message_type=MessageType.user_made_admin, target_id=request.user_id 

916 ) 

917 

918 return empty_pb2.Empty() 

919 

920 def RemoveGroupChatAdmin( 

921 self, request: conversations_pb2.RemoveGroupChatAdminReq, context: CouchersContext, session: Session 

922 ) -> empty_pb2.Empty: 

923 if not session.execute( 

924 select(User).where(users_visible(context)).where(User.id == request.user_id) 

925 ).scalar_one_or_none(): 

926 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

927 

928 your_subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

929 

930 if not your_subscription: 930 ↛ 931line 930 didn't jump to line 931 because the condition on line 930 was never true

931 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

932 

933 if request.user_id == context.user_id: 

934 # Race condition! 

935 other_admins_count = session.execute( 

936 select(func.count()) 

937 .select_from(GroupChatSubscription) 

938 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

939 .where(GroupChatSubscription.user_id != context.user_id) 

940 .where(GroupChatSubscription.role == GroupChatRole.admin) 

941 .where(GroupChatSubscription.left == None) 

942 ).scalar_one() 

943 if not other_admins_count > 0: 

944 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_last_admin") 

945 

946 if your_subscription.role != GroupChatRole.admin: 

947 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_admin") 

948 

949 their_subscription = session.execute( 

950 select(GroupChatSubscription) 

951 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

952 .where(GroupChatSubscription.user_id == request.user_id) 

953 .where(GroupChatSubscription.left == None) 

954 .where(GroupChatSubscription.role == GroupChatRole.admin) 

955 ).scalar_one_or_none() 

956 

957 if not their_subscription: 957 ↛ 958line 957 didn't jump to line 958 because the condition on line 957 was never true

958 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin") 

959 

960 their_subscription.role = GroupChatRole.participant 

961 

962 _add_message_to_subscription( 

963 session, your_subscription, message_type=MessageType.user_removed_admin, target_id=request.user_id 

964 ) 

965 

966 return empty_pb2.Empty() 

967 

968 def InviteToGroupChat( 

969 self, request: conversations_pb2.InviteToGroupChatReq, context: CouchersContext, session: Session 

970 ) -> empty_pb2.Empty: 

971 if not session.execute( 

972 select(User).where(users_visible(context)).where(User.id == request.user_id) 

973 ).scalar_one_or_none(): 

974 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

975 

976 result = session.execute( 

977 where_moderated_content_visible( 

978 select(GroupChatSubscription, GroupChat) 

979 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

980 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

981 .where(GroupChatSubscription.user_id == context.user_id) 

982 .where(GroupChatSubscription.left == None), 

983 context, 

984 GroupChat, 

985 is_list_operation=False, 

986 ) 

987 ).one_or_none() 

988 

989 if not result: 

990 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

991 

992 your_subscription, group_chat = result._tuple() 

993 

994 if request.user_id == context.user_id: 994 ↛ 995line 994 didn't jump to line 995 because the condition on line 994 was never true

995 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_self") 

996 

997 if your_subscription.role != GroupChatRole.admin and your_subscription.group_chat.only_admins_invite: 

998 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invite_permission_denied") 

999 

1000 if group_chat.is_dm: 

1001 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_to_dm") 

1002 

1003 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id) 

1004 

1005 if their_subscription: 1005 ↛ 1006line 1005 didn't jump to line 1006 because the condition on line 1005 was never true

1006 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_chat") 

1007 

1008 # TODO: race condition! 

1009 

1010 subscription = GroupChatSubscription( 

1011 user_id=request.user_id, 

1012 group_chat_id=your_subscription.group_chat.conversation_id, 

1013 role=GroupChatRole.participant, 

1014 ) 

1015 session.add(subscription) 

1016 

1017 _add_message_to_subscription( 

1018 session, your_subscription, message_type=MessageType.user_invited, target_id=request.user_id 

1019 ) 

1020 

1021 return empty_pb2.Empty() 

1022 

1023 def RemoveGroupChatUser( 

1024 self, request: conversations_pb2.RemoveGroupChatUserReq, context: CouchersContext, session: Session 

1025 ) -> empty_pb2.Empty: 

1026 """ 

1027 1. Get admin info and check it's correct 

1028 2. Get user data, check it's correct and remove user 

1029 """ 

1030 # Admin info 

1031 your_subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

1032 

1033 # if user info is missing 

1034 if not your_subscription: 1034 ↛ 1035line 1034 didn't jump to line 1035 because the condition on line 1034 was never true

1035 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

1036 

1037 # if user not admin 

1038 if your_subscription.role != GroupChatRole.admin: 1038 ↛ 1039line 1038 didn't jump to line 1039 because the condition on line 1038 was never true

1039 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_user") 

1040 

1041 # if user wants to remove themselves 

1042 if request.user_id == context.user_id: 1042 ↛ 1043line 1042 didn't jump to line 1043 because the condition on line 1042 was never true

1043 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_self") 

1044 

1045 # get user info 

1046 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id) 

1047 

1048 # user not found 

1049 if not their_subscription: 

1050 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat") 

1051 

1052 _add_message_to_subscription( 

1053 session, your_subscription, message_type=MessageType.user_removed, target_id=request.user_id 

1054 ) 

1055 

1056 their_subscription.left = func.now() 

1057 

1058 return empty_pb2.Empty() 

1059 

1060 def LeaveGroupChat( 

1061 self, request: conversations_pb2.LeaveGroupChatReq, context: CouchersContext, session: Session 

1062 ) -> empty_pb2.Empty: 

1063 subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

1064 

1065 if not subscription: 

1066 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

1067 

1068 if subscription.role == GroupChatRole.admin: 

1069 other_admins_count = session.execute( 

1070 select(func.count()) 

1071 .select_from(GroupChatSubscription) 

1072 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

1073 .where(GroupChatSubscription.user_id != context.user_id) 

1074 .where(GroupChatSubscription.role == GroupChatRole.admin) 

1075 .where(GroupChatSubscription.left == None) 

1076 ).scalar_one() 

1077 participants_count = session.execute( 

1078 select(func.count()) 

1079 .select_from(GroupChatSubscription) 

1080 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

1081 .where(GroupChatSubscription.user_id != context.user_id) 

1082 .where(GroupChatSubscription.role == GroupChatRole.participant) 

1083 .where(GroupChatSubscription.left == None) 

1084 ).scalar_one() 

1085 if not (other_admins_count > 0 or participants_count == 0): 

1086 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "last_admin_cant_leave") 

1087 

1088 _add_message_to_subscription(session, subscription, message_type=MessageType.user_left) 

1089 

1090 subscription.left = func.now() 

1091 

1092 return empty_pb2.Empty()