Coverage for src/couchers/servicers/conversations.py: 91%

251 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-10-15 13:03 +0000

1import logging 

2from datetime import timedelta 

3from types import SimpleNamespace 

4 

5import grpc 

6from google.protobuf import empty_pb2 

7from sqlalchemy.sql import func, not_, or_, select 

8 

9from couchers import errors 

10from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY 

11from couchers.db import session_scope 

12from couchers.jobs.enqueue import queue_job 

13from couchers.metrics import sent_messages_counter 

14from couchers.models import Conversation, GroupChat, GroupChatRole, GroupChatSubscription, Message, MessageType, User 

15from couchers.notifications.notify import notify 

16from couchers.servicers.api import user_model_to_pb 

17from couchers.servicers.blocking import are_blocked 

18from couchers.sql import couchers_select as select 

19from couchers.utils import Timestamp_from_datetime, now 

20from proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2 

21from proto.internal import jobs_pb2 

22 

logger = logging.getLogger(__name__)

# TODO: Still needs custom pagination: GetUpdates
# page size used when a request leaves `number` at 0; also the fixed page size of GetUpdates
DEFAULT_PAGINATION_LENGTH = 20
# requested page sizes are capped to this value
MAX_PAGE_SIZE = 50

28 

29 

def _message_to_pb(message: Message):
    """
    Converts a Message model into a conversations_pb2.Message

    A normal message carries its text. Any other message is a control message: at most the one content field
    matching its message type is set, and control types that refer to another user carry message.target_id
    """
    common = dict(
        message_id=message.id,
        author_user_id=message.author_id,
        time=Timestamp_from_datetime(message.time),
    )

    if message.is_normal_message:
        return conversations_pb2.Message(
            text=conversations_pb2.MessageContentText(text=message.text),
            **common,
        )

    # message type -> (protobuf field name, builder for that field's content)
    # builders are lazy so that target_id is only read for the matching type
    control_content = {
        MessageType.chat_created: (
            "chat_created",
            lambda: conversations_pb2.MessageContentChatCreated(),
        ),
        MessageType.chat_edited: (
            "chat_edited",
            lambda: conversations_pb2.MessageContentChatEdited(),
        ),
        MessageType.user_invited: (
            "user_invited",
            lambda: conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id),
        ),
        MessageType.user_left: (
            "user_left",
            lambda: conversations_pb2.MessageContentUserLeft(),
        ),
        MessageType.user_made_admin: (
            "user_made_admin",
            lambda: conversations_pb2.MessageContentUserMadeAdmin(target_user_id=message.target_id),
        ),
        MessageType.user_removed_admin: (
            "user_removed_admin",
            lambda: conversations_pb2.MessageContentUserRemovedAdmin(target_user_id=message.target_id),
        ),
        MessageType.user_removed: (
            "group_chat_user_removed",
            lambda: conversations_pb2.MessageContentUserRemoved(target_user_id=message.target_id),
        ),
    }

    entry = control_content.get(message.message_type)
    if entry is None:
        # no matching control type: no content field is set
        return conversations_pb2.Message(**common)

    field_name, build_content = entry
    return conversations_pb2.Message(**common, **{field_name: build_content()})

80 

81 

def _get_visible_members_for_subscription(subscription):
    """
    Returns the user ids of the chat members that the owner of this subscription is allowed to see

    Someone who has left the chat must not learn who was added afterwards, so they only see the members whose
    subscription spans the moment they left
    """
    all_subscriptions = subscription.group_chat.subscriptions

    if not subscription.left:
        # still in the chat: everyone who currently has an open subscription
        visible = all_subscriptions.where(GroupChatSubscription.left == None)
        return [member.user_id for member in visible]

    # no longer in the chat: everyone who had joined by the time we left and had not yet left themselves
    left_at = subscription.left
    visible = all_subscriptions.where(GroupChatSubscription.joined <= left_at).where(
        or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None)
    )
    return [member.user_id for member in visible]

98 

99 

