Coverage for app / backend / src / couchers / servicers / conversations.py: 88%

315 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 09:44 +0000

1import logging 

2from collections.abc import Sequence 

3from datetime import timedelta 

4from typing import Any, cast 

5 

6import grpc 

7from google.protobuf import empty_pb2 

8from sqlalchemy import select 

9from sqlalchemy.orm import Session, contains_eager 

10from sqlalchemy.sql import func, not_, or_ 

11 

12from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY 

13from couchers.context import CouchersContext, make_background_user_context 

14from couchers.db import session_scope 

15from couchers.event_log import log_event 

16from couchers.helpers.completed_profile import has_completed_profile 

17from couchers.jobs.enqueue import queue_job 

18from couchers.metrics import sent_messages_counter 

19from couchers.models import ( 

20 Conversation, 

21 GroupChat, 

22 GroupChatRole, 

23 GroupChatSubscription, 

24 Message, 

25 MessageType, 

26 ModerationObjectType, 

27 RateLimitAction, 

28 User, 

29) 

30from couchers.models.notifications import NotificationTopicAction 

31from couchers.moderation.utils import create_moderation 

32from couchers.notifications.notify import notify 

33from couchers.proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2 

34from couchers.proto.internal import jobs_pb2 

35from couchers.rate_limits.check import process_rate_limits_and_check_abort 

36from couchers.rate_limits.definitions import RATE_LIMIT_HOURS 

37from couchers.servicers.api import user_model_to_pb 

38from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible 

39from couchers.utils import Timestamp_from_datetime, now 

40 

41logger = logging.getLogger(__name__) 

42 

43# TODO: Still needs custom pagination: GetUpdates 

44DEFAULT_PAGINATION_LENGTH = 20 

45MAX_PAGE_SIZE = 50 

46 

47 

48def _message_to_pb(message: Message) -> conversations_pb2.Message: 

49 """ 

50 Turns the given message to a protocol buffer 

51 """ 

52 if message.is_normal_message: 

53 return conversations_pb2.Message( 

54 message_id=message.id, 

55 author_user_id=message.author_id, 

56 time=Timestamp_from_datetime(message.time), 

57 text=conversations_pb2.MessageContentText(text=message.text), 

58 ) 

59 else: 

60 return conversations_pb2.Message( 

61 message_id=message.id, 

62 author_user_id=message.author_id, 

63 time=Timestamp_from_datetime(message.time), 

64 chat_created=( 

65 conversations_pb2.MessageContentChatCreated() 

66 if message.message_type == MessageType.chat_created 

67 else None 

68 ), 

69 chat_edited=( 

70 conversations_pb2.MessageContentChatEdited() 

71 if message.message_type == MessageType.chat_edited 

72 else None 

73 ), 

74 user_invited=( 

75 conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id) 

76 if message.message_type == MessageType.user_invited 

77 else None 

78 ), 

79 user_left=( 

80 conversations_pb2.MessageContentUserLeft() if message.message_type == MessageType.user_left else None 

81 ), 

82 user_made_admin=( 

83 conversations_pb2.MessageContentUserMadeAdmin(target_user_id=message.target_id) 

84 if message.message_type == MessageType.user_made_admin 

85 else None 

86 ), 

87 user_removed_admin=( 

88 conversations_pb2.MessageContentUserRemovedAdmin(target_user_id=message.target_id) 

89 if message.message_type == MessageType.user_removed_admin 

90 else None 

91 ), 

92 group_chat_user_removed=( 

93 conversations_pb2.MessageContentUserRemoved(target_user_id=message.target_id) 

94 if message.message_type == MessageType.user_removed 

95 else None 

96 ), 

97 ) 

98 

99 

100def _get_visible_members_for_subscription(subscription: GroupChatSubscription) -> list[int]: 

101 """ 

102 If a user leaves a group chat, they shouldn't be able to see who's added 

103 after they left 

104 """ 

105 if not subscription.left: 

106 # still in the chat, we see everyone with a current subscription 

107 return [sub.user_id for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None)] 

108 else: 

109 # not in chat anymore, see everyone who was in chat when we left 

110 return [ 

111 sub.user_id 

112 for sub in subscription.group_chat.subscriptions.where( 

113 GroupChatSubscription.joined <= subscription.left 

114 ).where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None)) 

115 ] 

116 

117 

118def _get_visible_admins_for_subscription(subscription: GroupChatSubscription) -> list[int]: 

119 """ 

120 If a user leaves a group chat, they shouldn't be able to see who's added 

121 after they left 

122 """ 

123 if not subscription.left: 

124 # still in the chat, we see everyone with a current subscription 

125 return [ 

126 sub.user_id 

127 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None).where( 

128 GroupChatSubscription.role == GroupChatRole.admin 

129 ) 

130 ] 

131 else: 

132 # not in chat anymore, see everyone who was in chat when we left 

133 return [ 

134 sub.user_id 

135 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin) 

136 .where(GroupChatSubscription.joined <= subscription.left) 

137 .where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None)) 

138 ] 

139 

140 

141def _user_can_message(session: Session, context: CouchersContext, group_chat: GroupChat) -> bool: 

142 """ 

143 If it is a true group chat (not a DM), user can always message. For a DM, user can message if the other participant 

144 - Is not deleted/banned 

145 - Has not been blocked by the user or is blocking the user 

146 - Has not left the chat 

147 """ 

