Coverage for src/couchers/servicers/conversations.py: 91%

296 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-24 15:28 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.orm import Session 

7from sqlalchemy.sql import func, not_, or_ 

8 

9from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY 

10from couchers.context import CouchersContext, make_background_user_context 

11from couchers.db import session_scope 

12from couchers.jobs.enqueue import queue_job 

13from couchers.metrics import sent_messages_counter 

14from couchers.models import ( 

15 Conversation, 

16 GroupChat, 

17 GroupChatRole, 

18 GroupChatSubscription, 

19 Message, 

20 MessageType, 

21 ModerationObjectType, 

22 RateLimitAction, 

23 User, 

24) 

25from couchers.moderation.utils import create_moderation 

26from couchers.notifications.notify import notify 

27from couchers.proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2 

28from couchers.proto.internal import jobs_pb2 

29from couchers.rate_limits.check import process_rate_limits_and_check_abort 

30from couchers.rate_limits.definitions import RATE_LIMIT_HOURS 

31from couchers.servicers.api import user_model_to_pb 

32from couchers.sql import couchers_select as select 

33from couchers.utils import Timestamp_from_datetime, now 

34 

logger = logging.getLogger(__name__)

# TODO: Still needs custom pagination: GetUpdates
# Page size used when a request leaves `number` at 0; GetUpdates always pages by this value
DEFAULT_PAGINATION_LENGTH = 20
# Upper bound applied to the page size requested by a client
MAX_PAGE_SIZE = 50

40 

41 

def _message_to_pb(message: Message) -> conversations_pb2.Message:
    """
    Builds the protocol buffer representation of the given message

    Normal messages carry their text. Any other message sets at most one content field, picked by
    message.message_type; the invited/made admin/removed admin/removed variants also carry message.target_id.
    """
    # fields that every kind of message shares
    header = {
        "message_id": message.id,
        "author_user_id": message.author_id,
        "time": Timestamp_from_datetime(message.time),
    }

    if message.is_normal_message:
        return conversations_pb2.Message(
            text=conversations_pb2.MessageContentText(text=message.text),
            **header,
        )

    # control message: only the content field matching the message type gets set
    kind = message.message_type
    content = {}
    if kind == MessageType.chat_created:
        content["chat_created"] = conversations_pb2.MessageContentChatCreated()
    elif kind == MessageType.chat_edited:
        content["chat_edited"] = conversations_pb2.MessageContentChatEdited()
    elif kind == MessageType.user_invited:
        content["user_invited"] = conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id)
    elif kind == MessageType.user_left:
        content["user_left"] = conversations_pb2.MessageContentUserLeft()
    elif kind == MessageType.user_made_admin:
        content["user_made_admin"] = conversations_pb2.MessageContentUserMadeAdmin(target_user_id=message.target_id)
    elif kind == MessageType.user_removed_admin:
        content["user_removed_admin"] = conversations_pb2.MessageContentUserRemovedAdmin(
            target_user_id=message.target_id
        )
    elif kind == MessageType.user_removed:
        content["group_chat_user_removed"] = conversations_pb2.MessageContentUserRemoved(
            target_user_id=message.target_id
        )

    return conversations_pb2.Message(**header, **content)

92 

93 

def _get_visible_members_for_subscription(subscription):
    """
    Returns the user ids of the chat members that the owner of this subscription may see

    Someone who has left the chat only sees the people who were in it at the moment they left, not
    anyone added afterwards.
    """
    chat_subscriptions = subscription.group_chat.subscriptions
    left_at = subscription.left

    if left_at:
        # no longer in the chat: everyone whose membership spans the moment we left
        visible = chat_subscriptions.where(GroupChatSubscription.joined <= left_at).where(
            or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None)  # noqa: E711
        )
    else:
        # still a member: everyone who currently has an open subscription
        visible = chat_subscriptions.where(GroupChatSubscription.left == None)  # noqa: E711

    return [member.user_id for member in visible]

110 

111 

def _get_visible_admins_for_subscription(subscription):
    """
    Returns the user ids of the chat admins that the owner of this subscription may see

    Someone who has left the chat only sees the admins who were in it at the moment they left, not
    anyone added afterwards.
    """
    chat_subscriptions = subscription.group_chat.subscriptions
    left_at = subscription.left

    if left_at:
        # no longer in the chat: admins whose membership spans the moment we left
        visible = (
            chat_subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin)
            .where(GroupChatSubscription.joined <= left_at)
            .where(or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None))  # noqa: E711
        )
    else:
        # still a member: admins who currently have an open subscription
        visible = chat_subscriptions.where(GroupChatSubscription.left == None).where(  # noqa: E711
            GroupChatSubscription.role == GroupChatRole.admin
        )

    return [admin.user_id for admin in visible]

133 

134 

