Coverage for src / couchers / servicers / conversations.py: 88%

302 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-09 02:44 +0000

1import logging 

2from collections.abc import Sequence 

3from datetime import timedelta 

4from typing import Any, cast 

5 

6import grpc 

7from google.protobuf import empty_pb2 

8from sqlalchemy import select 

9from sqlalchemy.orm import Session 

10from sqlalchemy.sql import func, not_, or_ 

11 

12from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY 

13from couchers.context import CouchersContext, make_background_user_context 

14from couchers.db import session_scope 

15from couchers.jobs.enqueue import queue_job 

16from couchers.metrics import sent_messages_counter 

17from couchers.models import ( 

18 Conversation, 

19 GroupChat, 

20 GroupChatRole, 

21 GroupChatSubscription, 

22 Message, 

23 MessageType, 

24 ModerationObjectType, 

25 RateLimitAction, 

26 User, 

27) 

28from couchers.models.notifications import NotificationTopicAction 

29from couchers.moderation.utils import create_moderation 

30from couchers.notifications.notify import notify 

31from couchers.proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2 

32from couchers.proto.internal import jobs_pb2 

33from couchers.rate_limits.check import process_rate_limits_and_check_abort 

34from couchers.rate_limits.definitions import RATE_LIMIT_HOURS 

35from couchers.servicers.api import user_model_to_pb 

36from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible 

37from couchers.utils import Timestamp_from_datetime, now 

38 

39logger = logging.getLogger(__name__) 

40 

41# TODO: Still needs custom pagination: GetUpdates 

42DEFAULT_PAGINATION_LENGTH = 20 

43MAX_PAGE_SIZE = 50 

44 

45 

def _message_to_pb(message: Message) -> conversations_pb2.Message:
    """
    Serialises a Message row into its protobuf representation.

    A normal message carries its body in the ``text`` field. Any other message sets the single content field that
    corresponds to ``message.message_type``; a type that is not handled below produces a message with no content field.
    """
    content: dict[str, Any] = {}

    if message.is_normal_message:
        content["text"] = conversations_pb2.MessageContentText(text=message.text)
    else:
        kind = message.message_type
        if kind == MessageType.chat_created:
            content["chat_created"] = conversations_pb2.MessageContentChatCreated()
        elif kind == MessageType.chat_edited:
            content["chat_edited"] = conversations_pb2.MessageContentChatEdited()
        elif kind == MessageType.user_invited:
            content["user_invited"] = conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id)
        elif kind == MessageType.user_left:
            content["user_left"] = conversations_pb2.MessageContentUserLeft()
        elif kind == MessageType.user_made_admin:
            content["user_made_admin"] = conversations_pb2.MessageContentUserMadeAdmin(
                target_user_id=message.target_id
            )
        elif kind == MessageType.user_removed_admin:
            content["user_removed_admin"] = conversations_pb2.MessageContentUserRemovedAdmin(
                target_user_id=message.target_id
            )
        elif kind == MessageType.user_removed:
            content["group_chat_user_removed"] = conversations_pb2.MessageContentUserRemoved(
                target_user_id=message.target_id
            )

    return conversations_pb2.Message(
        message_id=message.id,
        author_user_id=message.author_id,
        time=Timestamp_from_datetime(message.time),
        **content,
    )

96 

97 

def _get_visible_members_for_subscription(subscription: GroupChatSubscription) -> list[int]:
    """
    Lists the user ids of the chat members that the owner of this subscription is allowed to see.

    Someone who has left the chat only sees the people who were members at the moment they left, so that
    anyone added afterwards stays hidden from them.
    """
    candidates = subscription.group_chat.subscriptions
    left_at = subscription.left

    if not left_at:
        # still a member: everyone whose subscription is currently active
        visible = candidates.where(GroupChatSubscription.left.is_(None))
    else:
        # no longer a member: everyone whose membership window contains the moment we left
        visible = candidates.where(GroupChatSubscription.joined <= left_at).where(
            or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left.is_(None))
        )

    return [member.user_id for member in visible]

114 

115 

def _get_visible_admins_for_subscription(subscription: GroupChatSubscription) -> list[int]:
    """
    Lists the user ids of the chat admins that the owner of this subscription is allowed to see.

    Someone who has left the chat only sees the admins who were members at the moment they left, so that
    anyone added afterwards stays hidden from them.
    """
    admins = subscription.group_chat.subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin)
    left_at = subscription.left

    if not left_at:
        # still a member: every admin whose subscription is currently active
        admins = admins.where(GroupChatSubscription.left.is_(None))
    else:
        # no longer a member: every admin whose membership window contains the moment we left
        admins = admins.where(GroupChatSubscription.joined <= left_at).where(
            or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left.is_(None))
        )

    return [admin.user_id for admin in admins]

137 

138 

