Coverage for src/couchers/servicers/conversations.py: 91%

292 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-21 02:54 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.sql import func, not_, or_ 

7 

8from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY 

9from couchers.context import make_background_user_context 

10from couchers.db import session_scope 

11from couchers.jobs.enqueue import queue_job 

12from couchers.metrics import sent_messages_counter 

13from couchers.models import ( 

14 Conversation, 

15 GroupChat, 

16 GroupChatRole, 

17 GroupChatSubscription, 

18 Message, 

19 MessageType, 

20 RateLimitAction, 

21 User, 

22) 

23from couchers.notifications.notify import notify 

24from couchers.proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2 

25from couchers.proto.internal import jobs_pb2 

26from couchers.rate_limits.check import process_rate_limits_and_check_abort 

27from couchers.rate_limits.definitions import RATE_LIMIT_HOURS 

28from couchers.servicers.api import user_model_to_pb 

29from couchers.sql import couchers_select as select 

30from couchers.utils import Timestamp_from_datetime, now 

31 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# TODO: Still needs custom pagination: GetUpdates
# Page size used when a request leaves its `number` field at 0 (GetUpdates always uses this size).
DEFAULT_PAGINATION_LENGTH = 20
# Upper bound applied to any page size requested by the client.
MAX_PAGE_SIZE = 50

37 

38 

def _message_to_pb(message: Message) -> conversations_pb2.Message:
    """
    Converts a message model into its protocol buffer representation

    A normal message carries its text. Any other message is a control message: the content field that belongs to
    its message type is filled in and every other content field is passed as None.
    """
    common = dict(
        message_id=message.id,
        author_user_id=message.author_id,
        time=Timestamp_from_datetime(message.time),
    )

    if message.is_normal_message:
        return conversations_pb2.Message(
            text=conversations_pb2.MessageContentText(text=message.text),
            **common,
        )

    # (message type, pb field name, content class, whether the content carries a target user)
    control_kinds = [
        (MessageType.chat_created, "chat_created", conversations_pb2.MessageContentChatCreated, False),
        (MessageType.chat_edited, "chat_edited", conversations_pb2.MessageContentChatEdited, False),
        (MessageType.user_invited, "user_invited", conversations_pb2.MessageContentUserInvited, True),
        (MessageType.user_left, "user_left", conversations_pb2.MessageContentUserLeft, False),
        (MessageType.user_made_admin, "user_made_admin", conversations_pb2.MessageContentUserMadeAdmin, True),
        (
            MessageType.user_removed_admin,
            "user_removed_admin",
            conversations_pb2.MessageContentUserRemovedAdmin,
            True,
        ),
        (MessageType.user_removed, "group_chat_user_removed", conversations_pb2.MessageContentUserRemoved, True),
    ]

    content = {}
    for message_type, field_name, content_cls, has_target in control_kinds:
        if message.message_type != message_type:
            # not this kind of control message: the field is explicitly passed as None
            content[field_name] = None
        elif has_target:
            content[field_name] = content_cls(target_user_id=message.target_id)
        else:
            content[field_name] = content_cls()

    return conversations_pb2.Message(**common, **content)

89 

90 

def _get_visible_members_for_subscription(subscription):
    """
    Lists the user ids of the chat members that the owner of this subscription may see

    Someone who has left a group chat must not learn who was added after they left, so they only see the people
    whose membership covered the moment they left.
    """
    chat_subscriptions = subscription.group_chat.subscriptions

    if subscription.left:
        # not in chat anymore, see everyone who was in chat when we left
        visible = chat_subscriptions.where(GroupChatSubscription.joined <= subscription.left).where(
            or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None)
        )
    else:
        # still in the chat, we see everyone with a current subscription
        visible = chat_subscriptions.where(GroupChatSubscription.left == None)

    return [member.user_id for member in visible]

107 

108 

def _get_visible_admins_for_subscription(subscription):
    """
    Lists the user ids of the chat admins that the owner of this subscription may see

    Same visibility rule as for members: someone who has left only sees the admins whose membership covered the
    moment they left, never anyone added afterwards.
    """
    chat_subscriptions = subscription.group_chat.subscriptions

    if subscription.left:
        # not in chat anymore, see everyone who was in chat when we left
        visible = (
            chat_subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin)
            .where(GroupChatSubscription.joined <= subscription.left)
            .where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None))
        )
    else:
        # still in the chat, we see everyone with a current subscription
        visible = chat_subscriptions.where(GroupChatSubscription.left == None).where(
            GroupChatSubscription.role == GroupChatRole.admin
        )

    return [admin.user_id for admin in visible]

