Coverage for src/couchers/servicers/conversations.py: 91%

291 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-07-12 05:54 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.sql import func, not_, or_ 

7 

8from couchers import errors 

9from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY 

10from couchers.db import session_scope 

11from couchers.jobs.enqueue import queue_job 

12from couchers.metrics import sent_messages_counter 

13from couchers.models import ( 

14 Conversation, 

15 GroupChat, 

16 GroupChatRole, 

17 GroupChatSubscription, 

18 Message, 

19 MessageType, 

20 RateLimitAction, 

21 User, 

22) 

23from couchers.notifications.notify import notify 

24from couchers.rate_limits.check import process_rate_limits_and_check_abort 

25from couchers.servicers.api import user_model_to_pb 

26from couchers.sql import couchers_select as select 

27from couchers.utils import Timestamp_from_datetime, make_user_context, now 

28from proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2 

29from proto.internal import jobs_pb2 

30 

# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# TODO: Still needs custom pagination: GetUpdates
# Page size used when a request leaves its `number` field at 0 (GetUpdates always uses this size)
DEFAULT_PAGINATION_LENGTH = 20
# Upper bound applied to the page size requested by the client
MAX_PAGE_SIZE = 50

36 

37 

def _message_to_pb(message: Message):
    """
    Converts a Message model instance into a conversations_pb2.Message

    Normal messages carry their text. Every other message is a control message and gets the single content field
    that matches its message_type (no content field is set if the type is not one of those handled below).
    """
    common = dict(
        message_id=message.id,
        author_user_id=message.author_id,
        time=Timestamp_from_datetime(message.time),
    )

    if message.is_normal_message:
        return conversations_pb2.Message(
            text=conversations_pb2.MessageContentText(text=message.text),
            **common,
        )

    kind = message.message_type
    content = {}
    if kind == MessageType.chat_created:
        content["chat_created"] = conversations_pb2.MessageContentChatCreated()
    elif kind == MessageType.chat_edited:
        content["chat_edited"] = conversations_pb2.MessageContentChatEdited()
    elif kind == MessageType.user_invited:
        content["user_invited"] = conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id)
    elif kind == MessageType.user_left:
        content["user_left"] = conversations_pb2.MessageContentUserLeft()
    elif kind == MessageType.user_made_admin:
        content["user_made_admin"] = conversations_pb2.MessageContentUserMadeAdmin(target_user_id=message.target_id)
    elif kind == MessageType.user_removed_admin:
        content["user_removed_admin"] = conversations_pb2.MessageContentUserRemovedAdmin(
            target_user_id=message.target_id
        )
    elif kind == MessageType.user_removed:
        content["group_chat_user_removed"] = conversations_pb2.MessageContentUserRemoved(
            target_user_id=message.target_id
        )

    return conversations_pb2.Message(**common, **content)

88 

89 

def _get_visible_members_for_subscription(subscription):
    """
    Lists the user ids of the chat members that the holder of this subscription is allowed to see

    Someone who has left a group chat must not learn about members who were added after they left.
    """
    chat_subscriptions = subscription.group_chat.subscriptions
    left_at = subscription.left

    if left_at:
        # no longer in the chat: only the subscriptions that were active at the moment we left
        visible = chat_subscriptions.where(GroupChatSubscription.joined <= left_at).where(
            or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None)
        )
    else:
        # still a member: everyone who currently has an active subscription
        visible = chat_subscriptions.where(GroupChatSubscription.left == None)

    return [sub.user_id for sub in visible]

106 

107 

def _get_visible_admins_for_subscription(subscription):
    """
    Lists the user ids of the chat admins that the holder of this subscription is allowed to see

    Same visibility rule as for members: after leaving, only subscriptions that were active at the time of leaving
    are taken into account.
    """
    chat_subscriptions = subscription.group_chat.subscriptions
    left_at = subscription.left

    if left_at:
        # no longer in the chat: admin subscriptions that were active at the moment we left
        visible = (
            chat_subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin)
            .where(GroupChatSubscription.joined <= left_at)
            .where(or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None))
        )
    else:
        # still a member: admins among the currently active subscriptions
        visible = chat_subscriptions.where(GroupChatSubscription.left == None).where(
            GroupChatSubscription.role == GroupChatRole.admin
        )

    return [sub.user_id for sub in visible]