148 if not group_chat.is_dm: 

149 return True 

150 

151 query = select( 

152 where_users_column_visible( 

153 select(GroupChatSubscription) 

154 .where(GroupChatSubscription.user_id != context.user_id) 

155 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id) 

156 .where(GroupChatSubscription.left == None), 

157 context=context, 

158 column=GroupChatSubscription.user_id, 

159 ).exists() 

160 ) 

161 return session.execute(query).scalar_one() 

162 

163 

164def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload) -> None: 

165 """ 

166 Background job to generate notifications for a message sent to a group chat 

167 """ 

168 logger.info(f"Fanning notifications for message_id = {payload.message_id}") 

169 

170 with session_scope() as session: 

171 message, group_chat = session.execute( 

172 select(Message, GroupChat) 

173 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id) 

174 .where(Message.id == payload.message_id) 

175 ).one() 

176 

177 if message.message_type != MessageType.text: 

178 logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}") 

179 return 

180 

181 context = make_background_user_context(user_id=message.author_id) 

182 user_ids_to_notify = ( 

183 session.execute( 

184 where_users_column_visible( 

185 select(GroupChatSubscription.user_id) 

186 .where(GroupChatSubscription.group_chat_id == message.conversation_id) 

187 .where(GroupChatSubscription.user_id != message.author_id) 

188 .where(GroupChatSubscription.joined <= message.time) 

189 .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time)) 

190 .where(not_(GroupChatSubscription.is_muted)), 

191 context=context, 

192 column=GroupChatSubscription.user_id, 

193 ) 

194 ) 

195 .scalars() 

196 .all() 

197 ) 

198 

199 if group_chat.is_dm: 

200 msg = f"{message.author.name} sent you a message" 

201 else: 

202 msg = f"{message.author.name} sent a message in {group_chat.title}" 

203 

204 for user_id in user_ids_to_notify: 

205 notify( 

206 session, 

207 user_id=user_id, 

208 topic_action=NotificationTopicAction.chat__message, 

209 key=str(message.conversation_id), 

210 data=notification_data_pb2.ChatMessage( 

211 author=user_model_to_pb( 

212 message.author, 

213 session, 

214 make_background_user_context(user_id=user_id), 

215 ), 

216 message=msg, 

217 text=message.text, 

218 group_chat_id=message.conversation_id, 

219 ), 

220 moderation_state_id=group_chat.moderation_state_id, 

221 ) 

222 

223 

224def _add_message_to_subscription(session: Session, subscription: GroupChatSubscription, **kwargs: Any) -> Message: 

225 """ 

226 Creates a new message for a subscription, from the user whose subscription that is. Updates last seen message id 

227 

228 Specify the keyword args for Message 

229 """ 

230 message = Message(conversation_id=subscription.group_chat.conversation.id, author_id=subscription.user_id, **kwargs) 

231 

232 session.add(message) 

233 session.flush() 

234 

235 subscription.last_seen_message_id = message.id 

236 

237 queue_job( 

238 session, 

239 job=generate_message_notifications, 

240 payload=jobs_pb2.GenerateMessageNotificationsPayload( 

241 message_id=message.id, 

242 ), 

243 ) 

244 

245 return message 

246 

247 

248def _create_chat( 

249 session: Session, 

250 creator_id: int, 

251 recipient_ids: Sequence[int], 

252 title: str | None = None, 

253 only_admins_invite: bool = True, 

254) -> GroupChat: 

255 conversation = Conversation() 

256 session.add(conversation) 

257 session.flush() 

258 

259 # Create moderation state for UMS (starts as SHADOWED) 

260 moderation_state = create_moderation( 

261 session=session, 

262 object_type=ModerationObjectType.group_chat, 

263 object_id=conversation.id, 

264 creator_user_id=creator_id, 

265 ) 

266 

267 chat = GroupChat( 

268 conversation_id=conversation.id, 

269 title=title, 

270 creator_id=creator_id, 

271 is_dm=True if len(recipient_ids) == 1 else False, 

272 only_admins_invite=only_admins_invite, 

273 moderation_state_id=moderation_state.id, 

274 ) 

275 session.add(chat) 

276 session.flush() 

277 

278 creator_subscription = GroupChatSubscription( 

279 user_id=creator_id, 

280 group_chat_id=chat.conversation_id, 

281 role=GroupChatRole.admin, 

282 ) 

283 session.add(creator_subscription) 

284 

285 for uid in recipient_ids: 

286 session.add( 

287 GroupChatSubscription( 

288 user_id=uid, 

289 group_chat_id=chat.conversation_id, 

290 role=GroupChatRole.participant, 

291 ) 

292 ) 

293 

294 return chat 

295 

296 

297def _get_message_subscription(session: Session, user_id: int, conversation_id: int) -> GroupChatSubscription: 

298 subscription = session.execute( 

299 select(GroupChatSubscription) 

300 .where(GroupChatSubscription.group_chat_id == conversation_id) 

301 .where(GroupChatSubscription.user_id == user_id) 

302 .where(GroupChatSubscription.left == None) 

303 ).scalar_one_or_none() 

304 

305 return cast(GroupChatSubscription, subscription) 

306 

307 

308def _get_visible_message_subscription( 

309 session: Session, context: CouchersContext, conversation_id: int 

310) -> GroupChatSubscription: 