def _get_visible_admins_for_subscription(subscription):
    """
    Returns the user ids of the chat admins that the owner of this subscription is allowed to see

    Same visibility rule as for members: someone who has left the chat only sees the admins whose subscription
    spans the moment they left
    """
    all_subscriptions = subscription.group_chat.subscriptions
    is_admin = GroupChatSubscription.role == GroupChatRole.admin

    if not subscription.left:
        # still in the chat: every admin with an open subscription
        visible = all_subscriptions.where(GroupChatSubscription.left == None).where(is_admin)
        return [admin.user_id for admin in visible]

    # no longer in the chat: admins who had joined by the time we left and had not yet left themselves
    left_at = subscription.left
    visible = (
        all_subscriptions.where(is_admin)
        .where(GroupChatSubscription.joined <= left_at)
        .where(or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None))
    )
    return [admin.user_id for admin in visible]

121 

122 

def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload):
    """
    Background job that fans out "chat:message" notifications for one message sent to a group chat

    Only text messages notify. Recipients are the visible users other than the author whose subscription covers
    the time of the message and is not muted; recipients in a block relationship with the author are skipped
    """
    logger.info(f"Fanning notifications for message_id = {payload.message_id}")

    with session_scope() as session:
        message_and_chat_query = (
            select(Message, GroupChat)
            .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
            .where(Message.id == payload.message_id)
        )
        message, group_chat = session.execute(message_and_chat_query).one()

        if message.message_type != MessageType.text:
            logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
            return []

        recipients_query = (
            select(GroupChatSubscription)
            .join(User, User.id == GroupChatSubscription.user_id)
            .where(GroupChatSubscription.group_chat_id == message.conversation_id)
            .where(User.is_visible)
            .where(User.id != message.author_id)
            # the subscription must have been active when the message was sent
            .where(GroupChatSubscription.joined <= message.time)
            .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time))
            .where(not_(GroupChatSubscription.is_muted))
        )
        subscriptions = session.execute(recipients_query).scalars().all()

        author = message.author
        msg = (
            f"{author.name} sent you a message"
            if group_chat.is_dm
            else f"{author.name} sent a message in {group_chat.title}"
        )

        for subscription in subscriptions:
            recipient_id = subscription.user_id
            if are_blocked(session, recipient_id, author.id):
                continue

            # the author is rendered from the point of view of the recipient
            author_pb = user_model_to_pb(author, session, SimpleNamespace(user_id=recipient_id))
            notify(
                session,
                user_id=recipient_id,
                topic_action="chat:message",
                key=message.conversation_id,
                data=notification_data_pb2.ChatMessage(
                    author=author_pb,
                    message=msg,
                    text=message.text,
                    group_chat_id=message.conversation_id,
                ),
            )

179 

180 

def _add_message_to_subscription(session, subscription, **kwargs):
    """
    Adds a new message to the subscription's chat, authored by the user who owns the subscription

    The extra keyword args are passed on to Message. The message is flushed so that it has an id, the subscription's
    last seen message id is moved to it, and a generate_message_notifications job is queued for it

    Returns the new message
    """
    new_message = Message(
        conversation=subscription.group_chat.conversation,
        author_id=subscription.user_id,
        **kwargs,
    )
    session.add(new_message)
    # flush now: the id is needed below
    session.flush()

    # the author has by definition seen their own message
    subscription.last_seen_message_id = new_message.id

    notification_payload = jobs_pb2.GenerateMessageNotificationsPayload(message_id=new_message.id)
    queue_job(
        session,
        job_type="generate_message_notifications",
        payload=notification_payload,
    )

    return new_message

203 

204 

def _unseen_message_count(session, subscription_id):
    """
    Counts the messages in the subscription's chat whose id is greater than the subscription's last seen message id
    """
    unseen_query = (
        select(func.count())
        .select_from(Message)
        .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
        .where(GroupChatSubscription.id == subscription_id)
        .where(Message.id > GroupChatSubscription.last_seen_message_id)
    )
    return session.execute(unseen_query).scalar_one()

213 

214 

def _mute_info(subscription):
    """
    Builds the MuteInfo protobuf from the (muted, muted_until) pair returned by subscription.muted_display()

    muted_until is only set when muted_display() gives a truthy end time
    """
    muted, muted_until = subscription.muted_display()

    muted_until_pb = None
    if muted_until:
        muted_until_pb = Timestamp_from_datetime(muted_until)

    return conversations_pb2.MuteInfo(muted=muted, muted_until=muted_until_pb)