def _user_can_message(session, context, group_chat: GroupChat) -> bool:
    """
    Tells whether the requesting user may post in the given chat

    Real group chats (not DMs) can always be messaged. A DM can be messaged only while there is another
    participant who passes the user visibility filter for this context and still has an open (not left)
    subscription to the chat.
    """
    if group_chat.is_dm:
        other_active_participant = (
            select(GroupChatSubscription)
            .where_users_column_visible(context=context, column=GroupChatSubscription.user_id)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
            .where(GroupChatSubscription.left == None)  # noqa: E711
        )
        return session.execute(func.exists(other_active_participant)).scalar_one()

    return True

153 

154 

def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload):
    """
    Background job that fans out "chat:message" notifications for one message in a group chat

    Only text messages notify. Recipients are the other subscribers who were in the chat when the message
    was sent, pass the user visibility filter from the author's point of view, and have not muted the chat.
    """
    logger.info(f"Fanning notifications for message_id = {payload.message_id}")

    with session_scope() as session:
        message_lookup = (
            select(Message, GroupChat)
            .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
            .where(Message.id == payload.message_id)
        )
        message, group_chat = session.execute(message_lookup).one()

        if message.message_type != MessageType.text:
            logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
            return []

        # visibility is evaluated as the author of the message
        author_context = make_background_user_context(user_id=message.author_id)
        recipients_query = (
            select(GroupChatSubscription.user_id)
            .where_users_column_visible(context=author_context, column=GroupChatSubscription.user_id)
            .where(GroupChatSubscription.group_chat_id == message.conversation_id)
            .where(GroupChatSubscription.user_id != message.author_id)
            .where(GroupChatSubscription.joined <= message.time)
            .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time))  # noqa: E711
            .where(not_(GroupChatSubscription.is_muted))
        )
        recipient_ids = session.execute(recipients_query).scalars().all()

        msg = (
            f"{message.author.name} sent you a message"
            if group_chat.is_dm
            else f"{message.author.name} sent a message in {group_chat.title}"
        )

        for recipient_id in recipient_ids:
            # the author is rendered from each recipient's own point of view
            author_pb = user_model_to_pb(
                message.author,
                session,
                make_background_user_context(user_id=recipient_id),
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action="chat:message",
                key=str(message.conversation_id),
                data=notification_data_pb2.ChatMessage(
                    author=author_pb,
                    message=msg,
                    text=message.text,
                    group_chat_id=message.conversation_id,
                ),
                moderation_state_id=group_chat.moderation_state_id,
            )

210 

211 

def _add_message_to_subscription(session, subscription, **kwargs):
    """
    Posts a new message in the subscription's chat, authored by the subscription's user

    The author's last seen message id is moved to the new message and a generate_message_notifications
    job is queued for it. Extra keyword args are passed straight to Message. Returns the new message.
    """
    new_message = Message(
        conversation=subscription.group_chat.conversation,
        author_id=subscription.user_id,
        **kwargs,
    )
    session.add(new_message)
    # flush before reading new_message.id below
    session.flush()

    subscription.last_seen_message_id = new_message.id

    job_payload = jobs_pb2.GenerateMessageNotificationsPayload(message_id=new_message.id)
    queue_job(session, job_type="generate_message_notifications", payload=job_payload)

    return new_message

234 

235 

def _create_chat(session, creator_id, recipient_ids, title=None, only_admins_invite=True):
    """
    Creates a conversation, its moderation state and group chat, and subscribes everyone to it

    The creator is subscribed as admin, every recipient as participant. The chat counts as a DM exactly
    when there is a single recipient. Returns the new GroupChat.
    """
    conversation = Conversation()
    session.add(conversation)
    session.flush()

    # Create moderation state for UMS (starts as SHADOWED)
    moderation_state = create_moderation(
        session=session,
        object_type=ModerationObjectType.GROUP_CHAT,
        object_id=conversation.id,
        creator_user_id=creator_id,
    )

    chat = GroupChat(
        conversation_id=conversation.id,
        title=title,
        creator_id=creator_id,
        is_dm=len(recipient_ids) == 1,
        only_admins_invite=only_admins_invite,
        moderation_state_id=moderation_state.id,
    )
    session.add(chat)
    session.flush()

    session.add(GroupChatSubscription(user_id=creator_id, group_chat=chat, role=GroupChatRole.admin))
    for recipient_id in recipient_ids:
        session.add(GroupChatSubscription(user_id=recipient_id, group_chat=chat, role=GroupChatRole.participant))

    return chat

277 

278 

def _get_message_subscription(session, user_id, conversation_id):
    """
    Returns the user's open (not left) subscription to the given chat, or None if there is none
    """
    open_subscription = (
        select(GroupChatSubscription)
        .where(GroupChatSubscription.group_chat_id == conversation_id)
        .where(GroupChatSubscription.user_id == user_id)
        .where(GroupChatSubscription.left == None)  # noqa: E711
    )
    return session.execute(open_subscription).scalar_one_or_none()

288 

289 