129 

130 

def _user_can_message(session, context, group_chat: GroupChat) -> bool:
    """
    Tells whether the calling user may send messages in the given chat

    Real group chats (anything that is not a DM) can always be messaged. In a DM there has to be another
    participant who
    - is visible to the caller (filtered through where_users_column_visible), and
    - still has an active subscription, i.e. has not left the chat
    """
    if group_chat.is_dm:
        other_active_participant = (
            select(GroupChatSubscription)
            .where_users_column_visible(context=context, column=GroupChatSubscription.user_id)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
            .where(GroupChatSubscription.left == None)
        )
        return session.execute(func.exists(other_active_participant)).scalar_one()

    return True

149 

150 

def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload):
    """
    Background job that fans out "chat:message" notifications for one message

    Only text messages are notified. Recipients are the subscribers of the chat, other than the author, whose
    subscription was active when the message was sent, who are not muted, and who pass the visibility filter
    evaluated in the author's user context.
    """
    logger.info(f"Fanning notifications for message_id = {payload.message_id}")

    with session_scope() as session:
        message_and_chat = (
            select(Message, GroupChat)
            .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
            .where(Message.id == payload.message_id)
        )
        message, group_chat = session.execute(message_and_chat).one()

        if message.message_type != MessageType.text:
            logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
            return []

        author_context = make_user_context(user_id=message.author_id)
        recipients_query = (
            select(GroupChatSubscription.user_id)
            .where_users_column_visible(context=author_context, column=GroupChatSubscription.user_id)
            .where(GroupChatSubscription.group_chat_id == message.conversation_id)
            .where(GroupChatSubscription.user_id != message.author_id)
            .where(GroupChatSubscription.joined <= message.time)
            .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time))
            .where(not_(GroupChatSubscription.is_muted))
        )
        recipient_ids = session.execute(recipients_query).scalars().all()

        # the headline depends on whether this is a one-to-one chat or a titled group chat
        if group_chat.is_dm:
            headline = f"{message.author.name} sent you a message"
        else:
            headline = f"{message.author.name} sent a message in {group_chat.title}"

        for recipient_id in recipient_ids:
            # the author is rendered through the recipient's own user context
            author_pb = user_model_to_pb(
                message.author,
                session,
                make_user_context(user_id=recipient_id),
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action="chat:message",
                key=message.conversation_id,
                data=notification_data_pb2.ChatMessage(
                    author=author_pb,
                    message=headline,
                    text=message.text,
                    group_chat_id=message.conversation_id,
                ),
            )

205 

206 

def _add_message_to_subscription(session, subscription, **kwargs):
    """
    Posts a new message in the subscription's chat, authored by the user who owns the subscription

    The author's last seen message id is moved to the new message and a "generate_message_notifications" job is
    queued for it. Returns the new Message.

    Any keyword args are passed on to the Message constructor
    """
    new_message = Message(
        conversation=subscription.group_chat.conversation,
        author_id=subscription.user_id,
        **kwargs,
    )
    session.add(new_message)
    # flush before new_message.id is read below
    session.flush()

    subscription.last_seen_message_id = new_message.id

    notification_payload = jobs_pb2.GenerateMessageNotificationsPayload(message_id=new_message.id)
    queue_job(session, job_type="generate_message_notifications", payload=notification_payload)

    return new_message

229 

230 

def _create_chat(session, creator_id, recipient_ids, title=None, only_admins_invite=True):
    """
    Creates a conversation, its group chat and one subscription per member, and returns the GroupChat

    The creator is subscribed as admin, every recipient as participant. A chat with exactly one recipient is
    flagged as a DM.
    """
    conversation = Conversation()
    session.add(conversation)
    session.flush()

    chat = GroupChat(
        conversation_id=conversation.id,
        title=title,
        creator_id=creator_id,
        is_dm=len(recipient_ids) == 1,
        only_admins_invite=only_admins_invite,
    )
    session.add(chat)
    session.flush()

    # creator first, then the recipients in the order they were given
    members = [(creator_id, GroupChatRole.admin)]
    members += [(recipient_id, GroupChatRole.participant) for recipient_id in recipient_ids]

    for member_id, role in members:
        session.add(GroupChatSubscription(user_id=member_id, group_chat=chat, role=role))

    return chat

263 

264 