130 

131 

def _user_can_message(session, context, group_chat: GroupChat) -> bool:
    """
    Whether the calling user may currently send messages in the given chat

    In a true group chat (not a DM) the user can always message. In a DM the user can message only while the other
    participant
    - Is not deleted/banned
    - Has not been blocked by the user or is blocking the user
    - Has not left the chat
    """
    if not group_chat.is_dm:
        return True

    # a current subscription of somebody else in this chat, limited to users the caller is allowed to see
    other_participant_present = (
        select(GroupChatSubscription)
        .where_users_column_visible(context=context, column=GroupChatSubscription.user_id)
        .where(GroupChatSubscription.user_id != context.user_id)
        .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
        .where(GroupChatSubscription.left == None)
    )

    return session.execute(func.exists(other_participant_present)).scalar_one()

150 

151 

def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload):
    """
    Background job that fans a chat message out into one notification per recipient

    Only text messages are notified about. Recipients are the subscriptions of the message's chat that are visible
    from the author's point of view, do not belong to the author, were active at the time of the message, and are
    not muted.
    """
    message_id = payload.message_id
    logger.info(f"Fanning notifications for message_id = {message_id}")

    with session_scope() as session:
        message_lookup = (
            select(Message, GroupChat)
            .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
            .where(Message.id == message_id)
        )
        message, group_chat = session.execute(message_lookup).one()

        if message.message_type != MessageType.text:
            logger.info(f"Not a text message, not notifying. message_id = {message_id}")
            return []

        # visibility is evaluated from the author's side
        author_context = make_background_user_context(user_id=message.author_id)
        recipients_query = (
            select(GroupChatSubscription.user_id)
            .where_users_column_visible(context=author_context, column=GroupChatSubscription.user_id)
            .where(GroupChatSubscription.group_chat_id == message.conversation_id)
            .where(GroupChatSubscription.user_id != message.author_id)
            .where(GroupChatSubscription.joined <= message.time)
            .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time))
            .where(not_(GroupChatSubscription.is_muted))
        )
        recipient_ids = session.execute(recipients_query).scalars().all()

        if group_chat.is_dm:
            headline = f"{message.author.name} sent you a message"
        else:
            headline = f"{message.author.name} sent a message in {group_chat.title}"

        for recipient_id in recipient_ids:
            # the author is rendered from the point of view of the user being notified
            recipient_context = make_background_user_context(user_id=recipient_id)
            notification_data = notification_data_pb2.ChatMessage(
                author=user_model_to_pb(message.author, session, recipient_context),
                message=headline,
                text=message.text,
                group_chat_id=message.conversation_id,
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action="chat:message",
                key=message.conversation_id,
                data=notification_data,
            )

206 

207 

def _add_message_to_subscription(session, subscription, **kwargs):
    """
    Posts a new message into the subscription's chat, authored by the user who owns the subscription

    The subscription's last seen message id is moved to the new message, and a background job is queued to
    generate notifications for it.

    The keyword args are passed through to Message
    """
    new_message = Message(
        conversation=subscription.group_chat.conversation,
        author_id=subscription.user_id,
        **kwargs,
    )
    session.add(new_message)
    # flush so that the message gets its id assigned before it is used below
    session.flush()

    subscription.last_seen_message_id = new_message.id

    job_payload = jobs_pb2.GenerateMessageNotificationsPayload(message_id=new_message.id)
    queue_job(session, job_type="generate_message_notifications", payload=job_payload)

    return new_message

230 

231 

def _create_chat(session, creator_id, recipient_ids, title=None, only_admins_invite=True):
    """
    Creates a conversation with its group chat and one subscription per participant

    The creator is subscribed as admin, every recipient as participant. A chat with exactly one recipient is
    flagged as a DM. Returns the new GroupChat; the subscriptions are added to the session but not flushed here.
    """
    conversation = Conversation()
    session.add(conversation)
    session.flush()

    group_chat = GroupChat(
        conversation_id=conversation.id,
        title=title,
        creator_id=creator_id,
        is_dm=len(recipient_ids) == 1,
        only_admins_invite=only_admins_invite,
    )
    session.add(group_chat)
    session.flush()

    # creator first, then the recipients in the order given
    session.add(GroupChatSubscription(user_id=creator_id, group_chat=group_chat, role=GroupChatRole.admin))
    for recipient_id in recipient_ids:
        recipient_subscription = GroupChatSubscription(
            user_id=recipient_id,
            group_chat=group_chat,
            role=GroupChatRole.participant,
        )
        session.add(recipient_subscription)

    return group_chat

264 

265 

