Coverage for src/couchers/servicers/conversations.py: 91%

292 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-08-28 14:55 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.sql import func, not_, or_ 

7 

8from couchers import errors 

9from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY 

10from couchers.context import make_background_user_context 

11from couchers.db import session_scope 

12from couchers.jobs.enqueue import queue_job 

13from couchers.metrics import sent_messages_counter 

14from couchers.models import ( 

15 Conversation, 

16 GroupChat, 

17 GroupChatRole, 

18 GroupChatSubscription, 

19 Message, 

20 MessageType, 

21 RateLimitAction, 

22 User, 

23) 

24from couchers.notifications.notify import notify 

25from couchers.rate_limits.check import process_rate_limits_and_check_abort 

26from couchers.servicers.api import user_model_to_pb 

27from couchers.sql import couchers_select as select 

28from couchers.utils import Timestamp_from_datetime, now 

29from proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2 

30from proto.internal import jobs_pb2 

31 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# TODO: Still needs custom pagination: GetUpdates
# Page size used when a request leaves `number` at 0; GetUpdates always pages by this amount.
DEFAULT_PAGINATION_LENGTH = 20
# Upper bound applied to any page size requested by a client.
MAX_PAGE_SIZE = 50

37 

38 

def _message_to_pb(message: Message):
    """
    Converts a Message model into a conversations_pb2.Message

    Normal messages carry their text. Any other message is a control message: only the content field matching its
    message_type is set, and no content field at all is set if the type is not one of those handled below.
    """
    shared = {
        "message_id": message.id,
        "author_user_id": message.author_id,
        "time": Timestamp_from_datetime(message.time),
    }

    if message.is_normal_message:
        return conversations_pb2.Message(
            text=conversations_pb2.MessageContentText(text=message.text),
            **shared,
        )

    # control message: pick the single content field that corresponds to the message type
    kind = message.message_type
    content = {}
    if kind == MessageType.chat_created:
        content["chat_created"] = conversations_pb2.MessageContentChatCreated()
    elif kind == MessageType.chat_edited:
        content["chat_edited"] = conversations_pb2.MessageContentChatEdited()
    elif kind == MessageType.user_invited:
        content["user_invited"] = conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id)
    elif kind == MessageType.user_left:
        content["user_left"] = conversations_pb2.MessageContentUserLeft()
    elif kind == MessageType.user_made_admin:
        content["user_made_admin"] = conversations_pb2.MessageContentUserMadeAdmin(target_user_id=message.target_id)
    elif kind == MessageType.user_removed_admin:
        content["user_removed_admin"] = conversations_pb2.MessageContentUserRemovedAdmin(
            target_user_id=message.target_id
        )
    elif kind == MessageType.user_removed:
        content["group_chat_user_removed"] = conversations_pb2.MessageContentUserRemoved(
            target_user_id=message.target_id
        )

    return conversations_pb2.Message(**shared, **content)

89 

90 

def _get_visible_members_for_subscription(subscription):
    """
    Returns the user ids of the chat members that the owner of this subscription may see

    Someone who has left a group chat shouldn't learn about people added after they left, so for them the list is
    frozen at the moment they left.
    """
    chat_subscriptions = subscription.group_chat.subscriptions
    left_at = subscription.left

    if left_at:
        # not in chat anymore, see everyone who was in chat when we left
        visible = chat_subscriptions.where(GroupChatSubscription.joined <= left_at).where(
            or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None)  # noqa: E711
        )
    else:
        # still in the chat, we see everyone with a current subscription
        visible = chat_subscriptions.where(GroupChatSubscription.left == None)  # noqa: E711

    return [member.user_id for member in visible]

107 

108 

def _get_visible_admins_for_subscription(subscription):
    """
    Returns the user ids of the chat admins that the owner of this subscription may see

    Same visibility rule as for members: someone who has left only sees the admins who were in the chat at the
    moment they left.
    """
    admins = subscription.group_chat.subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin)
    left_at = subscription.left

    if left_at:
        # not in chat anymore, see every admin who was in chat when we left
        admins = admins.where(GroupChatSubscription.joined <= left_at).where(
            or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None)  # noqa: E711
        )
    else:
        # still in the chat, we see every admin with a current subscription
        admins = admins.where(GroupChatSubscription.left == None)  # noqa: E711

    return [admin.user_id for admin in admins]

130 

131 

def _user_can_message(session, context, group_chat: GroupChat) -> bool:
    """
    Tells whether the calling user may send messages in the given chat

    A true group chat (not a DM) can always be messaged. A DM can be messaged only while the other participant
    - is visible to the user (not deleted/banned, no block in either direction)
    - has not left the chat
    """
    if group_chat.is_dm:
        other_active_participant = (
            select(GroupChatSubscription)
            .where_users_column_visible(context=context, column=GroupChatSubscription.user_id)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
            .where(GroupChatSubscription.left == None)  # noqa: E711
        )
        return session.execute(func.exists(other_active_participant)).scalar_one()

    return True

