Coverage for src/couchers/servicers/conversations.py: 91%

250 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-06-01 02:39 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.sql import func, not_, or_, select 

7 

8from couchers import errors 

9from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY 

10from couchers.db import session_scope 

11from couchers.jobs.enqueue import queue_job 

12from couchers.metrics import sent_messages_counter 

13from couchers.models import Conversation, GroupChat, GroupChatRole, GroupChatSubscription, Message, MessageType, User 

14from couchers.notifications.notify import notify 

15from couchers.servicers.api import user_model_to_pb 

16from couchers.servicers.blocking import are_blocked 

17from couchers.sql import couchers_select as select 

18from couchers.utils import Timestamp_from_datetime, make_user_context, now 

19from proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2 

20from proto.internal import jobs_pb2 

21 

# module-level logger, named after this module
logger = logging.getLogger(__name__)

# TODO: Still needs custom pagination: GetUpdates
# page size used when a request leaves `number` at 0 (GetUpdates always uses it)
DEFAULT_PAGINATION_LENGTH = 20
# upper bound applied to any page size requested by a client
MAX_PAGE_SIZE = 50

27 

28 

def _message_to_pb(message: Message):
    """
    Converts a Message model into a conversations_pb2.Message

    A normal message carries its text. Any other message gets the content submessage that matches its
    message_type; the invited/made admin/removed admin/removed contents also carry the target user id.
    """
    # fields that are filled in the same way for every kind of message
    shared = dict(
        message_id=message.id,
        author_user_id=message.author_id,
        time=Timestamp_from_datetime(message.time),
    )

    if message.is_normal_message:
        return conversations_pb2.Message(
            text=conversations_pb2.MessageContentText(text=message.text),
            **shared,
        )

    def content_if(expected_type, content_cls, with_target=False):
        # build the content only when it matches this message's type, otherwise pass None for that field
        if message.message_type != expected_type:
            return None
        if with_target:
            return content_cls(target_user_id=message.target_id)
        return content_cls()

    return conversations_pb2.Message(
        chat_created=content_if(MessageType.chat_created, conversations_pb2.MessageContentChatCreated),
        chat_edited=content_if(MessageType.chat_edited, conversations_pb2.MessageContentChatEdited),
        user_invited=content_if(
            MessageType.user_invited, conversations_pb2.MessageContentUserInvited, with_target=True
        ),
        user_left=content_if(MessageType.user_left, conversations_pb2.MessageContentUserLeft),
        user_made_admin=content_if(
            MessageType.user_made_admin, conversations_pb2.MessageContentUserMadeAdmin, with_target=True
        ),
        user_removed_admin=content_if(
            MessageType.user_removed_admin, conversations_pb2.MessageContentUserRemovedAdmin, with_target=True
        ),
        group_chat_user_removed=content_if(
            MessageType.user_removed, conversations_pb2.MessageContentUserRemoved, with_target=True
        ),
        **shared,
    )

79 

80 

def _get_visible_members_for_subscription(subscription):
    """
    Returns the user ids of the chat members this subscription is allowed to see

    Someone who left a group chat must not learn who was added after they left, so for a subscription that has
    ended only the people who were in the chat at the moment of leaving are returned.
    """
    left_at = subscription.left
    chat_subscriptions = subscription.group_chat.subscriptions

    if not left_at:
        # still in the chat, we see everyone with a current subscription
        visible = chat_subscriptions.where(GroupChatSubscription.left == None)
    else:
        # not in chat anymore, see everyone who was in chat when we left
        visible = chat_subscriptions.where(GroupChatSubscription.joined <= left_at).where(
            or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None)
        )

    return [member.user_id for member in visible]

97 

98 

