Coverage for src/couchers/servicers/conversations.py: 91%
291 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-12 05:54 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-12 05:54 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy.sql import func, not_, or_
8from couchers import errors
9from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY
10from couchers.db import session_scope
11from couchers.jobs.enqueue import queue_job
12from couchers.metrics import sent_messages_counter
13from couchers.models import (
14 Conversation,
15 GroupChat,
16 GroupChatRole,
17 GroupChatSubscription,
18 Message,
19 MessageType,
20 RateLimitAction,
21 User,
22)
23from couchers.notifications.notify import notify
24from couchers.rate_limits.check import process_rate_limits_and_check_abort
25from couchers.servicers.api import user_model_to_pb
26from couchers.sql import couchers_select as select
27from couchers.utils import Timestamp_from_datetime, make_user_context, now
28from proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2
29from proto.internal import jobs_pb2
31logger = logging.getLogger(__name__)
33# TODO: Still needs custom pagination: GetUpdates
34DEFAULT_PAGINATION_LENGTH = 20
35MAX_PAGE_SIZE = 50
38def _message_to_pb(message: Message):
39 """
40 Turns the given message to a protocol buffer
41 """
42 if message.is_normal_message:
43 return conversations_pb2.Message(
44 message_id=message.id,
45 author_user_id=message.author_id,
46 time=Timestamp_from_datetime(message.time),
47 text=conversations_pb2.MessageContentText(text=message.text),
48 )
49 else:
50 return conversations_pb2.Message(
51 message_id=message.id,
52 author_user_id=message.author_id,
53 time=Timestamp_from_datetime(message.time),
54 chat_created=(
55 conversations_pb2.MessageContentChatCreated()
56 if message.message_type == MessageType.chat_created
57 else None
58 ),
59 chat_edited=(
60 conversations_pb2.MessageContentChatEdited()
61 if message.message_type == MessageType.chat_edited
62 else None
63 ),
64 user_invited=(
65 conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id)
66 if message.message_type == MessageType.user_invited
67 else None
68 ),
69 user_left=(
70 conversations_pb2.MessageContentUserLeft() if message.message_type == MessageType.user_left else None
71 ),
72 user_made_admin=(
73 conversations_pb2.MessageContentUserMadeAdmin(target_user_id=message.target_id)
74 if message.message_type == MessageType.user_made_admin
75 else None
76 ),
77 user_removed_admin=(
78 conversations_pb2.MessageContentUserRemovedAdmin(target_user_id=message.target_id)
79 if message.message_type == MessageType.user_removed_admin
80 else None
81 ),
82 group_chat_user_removed=(
83 conversations_pb2.MessageContentUserRemoved(target_user_id=message.target_id)
84 if message.message_type == MessageType.user_removed
85 else None
86 ),
87 )
90def _get_visible_members_for_subscription(subscription):
91 """
92 If a user leaves a group chat, they shouldn't be able to see who's added
93 after they left
94 """
95 if not subscription.left:
96 # still in the chat, we see everyone with a current subscription
97 return [sub.user_id for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None)]
98 else:
99 # not in chat anymore, see everyone who was in chat when we left
100 return [
101 sub.user_id
102 for sub in subscription.group_chat.subscriptions.where(
103 GroupChatSubscription.joined <= subscription.left
104 ).where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None))
105 ]
108def _get_visible_admins_for_subscription(subscription):
109 """
110 If a user leaves a group chat, they shouldn't be able to see who's added
111 after they left
112 """
113 if not subscription.left:
114 # still in the chat, we see everyone with a current subscription
115 return [
116 sub.user_id
117 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None).where(
118 GroupChatSubscription.role == GroupChatRole.admin
119 )
120 ]
121 else:
122 # not in chat anymore, see everyone who was in chat when we left
123 return [
124 sub.user_id
125 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin)
126 .where(GroupChatSubscription.joined <= subscription.left)
127 .where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None))
128 ]
131def _user_can_message(session, context, group_chat: GroupChat) -> bool:
132 """
133 If it is a true group chat (not a DM), user can always message. For a DM, user can message if the other participant
134 - Is not deleted/banned
135 - Has not been blocked by the user or is blocking the user
136 - Has not left the chat
137 """
138 if not group_chat.is_dm:
139 return True
140 return session.execute(
141 func.exists(
142 select(GroupChatSubscription)
143 .where_users_column_visible(context=context, column=GroupChatSubscription.user_id)
144 .where(GroupChatSubscription.user_id != context.user_id)
145 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
146 .where(GroupChatSubscription.left == None)
147 )
148 ).scalar_one()
151def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload):
152 """
153 Background job to generate notifications for a message sent to a group chat
154 """
155 logger.info(f"Fanning notifications for message_id = {payload.message_id}")
157 with session_scope() as session:
158 message, group_chat = session.execute(
159 select(Message, GroupChat)
160 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
161 .where(Message.id == payload.message_id)
162 ).one()
164 if message.message_type != MessageType.text:
165 logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
166 return []
168 context = make_user_context(user_id=message.author_id)
169 user_ids_to_notify = (
170 session.execute(
171 select(GroupChatSubscription.user_id)
172 .where_users_column_visible(context=context, column=GroupChatSubscription.user_id)
173 .where(GroupChatSubscription.group_chat_id == message.conversation_id)
174 .where(GroupChatSubscription.user_id != message.author_id)
175 .where(GroupChatSubscription.joined <= message.time)
176 .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time))
177 .where(not_(GroupChatSubscription.is_muted))
178 )
179 .scalars()
180 .all()
181 )
183 if group_chat.is_dm:
184 msg = f"{message.author.name} sent you a message"
185 else:
186 msg = f"{message.author.name} sent a message in {group_chat.title}"
188 for user_id in user_ids_to_notify:
189 notify(
190 session,
191 user_id=user_id,
192 topic_action="chat:message",
193 key=message.conversation_id,
194 data=notification_data_pb2.ChatMessage(
195 author=user_model_to_pb(
196 message.author,
197 session,
198 make_user_context(user_id=user_id),
199 ),
200 message=msg,
201 text=message.text,
202 group_chat_id=message.conversation_id,
203 ),
204 )
207def _add_message_to_subscription(session, subscription, **kwargs):
208 """
209 Creates a new message for a subscription, from the user whose subscription that is. Updates last seen message id
211 Specify the keyword args for Message
212 """
213 message = Message(conversation=subscription.group_chat.conversation, author_id=subscription.user_id, **kwargs)
215 session.add(message)
216 session.flush()
218 subscription.last_seen_message_id = message.id
220 queue_job(
221 session,
222 job_type="generate_message_notifications",
223 payload=jobs_pb2.GenerateMessageNotificationsPayload(
224 message_id=message.id,
225 ),
226 )
228 return message
231def _create_chat(session, creator_id, recipient_ids, title=None, only_admins_invite=True):
232 conversation = Conversation()
233 session.add(conversation)
234 session.flush()
236 chat = GroupChat(
237 conversation_id=conversation.id,
238 title=title,
239 creator_id=creator_id,
240 is_dm=True if len(recipient_ids) == 1 else False,
241 only_admins_invite=only_admins_invite,
242 )
243 session.add(chat)
244 session.flush()
246 creator_subscription = GroupChatSubscription(
247 user_id=creator_id,
248 group_chat=chat,
249 role=GroupChatRole.admin,
250 )
251 session.add(creator_subscription)
253 for uid in recipient_ids:
254 session.add(
255 GroupChatSubscription(
256 user_id=uid,
257 group_chat=chat,
258 role=GroupChatRole.participant,
259 )
260 )
262 return chat
265def _get_message_subscription(session, user_id, conversation_id):
266 subscription = session.execute(
267 select(GroupChatSubscription)
268 .where(GroupChatSubscription.group_chat_id == conversation_id)
269 .where(GroupChatSubscription.user_id == user_id)
270 .where(GroupChatSubscription.left == None)
271 ).scalar_one_or_none()
273 return subscription
276def _unseen_message_count(session, subscription_id):
277 return session.execute(
278 select(func.count())
279 .select_from(Message)
280 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
281 .where(GroupChatSubscription.id == subscription_id)
282 .where(Message.id > GroupChatSubscription.last_seen_message_id)
283 ).scalar_one()
286def _mute_info(subscription):
287 (muted, muted_until) = subscription.muted_display()
288 return conversations_pb2.MuteInfo(
289 muted=muted,
290 muted_until=Timestamp_from_datetime(muted_until) if muted_until else None,
291 )
294class Conversations(conversations_pb2_grpc.ConversationsServicer):
295 def ListGroupChats(self, request, context, session):
296 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
297 page_size = min(page_size, MAX_PAGE_SIZE)
299 # select group chats where you have a subscription, and for each of
300 # these, the latest message from them
302 t = (
303 select(
304 GroupChatSubscription.group_chat_id.label("group_chat_id"),
305 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
306 func.max(Message.id).label("message_id"),
307 )
308 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
309 .where(GroupChatSubscription.user_id == context.user_id)
310 .where(Message.time >= GroupChatSubscription.joined)
311 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
312 .group_by(GroupChatSubscription.group_chat_id)
313 .order_by(func.max(Message.id).desc())
314 .subquery()
315 )
317 results = session.execute(
318 select(t, GroupChat, GroupChatSubscription, Message)
319 .join(Message, Message.id == t.c.message_id)
320 .join(GroupChatSubscription, GroupChatSubscription.id == t.c.group_chat_subscriptions_id)
321 .join(GroupChat, GroupChat.conversation_id == t.c.group_chat_id)
322 .where(or_(t.c.message_id < request.last_message_id, request.last_message_id == 0))
323 .order_by(t.c.message_id.desc())
324 .limit(page_size + 1)
325 ).all()
327 return conversations_pb2.ListGroupChatsRes(
328 group_chats=[
329 conversations_pb2.GroupChat(
330 group_chat_id=result.GroupChat.conversation_id,
331 title=result.GroupChat.title, # TODO: proper title for DMs, etc
332 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
333 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
334 only_admins_invite=result.GroupChat.only_admins_invite,
335 is_dm=result.GroupChat.is_dm,
336 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
337 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
338 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
339 latest_message=_message_to_pb(result.Message) if result.Message else None,
340 mute_info=_mute_info(result.GroupChatSubscription),
341 can_message=_user_can_message(session, context, result.GroupChat),
342 )
343 for result in results[:page_size]
344 ],
345 last_message_id=(
346 min(g.Message.id if g.Message else 1 for g in results[:page_size]) if len(results) > 0 else 0
347 ), # TODO
348 no_more=len(results) <= page_size,
349 )
351 def GetGroupChat(self, request, context, session):
352 result = session.execute(
353 select(GroupChat, GroupChatSubscription, Message)
354 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
355 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
356 .where(GroupChatSubscription.user_id == context.user_id)
357 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
358 .where(Message.time >= GroupChatSubscription.joined)
359 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
360 .order_by(Message.id.desc())
361 .limit(1)
362 ).one_or_none()
364 if not result:
365 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
367 return conversations_pb2.GroupChat(
368 group_chat_id=result.GroupChat.conversation_id,
369 title=result.GroupChat.title,
370 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
371 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
372 only_admins_invite=result.GroupChat.only_admins_invite,
373 is_dm=result.GroupChat.is_dm,
374 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
375 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
376 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
377 latest_message=_message_to_pb(result.Message) if result.Message else None,
378 mute_info=_mute_info(result.GroupChatSubscription),
379 can_message=_user_can_message(session, context, result.GroupChat),
380 )
382 def GetDirectMessage(self, request, context, session):
383 count = func.count(GroupChatSubscription.id).label("count")
384 subquery = (
385 select(GroupChatSubscription.group_chat_id)
386 .where(
387 or_(
388 GroupChatSubscription.user_id == context.user_id,
389 GroupChatSubscription.user_id == request.user_id,
390 )
391 )
392 .where(GroupChatSubscription.left == None)
393 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
394 .where(GroupChat.is_dm == True)
395 .group_by(GroupChatSubscription.group_chat_id)
396 .having(count == 2)
397 .subquery()
398 )
400 result = session.execute(
401 select(subquery, GroupChat, GroupChatSubscription, Message)
402 .join(subquery, subquery.c.group_chat_id == GroupChat.conversation_id)
403 .join(Message, Message.conversation_id == GroupChat.conversation_id)
404 .where(GroupChatSubscription.user_id == context.user_id)
405 .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id)
406 .where(Message.time >= GroupChatSubscription.joined)
407 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
408 .order_by(Message.id.desc())
409 .limit(1)
410 ).one_or_none()
412 if not result:
413 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
415 return conversations_pb2.GroupChat(
416 group_chat_id=result.GroupChat.conversation_id,
417 title=result.GroupChat.title,
418 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
419 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
420 only_admins_invite=result.GroupChat.only_admins_invite,
421 is_dm=result.GroupChat.is_dm,
422 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
423 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
424 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
425 latest_message=_message_to_pb(result.Message) if result.Message else None,
426 mute_info=_mute_info(result.GroupChatSubscription),
427 can_message=_user_can_message(session, context, result.GroupChat),
428 )
430 def GetUpdates(self, request, context, session):
431 results = (
432 session.execute(
433 select(Message)
434 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
435 .where(GroupChatSubscription.user_id == context.user_id)
436 .where(Message.time >= GroupChatSubscription.joined)
437 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
438 .where(Message.id > request.newest_message_id)
439 .order_by(Message.id.asc())
440 .limit(DEFAULT_PAGINATION_LENGTH + 1)
441 )
442 .scalars()
443 .all()
444 )
446 return conversations_pb2.GetUpdatesRes(
447 updates=[
448 conversations_pb2.Update(
449 group_chat_id=message.conversation_id,
450 message=_message_to_pb(message),
451 )
452 for message in sorted(results, key=lambda message: message.id)[:DEFAULT_PAGINATION_LENGTH]
453 ],
454 no_more=len(results) <= DEFAULT_PAGINATION_LENGTH,
455 )
457 def GetGroupChatMessages(self, request, context, session):
458 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
459 page_size = min(page_size, MAX_PAGE_SIZE)
461 results = (
462 session.execute(
463 select(Message)
464 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
465 .where(GroupChatSubscription.user_id == context.user_id)
466 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
467 .where(Message.time >= GroupChatSubscription.joined)
468 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
469 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
470 .where(or_(Message.id > GroupChatSubscription.last_seen_message_id, request.only_unseen == 0))
471 .order_by(Message.id.desc())
472 .limit(page_size + 1)
473 )
474 .scalars()
475 .all()
476 )
478 return conversations_pb2.GetGroupChatMessagesRes(
479 messages=[_message_to_pb(message) for message in results[:page_size]],
480 last_message_id=results[-2].id if len(results) > 1 else 0, # TODO
481 no_more=len(results) <= page_size,
482 )
484 def MarkLastSeenGroupChat(self, request, context, session):
485 subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
487 if not subscription:
488 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
490 if not subscription.last_seen_message_id <= request.last_seen_message_id:
491 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)
493 subscription.last_seen_message_id = request.last_seen_message_id
495 # TODO: notify
497 return empty_pb2.Empty()
499 def MuteGroupChat(self, request, context, session):
500 subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
502 if not subscription:
503 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
505 if request.unmute:
506 subscription.muted_until = DATETIME_MINUS_INFINITY
507 elif request.forever:
508 subscription.muted_until = DATETIME_INFINITY
509 elif request.for_duration:
510 duration = request.for_duration.ToTimedelta()
511 if duration < timedelta(seconds=0):
512 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MUTE_PAST)
513 subscription.muted_until = now() + duration
515 return empty_pb2.Empty()
517 def SearchMessages(self, request, context, session):
518 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
519 page_size = min(page_size, MAX_PAGE_SIZE)
521 results = (
522 session.execute(
523 select(Message)
524 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
525 .where(GroupChatSubscription.user_id == context.user_id)
526 .where(Message.time >= GroupChatSubscription.joined)
527 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
528 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
529 .where(Message.text.ilike(f"%{request.query}%"))
530 .order_by(Message.id.desc())
531 .limit(page_size + 1)
532 )
533 .scalars()
534 .all()
535 )
537 return conversations_pb2.SearchMessagesRes(
538 results=[
539 conversations_pb2.MessageSearchResult(
540 group_chat_id=message.conversation_id,
541 message=_message_to_pb(message),
542 )
543 for message in results[:page_size]
544 ],
545 last_message_id=results[-2].id if len(results) > 1 else 0,
546 no_more=len(results) <= page_size,
547 )
549 def CreateGroupChat(self, request, context, session):
550 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
551 if not user.has_completed_profile:
552 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_MESSAGE)
554 recipient_user_ids = list(
555 session.execute(select(User.id).where_users_visible(context).where(User.id.in_(request.recipient_user_ids)))
556 .scalars()
557 .all()
558 )
560 # make sure all requested users are visible
561 if len(recipient_user_ids) != len(request.recipient_user_ids):
562 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_FOUND)
564 if not recipient_user_ids:
565 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_RECIPIENTS)
567 if len(recipient_user_ids) != len(set(recipient_user_ids)):
568 # make sure there's no duplicate users
569 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_RECIPIENTS)
571 if context.user_id in recipient_user_ids:
572 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_ADD_SELF)
574 if len(recipient_user_ids) == 1:
575 # can only have one DM at a time between any two users
576 other_user_id = recipient_user_ids[0]
578 # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have
579 # user_id either this user or the recipient user. If you find two subscriptions to the same DM group
580 # chat, you know they already have a shared group chat
581 count = func.count(GroupChatSubscription.id).label("count")
582 if session.execute(
583 select(count)
584 .where(
585 or_(
586 GroupChatSubscription.user_id == context.user_id,
587 GroupChatSubscription.user_id == other_user_id,
588 )
589 )
590 .where(GroupChatSubscription.left == None)
591 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
592 .where(GroupChat.is_dm == True)
593 .group_by(GroupChatSubscription.group_chat_id)
594 .having(count == 2)
595 ).scalar_one_or_none():
596 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_HAVE_DM)
598 # Check if user has been initiating chats excessively
599 if process_rate_limits_and_check_abort(
600 session=session, user_id=context.user_id, action=RateLimitAction.chat_initiation
601 ):
602 context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.CHAT_INITIATION_RATE_LIMIT)
604 group_chat = _create_chat(
605 session,
606 creator_id=context.user_id,
607 recipient_ids=request.recipient_user_ids,
608 title=request.title.value,
609 )
611 your_subscription = _get_message_subscription(session, context.user_id, group_chat.conversation_id)
613 _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created)
615 session.flush()
617 return conversations_pb2.GroupChat(
618 group_chat_id=group_chat.conversation_id,
619 title=group_chat.title,
620 member_user_ids=_get_visible_members_for_subscription(your_subscription),
621 admin_user_ids=_get_visible_admins_for_subscription(your_subscription),
622 only_admins_invite=group_chat.only_admins_invite,
623 is_dm=group_chat.is_dm,
624 created=Timestamp_from_datetime(group_chat.conversation.created),
625 mute_info=_mute_info(your_subscription),
626 can_message=True,
627 )
629 def SendMessage(self, request, context, session):
630 if request.text == "":
631 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)
633 result = session.execute(
634 select(GroupChatSubscription, GroupChat)
635 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
636 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
637 .where(GroupChatSubscription.user_id == context.user_id)
638 .where(GroupChatSubscription.left == None)
639 ).one_or_none()
640 if not result:
641 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
643 subscription, group_chat = result
644 if not _user_can_message(session, context, group_chat):
645 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MESSAGE_IN_CHAT)
647 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)
649 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
650 sent_messages_counter.labels(
651 user_gender, "direct message" if subscription.group_chat.is_dm else "group chat"
652 ).inc()
654 return empty_pb2.Empty()
656 def SendDirectMessage(self, request, context, session):
657 user_id = context.user_id
658 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
660 recipient_id = request.recipient_user_id
662 if not user.has_completed_profile:
663 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_MESSAGE)
665 if not recipient_id:
666 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_RECIPIENTS)
668 recipient_user_id = session.execute(
669 select(User.id).where_users_visible(context).where(User.id == recipient_id)
670 ).scalar_one_or_none()
672 if not recipient_user_id:
673 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_FOUND)
675 if user_id == recipient_id:
676 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_ADD_SELF)
678 if request.text == "":
679 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)
681 # Look for an existing direct message (DM) chat between the two users
682 dm_chat_ids = (
683 select(GroupChatSubscription.group_chat_id)
684 .where(GroupChatSubscription.user_id.in_([user_id, recipient_id]))
685 .group_by(GroupChatSubscription.group_chat_id)
686 .having(func.count(GroupChatSubscription.user_id) == 2)
687 )
689 chat = session.execute(
690 select(GroupChat).where(GroupChat.is_dm == True).where(GroupChat.conversation_id.in_(dm_chat_ids)).limit(1)
691 ).scalar_one_or_none()
693 if not chat:
694 chat = _create_chat(session, user_id, [recipient_id])
696 # Retrieve the sender's active subscription to the chat
697 subscription = _get_message_subscription(session, user_id, chat.conversation_id)
699 # Add the message to the conversation
700 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)
702 user_gender = session.execute(select(User.gender).where(User.id == user_id)).scalar_one()
703 sent_messages_counter.labels(user_gender, "direct message").inc()
705 session.flush()
707 return conversations_pb2.SendDirectMessageRes(group_chat_id=chat.conversation_id)
709 def EditGroupChat(self, request, context, session):
710 subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
712 if not subscription:
713 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
715 if subscription.role != GroupChatRole.admin:
716 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_EDIT)
718 if request.HasField("title"):
719 subscription.group_chat.title = request.title.value
721 if request.HasField("only_admins_invite"):
722 subscription.group_chat.only_admins_invite = request.only_admins_invite.value
724 _add_message_to_subscription(session, subscription, message_type=MessageType.chat_edited)
726 return empty_pb2.Empty()
728 def MakeGroupChatAdmin(self, request, context, session):
729 if not session.execute(
730 select(User).where_users_visible(context).where(User.id == request.user_id)
731 ).scalar_one_or_none():
732 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
734 your_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
736 if not your_subscription:
737 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
739 if your_subscription.role != GroupChatRole.admin:
740 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_MAKE_ADMIN)
742 if request.user_id == context.user_id:
743 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MAKE_SELF_ADMIN)
745 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
747 if not their_subscription:
748 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT)
750 if their_subscription.role != GroupChatRole.participant:
751 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_ADMIN)
753 their_subscription.role = GroupChatRole.admin
755 _add_message_to_subscription(
756 session, your_subscription, message_type=MessageType.user_made_admin, target_id=request.user_id
757 )
759 return empty_pb2.Empty()
761 def RemoveGroupChatAdmin(self, request, context, session):
762 if not session.execute(
763 select(User).where_users_visible(context).where(User.id == request.user_id)
764 ).scalar_one_or_none():
765 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
767 your_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
769 if not your_subscription:
770 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
772 if request.user_id == context.user_id:
773 # Race condition!
774 other_admins_count = session.execute(
775 select(func.count())
776 .select_from(GroupChatSubscription)
777 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
778 .where(GroupChatSubscription.user_id != context.user_id)
779 .where(GroupChatSubscription.role == GroupChatRole.admin)
780 .where(GroupChatSubscription.left == None)
781 ).scalar_one()
782 if not other_admins_count > 0:
783 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_LAST_ADMIN)
785 if your_subscription.role != GroupChatRole.admin:
786 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_ADMIN)
788 their_subscription = session.execute(
789 select(GroupChatSubscription)
790 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
791 .where(GroupChatSubscription.user_id == request.user_id)
792 .where(GroupChatSubscription.left == None)
793 .where(GroupChatSubscription.role == GroupChatRole.admin)
794 ).scalar_one_or_none()
796 if not their_subscription:
797 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN)
799 their_subscription.role = GroupChatRole.participant
801 _add_message_to_subscription(
802 session, your_subscription, message_type=MessageType.user_removed_admin, target_id=request.user_id
803 )
805 return empty_pb2.Empty()
807 def InviteToGroupChat(self, request, context, session):
808 if not session.execute(
809 select(User).where_users_visible(context).where(User.id == request.user_id)
810 ).scalar_one_or_none():
811 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
813 result = session.execute(
814 select(GroupChatSubscription, GroupChat)
815 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
816 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
817 .where(GroupChatSubscription.user_id == context.user_id)
818 .where(GroupChatSubscription.left == None)
819 ).one_or_none()
821 if not result:
822 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
824 your_subscription, group_chat = result
826 if not your_subscription or not group_chat:
827 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
829 if request.user_id == context.user_id:
830 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_SELF)
832 if your_subscription.role != GroupChatRole.admin and your_subscription.group_chat.only_admins_invite:
833 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVITE_PERMISSION_DENIED)
835 if group_chat.is_dm:
836 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_TO_DM)
838 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
840 if their_subscription:
841 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_CHAT)
843 # TODO: race condition!
845 subscription = GroupChatSubscription(
846 user_id=request.user_id,
847 group_chat=your_subscription.group_chat,
848 role=GroupChatRole.participant,
849 )
850 session.add(subscription)
852 _add_message_to_subscription(
853 session, your_subscription, message_type=MessageType.user_invited, target_id=request.user_id
854 )
856 return empty_pb2.Empty()
858 def RemoveGroupChatUser(self, request, context, session):
859 """
860 1. Get admin info and check it's correct
861 2. Get user data, check it's correct and remove user
862 """
863 # Admin info
864 your_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
866 # if user info is missing
867 if not your_subscription:
868 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
870 # if user not admin
871 if your_subscription.role != GroupChatRole.admin:
872 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_USER)
874 # if user wants to remove themselves
875 if request.user_id == context.user_id:
876 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_SELF)
878 # get user info
879 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
881 # user not found
882 if not their_subscription:
883 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT)
885 _add_message_to_subscription(
886 session, your_subscription, message_type=MessageType.user_removed, target_id=request.user_id
887 )
889 their_subscription.left = func.now()
891 return empty_pb2.Empty()
893 def LeaveGroupChat(self, request, context, session):
894 subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
896 if not subscription:
897 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
899 if subscription.role == GroupChatRole.admin:
900 other_admins_count = session.execute(
901 select(func.count())
902 .select_from(GroupChatSubscription)
903 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
904 .where(GroupChatSubscription.user_id != context.user_id)
905 .where(GroupChatSubscription.role == GroupChatRole.admin)
906 .where(GroupChatSubscription.left == None)
907 ).scalar_one()
908 participants_count = session.execute(
909 select(func.count())
910 .select_from(GroupChatSubscription)
911 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
912 .where(GroupChatSubscription.user_id != context.user_id)
913 .where(GroupChatSubscription.role == GroupChatRole.participant)
914 .where(GroupChatSubscription.left == None)
915 ).scalar_one()
916 if not (other_admins_count > 0 or participants_count == 0):
917 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.LAST_ADMIN_CANT_LEAVE)
919 _add_message_to_subscription(session, subscription, message_type=MessageType.user_left)
921 subscription.left = func.now()
923 return empty_pb2.Empty()