221 

222 

# escape character used for user-supplied text inside LIKE patterns
_LIKE_ESCAPE_CHAR = "/"


def _requested_page_size(request):
    """
    Returns the page size for a paginated request: request.number, or the default if it is 0, capped at the maximum
    """
    requested = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
    return min(requested, MAX_PAGE_SIZE)


def _escape_like(text):
    """
    Escapes the LIKE wildcards (% and _) and the escape character itself, so that text only matches literally
    """
    return (
        text.replace(_LIKE_ESCAPE_CHAR, _LIKE_ESCAPE_CHAR * 2)
        .replace("%", _LIKE_ESCAPE_CHAR + "%")
        .replace("_", _LIKE_ESCAPE_CHAR + "_")
    )


def _visible_messages_query(user_id):
    """
    Selects the messages in chats the user is subscribed to that were sent at or after they joined and, if they have
    left, at or before they left
    """
    return (
        select(Message)
        .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
        .where(GroupChatSubscription.user_id == user_id)
        .where(Message.time >= GroupChatSubscription.joined)
        .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
    )


def _get_active_subscription(session, group_chat_id, user_id, role=None):
    """
    Returns the user's open (not left) subscription to the chat, optionally only if it has the given role, else None
    """
    query = (
        select(GroupChatSubscription)
        .where(GroupChatSubscription.group_chat_id == group_chat_id)
        .where(GroupChatSubscription.user_id == user_id)
        .where(GroupChatSubscription.left == None)
    )
    if role is not None:
        query = query.where(GroupChatSubscription.role == role)
    return session.execute(query).scalar_one_or_none()


def _count_other_active_members(session, group_chat_id, user_id, role):
    """
    Counts the open subscriptions to the chat that have the given role and belong to someone other than user_id
    """
    return session.execute(
        select(func.count())
        .select_from(GroupChatSubscription)
        .where(GroupChatSubscription.group_chat_id == group_chat_id)
        .where(GroupChatSubscription.user_id != user_id)
        .where(GroupChatSubscription.role == role)
        .where(GroupChatSubscription.left == None)
    ).scalar_one()


def _shared_dm_ids_query(user_id, other_user_id):
    """
    Selects the ids of DM chats in which both users have an open subscription

    It takes the open subscriptions of either user to DM chats, grouped by chat: a chat with two such
    subscriptions is shared by both users
    """
    count = func.count(GroupChatSubscription.id).label("count")
    return (
        select(GroupChatSubscription.group_chat_id)
        .where(
            or_(
                GroupChatSubscription.user_id == user_id,
                GroupChatSubscription.user_id == other_user_id,
            )
        )
        .where(GroupChatSubscription.left == None)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChat.is_dm == True)
        .group_by(GroupChatSubscription.group_chat_id)
        .having(count == 2)
    )


def _is_user_visible(session, context, user_id):
    """
    Whether a user with this id exists and passes the where_users_visible filter for the calling context
    """
    found = session.execute(
        select(User).where_users_visible(context).where(User.id == user_id)
    ).scalar_one_or_none()
    return bool(found)


def _group_chat_row_to_pb(session, row):
    """
    Builds a GroupChat protobuf from a result row exposing GroupChat, GroupChatSubscription and Message

    Members and admins are limited to those visible through the subscription in the row
    """
    group_chat = row.GroupChat
    subscription = row.GroupChatSubscription
    return conversations_pb2.GroupChat(
        group_chat_id=group_chat.conversation_id,
        title=group_chat.title,  # TODO: proper title for DMs, etc
        member_user_ids=_get_visible_members_for_subscription(subscription),
        admin_user_ids=_get_visible_admins_for_subscription(subscription),
        only_admins_invite=group_chat.only_admins_invite,
        is_dm=group_chat.is_dm,
        created=Timestamp_from_datetime(group_chat.conversation.created),
        unseen_message_count=_unseen_message_count(session, subscription.id),
        last_seen_message_id=subscription.last_seen_message_id,
        latest_message=_message_to_pb(row.Message) if row.Message else None,
        mute_info=_mute_info(subscription),
    )