150 

151 

def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload):
    """
    Background job that fans out notifications for a message sent to a group chat

    Only text messages produce notifications. Recipients are the subscribers visible to the author who were in the
    chat at the time of the message, excluding the author and anyone who has muted the chat.
    """
    logger.info(f"Fanning notifications for message_id = {payload.message_id}")

    with session_scope() as session:
        lookup = (
            select(Message, GroupChat)
            .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
            .where(Message.id == payload.message_id)
        )
        message, group_chat = session.execute(lookup).one()

        if message.message_type != MessageType.text:
            logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
            return []

        # visibility is evaluated from the author's point of view
        context = make_background_user_context(user_id=message.author_id)
        was_in_chat_at_send_time = or_(
            GroupChatSubscription.left == None,  # noqa: E711
            GroupChatSubscription.left >= message.time,
        )
        recipients_query = (
            select(GroupChatSubscription.user_id)
            .where_users_column_visible(context=context, column=GroupChatSubscription.user_id)
            .where(GroupChatSubscription.group_chat_id == message.conversation_id)
            .where(GroupChatSubscription.user_id != message.author_id)
            .where(GroupChatSubscription.joined <= message.time)
            .where(was_in_chat_at_send_time)
            .where(not_(GroupChatSubscription.is_muted))
        )
        user_ids_to_notify = session.execute(recipients_query).scalars().all()

        msg = (
            f"{message.author.name} sent you a message"
            if group_chat.is_dm
            else f"{message.author.name} sent a message in {group_chat.title}"
        )

        for recipient_id in user_ids_to_notify:
            # the author is rendered from each recipient's own point of view
            author_pb = user_model_to_pb(
                message.author,
                session,
                make_background_user_context(user_id=recipient_id),
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action="chat:message",
                key=message.conversation_id,
                data=notification_data_pb2.ChatMessage(
                    author=author_pb,
                    message=msg,
                    text=message.text,
                    group_chat_id=message.conversation_id,
                ),
            )

206 

207 

def _add_message_to_subscription(session, subscription, **kwargs):
    """
    Adds a new message to the subscription's chat, authored by the user who owns the subscription

    The subscription's last seen message id is moved to the new message, and a notification job is queued for it.

    The keyword args are passed on to Message
    """
    new_message = Message(
        conversation=subscription.group_chat.conversation,
        author_id=subscription.user_id,
        **kwargs,
    )
    session.add(new_message)
    # flush so that the message gets its id
    session.flush()

    subscription.last_seen_message_id = new_message.id

    job_payload = jobs_pb2.GenerateMessageNotificationsPayload(message_id=new_message.id)
    queue_job(session, job_type="generate_message_notifications", payload=job_payload)

    return new_message

230 

231 

def _create_chat(session, creator_id, recipient_ids, title=None, only_admins_invite=True):
    """
    Creates a conversation with its group chat and one subscription per participant

    The creator is subscribed as admin and every recipient as participant. A chat with exactly one recipient is
    flagged as a DM. Returns the new GroupChat.
    """
    conversation = Conversation()
    session.add(conversation)
    # flush so that the conversation gets its id
    session.flush()

    chat = GroupChat(
        conversation_id=conversation.id,
        title=title,
        creator_id=creator_id,
        is_dm=len(recipient_ids) == 1,
        only_admins_invite=only_admins_invite,
    )
    session.add(chat)
    session.flush()

    session.add(GroupChatSubscription(user_id=creator_id, group_chat=chat, role=GroupChatRole.admin))

    for recipient_id in recipient_ids:
        session.add(GroupChatSubscription(user_id=recipient_id, group_chat=chat, role=GroupChatRole.participant))

    return chat

264 

265 

def _get_message_subscription(session, user_id, conversation_id):
    """
    Returns the user's current (not left) subscription to the given chat, or None if there is none
    """
    active_subscription = (
        select(GroupChatSubscription)
        .where(GroupChatSubscription.group_chat_id == conversation_id)
        .where(GroupChatSubscription.user_id == user_id)
        .where(GroupChatSubscription.left == None)  # noqa: E711
    )
    return session.execute(active_subscription).scalar_one_or_none()

275 

276 

def _unseen_message_count(session, subscription_id):
    """
    Counts the messages in the subscription's chat whose id is above the subscription's last seen message id
    """
    unseen = (
        select(func.count())
        .select_from(Message)
        .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
        .where(GroupChatSubscription.id == subscription_id)
        .where(Message.id > GroupChatSubscription.last_seen_message_id)
    )
    return session.execute(unseen).scalar_one()

