Coverage for src/couchers/servicers/conversations.py: 91%

261 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-07-20 21:46 +0000

1import logging 

2from datetime import timedelta 

3from types import SimpleNamespace 

4 

5import grpc 

6from google.protobuf import empty_pb2 

7from sqlalchemy.sql import func, not_, or_, select 

8 

9from couchers import errors 

10from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY 

11from couchers.db import session_scope 

12from couchers.jobs.enqueue import queue_job 

13from couchers.models import Conversation, GroupChat, GroupChatRole, GroupChatSubscription, Message, MessageType, User 

14from couchers.notifications.notify import notify 

15from couchers.servicers.api import user_model_to_pb 

16from couchers.servicers.blocking import are_blocked 

17from couchers.sql import couchers_select as select 

18from couchers.utils import Timestamp_from_datetime, now 

19from proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2 

20from proto.internal import jobs_pb2 

21 

22logger = logging.getLogger(__name__) 

23 

24# TODO: Still needs custom pagination: GetUpdates 

25DEFAULT_PAGINATION_LENGTH = 20 

26MAX_PAGE_SIZE = 50 

27 

28 

29def _message_to_pb(message: Message): 

30 """ 

31 Turns the given message to a protocol buffer 

32 """ 

33 if message.is_normal_message: 

34 return conversations_pb2.Message( 

35 message_id=message.id, 

36 author_user_id=message.author_id, 

37 time=Timestamp_from_datetime(message.time), 

38 text=conversations_pb2.MessageContentText(text=message.text), 

39 ) 

40 else: 

41 return conversations_pb2.Message( 

42 message_id=message.id, 

43 author_user_id=message.author_id, 

44 time=Timestamp_from_datetime(message.time), 

45 chat_created=( 

46 conversations_pb2.MessageContentChatCreated() 

47 if message.message_type == MessageType.chat_created 

48 else None 

49 ), 

50 chat_edited=( 

51 conversations_pb2.MessageContentChatEdited() 

52 if message.message_type == MessageType.chat_edited 

53 else None 

54 ), 

55 user_invited=( 

56 conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id) 

57 if message.message_type == MessageType.user_invited 

58 else None 

59 ), 

60 user_left=( 

61 conversations_pb2.MessageContentUserLeft() if message.message_type == MessageType.user_left else None 

62 ), 

63 user_made_admin=( 

64 conversations_pb2.MessageContentUserMadeAdmin(target_user_id=message.target_id) 

65 if message.message_type == MessageType.user_made_admin 

66 else None 

67 ), 

68 user_removed_admin=( 

69 conversations_pb2.MessageContentUserRemovedAdmin(target_user_id=message.target_id) 

70 if message.message_type == MessageType.user_removed_admin 

71 else None 

72 ), 

73 group_chat_user_removed=( 

74 conversations_pb2.MessageContentUserRemoved(target_user_id=message.target_id) 

75 if message.message_type == MessageType.user_removed 

76 else None 

77 ), 

78 ) 

79 

80 

81def _get_visible_members_for_subscription(subscription): 

82 """ 

83 If a user leaves a group chat, they shouldn't be able to see who's added 

84 after they left 

85 """ 

86 if not subscription.left: 

87 # still in the chat, we see everyone with a current subscription 

88 return [sub.user_id for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None)] 

89 else: 

90 # not in chat anymore, see everyone who was in chat when we left 

91 return [ 

92 sub.user_id 

93 for sub in subscription.group_chat.subscriptions.where( 

94 GroupChatSubscription.joined <= subscription.left 

95 ).where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None)) 

96 ] 

97 

98 

99def _get_visible_admins_for_subscription(subscription): 

100 """ 

101 If a user leaves a group chat, they shouldn't be able to see who's added 

102 after they left 

103 """ 

104 if not subscription.left: 

105 # still in the chat, we see everyone with a current subscription 

106 return [ 

107 sub.user_id 

108 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None).where( 

109 GroupChatSubscription.role == GroupChatRole.admin 

110 ) 

111 ] 

112 else: 

113 # not in chat anymore, see everyone who was in chat when we left 

114 return [ 

115 sub.user_id 

116 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin) 

117 .where(GroupChatSubscription.joined <= subscription.left) 

118 .where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None)) 