def _user_can_message(session: Session, context: CouchersContext, group_chat: GroupChat) -> bool:
    """
    Tells whether the calling user may post in the given chat.

    True group chats (anything that is not a DM) can always be posted to. In a DM the user can only post while
    the other participant
    - still has an active subscription (has not left the chat), and
    - passes the where_users_column_visible filter for the caller (intended to exclude deleted/banned users and
      blocks in either direction)
    """
    if group_chat.is_dm:
        other_active_subscriptions = (
            select(GroupChatSubscription)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
            .where(GroupChatSubscription.left.is_(None))
        )
        visible_counterpart = where_users_column_visible(
            other_active_subscriptions,
            context=context,
            column=GroupChatSubscription.user_id,
        )
        return session.execute(select(visible_counterpart.exists())).scalar_one()

    return True

160 

161 

def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload) -> None:
    """
    Background job that fans a chat message out as notifications to the other members of its group chat

    Only text messages are notified. Recipients are the non-muted subscribers, other than the author, whose
    membership window contains the message time and who pass the visibility filter from the author's point of view.
    """
    logger.info(f"Fanning notifications for message_id = {payload.message_id}")

    with session_scope() as session:
        lookup = (
            select(Message, GroupChat)
            .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
            .where(Message.id == payload.message_id)
        )
        message, group_chat = session.execute(lookup).one()

        if message.message_type != MessageType.text:
            logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
            return

        author_context = make_background_user_context(user_id=message.author_id)
        eligible_subscribers = (
            select(GroupChatSubscription.user_id)
            .where(GroupChatSubscription.group_chat_id == message.conversation_id)
            .where(GroupChatSubscription.user_id != message.author_id)
            .where(GroupChatSubscription.joined <= message.time)
            .where(or_(GroupChatSubscription.left.is_(None), GroupChatSubscription.left >= message.time))
            .where(not_(GroupChatSubscription.is_muted))
        )
        recipients_query = where_users_column_visible(
            eligible_subscribers,
            context=author_context,
            column=GroupChatSubscription.user_id,
        )
        user_ids_to_notify = session.execute(recipients_query).scalars().all()

        author_name = message.author.name
        if group_chat.is_dm:
            msg = f"{author_name} sent you a message"
        else:
            msg = f"{author_name} sent a message in {group_chat.title}"

        for user_id in user_ids_to_notify:
            # the author is rendered from each recipient's own point of view
            recipient_context = make_background_user_context(user_id=user_id)
            chat_message = notification_data_pb2.ChatMessage(
                author=user_model_to_pb(message.author, session, recipient_context),
                message=msg,
                text=message.text,
                group_chat_id=message.conversation_id,
            )
            notify(
                session,
                user_id=user_id,
                topic_action=NotificationTopicAction.chat__message,
                key=str(message.conversation_id),
                data=chat_message,
                moderation_state_id=group_chat.moderation_state_id,
            )

220 

221 

def _add_message_to_subscription(session: Session, subscription: GroupChatSubscription, **kwargs: Any) -> Message:
    """
    Posts a new message into the subscription's chat, authored by the user who owns the subscription.

    The author's last seen message id is moved to the new message, and a background job is queued to generate
    notifications for it.

    Any keyword args are passed straight through to the Message constructor
    """
    new_message = Message(
        conversation=subscription.group_chat.conversation,
        author_id=subscription.user_id,
        **kwargs,
    )
    session.add(new_message)
    # flush before reading new_message.id below
    session.flush()

    subscription.last_seen_message_id = new_message.id

    notification_payload = jobs_pb2.GenerateMessageNotificationsPayload(message_id=new_message.id)
    queue_job(session, job=generate_message_notifications, payload=notification_payload)

    return new_message

244 

245 

def _create_chat(
    session: Session,
    creator_id: int,
    recipient_ids: Sequence[int],
    title: str | None = None,
    only_admins_invite: bool = True,
) -> GroupChat:
    """
    Creates a conversation, its moderation state and group chat, and subscribes the creator and all recipients.

    The creator is subscribed as admin and every recipient as participant. A chat with exactly one recipient is
    flagged as a DM. No message is added here; that is left to the caller.
    """
    conversation = Conversation()
    session.add(conversation)
    session.flush()

    # Create moderation state for UMS (starts as SHADOWED)
    moderation_state = create_moderation(
        session=session,
        object_type=ModerationObjectType.GROUP_CHAT,
        object_id=conversation.id,
        creator_user_id=creator_id,
    )

    chat = GroupChat(
        conversation_id=conversation.id,
        title=title,
        creator_id=creator_id,
        is_dm=len(recipient_ids) == 1,
        only_admins_invite=only_admins_invite,
        moderation_state_id=moderation_state.id,
    )
    session.add(chat)
    session.flush()

    session.add(GroupChatSubscription(user_id=creator_id, group_chat=chat, role=GroupChatRole.admin))
    session.add_all(
        GroupChatSubscription(user_id=recipient_id, group_chat=chat, role=GroupChatRole.participant)
        for recipient_id in recipient_ids
    )

    return chat

293 

294 