def _get_message_subscription(session, user_id, conversation_id):
    """
    Fetches the user's current (not left) subscription to the given chat, or None if there isn't one
    """
    current_subscription = (
        select(GroupChatSubscription)
        .where(GroupChatSubscription.group_chat_id == conversation_id)
        .where(GroupChatSubscription.user_id == user_id)
        .where(GroupChatSubscription.left == None)
    )
    return session.execute(current_subscription).scalar_one_or_none()

275 

276 

def _unseen_message_count(session, subscription_id):
    """
    Counts the messages in the subscription's chat whose id is above the subscription's last seen message id
    """
    unseen = (
        select(func.count())
        .select_from(Message)
        .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
        .where(GroupChatSubscription.id == subscription_id)
        .where(Message.id > GroupChatSubscription.last_seen_message_id)
    )
    return session.execute(unseen).scalar_one()

285 

286 

def _mute_info(subscription):
    """
    Builds the MuteInfo protocol buffer from what the subscription's muted_display() reports
    """
    is_muted, until = subscription.muted_display()
    # the timestamp is only included when there is an end time to report
    until_pb = Timestamp_from_datetime(until) if until else None
    return conversations_pb2.MuteInfo(muted=is_muted, muted_until=until_pb)

293 

294 

295class Conversations(conversations_pb2_grpc.ConversationsServicer): 

def ListGroupChats(self, request, context, session):
    """
    Lists the caller's group chats, ordered by their latest visible message (newest first), one page at a time

    `request.number` is the page size (0 means the default, capped at the maximum) and `request.last_message_id`
    is the cursor from the previous page (0 for the first page).
    """
    page_size = min(request.number or DEFAULT_PAGINATION_LENGTH, MAX_PAGE_SIZE)

    # for each chat the caller has a subscription to: the newest message that falls inside the window in which
    # the caller was subscribed
    latest_per_chat = (
        select(
            GroupChatSubscription.group_chat_id.label("group_chat_id"),
            func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
            func.max(Message.id).label("message_id"),
        )
        .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(Message.time >= GroupChatSubscription.joined)
        .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
        .group_by(GroupChatSubscription.group_chat_id)
        .order_by(func.max(Message.id).desc())
        .subquery()
    )

    # one row more than the page size is fetched, to find out whether there is anything after this page
    page_query = (
        select(latest_per_chat, GroupChat, GroupChatSubscription, Message)
        .join(Message, Message.id == latest_per_chat.c.message_id)
        .join(GroupChatSubscription, GroupChatSubscription.id == latest_per_chat.c.group_chat_subscriptions_id)
        .join(GroupChat, GroupChat.conversation_id == latest_per_chat.c.group_chat_id)
        .where(or_(latest_per_chat.c.message_id < request.last_message_id, request.last_message_id == 0))
        .order_by(latest_per_chat.c.message_id.desc())
        .limit(page_size + 1)
    )
    rows = session.execute(page_query).all()
    page = rows[:page_size]

    group_chats = []
    for row in page:
        chat = row.GroupChat
        subscription = row.GroupChatSubscription
        group_chats.append(
            conversations_pb2.GroupChat(
                group_chat_id=chat.conversation_id,
                title=chat.title,  # TODO: proper title for DMs, etc
                member_user_ids=_get_visible_members_for_subscription(subscription),
                admin_user_ids=_get_visible_admins_for_subscription(subscription),
                only_admins_invite=chat.only_admins_invite,
                is_dm=chat.is_dm,
                created=Timestamp_from_datetime(chat.conversation.created),
                unseen_message_count=_unseen_message_count(session, subscription.id),
                last_seen_message_id=subscription.last_seen_message_id,
                latest_message=_message_to_pb(row.Message) if row.Message else None,
                mute_info=_mute_info(subscription),
                can_message=_user_can_message(session, context, chat),
            )
        )

    if len(rows) > 0:
        next_cursor = min(row.Message.id if row.Message else 1 for row in page)
    else:
        next_cursor = 0

    return conversations_pb2.ListGroupChatsRes(
        group_chats=group_chats,
        last_message_id=next_cursor,  # TODO
        no_more=len(rows) <= page_size,
    )

351 