119 ] 

120 

121 

122def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload): 

123 """ 

124 Background job to generate notifications for a message sent to a group chat 

125 """ 

126 logger.info(f"Fanning notifications for message_id = {payload.message_id}") 

127 

128 with session_scope() as session: 

129 message, group_chat = session.execute( 

130 select(Message, GroupChat) 

131 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id) 

132 .where(Message.id == payload.message_id) 

133 ).one() 

134 

135 if message.message_type != MessageType.text: 

136 logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}") 

137 return [] 

138 

139 subscriptions = ( 

140 session.execute( 

141 select(GroupChatSubscription) 

142 .join(User, User.id == GroupChatSubscription.user_id) 

143 .where(GroupChatSubscription.group_chat_id == message.conversation_id) 

144 .where(User.is_visible) 

145 .where(User.id != message.author_id) 

146 .where(GroupChatSubscription.joined <= message.time) 

147 .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time)) 

148 .where(not_(GroupChatSubscription.is_muted)) 

149 ) 

150 .scalars() 

151 .all() 

152 ) 

153 

154 if group_chat.is_dm: 

155 msg = f"{message.author.name} sent you a message" 

156 else: 

157 msg = f"{message.author.name} sent a message in {group_chat.title}" 

158 

159 for subscription in subscriptions: 

160 if are_blocked(session, subscription.user_id, message.author.id): 

161 continue 

162 notify( 

163 user_id=subscription.user_id, 

164 topic_action="chat:message", 

165 key=message.conversation_id, 

166 data=notification_data_pb2.ChatMessage( 

167 author=user_model_to_pb( 

168 message.author, 

169 session, 

170 SimpleNamespace(user_id=subscription.user_id), 

171 ), 

172 message=msg, 

173 text=message.text, 

174 group_chat_id=message.conversation_id, 

175 ), 

176 ) 

177 

178 

179def _add_message_to_subscription(session, subscription, **kwargs): 

180 """ 

181 Creates a new message for a subscription, from the user whose subscription that is. Updates last seen message id 

182 

183 Specify the keyword args for Message 

184 """ 

185 message = Message(conversation=subscription.group_chat.conversation, author_id=subscription.user_id, **kwargs) 

186 

187 session.add(message) 

188 session.flush() 

189 

190 subscription.last_seen_message_id = message.id 

191 

192 queue_job( 

193 job_type="generate_message_notifications", 

194 payload=jobs_pb2.GenerateMessageNotificationsPayload( 

195 message_id=message.id, 

196 ), 

197 ) 

198 

199 return message 

200 

201 

202def _unseen_message_count(session, subscription_id): 

203 return session.execute( 

204 select(func.count()) 

205 .select_from(Message) 

206 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

207 .where(GroupChatSubscription.id == subscription_id) 

208 .where(Message.id > GroupChatSubscription.last_seen_message_id) 

209 ).scalar_one() 

210 

211 

212def _mute_info(subscription): 

213 (muted, muted_until) = subscription.muted_display() 

214 return conversations_pb2.MuteInfo( 

215 muted=muted, 

216 muted_until=Timestamp_from_datetime(muted_until) if muted_until else None, 

217 ) 

218 

219 

220class Conversations(conversations_pb2_grpc.ConversationsServicer): 

221 def ListGroupChats(self, request, context): 

222 with session_scope() as session: 

223 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH 

224 page_size = min(page_size, MAX_PAGE_SIZE) 

225 

226 # select group chats where you have a subscription, and for each of 

227 # these, the latest message from them 

228 

229 t = ( 

230 select( 

231 GroupChatSubscription.group_chat_id.label("group_chat_id"), 

232 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"), 

233 func.max(Message.id).label("message_id"), 

234 ) 

235 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

236 .where(GroupChatSubscription.user_id == context.user_id) 

237 .where(Message.time >= GroupChatSubscription.joined) 

238 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

239 .group_by(GroupChatSubscription.group_chat_id) 

240 .order_by(func.max(Message.id).desc()) 

241 .subquery() 

242 ) 

243 