def _get_visible_message_subscription(session, context, conversation_id):
    """
    Returns the requesting user's open (not left) subscription to the given chat, or None

    Same as a plain subscription lookup, except that the chat must also pass the moderated content
    visibility filter for this context.
    """
    visible_subscription = (
        select(GroupChatSubscription)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
        .where(GroupChatSubscription.group_chat_id == conversation_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.left == None)  # noqa: E711
    )
    return session.execute(visible_subscription).scalar_one_or_none()

302 

303 

def _unseen_message_count(session, subscription_id):
    """
    Counts the messages in the subscription's chat whose id is above the subscription's last seen message id
    """
    unseen = (
        select(func.count())
        .select_from(Message)
        .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
        .where(GroupChatSubscription.id == subscription_id)
        .where(Message.id > GroupChatSubscription.last_seen_message_id)
    )
    return session.execute(unseen).scalar_one()

312 

313 

def _mute_info(subscription):
    """
    Turns the result of subscription.muted_display() into a MuteInfo protocol buffer

    muted_until is left unset when muted_display() gives no end time.
    """
    is_muted, until = subscription.muted_display()
    until_pb = Timestamp_from_datetime(until) if until else None
    return conversations_pb2.MuteInfo(muted=is_muted, muted_until=until_pb)

320 

321 

322class Conversations(conversations_pb2_grpc.ConversationsServicer): 

def ListGroupChats(
    self, request: conversations_pb2.ListGroupChatsReq, context: CouchersContext, session: Session
) -> conversations_pb2.ListGroupChatsRes:
    """
    Lists the user's group chats, most recently active first, one page at a time

    Chats are ordered by the id of the latest message the user can see in them (messages sent between
    joining and leaving). request.last_message_id, when non-zero, continues below that message id.
    """
    page_size = min(request.number or DEFAULT_PAGINATION_LENGTH, MAX_PAGE_SIZE)

    # for every chat the user has a subscription to: the subscription and the latest message within the
    # time the user was in the chat
    latest_per_chat = (
        select(
            GroupChatSubscription.group_chat_id.label("group_chat_id"),
            func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
            func.max(Message.id).label("message_id"),
        )
        .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(Message.time >= GroupChatSubscription.joined)
        .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))  # noqa: E711
        .group_by(GroupChatSubscription.group_chat_id)
        .order_by(func.max(Message.id).desc())
        .subquery()
    )

    # one row more than the page, to learn whether anything is left after it
    rows = session.execute(
        select(latest_per_chat, GroupChat, GroupChatSubscription, Message)
        .join(Message, Message.id == latest_per_chat.c.message_id)
        .join(GroupChatSubscription, GroupChatSubscription.id == latest_per_chat.c.group_chat_subscriptions_id)
        .join(GroupChat, GroupChat.conversation_id == latest_per_chat.c.group_chat_id)
        .where_moderated_content_visible(context, GroupChat, is_list_operation=True)
        .where(or_(latest_per_chat.c.message_id < request.last_message_id, request.last_message_id == 0))
        .order_by(latest_per_chat.c.message_id.desc())
        .limit(page_size + 1)
    ).all()

    page = rows[:page_size]

    group_chats = []
    for row in page:
        chat = row.GroupChat
        subscription = row.GroupChatSubscription
        group_chats.append(
            conversations_pb2.GroupChat(
                group_chat_id=chat.conversation_id,
                title=chat.title,  # TODO: proper title for DMs, etc
                member_user_ids=_get_visible_members_for_subscription(subscription),
                admin_user_ids=_get_visible_admins_for_subscription(subscription),
                only_admins_invite=chat.only_admins_invite,
                is_dm=chat.is_dm,
                created=Timestamp_from_datetime(chat.conversation.created),
                unseen_message_count=_unseen_message_count(session, subscription.id),
                last_seen_message_id=subscription.last_seen_message_id,
                latest_message=_message_to_pb(row.Message) if row.Message else None,
                mute_info=_mute_info(subscription),
                can_message=_user_can_message(session, context, chat),
            )
        )

    # TODO
    if rows:
        last_message_id = min(row.Message.id if row.Message else 1 for row in page)
    else:
        last_message_id = 0

    return conversations_pb2.ListGroupChatsRes(
        group_chats=group_chats,
        last_message_id=last_message_id,
        no_more=len(rows) <= page_size,
    )

381 

def GetGroupChat(
    self, request: conversations_pb2.GetGroupChatReq, context: CouchersContext, session: Session
) -> conversations_pb2.GroupChat:
    """
    Returns one group chat the user has a subscription to, together with its latest visible message

    Aborts with NOT_FOUND ("chat_not_found") when the query finds no such chat.
    """
    latest_row_query = (
        select(GroupChat, GroupChatSubscription, Message)
        .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
        .where(Message.time >= GroupChatSubscription.joined)
        .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))  # noqa: E711
        # newest message first, and only that one
        .order_by(Message.id.desc())
        .limit(1)
    )
    result = session.execute(latest_row_query).one_or_none()

    if not result:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    chat = result.GroupChat
    subscription = result.GroupChatSubscription
    latest = result.Message

    return conversations_pb2.GroupChat(
        group_chat_id=chat.conversation_id,
        title=chat.title,
        member_user_ids=_get_visible_members_for_subscription(subscription),
        admin_user_ids=_get_visible_admins_for_subscription(subscription),
        only_admins_invite=chat.only_admins_invite,
        is_dm=chat.is_dm,
        created=Timestamp_from_datetime(chat.conversation.created),
        unseen_message_count=_unseen_message_count(session, subscription.id),
        last_seen_message_id=subscription.last_seen_message_id,
        latest_message=_message_to_pb(latest) if latest else None,
        mute_info=_mute_info(subscription),
        can_message=_user_can_message(session, context, chat),
    )

