Coverage for src/couchers/servicers/conversations.py: 91%
251 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
1import logging
2from datetime import timedelta
3from types import SimpleNamespace
5import grpc
6from google.protobuf import empty_pb2
7from sqlalchemy.sql import func, not_, or_, select
9from couchers import errors
10from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY
11from couchers.db import session_scope
12from couchers.jobs.enqueue import queue_job
13from couchers.metrics import sent_messages_counter
14from couchers.models import Conversation, GroupChat, GroupChatRole, GroupChatSubscription, Message, MessageType, User
15from couchers.notifications.notify import notify
16from couchers.servicers.api import user_model_to_pb
17from couchers.servicers.blocking import are_blocked
18from couchers.sql import couchers_select as select
19from couchers.utils import Timestamp_from_datetime, now
20from proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2
21from proto.internal import jobs_pb2
logger = logging.getLogger(__name__)

# TODO: Still needs custom pagination: GetUpdates
# page size used when a request leaves `number` at 0; also the fixed page size of GetUpdates
DEFAULT_PAGINATION_LENGTH = 20
# upper bound applied to any client-requested page size
MAX_PAGE_SIZE = 50
def _message_to_pb(message: Message):
    """
    Builds the conversations_pb2.Message for a Message row

    Normal messages carry their text. Any other message is a control message: the content field matching its
    message_type is filled in and every other content field is passed as None.
    """
    if message.is_normal_message:
        text_content = conversations_pb2.MessageContentText(text=message.text)
        return conversations_pb2.Message(
            message_id=message.id,
            author_user_id=message.author_id,
            time=Timestamp_from_datetime(message.time),
            text=text_content,
        )

    kind = message.message_type

    def content_if(expected, factory):
        # only build the content object for the field that matches this message's type
        return factory() if kind == expected else None

    def targeted(content_cls):
        # content types that name the affected user through target_user_id
        return lambda: content_cls(target_user_id=message.target_id)

    return conversations_pb2.Message(
        message_id=message.id,
        author_user_id=message.author_id,
        time=Timestamp_from_datetime(message.time),
        chat_created=content_if(MessageType.chat_created, conversations_pb2.MessageContentChatCreated),
        chat_edited=content_if(MessageType.chat_edited, conversations_pb2.MessageContentChatEdited),
        user_invited=content_if(MessageType.user_invited, targeted(conversations_pb2.MessageContentUserInvited)),
        user_left=content_if(MessageType.user_left, conversations_pb2.MessageContentUserLeft),
        user_made_admin=content_if(
            MessageType.user_made_admin, targeted(conversations_pb2.MessageContentUserMadeAdmin)
        ),
        user_removed_admin=content_if(
            MessageType.user_removed_admin, targeted(conversations_pb2.MessageContentUserRemovedAdmin)
        ),
        group_chat_user_removed=content_if(
            MessageType.user_removed, targeted(conversations_pb2.MessageContentUserRemoved)
        ),
    )
def _get_visible_members_for_subscription(subscription):
    """
    Lists the user ids of the chat members this subscription is allowed to see

    Someone who has left the chat only sees the people who were members at the moment they left, not anyone
    added afterwards.
    """
    left_at = subscription.left
    memberships = subscription.group_chat.subscriptions

    if left_at:
        # no longer in the chat: membership as it was at the time of leaving
        visible = memberships.where(GroupChatSubscription.joined <= left_at).where(
            or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None)
        )
    else:
        # still in the chat: everyone whose subscription is current
        visible = memberships.where(GroupChatSubscription.left == None)

    return [membership.user_id for membership in visible]