def GetGroupChat(self, request, context, session):
    """
    Returns a single group chat the caller has a subscription to, along with its latest visible message

    Aborts with NOT_FOUND when the query finds no row, i.e. the caller has no subscription to the chat or no
    message falls inside their subscription window.
    """
    latest_visible_message = (
        select(GroupChat, GroupChatSubscription, Message)
        .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
        .where(Message.time >= GroupChatSubscription.joined)
        .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
        .order_by(Message.id.desc())
        .limit(1)
    )
    row = session.execute(latest_visible_message).one_or_none()

    if not row:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    chat = row.GroupChat
    subscription = row.GroupChatSubscription
    latest_message = _message_to_pb(row.Message) if row.Message else None

    return conversations_pb2.GroupChat(
        group_chat_id=chat.conversation_id,
        title=chat.title,
        member_user_ids=_get_visible_members_for_subscription(subscription),
        admin_user_ids=_get_visible_admins_for_subscription(subscription),
        only_admins_invite=chat.only_admins_invite,
        is_dm=chat.is_dm,
        created=Timestamp_from_datetime(chat.conversation.created),
        unseen_message_count=_unseen_message_count(session, subscription.id),
        last_seen_message_id=subscription.last_seen_message_id,
        latest_message=latest_message,
        mute_info=_mute_info(subscription),
        can_message=_user_can_message(session, context, chat),
    )

382 

def GetDirectMessage(self, request, context, session):
    """
    Returns the DM chat between the caller and `request.user_id`, along with its latest visible message

    Aborts with NOT_FOUND when no such chat (with a visible message) is found.
    """
    subscription_count = func.count(GroupChatSubscription.id).label("count")

    # DM chats in which both the caller and the other user hold a current subscription
    shared_dm_ids = (
        select(GroupChatSubscription.group_chat_id)
        .where(
            or_(
                GroupChatSubscription.user_id == context.user_id,
                GroupChatSubscription.user_id == request.user_id,
            )
        )
        .where(GroupChatSubscription.left == None)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChat.is_dm == True)
        .group_by(GroupChatSubscription.group_chat_id)
        .having(subscription_count == 2)
        .subquery()
    )

    latest_visible_message = (
        select(shared_dm_ids, GroupChat, GroupChatSubscription, Message)
        .join(shared_dm_ids, shared_dm_ids.c.group_chat_id == GroupChat.conversation_id)
        .join(Message, Message.conversation_id == GroupChat.conversation_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id)
        .where(Message.time >= GroupChatSubscription.joined)
        .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
        .order_by(Message.id.desc())
        .limit(1)
    )
    row = session.execute(latest_visible_message).one_or_none()

    if not row:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    chat = row.GroupChat
    subscription = row.GroupChatSubscription
    latest_message = _message_to_pb(row.Message) if row.Message else None

    return conversations_pb2.GroupChat(
        group_chat_id=chat.conversation_id,
        title=chat.title,
        member_user_ids=_get_visible_members_for_subscription(subscription),
        admin_user_ids=_get_visible_admins_for_subscription(subscription),
        only_admins_invite=chat.only_admins_invite,
        is_dm=chat.is_dm,
        created=Timestamp_from_datetime(chat.conversation.created),
        unseen_message_count=_unseen_message_count(session, subscription.id),
        last_seen_message_id=subscription.last_seen_message_id,
        latest_message=latest_message,
        mute_info=_mute_info(subscription),
        can_message=_user_can_message(session, context, chat),
    )

430 

def GetUpdates(self, request, context, session):
    """
    Returns the messages newer than `request.newest_message_id` across all of the caller's chats, oldest first

    Only messages that fall inside the caller's subscription window of the respective chat are included. At most
    DEFAULT_PAGINATION_LENGTH updates are returned per call; `no_more` tells whether that was everything.
    """
    # one row more than the page size is fetched, to find out whether there is anything after this page
    new_messages = (
        session.execute(
            select(Message)
            .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
            .where(GroupChatSubscription.user_id == context.user_id)
            .where(Message.time >= GroupChatSubscription.joined)
            .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
            .where(Message.id > request.newest_message_id)
            .order_by(Message.id.asc())
            .limit(DEFAULT_PAGINATION_LENGTH + 1)
        )
        .scalars()
        .all()
    )

    # the query already returns the rows in ascending id order, so the page is simply its first rows
    return conversations_pb2.GetUpdatesRes(
        updates=[
            conversations_pb2.Update(
                group_chat_id=message.conversation_id,
                message=_message_to_pb(message),
            )
            for message in new_messages[:DEFAULT_PAGINATION_LENGTH]
        ],
        no_more=len(new_messages) <= DEFAULT_PAGINATION_LENGTH,
    )

457 