285 

286 

def _mute_info(subscription):
    """
    Builds the MuteInfo protocol buffer from the subscription's muted_display()
    """
    is_muted, until = subscription.muted_display()
    # muted_until stays unset when there is no end time
    until_pb = Timestamp_from_datetime(until) if until else None
    return conversations_pb2.MuteInfo(muted=is_muted, muted_until=until_pb)

293 

294 

295class Conversations(conversations_pb2_grpc.ConversationsServicer): 

def ListGroupChats(self, request, context, session):
    """
    Lists the caller's group chats, most recently active first, paginated by latest message id

    One row more than the page size is fetched to find out whether further pages exist.
    """
    page_size = min(request.number or DEFAULT_PAGINATION_LENGTH, MAX_PAGE_SIZE)

    # per chat the user is subscribed to: the latest message visible to them (sent while they were in the chat)
    message_visible = or_(
        Message.time <= GroupChatSubscription.left,
        GroupChatSubscription.left == None,  # noqa: E711
    )
    latest = (
        select(
            GroupChatSubscription.group_chat_id.label("group_chat_id"),
            func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
            func.max(Message.id).label("message_id"),
        )
        .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(Message.time >= GroupChatSubscription.joined)
        .where(message_visible)
        .group_by(GroupChatSubscription.group_chat_id)
        .order_by(func.max(Message.id).desc())
        .subquery()
    )

    rows = session.execute(
        select(latest, GroupChat, GroupChatSubscription, Message)
        .join(Message, Message.id == latest.c.message_id)
        .join(GroupChatSubscription, GroupChatSubscription.id == latest.c.group_chat_subscriptions_id)
        .join(GroupChat, GroupChat.conversation_id == latest.c.group_chat_id)
        .where(or_(latest.c.message_id < request.last_message_id, request.last_message_id == 0))
        .order_by(latest.c.message_id.desc())
        .limit(page_size + 1)
    ).all()

    page = rows[:page_size]

    group_chats = []
    for row in page:
        chat = row.GroupChat
        own_subscription = row.GroupChatSubscription
        group_chats.append(
            conversations_pb2.GroupChat(
                group_chat_id=chat.conversation_id,
                title=chat.title,  # TODO: proper title for DMs, etc
                member_user_ids=_get_visible_members_for_subscription(own_subscription),
                admin_user_ids=_get_visible_admins_for_subscription(own_subscription),
                only_admins_invite=chat.only_admins_invite,
                is_dm=chat.is_dm,
                created=Timestamp_from_datetime(chat.conversation.created),
                unseen_message_count=_unseen_message_count(session, own_subscription.id),
                last_seen_message_id=own_subscription.last_seen_message_id,
                latest_message=_message_to_pb(row.Message) if row.Message else None,
                mute_info=_mute_info(own_subscription),
                can_message=_user_can_message(session, context, chat),
            )
        )

    # the cursor for the next page is the smallest latest-message id on this page
    if len(rows) > 0:
        last_message_id = min(row.Message.id if row.Message else 1 for row in page)  # TODO
    else:
        last_message_id = 0

    return conversations_pb2.ListGroupChatsRes(
        group_chats=group_chats,
        last_message_id=last_message_id,
        no_more=len(rows) <= page_size,
    )

351 

def GetGroupChat(self, request, context, session):
    """
    Returns a single group chat the caller has a subscription to, together with its latest visible message

    Aborts with NOT_FOUND if no such chat (with at least one message visible to the caller) exists.
    """
    message_visible = or_(
        Message.time <= GroupChatSubscription.left,
        GroupChatSubscription.left == None,  # noqa: E711
    )
    row = session.execute(
        select(GroupChat, GroupChatSubscription, Message)
        .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
        .where(Message.time >= GroupChatSubscription.joined)
        .where(message_visible)
        .order_by(Message.id.desc())
        .limit(1)
    ).one_or_none()

    if not row:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

    chat = row.GroupChat
    own_subscription = row.GroupChatSubscription

    return conversations_pb2.GroupChat(
        group_chat_id=chat.conversation_id,
        title=chat.title,
        member_user_ids=_get_visible_members_for_subscription(own_subscription),
        admin_user_ids=_get_visible_admins_for_subscription(own_subscription),
        only_admins_invite=chat.only_admins_invite,
        is_dm=chat.is_dm,
        created=Timestamp_from_datetime(chat.conversation.created),
        unseen_message_count=_unseen_message_count(session, own_subscription.id),
        last_seen_message_id=own_subscription.last_seen_message_id,
        latest_message=_message_to_pb(row.Message) if row.Message else None,
        mute_info=_mute_info(own_subscription),
        can_message=_user_can_message(session, context, chat),
    )