def _get_visible_admins_for_subscription(subscription):
    """
    Lists the user ids of the chat admins this subscription is allowed to see

    Same visibility rule as for members: someone who has left only sees the admin subscriptions that covered
    the moment they left.
    """
    left_at = subscription.left
    memberships = subscription.group_chat.subscriptions

    if left_at:
        # no longer in the chat: admins as of the time of leaving
        visible = (
            memberships.where(GroupChatSubscription.role == GroupChatRole.admin)
            .where(GroupChatSubscription.joined <= left_at)
            .where(or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None))
        )
    else:
        # still in the chat: admins with a current subscription
        visible = memberships.where(GroupChatSubscription.left == None).where(
            GroupChatSubscription.role == GroupChatRole.admin
        )

    return [membership.user_id for membership in visible]
def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload):
    """
    Background job that fans a group chat message out as "chat:message" notifications

    Only text messages are notified; for any other message type the job logs and returns an empty list.
    Recipients are visible users other than the author whose subscription covers the message time and is not
    muted. Recipients for whom are_blocked() reports a block with the author are skipped.
    """
    message_id = payload.message_id
    logger.info(f"Fanning notifications for message_id = {message_id}")

    with session_scope() as session:
        lookup = (
            select(Message, GroupChat)
            .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
            .where(Message.id == message_id)
        )
        message, group_chat = session.execute(lookup).one()

        if message.message_type != MessageType.text:
            logger.info(f"Not a text message, not notifying. message_id = {message_id}")
            return []

        recipients_query = (
            select(GroupChatSubscription)
            .join(User, User.id == GroupChatSubscription.user_id)
            .where(GroupChatSubscription.group_chat_id == message.conversation_id)
            .where(User.is_visible)
            .where(User.id != message.author_id)
            # the subscription must have been active when the message was sent
            .where(GroupChatSubscription.joined <= message.time)
            .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time))
            .where(not_(GroupChatSubscription.is_muted))
        )
        subscriptions = session.execute(recipients_query).scalars().all()

        msg = (
            f"{message.author.name} sent you a message"
            if group_chat.is_dm
            else f"{message.author.name} sent a message in {group_chat.title}"
        )

        for subscription in subscriptions:
            recipient_id = subscription.user_id
            if are_blocked(session, recipient_id, message.author.id):
                continue

            # the author is rendered from the recipient's point of view
            author_pb = user_model_to_pb(message.author, session, SimpleNamespace(user_id=recipient_id))
            notify(
                session,
                user_id=recipient_id,
                topic_action="chat:message",
                key=message.conversation_id,
                data=notification_data_pb2.ChatMessage(
                    author=author_pb,
                    message=msg,
                    text=message.text,
                    group_chat_id=message.conversation_id,
                ),
            )
def _add_message_to_subscription(session, subscription, **kwargs):
    """
    Posts a new message to the subscription's chat, authored by the subscription's user

    The author's last seen message id is moved to the new message and a generate_message_notifications job is
    queued for it.

    Any keyword args are passed straight through to Message
    """
    new_message = Message(
        conversation=subscription.group_chat.conversation,
        author_id=subscription.user_id,
        **kwargs,
    )
    session.add(new_message)
    # flush so the message gets its id before it is referenced below
    session.flush()

    subscription.last_seen_message_id = new_message.id

    job_payload = jobs_pb2.GenerateMessageNotificationsPayload(message_id=new_message.id)
    queue_job(session, job_type="generate_message_notifications", payload=job_payload)

    return new_message
def _unseen_message_count(session, subscription_id):
    """
    Counts the messages in the subscription's chat whose id is above the subscription's last seen message id
    """
    unseen_query = (
        select(func.count())
        .select_from(Message)
        .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
        .where(GroupChatSubscription.id == subscription_id)
        .where(Message.id > GroupChatSubscription.last_seen_message_id)
    )
    return session.execute(unseen_query).scalar_one()
def _mute_info(subscription):
    """
    Wraps the subscription's muted_display() result in a conversations_pb2.MuteInfo

    muted_until is only set when muted_display() reports an end time.
    """
    is_muted, until = subscription.muted_display()
    until_pb = Timestamp_from_datetime(until) if until else None
    return conversations_pb2.MuteInfo(muted=is_muted, muted_until=until_pb)