415 

def GetDirectMessage(
    self, request: conversations_pb2.GetDirectMessageReq, context: CouchersContext, session: Session
) -> conversations_pb2.GroupChat:
    """
    Returns the DM chat between the requesting user and request.user_id, with its latest visible message

    Aborts with NOT_FOUND ("chat_not_found") when the query finds no such chat.
    """
    count = func.count(GroupChatSubscription.id).label("count")
    # DM chats in which both users currently hold an open subscription
    shared_dm_ids = (
        select(GroupChatSubscription.group_chat_id)
        .where(
            or_(
                GroupChatSubscription.user_id == context.user_id,
                GroupChatSubscription.user_id == request.user_id,
            )
        )
        .where(GroupChatSubscription.left == None)  # noqa: E711
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChat.is_dm == True)  # noqa: E712
        .group_by(GroupChatSubscription.group_chat_id)
        .having(count == 2)
        .subquery()
    )

    latest_row_query = (
        select(shared_dm_ids, GroupChat, GroupChatSubscription, Message)
        .join(shared_dm_ids, shared_dm_ids.c.group_chat_id == GroupChat.conversation_id)
        .join(Message, Message.conversation_id == GroupChat.conversation_id)
        .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id)
        .where(Message.time >= GroupChatSubscription.joined)
        .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))  # noqa: E711
        # newest message first, and only that one
        .order_by(Message.id.desc())
        .limit(1)
    )
    result = session.execute(latest_row_query).one_or_none()

    if not result:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    chat = result.GroupChat
    subscription = result.GroupChatSubscription
    latest = result.Message

    return conversations_pb2.GroupChat(
        group_chat_id=chat.conversation_id,
        title=chat.title,
        member_user_ids=_get_visible_members_for_subscription(subscription),
        admin_user_ids=_get_visible_admins_for_subscription(subscription),
        only_admins_invite=chat.only_admins_invite,
        is_dm=chat.is_dm,
        created=Timestamp_from_datetime(chat.conversation.created),
        unseen_message_count=_unseen_message_count(session, subscription.id),
        last_seen_message_id=subscription.last_seen_message_id,
        latest_message=_message_to_pb(latest) if latest else None,
        mute_info=_mute_info(subscription),
        can_message=_user_can_message(session, context, chat),
    )

466 

def GetUpdates(
    self, request: conversations_pb2.GetUpdatesReq, context: CouchersContext, session: Session
) -> conversations_pb2.GetUpdatesRes:
    """
    Returns messages newer than request.newest_message_id from all chats the user has a subscription to

    Only messages sent between the user joining and leaving a chat are included. At most
    DEFAULT_PAGINATION_LENGTH updates are returned, oldest first; no_more tells whether that was all.
    """
    newer_messages = (
        select(Message)
        .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
        .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
        .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(Message.time >= GroupChatSubscription.joined)
        .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))  # noqa: E711
        .where(Message.id > request.newest_message_id)
        .order_by(Message.id.asc())
        # one more than we return, to learn whether anything is left
        .limit(DEFAULT_PAGINATION_LENGTH + 1)
    )
    fetched = session.execute(newer_messages).scalars().all()

    by_id = sorted(fetched, key=lambda m: m.id)
    updates = [
        conversations_pb2.Update(group_chat_id=m.conversation_id, message=_message_to_pb(m))
        for m in by_id[:DEFAULT_PAGINATION_LENGTH]
    ]

    return conversations_pb2.GetUpdatesRes(
        updates=updates,
        no_more=len(fetched) <= DEFAULT_PAGINATION_LENGTH,
    )

497 

def GetGroupChatMessages(
    self, request: conversations_pb2.GetGroupChatMessagesReq, context: CouchersContext, session: Session
) -> conversations_pb2.GetGroupChatMessagesRes:
    """
    Returns one page of messages from a group chat, newest first

    Only messages sent between the user joining and leaving the chat are included.
    request.last_message_id, when non-zero, continues below that message id; request.only_unseen
    restricts the result to messages above the user's last seen message id.

    last_message_id in the response is the id of the last message on the returned page (0 for an empty
    page), so it can always be passed back as the cursor for the next request.
    """
    page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
    page_size = min(page_size, MAX_PAGE_SIZE)

    results = (
        session.execute(
            select(Message)
            .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
            .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
            .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
            .where(GroupChatSubscription.user_id == context.user_id)
            .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
            .where(Message.time >= GroupChatSubscription.joined)
            .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))  # noqa: E711
            .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
            .where(or_(Message.id > GroupChatSubscription.last_seen_message_id, request.only_unseen == 0))
            .order_by(Message.id.desc())
            # one more than the page, to learn whether anything is left after it
            .limit(page_size + 1)
        )
        .scalars()
        .all()
    )

    page = results[:page_size]

    return conversations_pb2.GetGroupChatMessagesRes(
        messages=[_message_to_pb(message) for message in page],
        # Take the cursor from the returned page itself: results[-2] is only the last returned message
        # when the extra look-ahead row is present, and is off by one on the final page.
        last_message_id=page[-1].id if page else 0,
        no_more=len(results) <= page_size,
    )