def _get_message_subscription(session, user_id, conversation_id):
    """
    Fetches the user's active (not left) subscription to the given chat, or None if there is none
    """
    active_subscription = (
        select(GroupChatSubscription)
        .where(GroupChatSubscription.group_chat_id == conversation_id)
        .where(GroupChatSubscription.user_id == user_id)
        .where(GroupChatSubscription.left == None)
    )
    return session.execute(active_subscription).scalar_one_or_none()

274 

275 

def _unseen_message_count(session, subscription_id):
    """
    Counts the messages in the subscription's chat whose id is above the subscription's last seen message id
    """
    unseen = (
        select(func.count())
        .select_from(Message)
        .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
        .where(GroupChatSubscription.id == subscription_id)
        .where(Message.id > GroupChatSubscription.last_seen_message_id)
    )
    return session.execute(unseen).scalar_one()

284 

285 

def _mute_info(subscription):
    """
    Builds the MuteInfo protocol buffer from the subscription's muted_display()

    muted_until is only filled in when muted_display() reports an end time.
    """
    is_muted, until = subscription.muted_display()
    until_pb = Timestamp_from_datetime(until) if until else None
    return conversations_pb2.MuteInfo(muted=is_muted, muted_until=until_pb)

292 

293 

294class Conversations(conversations_pb2_grpc.ConversationsServicer): 

295 def ListGroupChats(self, request, context, session): 

296 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH 

297 page_size = min(page_size, MAX_PAGE_SIZE) 

298 

299 # select group chats where you have a subscription, and for each of 

300 # these, the latest message from them 

301 

302 t = ( 

303 select( 

304 GroupChatSubscription.group_chat_id.label("group_chat_id"), 

305 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"), 

306 func.max(Message.id).label("message_id"), 

307 ) 

308 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

309 .where(GroupChatSubscription.user_id == context.user_id) 

310 .where(Message.time >= GroupChatSubscription.joined) 

311 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

312 .group_by(GroupChatSubscription.group_chat_id) 

313 .order_by(func.max(Message.id).desc()) 

314 .subquery() 

315 ) 

316 

317 results = session.execute( 

318 select(t, GroupChat, GroupChatSubscription, Message) 

319 .join(Message, Message.id == t.c.message_id) 

320 .join(GroupChatSubscription, GroupChatSubscription.id == t.c.group_chat_subscriptions_id) 

321 .join(GroupChat, GroupChat.conversation_id == t.c.group_chat_id) 

322 .where(or_(t.c.message_id < request.last_message_id, request.last_message_id == 0)) 

323 .order_by(t.c.message_id.desc()) 

324 .limit(page_size + 1) 

325 ).all() 

326 

327 return conversations_pb2.ListGroupChatsRes( 

328 group_chats=[ 

329 conversations_pb2.GroupChat( 

330 group_chat_id=result.GroupChat.conversation_id, 

331 title=result.GroupChat.title, # TODO: proper title for DMs, etc 

332 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription), 

333 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription), 

334 only_admins_invite=result.GroupChat.only_admins_invite, 

335 is_dm=result.GroupChat.is_dm, 

336 created=Timestamp_from_datetime(result.GroupChat.conversation.created), 

337 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id), 

338 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id, 

339 latest_message=_message_to_pb(result.Message) if result.Message else None, 

340 mute_info=_mute_info(result.GroupChatSubscription), 

341 can_message=_user_can_message(session, context, result.GroupChat), 

342 ) 

343 for result in results[:page_size] 

344 ], 

345 last_message_id=( 

346 min(g.Message.id if g.Message else 1 for g in results[:page_size]) if len(results) > 0 else 0 

347 ), # TODO 

348 no_more=len(results) <= page_size, 

349 ) 

350 

351 def GetGroupChat(self, request, context, session): 

352 result = session.execute( 

353 select(GroupChat, GroupChatSubscription, Message) 

354 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

355 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

356 .where(GroupChatSubscription.user_id == context.user_id) 

357 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

358 .where(Message.time >= GroupChatSubscription.joined) 

359 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

360 .order_by(Message.id.desc()) 

361 .limit(1) 

362 ).one_or_none() 

363 

364 if not result: 

365 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

366 

367 return conversations_pb2.GroupChat( 

368 group_chat_id=result.GroupChat.conversation_id, 

369 title=result.GroupChat.title, 

370 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription), 