244 results = session.execute( 

245 select(t, GroupChat, GroupChatSubscription, Message) 

246 .join(Message, Message.id == t.c.message_id) 

247 .join(GroupChatSubscription, GroupChatSubscription.id == t.c.group_chat_subscriptions_id) 

248 .join(GroupChat, GroupChat.conversation_id == t.c.group_chat_id) 

249 .where(or_(t.c.message_id < request.last_message_id, request.last_message_id == 0)) 

250 .order_by(t.c.message_id.desc()) 

251 .limit(page_size + 1) 

252 ).all() 

253 

254 return conversations_pb2.ListGroupChatsRes( 

255 group_chats=[ 

256 conversations_pb2.GroupChat( 

257 group_chat_id=result.GroupChat.conversation_id, 

258 title=result.GroupChat.title, # TODO: proper title for DMs, etc 

259 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription), 

260 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription), 

261 only_admins_invite=result.GroupChat.only_admins_invite, 

262 is_dm=result.GroupChat.is_dm, 

263 created=Timestamp_from_datetime(result.GroupChat.conversation.created), 

264 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id), 

265 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id, 

266 latest_message=_message_to_pb(result.Message) if result.Message else None, 

267 mute_info=_mute_info(result.GroupChatSubscription), 

268 ) 

269 for result in results[:page_size] 

270 ], 

271 last_message_id=( 

272 min(g.Message.id if g.Message else 1 for g in results[:page_size]) if len(results) > 0 else 0 

273 ), # TODO 

274 no_more=len(results) <= page_size, 

275 ) 

276 

277 def GetGroupChat(self, request, context): 

278 with session_scope() as session: 

279 result = session.execute( 

280 select(GroupChat, GroupChatSubscription, Message) 

281 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id) 

282 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

283 .where(GroupChatSubscription.user_id == context.user_id) 

284 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

285 .where(Message.time >= GroupChatSubscription.joined) 

286 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

287 .order_by(Message.id.desc()) 

288 ).first() 

289 

290 if not result: 

291 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

292 

293 return conversations_pb2.GroupChat( 

294 group_chat_id=result.GroupChat.conversation_id, 

295 title=result.GroupChat.title, 

296 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription), 

297 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription), 

298 only_admins_invite=result.GroupChat.only_admins_invite, 

299 is_dm=result.GroupChat.is_dm, 

300 created=Timestamp_from_datetime(result.GroupChat.conversation.created), 

301 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id), 

302 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id, 

303 latest_message=_message_to_pb(result.Message) if result.Message else None, 

304 mute_info=_mute_info(result.GroupChatSubscription), 

305 ) 

306 

307 def GetDirectMessage(self, request, context): 

308 with session_scope() as session: 

309 count = func.count(GroupChatSubscription.id).label("count") 

310 subquery = ( 

311 select(GroupChatSubscription.group_chat_id) 

312 .where( 

313 or_( 

314 GroupChatSubscription.user_id == context.user_id, 

315 GroupChatSubscription.user_id == request.user_id, 

316 ) 

317 ) 

318 .where(GroupChatSubscription.left == None) 

319 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

320 .where(GroupChat.is_dm == True) 

321 .group_by(GroupChatSubscription.group_chat_id) 

322 .having(count == 2) 

323 .subquery() 

324 ) 

325 

326 result = session.execute( 

327 select(subquery, GroupChat, GroupChatSubscription, Message) 

328 .join(subquery, subquery.c.group_chat_id == GroupChat.conversation_id) 

329 .join(Message, Message.conversation_id == GroupChat.conversation_id) 

330 .where(GroupChatSubscription.user_id == context.user_id) 

331 .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id) 

332 .where(Message.time >= GroupChatSubscription.joined) 

333 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

334 .order_by(Message.id.desc()) 

335 ).first() 

336 

337 if not result: 

338 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

339 

340 return conversations_pb2.GroupChat( 

341 group_chat_id=result.GroupChat.conversation_id, 

342 title=result.GroupChat.title, 

343 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription), 

344 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription), 

345 only_admins_invite=result.GroupChat.only_admins_invite, 

346 is_dm=result.GroupChat.is_dm, 

347 created=Timestamp_from_datetime(result.GroupChat.conversation.created), 

348 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id), 

349 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id, 

350 latest_message=_message_to_pb(result.Message) if result.Message else None, 

351 mute_info=_mute_info(result.GroupChatSubscription), 

352 ) 

353 