311 """Get subscription with visibility filtering""" 

312 subscription = session.execute( 

313 where_moderated_content_visible( 

314 select(GroupChatSubscription) 

315 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

316 .where(GroupChatSubscription.group_chat_id == conversation_id) 

317 .where(GroupChatSubscription.user_id == context.user_id) 

318 .where(GroupChatSubscription.left == None), 

319 context, 

320 GroupChat, 

321 is_list_operation=False, 

322 ) 

323 ).scalar_one_or_none() 

324 

325 return cast(GroupChatSubscription, subscription) 

326 

327 

328def _unseen_message_count(session: Session, subscription_id: int) -> int: 

329 query = ( 

330 select(func.count()) 

331 .select_from(Message) 

332 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

333 .where(GroupChatSubscription.id == subscription_id) 

334 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

335 ) 

336 return session.execute(query).scalar_one() 

337 

338 

339def _mute_info(subscription: GroupChatSubscription) -> conversations_pb2.MuteInfo: 

340 (muted, muted_until) = subscription.muted_display() 

341 return conversations_pb2.MuteInfo( 

342 muted=muted, 

343 muted_until=Timestamp_from_datetime(muted_until) if muted_until else None, 

344 ) 

345 

346 

347class Conversations(conversations_pb2_grpc.ConversationsServicer): 

348 def ListGroupChats( 

349 self, request: conversations_pb2.ListGroupChatsReq, context: CouchersContext, session: Session 

350 ) -> conversations_pb2.ListGroupChatsRes: 

351 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH 

352 page_size = min(page_size, MAX_PAGE_SIZE) 

353 

354 # select group chats where you have a subscription, and for each of 

355 # these, the latest message from them 

356 

357 t = ( 

358 select( 

359 GroupChatSubscription.group_chat_id.label("group_chat_id"), 

360 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"), 

361 func.max(Message.id).label("message_id"), 

362 ) 

363 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

364 .where(GroupChatSubscription.user_id == context.user_id) 

365 .where(Message.time >= GroupChatSubscription.joined) 

366 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

367 .where( 

368 or_( 

369 to_bool(request.HasField("only_archived") == False), 

370 GroupChatSubscription.is_archived == request.only_archived, 

371 ) 

372 ) 

373 .group_by(GroupChatSubscription.group_chat_id) 

374 .order_by(func.max(Message.id).desc()) 

375 .subquery() 

376 ) 

377 

378 results = session.execute( 

379 where_moderated_content_visible( 

380 select(t, GroupChat, GroupChatSubscription, Message) 

381 .join(Message, Message.id == t.c.message_id) 

382 .join(GroupChatSubscription, GroupChatSubscription.id == t.c.group_chat_subscriptions_id) 

383 .join(GroupChat, GroupChat.conversation_id == t.c.group_chat_id) 

384 .join(Conversation, Conversation.id == GroupChat.conversation_id) 

385 .options(contains_eager(GroupChat.conversation)) 

386 .where(or_(t.c.message_id < request.last_message_id, to_bool(request.last_message_id == 0))) 

387 .order_by(t.c.message_id.desc()) 

388 .limit(page_size + 1), 

389 context, 

390 GroupChat, 

391 is_list_operation=True, 

392 ) 

393 ).all() 

394 

395 # Batch: unseen message counts in one query instead of N individual queries 

396 subscription_ids = [r.GroupChatSubscription.id for r in results[:page_size]] 

397 unseen_counts: dict[int, int] = dict( 

398 session.execute( # type: ignore[arg-type] 

399 select(GroupChatSubscription.id, func.count(Message.id)) 

400 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

401 .where(GroupChatSubscription.id.in_(subscription_ids)) 

402 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

403 .group_by(GroupChatSubscription.id) 

404 ).all() 

405 ) 

406 

407 return conversations_pb2.ListGroupChatsRes( 

408 group_chats=[ 

409 conversations_pb2.GroupChat( 

410 group_chat_id=result.GroupChat.conversation_id, 

411 title=result.GroupChat.title, # TODO: proper title for DMs, etc 

412 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription), 

413 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription), 

414 only_admins_invite=result.GroupChat.only_admins_invite, 

415 is_dm=result.GroupChat.is_dm, 

416 created=Timestamp_from_datetime(result.GroupChat.conversation.created), 

417 unseen_message_count=unseen_counts.get(result.GroupChatSubscription.id, 0), 

418 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id, 

419 latest_message=_message_to_pb(result.Message) if result.Message else None, 

420 mute_info=_mute_info(result.GroupChatSubscription), 

421 can_message=_user_can_message(session, context, result.GroupChat), 

422 is_archived=result.GroupChatSubscription.is_archived, 

423 ) 

424 for result in results[:page_size] 

425 ], 

426 last_message_id=( 

427 min(g.Message.id if g.Message else 1 for g in results[:page_size]) if len(results) > 0 else 0 

428 ), # TODO 

429 no_more=len(results) <= page_size, 

430 ) 

431 

432 def GetGroupChat( 

433 self, request: conversations_pb2.GetGroupChatReq, context: CouchersContext, session: Session 

434 ) -> conversations_pb2.GroupChat: 