371 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription), 

372 only_admins_invite=result.GroupChat.only_admins_invite, 

373 is_dm=result.GroupChat.is_dm, 

374 created=Timestamp_from_datetime(result.GroupChat.conversation.created), 

375 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id), 

376 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id, 

377 latest_message=_message_to_pb(result.Message) if result.Message else None, 

378 mute_info=_mute_info(result.GroupChatSubscription), 

379 can_message=_user_can_message(session, context, result.GroupChat), 

380 ) 

381 

382 def GetDirectMessage(self, request, context, session): 

383 count = func.count(GroupChatSubscription.id).label("count") 

384 subquery = ( 

385 select(GroupChatSubscription.group_chat_id) 

386 .where( 

387 or_( 

388 GroupChatSubscription.user_id == context.user_id, 

389 GroupChatSubscription.user_id == request.user_id, 

390 ) 

391 ) 

392 .where(GroupChatSubscription.left == None) 

393 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

394 .where(GroupChat.is_dm == True) 

395 .group_by(GroupChatSubscription.group_chat_id) 

396 .having(count == 2) 

397 .subquery() 

398 ) 

399 

400 result = session.execute( 

401 select(subquery, GroupChat, GroupChatSubscription, Message) 

402 .join(subquery, subquery.c.group_chat_id == GroupChat.conversation_id) 

403 .join(Message, Message.conversation_id == GroupChat.conversation_id) 

404 .where(GroupChatSubscription.user_id == context.user_id) 

405 .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id) 

406 .where(Message.time >= GroupChatSubscription.joined) 

407 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

408 .order_by(Message.id.desc()) 

409 .limit(1) 

410 ).one_or_none() 

411 

412 if not result: 

413 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

414 

415 return conversations_pb2.GroupChat( 

416 group_chat_id=result.GroupChat.conversation_id, 

417 title=result.GroupChat.title, 

418 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription), 

419 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription), 

420 only_admins_invite=result.GroupChat.only_admins_invite, 

421 is_dm=result.GroupChat.is_dm, 

422 created=Timestamp_from_datetime(result.GroupChat.conversation.created), 

423 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id), 

424 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id, 

425 latest_message=_message_to_pb(result.Message) if result.Message else None, 

426 mute_info=_mute_info(result.GroupChatSubscription), 

427 can_message=_user_can_message(session, context, result.GroupChat), 

428 ) 

429 

430 def GetUpdates(self, request, context, session): 

431 results = ( 

432 session.execute( 

433 select(Message) 

434 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

435 .where(GroupChatSubscription.user_id == context.user_id) 

436 .where(Message.time >= GroupChatSubscription.joined) 

437 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

438 .where(Message.id > request.newest_message_id) 

439 .order_by(Message.id.asc()) 

440 .limit(DEFAULT_PAGINATION_LENGTH + 1) 

441 ) 

442 .scalars() 

443 .all() 

444 ) 

445 

446 return conversations_pb2.GetUpdatesRes( 

447 updates=[ 

448 conversations_pb2.Update( 

449 group_chat_id=message.conversation_id, 

450 message=_message_to_pb(message), 

451 ) 

452 for message in sorted(results, key=lambda message: message.id)[:DEFAULT_PAGINATION_LENGTH] 

453 ], 

454 no_more=len(results) <= DEFAULT_PAGINATION_LENGTH, 

455 ) 

456 

457 def GetGroupChatMessages(self, request, context, session): 

458 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH 

459 page_size = min(page_size, MAX_PAGE_SIZE) 

460 

461 results = ( 

462 session.execute( 

463 select(Message) 

464 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

465 .where(GroupChatSubscription.user_id == context.user_id) 

466 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

467 .where(Message.time >= GroupChatSubscription.joined) 

468 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

469 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0)) 

470 .where(or_(Message.id > GroupChatSubscription.last_seen_message_id, request.only_unseen == 0)) 

471 .order_by(Message.id.desc()) 

472 .limit(page_size + 1) 

473 ) 

474 .scalars() 

475 .all() 

476 ) 

477 

478 return conversations_pb2.GetGroupChatMessagesRes( 

479 messages=[_message_to_pb(message) for message in results[:page_size]], 

480 last_message_id=results[-2].id if len(results) > 1 else 0, # TODO 

481 no_more=len(results) <= page_size, 

482 ) 