382 

def GetDirectMessage(self, request, context, session):
    """
    Returns the DM chat between the caller and request.user_id, together with its latest visible message

    Aborts with NOT_FOUND if there is no such DM.
    """
    # DM chats in which both the caller and the other user hold a current subscription
    subscription_count = func.count(GroupChatSubscription.id).label("count")
    either_user = or_(
        GroupChatSubscription.user_id == context.user_id,
        GroupChatSubscription.user_id == request.user_id,
    )
    shared_dm = (
        select(GroupChatSubscription.group_chat_id)
        .where(either_user)
        .where(GroupChatSubscription.left == None)  # noqa: E711
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChat.is_dm == True)  # noqa: E712
        .group_by(GroupChatSubscription.group_chat_id)
        .having(subscription_count == 2)
        .subquery()
    )

    message_visible = or_(
        Message.time <= GroupChatSubscription.left,
        GroupChatSubscription.left == None,  # noqa: E711
    )
    row = session.execute(
        select(shared_dm, GroupChat, GroupChatSubscription, Message)
        .join(shared_dm, shared_dm.c.group_chat_id == GroupChat.conversation_id)
        .join(Message, Message.conversation_id == GroupChat.conversation_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id)
        .where(Message.time >= GroupChatSubscription.joined)
        .where(message_visible)
        .order_by(Message.id.desc())
        .limit(1)
    ).one_or_none()

    if not row:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

    chat = row.GroupChat
    own_subscription = row.GroupChatSubscription

    return conversations_pb2.GroupChat(
        group_chat_id=chat.conversation_id,
        title=chat.title,
        member_user_ids=_get_visible_members_for_subscription(own_subscription),
        admin_user_ids=_get_visible_admins_for_subscription(own_subscription),
        only_admins_invite=chat.only_admins_invite,
        is_dm=chat.is_dm,
        created=Timestamp_from_datetime(chat.conversation.created),
        unseen_message_count=_unseen_message_count(session, own_subscription.id),
        last_seen_message_id=own_subscription.last_seen_message_id,
        latest_message=_message_to_pb(row.Message) if row.Message else None,
        mute_info=_mute_info(own_subscription),
        can_message=_user_can_message(session, context, chat),
    )

430 

def GetUpdates(self, request, context, session):
    """
    Returns the messages newer than request.newest_message_id from all chats of the caller, oldest first

    Only messages sent while the caller was in the respective chat are included. The page size is fixed at
    DEFAULT_PAGINATION_LENGTH; one extra row is fetched to determine no_more.
    """
    message_visible = or_(
        Message.time <= GroupChatSubscription.left,
        GroupChatSubscription.left == None,  # noqa: E711
    )
    query = (
        select(Message)
        .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(Message.time >= GroupChatSubscription.joined)
        .where(message_visible)
        .where(Message.id > request.newest_message_id)
        .order_by(Message.id.asc())
        .limit(DEFAULT_PAGINATION_LENGTH + 1)
    )
    new_messages = session.execute(query).scalars().all()

    page = sorted(new_messages, key=lambda m: m.id)[:DEFAULT_PAGINATION_LENGTH]
    updates = []
    for msg in page:
        updates.append(conversations_pb2.Update(group_chat_id=msg.conversation_id, message=_message_to_pb(msg)))

    return conversations_pb2.GetUpdatesRes(
        updates=updates,
        no_more=len(new_messages) <= DEFAULT_PAGINATION_LENGTH,
    )

457 

def GetGroupChatMessages(self, request, context, session):
    """
    Returns a page of messages of one group chat, newest first

    Only messages sent while the caller was in the chat are returned. `request.last_message_id` (0 = start from the
    newest) is the pagination cursor and `request.only_unseen` restricts the result to messages above the caller's
    last seen message id. One row more than the page size is fetched to determine `no_more`.

    The returned `last_message_id` is the id of the last (oldest) message on the returned page, or 0 for an empty
    page, so it can be passed back as the cursor for the next page.
    """
    page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
    page_size = min(page_size, MAX_PAGE_SIZE)

    results = (
        session.execute(
            select(Message)
            .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
            .where(GroupChatSubscription.user_id == context.user_id)
            .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
            .where(Message.time >= GroupChatSubscription.joined)
            .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))  # noqa: E711
            .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
            .where(or_(Message.id > GroupChatSubscription.last_seen_message_id, request.only_unseen == 0))
            .order_by(Message.id.desc())
            .limit(page_size + 1)
        )
        .scalars()
        .all()
    )

    page = results[:page_size]

    return conversations_pb2.GetGroupChatMessagesRes(
        messages=[_message_to_pb(message) for message in page],
        # Take the cursor from the returned page itself: results[-2] is only the last returned message when the
        # extra look-ahead row is present, and is off by one (or missing) on a short final page.
        last_message_id=page[-1].id if page else 0,
        no_more=len(results) <= page_size,
    )