528 

def MarkLastSeenGroupChat(
    self, request: conversations_pb2.MarkLastSeenGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Moves the user's last seen message id in a chat forward to request.last_seen_message_id

    Aborts with NOT_FOUND ("chat_not_found") without an open, visible subscription, and with
    FAILED_PRECONDITION ("cant_unsee_messages") when the new id is lower than the current one.
    """
    subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    # the marker may only move forward (or stay where it is)
    if subscription.last_seen_message_id > request.last_seen_message_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")

    subscription.last_seen_message_id = request.last_seen_message_id

    return empty_pb2.Empty()

543 

def MuteGroupChat(
    self, request: conversations_pb2.MuteGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Mutes or unmutes a chat for the user by setting muted_until on their subscription

    unmute takes precedence over forever, which takes precedence over for_duration. Aborts with
    NOT_FOUND ("chat_not_found") without an open, visible subscription, and with FAILED_PRECONDITION
    ("cant_mute_past") for a negative duration.
    """
    subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if request.unmute:
        subscription.muted_until = DATETIME_MINUS_INFINITY
        return empty_pb2.Empty()

    if request.forever:
        subscription.muted_until = DATETIME_INFINITY
        return empty_pb2.Empty()

    if request.for_duration:
        mute_for = request.for_duration.ToTimedelta()
        if mute_for < timedelta(seconds=0):
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_mute_past")
        subscription.muted_until = now() + mute_for

    return empty_pb2.Empty()

563 

def SearchMessages(
    self, request: conversations_pb2.SearchMessagesReq, context: CouchersContext, session: Session
) -> conversations_pb2.SearchMessagesRes:
    """
    Searches the text of the messages the user can see, across all their chats, newest first

    Matching is a case-insensitive substring match on request.query. Only messages sent between the
    user joining and leaving a chat are included. request.last_message_id, when non-zero, continues
    below that message id.

    last_message_id in the response is the id of the last message on the returned page (0 for an empty
    page), so it can always be passed back as the cursor for the next request.
    """
    page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
    page_size = min(page_size, MAX_PAGE_SIZE)

    # NOTE(review): request.query goes into the LIKE pattern unescaped, so "%" and "_" typed by the user
    # act as wildcards -- confirm whether that is intended
    results = (
        session.execute(
            select(Message)
            .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
            .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
            .where_moderated_content_visible(context, GroupChat, is_list_operation=True)
            .where(GroupChatSubscription.user_id == context.user_id)
            .where(Message.time >= GroupChatSubscription.joined)
            .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))  # noqa: E711
            .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
            .where(Message.text.ilike(f"%{request.query}%"))
            .order_by(Message.id.desc())
            # one more than the page, to learn whether anything is left after it
            .limit(page_size + 1)
        )
        .scalars()
        .all()
    )

    page = results[:page_size]

    return conversations_pb2.SearchMessagesRes(
        results=[
            conversations_pb2.MessageSearchResult(
                group_chat_id=message.conversation_id,
                message=_message_to_pb(message),
            )
            for message in page
        ],
        # Take the cursor from the returned page itself: results[-2] is only the last returned message
        # when the extra look-ahead row is present, and is off by one on the final page.
        last_message_id=page[-1].id if page else 0,
        no_more=len(results) <= page_size,
    )

599 