354 def GetUpdates(self, request, context): 

355 with session_scope() as session: 

356 results = ( 

357 session.execute( 

358 select(Message) 

359 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

360 .where(GroupChatSubscription.user_id == context.user_id) 

361 .where(Message.time >= GroupChatSubscription.joined) 

362 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

363 .where(Message.id > request.newest_message_id) 

364 .order_by(Message.id.asc()) 

365 .limit(DEFAULT_PAGINATION_LENGTH + 1) 

366 ) 

367 .scalars() 

368 .all() 

369 ) 

370 

371 return conversations_pb2.GetUpdatesRes( 

372 updates=[ 

373 conversations_pb2.Update( 

374 group_chat_id=message.conversation_id, 

375 message=_message_to_pb(message), 

376 ) 

377 for message in sorted(results, key=lambda message: message.id)[:DEFAULT_PAGINATION_LENGTH] 

378 ], 

379 no_more=len(results) <= DEFAULT_PAGINATION_LENGTH, 

380 ) 

381 

382 def GetGroupChatMessages(self, request, context): 

383 with session_scope() as session: 

384 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH 

385 page_size = min(page_size, MAX_PAGE_SIZE) 

386 

387 results = ( 

388 session.execute( 

389 select(Message) 

390 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

391 .where(GroupChatSubscription.user_id == context.user_id) 

392 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

393 .where(Message.time >= GroupChatSubscription.joined) 

394 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

395 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0)) 

396 .where(or_(Message.id > GroupChatSubscription.last_seen_message_id, request.only_unseen == 0)) 

397 .order_by(Message.id.desc()) 

398 .limit(page_size + 1) 

399 ) 

400 .scalars() 

401 .all() 

402 ) 

403 

404 return conversations_pb2.GetGroupChatMessagesRes( 

405 messages=[_message_to_pb(message) for message in results[:page_size]], 

406 last_message_id=results[-2].id if len(results) > 1 else 0, # TODO 

407 no_more=len(results) <= page_size, 

408 ) 

409 

410 def MarkLastSeenGroupChat(self, request, context): 

411 with session_scope() as session: 

412 subscription = session.execute( 

413 select(GroupChatSubscription) 

414 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

415 .where(GroupChatSubscription.user_id == context.user_id) 

416 .where(GroupChatSubscription.left == None) 

417 ).scalar_one_or_none() 

418 

419 if not subscription: 

420 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

421 

422 if not subscription.last_seen_message_id <= request.last_seen_message_id: 

423 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES) 

424 

425 subscription.last_seen_message_id = request.last_seen_message_id 

426 

427 # TODO: notify 

428 

429 return empty_pb2.Empty() 

430 

431 def MuteGroupChat(self, request, context): 

432 with session_scope() as session: 

433 subscription = session.execute( 

434 select(GroupChatSubscription) 

435 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

436 .where(GroupChatSubscription.user_id == context.user_id) 

437 .where(GroupChatSubscription.left == None) 

438 ).scalar_one_or_none() 

439 

440 if not subscription: 

441 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

442 

443 if request.unmute: 

444 subscription.muted_until = DATETIME_MINUS_INFINITY 

445 elif request.forever: 

446 subscription.muted_until = DATETIME_INFINITY 

447 elif request.for_duration: 

448 duration = request.for_duration.ToTimedelta() 

449 if duration < timedelta(seconds=0): 

450 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MUTE_PAST) 

451 subscription.muted_until = now() + duration 

452 

453 return empty_pb2.Empty() 

454 

455 def SearchMessages(self, request, context): 

456 with session_scope() as session: 

457 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH 

458 page_size = min(page_size, MAX_PAGE_SIZE) 

459 

460 results = ( 

461 session.execute( 

462 select(Message) 

463 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id) 

464 .where(GroupChatSubscription.user_id == context.user_id) 

465 .where(Message.time >= GroupChatSubscription.joined) 

466 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)) 

467 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0)) 

468 .where(Message.text.ilike(f"%{request.query}%")) 

469 .order_by(Message.id.desc()) 

470 .limit(page_size + 1) 

471 ) 

472 .scalars() 

473 .all() 

474 ) 

475 