def GetGroupChatMessages(self, request, context, session):
    """
    Returns one page of the messages visible to the caller in the given chat, newest first

    `request.number` is the page size (0 means the default, capped at the maximum), `request.last_message_id` is
    the cursor from the previous page (0 for the first page), and `request.only_unseen` restricts the result to
    messages above the caller's last seen message id.

    The returned `last_message_id` is the id of the last message on this page (0 if the page is empty), so that
    passing it back continues right after this page.
    """
    page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
    page_size = min(page_size, MAX_PAGE_SIZE)

    # one row more than the page size is fetched, to find out whether there is anything after this page
    results = (
        session.execute(
            select(Message)
            .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
            .where(GroupChatSubscription.user_id == context.user_id)
            .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
            .where(Message.time >= GroupChatSubscription.joined)
            .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
            .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
            .where(or_(Message.id > GroupChatSubscription.last_seen_message_id, request.only_unseen == 0))
            .order_by(Message.id.desc())
            .limit(page_size + 1)
        )
        .scalars()
        .all()
    )

    page = results[:page_size]

    return conversations_pb2.GetGroupChatMessagesRes(
        messages=[_message_to_pb(message) for message in page],
        # the cursor is the last message actually returned; indexing the raw results from the end is only right
        # when the extra look-ahead row is present
        last_message_id=page[-1].id if page else 0,
        no_more=len(results) <= page_size,
    )

484 

def MarkLastSeenGroupChat(self, request, context, session):
    """
    Moves the caller's last seen message id in the given chat forward to `request.last_seen_message_id`

    Aborts with NOT_FOUND if the caller has no current subscription to the chat, and with FAILED_PRECONDITION if
    the new value would be lower than the current one.
    """
    subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
    if not subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    new_last_seen = request.last_seen_message_id
    if subscription.last_seen_message_id > new_last_seen:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")

    subscription.last_seen_message_id = new_last_seen

    # TODO: notify

    return empty_pb2.Empty()

499 

def MuteGroupChat(self, request, context, session):
    """
    Mutes or unmutes the given chat for the caller

    `unmute` takes precedence over `forever`, which takes precedence over `for_duration`. A negative duration
    aborts with FAILED_PRECONDITION; a missing current subscription aborts with NOT_FOUND.
    """
    subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
    if not subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if request.unmute:
        subscription.muted_until = DATETIME_MINUS_INFINITY
        return empty_pb2.Empty()

    if request.forever:
        subscription.muted_until = DATETIME_INFINITY
        return empty_pb2.Empty()

    if request.for_duration:
        mute_for = request.for_duration.ToTimedelta()
        if mute_for < timedelta(seconds=0):
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_mute_past")
        subscription.muted_until = now() + mute_for

    return empty_pb2.Empty()

517 

def SearchMessages(self, request, context, session):
    """
    Returns one page of the messages visible to the caller, across all their chats, whose text contains
    `request.query` (case-insensitive), newest first

    `request.number` is the page size (0 means the default, capped at the maximum) and `request.last_message_id`
    is the cursor from the previous page (0 for the first page).

    The returned `last_message_id` is the id of the last message on this page (0 if the page is empty), so that
    passing it back continues right after this page.
    """
    page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
    page_size = min(page_size, MAX_PAGE_SIZE)

    # one row more than the page size is fetched, to find out whether there is anything after this page
    results = (
        session.execute(
            select(Message)
            .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
            .where(GroupChatSubscription.user_id == context.user_id)
            .where(Message.time >= GroupChatSubscription.joined)
            .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
            .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
            .where(Message.text.ilike(f"%{request.query}%"))
            .order_by(Message.id.desc())
            .limit(page_size + 1)
        )
        .scalars()
        .all()
    )

    page = results[:page_size]

    return conversations_pb2.SearchMessagesRes(
        results=[
            conversations_pb2.MessageSearchResult(
                group_chat_id=message.conversation_id,
                message=_message_to_pb(message),
            )
            for message in page
        ],
        # the cursor is the last message actually returned; indexing the raw results from the end is only right
        # when the extra look-ahead row is present
        last_message_id=page[-1].id if page else 0,
        no_more=len(results) <= page_size,
    )

549 