def _get_message_subscription(session: Session, user_id: int, conversation_id: int) -> GroupChatSubscription:
    """
    Fetches the user's active (not left) subscription to the given chat.

    NOTE: the lookup uses scalar_one_or_none, so the value can be None at runtime despite the annotation; the cast
    is only there for the type checker.
    """
    active_subscription = select(GroupChatSubscription).where(
        GroupChatSubscription.group_chat_id == conversation_id,
        GroupChatSubscription.user_id == user_id,
        GroupChatSubscription.left.is_(None),
    )
    return cast(GroupChatSubscription, session.execute(active_subscription).scalar_one_or_none())

304 

305 

def _get_visible_message_subscription(
    session: Session, context: CouchersContext, conversation_id: int
) -> GroupChatSubscription:
    """
    Fetches the caller's active (not left) subscription to the given chat, with moderation visibility filtering

    NOTE: the lookup uses scalar_one_or_none, so the value can be None at runtime despite the annotation; callers
    are expected to check for that.
    """
    active_subscription = (
        select(GroupChatSubscription)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(
            GroupChatSubscription.group_chat_id == conversation_id,
            GroupChatSubscription.user_id == context.user_id,
            GroupChatSubscription.left.is_(None),
        )
    )
    visible_subscription = where_moderated_content_visible(
        active_subscription, context, GroupChat, is_list_operation=False
    )
    return cast(GroupChatSubscription, session.execute(visible_subscription).scalar_one_or_none())

324 

325 

def _unseen_message_count(session: Session, subscription_id: int) -> int:
    """
    Counts the messages in the subscription's chat whose id is above the subscription's last seen message id
    """
    unseen_messages = (
        select(func.count())
        .select_from(Message)
        .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
        .where(
            GroupChatSubscription.id == subscription_id,
            Message.id > GroupChatSubscription.last_seen_message_id,
        )
    )
    return session.execute(unseen_messages).scalar_one()

335 

336 

def _mute_info(subscription: GroupChatSubscription) -> conversations_pb2.MuteInfo:
    """
    Builds the MuteInfo protobuf from the subscription's muted_display() pair; muted_until is left unset when
    that pair has no end time
    """
    is_muted, until = subscription.muted_display()
    until_pb = Timestamp_from_datetime(until) if until else None
    return conversations_pb2.MuteInfo(muted=is_muted, muted_until=until_pb)

343 

344 

345class Conversations(conversations_pb2_grpc.ConversationsServicer): 

346 def ListGroupChats( 

347 self, request: conversations_pb2.ListGroupChatsReq, context: CouchersContext, session: Session 

348 ) -> conversations_pb2.ListGroupChatsRes: 

349 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH 

350 page_size = min(page_size, MAX_PAGE_SIZE) 

351 

352 # select group chats where you have a subscription, and for each of 

353 # these, the latest message from them 

354 

355 t = ( 

356 select( 

357 GroupChatSubscription.group_chat_id.label("group_chat_id"), 

358 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"), 

359 func.max(Message.id).label("message_id"), 

360 ) 

361 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

362 .where(GroupChatSubscription.user_id == context.user_id) 

363 .where(Message.time >= GroupChatSubscription.joined) 

364 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

365 .group_by(GroupChatSubscription.group_chat_id) 

366 .order_by(func.max(Message.id).desc()) 

367 .subquery() 

368 ) 

369 

370 results = session.execute( 

371 where_moderated_content_visible( 

372 select(t, GroupChat, GroupChatSubscription, Message) 

373 .join(Message, Message.id == t.c.message_id) 

374 .join(GroupChatSubscription, GroupChatSubscription.id == t.c.group_chat_subscriptions_id) 

375 .join(GroupChat, GroupChat.conversation_id == t.c.group_chat_id) 

376 .where(or_(t.c.message_id < request.last_message_id, to_bool(request.last_message_id == 0))) 

377 .order_by(t.c.message_id.desc()) 

378 .limit(page_size + 1), 

379 context, 

380 GroupChat, 

381 is_list_operation=True, 

382 ) 

383 ).all() 

384 

385 return conversations_pb2.ListGroupChatsRes( 

386 group_chats=[ 

387 conversations_pb2.GroupChat( 

388 group_chat_id=result.GroupChat.conversation_id, 

389 title=result.GroupChat.title, # TODO: proper title for DMs, etc 

390 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription), 

391 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription), 

392 only_admins_invite=result.GroupChat.only_admins_invite, 

393 is_dm=result.GroupChat.is_dm, 

394 created=Timestamp_from_datetime(result.GroupChat.conversation.created), 

395 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id), 

396 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id, 

397 latest_message=_message_to_pb(result.Message) if result.Message else None, 

398 mute_info=_mute_info(result.GroupChatSubscription), 

399 can_message=_user_can_message(session, context, result.GroupChat), 

400 ) 

401 for result in results[:page_size] 

402 ], 

403 last_message_id=( 

404 min(g.Message.id if g.Message else 1 for g in results[:page_size]) if len(results) > 0 else 0 

405 ), # TODO 

406 no_more=len(results) <= page_size, 

407 ) 

408 

409 def GetGroupChat( 

410 self, request: conversations_pb2.GetGroupChatReq, context: CouchersContext, session: Session 

411 ) -> conversations_pb2.GroupChat: 