223class Conversations(conversations_pb2_grpc.ConversationsServicer):
224 def ListGroupChats(self, request, context, session):
225 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
226 page_size = min(page_size, MAX_PAGE_SIZE)
228 # select group chats where you have a subscription, and for each of
229 # these, the latest message from them
231 t = (
232 select(
233 GroupChatSubscription.group_chat_id.label("group_chat_id"),
234 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
235 func.max(Message.id).label("message_id"),
236 )
237 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
238 .where(GroupChatSubscription.user_id == context.user_id)
239 .where(Message.time >= GroupChatSubscription.joined)
240 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
241 .group_by(GroupChatSubscription.group_chat_id)
242 .order_by(func.max(Message.id).desc())
243 .subquery()
244 )
246 results = session.execute(
247 select(t, GroupChat, GroupChatSubscription, Message)
248 .join(Message, Message.id == t.c.message_id)
249 .join(GroupChatSubscription, GroupChatSubscription.id == t.c.group_chat_subscriptions_id)
250 .join(GroupChat, GroupChat.conversation_id == t.c.group_chat_id)
251 .where(or_(t.c.message_id < request.last_message_id, request.last_message_id == 0))
252 .order_by(t.c.message_id.desc())
253 .limit(page_size + 1)
254 ).all()
256 return conversations_pb2.ListGroupChatsRes(
257 group_chats=[
258 conversations_pb2.GroupChat(
259 group_chat_id=result.GroupChat.conversation_id,
260 title=result.GroupChat.title, # TODO: proper title for DMs, etc
261 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
262 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
263 only_admins_invite=result.GroupChat.only_admins_invite,
264 is_dm=result.GroupChat.is_dm,
265 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
266 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
267 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
268 latest_message=_message_to_pb(result.Message) if result.Message else None,
269 mute_info=_mute_info(result.GroupChatSubscription),
270 )
271 for result in results[:page_size]
272 ],
273 last_message_id=(
274 min(g.Message.id if g.Message else 1 for g in results[:page_size]) if len(results) > 0 else 0
275 ), # TODO
276 no_more=len(results) <= page_size,
277 )
279 def GetGroupChat(self, request, context, session):
280 result = session.execute(
281 select(GroupChat, GroupChatSubscription, Message)
282 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
283 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
284 .where(GroupChatSubscription.user_id == context.user_id)
285 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
286 .where(Message.time >= GroupChatSubscription.joined)
287 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
288 .order_by(Message.id.desc())
289 ).first()
291 if not result:
292 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
294 return conversations_pb2.GroupChat(
295 group_chat_id=result.GroupChat.conversation_id,
296 title=result.GroupChat.title,
297 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
298 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
299 only_admins_invite=result.GroupChat.only_admins_invite,
300 is_dm=result.GroupChat.is_dm,
301 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
302 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
303 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
304 latest_message=_message_to_pb(result.Message) if result.Message else None,
305 mute_info=_mute_info(result.GroupChatSubscription),
306 )
308 def GetDirectMessage(self, request, context, session):
309 count = func.count(GroupChatSubscription.id).label("count")
310 subquery = (
311 select(GroupChatSubscription.group_chat_id)
312 .where(
313 or_(
314 GroupChatSubscription.user_id == context.user_id,
315 GroupChatSubscription.user_id == request.user_id,
316 )
317 )
318 .where(GroupChatSubscription.left == None)
319 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
320 .where(GroupChat.is_dm == True)
321 .group_by(GroupChatSubscription.group_chat_id)
322 .having(count == 2)
323 .subquery()
324 )
326 result = session.execute(
327 select(subquery, GroupChat, GroupChatSubscription, Message)
328 .join(subquery, subquery.c.group_chat_id == GroupChat.conversation_id)
329 .join(Message, Message.conversation_id == GroupChat.conversation_id)
330 .where(GroupChatSubscription.user_id == context.user_id)
331 .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id)
332 .where(Message.time >= GroupChatSubscription.joined)
333 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
334 .order_by(Message.id.desc())
335 ).first()
337 if not result:
338 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
340 return conversations_pb2.GroupChat(
341 group_chat_id=result.GroupChat.conversation_id,
342 title=result.GroupChat.title,
343 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
344 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
345 only_admins_invite=result.GroupChat.only_admins_invite,
346 is_dm=result.GroupChat.is_dm,
347 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
348 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
349 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
350 latest_message=_message_to_pb(result.Message) if result.Message else None,
351 mute_info=_mute_info(result.GroupChatSubscription),
352 )
354 def GetUpdates(self, request, context, session):
355 results = (
356 session.execute(
357 select(Message)
358 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
359 .where(GroupChatSubscription.user_id == context.user_id)
360 .where(Message.time >= GroupChatSubscription.joined)
361 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
362 .where(Message.id > request.newest_message_id)
363 .order_by(Message.id.asc())
364 .limit(DEFAULT_PAGINATION_LENGTH + 1)
365 )
366 .scalars()
367 .all()
368 )
370 return conversations_pb2.GetUpdatesRes(
371 updates=[
372 conversations_pb2.Update(
373 group_chat_id=message.conversation_id,
374 message=_message_to_pb(message),
375 )
376 for message in sorted(results, key=lambda message: message.id)[:DEFAULT_PAGINATION_LENGTH]
377 ],
378 no_more=len(results) <= DEFAULT_PAGINATION_LENGTH,
379 )
381 def GetGroupChatMessages(self, request, context, session):
382 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
383 page_size = min(page_size, MAX_PAGE_SIZE)
385 results = (
386 session.execute(
387 select(Message)
388 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
389 .where(GroupChatSubscription.user_id == context.user_id)
390 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
391 .where(Message.time >= GroupChatSubscription.joined)
392 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
393 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
394 .where(or_(Message.id > GroupChatSubscription.last_seen_message_id, request.only_unseen == 0))
395 .order_by(Message.id.desc())
396 .limit(page_size + 1)
397 )
398 .scalars()
399 .all()
400 )
402 return conversations_pb2.GetGroupChatMessagesRes(
403 messages=[_message_to_pb(message) for message in results[:page_size]],
404 last_message_id=results[-2].id if len(results) > 1 else 0, # TODO
405 no_more=len(results) <= page_size,
406 )
408 def MarkLastSeenGroupChat(self, request, context, session):
409 subscription = session.execute(
410 select(GroupChatSubscription)
411 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
412 .where(GroupChatSubscription.user_id == context.user_id)
413 .where(GroupChatSubscription.left == None)
414 ).scalar_one_or_none()
416 if not subscription:
417 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
419 if not subscription.last_seen_message_id <= request.last_seen_message_id:
420 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)
422 subscription.last_seen_message_id = request.last_seen_message_id
424 # TODO: notify
426 return empty_pb2.Empty()
428 def MuteGroupChat(self, request, context, session):
429 subscription = session.execute(
430 select(GroupChatSubscription)
431 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
432 .where(GroupChatSubscription.user_id == context.user_id)
433 .where(GroupChatSubscription.left == None)
434 ).scalar_one_or_none()
436 if not subscription:
437 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
439 if request.unmute:
440 subscription.muted_until = DATETIME_MINUS_INFINITY
441 elif request.forever:
442 subscription.muted_until = DATETIME_INFINITY
443 elif request.for_duration:
444 duration = request.for_duration.ToTimedelta()
445 if duration < timedelta(seconds=0):
446 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MUTE_PAST)
447 subscription.muted_until = now() + duration
449 return empty_pb2.Empty()
451 def SearchMessages(self, request, context, session):
452 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
453 page_size = min(page_size, MAX_PAGE_SIZE)
455 results = (
456 session.execute(
457 select(Message)
458 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
459 .where(GroupChatSubscription.user_id == context.user_id)
460 .where(Message.time >= GroupChatSubscription.joined)
461 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
462 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
463 .where(Message.text.ilike(f"%{request.query}%"))
464 .order_by(Message.id.desc())
465 .limit(page_size + 1)
466 )
467 .scalars()
468 .all()
469 )
471 return conversations_pb2.SearchMessagesRes(
472 results=[
473 conversations_pb2.MessageSearchResult(
474 group_chat_id=message.conversation_id,
475 message=_message_to_pb(message),
476 )
477 for message in results[:page_size]
478 ],
479 last_message_id=results[-2].id if len(results) > 1 else 0,
480 no_more=len(results) <= page_size,
481 )
483 def CreateGroupChat(self, request, context, session):
484 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
485 if not user.has_completed_profile:
486 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_MESSAGE)
488 recipient_user_ids = list(
489 session.execute(select(User.id).where_users_visible(context).where(User.id.in_(request.recipient_user_ids)))
490 .scalars()
491 .all()
492 )
494 # make sure all requested users are visible
495 if len(recipient_user_ids) != len(request.recipient_user_ids):
496 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_FOUND)
498 if not recipient_user_ids:
499 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_RECIPIENTS)
501 if len(recipient_user_ids) != len(set(recipient_user_ids)):
502 # make sure there's no duplicate users
503 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_RECIPIENTS)
505 if context.user_id in recipient_user_ids:
506 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_ADD_SELF)
508 if len(recipient_user_ids) == 1:
509 # can only have one DM at a time between any two users
510 other_user_id = recipient_user_ids[0]
512 # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have
513 # user_id either this user or the recipient user. If you find two subscriptions to the same DM group
514 # chat, you know they already have a shared group chat
515 count = func.count(GroupChatSubscription.id).label("count")
516 if session.execute(
517 select(count)
518 .where(
519 or_(
520 GroupChatSubscription.user_id == context.user_id,
521 GroupChatSubscription.user_id == other_user_id,
522 )
523 )
524 .where(GroupChatSubscription.left == None)
525 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
526 .where(GroupChat.is_dm == True)
527 .group_by(GroupChatSubscription.group_chat_id)
528 .having(count == 2)
529 ).scalar_one_or_none():
530 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_HAVE_DM)
532 conversation = Conversation()
533 session.add(conversation)
535 group_chat = GroupChat(
536 conversation=conversation,
537 title=request.title.value,
538 creator_id=context.user_id,
539 is_dm=True if len(recipient_user_ids) == 1 else False, # TODO
540 )
541 session.add(group_chat)
543 your_subscription = GroupChatSubscription(
544 user_id=context.user_id,
545 group_chat=group_chat,
546 role=GroupChatRole.admin,
547 )
548 session.add(your_subscription)
550 for recipient_id in request.recipient_user_ids:
551 subscription = GroupChatSubscription(
552 user_id=recipient_id,
553 group_chat=group_chat,
554 role=GroupChatRole.participant,
555 )
556 session.add(subscription)
558 _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created)
560 session.flush()
562 return conversations_pb2.GroupChat(
563 group_chat_id=group_chat.conversation_id,
564 title=group_chat.title,
565 member_user_ids=_get_visible_members_for_subscription(your_subscription),
566 admin_user_ids=_get_visible_admins_for_subscription(your_subscription),
567 only_admins_invite=group_chat.only_admins_invite,
568 is_dm=group_chat.is_dm,
569 created=Timestamp_from_datetime(group_chat.conversation.created),
570 mute_info=_mute_info(your_subscription),
571 )
573 def SendMessage(self, request, context, session):
574 if request.text == "":
575 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)
577 subscription = session.execute(
578 select(GroupChatSubscription)
579 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
580 .where(GroupChatSubscription.user_id == context.user_id)
581 .where(GroupChatSubscription.left == None)
582 ).scalar_one_or_none()
583 if not subscription:
584 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
586 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)
588 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
589 sent_messages_counter.labels(
590 user_gender, "direct message" if subscription.group_chat.is_dm else "group chat"
591 ).inc()
593 return empty_pb2.Empty()
595 def EditGroupChat(self, request, context, session):
596 subscription = session.execute(
597 select(GroupChatSubscription)
598 .where(GroupChatSubscription.user_id == context.user_id)
599 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
600 .where(GroupChatSubscription.left == None)
601 ).scalar_one_or_none()
603 if not subscription:
604 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
606 if subscription.role != GroupChatRole.admin:
607 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_EDIT)
609 if request.HasField("title"):
610 subscription.group_chat.title = request.title.value
612 if request.HasField("only_admins_invite"):
613 subscription.group_chat.only_admins_invite = request.only_admins_invite.value
615 _add_message_to_subscription(session, subscription, message_type=MessageType.chat_edited)
617 return empty_pb2.Empty()
619 def MakeGroupChatAdmin(self, request, context, session):
620 if not session.execute(
621 select(User).where_users_visible(context).where(User.id == request.user_id)
622 ).scalar_one_or_none():
623 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
625 your_subscription = session.execute(
626 select(GroupChatSubscription)
627 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
628 .where(GroupChatSubscription.user_id == context.user_id)
629 .where(GroupChatSubscription.left == None)
630 ).scalar_one_or_none()
632 if not your_subscription:
633 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
635 if your_subscription.role != GroupChatRole.admin:
636 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_MAKE_ADMIN)
638 if request.user_id == context.user_id:
639 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MAKE_SELF_ADMIN)
641 their_subscription = session.execute(
642 select(GroupChatSubscription)
643 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
644 .where(GroupChatSubscription.user_id == request.user_id)
645 .where(GroupChatSubscription.left == None)
646 ).scalar_one_or_none()
648 if not their_subscription:
649 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT)
651 if their_subscription.role != GroupChatRole.participant:
652 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_ADMIN)
654 their_subscription.role = GroupChatRole.admin
656 _add_message_to_subscription(
657 session, your_subscription, message_type=MessageType.user_made_admin, target_id=request.user_id
658 )
660 return empty_pb2.Empty()
662 def RemoveGroupChatAdmin(self, request, context, session):
663 if not session.execute(
664 select(User).where_users_visible(context).where(User.id == request.user_id)
665 ).scalar_one_or_none():
666 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
668 your_subscription = session.execute(
669 select(GroupChatSubscription)
670 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
671 .where(GroupChatSubscription.user_id == context.user_id)
672 .where(GroupChatSubscription.left == None)
673 ).scalar_one_or_none()
675 if not your_subscription:
676 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
678 if request.user_id == context.user_id:
679 # Race condition!
680 other_admins_count = session.execute(
681 select(func.count())
682 .select_from(GroupChatSubscription)
683 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
684 .where(GroupChatSubscription.user_id != context.user_id)
685 .where(GroupChatSubscription.role == GroupChatRole.admin)
686 .where(GroupChatSubscription.left == None)
687 ).scalar_one()
688 if not other_admins_count > 0:
689 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_LAST_ADMIN)
691 if your_subscription.role != GroupChatRole.admin:
692 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_ADMIN)
694 their_subscription = session.execute(
695 select(GroupChatSubscription)
696 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
697 .where(GroupChatSubscription.user_id == request.user_id)
698 .where(GroupChatSubscription.left == None)
699 .where(GroupChatSubscription.role == GroupChatRole.admin)
700 ).scalar_one_or_none()
702 if not their_subscription:
703 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN)
705 their_subscription.role = GroupChatRole.participant
707 _add_message_to_subscription(
708 session, your_subscription, message_type=MessageType.user_removed_admin, target_id=request.user_id
709 )
711 return empty_pb2.Empty()
713 def InviteToGroupChat(self, request, context, session):
714 if not session.execute(
715 select(User).where_users_visible(context).where(User.id == request.user_id)
716 ).scalar_one_or_none():
717 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
719 result = session.execute(
720 select(GroupChatSubscription, GroupChat)
721 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
722 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
723 .where(GroupChatSubscription.user_id == context.user_id)
724 .where(GroupChatSubscription.left == None)
725 ).one_or_none()
727 if not result:
728 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
730 your_subscription, group_chat = result
732 if not your_subscription or not group_chat:
733 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
735 if request.user_id == context.user_id:
736 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_SELF)
738 if your_subscription.role != GroupChatRole.admin and your_subscription.group_chat.only_admins_invite:
739 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVITE_PERMISSION_DENIED)
741 if group_chat.is_dm:
742 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_TO_DM)
744 their_subscription = session.execute(
745 select(GroupChatSubscription)
746 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
747 .where(GroupChatSubscription.user_id == request.user_id)
748 .where(GroupChatSubscription.left == None)
749 ).scalar_one_or_none()
751 if their_subscription:
752 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_CHAT)
754 # TODO: race condition!
756 subscription = GroupChatSubscription(
757 user_id=request.user_id,
758 group_chat=your_subscription.group_chat,
759 role=GroupChatRole.participant,
760 )
761 session.add(subscription)
763 _add_message_to_subscription(
764 session, your_subscription, message_type=MessageType.user_invited, target_id=request.user_id
765 )
767 return empty_pb2.Empty()
769 def RemoveGroupChatUser(self, request, context, session):
770 """
771 1. Get admin info and check it's correct
772 2. Get user data, check it's correct and remove user
773 """
774 # Admin info
775 your_subscription = session.execute(
776 select(GroupChatSubscription)
777 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
778 .where(GroupChatSubscription.user_id == context.user_id)
779 .where(GroupChatSubscription.left == None)
780 ).scalar_one_or_none()
782 # if user info is missing
783 if not your_subscription:
784 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
786 # if user not admin
787 if your_subscription.role != GroupChatRole.admin:
788 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_USER)
790 # if user wants to remove themselves
791 if request.user_id == context.user_id:
792 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_SELF)
794 # get user info
795 their_subscription = session.execute(
796 select(GroupChatSubscription)
797 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
798 .where(GroupChatSubscription.user_id == request.user_id)
799 .where(GroupChatSubscription.left == None)
800 ).scalar_one_or_none()
802 # user not found
803 if not their_subscription:
804 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT)
806 _add_message_to_subscription(
807 session, your_subscription, message_type=MessageType.user_removed, target_id=request.user_id
808 )
810 their_subscription.left = func.now()
812 return empty_pb2.Empty()
814 def LeaveGroupChat(self, request, context, session):
815 subscription = session.execute(
816 select(GroupChatSubscription)
817 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
818 .where(GroupChatSubscription.user_id == context.user_id)
819 .where(GroupChatSubscription.left == None)
820 ).scalar_one_or_none()
822 if not subscription:
823 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
825 if subscription.role == GroupChatRole.admin:
826 other_admins_count = session.execute(
827 select(func.count())
828 .select_from(GroupChatSubscription)
829 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
830 .where(GroupChatSubscription.user_id != context.user_id)
831 .where(GroupChatSubscription.role == GroupChatRole.admin)
832 .where(GroupChatSubscription.left == None)
833 ).scalar_one()
834 participants_count = session.execute(
835 select(func.count())
836 .select_from(GroupChatSubscription)
837 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
838 .where(GroupChatSubscription.user_id != context.user_id)
839 .where(GroupChatSubscription.role == GroupChatRole.participant)
840 .where(GroupChatSubscription.left == None)
841 ).scalar_one()
842 if not (other_admins_count > 0 or participants_count == 0):
843 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.LAST_ADMIN_CANT_LEAVE)
845 _add_message_to_subscription(session, subscription, message_type=MessageType.user_left)
847 subscription.left = func.now()
849 return empty_pb2.Empty()