class Conversations(conversations_pb2_grpc.ConversationsServicer):
    """
    Servicer for group chats and direct messages: listing and reading chats, sending messages, and managing
    membership, admins and mute state

    Methods abort the call through context.abort with a grpc status code and a message from couchers.errors
    """

    def ListGroupChats(self, request, context, session):
        """
        Lists the caller's group chats ordered by their latest visible message, newest first

        Paginated with request.number (page size) and request.last_message_id (0 for the first page)
        """
        page_size = _requested_page_size(request)

        # select group chats where you have a subscription, and for each of
        # these, the latest message from them
        latest = (
            select(
                GroupChatSubscription.group_chat_id.label("group_chat_id"),
                func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
                func.max(Message.id).label("message_id"),
            )
            .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
            .where(GroupChatSubscription.user_id == context.user_id)
            .where(Message.time >= GroupChatSubscription.joined)
            .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
            .group_by(GroupChatSubscription.group_chat_id)
            .order_by(func.max(Message.id).desc())
            .subquery()
        )

        # one row more than the page is fetched to find out whether there is a next page
        results = session.execute(
            select(latest, GroupChat, GroupChatSubscription, Message)
            .join(Message, Message.id == latest.c.message_id)
            .join(GroupChatSubscription, GroupChatSubscription.id == latest.c.group_chat_subscriptions_id)
            .join(GroupChat, GroupChat.conversation_id == latest.c.group_chat_id)
            .where(or_(latest.c.message_id < request.last_message_id, request.last_message_id == 0))
            .order_by(latest.c.message_id.desc())
            .limit(page_size + 1)
        ).all()
        page = results[:page_size]

        return conversations_pb2.ListGroupChatsRes(
            group_chats=[_group_chat_row_to_pb(session, row) for row in page],
            last_message_id=(
                min(row.Message.id if row.Message else 1 for row in page) if len(results) > 0 else 0
            ),  # TODO
            no_more=len(results) <= page_size,
        )

    def GetGroupChat(self, request, context, session):
        """
        Returns one group chat the caller is or was subscribed to, together with its latest visible message

        Aborts with NOT_FOUND if there is no such chat with a message visible to the caller
        """
        result = session.execute(
            select(GroupChat, GroupChatSubscription, Message)
            .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
            .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
            .where(GroupChatSubscription.user_id == context.user_id)
            .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
            .where(Message.time >= GroupChatSubscription.joined)
            .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
            .order_by(Message.id.desc())
        ).first()

        if not result:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        return _group_chat_row_to_pb(session, result)

    def GetDirectMessage(self, request, context, session):
        """
        Returns the DM chat shared by the caller and request.user_id, together with its latest visible message

        Aborts with NOT_FOUND if the two users have no DM in which both still have an open subscription
        """
        subquery = _shared_dm_ids_query(context.user_id, request.user_id).subquery()

        result = session.execute(
            select(subquery, GroupChat, GroupChatSubscription, Message)
            .join(subquery, subquery.c.group_chat_id == GroupChat.conversation_id)
            .join(Message, Message.conversation_id == GroupChat.conversation_id)
            .where(GroupChatSubscription.user_id == context.user_id)
            .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id)
            .where(Message.time >= GroupChatSubscription.joined)
            .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
            .order_by(Message.id.desc())
        ).first()

        if not result:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        return _group_chat_row_to_pb(session, result)

    def GetUpdates(self, request, context, session):
        """
        Returns the messages visible to the caller, across all chats, that are newer than request.newest_message_id

        Oldest first, at most DEFAULT_PAGINATION_LENGTH per call; no_more tells whether anything is left
        """
        results = (
            session.execute(
                _visible_messages_query(context.user_id)
                .where(Message.id > request.newest_message_id)
                .order_by(Message.id.asc())
                .limit(DEFAULT_PAGINATION_LENGTH + 1)
            )
            .scalars()
            .all()
        )

        return conversations_pb2.GetUpdatesRes(
            updates=[
                conversations_pb2.Update(
                    group_chat_id=message.conversation_id,
                    message=_message_to_pb(message),
                )
                # the query already orders by ascending message id
                for message in results[:DEFAULT_PAGINATION_LENGTH]
            ],
            no_more=len(results) <= DEFAULT_PAGINATION_LENGTH,
        )

    def GetGroupChatMessages(self, request, context, session):
        """
        Returns a page of the messages in one chat that are visible to the caller, newest first

        Paginated with request.number and request.last_message_id (0 for the first page); with request.only_unseen,
        only messages newer than the caller's last seen message are returned
        """
        page_size = _requested_page_size(request)

        results = (
            session.execute(
                _visible_messages_query(context.user_id)
                .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
                .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
                .where(or_(Message.id > GroupChatSubscription.last_seen_message_id, request.only_unseen == 0))
                .order_by(Message.id.desc())
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        page = results[:page_size]

        return conversations_pb2.GetGroupChatMessagesRes(
            messages=[_message_to_pb(message) for message in page],
            # cursor is the last message actually returned, also on the final page
            last_message_id=page[-1].id if page else 0,
            no_more=len(results) <= page_size,
        )

    def MarkLastSeenGroupChat(self, request, context, session):
        """
        Moves the caller's last seen message id for a chat forward to request.last_seen_message_id

        Aborts with NOT_FOUND without an open subscription, and with FAILED_PRECONDITION when asked to move backwards
        """
        subscription = _get_active_subscription(session, request.group_chat_id, context.user_id)

        if not subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        if request.last_seen_message_id < subscription.last_seen_message_id:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)

        subscription.last_seen_message_id = request.last_seen_message_id

        # TODO: notify

        return empty_pb2.Empty()

    def MuteGroupChat(self, request, context, session):
        """
        Sets until when the caller's subscription to a chat is muted: unmute, forever, or for a duration from now

        Aborts with NOT_FOUND without an open subscription, and with FAILED_PRECONDITION for a negative duration
        """
        subscription = _get_active_subscription(session, request.group_chat_id, context.user_id)

        if not subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        if request.unmute:
            subscription.muted_until = DATETIME_MINUS_INFINITY
        elif request.forever:
            subscription.muted_until = DATETIME_INFINITY
        elif request.for_duration:
            duration = request.for_duration.ToTimedelta()
            if duration < timedelta(seconds=0):
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MUTE_PAST)
            subscription.muted_until = now() + duration

        return empty_pb2.Empty()

    def SearchMessages(self, request, context, session):
        """
        Searches the messages visible to the caller for text containing request.query, case-insensitively

        Newest first, paginated with request.number and request.last_message_id (0 for the first page)
        """
        page_size = _requested_page_size(request)

        # the query is user input: escape % and _ so that it matches literally instead of acting as a pattern
        pattern = f"%{_escape_like(request.query)}%"

        results = (
            session.execute(
                _visible_messages_query(context.user_id)
                .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
                .where(Message.text.ilike(pattern, escape=_LIKE_ESCAPE_CHAR))
                .order_by(Message.id.desc())
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        page = results[:page_size]

        return conversations_pb2.SearchMessagesRes(
            results=[
                conversations_pb2.MessageSearchResult(
                    group_chat_id=message.conversation_id,
                    message=_message_to_pb(message),
                )
                for message in page
            ],
            # cursor is the last message actually returned, also on the final page
            last_message_id=page[-1].id if page else 0,
            no_more=len(results) <= page_size,
        )

    def CreateGroupChat(self, request, context, session):
        """
        Creates a chat between the caller (as admin) and the requested recipients; one recipient makes it a DM

        Aborts with FAILED_PRECONDITION if the caller's profile is incomplete or a DM with the recipient already
        exists, and with INVALID_ARGUMENT if a recipient is not visible, there are no recipients, a recipient is
        listed twice, or the caller lists themselves
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        if not user.has_completed_profile:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_MESSAGE)

        requested_ids = list(request.recipient_user_ids)
        distinct_requested_ids = set(requested_ids)

        recipient_user_ids = list(
            session.execute(select(User.id).where_users_visible(context).where(User.id.in_(request.recipient_user_ids)))
            .scalars()
            .all()
        )

        # make sure all requested users are visible; compared against the distinct ids because the IN query cannot
        # return a repeated id more than once
        if len(recipient_user_ids) != len(distinct_requested_ids):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_FOUND)

        if not recipient_user_ids:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_RECIPIENTS)

        if len(requested_ids) != len(distinct_requested_ids):
            # make sure there's no duplicate users
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_RECIPIENTS)

        if context.user_id in recipient_user_ids:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_ADD_SELF)

        is_dm = len(recipient_user_ids) == 1

        if is_dm:
            # can only have one DM at a time between any two users
            other_user_id = recipient_user_ids[0]
            existing_dm_id = session.execute(
                _shared_dm_ids_query(context.user_id, other_user_id)
            ).scalar_one_or_none()
            if existing_dm_id is not None:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_HAVE_DM)

        conversation = Conversation()
        session.add(conversation)

        group_chat = GroupChat(
            conversation=conversation,
            title=request.title.value,
            creator_id=context.user_id,
            is_dm=is_dm,  # TODO
        )
        session.add(group_chat)

        your_subscription = GroupChatSubscription(
            user_id=context.user_id,
            group_chat=group_chat,
            role=GroupChatRole.admin,
        )
        session.add(your_subscription)

        for recipient_id in requested_ids:
            subscription = GroupChatSubscription(
                user_id=recipient_id,
                group_chat=group_chat,
                role=GroupChatRole.participant,
            )
            session.add(subscription)

        _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created)

        session.flush()

        return conversations_pb2.GroupChat(
            group_chat_id=group_chat.conversation_id,
            title=group_chat.title,
            member_user_ids=_get_visible_members_for_subscription(your_subscription),
            admin_user_ids=_get_visible_admins_for_subscription(your_subscription),
            only_admins_invite=group_chat.only_admins_invite,
            is_dm=group_chat.is_dm,
            created=Timestamp_from_datetime(group_chat.conversation.created),
            mute_info=_mute_info(your_subscription),
        )

    def SendMessage(self, request, context, session):
        """
        Sends a text message from the caller to a chat they have an open subscription to

        Aborts with INVALID_ARGUMENT for an empty text and with NOT_FOUND without an open subscription
        """
        if request.text == "":
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)

        subscription = _get_active_subscription(session, request.group_chat_id, context.user_id)
        if not subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)

        # metrics: sent messages by sender gender and kind of chat
        user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
        chat_kind = "direct message" if subscription.group_chat.is_dm else "group chat"
        sent_messages_counter.labels(user_gender, chat_kind).inc()

        return empty_pb2.Empty()

    def EditGroupChat(self, request, context, session):
        """
        Lets an admin change the title and/or the only_admins_invite setting of a chat; adds a chat_edited message

        Aborts with NOT_FOUND without an open subscription and with PERMISSION_DENIED if the caller is not an admin
        """
        subscription = _get_active_subscription(session, request.group_chat_id, context.user_id)

        if not subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        if subscription.role != GroupChatRole.admin:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_EDIT)

        # only the fields present in the request are changed
        if request.HasField("title"):
            subscription.group_chat.title = request.title.value

        if request.HasField("only_admins_invite"):
            subscription.group_chat.only_admins_invite = request.only_admins_invite.value

        _add_message_to_subscription(session, subscription, message_type=MessageType.chat_edited)

        return empty_pb2.Empty()

    def MakeGroupChatAdmin(self, request, context, session):
        """
        Lets an admin promote another current participant of the chat to admin; adds a user_made_admin message

        Aborts if the target user is not visible, the caller has no open subscription or is not an admin, the target
        is the caller, or the target is not in the chat or is already an admin
        """
        if not _is_user_visible(session, context, request.user_id):
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        your_subscription = _get_active_subscription(session, request.group_chat_id, context.user_id)

        if not your_subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        if your_subscription.role != GroupChatRole.admin:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_MAKE_ADMIN)

        if request.user_id == context.user_id:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MAKE_SELF_ADMIN)

        their_subscription = _get_active_subscription(session, request.group_chat_id, request.user_id)

        if not their_subscription:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT)

        if their_subscription.role != GroupChatRole.participant:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_ADMIN)

        their_subscription.role = GroupChatRole.admin

        _add_message_to_subscription(
            session, your_subscription, message_type=MessageType.user_made_admin, target_id=request.user_id
        )

        return empty_pb2.Empty()

    def RemoveGroupChatAdmin(self, request, context, session):
        """
        Lets an admin demote an admin of the chat (possibly themselves) to participant; adds a user_removed_admin
        message

        Aborts if the target user is not visible, the caller has no open subscription or is not an admin, the caller
        would demote themselves as the last admin, or the target is not a current admin
        """
        if not _is_user_visible(session, context, request.user_id):
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        your_subscription = _get_active_subscription(session, request.group_chat_id, context.user_id)

        if not your_subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        if request.user_id == context.user_id:
            # Race condition!
            other_admins_count = _count_other_active_members(
                session, request.group_chat_id, context.user_id, GroupChatRole.admin
            )
            if not other_admins_count > 0:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_LAST_ADMIN)

        if your_subscription.role != GroupChatRole.admin:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_ADMIN)

        their_subscription = _get_active_subscription(
            session, request.group_chat_id, request.user_id, role=GroupChatRole.admin
        )

        if not their_subscription:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN)

        their_subscription.role = GroupChatRole.participant

        _add_message_to_subscription(
            session, your_subscription, message_type=MessageType.user_removed_admin, target_id=request.user_id
        )

        return empty_pb2.Empty()

    def InviteToGroupChat(self, request, context, session):
        """
        Adds another user to a group chat as a participant; adds a user_invited message

        Aborts if the target user is not visible, the caller has no open subscription, the caller invites themselves,
        only admins may invite and the caller is not one, the chat is a DM, or the target is already in the chat
        """
        if not _is_user_visible(session, context, request.user_id):
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        your_subscription = _get_active_subscription(session, request.group_chat_id, context.user_id)

        if not your_subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        group_chat = your_subscription.group_chat

        if request.user_id == context.user_id:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_SELF)

        if your_subscription.role != GroupChatRole.admin and group_chat.only_admins_invite:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVITE_PERMISSION_DENIED)

        if group_chat.is_dm:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_TO_DM)

        their_subscription = _get_active_subscription(session, request.group_chat_id, request.user_id)

        if their_subscription:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_CHAT)

        # TODO: race condition!

        subscription = GroupChatSubscription(
            user_id=request.user_id,
            group_chat=group_chat,
            role=GroupChatRole.participant,
        )
        session.add(subscription)

        _add_message_to_subscription(
            session, your_subscription, message_type=MessageType.user_invited, target_id=request.user_id
        )

        return empty_pb2.Empty()

    def RemoveGroupChatUser(self, request, context, session):
        """
        Lets an admin remove another user from the chat; adds a user_removed message

        1. Get admin info and check it's correct
        2. Get user data, check it's correct and remove user
        """
        # Admin info
        your_subscription = _get_active_subscription(session, request.group_chat_id, context.user_id)

        # if user info is missing
        if not your_subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        # if user not admin
        if your_subscription.role != GroupChatRole.admin:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_USER)

        # if user wants to remove themselves
        if request.user_id == context.user_id:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_SELF)

        # get user info
        their_subscription = _get_active_subscription(session, request.group_chat_id, request.user_id)

        # user not found
        if not their_subscription:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT)

        # the removal message is added before the subscription is closed
        _add_message_to_subscription(
            session, your_subscription, message_type=MessageType.user_removed, target_id=request.user_id
        )

        their_subscription.left = func.now()

        return empty_pb2.Empty()

    def LeaveGroupChat(self, request, context, session):
        """
        Closes the caller's subscription to a chat; adds a user_left message

        Aborts with NOT_FOUND without an open subscription, and with FAILED_PRECONDITION if the caller is the last
        admin while other participants remain
        """
        subscription = _get_active_subscription(session, request.group_chat_id, context.user_id)

        if not subscription:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)

        if subscription.role == GroupChatRole.admin:
            other_admins_count = _count_other_active_members(
                session, request.group_chat_id, context.user_id, GroupChatRole.admin
            )
            participants_count = _count_other_active_members(
                session, request.group_chat_id, context.user_id, GroupChatRole.participant
            )
            # an admin may leave if another admin stays behind, or if nobody else is left at all
            if not (other_admins_count > 0 or participants_count == 0):
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.LAST_ADMIN_CANT_LEAVE)

        # the leave message is added before the subscription is closed
        _add_message_to_subscription(session, subscription, message_type=MessageType.user_left)

        subscription.left = func.now()

        return empty_pb2.Empty()