def CreateGroupChat(
    self, request: conversations_pb2.CreateGroupChatReq, context: CouchersContext, session: Session
) -> conversations_pb2.GroupChat:
    """
    Creates a new chat between the requesting user and request.recipient_user_ids

    A single recipient makes it a DM, and only one active DM may exist between two users. Aborts when
    the creator's profile is incomplete, when the recipients are invalid (not visible, empty,
    duplicated, or containing the creator), when such a DM already exists, or when the chat initiation
    rate limit is hit. On success a chat_created message is posted and the new chat is returned.
    """
    creator = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
    if not creator.has_completed_profile:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")

    visible_recipients = select(User.id).where_users_visible(context).where(User.id.in_(request.recipient_user_ids))
    recipient_user_ids = list(session.execute(visible_recipients).scalars().all())

    # make sure all requested users are visible
    if len(recipient_user_ids) != len(request.recipient_user_ids):
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")

    if not recipient_user_ids:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")

    # make sure there's no duplicate users
    if len(recipient_user_ids) != len(set(recipient_user_ids)):
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_recipients")

    if context.user_id in recipient_user_ids:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")

    if len(recipient_user_ids) == 1:
        # can only have one DM at a time between any two users
        other_user_id = recipient_user_ids[0]

        # Count the open subscriptions of either user per DM chat; a chat where that count is two is a
        # DM the two users already share
        count = func.count(GroupChatSubscription.id).label("count")
        shared_dm = (
            select(count)
            .where(
                or_(
                    GroupChatSubscription.user_id == context.user_id,
                    GroupChatSubscription.user_id == other_user_id,
                )
            )
            .where(GroupChatSubscription.left == None)  # noqa: E711
            .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
            .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
            .where(GroupChat.is_dm == True)  # noqa: E712
            .group_by(GroupChatSubscription.group_chat_id)
            .having(count == 2)
        )
        if session.execute(shared_dm).scalar_one_or_none():
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_have_dm")

    # Check if user has been initiating chats excessively
    rate_limited = process_rate_limits_and_check_abort(
        session=session, user_id=context.user_id, action=RateLimitAction.chat_initiation
    )
    if rate_limited:
        context.abort_with_error_code(
            grpc.StatusCode.RESOURCE_EXHAUSTED,
            "chat_initiation_rate_limit",
            substitutions={"hours": RATE_LIMIT_HOURS},
        )

    group_chat = _create_chat(
        session,
        creator_id=context.user_id,
        recipient_ids=request.recipient_user_ids,
        title=request.title.value,
    )

    your_subscription = _get_message_subscription(session, context.user_id, group_chat.conversation_id)
    _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created)
    session.flush()

    return conversations_pb2.GroupChat(
        group_chat_id=group_chat.conversation_id,
        title=group_chat.title,
        member_user_ids=_get_visible_members_for_subscription(your_subscription),
        admin_user_ids=_get_visible_admins_for_subscription(your_subscription),
        only_admins_invite=group_chat.only_admins_invite,
        is_dm=group_chat.is_dm,
        created=Timestamp_from_datetime(group_chat.conversation.created),
        mute_info=_mute_info(your_subscription),
        can_message=True,
    )

686 