435 result = session.execute( 

436 where_moderated_content_visible( 

437 select(GroupChat, GroupChatSubscription, Message) 

438 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

439 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

440 .join(Conversation, Conversation.id == GroupChat.conversation_id) 

441 .options(contains_eager(GroupChat.conversation)) 

442 .where(GroupChatSubscription.user_id == context.user_id) 

443 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

444 .where(Message.time >= GroupChatSubscription.joined) 

445 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

446 .order_by(Message.id.desc()) 

447 .limit(1), 

448 context, 

449 GroupChat, 

450 is_list_operation=False, 

451 ) 

452 ).one_or_none() 

453 

454 if not result: 

455 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

456 

457 return conversations_pb2.GroupChat( 

458 group_chat_id=result.GroupChat.conversation_id, 

459 title=result.GroupChat.title, 

460 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription), 

461 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription), 

462 only_admins_invite=result.GroupChat.only_admins_invite, 

463 is_dm=result.GroupChat.is_dm, 

464 created=Timestamp_from_datetime(result.GroupChat.conversation.created), 

465 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id), 

466 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id, 

467 latest_message=_message_to_pb(result.Message) if result.Message else None, 

468 mute_info=_mute_info(result.GroupChatSubscription), 

469 can_message=_user_can_message(session, context, result.GroupChat), 

470 is_archived=result.GroupChatSubscription.is_archived, 

471 ) 

472 

473 def GetDirectMessage( 

474 self, request: conversations_pb2.GetDirectMessageReq, context: CouchersContext, session: Session 

475 ) -> conversations_pb2.GroupChat: 

476 count = func.count(GroupChatSubscription.id).label("count") 

477 subquery = ( 

478 select(GroupChatSubscription.group_chat_id) 

479 .where( 

480 or_( 

481 GroupChatSubscription.user_id == context.user_id, 

482 GroupChatSubscription.user_id == request.user_id, 

483 ) 

484 ) 

485 .where(GroupChatSubscription.left == None) 

486 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

487 .where(GroupChat.is_dm == True) 

488 .group_by(GroupChatSubscription.group_chat_id) 

489 .having(count == 2) 

490 .subquery() 

491 ) 

492 

493 result = session.execute( 

494 where_moderated_content_visible( 

495 select(subquery, GroupChat, GroupChatSubscription, Message) 

496 .join(subquery, subquery.c.group_chat_id == GroupChat.conversation_id) 

497 .join(Message, Message.conversation_id == GroupChat.conversation_id) 

498 .join(Conversation, Conversation.id == GroupChat.conversation_id) 

499 .options(contains_eager(GroupChat.conversation)) 

500 .where(GroupChatSubscription.user_id == context.user_id) 

501 .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id) 

502 .where(Message.time >= GroupChatSubscription.joined) 

503 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

504 .order_by(Message.id.desc()) 

505 .limit(1), 

506 context, 

507 GroupChat, 

508 is_list_operation=False, 

509 ) 

510 ).one_or_none() 

511 

512 if not result: 

513 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

514 

515 return conversations_pb2.GroupChat( 

516 group_chat_id=result.GroupChat.conversation_id, 

517 title=result.GroupChat.title, 

518 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription), 

519 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription), 

520 only_admins_invite=result.GroupChat.only_admins_invite, 

521 is_dm=result.GroupChat.is_dm, 

522 created=Timestamp_from_datetime(result.GroupChat.conversation.created), 

523 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id), 

524 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id, 

525 latest_message=_message_to_pb(result.Message) if result.Message else None, 

526 mute_info=_mute_info(result.GroupChatSubscription), 

527 can_message=_user_can_message(session, context, result.GroupChat), 

528 is_archived=result.GroupChatSubscription.is_archived, 

529 ) 

530 

531 def GetUpdates( 

532 self, request: conversations_pb2.GetUpdatesReq, context: CouchersContext, session: Session 

533 ) -> conversations_pb2.GetUpdatesRes: 

534 results = ( 

535 session.execute( 

536 where_moderated_content_visible( 

537 select(Message) 

538 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

539 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id) 

540 .where(GroupChatSubscription.user_id == context.user_id) 

541 .where(Message.time >= GroupChatSubscription.joined) 

542 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

543 .where(Message.id > request.newest_message_id) 

544 .order_by(Message.id.asc()) 

545 .limit(DEFAULT_PAGINATION_LENGTH + 1), 

546 context, 

547 GroupChat, 

548 is_list_operation=False, 

549 ) 

550 ) 

551 .scalars() 

552 .all() 

553 ) 

554 

555 return conversations_pb2.GetUpdatesRes( 

556 updates=[ 

557 conversations_pb2.Update( 

558 group_chat_id=message.conversation_id, 

559 message=_message_to_pb(message), 

560 ) 

561 for message in sorted(results, key=lambda message: message.id)[:DEFAULT_PAGINATION_LENGTH] 

562 ], 

563 no_more=len(results) <= DEFAULT_PAGINATION_LENGTH, 

564 ) 

565 

566 def GetGroupChatMessages( 

567 self, request: conversations_pb2.GetGroupChatMessagesReq, context: CouchersContext, session: Session 

568 ) -> conversations_pb2.GetGroupChatMessagesRes: 

569 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH 

570 page_size = min(page_size, MAX_PAGE_SIZE) 

571 