def CreateGroupChat(self, request, context, session):
    """
    Creates a new group chat with the caller as admin; with exactly one recipient the chat is a DM

    Aborts with
    - FAILED_PRECONDITION if the caller's profile is incomplete, or a DM between the two users already exists
    - INVALID_ARGUMENT if there are no recipients, a recipient is listed more than once, a recipient is not
      visible to the caller, or the caller lists themselves
    - RESOURCE_EXHAUSTED if the chat initiation rate limit check says to abort
    """
    user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
    if not user.has_completed_profile:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")

    requested_user_ids = list(request.recipient_user_ids)

    if not requested_user_ids:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")

    # make sure there's no duplicate users. This has to look at the request: the lookup below can never return
    # the same user twice, so duplicates would otherwise be reported as a missing user
    if len(requested_user_ids) != len(set(requested_user_ids)):
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_recipients")

    recipient_user_ids = list(
        session.execute(select(User.id).where_users_visible(context).where(User.id.in_(requested_user_ids)))
        .scalars()
        .all()
    )

    # make sure all requested users are visible
    if len(recipient_user_ids) != len(requested_user_ids):
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")

    if context.user_id in recipient_user_ids:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")

    if len(recipient_user_ids) == 1:
        # can only have one DM at a time between any two users
        other_user_id = recipient_user_ids[0]

        # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have
        # user_id either this user or the recipient user. If you find two subscriptions to the same DM group
        # chat, you know they already have a shared group chat
        count = func.count(GroupChatSubscription.id).label("count")
        if session.execute(
            select(count)
            .where(
                or_(
                    GroupChatSubscription.user_id == context.user_id,
                    GroupChatSubscription.user_id == other_user_id,
                )
            )
            .where(GroupChatSubscription.left == None)
            .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
            .where(GroupChat.is_dm == True)
            .group_by(GroupChatSubscription.group_chat_id)
            .having(count == 2)
        ).scalar_one_or_none():
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_have_dm")

    # Check if user has been initiating chats excessively
    if process_rate_limits_and_check_abort(
        session=session, user_id=context.user_id, action=RateLimitAction.chat_initiation
    ):
        context.abort_with_error_code(
            grpc.StatusCode.RESOURCE_EXHAUSTED,
            "chat_initiation_rate_limit",
            substitutions={"hours": RATE_LIMIT_HOURS},
        )

    group_chat = _create_chat(
        session,
        creator_id=context.user_id,
        recipient_ids=requested_user_ids,
        title=request.title.value,
    )

    your_subscription = _get_message_subscription(session, context.user_id, group_chat.conversation_id)

    _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created)

    session.flush()

    return conversations_pb2.GroupChat(
        group_chat_id=group_chat.conversation_id,
        title=group_chat.title,
        member_user_ids=_get_visible_members_for_subscription(your_subscription),
        admin_user_ids=_get_visible_admins_for_subscription(your_subscription),
        only_admins_invite=group_chat.only_admins_invite,
        is_dm=group_chat.is_dm,
        created=Timestamp_from_datetime(group_chat.conversation.created),
        mute_info=_mute_info(your_subscription),
        can_message=True,
    )

633 

def SendMessage(self, request, context, session):
    """
    Posts a text message from the caller into the given chat and counts it in the sent messages metric

    Aborts with INVALID_ARGUMENT for an empty text, NOT_FOUND if the caller has no current subscription to the
    chat, and FAILED_PRECONDITION if the caller can't message in the chat.
    """
    if request.text == "":
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    own_subscription_query = (
        select(GroupChatSubscription, GroupChat)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.left == None)
    )
    row = session.execute(own_subscription_query).one_or_none()
    if not row:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    subscription, group_chat = row
    if not _user_can_message(session, context, group_chat):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_message_in_chat")

    _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)

    # metric labels: the sender's gender and the kind of chat
    user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
    chat_kind = "direct message" if subscription.group_chat.is_dm else "group chat"
    sent_messages_counter.labels(user_gender, chat_kind).inc()

    return empty_pb2.Empty()

660 

def SendDirectMessage(self, request, context, session):
    """
    Sends a text message from the caller to `request.recipient_user_id`, creating the DM chat if needed

    An existing DM is reused only if both users still hold a current subscription to it; otherwise a new DM is
    created. Returns the id of the chat the message was posted in.

    Aborts with FAILED_PRECONDITION if the caller's profile is incomplete, and with INVALID_ARGUMENT if there is
    no recipient, the recipient is not visible to the caller, the recipient is the caller, or the text is empty.
    """
    user_id = context.user_id
    user = session.execute(select(User).where(User.id == user_id)).scalar_one()

    recipient_id = request.recipient_user_id

    if not user.has_completed_profile:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")

    if not recipient_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")

    recipient_user_id = session.execute(
        select(User.id).where_users_visible(context).where(User.id == recipient_id)
    ).scalar_one_or_none()

    if not recipient_user_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")

    if user_id == recipient_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")

    if request.text == "":
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    # Look for an existing direct message (DM) chat between the two users. Only current subscriptions count, in
    # line with the other DM lookups in this servicer: the sender's current subscription is needed below to post
    # the message, so a chat that one of them has left must not be picked up here
    dm_chat_ids = (
        select(GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.user_id.in_([user_id, recipient_id]))
        .where(GroupChatSubscription.left == None)
        .group_by(GroupChatSubscription.group_chat_id)
        .having(func.count(GroupChatSubscription.user_id) == 2)
    )

    chat = session.execute(
        select(GroupChat).where(GroupChat.is_dm == True).where(GroupChat.conversation_id.in_(dm_chat_ids)).limit(1)
    ).scalar_one_or_none()

    if not chat:
        chat = _create_chat(session, user_id, [recipient_id])

    # Retrieve the sender's active subscription to the chat
    subscription = _get_message_subscription(session, user_id, chat.conversation_id)

    # Add the message to the conversation
    _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)

    # metric labels: the sender's gender and the kind of chat
    user_gender = session.execute(select(User.gender).where(User.id == user_id)).scalar_one()
    sent_messages_counter.labels(user_gender, "direct message").inc()

    session.flush()

    return conversations_pb2.SendDirectMessageRes(group_chat_id=chat.conversation_id)