476 return conversations_pb2.SearchMessagesRes( 

477 results=[ 

478 conversations_pb2.MessageSearchResult( 

479 group_chat_id=message.conversation_id, 

480 message=_message_to_pb(message), 

481 ) 

482 for message in results[:page_size] 

483 ], 

484 last_message_id=results[-2].id if len(results) > 1 else 0, 

485 no_more=len(results) <= page_size, 

486 ) 

487 

488 def CreateGroupChat(self, request, context): 

489 with session_scope() as session: 

490 recipient_user_ids = list( 

491 session.execute( 

492 select(User.id).where_users_visible(context).where(User.id.in_(request.recipient_user_ids)) 

493 ) 

494 .scalars() 

495 .all() 

496 ) 

497 

498 # make sure all requested users are visible 

499 if len(recipient_user_ids) != len(request.recipient_user_ids): 

500 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_FOUND) 

501 

502 if not recipient_user_ids: 

503 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_RECIPIENTS) 

504 

505 if len(recipient_user_ids) != len(set(recipient_user_ids)): 

506 # make sure there's no duplicate users 

507 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_RECIPIENTS) 

508 

509 if context.user_id in recipient_user_ids: 

510 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_ADD_SELF) 

511 

512 if len(recipient_user_ids) == 1: 

513 # can only have one DM at a time between any two users 

514 other_user_id = recipient_user_ids[0] 

515 

516 # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have 

517 # user_id either this user or the recipient user. If you find two subscriptions to the same DM group 

518 # chat, you know they already have a shared group chat 

519 count = func.count(GroupChatSubscription.id).label("count") 

520 if session.execute( 

521 select(count) 

522 .where( 

523 or_( 

524 GroupChatSubscription.user_id == context.user_id, 

525 GroupChatSubscription.user_id == other_user_id, 

526 ) 

527 ) 

528 .where(GroupChatSubscription.left == None) 

529 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

530 .where(GroupChat.is_dm == True) 

531 .group_by(GroupChatSubscription.group_chat_id) 

532 .having(count == 2) 

533 ).scalar_one_or_none(): 

534 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_HAVE_DM) 

535 

536 conversation = Conversation() 

537 session.add(conversation) 

538 

539 group_chat = GroupChat( 

540 conversation=conversation, 

541 title=request.title.value, 

542 creator_id=context.user_id, 

543 is_dm=True if len(recipient_user_ids) == 1 else False, # TODO 

544 ) 

545 session.add(group_chat) 

546 

547 your_subscription = GroupChatSubscription( 

548 user_id=context.user_id, 

549 group_chat=group_chat, 

550 role=GroupChatRole.admin, 

551 ) 

552 session.add(your_subscription) 

553 

554 for recipient_id in request.recipient_user_ids: 

555 subscription = GroupChatSubscription( 

556 user_id=recipient_id, 

557 group_chat=group_chat, 

558 role=GroupChatRole.participant, 

559 ) 

560 session.add(subscription) 

561 

562 _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created) 

563 

564 session.flush() 

565 

566 return conversations_pb2.GroupChat( 

567 group_chat_id=group_chat.conversation_id, 

568 title=group_chat.title, 

569 member_user_ids=_get_visible_members_for_subscription(your_subscription), 

570 admin_user_ids=_get_visible_admins_for_subscription(your_subscription), 

571 only_admins_invite=group_chat.only_admins_invite, 

572 is_dm=group_chat.is_dm, 

573 created=Timestamp_from_datetime(group_chat.conversation.created), 

574 mute_info=_mute_info(your_subscription), 

575 ) 

576 

577 def SendMessage(self, request, context): 

578 if request.text == "": 

579 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE) 

580 

581 with session_scope() as session: 

582 subscription = session.execute( 

583 select(GroupChatSubscription) 

584 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

585 .where(GroupChatSubscription.user_id == context.user_id) 

586 .where(GroupChatSubscription.left == None) 

587 ).scalar_one_or_none() 

588 if not subscription: 

589 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

590 

591 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text) 

592 

593 return empty_pb2.Empty() 

594 

595 def EditGroupChat(self, request, context): 

596 with session_scope() as session: 

597 subscription = session.execute( 

598 select(GroupChatSubscription) 

599 .where(GroupChatSubscription.user_id == context.user_id) 

600 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

601 .where(GroupChatSubscription.left == None) 

602 ).scalar_one_or_none() 