412 result = session.execute( 

413 where_moderated_content_visible( 

414 select(GroupChat, GroupChatSubscription, Message) 

415 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

416 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

417 .where(GroupChatSubscription.user_id == context.user_id) 

418 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

419 .where(Message.time >= GroupChatSubscription.joined) 

420 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

421 .order_by(Message.id.desc()) 

422 .limit(1), 

423 context, 

424 GroupChat, 

425 is_list_operation=False, 

426 ) 

427 ).one_or_none() 

428 

429 if not result: 

430 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

431 

432 return conversations_pb2.GroupChat( 

433 group_chat_id=result.GroupChat.conversation_id, 

434 title=result.GroupChat.title, 

435 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription), 

436 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription), 

437 only_admins_invite=result.GroupChat.only_admins_invite, 

438 is_dm=result.GroupChat.is_dm, 

439 created=Timestamp_from_datetime(result.GroupChat.conversation.created), 

440 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id), 

441 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id, 

442 latest_message=_message_to_pb(result.Message) if result.Message else None, 

443 mute_info=_mute_info(result.GroupChatSubscription), 

444 can_message=_user_can_message(session, context, result.GroupChat), 

445 ) 

446 

447 def GetDirectMessage( 

448 self, request: conversations_pb2.GetDirectMessageReq, context: CouchersContext, session: Session 

449 ) -> conversations_pb2.GroupChat: 

450 count = func.count(GroupChatSubscription.id).label("count") 

451 subquery = ( 

452 select(GroupChatSubscription.group_chat_id) 

453 .where( 

454 or_( 

455 GroupChatSubscription.user_id == context.user_id, 

456 GroupChatSubscription.user_id == request.user_id, 

457 ) 

458 ) 

459 .where(GroupChatSubscription.left == None) 

460 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

461 .where(GroupChat.is_dm == True) 

462 .group_by(GroupChatSubscription.group_chat_id) 

463 .having(count == 2) 

464 .subquery() 

465 ) 

466 

467 result = session.execute( 

468 where_moderated_content_visible( 

469 select(subquery, GroupChat, GroupChatSubscription, Message) 

470 .join(subquery, subquery.c.group_chat_id == GroupChat.conversation_id) 

471 .join(Message, Message.conversation_id == GroupChat.conversation_id) 

472 .where(GroupChatSubscription.user_id == context.user_id) 

473 .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id) 

474 .where(Message.time >= GroupChatSubscription.joined) 

475 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

476 .order_by(Message.id.desc()) 

477 .limit(1), 

478 context, 

479 GroupChat, 

480 is_list_operation=False, 

481 ) 

482 ).one_or_none() 

483 

484 if not result: 

485 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

486 

487 return conversations_pb2.GroupChat( 

488 group_chat_id=result.GroupChat.conversation_id, 

489 title=result.GroupChat.title, 

490 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription), 

491 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription), 

492 only_admins_invite=result.GroupChat.only_admins_invite, 

493 is_dm=result.GroupChat.is_dm, 

494 created=Timestamp_from_datetime(result.GroupChat.conversation.created), 

495 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id), 

496 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id, 

497 latest_message=_message_to_pb(result.Message) if result.Message else None, 

498 mute_info=_mute_info(result.GroupChatSubscription), 

499 can_message=_user_can_message(session, context, result.GroupChat), 

500 ) 

501 

502 def GetUpdates( 

503 self, request: conversations_pb2.GetUpdatesReq, context: CouchersContext, session: Session 

504 ) -> conversations_pb2.GetUpdatesRes: 

505 results = ( 

506 session.execute( 

507 where_moderated_content_visible( 

508 select(Message) 

509 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

510 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id) 

511 .where(GroupChatSubscription.user_id == context.user_id) 

512 .where(Message.time >= GroupChatSubscription.joined) 

513 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

514 .where(Message.id > request.newest_message_id) 

515 .order_by(Message.id.asc()) 

516 .limit(DEFAULT_PAGINATION_LENGTH + 1), 

517 context, 

518 GroupChat, 

519 is_list_operation=False, 

520 ) 

521 ) 

522 .scalars() 

523 .all() 

524 ) 

525 

526 return conversations_pb2.GetUpdatesRes( 

527 updates=[ 

528 conversations_pb2.Update( 

529 group_chat_id=message.conversation_id, 

530 message=_message_to_pb(message), 

531 ) 

532 for message in sorted(results, key=lambda message: message.id)[:DEFAULT_PAGINATION_LENGTH] 

533 ], 

534 no_more=len(results) <= DEFAULT_PAGINATION_LENGTH, 

535 ) 

536 

537 def GetGroupChatMessages( 

538 self, request: conversations_pb2.GetGroupChatMessagesReq, context: CouchersContext, session: Session 

539 ) -> conversations_pb2.GetGroupChatMessagesRes: 

540 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH 

541 page_size = min(page_size, MAX_PAGE_SIZE) 

542 

543 results = ( 

544 session.execute( 

545 where_moderated_content_visible( 

546 select(Message) 

547 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

548 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id) 

549 .where(GroupChatSubscription.user_id == context.user_id) 

550 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

551 .where(Message.time >= GroupChatSubscription.joined) 

552 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

553 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0))) 