484 

def MarkLastSeenGroupChat(self, request, context, session):
    """
    Moves the caller's last seen message id in a chat forward to request.last_seen_message_id

    Aborts with NOT_FOUND if the caller has no current subscription to the chat, and with FAILED_PRECONDITION if
    the new id is lower than the stored one.
    """
    own_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
    if not own_subscription:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

    # the marker may only move forwards
    if own_subscription.last_seen_message_id > request.last_seen_message_id:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)

    own_subscription.last_seen_message_id = request.last_seen_message_id

    # TODO: notify

    return empty_pb2.Empty()

499 

def MuteGroupChat(self, request, context, session):
    """
    Unmutes a chat for the caller, or mutes it forever or for a given duration

    Aborts with NOT_FOUND if the caller has no current subscription to the chat, and with FAILED_PRECONDITION if a
    negative duration is given.
    """
    own_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
    if not own_subscription:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

    if request.unmute:
        own_subscription.muted_until = DATETIME_MINUS_INFINITY
    elif request.forever:
        own_subscription.muted_until = DATETIME_INFINITY
    elif request.for_duration:
        mute_for = request.for_duration.ToTimedelta()
        if mute_for < timedelta(seconds=0):
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MUTE_PAST)
        own_subscription.muted_until = now() + mute_for

    return empty_pb2.Empty()

517 

def SearchMessages(self, request, context, session):
    """
    Searches the text of all messages visible to the caller for `request.query` (case-insensitive substring match)

    Results are returned newest first. `request.last_message_id` (0 = start from the newest) is the pagination
    cursor; one row more than the page size is fetched to determine `no_more`.

    The returned `last_message_id` is the id of the last (oldest) message on the returned page, or 0 for an empty
    page, so it can be passed back as the cursor for the next page.
    """
    page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
    page_size = min(page_size, MAX_PAGE_SIZE)

    results = (
        session.execute(
            select(Message)
            .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
            .where(GroupChatSubscription.user_id == context.user_id)
            .where(Message.time >= GroupChatSubscription.joined)
            .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))  # noqa: E711
            .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
            # NOTE(review): "%" and "_" inside the query act as LIKE wildcards here — confirm whether intended
            .where(Message.text.ilike(f"%{request.query}%"))
            .order_by(Message.id.desc())
            .limit(page_size + 1)
        )
        .scalars()
        .all()
    )

    page = results[:page_size]

    return conversations_pb2.SearchMessagesRes(
        results=[
            conversations_pb2.MessageSearchResult(
                group_chat_id=message.conversation_id,
                message=_message_to_pb(message),
            )
            for message in page
        ],
        # Take the cursor from the returned page itself: results[-2] is only the last returned message when the
        # extra look-ahead row is present, and is off by one (or missing) on a short final page.
        last_message_id=page[-1].id if page else 0,
        no_more=len(results) <= page_size,
    )

549 