572 results = ( 

573 session.execute( 

574 where_moderated_content_visible( 

575 select(Message) 

576 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

577 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id) 

578 .where(GroupChatSubscription.user_id == context.user_id) 

579 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

580 .where(Message.time >= GroupChatSubscription.joined) 

581 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

582 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0))) 

583 .where( 

584 or_(Message.id > GroupChatSubscription.last_seen_message_id, to_bool(request.only_unseen == 0)) 

585 ) 

586 .order_by(Message.id.desc()) 

587 .limit(page_size + 1), 

588 context, 

589 GroupChat, 

590 is_list_operation=False, 

591 ) 

592 ) 

593 .scalars() 

594 .all() 

595 ) 

596 

597 return conversations_pb2.GetGroupChatMessagesRes( 

598 messages=[_message_to_pb(message) for message in results[:page_size]], 

599 last_message_id=results[-2].id if len(results) > 1 else 0, # TODO 

600 no_more=len(results) <= page_size, 

601 ) 

602 

603 def MarkLastSeenGroupChat( 

604 self, request: conversations_pb2.MarkLastSeenGroupChatReq, context: CouchersContext, session: Session 

605 ) -> empty_pb2.Empty: 

606 subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

607 

608 if not subscription: 608 ↛ 609line 608 didn't jump to line 609 because the condition on line 608 was never true

609 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

610 

611 if not subscription.last_seen_message_id <= request.last_seen_message_id: 611 ↛ 612line 611 didn't jump to line 612 because the condition on line 611 was never true

612 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages") 

613 

614 subscription.last_seen_message_id = request.last_seen_message_id 

615 

616 return empty_pb2.Empty() 

617 

618 def MuteGroupChat( 

619 self, request: conversations_pb2.MuteGroupChatReq, context: CouchersContext, session: Session 

620 ) -> empty_pb2.Empty: 

621 subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

622 

623 if not subscription: 623 ↛ 624line 623 didn't jump to line 624 because the condition on line 623 was never true

624 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

625 

626 if request.unmute: 

627 subscription.muted_until = DATETIME_MINUS_INFINITY 

628 elif request.forever: 

629 subscription.muted_until = DATETIME_INFINITY 

630 elif request.for_duration: 630 ↛ 636line 630 didn't jump to line 636 because the condition on line 630 was always true

631 duration = request.for_duration.ToTimedelta() 

632 if duration < timedelta(seconds=0): 632 ↛ 633line 632 didn't jump to line 633 because the condition on line 632 was never true

633 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_mute_past") 

634 subscription.muted_until = now() + duration 

635 

636 return empty_pb2.Empty() 

637 

638 def SetGroupChatArchiveStatus( 

639 self, request: conversations_pb2.SetGroupChatArchiveStatusReq, context: CouchersContext, session: Session 

640 ) -> conversations_pb2.SetGroupChatArchiveStatusRes: 

641 subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

642 

643 if not subscription: 

644 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

645 

646 subscription.is_archived = request.is_archived 

647 

648 return conversations_pb2.SetGroupChatArchiveStatusRes( 

649 group_chat_id=request.group_chat_id, 

650 is_archived=request.is_archived, 

651 ) 

652 

653 def SearchMessages( 

654 self, request: conversations_pb2.SearchMessagesReq, context: CouchersContext, session: Session 

655 ) -> conversations_pb2.SearchMessagesRes: 

656 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH 

657 page_size = min(page_size, MAX_PAGE_SIZE) 

658 

659 results = ( 

660 session.execute( 

661 where_moderated_content_visible( 

662 select(Message) 

663 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

664 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id) 

665 .where(GroupChatSubscription.user_id == context.user_id) 

666 .where(Message.time >= GroupChatSubscription.joined) 

667 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

668 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0))) 

669 .where(Message.text.ilike(f"%{request.query}%")) 

670 .order_by(Message.id.desc()) 

671 .limit(page_size + 1), 

672 context, 

673 GroupChat, 

674 is_list_operation=True, 

675 ) 

676 ) 

677 .scalars() 

678 .all() 

679 ) 

680 

681 return conversations_pb2.SearchMessagesRes( 

682 results=[ 

683 conversations_pb2.MessageSearchResult( 

684 group_chat_id=message.conversation_id, 

685 message=_message_to_pb(message), 

686 ) 

687 for message in results[:page_size] 

688 ], 

689 last_message_id=results[-2].id if len(results) > 1 else 0, 

690 no_more=len(results) <= page_size, 

691 ) 

692 

693 def CreateGroupChat( 

694 self, request: conversations_pb2.CreateGroupChatReq, context: CouchersContext, session: Session 

695 ) -> conversations_pb2.GroupChat: 

696 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

697 if not has_completed_profile(session, user): 

698 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message") 

699 

700 recipient_user_ids = list( 

701 session.execute( 

702 select(User.id).where(users_visible(context)).where(User.id.in_(request.recipient_user_ids)) 

703 ) 

704 .scalars() 

705 .all() 

706 ) 

707 

708 # make sure all requested users are visible 

709 if len(recipient_user_ids) != len(request.recipient_user_ids): 709 ↛ 710line 709 didn't jump to line 710 because the condition on line 709 was never true

710 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found") 

711 

712 if not recipient_user_ids: 712 ↛ 713line 712 didn't jump to line 713 because the condition on line 712 was never true

713 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients") 

714 