554 .where( 

555 or_(Message.id > GroupChatSubscription.last_seen_message_id, to_bool(request.only_unseen == 0)) 

556 ) 

557 .order_by(Message.id.desc()) 

558 .limit(page_size + 1), 

559 context, 

560 GroupChat, 

561 is_list_operation=False, 

562 ) 

563 ) 

564 .scalars() 

565 .all() 

566 ) 

567 

568 return conversations_pb2.GetGroupChatMessagesRes( 

569 messages=[_message_to_pb(message) for message in results[:page_size]], 

570 last_message_id=results[-2].id if len(results) > 1 else 0, # TODO 

571 no_more=len(results) <= page_size, 

572 ) 

573 

574 def MarkLastSeenGroupChat( 

575 self, request: conversations_pb2.MarkLastSeenGroupChatReq, context: CouchersContext, session: Session 

576 ) -> empty_pb2.Empty: 

577 subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

578 

579 if not subscription: 579 ↛ 580line 579 didn't jump to line 580 because the condition on line 579 was never true

580 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

581 

582 if not subscription.last_seen_message_id <= request.last_seen_message_id: 582 ↛ 583line 582 didn't jump to line 583 because the condition on line 582 was never true

583 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages") 

584 

585 subscription.last_seen_message_id = request.last_seen_message_id 

586 

587 return empty_pb2.Empty() 

588 

589 def MuteGroupChat( 

590 self, request: conversations_pb2.MuteGroupChatReq, context: CouchersContext, session: Session 

591 ) -> empty_pb2.Empty: 

592 subscription = _get_visible_message_subscription(session, context, request.group_chat_id) 

593 

594 if not subscription: 594 ↛ 595line 594 didn't jump to line 595 because the condition on line 594 was never true

595 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

596 

597 if request.unmute: 

598 subscription.muted_until = DATETIME_MINUS_INFINITY 

599 elif request.forever: 

600 subscription.muted_until = DATETIME_INFINITY 

601 elif request.for_duration: 601 ↛ 607line 601 didn't jump to line 607 because the condition on line 601 was always true

602 duration = request.for_duration.ToTimedelta() 

603 if duration < timedelta(seconds=0): 603 ↛ 604line 603 didn't jump to line 604 because the condition on line 603 was never true

604 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_mute_past") 

605 subscription.muted_until = now() + duration 

606 

607 return empty_pb2.Empty() 

608 

609 def SearchMessages( 

610 self, request: conversations_pb2.SearchMessagesReq, context: CouchersContext, session: Session 

611 ) -> conversations_pb2.SearchMessagesRes: 

612 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH 

613 page_size = min(page_size, MAX_PAGE_SIZE) 

614 

615 results = ( 

616 session.execute( 

617 where_moderated_content_visible( 

618 select(Message) 

619 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

620 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id) 

621 .where(GroupChatSubscription.user_id == context.user_id) 

622 .where(Message.time >= GroupChatSubscription.joined) 

623 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

624 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0))) 

625 .where(Message.text.ilike(f"%{request.query}%")) 

626 .order_by(Message.id.desc()) 

627 .limit(page_size + 1), 

628 context, 

629 GroupChat, 

630 is_list_operation=True, 

631 ) 

632 ) 

633 .scalars() 

634 .all() 

635 ) 

636 

637 return conversations_pb2.SearchMessagesRes( 

638 results=[ 

639 conversations_pb2.MessageSearchResult( 

640 group_chat_id=message.conversation_id, 

641 message=_message_to_pb(message), 

642 ) 

643 for message in results[:page_size] 

644 ], 

645 last_message_id=results[-2].id if len(results) > 1 else 0, 

646 no_more=len(results) <= page_size, 

647 ) 

648 

649 def CreateGroupChat( 

650 self, request: conversations_pb2.CreateGroupChatReq, context: CouchersContext, session: Session 

651 ) -> conversations_pb2.GroupChat: 

652 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

653 if not user.has_completed_profile: 

654 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message") 

655 

656 recipient_user_ids = list( 

657 session.execute( 

658 select(User.id).where(users_visible(context)).where(User.id.in_(request.recipient_user_ids)) 

659 ) 

660 .scalars() 

661 .all() 

662 ) 

663 

664 # make sure all requested users are visible 

665 if len(recipient_user_ids) != len(request.recipient_user_ids): 665 ↛ 666line 665 didn't jump to line 666 because the condition on line 665 was never true

666 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found") 

667 

668 if not recipient_user_ids: 668 ↛ 669line 668 didn't jump to line 669 because the condition on line 668 was never true

669 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients") 

670 

671 if len(recipient_user_ids) != len(set(recipient_user_ids)): 671 ↛ 673line 671 didn't jump to line 673 because the condition on line 671 was never true

672 # make sure there's no duplicate users 

673 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_recipients") 

674 

675 if context.user_id in recipient_user_ids: 675 ↛ 676line 675 didn't jump to line 676 because the condition on line 675 was never true