def CreateGroupChat(self, request, context, session):
    """
    Creates a new group chat (a DM if there is exactly one recipient) with the caller as admin

    Aborts with
    - FAILED_PRECONDITION if the caller's profile is incomplete, or a DM with the recipient already exists
    - INVALID_ARGUMENT if the recipient list has duplicates, contains a user not visible to the caller, is empty, or
      contains the caller
    - RESOURCE_EXHAUSTED if the chat initiation rate limit is hit

    A chat_created control message is added on behalf of the caller. Returns the new chat as a GroupChat.
    """
    user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
    if not user.has_completed_profile:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_MESSAGE)

    requested_ids = list(request.recipient_user_ids)

    # Duplicates have to be detected on the raw request: the id lookup below yields each matching user once, so
    # checking its result could never find a duplicate (the length comparison would fire first instead).
    if len(requested_ids) != len(set(requested_ids)):
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_RECIPIENTS)

    recipient_user_ids = list(
        session.execute(select(User.id).where_users_visible(context).where(User.id.in_(requested_ids)))
        .scalars()
        .all()
    )

    # make sure all requested users are visible
    if len(recipient_user_ids) != len(requested_ids):
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_FOUND)

    if not recipient_user_ids:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_RECIPIENTS)

    if context.user_id in recipient_user_ids:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_ADD_SELF)

    if len(recipient_user_ids) == 1:
        # can only have one DM at a time between any two users
        other_user_id = recipient_user_ids[0]

        # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have
        # user_id either this user or the recipient user. If you find two subscriptions to the same DM group
        # chat, you know they already have a shared group chat
        count = func.count(GroupChatSubscription.id).label("count")
        if session.execute(
            select(count)
            .where(
                or_(
                    GroupChatSubscription.user_id == context.user_id,
                    GroupChatSubscription.user_id == other_user_id,
                )
            )
            .where(GroupChatSubscription.left == None)  # noqa: E711
            .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
            .where(GroupChat.is_dm == True)  # noqa: E712
            .group_by(GroupChatSubscription.group_chat_id)
            .having(count == 2)
        ).scalar_one_or_none():
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_HAVE_DM)

    # Check if user has been initiating chats excessively
    # NOTE(review): indentation was lost in the reviewed copy; this check is assumed to apply to every new chat
    # (method level), not only to DMs — confirm against the original file
    if process_rate_limits_and_check_abort(
        session=session, user_id=context.user_id, action=RateLimitAction.chat_initiation
    ):
        context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.CHAT_INITIATION_RATE_LIMIT)

    group_chat = _create_chat(
        session,
        creator_id=context.user_id,
        recipient_ids=requested_ids,
        title=request.title.value,
    )

    your_subscription = _get_message_subscription(session, context.user_id, group_chat.conversation_id)

    _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created)

    session.flush()

    return conversations_pb2.GroupChat(
        group_chat_id=group_chat.conversation_id,
        title=group_chat.title,
        member_user_ids=_get_visible_members_for_subscription(your_subscription),
        admin_user_ids=_get_visible_admins_for_subscription(your_subscription),
        only_admins_invite=group_chat.only_admins_invite,
        is_dm=group_chat.is_dm,
        created=Timestamp_from_datetime(group_chat.conversation.created),
        mute_info=_mute_info(your_subscription),
        can_message=True,
    )

629 

def SendMessage(self, request, context, session):
    """
    Sends a text message from the caller to a chat they are currently subscribed to

    Aborts with INVALID_ARGUMENT on empty text, NOT_FOUND if the caller has no current subscription to the chat,
    and FAILED_PRECONDITION if the caller can't message in it. Increments the sent messages counter.
    """
    if request.text == "":
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)

    own_subscription_query = (
        select(GroupChatSubscription, GroupChat)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.left == None)  # noqa: E711
    )
    row = session.execute(own_subscription_query).one_or_none()
    if not row:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

    subscription, group_chat = row
    if not _user_can_message(session, context, group_chat):
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MESSAGE_IN_CHAT)

    _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)

    # metrics: label by sender gender and kind of chat
    user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
    chat_kind = "direct message" if subscription.group_chat.is_dm else "group chat"
    sent_messages_counter.labels(user_gender, chat_kind).inc()

    return empty_pb2.Empty()

656 

def SendDirectMessage(self, request, context, session):
    """
    Sends a text message to another user through their DM chat, creating that chat if none exists yet

    Aborts with FAILED_PRECONDITION if the caller's profile is incomplete, and with INVALID_ARGUMENT if no
    recipient is given, the recipient is not visible to the caller, the recipient is the caller, or the text is
    empty. Returns the id of the DM chat.
    """
    sender_id = context.user_id
    sender = session.execute(select(User).where(User.id == sender_id)).scalar_one()

    recipient_id = request.recipient_user_id

    if not sender.has_completed_profile:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_MESSAGE)

    if not recipient_id:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_RECIPIENTS)

    visible_recipient_id = session.execute(
        select(User.id).where_users_visible(context).where(User.id == recipient_id)
    ).scalar_one_or_none()
    if not visible_recipient_id:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_FOUND)

    if sender_id == recipient_id:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_ADD_SELF)

    if request.text == "":
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)

    # chats in which both users have a subscription
    shared_chat_ids = (
        select(GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.user_id.in_([sender_id, recipient_id]))
        .group_by(GroupChatSubscription.group_chat_id)
        .having(func.count(GroupChatSubscription.user_id) == 2)
    )

    # of those, look for an existing DM chat
    existing_dm = (
        select(GroupChat)
        .where(GroupChat.is_dm == True)  # noqa: E712
        .where(GroupChat.conversation_id.in_(shared_chat_ids))
        .limit(1)
    )
    chat = session.execute(existing_dm).scalar_one_or_none()

    if not chat:
        chat = _create_chat(session, sender_id, [recipient_id])

    # the message is added through the sender's active subscription to the chat
    sender_subscription = _get_message_subscription(session, sender_id, chat.conversation_id)
    _add_message_to_subscription(session, sender_subscription, message_type=MessageType.text, text=request.text)

    user_gender = session.execute(select(User.gender).where(User.id == sender_id)).scalar_one()
    sent_messages_counter.labels(user_gender, "direct message").inc()

    session.flush()

    return conversations_pb2.SendDirectMessageRes(group_chat_id=chat.conversation_id)