483 

484 def MarkLastSeenGroupChat(self, request, context, session): 

485 subscription = _get_message_subscription(session, context.user_id, request.group_chat_id) 

486 

487 if not subscription: 

488 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

489 

490 if not subscription.last_seen_message_id <= request.last_seen_message_id: 

491 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES) 

492 

493 subscription.last_seen_message_id = request.last_seen_message_id 

494 

495 # TODO: notify 

496 

497 return empty_pb2.Empty() 

498 

499 def MuteGroupChat(self, request, context, session): 

500 subscription = _get_message_subscription(session, context.user_id, request.group_chat_id) 

501 

502 if not subscription: 

503 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

504 

505 if request.unmute: 

506 subscription.muted_until = DATETIME_MINUS_INFINITY 

507 elif request.forever: 

508 subscription.muted_until = DATETIME_INFINITY 

509 elif request.for_duration: 

510 duration = request.for_duration.ToTimedelta() 

511 if duration < timedelta(seconds=0): 

512 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MUTE_PAST) 

513 subscription.muted_until = now() + duration 

514 

515 return empty_pb2.Empty() 

516 

517 def SearchMessages(self, request, context, session): 

518 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH 

519 page_size = min(page_size, MAX_PAGE_SIZE) 

520 

521 results = ( 

522 session.execute( 

523 select(Message) 

524 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

525 .where(GroupChatSubscription.user_id == context.user_id) 

526 .where(Message.time >= GroupChatSubscription.joined) 

527 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

528 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0)) 

529 .where(Message.text.ilike(f"%{request.query}%")) 

530 .order_by(Message.id.desc()) 

531 .limit(page_size + 1) 

532 ) 

533 .scalars() 

534 .all() 

535 ) 

536 

537 return conversations_pb2.SearchMessagesRes( 

538 results=[ 

539 conversations_pb2.MessageSearchResult( 

540 group_chat_id=message.conversation_id, 

541 message=_message_to_pb(message), 

542 ) 

543 for message in results[:page_size] 

544 ], 

545 last_message_id=results[-2].id if len(results) > 1 else 0, 

546 no_more=len(results) <= page_size, 

547 ) 

548 

549 def CreateGroupChat(self, request, context, session): 

550 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

551 if not user.has_completed_profile: 

552 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_MESSAGE) 

553 

554 recipient_user_ids = list( 

555 session.execute(select(User.id).where_users_visible(context).where(User.id.in_(request.recipient_user_ids))) 

556 .scalars() 

557 .all() 

558 ) 

559 

560 # make sure all requested users are visible 

561 if len(recipient_user_ids) != len(request.recipient_user_ids): 

562 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_FOUND) 

563 

564 if not recipient_user_ids: 

565 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_RECIPIENTS) 

566 

567 if len(recipient_user_ids) != len(set(recipient_user_ids)): 

568 # make sure there's no duplicate users 

569 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_RECIPIENTS) 

570 

571 if context.user_id in recipient_user_ids: 

572 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_ADD_SELF) 

573 

574 if len(recipient_user_ids) == 1: 

575 # can only have one DM at a time between any two users 

576 other_user_id = recipient_user_ids[0] 

577 

578 # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have 

579 # user_id either this user or the recipient user. If you find two subscriptions to the same DM group 

580 # chat, you know they already have a shared group chat 

581 count = func.count(GroupChatSubscription.id).label("count") 

582 if session.execute( 

583 select(count) 

584 .where( 

585 or_( 

586 GroupChatSubscription.user_id == context.user_id, 

587 GroupChatSubscription.user_id == other_user_id, 

588 ) 

589 ) 

590 .where(GroupChatSubscription.left == None) 

591 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

592 .where(GroupChat.is_dm == True) 

593 .group_by(GroupChatSubscription.group_chat_id) 

594 .having(count == 2) 

595 ).scalar_one_or_none(): 

596 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_HAVE_DM) 

597 

598 # Check if user has been initiating chats excessively 

599 if process_rate_limits_and_check_abort( 

600 session=session, user_id=context.user_id, action=RateLimitAction.chat_initiation 

601 ): 

602 context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.CHAT_INITIATION_RATE_LIMIT) 

603 