676 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self") 

677 

678 if len(recipient_user_ids) == 1: 

679 # can only have one DM at a time between any two users 

680 other_user_id = recipient_user_ids[0] 

681 

682 # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have 

683 # user_id either this user or the recipient user. If you find two subscriptions to the same DM group 

684 # chat, you know they already have a shared group chat 

685 count = func.count(GroupChatSubscription.id).label("count") 

686 if session.execute( 

687 where_moderated_content_visible( 

688 select(count) 

689 .where( 

690 or_( 

691 GroupChatSubscription.user_id == context.user_id, 

692 GroupChatSubscription.user_id == other_user_id, 

693 ) 

694 ) 

695 .where(GroupChatSubscription.left == None) 

696 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

697 .where(GroupChat.is_dm == True) 

698 .group_by(GroupChatSubscription.group_chat_id) 

699 .having(count == 2), 

700 context, 

701 GroupChat, 

702 is_list_operation=False, 

703 ) 

704 ).scalar_one_or_none(): 

705 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_have_dm") 

706 

707 # Check if user has been initiating chats excessively 

708 if process_rate_limits_and_check_abort( 

709 session=session, user_id=context.user_id, action=RateLimitAction.chat_initiation 

710 ): 

711 context.abort_with_error_code( 

712 grpc.StatusCode.RESOURCE_EXHAUSTED, 

713 "chat_initiation_rate_limit", 

714 substitutions={"hours": str(RATE_LIMIT_HOURS)}, 

715 ) 

716 

717 group_chat = _create_chat( 

718 session, 

719 creator_id=context.user_id, 

720 recipient_ids=request.recipient_user_ids, 

721 title=request.title.value, 

722 ) 

723 

724 your_subscription = _get_message_subscription(session, context.user_id, group_chat.conversation_id) 

725 

726 _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created) 

727 

728 session.flush() 

729 

730 return conversations_pb2.GroupChat( 

731 group_chat_id=group_chat.conversation_id, 

732 title=group_chat.title, 

733 member_user_ids=_get_visible_members_for_subscription(your_subscription), 

734 admin_user_ids=_get_visible_admins_for_subscription(your_subscription), 

735 only_admins_invite=group_chat.only_admins_invite, 

736 is_dm=group_chat.is_dm, 

737 created=Timestamp_from_datetime(group_chat.conversation.created), 

738 mute_info=_mute_info(your_subscription), 

739 can_message=True, 

740 ) 

741 

742 def SendMessage( 

743 self, request: conversations_pb2.SendMessageReq, context: CouchersContext, session: Session 

744 ) -> empty_pb2.Empty: 

745 if request.text == "": 745 ↛ 746line 745 didn't jump to line 746 because the condition on line 745 was never true

746 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

747 

748 result = session.execute( 

749 where_moderated_content_visible( 

750 select(GroupChatSubscription, GroupChat) 

751 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

752 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

753 .where(GroupChatSubscription.user_id == context.user_id) 

754 .where(GroupChatSubscription.left == None), 

755 context, 

756 GroupChat, 

757 is_list_operation=False, 

758 ) 

759 ).one_or_none() 

760 if not result: 

761 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found") 

762 

763 subscription, group_chat = result 

764 if not _user_can_message(session, context, group_chat): 

765 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_message_in_chat") 

766 

767 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text) 

768 

769 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

770 sent_messages_counter.labels( 

771 user_gender, "direct message" if subscription.group_chat.is_dm else "group chat" 

772 ).inc() 

773 

774 return empty_pb2.Empty() 

775 

def SendDirectMessage(
    self, request: conversations_pb2.SendDirectMessageReq, context: CouchersContext, session: Session
) -> conversations_pb2.SendDirectMessageRes:
    """
    Sends a text message to a single recipient, reusing the existing direct message (DM) chat between the
    two users if one is found and creating a new chat otherwise.

    Returns the conversation id of the chat the message was added to.

    Aborts with:
    - FAILED_PRECONDITION ("incomplete_profile_send_message") if the sender has not completed their profile
    - INVALID_ARGUMENT ("no_recipients") if no recipient id was given
    - INVALID_ARGUMENT ("user_not_found") if the recipient is not visible to the sender
    - INVALID_ARGUMENT ("cant_add_self") if the sender and the recipient are the same user
    - INVALID_ARGUMENT ("invalid_message") if the text is empty
    """
    user_id = context.user_id
    sender = session.execute(select(User).where(User.id == user_id)).scalar_one()

    recipient_id = request.recipient_user_id

    if not sender.has_completed_profile:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")

    if not recipient_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")

    # Only used as an existence/visibility check for the recipient
    visible_recipient_id = session.execute(
        select(User.id).where(users_visible(context)).where(User.id == recipient_id)
    ).scalar_one_or_none()

    if not visible_recipient_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")

    if user_id == recipient_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")

    if request.text == "":
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    # Look for an existing direct message (DM) chat between the two users: chats in which exactly two of
    # the subscriptions belong to the sender or the recipient
    dm_chat_ids = (
        select(GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.user_id.in_([user_id, recipient_id]))
        .group_by(GroupChatSubscription.group_chat_id)
        .having(func.count(GroupChatSubscription.user_id) == 2)
    )

    chat = session.execute(
        where_moderated_content_visible(
            select(GroupChat)
            .where(GroupChat.is_dm == True)
            .where(GroupChat.conversation_id.in_(dm_chat_ids))
            .limit(1),
            context,
            GroupChat,
            is_list_operation=False,
        )
    ).scalar_one_or_none()

    if not chat:
        chat = _create_chat(session, user_id, [recipient_id])

    # Retrieve the sender's active subscription to the chat
    subscription = _get_message_subscription(session, user_id, chat.conversation_id)

    # Add the message to the conversation
    _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)

    # The sender's row was already loaded above, so read the gender from it rather than issuing another
    # SELECT just for the metric label
    sent_messages_counter.labels(sender.gender, "direct message").inc()

    session.flush()

    return conversations_pb2.SendDirectMessageRes(group_chat_id=chat.conversation_id)