709 

def EditGroupChat(self, request, context, session):
    """
    Updates the title and/or the only_admins_invite setting of a group chat; only admins may do this

    Aborts with NOT_FOUND if the caller has no current subscription to the chat, and with PERMISSION_DENIED if the
    caller is not an admin. A chat_edited control message is added on behalf of the caller.
    """
    own_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
    if not own_subscription:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

    if own_subscription.role != GroupChatRole.admin:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_EDIT)

    chat = own_subscription.group_chat

    # only touch the fields that are present in the request
    if request.HasField("title"):
        chat.title = request.title.value
    if request.HasField("only_admins_invite"):
        chat.only_admins_invite = request.only_admins_invite.value

    _add_message_to_subscription(session, own_subscription, message_type=MessageType.chat_edited)

    return empty_pb2.Empty()

728 

def MakeGroupChatAdmin(self, request, context, session):
    """
    Promotes a participant of a group chat to admin; only admins may do this

    Aborts with NOT_FOUND if the target user is not visible to the caller or the caller has no current subscription
    to the chat, with PERMISSION_DENIED if the caller is not an admin, and with FAILED_PRECONDITION if the target
    is the caller, is not in the chat, or is not a participant. A user_made_admin control message is added on
    behalf of the caller.
    """
    target_user = session.execute(
        select(User).where_users_visible(context).where(User.id == request.user_id)
    ).scalar_one_or_none()
    if not target_user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    own_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
    if not own_subscription:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

    if own_subscription.role != GroupChatRole.admin:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_MAKE_ADMIN)

    if request.user_id == context.user_id:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MAKE_SELF_ADMIN)

    target_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
    if not target_subscription:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT)

    if target_subscription.role != GroupChatRole.participant:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_ADMIN)

    target_subscription.role = GroupChatRole.admin

    _add_message_to_subscription(
        session,
        own_subscription,
        message_type=MessageType.user_made_admin,
        target_id=request.user_id,
    )

    return empty_pb2.Empty()

761 

def RemoveGroupChatAdmin(self, request, context, session):
    """
    Demotes an admin of a group chat to participant; only admins may do this

    Aborts with NOT_FOUND if the target user is not visible to the caller or the caller has no current subscription
    to the chat, with FAILED_PRECONDITION if the caller targets themselves while no other admin is in the chat or
    if the target is not a current admin, and with PERMISSION_DENIED if the caller is not an admin. A
    user_removed_admin control message is added on behalf of the caller.
    """
    target_user = session.execute(
        select(User).where_users_visible(context).where(User.id == request.user_id)
    ).scalar_one_or_none()
    if not target_user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    own_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
    if not own_subscription:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

    if request.user_id == context.user_id:
        # Race condition!
        remaining_admins = (
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.role == GroupChatRole.admin)
            .where(GroupChatSubscription.left == None)  # noqa: E711
        )
        other_admins_count = session.execute(remaining_admins).scalar_one()
        if other_admins_count <= 0:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_LAST_ADMIN)

    if own_subscription.role != GroupChatRole.admin:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_ADMIN)

    # the target's current subscription, only if they are an admin
    target_admin_query = (
        select(GroupChatSubscription)
        .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
        .where(GroupChatSubscription.user_id == request.user_id)
        .where(GroupChatSubscription.left == None)  # noqa: E711
        .where(GroupChatSubscription.role == GroupChatRole.admin)
    )
    target_subscription = session.execute(target_admin_query).scalar_one_or_none()
    if not target_subscription:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN)

    target_subscription.role = GroupChatRole.participant

    _add_message_to_subscription(
        session,
        own_subscription,
        message_type=MessageType.user_removed_admin,
        target_id=request.user_id,
    )

    return empty_pb2.Empty()

807 