604 group_chat = _create_chat( 

605 session, 

606 creator_id=context.user_id, 

607 recipient_ids=request.recipient_user_ids, 

608 title=request.title.value, 

609 ) 

610 

611 your_subscription = _get_message_subscription(session, context.user_id, group_chat.conversation_id) 

612 

613 _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created) 

614 

615 session.flush() 

616 

617 return conversations_pb2.GroupChat( 

618 group_chat_id=group_chat.conversation_id, 

619 title=group_chat.title, 

620 member_user_ids=_get_visible_members_for_subscription(your_subscription), 

621 admin_user_ids=_get_visible_admins_for_subscription(your_subscription), 

622 only_admins_invite=group_chat.only_admins_invite, 

623 is_dm=group_chat.is_dm, 

624 created=Timestamp_from_datetime(group_chat.conversation.created), 

625 mute_info=_mute_info(your_subscription), 

626 can_message=True, 

627 ) 

628 

629 def SendMessage(self, request, context, session): 

630 if request.text == "": 

631 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE) 

632 

633 result = session.execute( 

634 select(GroupChatSubscription, GroupChat) 

635 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

636 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

637 .where(GroupChatSubscription.user_id == context.user_id) 

638 .where(GroupChatSubscription.left == None) 

639 ).one_or_none() 

640 if not result: 

641 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

642 

643 subscription, group_chat = result 

644 if not _user_can_message(session, context, group_chat): 

645 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MESSAGE_IN_CHAT) 

646 

647 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text) 

648 

649 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

650 sent_messages_counter.labels( 

651 user_gender, "direct message" if subscription.group_chat.is_dm else "group chat" 

652 ).inc() 

653 

654 return empty_pb2.Empty() 

655 

656 def SendDirectMessage(self, request, context, session): 

657 user_id = context.user_id 

658 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

659 

660 recipient_id = request.recipient_user_id 

661 

662 if not user.has_completed_profile: 

663 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_MESSAGE) 

664 

665 if not recipient_id: 

666 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_RECIPIENTS) 

667 

668 recipient_user_id = session.execute( 

669 select(User.id).where_users_visible(context).where(User.id == recipient_id) 

670 ).scalar_one_or_none() 

671 

672 if not recipient_user_id: 

673 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_FOUND) 

674 

675 if user_id == recipient_id: 

676 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_ADD_SELF) 

677 

678 if request.text == "": 

679 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE) 

680 

681 # Look for an existing direct message (DM) chat between the two users 

682 dm_chat_ids = ( 

683 select(GroupChatSubscription.group_chat_id) 

684 .where(GroupChatSubscription.user_id.in_([user_id, recipient_id])) 

685 .group_by(GroupChatSubscription.group_chat_id) 

686 .having(func.count(GroupChatSubscription.user_id) == 2) 

687 ) 

688 

689 chat = session.execute( 

690 select(GroupChat).where(GroupChat.is_dm == True).where(GroupChat.conversation_id.in_(dm_chat_ids)).limit(1) 

691 ).scalar_one_or_none() 

692 

693 if not chat: 

694 chat = _create_chat(session, user_id, [recipient_id]) 

695 

696 # Retrieve the sender's active subscription to the chat 

697 subscription = _get_message_subscription(session, user_id, chat.conversation_id) 

698 

699 # Add the message to the conversation 

700 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text) 

701 

702 user_gender = session.execute(select(User.gender).where(User.id == user_id)).scalar_one() 

703 sent_messages_counter.labels(user_gender, "direct message").inc() 

704 

705 session.flush() 

706 

707 return conversations_pb2.SendDirectMessageRes(group_chat_id=chat.conversation_id) 

708 

709 def EditGroupChat(self, request, context, session): 

710 subscription = _get_message_subscription(session, context.user_id, request.group_chat_id) 

711 

712 if not subscription: 

713 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

714 

715 if subscription.role != GroupChatRole.admin: 

716 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_EDIT) 

717 

718 if request.HasField("title"): 

719 subscription.group_chat.title = request.title.value 

720 

721 if request.HasField("only_admins_invite"): 

722 subscription.group_chat.only_admins_invite = request.only_admins_invite.value 

723 

724 _add_message_to_subscription(session, subscription, message_type=MessageType.chat_edited) 

725 

726 return empty_pb2.Empty() 

727 