603 

604 if not subscription: 

605 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

606 

607 if subscription.role != GroupChatRole.admin: 

608 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_EDIT) 

609 

610 if request.HasField("title"): 

611 subscription.group_chat.title = request.title.value 

612 

613 if request.HasField("only_admins_invite"): 

614 subscription.group_chat.only_admins_invite = request.only_admins_invite.value 

615 

616 _add_message_to_subscription(session, subscription, message_type=MessageType.chat_edited) 

617 

618 return empty_pb2.Empty() 

619 

620 def MakeGroupChatAdmin(self, request, context): 

621 with session_scope() as session: 

622 if not session.execute( 

623 select(User).where_users_visible(context).where(User.id == request.user_id) 

624 ).scalar_one_or_none(): 

625 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

626 

627 your_subscription = session.execute( 

628 select(GroupChatSubscription) 

629 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

630 .where(GroupChatSubscription.user_id == context.user_id) 

631 .where(GroupChatSubscription.left == None) 

632 ).scalar_one_or_none() 

633 

634 if not your_subscription: 

635 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

636 

637 if your_subscription.role != GroupChatRole.admin: 

638 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_MAKE_ADMIN) 

639 

640 if request.user_id == context.user_id: 

641 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MAKE_SELF_ADMIN) 

642 

643 their_subscription = session.execute( 

644 select(GroupChatSubscription) 

645 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

646 .where(GroupChatSubscription.user_id == request.user_id) 

647 .where(GroupChatSubscription.left == None) 

648 ).scalar_one_or_none() 

649 

650 if not their_subscription: 

651 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT) 

652 

653 if their_subscription.role != GroupChatRole.participant: 

654 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_ADMIN) 

655 

656 their_subscription.role = GroupChatRole.admin 

657 

658 _add_message_to_subscription( 

659 session, your_subscription, message_type=MessageType.user_made_admin, target_id=request.user_id 

660 ) 

661 

662 return empty_pb2.Empty() 

663 

664 def RemoveGroupChatAdmin(self, request, context): 

665 with session_scope() as session: 

666 if not session.execute( 

667 select(User).where_users_visible(context).where(User.id == request.user_id) 

668 ).scalar_one_or_none(): 

669 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

670 

671 your_subscription = session.execute( 

672 select(GroupChatSubscription) 

673 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

674 .where(GroupChatSubscription.user_id == context.user_id) 

675 .where(GroupChatSubscription.left == None) 

676 ).scalar_one_or_none() 

677 

678 if not your_subscription: 

679 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

680 

681 if request.user_id == context.user_id: 

682 # Race condition! 

683 other_admins_count = session.execute( 

684 select(func.count()) 

685 .select_from(GroupChatSubscription) 

686 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

687 .where(GroupChatSubscription.user_id != context.user_id) 

688 .where(GroupChatSubscription.role == GroupChatRole.admin) 

689 .where(GroupChatSubscription.left == None) 

690 ).scalar_one() 

691 if not other_admins_count > 0: 

692 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_LAST_ADMIN) 

693 

694 if your_subscription.role != GroupChatRole.admin: 

695 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_ADMIN) 

696 

697 their_subscription = session.execute( 

698 select(GroupChatSubscription) 

699 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

700 .where(GroupChatSubscription.user_id == request.user_id) 

701 .where(GroupChatSubscription.left == None) 

702 .where(GroupChatSubscription.role == GroupChatRole.admin) 

703 ).scalar_one_or_none() 

704 

705 if not their_subscription: 

706 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN) 

707 

708 their_subscription.role = GroupChatRole.participant 

709 

710 _add_message_to_subscription( 

711 session, your_subscription, message_type=MessageType.user_removed_admin, target_id=request.user_id 

712 ) 

713 

714 return empty_pb2.Empty() 

715 

716 def InviteToGroupChat(self, request, context): 

717 with session_scope() as session: 

718 if not session.execute( 

719 select(User).where_users_visible(context).where(User.id == request.user_id) 

720 ).scalar_one_or_none(): 

721 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

722 

723 result = session.execute( 

724 select(GroupChatSubscription, GroupChat) 

725 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id) 

726 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

727 .where(GroupChatSubscription.user_id == context.user_id) 