715 if len(recipient_user_ids) != len(set(recipient_user_ids)): 715 ↛ 717line 715 didn't jump to line 717 because the condition on line 715 was never true

716 # make sure there's no duplicate users 

717 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_recipients") 

718 

719 if context.user_id in recipient_user_ids: 719 ↛ 720line 719 didn't jump to line 720 because the condition on line 719 was never true

720 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self") 

721 

722 if len(recipient_user_ids) == 1: 

723 # can only have one DM at a time between any two users 

724 other_user_id = recipient_user_ids[0] 

725 

726 # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have 

727 # user_id either this user or the recipient user. If you find two subscriptions to the same DM group 

728 # chat, you know they already have a shared group chat 

729 count = func.count(GroupChatSubscription.id).label("count") 

730 if session.execute( 

731 where_moderated_content_visible( 

732 select(count) 

733 .where( 

734 or_( 

735 GroupChatSubscription.user_id == context.user_id, 

736 GroupChatSubscription.user_id == other_user_id, 

737 ) 

738 ) 

739 .where(GroupChatSubscription.left == None) 

740 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

741 .where(GroupChat.is_dm == True) 

742 .group_by(GroupChatSubscription.group_chat_id) 

743 .having(count == 2), 

744 context, 

745 GroupChat, 

746 is_list_operation=False, 

747 ) 

748 ).scalar_one_or_none(): 

749 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_have_dm") 

750 

751 # Check if user has been initiating chats excessively 

752 if process_rate_limits_and_check_abort( 

753 session=session, user_id=context.user_id, action=RateLimitAction.chat_initiation 

754 ): 

755 context.abort_with_error_code( 

756 grpc.StatusCode.RESOURCE_EXHAUSTED, 

757 "chat_initiation_rate_limit2", 

758 substitutions={"count": RATE_LIMIT_HOURS}, 

759 ) 

760 

761 group_chat = _create_chat( 

762 session, 

763 creator_id=context.user_id, 

764 recipient_ids=request.recipient_user_ids, 

765 title=request.title.value, 

766 ) 

767 

768 your_subscription = _get_message_subscription(session, context.user_id, group_chat.conversation_id) 

769 

770 _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created) 

771 

772 session.flush() 

773 

774 log_event( 

775 context, 

776 session, 

777 "group_chat.created", 

778 { 

779 "group_chat_id": group_chat.conversation_id, 

780 "is_dm": group_chat.is_dm, 

781 "recipient_count": len(request.recipient_user_ids), 

782 }, 

783 ) 

784 

785 return conversations_pb2.GroupChat( 

786 group_chat_id=group_chat.conversation_id, 

787 title=group_chat.title, 

788 member_user_ids=_get_visible_members_for_subscription(your_subscription), 

789 admin_user_ids=_get_visible_admins_for_subscription(your_subscription), 

790 only_admins_invite=group_chat.only_admins_invite, 

791 is_dm=group_chat.is_dm, 

792 created=Timestamp_from_datetime(group_chat.conversation.created), 

793 mute_info=_mute_info(your_subscription), 

794 can_message=True, 

795 ) 

796 

797 def SendMessage( 

798 self, request: conversations_pb2.SendMessageReq, context: CouchersContext, session: Session 

799 ) -> empty_pb2.Empty: 

800 if request.text == "": 800 ↛ 801line 800 didn't jump to line 801 because the condition on line 800 was never true

801 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

802 

803 result = session.execute( 

804 where_moderated_content_visible( 

805 select(GroupChatSubscription, GroupChat) 

806 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

807 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

808 .where(GroupChatSubscription.user_id == context.user_id) 

809 .where(GroupChatSubscription.left == None), 

810 context, 

811 GroupChat, 

812 is_list_operation=False, 

813 ) 

814 ).one_or_none() 

815 if not result: 

816 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

817 

818 subscription, group_chat = result._tuple() 

819 if not _user_can_message(session, context, group_chat): 

820 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_message_in_chat") 

821 

822 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text) 

823 

824 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

825 sent_messages_counter.labels( 

826 user_gender, "direct message" if subscription.group_chat.is_dm else "group chat" 

827 ).inc() 

828 log_event( 

829 context, 

830 session, 

831 "message.sent", 

832 {"group_chat_id": request.group_chat_id, "is_dm": subscription.group_chat.is_dm}, 

833 ) 

834 

835 return empty_pb2.Empty() 

836 

837 def SendDirectMessage( 

838 self, request: conversations_pb2.SendDirectMessageReq, context: CouchersContext, session: Session 

839 ) -> conversations_pb2.SendDirectMessageRes: 

840 user_id = context.user_id 

841 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

842 

843 recipient_id = request.recipient_user_id 

844 

845 if not has_completed_profile(session, user): 845 ↛ 846line 845 didn't jump to line 846 because the condition on line 845 was never true

846 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message") 

847 

848 if not recipient_id: 848 ↛ 849line 848 didn't jump to line 849 because the condition on line 848 was never true

849 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients") 

850 

851 recipient_user_id = session.execute( 

852 select(User.id).where(users_visible(context)).where(User.id == recipient_id) 

853 ).scalar_one_or_none() 

854 

855 if not recipient_user_id: 855 ↛ 856line 855 didn't jump to line 856 because the condition on line 855 was never true

856 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found") 

857 