728 def MakeGroupChatAdmin(self, request, context, session): 

729 if not session.execute( 

730 select(User).where_users_visible(context).where(User.id == request.user_id) 

731 ).scalar_one_or_none(): 

732 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

733 

734 your_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id) 

735 

736 if not your_subscription: 

737 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

738 

739 if your_subscription.role != GroupChatRole.admin: 

740 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_MAKE_ADMIN) 

741 

742 if request.user_id == context.user_id: 

743 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MAKE_SELF_ADMIN) 

744 

745 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id) 

746 

747 if not their_subscription: 

748 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT) 

749 

750 if their_subscription.role != GroupChatRole.participant: 

751 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_ADMIN) 

752 

753 their_subscription.role = GroupChatRole.admin 

754 

755 _add_message_to_subscription( 

756 session, your_subscription, message_type=MessageType.user_made_admin, target_id=request.user_id 

757 ) 

758 

759 return empty_pb2.Empty() 

760 

def RemoveGroupChatAdmin(self, request, context, session):
    """
    Demotes the admin `request.user_id` of group chat `request.group_chat_id` to a participant, then posts a
    `user_removed_admin` control message through the caller's subscription.

    Aborts with:
    - NOT_FOUND / USER_NOT_FOUND: the target is not among the users visible to the caller
    - NOT_FOUND / CHAT_NOT_FOUND: `_get_message_subscription` finds no subscription for the caller
    - FAILED_PRECONDITION / CANT_REMOVE_LAST_ADMIN: the caller targets themselves and no other admin remains
    - PERMISSION_DENIED / ONLY_ADMIN_CAN_REMOVE_ADMIN: the caller's role is not admin
    - FAILED_PRECONDITION / USER_NOT_ADMIN: the target has no current admin subscription in this chat
    """
    target_user_id = request.user_id
    chat_id = request.group_chat_id

    visible_target_stmt = select(User).where_users_visible(context).where(User.id == target_user_id)
    if not session.execute(visible_target_stmt).scalar_one_or_none():
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    caller_subscription = _get_message_subscription(session, context.user_id, chat_id)
    if not caller_subscription:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

    # Self-demotion is only allowed while another admin is still in the chat. This check stays ahead of the
    # permission check below so that the order in which errors are reported is unchanged.
    if target_user_id == context.user_id:
        # Race condition!
        remaining_admins_stmt = (
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == chat_id)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.role == GroupChatRole.admin)
            .where(GroupChatSubscription.left == None)  # only subscriptions that have not left
        )
        remaining_admins = session.execute(remaining_admins_stmt).scalar_one()
        if remaining_admins <= 0:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_LAST_ADMIN)

    if caller_subscription.role != GroupChatRole.admin:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_ADMIN)

    # Look up the target's subscription, restricted to current admins of this chat
    target_admin_stmt = (
        select(GroupChatSubscription)
        .where(GroupChatSubscription.group_chat_id == chat_id)
        .where(GroupChatSubscription.user_id == target_user_id)
        .where(GroupChatSubscription.left == None)
        .where(GroupChatSubscription.role == GroupChatRole.admin)
    )
    target_subscription = session.execute(target_admin_stmt).scalar_one_or_none()
    if not target_subscription:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN)

    target_subscription.role = GroupChatRole.participant

    _add_message_to_subscription(
        session,
        caller_subscription,
        message_type=MessageType.user_removed_admin,
        target_id=target_user_id,
    )

    return empty_pb2.Empty()

806 