def _get_visible_admins_for_subscription(subscription):
    """
    Returns the user ids of the chat admins this subscription is allowed to see

    Same visibility rule as for members: a user who left only sees the admin subscriptions that were active at
    the moment they left, never anyone who joined afterwards.
    """
    left_at = subscription.left
    chat_subscriptions = subscription.group_chat.subscriptions

    if not left_at:
        # still in the chat, we see everyone with a current subscription
        visible = chat_subscriptions.where(GroupChatSubscription.left == None).where(
            GroupChatSubscription.role == GroupChatRole.admin
        )
    else:
        # not in chat anymore, see everyone who was in chat when we left
        visible = (
            chat_subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin)
            .where(GroupChatSubscription.joined <= left_at)
            .where(or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None))
        )

    return [admin.user_id for admin in visible]

120 

121 

def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload):
    """
    Background job that fans out "chat:message" notifications for a message sent to a group chat

    Only text messages are notified about. Recipients are the visible users, other than the author, whose
    subscription covers the time of the message and is not muted; a recipient is skipped when are_blocked()
    holds between them and the author.
    """
    logger.info(f"Fanning notifications for message_id = {payload.message_id}")

    with session_scope() as session:
        message_and_chat_query = (
            select(Message, GroupChat)
            .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
            .where(Message.id == payload.message_id)
        )
        message, group_chat = session.execute(message_and_chat_query).one()

        if message.message_type != MessageType.text:
            logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
            return []

        # subscriptions of everyone who should hear about this message
        recipients_query = (
            select(GroupChatSubscription)
            .join(User, User.id == GroupChatSubscription.user_id)
            .where(GroupChatSubscription.group_chat_id == message.conversation_id)
            .where(User.is_visible)
            .where(User.id != message.author_id)
            .where(GroupChatSubscription.joined <= message.time)
            .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time))
            .where(not_(GroupChatSubscription.is_muted))
        )
        recipient_subscriptions = session.execute(recipients_query).scalars().all()

        # the notification headline is the same for every recipient
        msg = (
            f"{message.author.name} sent you a message"
            if group_chat.is_dm
            else f"{message.author.name} sent a message in {group_chat.title}"
        )

        for recipient in recipient_subscriptions:
            if not are_blocked(session, recipient.user_id, message.author.id):
                # the author is rendered from the point of view of the recipient
                author_pb = user_model_to_pb(
                    message.author,
                    session,
                    make_user_context(user_id=recipient.user_id),
                )
                notify(
                    session,
                    user_id=recipient.user_id,
                    topic_action="chat:message",
                    key=message.conversation_id,
                    data=notification_data_pb2.ChatMessage(
                        author=author_pb,
                        message=msg,
                        text=message.text,
                        group_chat_id=message.conversation_id,
                    ),
                )

178 

179 

def _add_message_to_subscription(session, subscription, **kwargs):
    """
    Adds a message to the subscription's group chat, authored by the user who owns the subscription

    The keyword args are passed on to Message. The author's last seen message id is moved to the new message
    and a "generate_message_notifications" job is queued for it. Returns the new message.
    """
    new_message = Message(
        conversation=subscription.group_chat.conversation,
        author_id=subscription.user_id,
        **kwargs,
    )
    session.add(new_message)
    # flush before reading new_message.id below
    session.flush()

    # the author has obviously seen their own message
    subscription.last_seen_message_id = new_message.id

    notification_payload = jobs_pb2.GenerateMessageNotificationsPayload(message_id=new_message.id)
    queue_job(session, job_type="generate_message_notifications", payload=notification_payload)

    return new_message

202 

203 

def _unseen_message_count(session, subscription_id):
    """
    Counts the messages in the subscription's group chat whose id is above the subscription's last seen message id
    """
    unseen_query = (
        select(func.count())
        .select_from(Message)
        .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
        .where(GroupChatSubscription.id == subscription_id)
        .where(Message.id > GroupChatSubscription.last_seen_message_id)
    )
    return session.execute(unseen_query).scalar_one()

212 

213 

def _mute_info(subscription):
    """
    Builds the MuteInfo protocol buffer from the subscription's muted_display()

    muted_until is only converted to a timestamp when muted_display() returned a truthy value for it.
    """
    is_muted, until = subscription.muted_display()
    until_pb = Timestamp_from_datetime(until) if until else None
    return conversations_pb2.MuteInfo(muted=is_muted, muted_until=until_pb)

220 