728 .where(GroupChatSubscription.left == None) 

729 ).one_or_none() 

730 

731 if not result: 

732 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

733 

734 your_subscription, group_chat = result 

735 

736 if not your_subscription or not group_chat: 

737 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

738 

739 if request.user_id == context.user_id: 

740 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_SELF) 

741 

742 if your_subscription.role != GroupChatRole.admin and your_subscription.group_chat.only_admins_invite: 

743 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVITE_PERMISSION_DENIED) 

744 

745 if group_chat.is_dm: 

746 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_TO_DM) 

747 

748 their_subscription = session.execute( 

749 select(GroupChatSubscription) 

750 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

751 .where(GroupChatSubscription.user_id == request.user_id) 

752 .where(GroupChatSubscription.left == None) 

753 ).scalar_one_or_none() 

754 

755 if their_subscription: 

756 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_CHAT) 

757 

758 # TODO: race condition! 

759 

760 subscription = GroupChatSubscription( 

761 user_id=request.user_id, 

762 group_chat=your_subscription.group_chat, 

763 role=GroupChatRole.participant, 

764 ) 

765 session.add(subscription) 

766 

767 _add_message_to_subscription( 

768 session, your_subscription, message_type=MessageType.user_invited, target_id=request.user_id 

769 ) 

770 

771 return empty_pb2.Empty() 

772 

773 def RemoveGroupChatUser(self, request, context): 

774 """ 

775 1. Get admin info and check it's correct 

776 2. Get user data, check it's correct and remove user 

777 """ 

778 with session_scope() as session: 

779 # Admin info 

780 your_subscription = session.execute( 

781 select(GroupChatSubscription) 

782 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

783 .where(GroupChatSubscription.user_id == context.user_id) 

784 .where(GroupChatSubscription.left == None) 

785 ).scalar_one_or_none() 

786 

787 # if user info is missing 

788 if not your_subscription: 

789 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

790 

791 # if user not admin 

792 if your_subscription.role != GroupChatRole.admin: 

793 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_USER) 

794 

795 # if user wants to remove themselves 

796 if request.user_id == context.user_id: 

797 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_SELF) 

798 

799 # get user info 

800 their_subscription = session.execute( 

801 select(GroupChatSubscription) 

802 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

803 .where(GroupChatSubscription.user_id == request.user_id) 

804 .where(GroupChatSubscription.left == None) 

805 ).scalar_one_or_none() 

806 

807 # user not found 

808 if not their_subscription: 

809 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT) 

810 

811 _add_message_to_subscription( 

812 session, your_subscription, message_type=MessageType.user_removed, target_id=request.user_id 

813 ) 

814 

815 their_subscription.left = func.now() 

816 

817 return empty_pb2.Empty() 

818 

819 def LeaveGroupChat(self, request, context): 

820 with session_scope() as session: 

821 subscription = session.execute( 

822 select(GroupChatSubscription) 

823 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

824 .where(GroupChatSubscription.user_id == context.user_id) 

825 .where(GroupChatSubscription.left == None) 

826 ).scalar_one_or_none() 

827 

828 if not subscription: 

829 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND) 

830 

831 if subscription.role == GroupChatRole.admin: 

832 other_admins_count = session.execute( 

833 select(func.count()) 

834 .select_from(GroupChatSubscription) 

835 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

836 .where(GroupChatSubscription.user_id != context.user_id) 

837 .where(GroupChatSubscription.role == GroupChatRole.admin) 

838 .where(GroupChatSubscription.left == None) 

839 ).scalar_one() 

840 participants_count = session.execute( 

841 select(func.count()) 

842 .select_from(GroupChatSubscription) 

843 .where(GroupChatSubscription.group_chat_id == request.group_chat_id) 

844 .where(GroupChatSubscription.user_id != context.user_id) 

845 .where(GroupChatSubscription.role == GroupChatRole.participant) 

846 .where(GroupChatSubscription.left == None) 

847 ).scalar_one() 

848 if not (other_admins_count > 0 or participants_count == 0): 

849 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.LAST_ADMIN_CANT_LEAVE) 

850 

851 _add_message_to_subscription(session, subscription, message_type=MessageType.user_left) 

852 

853 subscription.left = func.now() 

854 

855 return empty_pb2.Empty()