def InviteToGroupChat(self, request, context, session):
    """
    Adds user `request.user_id` as a participant of group chat `request.group_chat_id`.

    Checks are performed in this order, each aborting the RPC on failure:

    1. NOT_FOUND / USER_NOT_FOUND: the `where_users_visible(context)` lookup returns no user with
       id `request.user_id`.
    2. NOT_FOUND / CHAT_NOT_FOUND: the caller has no subscription with `left` unset in this chat.
    3. FAILED_PRECONDITION / CANT_INVITE_SELF: the caller is inviting themselves.
    4. PERMISSION_DENIED / INVITE_PERMISSION_DENIED: the chat has `only_admins_invite` set and the
       caller's role is not admin.
    5. FAILED_PRECONDITION / CANT_INVITE_TO_DM: the chat is a DM.
    6. FAILED_PRECONDITION / ALREADY_IN_CHAT: `_get_message_subscription` finds a subscription
       for the invited user.

    On success a new participant subscription is added to the session, a `user_invited` message
    is added through the caller's subscription, and an empty response is returned.
    """
    if not session.execute(
        select(User).where_users_visible(context).where(User.id == request.user_id)
    ).scalar_one_or_none():
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    # fetch the caller's active subscription together with its group chat in one query
    result = session.execute(
        select(GroupChatSubscription, GroupChat)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.left == None)
    ).one_or_none()

    if not result:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

    # the join is an inner join, so a returned row always carries both entities; no further
    # None check on the individual elements is needed
    your_subscription, group_chat = result

    if request.user_id == context.user_id:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_SELF)

    # use the group chat from the joined row rather than going through the relationship again
    if your_subscription.role != GroupChatRole.admin and group_chat.only_admins_invite:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVITE_PERMISSION_DENIED)

    if group_chat.is_dm:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_TO_DM)

    their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)

    if their_subscription:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_CHAT)

    # TODO: race condition!
    # (the membership check above and the insert below are separate statements)

    subscription = GroupChatSubscription(
        user_id=request.user_id,
        group_chat=group_chat,
        role=GroupChatRole.participant,
    )
    session.add(subscription)

    _add_message_to_subscription(
        session, your_subscription, message_type=MessageType.user_invited, target_id=request.user_id
    )

    return empty_pb2.Empty()

858 

def RemoveGroupChatUser(self, request, context, session):
    """
    Removes user `request.user_id` from group chat `request.group_chat_id` on behalf of the caller.

    Checks are performed in this order, each aborting the RPC on failure:

    1. NOT_FOUND / CHAT_NOT_FOUND: `_get_message_subscription` returns nothing for the caller.
    2. PERMISSION_DENIED / ONLY_ADMIN_CAN_REMOVE_USER: the caller's role is not admin.
    3. FAILED_PRECONDITION / CANT_REMOVE_SELF: the caller targets themselves.
    4. FAILED_PRECONDITION / USER_NOT_IN_CHAT: `_get_message_subscription` returns nothing for
       the target user.

    On success a `user_removed` message is added through the caller's subscription, the target's
    subscription gets its `left` column set to the database's current time, and an empty response
    is returned.
    """
    # the caller's own membership of the chat
    admin_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
    if not admin_subscription:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

    # only admins may remove other users
    caller_is_admin = admin_subscription.role == GroupChatRole.admin
    if not caller_is_admin:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_USER)

    # removing yourself is rejected by this endpoint
    if context.user_id == request.user_id:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_SELF)

    # the membership of the user being removed
    target_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
    if not target_subscription:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT)

    # the control message is added before the subscription is marked as left
    _add_message_to_subscription(
        session,
        admin_subscription,
        message_type=MessageType.user_removed,
        target_id=request.user_id,
    )

    target_subscription.left = func.now()

    return empty_pb2.Empty()

893 

def LeaveGroupChat(self, request, context, session):
    """
    Makes the calling user leave group chat `request.group_chat_id`.

    Aborts with NOT_FOUND / CHAT_NOT_FOUND if `_get_message_subscription` returns nothing for the
    caller.

    If the caller is an admin, they may only leave when at least one other admin remains or when
    no other participants remain; otherwise the RPC aborts with
    FAILED_PRECONDITION / LAST_ADMIN_CANT_LEAVE. Only subscriptions with `left` unset are counted.

    On success a `user_left` message is added through the caller's subscription, the
    subscription's `left` column is set to the database's current time, and an empty response is
    returned.
    """
    own_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
    if not own_subscription:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

    if own_subscription.role == GroupChatRole.admin:
        # counts the other members still in this chat; narrowed by role below
        # `== None` is deliberate: SQLAlchemy renders it as `IS NULL`
        others_count_query = (
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.left == None)
        )
        remaining_admins = session.execute(
            others_count_query.where(GroupChatSubscription.role == GroupChatRole.admin)
        ).scalar_one()
        remaining_participants = session.execute(
            others_count_query.where(GroupChatSubscription.role == GroupChatRole.participant)
        ).scalar_one()

        # the last admin cannot walk away while participants are still in the chat
        if remaining_admins <= 0 and remaining_participants != 0:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.LAST_ADMIN_CANT_LEAVE)

    # the control message is added before the subscription is marked as left
    _add_message_to_subscription(session, own_subscription, message_type=MessageType.user_left)

    own_subscription.left = func.now()

    return empty_pb2.Empty()