838 

def EditGroupChat(
    self, request: conversations_pb2.EditGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Updates the title and/or the `only_admins_invite` flag of a group chat and records a `chat_edited`
    message in it.

    Aborts with NOT_FOUND ("chat_not_found") if the caller has no visible subscription to the chat, and
    with PERMISSION_DENIED ("only_admin_can_edit") if the caller is not an admin of it.
    """
    admin_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not admin_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if admin_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_edit")

    # Both editable fields are wrapper messages on the request; copy over only the ones that were set
    for field_name in ("title", "only_admins_invite"):
        if request.HasField(field_name):
            setattr(admin_subscription.group_chat, field_name, getattr(request, field_name).value)

    _add_message_to_subscription(session, admin_subscription, message_type=MessageType.chat_edited)

    return empty_pb2.Empty()

859 

def MakeGroupChatAdmin(
    self, request: conversations_pb2.MakeGroupChatAdminReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Promotes another member of a group chat to admin and records a `user_made_admin` message.

    Aborts with:
    - NOT_FOUND ("user_not_found") if the target user is not visible to the caller
    - NOT_FOUND ("chat_not_found") if the caller has no visible subscription to the chat
    - PERMISSION_DENIED ("only_admin_can_make_admin") if the caller is not an admin
    - FAILED_PRECONDITION ("cant_make_self_admin") if the caller targets themselves
    - FAILED_PRECONDITION ("user_not_in_chat") if the target has no subscription to the chat
    - FAILED_PRECONDITION ("already_admin") if the target's role is not `participant`
    """
    target_user_id = request.user_id
    chat_id = request.group_chat_id

    target_user = session.execute(
        select(User).where(users_visible(context)).where(User.id == target_user_id)
    ).scalar_one_or_none()
    if not target_user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    caller_subscription = _get_visible_message_subscription(session, context, chat_id)
    if not caller_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if caller_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_make_admin")

    if target_user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_make_self_admin")

    target_subscription = _get_message_subscription(session, target_user_id, chat_id)
    if not target_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")

    if target_subscription.role != GroupChatRole.participant:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_admin")

    target_subscription.role = GroupChatRole.admin

    # The control message is authored through the caller's subscription and points at the promoted user
    _add_message_to_subscription(
        session, caller_subscription, message_type=MessageType.user_made_admin, target_id=target_user_id
    )

    return empty_pb2.Empty()

894 

def RemoveGroupChatAdmin(
    self, request: conversations_pb2.RemoveGroupChatAdminReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Demotes an admin of a group chat back to participant and records a `user_removed_admin` message.

    Aborts with:
    - NOT_FOUND ("user_not_found") if the target user is not visible to the caller
    - NOT_FOUND ("chat_not_found") if the caller has no visible subscription to the chat
    - FAILED_PRECONDITION ("cant_remove_last_admin") if the caller targets themselves and no other
      admin with `left` unset remains in the chat
    - PERMISSION_DENIED ("only_admin_can_remove_admin") if the caller is not an admin
    - FAILED_PRECONDITION ("user_not_admin") if the target has no admin subscription with `left` unset
    """
    target_user_id = request.user_id
    chat_id = request.group_chat_id

    target_user = session.execute(
        select(User).where(users_visible(context)).where(User.id == target_user_id)
    ).scalar_one_or_none()
    if not target_user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    caller_subscription = _get_visible_message_subscription(session, context, chat_id)
    if not caller_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if target_user_id == context.user_id:
        # Race condition! The count and the demotion below are not performed atomically
        remaining_admins_query = (
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == chat_id)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.role == GroupChatRole.admin)
            .where(GroupChatSubscription.left == None)
        )
        remaining_admins = session.execute(remaining_admins_query).scalar_one()
        if remaining_admins <= 0:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_last_admin")

    if caller_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_admin")

    # The target only qualifies if they currently hold an admin subscription with `left` unset
    target_admin_query = (
        select(GroupChatSubscription)
        .where(GroupChatSubscription.group_chat_id == chat_id)
        .where(GroupChatSubscription.user_id == target_user_id)
        .where(GroupChatSubscription.left == None)
        .where(GroupChatSubscription.role == GroupChatRole.admin)
    )
    target_subscription = session.execute(target_admin_query).scalar_one_or_none()
    if not target_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin")

    target_subscription.role = GroupChatRole.participant

    _add_message_to_subscription(
        session, caller_subscription, message_type=MessageType.user_removed_admin, target_id=target_user_id
    )

    return empty_pb2.Empty()

942 

def InviteToGroupChat(
    self, request: conversations_pb2.InviteToGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Adds another user to a group chat as a participant and records a `user_invited` message.

    Aborts with:
    - NOT_FOUND ("user_not_found") if the invitee is not visible to the caller
    - NOT_FOUND ("chat_not_found") if the caller has no subscription with `left` unset to a chat that passes
      the moderation visibility filter
    - FAILED_PRECONDITION ("cant_invite_self") if the caller invites themselves
    - PERMISSION_DENIED ("invite_permission_denied") if the chat only lets admins invite and the caller
      is not an admin
    - FAILED_PRECONDITION ("cant_invite_to_dm") if the chat is a direct message chat
    - FAILED_PRECONDITION ("already_in_chat") if the invitee already has a subscription to the chat
    """
    invitee_id = request.user_id
    chat_id = request.group_chat_id

    invitee = session.execute(
        select(User).where(users_visible(context)).where(User.id == invitee_id)
    ).scalar_one_or_none()
    if not invitee:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # The caller's own subscription to the requested chat, joined with the chat row itself
    membership_query = (
        select(GroupChatSubscription, GroupChat)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.group_chat_id == chat_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.left == None)
    )
    row = session.execute(
        where_moderated_content_visible(membership_query, context, GroupChat, is_list_operation=False)
    ).one_or_none()
    if not row:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    caller_subscription, group_chat = row

    if invitee_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_self")

    caller_is_admin = caller_subscription.role == GroupChatRole.admin
    if not caller_is_admin and caller_subscription.group_chat.only_admins_invite:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invite_permission_denied")

    if group_chat.is_dm:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_to_dm")

    existing_subscription = _get_message_subscription(session, invitee_id, chat_id)
    if existing_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_chat")

    # TODO: race condition! The membership check above and the insert below are not atomic

    session.add(
        GroupChatSubscription(
            user_id=invitee_id,
            group_chat=caller_subscription.group_chat,
            role=GroupChatRole.participant,
        )
    )

    _add_message_to_subscription(
        session, caller_subscription, message_type=MessageType.user_invited, target_id=invitee_id
    )

    return empty_pb2.Empty()

997 

def RemoveGroupChatUser(
    self, request: conversations_pb2.RemoveGroupChatUserReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Removes another user from a group chat.

    1. Load the caller's subscription and verify they are allowed to remove users
    2. Load the target's subscription, record a `user_removed` message and mark the target as having left

    Aborts with:
    - NOT_FOUND ("chat_not_found") if the caller has no visible subscription to the chat
    - PERMISSION_DENIED ("only_admin_can_remove_user") if the caller is not an admin
    - FAILED_PRECONDITION ("cant_remove_self") if the caller targets themselves
    - FAILED_PRECONDITION ("user_not_in_chat") if the target has no subscription to the chat
    """
    target_user_id = request.user_id
    chat_id = request.group_chat_id

    # Step 1: the caller must be in the chat, be an admin of it, and not be the target
    caller_subscription = _get_visible_message_subscription(session, context, chat_id)

    if not caller_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if caller_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_user")

    if target_user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_self")

    # Step 2: the target must currently be in the chat
    target_subscription = _get_message_subscription(session, target_user_id, chat_id)

    if not target_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")

    # The removal message is added before the target's subscription is closed
    _add_message_to_subscription(
        session, caller_subscription, message_type=MessageType.user_removed, target_id=target_user_id
    )

    target_subscription.left = func.now()

    return empty_pb2.Empty()

1034 

def LeaveGroupChat(
    self, request: conversations_pb2.LeaveGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Makes the calling user leave a group chat, recording a `user_left` message first.

    Aborts with NOT_FOUND ("chat_not_found") if the caller has no visible subscription to the chat, and
    with FAILED_PRECONDITION ("last_admin_cant_leave") if the caller is an admin, no other admin remains,
    and at least one participant is still in the chat.
    """
    my_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not my_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    def _count_other_members(role: Any) -> int:
        # Number of subscriptions with `left` unset, other than the caller's, holding the given role
        return session.execute(
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.role == role)
            .where(GroupChatSubscription.left == None)
        ).scalar_one()

    if my_subscription.role == GroupChatRole.admin:
        other_admins = _count_other_members(GroupChatRole.admin)
        other_participants = _count_other_members(GroupChatRole.participant)
        # An admin may leave only if another admin stays behind or nobody else is left in the chat
        if other_admins <= 0 and other_participants != 0:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "last_admin_cant_leave")

    _add_message_to_subscription(session, my_subscription, message_type=MessageType.user_left)

    my_subscription.left = func.now()

    return empty_pb2.Empty()