Coverage for src/couchers/servicers/conversations.py: 91%
250 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 02:39 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 02:39 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy.sql import func, not_, or_, select
8from couchers import errors
9from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY
10from couchers.db import session_scope
11from couchers.jobs.enqueue import queue_job
12from couchers.metrics import sent_messages_counter
13from couchers.models import Conversation, GroupChat, GroupChatRole, GroupChatSubscription, Message, MessageType, User
14from couchers.notifications.notify import notify
15from couchers.servicers.api import user_model_to_pb
16from couchers.servicers.blocking import are_blocked
17from couchers.sql import couchers_select as select
18from couchers.utils import Timestamp_from_datetime, make_user_context, now
19from proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2
20from proto.internal import jobs_pb2
22logger = logging.getLogger(__name__)
24# TODO: Still needs custom pagination: GetUpdates
25DEFAULT_PAGINATION_LENGTH = 20
26MAX_PAGE_SIZE = 50
29def _message_to_pb(message: Message):
30 """
31 Turns the given message to a protocol buffer
32 """
33 if message.is_normal_message:
34 return conversations_pb2.Message(
35 message_id=message.id,
36 author_user_id=message.author_id,
37 time=Timestamp_from_datetime(message.time),
38 text=conversations_pb2.MessageContentText(text=message.text),
39 )
40 else:
41 return conversations_pb2.Message(
42 message_id=message.id,
43 author_user_id=message.author_id,
44 time=Timestamp_from_datetime(message.time),
45 chat_created=(
46 conversations_pb2.MessageContentChatCreated()
47 if message.message_type == MessageType.chat_created
48 else None
49 ),
50 chat_edited=(
51 conversations_pb2.MessageContentChatEdited()
52 if message.message_type == MessageType.chat_edited
53 else None
54 ),
55 user_invited=(
56 conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id)
57 if message.message_type == MessageType.user_invited
58 else None
59 ),
60 user_left=(
61 conversations_pb2.MessageContentUserLeft() if message.message_type == MessageType.user_left else None
62 ),
63 user_made_admin=(
64 conversations_pb2.MessageContentUserMadeAdmin(target_user_id=message.target_id)
65 if message.message_type == MessageType.user_made_admin
66 else None
67 ),
68 user_removed_admin=(
69 conversations_pb2.MessageContentUserRemovedAdmin(target_user_id=message.target_id)
70 if message.message_type == MessageType.user_removed_admin
71 else None
72 ),
73 group_chat_user_removed=(
74 conversations_pb2.MessageContentUserRemoved(target_user_id=message.target_id)
75 if message.message_type == MessageType.user_removed
76 else None
77 ),
78 )
81def _get_visible_members_for_subscription(subscription):
82 """
83 If a user leaves a group chat, they shouldn't be able to see who's added
84 after they left
85 """
86 if not subscription.left:
87 # still in the chat, we see everyone with a current subscription
88 return [sub.user_id for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None)]
89 else:
90 # not in chat anymore, see everyone who was in chat when we left
91 return [
92 sub.user_id
93 for sub in subscription.group_chat.subscriptions.where(
94 GroupChatSubscription.joined <= subscription.left
95 ).where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None))
96 ]
99def _get_visible_admins_for_subscription(subscription):
100 """
101 If a user leaves a group chat, they shouldn't be able to see who's added
102 after they left
103 """
104 if not subscription.left:
105 # still in the chat, we see everyone with a current subscription
106 return [
107 sub.user_id
108 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None).where(
109 GroupChatSubscription.role == GroupChatRole.admin
110 )
111 ]
112 else:
113 # not in chat anymore, see everyone who was in chat when we left
114 return [
115 sub.user_id
116 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin)
117 .where(GroupChatSubscription.joined <= subscription.left)
118 .where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None))
119 ]
122def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload):
123 """
124 Background job to generate notifications for a message sent to a group chat
125 """
126 logger.info(f"Fanning notifications for message_id = {payload.message_id}")
128 with session_scope() as session:
129 message, group_chat = session.execute(
130 select(Message, GroupChat)
131 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
132 .where(Message.id == payload.message_id)
133 ).one()
135 if message.message_type != MessageType.text:
136 logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
137 return []
139 subscriptions = (
140 session.execute(
141 select(GroupChatSubscription)
142 .join(User, User.id == GroupChatSubscription.user_id)
143 .where(GroupChatSubscription.group_chat_id == message.conversation_id)
144 .where(User.is_visible)
145 .where(User.id != message.author_id)
146 .where(GroupChatSubscription.joined <= message.time)
147 .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time))
148 .where(not_(GroupChatSubscription.is_muted))
149 )
150 .scalars()
151 .all()
152 )
154 if group_chat.is_dm:
155 msg = f"{message.author.name} sent you a message"
156 else:
157 msg = f"{message.author.name} sent a message in {group_chat.title}"
159 for subscription in subscriptions:
160 if are_blocked(session, subscription.user_id, message.author.id):
161 continue
162 notify(
163 session,
164 user_id=subscription.user_id,
165 topic_action="chat:message",
166 key=message.conversation_id,
167 data=notification_data_pb2.ChatMessage(
168 author=user_model_to_pb(
169 message.author,
170 session,
171 make_user_context(user_id=subscription.user_id),
172 ),
173 message=msg,
174 text=message.text,
175 group_chat_id=message.conversation_id,
176 ),
177 )
180def _add_message_to_subscription(session, subscription, **kwargs):
181 """
182 Creates a new message for a subscription, from the user whose subscription that is. Updates last seen message id
184 Specify the keyword args for Message
185 """
186 message = Message(conversation=subscription.group_chat.conversation, author_id=subscription.user_id, **kwargs)
188 session.add(message)
189 session.flush()
191 subscription.last_seen_message_id = message.id
193 queue_job(
194 session,
195 job_type="generate_message_notifications",
196 payload=jobs_pb2.GenerateMessageNotificationsPayload(
197 message_id=message.id,
198 ),
199 )
201 return message
204def _unseen_message_count(session, subscription_id):
205 return session.execute(
206 select(func.count())
207 .select_from(Message)
208 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
209 .where(GroupChatSubscription.id == subscription_id)
210 .where(Message.id > GroupChatSubscription.last_seen_message_id)
211 ).scalar_one()
214def _mute_info(subscription):
215 (muted, muted_until) = subscription.muted_display()
216 return conversations_pb2.MuteInfo(
217 muted=muted,
218 muted_until=Timestamp_from_datetime(muted_until) if muted_until else None,
219 )
222class Conversations(conversations_pb2_grpc.ConversationsServicer):
223 def ListGroupChats(self, request, context, session):
224 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
225 page_size = min(page_size, MAX_PAGE_SIZE)
227 # select group chats where you have a subscription, and for each of
228 # these, the latest message from them
230 t = (
231 select(
232 GroupChatSubscription.group_chat_id.label("group_chat_id"),
233 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
234 func.max(Message.id).label("message_id"),
235 )
236 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
237 .where(GroupChatSubscription.user_id == context.user_id)
238 .where(Message.time >= GroupChatSubscription.joined)
239 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
240 .group_by(GroupChatSubscription.group_chat_id)
241 .order_by(func.max(Message.id).desc())
242 .subquery()
243 )
245 results = session.execute(
246 select(t, GroupChat, GroupChatSubscription, Message)
247 .join(Message, Message.id == t.c.message_id)
248 .join(GroupChatSubscription, GroupChatSubscription.id == t.c.group_chat_subscriptions_id)
249 .join(GroupChat, GroupChat.conversation_id == t.c.group_chat_id)
250 .where(or_(t.c.message_id < request.last_message_id, request.last_message_id == 0))
251 .order_by(t.c.message_id.desc())
252 .limit(page_size + 1)
253 ).all()
255 return conversations_pb2.ListGroupChatsRes(
256 group_chats=[
257 conversations_pb2.GroupChat(
258 group_chat_id=result.GroupChat.conversation_id,
259 title=result.GroupChat.title, # TODO: proper title for DMs, etc
260 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
261 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
262 only_admins_invite=result.GroupChat.only_admins_invite,
263 is_dm=result.GroupChat.is_dm,
264 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
265 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
266 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
267 latest_message=_message_to_pb(result.Message) if result.Message else None,
268 mute_info=_mute_info(result.GroupChatSubscription),
269 )
270 for result in results[:page_size]
271 ],
272 last_message_id=(
273 min(g.Message.id if g.Message else 1 for g in results[:page_size]) if len(results) > 0 else 0
274 ), # TODO
275 no_more=len(results) <= page_size,
276 )
278 def GetGroupChat(self, request, context, session):
279 result = session.execute(
280 select(GroupChat, GroupChatSubscription, Message)
281 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
282 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
283 .where(GroupChatSubscription.user_id == context.user_id)
284 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
285 .where(Message.time >= GroupChatSubscription.joined)
286 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
287 .order_by(Message.id.desc())
288 .limit(1)
289 ).one_or_none()
291 if not result:
292 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
294 return conversations_pb2.GroupChat(
295 group_chat_id=result.GroupChat.conversation_id,
296 title=result.GroupChat.title,
297 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
298 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
299 only_admins_invite=result.GroupChat.only_admins_invite,
300 is_dm=result.GroupChat.is_dm,
301 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
302 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
303 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
304 latest_message=_message_to_pb(result.Message) if result.Message else None,
305 mute_info=_mute_info(result.GroupChatSubscription),
306 )
308 def GetDirectMessage(self, request, context, session):
309 count = func.count(GroupChatSubscription.id).label("count")
310 subquery = (
311 select(GroupChatSubscription.group_chat_id)
312 .where(
313 or_(
314 GroupChatSubscription.user_id == context.user_id,
315 GroupChatSubscription.user_id == request.user_id,
316 )
317 )
318 .where(GroupChatSubscription.left == None)
319 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
320 .where(GroupChat.is_dm == True)
321 .group_by(GroupChatSubscription.group_chat_id)
322 .having(count == 2)
323 .subquery()
324 )
326 result = session.execute(
327 select(subquery, GroupChat, GroupChatSubscription, Message)
328 .join(subquery, subquery.c.group_chat_id == GroupChat.conversation_id)
329 .join(Message, Message.conversation_id == GroupChat.conversation_id)
330 .where(GroupChatSubscription.user_id == context.user_id)
331 .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id)
332 .where(Message.time >= GroupChatSubscription.joined)
333 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
334 .order_by(Message.id.desc())
335 .limit(1)
336 ).one_or_none()
338 if not result:
339 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
341 return conversations_pb2.GroupChat(
342 group_chat_id=result.GroupChat.conversation_id,
343 title=result.GroupChat.title,
344 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
345 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
346 only_admins_invite=result.GroupChat.only_admins_invite,
347 is_dm=result.GroupChat.is_dm,
348 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
349 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
350 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
351 latest_message=_message_to_pb(result.Message) if result.Message else None,
352 mute_info=_mute_info(result.GroupChatSubscription),
353 )
355 def GetUpdates(self, request, context, session):
356 results = (
357 session.execute(
358 select(Message)
359 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
360 .where(GroupChatSubscription.user_id == context.user_id)
361 .where(Message.time >= GroupChatSubscription.joined)
362 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
363 .where(Message.id > request.newest_message_id)
364 .order_by(Message.id.asc())
365 .limit(DEFAULT_PAGINATION_LENGTH + 1)
366 )
367 .scalars()
368 .all()
369 )
371 return conversations_pb2.GetUpdatesRes(
372 updates=[
373 conversations_pb2.Update(
374 group_chat_id=message.conversation_id,
375 message=_message_to_pb(message),
376 )
377 for message in sorted(results, key=lambda message: message.id)[:DEFAULT_PAGINATION_LENGTH]
378 ],
379 no_more=len(results) <= DEFAULT_PAGINATION_LENGTH,
380 )
382 def GetGroupChatMessages(self, request, context, session):
383 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
384 page_size = min(page_size, MAX_PAGE_SIZE)
386 results = (
387 session.execute(
388 select(Message)
389 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
390 .where(GroupChatSubscription.user_id == context.user_id)
391 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
392 .where(Message.time >= GroupChatSubscription.joined)
393 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
394 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
395 .where(or_(Message.id > GroupChatSubscription.last_seen_message_id, request.only_unseen == 0))
396 .order_by(Message.id.desc())
397 .limit(page_size + 1)
398 )
399 .scalars()
400 .all()
401 )
403 return conversations_pb2.GetGroupChatMessagesRes(
404 messages=[_message_to_pb(message) for message in results[:page_size]],
405 last_message_id=results[-2].id if len(results) > 1 else 0, # TODO
406 no_more=len(results) <= page_size,
407 )
409 def MarkLastSeenGroupChat(self, request, context, session):
410 subscription = session.execute(
411 select(GroupChatSubscription)
412 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
413 .where(GroupChatSubscription.user_id == context.user_id)
414 .where(GroupChatSubscription.left == None)
415 ).scalar_one_or_none()
417 if not subscription:
418 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
420 if not subscription.last_seen_message_id <= request.last_seen_message_id:
421 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)
423 subscription.last_seen_message_id = request.last_seen_message_id
425 # TODO: notify
427 return empty_pb2.Empty()
429 def MuteGroupChat(self, request, context, session):
430 subscription = session.execute(
431 select(GroupChatSubscription)
432 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
433 .where(GroupChatSubscription.user_id == context.user_id)
434 .where(GroupChatSubscription.left == None)
435 ).scalar_one_or_none()
437 if not subscription:
438 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
440 if request.unmute:
441 subscription.muted_until = DATETIME_MINUS_INFINITY
442 elif request.forever:
443 subscription.muted_until = DATETIME_INFINITY
444 elif request.for_duration:
445 duration = request.for_duration.ToTimedelta()
446 if duration < timedelta(seconds=0):
447 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MUTE_PAST)
448 subscription.muted_until = now() + duration
450 return empty_pb2.Empty()
452 def SearchMessages(self, request, context, session):
453 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
454 page_size = min(page_size, MAX_PAGE_SIZE)
456 results = (
457 session.execute(
458 select(Message)
459 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
460 .where(GroupChatSubscription.user_id == context.user_id)
461 .where(Message.time >= GroupChatSubscription.joined)
462 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
463 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
464 .where(Message.text.ilike(f"%{request.query}%"))
465 .order_by(Message.id.desc())
466 .limit(page_size + 1)
467 )
468 .scalars()
469 .all()
470 )
472 return conversations_pb2.SearchMessagesRes(
473 results=[
474 conversations_pb2.MessageSearchResult(
475 group_chat_id=message.conversation_id,
476 message=_message_to_pb(message),
477 )
478 for message in results[:page_size]
479 ],
480 last_message_id=results[-2].id if len(results) > 1 else 0,
481 no_more=len(results) <= page_size,
482 )
484 def CreateGroupChat(self, request, context, session):
485 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
486 if not user.has_completed_profile:
487 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_MESSAGE)
489 recipient_user_ids = list(
490 session.execute(select(User.id).where_users_visible(context).where(User.id.in_(request.recipient_user_ids)))
491 .scalars()
492 .all()
493 )
495 # make sure all requested users are visible
496 if len(recipient_user_ids) != len(request.recipient_user_ids):
497 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_FOUND)
499 if not recipient_user_ids:
500 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_RECIPIENTS)
502 if len(recipient_user_ids) != len(set(recipient_user_ids)):
503 # make sure there's no duplicate users
504 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_RECIPIENTS)
506 if context.user_id in recipient_user_ids:
507 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_ADD_SELF)
509 if len(recipient_user_ids) == 1:
510 # can only have one DM at a time between any two users
511 other_user_id = recipient_user_ids[0]
513 # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have
514 # user_id either this user or the recipient user. If you find two subscriptions to the same DM group
515 # chat, you know they already have a shared group chat
516 count = func.count(GroupChatSubscription.id).label("count")
517 if session.execute(
518 select(count)
519 .where(
520 or_(
521 GroupChatSubscription.user_id == context.user_id,
522 GroupChatSubscription.user_id == other_user_id,
523 )
524 )
525 .where(GroupChatSubscription.left == None)
526 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
527 .where(GroupChat.is_dm == True)
528 .group_by(GroupChatSubscription.group_chat_id)
529 .having(count == 2)
530 ).scalar_one_or_none():
531 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_HAVE_DM)
533 conversation = Conversation()
534 session.add(conversation)
536 group_chat = GroupChat(
537 conversation=conversation,
538 title=request.title.value,
539 creator_id=context.user_id,
540 is_dm=True if len(recipient_user_ids) == 1 else False, # TODO
541 )
542 session.add(group_chat)
544 your_subscription = GroupChatSubscription(
545 user_id=context.user_id,
546 group_chat=group_chat,
547 role=GroupChatRole.admin,
548 )
549 session.add(your_subscription)
551 for recipient_id in request.recipient_user_ids:
552 subscription = GroupChatSubscription(
553 user_id=recipient_id,
554 group_chat=group_chat,
555 role=GroupChatRole.participant,
556 )
557 session.add(subscription)
559 _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created)
561 session.flush()
563 return conversations_pb2.GroupChat(
564 group_chat_id=group_chat.conversation_id,
565 title=group_chat.title,
566 member_user_ids=_get_visible_members_for_subscription(your_subscription),
567 admin_user_ids=_get_visible_admins_for_subscription(your_subscription),
568 only_admins_invite=group_chat.only_admins_invite,
569 is_dm=group_chat.is_dm,
570 created=Timestamp_from_datetime(group_chat.conversation.created),
571 mute_info=_mute_info(your_subscription),
572 )
574 def SendMessage(self, request, context, session):
575 if request.text == "":
576 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)
578 subscription = session.execute(
579 select(GroupChatSubscription)
580 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
581 .where(GroupChatSubscription.user_id == context.user_id)
582 .where(GroupChatSubscription.left == None)
583 ).scalar_one_or_none()
584 if not subscription:
585 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
587 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)
589 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
590 sent_messages_counter.labels(
591 user_gender, "direct message" if subscription.group_chat.is_dm else "group chat"
592 ).inc()
594 return empty_pb2.Empty()
596 def EditGroupChat(self, request, context, session):
597 subscription = session.execute(
598 select(GroupChatSubscription)
599 .where(GroupChatSubscription.user_id == context.user_id)
600 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
601 .where(GroupChatSubscription.left == None)
602 ).scalar_one_or_none()
604 if not subscription:
605 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
607 if subscription.role != GroupChatRole.admin:
608 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_EDIT)
610 if request.HasField("title"):
611 subscription.group_chat.title = request.title.value
613 if request.HasField("only_admins_invite"):
614 subscription.group_chat.only_admins_invite = request.only_admins_invite.value
616 _add_message_to_subscription(session, subscription, message_type=MessageType.chat_edited)
618 return empty_pb2.Empty()
620 def MakeGroupChatAdmin(self, request, context, session):
621 if not session.execute(
622 select(User).where_users_visible(context).where(User.id == request.user_id)
623 ).scalar_one_or_none():
624 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
626 your_subscription = session.execute(
627 select(GroupChatSubscription)
628 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
629 .where(GroupChatSubscription.user_id == context.user_id)
630 .where(GroupChatSubscription.left == None)
631 ).scalar_one_or_none()
633 if not your_subscription:
634 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
636 if your_subscription.role != GroupChatRole.admin:
637 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_MAKE_ADMIN)
639 if request.user_id == context.user_id:
640 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MAKE_SELF_ADMIN)
642 their_subscription = session.execute(
643 select(GroupChatSubscription)
644 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
645 .where(GroupChatSubscription.user_id == request.user_id)
646 .where(GroupChatSubscription.left == None)
647 ).scalar_one_or_none()
649 if not their_subscription:
650 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT)
652 if their_subscription.role != GroupChatRole.participant:
653 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_ADMIN)
655 their_subscription.role = GroupChatRole.admin
657 _add_message_to_subscription(
658 session, your_subscription, message_type=MessageType.user_made_admin, target_id=request.user_id
659 )
661 return empty_pb2.Empty()
663 def RemoveGroupChatAdmin(self, request, context, session):
664 if not session.execute(
665 select(User).where_users_visible(context).where(User.id == request.user_id)
666 ).scalar_one_or_none():
667 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
669 your_subscription = session.execute(
670 select(GroupChatSubscription)
671 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
672 .where(GroupChatSubscription.user_id == context.user_id)
673 .where(GroupChatSubscription.left == None)
674 ).scalar_one_or_none()
676 if not your_subscription:
677 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
679 if request.user_id == context.user_id:
680 # Race condition!
681 other_admins_count = session.execute(
682 select(func.count())
683 .select_from(GroupChatSubscription)
684 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
685 .where(GroupChatSubscription.user_id != context.user_id)
686 .where(GroupChatSubscription.role == GroupChatRole.admin)
687 .where(GroupChatSubscription.left == None)
688 ).scalar_one()
689 if not other_admins_count > 0:
690 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_LAST_ADMIN)
692 if your_subscription.role != GroupChatRole.admin:
693 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_ADMIN)
695 their_subscription = session.execute(
696 select(GroupChatSubscription)
697 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
698 .where(GroupChatSubscription.user_id == request.user_id)
699 .where(GroupChatSubscription.left == None)
700 .where(GroupChatSubscription.role == GroupChatRole.admin)
701 ).scalar_one_or_none()
703 if not their_subscription:
704 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN)
706 their_subscription.role = GroupChatRole.participant
708 _add_message_to_subscription(
709 session, your_subscription, message_type=MessageType.user_removed_admin, target_id=request.user_id
710 )
712 return empty_pb2.Empty()
714 def InviteToGroupChat(self, request, context, session):
715 if not session.execute(
716 select(User).where_users_visible(context).where(User.id == request.user_id)
717 ).scalar_one_or_none():
718 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
720 result = session.execute(
721 select(GroupChatSubscription, GroupChat)
722 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
723 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
724 .where(GroupChatSubscription.user_id == context.user_id)
725 .where(GroupChatSubscription.left == None)
726 ).one_or_none()
728 if not result:
729 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
731 your_subscription, group_chat = result
733 if not your_subscription or not group_chat:
734 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
736 if request.user_id == context.user_id:
737 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_SELF)
739 if your_subscription.role != GroupChatRole.admin and your_subscription.group_chat.only_admins_invite:
740 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVITE_PERMISSION_DENIED)
742 if group_chat.is_dm:
743 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_TO_DM)
745 their_subscription = session.execute(
746 select(GroupChatSubscription)
747 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
748 .where(GroupChatSubscription.user_id == request.user_id)
749 .where(GroupChatSubscription.left == None)
750 ).scalar_one_or_none()
752 if their_subscription:
753 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_CHAT)
755 # TODO: race condition!
757 subscription = GroupChatSubscription(
758 user_id=request.user_id,
759 group_chat=your_subscription.group_chat,
760 role=GroupChatRole.participant,
761 )
762 session.add(subscription)
764 _add_message_to_subscription(
765 session, your_subscription, message_type=MessageType.user_invited, target_id=request.user_id
766 )
768 return empty_pb2.Empty()
770 def RemoveGroupChatUser(self, request, context, session):
771 """
772 1. Get admin info and check it's correct
773 2. Get user data, check it's correct and remove user
774 """
775 # Admin info
776 your_subscription = session.execute(
777 select(GroupChatSubscription)
778 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
779 .where(GroupChatSubscription.user_id == context.user_id)
780 .where(GroupChatSubscription.left == None)
781 ).scalar_one_or_none()
783 # if user info is missing
784 if not your_subscription:
785 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
787 # if user not admin
788 if your_subscription.role != GroupChatRole.admin:
789 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_USER)
791 # if user wants to remove themselves
792 if request.user_id == context.user_id:
793 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_SELF)
795 # get user info
796 their_subscription = session.execute(
797 select(GroupChatSubscription)
798 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
799 .where(GroupChatSubscription.user_id == request.user_id)
800 .where(GroupChatSubscription.left == None)
801 ).scalar_one_or_none()
803 # user not found
804 if not their_subscription:
805 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT)
807 _add_message_to_subscription(
808 session, your_subscription, message_type=MessageType.user_removed, target_id=request.user_id
809 )
811 their_subscription.left = func.now()
813 return empty_pb2.Empty()
815 def LeaveGroupChat(self, request, context, session):
816 subscription = session.execute(
817 select(GroupChatSubscription)
818 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
819 .where(GroupChatSubscription.user_id == context.user_id)
820 .where(GroupChatSubscription.left == None)
821 ).scalar_one_or_none()
823 if not subscription:
824 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
826 if subscription.role == GroupChatRole.admin:
827 other_admins_count = session.execute(
828 select(func.count())
829 .select_from(GroupChatSubscription)
830 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
831 .where(GroupChatSubscription.user_id != context.user_id)
832 .where(GroupChatSubscription.role == GroupChatRole.admin)
833 .where(GroupChatSubscription.left == None)
834 ).scalar_one()
835 participants_count = session.execute(
836 select(func.count())
837 .select_from(GroupChatSubscription)
838 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
839 .where(GroupChatSubscription.user_id != context.user_id)
840 .where(GroupChatSubscription.role == GroupChatRole.participant)
841 .where(GroupChatSubscription.left == None)
842 ).scalar_one()
843 if not (other_admins_count > 0 or participants_count == 0):
844 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.LAST_ADMIN_CANT_LEAVE)
846 _add_message_to_subscription(session, subscription, message_type=MessageType.user_left)
848 subscription.left = func.now()
850 return empty_pb2.Empty()