858 if user_id == recipient_id: 858 ↛ 859line 858 didn't jump to line 859 because the condition on line 858 was never true

859 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self") 

860 

861 if request.text == "": 861 ↛ 862line 861 didn't jump to line 862 because the condition on line 861 was never true

862 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

863 

864 # Look for an existing direct message (DM) chat between the two users 

865 dm_chat_ids = ( 

866 select(GroupChatSubscription.group_chat_id) 

867 .where(GroupChatSubscription.user_id.in_([user_id, recipient_id])) 

868 .group_by(GroupChatSubscription.group_chat_id) 

869 .having(func.count(GroupChatSubscription.user_id) == 2) 

870 ) 

871 

872 chat = session.execute( 

873 where_moderated_content_visible( 

874 select(GroupChat) 

875 .where(GroupChat.is_dm == True) 

876 .where(GroupChat.conversation_id.in_(dm_chat_ids)) 

877 .limit(1), 

878 context, 

879 GroupChat, 

880 is_list_operation=False, 

881 ) 

882 ).scalar_one_or_none() 

883 

884 if not chat: 

885 chat = _create_chat(session, user_id, [recipient_id]) 

886 

887 # Retrieve the sender's active subscription to the chat 

888 subscription = _get_message_subscription(session, user_id, chat.conversation_id) 

889 

890 # Add the message to the conversation 

891 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text) 

892 

893 user_gender = session.execute(select(User.gender).where(User.id == user_id)).scalar_one() 

894 sent_messages_counter.labels(user_gender, "direct message").inc() 

895 log_event( 

896 context, 

897 session, 

898 "message.sent", 

899 {"group_chat_id": chat.conversation_id, "is_dm": True, "recipient_id": recipient_id}, 

900 ) 

901 

902 session.flush() 

903 

904 return conversations_pb2.SendDirectMessageRes(group_chat_id=chat.conversation_id) 

905 

906 def EditGroupChat( 

907 self, request: conversations_pb2.EditGroupChatReq, context: CouchersContext, session: Session 

908 ) -> empty_pb2.Empty: 

909 subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

910 

911 if not subscription: 

912 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

913 

914 if subscription.role != GroupChatRole.admin: 

915 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_edit") 

916 

917 if request.HasField("title"): 

918 subscription.group_chat.title = request.title.value 

919 

920 if request.HasField("only_admins_invite"): 920 ↛ 923line 920 didn't jump to line 923 because the condition on line 920 was always true

921 subscription.group_chat.only_admins_invite = request.only_admins_invite.value 

922 

923 _add_message_to_subscription(session, subscription, message_type=MessageType.chat_edited) 

924 

925 return empty_pb2.Empty() 

926 

927 def MakeGroupChatAdmin( 

928 self, request: conversations_pb2.MakeGroupChatAdminReq, context: CouchersContext, session: Session 

929 ) -> empty_pb2.Empty: 

930 if not session.execute( 

931 select(User).where(users_visible(context)).where(User.id == request.user_id) 

932 ).scalar_one_or_none(): 

933 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

934 

935 your_subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

936 

937 if not your_subscription: 937 ↛ 938line 937 didn't jump to line 938 because the condition on line 937 was never true

938 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

939 

940 if your_subscription.role != GroupChatRole.admin: 

941 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_make_admin") 

942 

943 if request.user_id == context.user_id: 943 ↛ 944line 943 didn't jump to line 944 because the condition on line 943 was never true

944 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_make_self_admin") 

945 

946 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id) 

947 

948 if not their_subscription: 948 ↛ 949line 948 didn't jump to line 949 because the condition on line 948 was never true

949 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat") 

950 

951 if their_subscription.role != GroupChatRole.participant: 

952 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_admin") 

953 

954 their_subscription.role = GroupChatRole.admin 

955 

956 _add_message_to_subscription( 

957 session, your_subscription, message_type=MessageType.user_made_admin, target_id=request.user_id 

958 ) 

959 

960 return empty_pb2.Empty() 

961 

962 def RemoveGroupChatAdmin( 

963 self, request: conversations_pb2.RemoveGroupChatAdminReq, context: CouchersContext, session: Session 

964 ) -> empty_pb2.Empty: 

965 if not session.execute( 

966 select(User).where(users_visible(context)).where(User.id == request.user_id) 

967 ).scalar_one_or_none(): 

968 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

969 

970 your_subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

971 

972 if not your_subscription: 972 ↛ 973line 972 didn't jump to line 973 because the condition on line 972 was never true

973 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

974 

975 if request.user_id == context.user_id: 

976 # Race condition! 

977 other_admins_count = session.execute( 

978 select(func.count()) 

979 .select_from(GroupChatSubscription) 

980 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

981 .where(GroupChatSubscription.user_id != context.user_id) 

982 .where(GroupChatSubscription.role == GroupChatRole.admin) 

983 .where(GroupChatSubscription.left == None) 

984 ).scalar_one() 

985 if not other_admins_count > 0: 

986 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_last_admin") 

987 

988 if your_subscription.role != GroupChatRole.admin: 

989 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_admin") 

990 

991 their_subscription = session.execute( 

992 select(GroupChatSubscription) 

993 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

994 .where(GroupChatSubscription.user_id == request.user_id) 

995 .where(GroupChatSubscription.left == None) 

996 .where(GroupChatSubscription.role == GroupChatRole.admin) 

997 ).scalar_one_or_none() 