221 

class Conversations(conversations_pb2_grpc.ConversationsServicer):
    """
    Servicer for the Conversations API: listing, reading, creating and administering group chats and DMs

    Every RPC takes the request, the call context (used here for `context.user_id` and `context.abort`) and a
    database session. The methods rely on `context.abort` not returning (gRPC's abort raises).
    """

    @staticmethod
    def _page_size_for(request):
        """
        Returns `request.number`, or DEFAULT_PAGINATION_LENGTH when it is 0, capped at MAX_PAGE_SIZE
        """
        requested = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
        return min(requested, MAX_PAGE_SIZE)

    @staticmethod
    def _current_subscription(session, group_chat_id, user_id):
        """
        Returns the user's subscription to the group chat that has not been left, or None if there is none
        """
        return session.execute(
            select(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == group_chat_id)
            .where(GroupChatSubscription.user_id == user_id)
            .where(GroupChatSubscription.left == None)
        ).scalar_one_or_none()

    @staticmethod
    def _is_visible_user(session, context, user_id):
        """
        Whether a user with this id exists and passes the `where_users_visible(context)` filter
        """
        user = session.execute(
            select(User).where_users_visible(context).where(User.id == user_id)
        ).scalar_one_or_none()
        return user is not None

    @staticmethod
    def _count_other_members(session, group_chat_id, user_id, role):
        """
        Counts the current (not left) subscriptions with the given role in the chat, excluding `user_id`
        """
        return session.execute(
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == group_chat_id)
            .where(GroupChatSubscription.user_id != user_id)
            .where(GroupChatSubscription.role == role)
            .where(GroupChatSubscription.left == None)
        ).scalar_one()

    @staticmethod
    def _group_chat_to_pb(session, result):
        """
        Builds a GroupChat protocol buffer from a row exposing `GroupChat`, `GroupChatSubscription` and `Message`

        Members and admins are restricted to what the row's subscription is allowed to see.
        """
        group_chat = result.GroupChat
        subscription = result.GroupChatSubscription
        return conversations_pb2.GroupChat(
            group_chat_id=group_chat.conversation_id,
            title=group_chat.title,  # TODO: proper title for DMs, etc
            member_user_ids=_get_visible_members_for_subscription(subscription),
            admin_user_ids=_get_visible_admins_for_subscription(subscription),
            only_admins_invite=group_chat.only_admins_invite,
            is_dm=group_chat.is_dm,
            created=Timestamp_from_datetime(group_chat.conversation.created),
            unseen_message_count=_unseen_message_count(session, subscription.id),
            last_seen_message_id=subscription.last_seen_message_id,
            latest_message=_message_to_pb(result.Message) if result.Message else None,
            mute_info=_mute_info(subscription),
        )

    def ListGroupChats(self, request, context, session):
        """
        Lists the caller's group chats, most recently active first, paginated by latest message id

        `request.last_message_id` (0 for the first page) is the cursor: only chats whose latest visible message
        has a smaller id are returned.
        """
        page_size = self._page_size_for(request)

        # select group chats where you have a subscription, and for each of
        # these, the latest message from them
        t = (
            select(
                GroupChatSubscription.group_chat_id.label("group_chat_id"),
                func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                func.max(Message.id).label("message_id"),
            )
            .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
            .where(GroupChatSubscription.user_id == context.user_id)
            .where(Message.time >= GroupChatSubscription.joined)
            .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
            .group_by(GroupChatSubscription.group_chat_id)
            .order_by(func.max(Message.id).desc())
            .subquery()
        )

        # one row more than the page size is fetched to find out whether there is a next page
        results = session.execute(
            select(t, GroupChat, GroupChatSubscription, Message)
            .join(Message, Message.id == t.c.message_id)
            .join(GroupChatSubscription, GroupChatSubscription.id == t.c.group_chat_subscriptions_id)
            .join(GroupChat, GroupChat.conversation_id == t.c.group_chat_id)
            .where(or_(t.c.message_id < request.last_message_id, request.last_message_id == 0))
            .order_by(t.c.message_id.desc())
            .limit(page_size + 1)
        ).all()

        page = results[:page_size]

        return conversations_pb2.ListGroupChatsRes(
            group_chats=[self._group_chat_to_pb(session, result) for result in page],
            # cursor for the next page: the smallest latest-message id on this page, 0 for an empty page
            last_message_id=min(g.Message.id if g.Message else 1 for g in page) if page else 0,  # TODO
            no_more=len(results) <= page_size,
        )

    def GetGroupChat(self, request, context, session):
        """
        Returns one group chat the caller has a subscription to, with the latest message visible to them

        Aborts with NOT_FOUND if the query finds no such chat.
        """
        result = session.execute(
            select(GroupChat, GroupChatSubscription, Message)
            .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
            .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
            .where(GroupChatSubscription.user_id == context.user_id)
            .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
            .where(Message.time >= GroupChatSubscription.joined)
            .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
            .order_by(Message.id.desc())
            .limit(1)
        ).one_or_none()

        if not result:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        return self._group_chat_to_pb(session, result)

    def GetDirectMessage(self, request, context, session):
        """
        Returns the DM between the caller and `request.user_id`

        Aborts with NOT_FOUND if there is no DM chat in which both currently have a subscription.
        """
        # DM chats in which both users currently have a subscription: grouping the subscriptions of either
        # user by chat yields a count of 2 exactly for those chats
        count = func.count(GroupChatSubscription.id).label("count")
        subquery = (
            select(GroupChatSubscription.group_chat_id)
            .where(
                or_(
                    GroupChatSubscription.user_id == context.user_id,
                    GroupChatSubscription.user_id == request.user_id,
                )
            )
            .where(GroupChatSubscription.left == None)
            .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
            .where(GroupChat.is_dm == True)
            .group_by(GroupChatSubscription.group_chat_id)
            .having(count == 2)
            .subquery()
        )

        result = session.execute(
            select(subquery, GroupChat, GroupChatSubscription, Message)
            .join(subquery, subquery.c.group_chat_id == GroupChat.conversation_id)
            .join(Message, Message.conversation_id == GroupChat.conversation_id)
            .where(GroupChatSubscription.user_id == context.user_id)
            .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id)
            .where(Message.time >= GroupChatSubscription.joined)
            .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
            .order_by(Message.id.desc())
            .limit(1)
        ).one_or_none()

        if not result:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        return self._group_chat_to_pb(session, result)

    def GetUpdates(self, request, context, session):
        """
        Returns messages newer than `request.newest_message_id` from all chats of the caller, oldest first

        At most DEFAULT_PAGINATION_LENGTH updates are returned per call.
        """
        results = (
            session.execute(
                select(Message)
                .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
                .where(GroupChatSubscription.user_id == context.user_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where(Message.id > request.newest_message_id)
                .order_by(Message.id.asc())
                .limit(DEFAULT_PAGINATION_LENGTH + 1)
            )
            .scalars()
            .all()
        )

        return conversations_pb2.GetUpdatesRes(
            updates=[
                conversations_pb2.Update(
                    group_chat_id=message.conversation_id,
                    message=_message_to_pb(message),
                )
                # the query already orders by ascending message id, no need to sort again here
                for message in results[:DEFAULT_PAGINATION_LENGTH]
            ],
            no_more=len(results) <= DEFAULT_PAGINATION_LENGTH,
        )

    def GetGroupChatMessages(self, request, context, session):
        """
        Returns a page of messages of one group chat, newest first, limited to what the caller may see

        `request.last_message_id` (0 for the first page) is the cursor; `request.only_unseen` restricts the
        result to messages above the caller's last seen message id.
        """
        page_size = self._page_size_for(request)

        results = (
            session.execute(
                select(Message)
                .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
                .where(GroupChatSubscription.user_id == context.user_id)
                .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
                .where(or_(Message.id > GroupChatSubscription.last_seen_message_id, request.only_unseen == 0))
                .order_by(Message.id.desc())
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )

        page = results[:page_size]

        return conversations_pb2.GetGroupChatMessagesRes(
            messages=[_message_to_pb(message) for message in page],
            # the cursor is the id of the last message actually returned; indexing results[-2] is only correct
            # when the extra look-ahead row exists and is off by one on a partial page
            last_message_id=page[-1].id if page else 0,
            no_more=len(results) <= page_size,
        )

    def MarkLastSeenGroupChat(self, request, context, session):
        """
        Moves the caller's last seen message id in a group chat forward

        Aborts with NOT_FOUND without a current subscription and with FAILED_PRECONDITION when the requested id
        is below the stored one.
        """
        subscription = self._current_subscription(session, request.group_chat_id, context.user_id)

        if not subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        if request.last_seen_message_id < subscription.last_seen_message_id:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)

        subscription.last_seen_message_id = request.last_seen_message_id

        # TODO: notify

        return empty_pb2.Empty()

    def MuteGroupChat(self, request, context, session):
        """
        Unmutes a group chat, mutes it forever, or mutes it for a duration counted from now

        Aborts with NOT_FOUND without a current subscription and with FAILED_PRECONDITION for a negative
        duration.
        """
        subscription = self._current_subscription(session, request.group_chat_id, context.user_id)

        if not subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        if request.unmute:
            subscription.muted_until = DATETIME_MINUS_INFINITY
        elif request.forever:
            subscription.muted_until = DATETIME_INFINITY
        elif request.for_duration:
            duration = request.for_duration.ToTimedelta()
            if duration < timedelta(seconds=0):
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MUTE_PAST)
            subscription.muted_until = now() + duration

        return empty_pb2.Empty()

    def SearchMessages(self, request, context, session):
        """
        Case-insensitive substring search over the text of all messages visible to the caller, newest first

        `request.last_message_id` (0 for the first page) is the cursor.
        """
        page_size = self._page_size_for(request)

        results = (
            session.execute(
                select(Message)
                .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
                .where(GroupChatSubscription.user_id == context.user_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
                .where(Message.text.ilike(f"%{request.query}%"))
                .order_by(Message.id.desc())
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )

        page = results[:page_size]

        return conversations_pb2.SearchMessagesRes(
            results=[
                conversations_pb2.MessageSearchResult(
                    group_chat_id=message.conversation_id,
                    message=_message_to_pb(message),
                )
                for message in page
            ],
            # id of the last result actually returned (see GetGroupChatMessages), 0 for an empty page
            last_message_id=page[-1].id if page else 0,
            no_more=len(results) <= page_size,
        )

    def CreateGroupChat(self, request, context, session):
        """
        Creates a group chat (a DM when there is exactly one recipient) with the caller as admin

        Aborts if the caller's profile is incomplete, if a recipient is not visible, if there are no recipients,
        if the caller lists themselves, or if a DM between the two users already exists.
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        if not user.has_completed_profile:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_MESSAGE)

        recipient_user_ids = list(
            session.execute(select(User.id).where_users_visible(context).where(User.id.in_(request.recipient_user_ids)))
            .scalars()
            .all()
        )

        # make sure all requested users are visible
        if len(recipient_user_ids) != len(request.recipient_user_ids):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_FOUND)

        if not recipient_user_ids:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_RECIPIENTS)

        # NOTE(review): these ids come from a query on User.id, so this check presumably never triggers and
        # duplicates in the request are reported as USER_NOT_FOUND by the length comparison above; confirm
        if len(recipient_user_ids) != len(set(recipient_user_ids)):
            # make sure there's no duplicate users
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_RECIPIENTS)

        if context.user_id in recipient_user_ids:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_ADD_SELF)

        is_dm = len(recipient_user_ids) == 1

        if is_dm:
            # can only have one DM at a time between any two users
            other_user_id = recipient_user_ids[0]

            # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have
            # user_id either this user or the recipient user. If you find two subscriptions to the same DM group
            # chat, you know they already have a shared group chat
            count = func.count(GroupChatSubscription.id).label("count")
            if session.execute(
                select(count)
                .where(
                    or_(
                        GroupChatSubscription.user_id == context.user_id,
                        GroupChatSubscription.user_id == other_user_id,
                    )
                )
                .where(GroupChatSubscription.left == None)
                .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
                .where(GroupChat.is_dm == True)
                .group_by(GroupChatSubscription.group_chat_id)
                .having(count == 2)
            ).scalar_one_or_none():
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_HAVE_DM)

        conversation = Conversation()
        session.add(conversation)

        group_chat = GroupChat(
            conversation=conversation,
            title=request.title.value,
            creator_id=context.user_id,
            is_dm=is_dm,  # TODO
        )
        session.add(group_chat)

        your_subscription = GroupChatSubscription(
            user_id=context.user_id,
            group_chat=group_chat,
            role=GroupChatRole.admin,
        )
        session.add(your_subscription)

        for recipient_id in request.recipient_user_ids:
            session.add(
                GroupChatSubscription(
                    user_id=recipient_id,
                    group_chat=group_chat,
                    role=GroupChatRole.participant,
                )
            )

        _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created)

        session.flush()

        return conversations_pb2.GroupChat(
            group_chat_id=group_chat.conversation_id,
            title=group_chat.title,
            member_user_ids=_get_visible_members_for_subscription(your_subscription),
            admin_user_ids=_get_visible_admins_for_subscription(your_subscription),
            only_admins_invite=group_chat.only_admins_invite,
            is_dm=group_chat.is_dm,
            created=Timestamp_from_datetime(group_chat.conversation.created),
            mute_info=_mute_info(your_subscription),
        )

    def SendMessage(self, request, context, session):
        """
        Sends a text message to a group chat the caller is currently in and bumps the sent messages counter

        Aborts with INVALID_ARGUMENT for an empty text and NOT_FOUND without a current subscription.
        """
        if request.text == "":
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)

        subscription = self._current_subscription(session, request.group_chat_id, context.user_id)
        if not subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)

        user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
        sent_messages_counter.labels(
            user_gender, "direct message" if subscription.group_chat.is_dm else "group chat"
        ).inc()

        return empty_pb2.Empty()

    def EditGroupChat(self, request, context, session):
        """
        Lets an admin change the title and/or the only_admins_invite flag, and records a chat_edited message

        Aborts with NOT_FOUND without a current subscription and PERMISSION_DENIED if the caller is not an admin.
        """
        subscription = self._current_subscription(session, request.group_chat_id, context.user_id)

        if not subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        if subscription.role != GroupChatRole.admin:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_EDIT)

        # only fields that are present on the request are changed
        if request.HasField("title"):
            subscription.group_chat.title = request.title.value

        if request.HasField("only_admins_invite"):
            subscription.group_chat.only_admins_invite = request.only_admins_invite.value

        _add_message_to_subscription(session, subscription, message_type=MessageType.chat_edited)

        return empty_pb2.Empty()

    def MakeGroupChatAdmin(self, request, context, session):
        """
        Lets an admin promote another current participant of the chat to admin

        Records a user_made_admin message authored by the caller.
        """
        if not self._is_visible_user(session, context, request.user_id):
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        your_subscription = self._current_subscription(session, request.group_chat_id, context.user_id)

        if not your_subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        if your_subscription.role != GroupChatRole.admin:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_MAKE_ADMIN)

        if request.user_id == context.user_id:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MAKE_SELF_ADMIN)

        their_subscription = self._current_subscription(session, request.group_chat_id, request.user_id)

        if not their_subscription:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT)

        if their_subscription.role != GroupChatRole.participant:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_ADMIN)

        their_subscription.role = GroupChatRole.admin

        _add_message_to_subscription(
            session, your_subscription, message_type=MessageType.user_made_admin, target_id=request.user_id
        )

        return empty_pb2.Empty()

    def RemoveGroupChatAdmin(self, request, context, session):
        """
        Lets an admin demote an admin (possibly themselves) to participant

        The caller cannot demote themselves when no other current admin exists. Records a user_removed_admin
        message authored by the caller.
        """
        if not self._is_visible_user(session, context, request.user_id):
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        your_subscription = self._current_subscription(session, request.group_chat_id, context.user_id)

        if not your_subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        if request.user_id == context.user_id:
            # Race condition!
            other_admins_count = self._count_other_members(
                session, request.group_chat_id, context.user_id, GroupChatRole.admin
            )
            if not other_admins_count > 0:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_LAST_ADMIN)

        if your_subscription.role != GroupChatRole.admin:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_ADMIN)

        # only a current subscription with the admin role qualifies
        their_subscription = session.execute(
            select(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
            .where(GroupChatSubscription.user_id == request.user_id)
            .where(GroupChatSubscription.left == None)
            .where(GroupChatSubscription.role == GroupChatRole.admin)
        ).scalar_one_or_none()

        if not their_subscription:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN)

        their_subscription.role = GroupChatRole.participant

        _add_message_to_subscription(
            session, your_subscription, message_type=MessageType.user_removed_admin, target_id=request.user_id
        )

        return empty_pb2.Empty()

    def InviteToGroupChat(self, request, context, session):
        """
        Adds a visible user to a (non-DM) group chat as participant

        When the chat has only_admins_invite set, only admins may invite. Records a user_invited message
        authored by the caller.
        """
        if not self._is_visible_user(session, context, request.user_id):
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        result = session.execute(
            select(GroupChatSubscription, GroupChat)
            .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
            .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
            .where(GroupChatSubscription.user_id == context.user_id)
            .where(GroupChatSubscription.left == None)
        ).one_or_none()

        if not result:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        # both entities come from an inner join, so a row that exists always carries both of them
        your_subscription, group_chat = result

        if request.user_id == context.user_id:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_SELF)

        if your_subscription.role != GroupChatRole.admin and your_subscription.group_chat.only_admins_invite:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVITE_PERMISSION_DENIED)

        if group_chat.is_dm:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_TO_DM)

        their_subscription = self._current_subscription(session, request.group_chat_id, request.user_id)

        if their_subscription:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_CHAT)

        # TODO: race condition!

        subscription = GroupChatSubscription(
            user_id=request.user_id,
            group_chat=your_subscription.group_chat,
            role=GroupChatRole.participant,
        )
        session.add(subscription)

        _add_message_to_subscription(
            session, your_subscription, message_type=MessageType.user_invited, target_id=request.user_id
        )

        return empty_pb2.Empty()

    def RemoveGroupChatUser(self, request, context, session):
        """
        1. Get admin info and check it's correct
        2. Get user data, check it's correct and remove user
        """
        # Admin info
        your_subscription = self._current_subscription(session, request.group_chat_id, context.user_id)

        # if user info is missing
        if not your_subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        # if user not admin
        if your_subscription.role != GroupChatRole.admin:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_USER)

        # if user wants to remove themselves
        if request.user_id == context.user_id:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_SELF)

        # get user info
        their_subscription = self._current_subscription(session, request.group_chat_id, request.user_id)

        # user not found
        if not their_subscription:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT)

        # the removal message is added before the subscription is ended
        _add_message_to_subscription(
            session, your_subscription, message_type=MessageType.user_removed, target_id=request.user_id
        )

        their_subscription.left = func.now()

        return empty_pb2.Empty()

    def LeaveGroupChat(self, request, context, session):
        """
        Ends the caller's subscription to a group chat and records a user_left message

        An admin may only leave if another current admin exists or no current participants remain.
        """
        subscription = self._current_subscription(session, request.group_chat_id, context.user_id)

        if not subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        if subscription.role == GroupChatRole.admin:
            other_admins_count = self._count_other_members(
                session, request.group_chat_id, context.user_id, GroupChatRole.admin
            )
            participants_count = self._count_other_members(
                session, request.group_chat_id, context.user_id, GroupChatRole.participant
            )
            if not (other_admins_count > 0 or participants_count == 0):
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.LAST_ADMIN_CANT_LEAVE)

        # the leave message is added before the subscription is ended
        _add_message_to_subscription(session, subscription, message_type=MessageType.user_left)

        subscription.left = func.now()

        return empty_pb2.Empty()