def SendMessage(
    self, request: conversations_pb2.SendMessageReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Posts a text message from the requesting user into a chat they are currently in

    Aborts with INVALID_ARGUMENT ("invalid_message") for empty text, NOT_FOUND ("chat_not_found")
    without an open, visible subscription, and FAILED_PRECONDITION ("cant_message_in_chat") when the
    user may not message in the chat. Increments the sent messages counter on success.
    """
    if request.text == "":
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    subscription_query = (
        select(GroupChatSubscription, GroupChat)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
        .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.left == None)  # noqa: E711
    )
    result = session.execute(subscription_query).one_or_none()
    if not result:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    subscription, group_chat = result
    if not _user_can_message(session, context, group_chat):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_message_in_chat")

    _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)

    # metrics: count the message by sender gender and kind of chat
    user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
    chat_kind = "direct message" if subscription.group_chat.is_dm else "group chat"
    sent_messages_counter.labels(user_gender, chat_kind).inc()

    return empty_pb2.Empty()

716 

def SendDirectMessage(
    self, request: conversations_pb2.SendDirectMessageReq, context: CouchersContext, session: Session
) -> conversations_pb2.SendDirectMessageRes:
    """
    Sends a text message to request.recipient_user_id in their shared DM, creating the DM if needed

    An existing DM is only reused when both users still hold an open (not left) subscription to it,
    which is the same rule GetDirectMessage and CreateGroupChat apply. Otherwise a fresh DM is created.

    Aborts with FAILED_PRECONDITION ("incomplete_profile_send_message") for an incomplete sender
    profile, and INVALID_ARGUMENT for a missing recipient ("no_recipients"), a recipient that is not
    visible ("user_not_found"), the sender themselves ("cant_add_self") or empty text
    ("invalid_message"). Returns the id of the chat the message was posted in.

    NOTE(review): unlike CreateGroupChat, creating a DM here does not go through the chat_initiation
    rate limit and posts no chat_created message -- confirm that this is intended.
    """
    user_id = context.user_id
    user = session.execute(select(User).where(User.id == user_id)).scalar_one()

    recipient_id = request.recipient_user_id

    if not user.has_completed_profile:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")

    if not recipient_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")

    recipient_user_id = session.execute(
        select(User.id).where_users_visible(context).where(User.id == recipient_id)
    ).scalar_one_or_none()

    if not recipient_user_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")

    if user_id == recipient_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")

    if request.text == "":
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    # Look for an existing direct message (DM) chat between the two users. Only open subscriptions
    # count: without this, a DM the sender has left would be picked, and the sender's open subscription
    # looked up below would not exist.
    dm_chat_ids = (
        select(GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.user_id.in_([user_id, recipient_id]))
        .where(GroupChatSubscription.left == None)  # noqa: E711
        .group_by(GroupChatSubscription.group_chat_id)
        .having(func.count(GroupChatSubscription.user_id) == 2)
    )

    chat = session.execute(
        select(GroupChat)
        .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
        .where(GroupChat.is_dm == True)  # noqa: E712
        .where(GroupChat.conversation_id.in_(dm_chat_ids))
        .limit(1)
    ).scalar_one_or_none()

    if not chat:
        chat = _create_chat(session, user_id, [recipient_id])

    # Retrieve the sender's active subscription to the chat
    subscription = _get_message_subscription(session, user_id, chat.conversation_id)

    # Add the message to the conversation
    _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)

    # metrics: count the message by sender gender
    user_gender = session.execute(select(User.gender).where(User.id == user_id)).scalar_one()
    sent_messages_counter.labels(user_gender, "direct message").inc()

    session.flush()

    return conversations_pb2.SendDirectMessageRes(group_chat_id=chat.conversation_id)

775 

def EditGroupChat(
    self, request: conversations_pb2.EditGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Applies the optional `title` and `only_admins_invite` fields of the request to a group chat.

    Aborts with NOT_FOUND ("chat_not_found") if no subscription is returned for the caller, and with
    PERMISSION_DENIED ("only_admin_can_edit") if the caller's role in the chat is not admin. Only fields
    that are present on the request are written. A `chat_edited` message is recorded in either case.
    """
    editor_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not editor_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if editor_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_edit")

    # Each editable field is a wrapper message on the request; copy its value only when it was supplied.
    for field_name in ("title", "only_admins_invite"):
        if request.HasField(field_name):
            setattr(editor_subscription.group_chat, field_name, getattr(request, field_name).value)

    _add_message_to_subscription(session, editor_subscription, message_type=MessageType.chat_edited)

    return empty_pb2.Empty()

796 

def MakeGroupChatAdmin(
    self, request: conversations_pb2.MakeGroupChatAdminReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Promotes another member of a group chat from participant to admin.

    Checks are performed in this order, each aborting the call when it fails:
    - the target user must be visible to the caller (NOT_FOUND, "user_not_found")
    - the caller must have a subscription to the chat (NOT_FOUND, "chat_not_found")
    - the caller must be an admin (PERMISSION_DENIED, "only_admin_can_make_admin")
    - the caller must not target themselves (FAILED_PRECONDITION, "cant_make_self_admin")
    - the target must have a subscription to the chat (FAILED_PRECONDITION, "user_not_in_chat")
    - the target must currently be a participant (FAILED_PRECONDITION, "already_admin")

    On success the target's role is set to admin and a `user_made_admin` message is recorded on the
    caller's subscription.
    """
    visible_target_query = select(User).where_users_visible(context).where(User.id == request.user_id)
    target_user = session.execute(visible_target_query).scalar_one_or_none()
    if not target_user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    caller_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not caller_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if caller_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_make_admin")

    if request.user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_make_self_admin")

    target_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)

    if not target_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")

    if target_subscription.role != GroupChatRole.participant:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_admin")

    target_subscription.role = GroupChatRole.admin

    # The control message is authored by the caller and points at the promoted user.
    _add_message_to_subscription(
        session,
        caller_subscription,
        message_type=MessageType.user_made_admin,
        target_id=request.user_id,
    )

    return empty_pb2.Empty()

831 

def RemoveGroupChatAdmin(
    self, request: conversations_pb2.RemoveGroupChatAdminReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Demotes an admin of a group chat back to participant.

    Checks are performed in this order, each aborting the call when it fails:
    - the target user must be visible to the caller (NOT_FOUND, "user_not_found")
    - the caller must have a subscription to the chat (NOT_FOUND, "chat_not_found")
    - when the caller targets themselves, at least one other admin who has not left must remain
      (FAILED_PRECONDITION, "cant_remove_last_admin")
    - the caller must be an admin (PERMISSION_DENIED, "only_admin_can_remove_admin")
    - the target must hold an admin subscription with no `left` time (FAILED_PRECONDITION, "user_not_admin")

    On success the target's role is set to participant and a `user_removed_admin` message is recorded on
    the caller's subscription.
    """
    visible_target_query = select(User).where_users_visible(context).where(User.id == request.user_id)
    target_user = session.execute(visible_target_query).scalar_one_or_none()
    if not target_user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    caller_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not caller_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if request.user_id == context.user_id:
        # Race condition!
        # The count and the later role change are not atomic with respect to concurrent requests.
        remaining_admins_query = (
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.role == GroupChatRole.admin)
            .where(GroupChatSubscription.left == None)
        )
        remaining_admins = session.execute(remaining_admins_query).scalar_one()
        if remaining_admins <= 0:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_last_admin")

    if caller_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_admin")

    # Only match the target if they are still in the chat and currently an admin.
    target_admin_query = (
        select(GroupChatSubscription)
        .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
        .where(GroupChatSubscription.user_id == request.user_id)
        .where(GroupChatSubscription.left == None)
        .where(GroupChatSubscription.role == GroupChatRole.admin)
    )
    target_subscription = session.execute(target_admin_query).scalar_one_or_none()

    if not target_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin")

    target_subscription.role = GroupChatRole.participant

    _add_message_to_subscription(
        session,
        caller_subscription,
        message_type=MessageType.user_removed_admin,
        target_id=request.user_id,
    )

    return empty_pb2.Empty()

879 

def InviteToGroupChat(
    self, request: conversations_pb2.InviteToGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Adds another user to a group chat as a participant.

    Checks are performed in this order, each aborting the call when it fails:
    - the invited user must be visible to the caller (NOT_FOUND, "user_not_found")
    - the caller must have a subscription with no `left` time to a chat that passes the moderation
      visibility filter (NOT_FOUND, "chat_not_found")
    - the caller must not invite themselves (FAILED_PRECONDITION, "cant_invite_self")
    - if the chat has `only_admins_invite` set, the caller must be an admin
      (PERMISSION_DENIED, "invite_permission_denied")
    - the chat must not be a DM (FAILED_PRECONDITION, "cant_invite_to_dm")
    - the invited user must not already have a subscription returned by `_get_message_subscription`
      (FAILED_PRECONDITION, "already_in_chat")

    On success a new participant subscription is added to the session and a `user_invited` message is
    recorded on the caller's subscription.
    """
    visible_invitee_query = select(User).where_users_visible(context).where(User.id == request.user_id)
    invitee = session.execute(visible_invitee_query).scalar_one_or_none()
    if not invitee:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # Fetch the caller's subscription together with the chat it belongs to in one query.
    caller_membership_query = (
        select(GroupChatSubscription, GroupChat)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
        .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.left == None)
    )
    row = session.execute(caller_membership_query).one_or_none()

    if not row:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    caller_subscription, chat = row

    if request.user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_self")

    if caller_subscription.role != GroupChatRole.admin and caller_subscription.group_chat.only_admins_invite:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invite_permission_denied")

    if chat.is_dm:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_to_dm")

    existing_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)

    if existing_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_chat")

    # TODO: race condition!
    # Nothing here prevents a concurrent invite between the check above and the insert below.

    session.add(
        GroupChatSubscription(
            user_id=request.user_id,
            group_chat=caller_subscription.group_chat,
            role=GroupChatRole.participant,
        )
    )

    _add_message_to_subscription(
        session,
        caller_subscription,
        message_type=MessageType.user_invited,
        target_id=request.user_id,
    )

    return empty_pb2.Empty()

930 

def RemoveGroupChatUser(
    self, request: conversations_pb2.RemoveGroupChatUserReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Removes another user from a group chat on behalf of an admin.

    Checks are performed in this order, each aborting the call when it fails:
    - the caller must have a subscription to the chat (NOT_FOUND, "chat_not_found")
    - the caller must be an admin (PERMISSION_DENIED, "only_admin_can_remove_user")
    - the caller must not target themselves (FAILED_PRECONDITION, "cant_remove_self")
    - the target must have a subscription to the chat (FAILED_PRECONDITION, "user_not_in_chat")

    On success a `user_removed` message is recorded on the caller's subscription and the target's
    subscription gets its `left` time set to the database's current time.
    """
    # Step 1: validate the caller.
    admin_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not admin_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if admin_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_user")

    if request.user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_self")

    # Step 2: validate the user being removed.
    removed_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)

    if not removed_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")

    # The control message is written before the subscription is closed.
    _add_message_to_subscription(
        session,
        admin_subscription,
        message_type=MessageType.user_removed,
        target_id=request.user_id,
    )

    removed_subscription.left = func.now()

    return empty_pb2.Empty()

967 

def LeaveGroupChat(
    self, request: conversations_pb2.LeaveGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Removes the caller from a group chat.

    Aborts with NOT_FOUND ("chat_not_found") if no subscription is returned for the caller. An admin may
    only leave when at least one other admin remains or when there are no other participants left;
    otherwise the call aborts with FAILED_PRECONDITION ("last_admin_cant_leave").

    On success a `user_left` message is recorded on the caller's subscription and its `left` time is set
    to the database's current time.
    """

    def _count_other_members(role: GroupChatRole) -> int:
        # Counts subscriptions in this chat, excluding the caller, with the given role and no `left` time.
        return session.execute(
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.role == role)
            .where(GroupChatSubscription.left == None)
        ).scalar_one()

    own_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not own_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if own_subscription.role == GroupChatRole.admin:
        remaining_admins = _count_other_members(GroupChatRole.admin)
        remaining_participants = _count_other_members(GroupChatRole.participant)
        # Leaving must not strand participants in a chat that has no admin.
        if remaining_admins <= 0 and remaining_participants != 0:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "last_admin_cant_leave")

    # The control message is written before the subscription is closed.
    _add_message_to_subscription(session, own_subscription, message_type=MessageType.user_left)

    own_subscription.left = func.now()

    return empty_pb2.Empty()