713 

def EditGroupChat(self, request, context, session):
    """
    Updates the title and/or the only_admins_invite setting of a chat and posts a "chat edited" message

    Only the fields present in the request are changed. Aborts with NOT_FOUND if the caller has no current
    subscription to the chat and with PERMISSION_DENIED if the caller is not an admin of it.
    """
    subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
    if not subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_edit")

    group_chat = subscription.group_chat

    if request.HasField("title"):
        group_chat.title = request.title.value

    if request.HasField("only_admins_invite"):
        group_chat.only_admins_invite = request.only_admins_invite.value

    _add_message_to_subscription(session, subscription, message_type=MessageType.chat_edited)

    return empty_pb2.Empty()

732 

def MakeGroupChatAdmin(self, request, context, session):
    """
    Promotes `request.user_id` from participant to admin of the given chat and posts a control message about it

    Aborts with
    - NOT_FOUND if the target user is not visible to the caller, or the caller has no current subscription
    - PERMISSION_DENIED if the caller is not an admin of the chat
    - FAILED_PRECONDITION if the target is the caller, is not currently in the chat, or is not a participant
    """
    target_user_id = request.user_id

    target_visible = session.execute(
        select(User).where_users_visible(context).where(User.id == target_user_id)
    ).scalar_one_or_none()
    if not target_visible:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    your_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
    if not your_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if your_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_make_admin")

    if target_user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_make_self_admin")

    their_subscription = _get_message_subscription(session, target_user_id, request.group_chat_id)
    if not their_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")

    if their_subscription.role != GroupChatRole.participant:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_admin")

    their_subscription.role = GroupChatRole.admin

    # the control message is authored by the caller and points at the promoted user
    _add_message_to_subscription(
        session,
        your_subscription,
        message_type=MessageType.user_made_admin,
        target_id=target_user_id,
    )

    return empty_pb2.Empty()

765 

def RemoveGroupChatAdmin(self, request, context, session):
    """
    Demotes `request.user_id` from admin to participant of the given chat and posts a control message about it

    Aborts with
    - NOT_FOUND if the target user is not visible to the caller, or the caller has no current subscription
    - FAILED_PRECONDITION if the caller targets themselves while no other current admin exists, or the target
      is not a current admin of the chat
    - PERMISSION_DENIED if the caller is not an admin of the chat
    """
    target_user_id = request.user_id
    group_chat_id = request.group_chat_id

    target_visible = session.execute(
        select(User).where_users_visible(context).where(User.id == target_user_id)
    ).scalar_one_or_none()
    if not target_visible:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    your_subscription = _get_message_subscription(session, context.user_id, group_chat_id)
    if not your_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if target_user_id == context.user_id:
        # Race condition!
        other_admins_query = (
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == group_chat_id)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.role == GroupChatRole.admin)
            .where(GroupChatSubscription.left == None)
        )
        other_admins_count = session.execute(other_admins_query).scalar_one()
        if other_admins_count <= 0:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_last_admin")

    if your_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_admin")

    # the target's current subscription, but only if it is an admin one
    target_admin_query = (
        select(GroupChatSubscription)
        .where(GroupChatSubscription.group_chat_id == group_chat_id)
        .where(GroupChatSubscription.user_id == target_user_id)
        .where(GroupChatSubscription.left == None)
        .where(GroupChatSubscription.role == GroupChatRole.admin)
    )
    their_subscription = session.execute(target_admin_query).scalar_one_or_none()
    if not their_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin")

    their_subscription.role = GroupChatRole.participant

    # the control message is authored by the caller and points at the demoted user
    _add_message_to_subscription(
        session,
        your_subscription,
        message_type=MessageType.user_removed_admin,
        target_id=target_user_id,
    )

    return empty_pb2.Empty()