def InviteToGroupChat(self, request, context, session):
    """
    Adds `request.user_id` to group chat `request.group_chat_id` as a participant and posts a `user_invited`
    control message through the caller's subscription.

    Aborts with:
    - NOT_FOUND / USER_NOT_FOUND: the invitee is not among the users visible to the caller
    - NOT_FOUND / CHAT_NOT_FOUND: the caller has no current (not left) subscription to the chat
    - FAILED_PRECONDITION / CANT_INVITE_SELF: the caller tries to invite themselves
    - PERMISSION_DENIED / INVITE_PERMISSION_DENIED: the chat has `only_admins_invite` set and the caller is
      not an admin
    - FAILED_PRECONDITION / CANT_INVITE_TO_DM: the chat is a direct message
    - FAILED_PRECONDITION / ALREADY_IN_CHAT: `_get_message_subscription` finds a subscription for the invitee
    """
    if not session.execute(
        select(User).where_users_visible(context).where(User.id == request.user_id)
    ).scalar_one_or_none():
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    # Fetch the caller's current subscription together with the chat it belongs to
    result = session.execute(
        select(GroupChatSubscription, GroupChat)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.left == None)
    ).one_or_none()

    if not result:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

    # The row comes from an inner join, so both entities are always populated when a row exists;
    # no second CHAT_NOT_FOUND check is needed after unpacking.
    your_subscription, group_chat = result

    if request.user_id == context.user_id:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_SELF)

    if your_subscription.role != GroupChatRole.admin and your_subscription.group_chat.only_admins_invite:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVITE_PERMISSION_DENIED)

    if group_chat.is_dm:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_TO_DM)

    their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)

    if their_subscription:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_CHAT)

    # TODO: race condition!

    subscription = GroupChatSubscription(
        user_id=request.user_id,
        group_chat=your_subscription.group_chat,
        role=GroupChatRole.participant,
    )
    session.add(subscription)

    _add_message_to_subscription(
        session, your_subscription, message_type=MessageType.user_invited, target_id=request.user_id
    )

    return empty_pb2.Empty()

857 

def RemoveGroupChatUser(self, request, context, session):
    """
    Removes `request.user_id` from group chat `request.group_chat_id` on behalf of the calling admin.

    1. Validate the caller: must have a subscription to the chat, be an admin, and not target themselves
    2. Validate the target: must have a subscription to the chat
    3. Post a `user_removed` control message, then mark the target's subscription as left

    Aborts with:
    - NOT_FOUND / CHAT_NOT_FOUND: `_get_message_subscription` finds no subscription for the caller
    - PERMISSION_DENIED / ONLY_ADMIN_CAN_REMOVE_USER: the caller's role is not admin
    - FAILED_PRECONDITION / CANT_REMOVE_SELF: the caller targets themselves
    - FAILED_PRECONDITION / USER_NOT_IN_CHAT: `_get_message_subscription` finds no subscription for the target
    """
    chat_id = request.group_chat_id
    target_user_id = request.user_id

    # The caller must be in the chat...
    caller_subscription = _get_message_subscription(session, context.user_id, chat_id)
    if not caller_subscription:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

    # ...and must be one of its admins
    if caller_subscription.role != GroupChatRole.admin:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_USER)

    # Removing yourself is rejected by this endpoint
    if target_user_id == context.user_id:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_SELF)

    # The target must be in the chat
    target_subscription = _get_message_subscription(session, target_user_id, chat_id)
    if not target_subscription:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT)

    # The control message is added before the subscription is marked as left
    _add_message_to_subscription(
        session,
        caller_subscription,
        message_type=MessageType.user_removed,
        target_id=target_user_id,
    )

    target_subscription.left = func.now()

    return empty_pb2.Empty()

892 

def LeaveGroupChat(self, request, context, session):
    """
    Makes the caller leave group chat `request.group_chat_id`: posts a `user_left` control message and
    marks the caller's subscription as left.

    An admin may only leave if another admin is still in the chat, or if no participants are still in it.

    Aborts with:
    - NOT_FOUND / CHAT_NOT_FOUND: `_get_message_subscription` finds no subscription for the caller
    - FAILED_PRECONDITION / LAST_ADMIN_CANT_LEAVE: the caller is the only admin and participants remain
    """
    my_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
    if not my_subscription:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

    if my_subscription.role == GroupChatRole.admin:
        # Count of everyone else whose subscription to this chat has not been left; refined per role below
        others_in_chat = (
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.left == None)
        )
        remaining_admins = session.execute(
            others_in_chat.where(GroupChatSubscription.role == GroupChatRole.admin)
        ).scalar_one()
        remaining_participants = session.execute(
            others_in_chat.where(GroupChatSubscription.role == GroupChatRole.participant)
        ).scalar_one()

        # Leaving would strand the remaining participants without any admin
        if remaining_admins <= 0 and remaining_participants != 0:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.LAST_ADMIN_CANT_LEAVE)

    # The control message is added before the subscription is marked as left
    _add_message_to_subscription(session, my_subscription, message_type=MessageType.user_left)

    my_subscription.left = func.now()

    return empty_pb2.Empty()