998 

999 if not their_subscription: 999 ↛ 1000line 999 didn't jump to line 1000 because the condition on line 999 was never true

1000 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin") 

1001 

1002 their_subscription.role = GroupChatRole.participant 

1003 

1004 _add_message_to_subscription( 

1005 session, your_subscription, message_type=MessageType.user_removed_admin, target_id=request.user_id 

1006 ) 

1007 

1008 return empty_pb2.Empty() 

1009 

1010 def InviteToGroupChat( 

1011 self, request: conversations_pb2.InviteToGroupChatReq, context: CouchersContext, session: Session 

1012 ) -> empty_pb2.Empty: 

1013 if not session.execute( 

1014 select(User).where(users_visible(context)).where(User.id == request.user_id) 

1015 ).scalar_one_or_none(): 

1016 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

1017 

1018 result = session.execute( 

1019 where_moderated_content_visible( 

1020 select(GroupChatSubscription, GroupChat) 

1021 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

1022 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

1023 .where(GroupChatSubscription.user_id == context.user_id) 

1024 .where(GroupChatSubscription.left == None), 

1025 context, 

1026 GroupChat, 

1027 is_list_operation=False, 

1028 ) 

1029 ).one_or_none() 

1030 

1031 if not result: 

1032 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

1033 

1034 your_subscription, group_chat = result._tuple() 

1035 

1036 if request.user_id == context.user_id: 1036 ↛ 1037line 1036 didn't jump to line 1037 because the condition on line 1036 was never true

1037 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_self") 

1038 

1039 if your_subscription.role != GroupChatRole.admin and your_subscription.group_chat.only_admins_invite: 

1040 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invite_permission_denied") 

1041 

1042 if group_chat.is_dm: 

1043 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_to_dm") 

1044 

1045 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id) 

1046 

1047 if their_subscription: 1047 ↛ 1048line 1047 didn't jump to line 1048 because the condition on line 1047 was never true

1048 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_chat") 

1049 

1050 # TODO: race condition! 

1051 

1052 subscription = GroupChatSubscription( 

1053 user_id=request.user_id, 

1054 group_chat_id=your_subscription.group_chat.conversation_id, 

1055 role=GroupChatRole.participant, 

1056 ) 

1057 session.add(subscription) 

1058 

1059 _add_message_to_subscription( 

1060 session, your_subscription, message_type=MessageType.user_invited, target_id=request.user_id 

1061 ) 

1062 

1063 return empty_pb2.Empty() 

1064 

1065 def RemoveGroupChatUser( 

1066 self, request: conversations_pb2.RemoveGroupChatUserReq, context: CouchersContext, session: Session 

1067 ) -> empty_pb2.Empty: 

1068 """ 

1069 1. Get admin info and check it's correct 

1070 2. Get user data, check it's correct and remove user 

1071 """ 

1072 # Admin info 

1073 your_subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

1074 

1075 # if user info is missing 

1076 if not your_subscription: 1076 ↛ 1077line 1076 didn't jump to line 1077 because the condition on line 1076 was never true

1077 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

1078 

1079 # if user not admin 

1080 if your_subscription.role != GroupChatRole.admin: 1080 ↛ 1081line 1080 didn't jump to line 1081 because the condition on line 1080 was never true

1081 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_user") 

1082 

1083 # if user wants to remove themselves 

1084 if request.user_id == context.user_id: 1084 ↛ 1085line 1084 didn't jump to line 1085 because the condition on line 1084 was never true

1085 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_self") 

1086 

1087 # get user info 

1088 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id) 

1089 

1090 # user not found 

1091 if not their_subscription: 

1092 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat") 

1093 

1094 _add_message_to_subscription( 

1095 session, your_subscription, message_type=MessageType.user_removed, target_id=request.user_id 

1096 ) 

1097 

1098 their_subscription.left = func.now() 

1099 

1100 return empty_pb2.Empty() 

1101 

1102 def LeaveGroupChat( 

1103 self, request: conversations_pb2.LeaveGroupChatReq, context: CouchersContext, session: Session 

1104 ) -> empty_pb2.Empty: 

1105 subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

1106 

1107 if not subscription: 

1108 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

1109 

1110 if subscription.role == GroupChatRole.admin: 

1111 other_admins_count = session.execute( 

1112 select(func.count()) 

1113 .select_from(GroupChatSubscription) 

1114 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

1115 .where(GroupChatSubscription.user_id != context.user_id) 

1116 .where(GroupChatSubscription.role == GroupChatRole.admin) 

1117 .where(GroupChatSubscription.left == None) 

1118 ).scalar_one() 

1119 participants_count = session.execute( 

1120 select(func.count()) 

1121 .select_from(GroupChatSubscription) 

1122 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

1123 .where(GroupChatSubscription.user_id != context.user_id) 

1124 .where(GroupChatSubscription.role == GroupChatRole.participant) 

1125 .where(GroupChatSubscription.left == None) 

1126 ).scalar_one() 

1127 if not (other_admins_count > 0 or participants_count == 0): 

1128 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "last_admin_cant_leave") 

1129 

1130 _add_message_to_subscription(session, subscription, message_type=MessageType.user_left) 

1131 

1132 subscription.left = func.now() 

1133 

1134 return empty_pb2.Empty()