811 

def InviteToGroupChat(self, request, context, session):
    """
    Adds another user to a group chat as a participant.

    The invited user must be found among the users visible in this context, and the caller must have a
    current (not left) subscription to the chat. On success a new participant subscription is created for the
    invited user and a `user_invited` control message is added via the caller's subscription.

    Aborts with:
    - NOT_FOUND "user_not_found" if the invited user cannot be found
    - NOT_FOUND "chat_not_found" if the caller has no current subscription to the chat
    - FAILED_PRECONDITION "cant_invite_self" if the caller tries to invite themselves
    - PERMISSION_DENIED "invite_permission_denied" if the chat only lets admins invite and the caller is not one
    - FAILED_PRECONDITION "cant_invite_to_dm" if the chat is a direct message
    - FAILED_PRECONDITION "already_in_chat" if the invited user already has a subscription to the chat
    """
    if not session.execute(
        select(User).where_users_visible(context).where(User.id == request.user_id)
    ).scalar_one_or_none():
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    result = session.execute(
        select(GroupChatSubscription, GroupChat)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.left == None)
    ).one_or_none()

    if not result:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    # The inner join yields either no row at all (handled above) or a row holding both entities, so neither
    # value can be None here and no second "chat_not_found" check is needed.
    your_subscription, group_chat = result

    if request.user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_self")

    # Use the chat row fetched by the query above rather than going through the subscription's relationship
    if your_subscription.role != GroupChatRole.admin and group_chat.only_admins_invite:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invite_permission_denied")

    if group_chat.is_dm:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_to_dm")

    their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)

    if their_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_chat")

    # TODO: race condition! Two concurrent invites can both pass the check above before either subscription exists.

    subscription = GroupChatSubscription(
        user_id=request.user_id,
        group_chat=group_chat,
        role=GroupChatRole.participant,
    )
    session.add(subscription)

    _add_message_to_subscription(
        session, your_subscription, message_type=MessageType.user_invited, target_id=request.user_id
    )

    return empty_pb2.Empty()

862 

def RemoveGroupChatUser(self, request, context, session):
    """
    Removes another user from a group chat on behalf of an admin.

    Steps:
    1. Look up the caller's subscription and verify they are an admin of the chat
    2. Look up the target's subscription, record a `user_removed` control message, and mark the target as left

    Aborts with:
    - NOT_FOUND "chat_not_found" if the caller has no subscription to the chat
    - PERMISSION_DENIED "only_admin_can_remove_user" if the caller is not an admin
    - FAILED_PRECONDITION "cant_remove_self" if the caller targets themselves
    - FAILED_PRECONDITION "user_not_in_chat" if the target has no subscription to the chat
    """
    # Step 1: the caller must be in the chat and be an admin
    admin_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
    if not admin_subscription:
        # the caller is not in this chat, so from their point of view it does not exist
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
    if admin_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_user")

    # removing yourself is rejected here with its own error
    if request.user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_self")

    # Step 2: the target must currently be in the chat
    removed_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
    if not removed_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")

    # The control message is added before the target's subscription is marked as left
    _add_message_to_subscription(
        session,
        admin_subscription,
        message_type=MessageType.user_removed,
        target_id=request.user_id,
    )
    removed_subscription.left = func.now()

    return empty_pb2.Empty()

897 

def LeaveGroupChat(self, request, context, session):
    """
    Makes the calling user leave a group chat.

    An admin may only leave if at least one other current admin remains, or if there are no other current
    participants in the chat. On success a `user_left` control message is added via the caller's subscription
    and the subscription is marked as left.

    Aborts with:
    - NOT_FOUND "chat_not_found" if the caller has no subscription to the chat
    - FAILED_PRECONDITION "last_admin_cant_leave" if the caller is the only admin and participants remain
    """
    own_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
    if not own_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    def _count_other_members(role):
        # Counts current (not left) subscriptions to this chat with the given role, excluding the caller
        return session.execute(
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.role == role)
            .where(GroupChatSubscription.left.is_(None))
        ).scalar_one()

    if own_subscription.role == GroupChatRole.admin:
        other_admins = _count_other_members(GroupChatRole.admin)
        other_participants = _count_other_members(GroupChatRole.participant)
        # The last admin must not leave participants behind without any admin
        if other_admins <= 0 and other_participants != 0:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "last_admin_cant_leave")

    # The control message is added before the subscription is marked as left
    _add_message_to_subscription(session, own_subscription, message_type=MessageType.user_left)
    own_subscription.left = func.now()

    return empty_pb2.Empty()