Coverage for src/couchers/servicers/conversations.py: 91%
292 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy.sql import func, not_, or_
8from couchers import errors
9from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY
10from couchers.context import make_background_user_context
11from couchers.db import session_scope
12from couchers.jobs.enqueue import queue_job
13from couchers.metrics import sent_messages_counter
14from couchers.models import (
15 Conversation,
16 GroupChat,
17 GroupChatRole,
18 GroupChatSubscription,
19 Message,
20 MessageType,
21 RateLimitAction,
22 User,
23)
24from couchers.notifications.notify import notify
25from couchers.rate_limits.check import process_rate_limits_and_check_abort
26from couchers.servicers.api import user_model_to_pb
27from couchers.sql import couchers_select as select
28from couchers.utils import Timestamp_from_datetime, now
29from proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2
30from proto.internal import jobs_pb2
32logger = logging.getLogger(__name__)
34# TODO: Still needs custom pagination: GetUpdates
35DEFAULT_PAGINATION_LENGTH = 20
36MAX_PAGE_SIZE = 50
39def _message_to_pb(message: Message):
40 """
41 Turns the given message to a protocol buffer
42 """
43 if message.is_normal_message:
44 return conversations_pb2.Message(
45 message_id=message.id,
46 author_user_id=message.author_id,
47 time=Timestamp_from_datetime(message.time),
48 text=conversations_pb2.MessageContentText(text=message.text),
49 )
50 else:
51 return conversations_pb2.Message(
52 message_id=message.id,
53 author_user_id=message.author_id,
54 time=Timestamp_from_datetime(message.time),
55 chat_created=(
56 conversations_pb2.MessageContentChatCreated()
57 if message.message_type == MessageType.chat_created
58 else None
59 ),
60 chat_edited=(
61 conversations_pb2.MessageContentChatEdited()
62 if message.message_type == MessageType.chat_edited
63 else None
64 ),
65 user_invited=(
66 conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id)
67 if message.message_type == MessageType.user_invited
68 else None
69 ),
70 user_left=(
71 conversations_pb2.MessageContentUserLeft() if message.message_type == MessageType.user_left else None
72 ),
73 user_made_admin=(
74 conversations_pb2.MessageContentUserMadeAdmin(target_user_id=message.target_id)
75 if message.message_type == MessageType.user_made_admin
76 else None
77 ),
78 user_removed_admin=(
79 conversations_pb2.MessageContentUserRemovedAdmin(target_user_id=message.target_id)
80 if message.message_type == MessageType.user_removed_admin
81 else None
82 ),
83 group_chat_user_removed=(
84 conversations_pb2.MessageContentUserRemoved(target_user_id=message.target_id)
85 if message.message_type == MessageType.user_removed
86 else None
87 ),
88 )
91def _get_visible_members_for_subscription(subscription):
92 """
93 If a user leaves a group chat, they shouldn't be able to see who's added
94 after they left
95 """
96 if not subscription.left:
97 # still in the chat, we see everyone with a current subscription
98 return [sub.user_id for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None)]
99 else:
100 # not in chat anymore, see everyone who was in chat when we left
101 return [
102 sub.user_id
103 for sub in subscription.group_chat.subscriptions.where(
104 GroupChatSubscription.joined <= subscription.left
105 ).where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None))
106 ]
109def _get_visible_admins_for_subscription(subscription):
110 """
111 If a user leaves a group chat, they shouldn't be able to see who's added
112 after they left
113 """
114 if not subscription.left:
115 # still in the chat, we see everyone with a current subscription
116 return [
117 sub.user_id
118 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None).where(
119 GroupChatSubscription.role == GroupChatRole.admin
120 )
121 ]
122 else:
123 # not in chat anymore, see everyone who was in chat when we left
124 return [
125 sub.user_id
126 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin)
127 .where(GroupChatSubscription.joined <= subscription.left)
128 .where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None))
129 ]
132def _user_can_message(session, context, group_chat: GroupChat) -> bool:
133 """
134 If it is a true group chat (not a DM), user can always message. For a DM, user can message if the other participant
135 - Is not deleted/banned
136 - Has not been blocked by the user or is blocking the user
137 - Has not left the chat
138 """
139 if not group_chat.is_dm:
140 return True
141 return session.execute(
142 func.exists(
143 select(GroupChatSubscription)
144 .where_users_column_visible(context=context, column=GroupChatSubscription.user_id)
145 .where(GroupChatSubscription.user_id != context.user_id)
146 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
147 .where(GroupChatSubscription.left == None)
148 )
149 ).scalar_one()
152def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload):
153 """
154 Background job to generate notifications for a message sent to a group chat
155 """
156 logger.info(f"Fanning notifications for message_id = {payload.message_id}")
158 with session_scope() as session:
159 message, group_chat = session.execute(
160 select(Message, GroupChat)
161 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
162 .where(Message.id == payload.message_id)
163 ).one()
165 if message.message_type != MessageType.text:
166 logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
167 return []
169 context = make_background_user_context(user_id=message.author_id)
170 user_ids_to_notify = (
171 session.execute(
172 select(GroupChatSubscription.user_id)
173 .where_users_column_visible(context=context, column=GroupChatSubscription.user_id)
174 .where(GroupChatSubscription.group_chat_id == message.conversation_id)
175 .where(GroupChatSubscription.user_id != message.author_id)
176 .where(GroupChatSubscription.joined <= message.time)
177 .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time))
178 .where(not_(GroupChatSubscription.is_muted))
179 )
180 .scalars()
181 .all()
182 )
184 if group_chat.is_dm:
185 msg = f"{message.author.name} sent you a message"
186 else:
187 msg = f"{message.author.name} sent a message in {group_chat.title}"
189 for user_id in user_ids_to_notify:
190 notify(
191 session,
192 user_id=user_id,
193 topic_action="chat:message",
194 key=message.conversation_id,
195 data=notification_data_pb2.ChatMessage(
196 author=user_model_to_pb(
197 message.author,
198 session,
199 make_background_user_context(user_id=user_id),
200 ),
201 message=msg,
202 text=message.text,
203 group_chat_id=message.conversation_id,
204 ),
205 )
208def _add_message_to_subscription(session, subscription, **kwargs):
209 """
210 Creates a new message for a subscription, from the user whose subscription that is. Updates last seen message id
212 Specify the keyword args for Message
213 """
214 message = Message(conversation=subscription.group_chat.conversation, author_id=subscription.user_id, **kwargs)
216 session.add(message)
217 session.flush()
219 subscription.last_seen_message_id = message.id
221 queue_job(
222 session,
223 job_type="generate_message_notifications",
224 payload=jobs_pb2.GenerateMessageNotificationsPayload(
225 message_id=message.id,
226 ),
227 )
229 return message
232def _create_chat(session, creator_id, recipient_ids, title=None, only_admins_invite=True):
233 conversation = Conversation()
234 session.add(conversation)
235 session.flush()
237 chat = GroupChat(
238 conversation_id=conversation.id,
239 title=title,
240 creator_id=creator_id,
241 is_dm=True if len(recipient_ids) == 1 else False,
242 only_admins_invite=only_admins_invite,
243 )
244 session.add(chat)
245 session.flush()
247 creator_subscription = GroupChatSubscription(
248 user_id=creator_id,
249 group_chat=chat,
250 role=GroupChatRole.admin,
251 )
252 session.add(creator_subscription)
254 for uid in recipient_ids:
255 session.add(
256 GroupChatSubscription(
257 user_id=uid,
258 group_chat=chat,
259 role=GroupChatRole.participant,
260 )
261 )
263 return chat
266def _get_message_subscription(session, user_id, conversation_id):
267 subscription = session.execute(
268 select(GroupChatSubscription)
269 .where(GroupChatSubscription.group_chat_id == conversation_id)
270 .where(GroupChatSubscription.user_id == user_id)
271 .where(GroupChatSubscription.left == None)
272 ).scalar_one_or_none()
274 return subscription
277def _unseen_message_count(session, subscription_id):
278 return session.execute(
279 select(func.count())
280 .select_from(Message)
281 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
282 .where(GroupChatSubscription.id == subscription_id)
283 .where(Message.id > GroupChatSubscription.last_seen_message_id)
284 ).scalar_one()
287def _mute_info(subscription):
288 (muted, muted_until) = subscription.muted_display()
289 return conversations_pb2.MuteInfo(
290 muted=muted,
291 muted_until=Timestamp_from_datetime(muted_until) if muted_until else None,
292 )
295class Conversations(conversations_pb2_grpc.ConversationsServicer):
296 def ListGroupChats(self, request, context, session):
297 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
298 page_size = min(page_size, MAX_PAGE_SIZE)
300 # select group chats where you have a subscription, and for each of
301 # these, the latest message from them
303 t = (
304 select(
305 GroupChatSubscription.group_chat_id.label("group_chat_id"),
306 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
307 func.max(Message.id).label("message_id"),
308 )
309 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
310 .where(GroupChatSubscription.user_id == context.user_id)
311 .where(Message.time >= GroupChatSubscription.joined)
312 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
313 .group_by(GroupChatSubscription.group_chat_id)
314 .order_by(func.max(Message.id).desc())
315 .subquery()
316 )
318 results = session.execute(
319 select(t, GroupChat, GroupChatSubscription, Message)
320 .join(Message, Message.id == t.c.message_id)
321 .join(GroupChatSubscription, GroupChatSubscription.id == t.c.group_chat_subscriptions_id)
322 .join(GroupChat, GroupChat.conversation_id == t.c.group_chat_id)
323 .where(or_(t.c.message_id < request.last_message_id, request.last_message_id == 0))
324 .order_by(t.c.message_id.desc())
325 .limit(page_size + 1)
326 ).all()
328 return conversations_pb2.ListGroupChatsRes(
329 group_chats=[
330 conversations_pb2.GroupChat(
331 group_chat_id=result.GroupChat.conversation_id,
332 title=result.GroupChat.title, # TODO: proper title for DMs, etc
333 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
334 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
335 only_admins_invite=result.GroupChat.only_admins_invite,
336 is_dm=result.GroupChat.is_dm,
337 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
338 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
339 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
340 latest_message=_message_to_pb(result.Message) if result.Message else None,
341 mute_info=_mute_info(result.GroupChatSubscription),
342 can_message=_user_can_message(session, context, result.GroupChat),
343 )
344 for result in results[:page_size]
345 ],
346 last_message_id=(
347 min(g.Message.id if g.Message else 1 for g in results[:page_size]) if len(results) > 0 else 0
348 ), # TODO
349 no_more=len(results) <= page_size,
350 )
352 def GetGroupChat(self, request, context, session):
353 result = session.execute(
354 select(GroupChat, GroupChatSubscription, Message)
355 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
356 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
357 .where(GroupChatSubscription.user_id == context.user_id)
358 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
359 .where(Message.time >= GroupChatSubscription.joined)
360 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
361 .order_by(Message.id.desc())
362 .limit(1)
363 ).one_or_none()
365 if not result:
366 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
368 return conversations_pb2.GroupChat(
369 group_chat_id=result.GroupChat.conversation_id,
370 title=result.GroupChat.title,
371 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
372 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
373 only_admins_invite=result.GroupChat.only_admins_invite,
374 is_dm=result.GroupChat.is_dm,
375 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
376 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
377 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
378 latest_message=_message_to_pb(result.Message) if result.Message else None,
379 mute_info=_mute_info(result.GroupChatSubscription),
380 can_message=_user_can_message(session, context, result.GroupChat),
381 )
383 def GetDirectMessage(self, request, context, session):
384 count = func.count(GroupChatSubscription.id).label("count")
385 subquery = (
386 select(GroupChatSubscription.group_chat_id)
387 .where(
388 or_(
389 GroupChatSubscription.user_id == context.user_id,
390 GroupChatSubscription.user_id == request.user_id,
391 )
392 )
393 .where(GroupChatSubscription.left == None)
394 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
395 .where(GroupChat.is_dm == True)
396 .group_by(GroupChatSubscription.group_chat_id)
397 .having(count == 2)
398 .subquery()
399 )
401 result = session.execute(
402 select(subquery, GroupChat, GroupChatSubscription, Message)
403 .join(subquery, subquery.c.group_chat_id == GroupChat.conversation_id)
404 .join(Message, Message.conversation_id == GroupChat.conversation_id)
405 .where(GroupChatSubscription.user_id == context.user_id)
406 .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id)
407 .where(Message.time >= GroupChatSubscription.joined)
408 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
409 .order_by(Message.id.desc())
410 .limit(1)
411 ).one_or_none()
413 if not result:
414 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
416 return conversations_pb2.GroupChat(
417 group_chat_id=result.GroupChat.conversation_id,
418 title=result.GroupChat.title,
419 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
420 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
421 only_admins_invite=result.GroupChat.only_admins_invite,
422 is_dm=result.GroupChat.is_dm,
423 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
424 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
425 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
426 latest_message=_message_to_pb(result.Message) if result.Message else None,
427 mute_info=_mute_info(result.GroupChatSubscription),
428 can_message=_user_can_message(session, context, result.GroupChat),
429 )
431 def GetUpdates(self, request, context, session):
432 results = (
433 session.execute(
434 select(Message)
435 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
436 .where(GroupChatSubscription.user_id == context.user_id)
437 .where(Message.time >= GroupChatSubscription.joined)
438 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
439 .where(Message.id > request.newest_message_id)
440 .order_by(Message.id.asc())
441 .limit(DEFAULT_PAGINATION_LENGTH + 1)
442 )
443 .scalars()
444 .all()
445 )
447 return conversations_pb2.GetUpdatesRes(
448 updates=[
449 conversations_pb2.Update(
450 group_chat_id=message.conversation_id,
451 message=_message_to_pb(message),
452 )
453 for message in sorted(results, key=lambda message: message.id)[:DEFAULT_PAGINATION_LENGTH]
454 ],
455 no_more=len(results) <= DEFAULT_PAGINATION_LENGTH,
456 )
458 def GetGroupChatMessages(self, request, context, session):
459 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
460 page_size = min(page_size, MAX_PAGE_SIZE)
462 results = (
463 session.execute(
464 select(Message)
465 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
466 .where(GroupChatSubscription.user_id == context.user_id)
467 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
468 .where(Message.time >= GroupChatSubscription.joined)
469 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
470 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
471 .where(or_(Message.id > GroupChatSubscription.last_seen_message_id, request.only_unseen == 0))
472 .order_by(Message.id.desc())
473 .limit(page_size + 1)
474 )
475 .scalars()
476 .all()
477 )
479 return conversations_pb2.GetGroupChatMessagesRes(
480 messages=[_message_to_pb(message) for message in results[:page_size]],
481 last_message_id=results[-2].id if len(results) > 1 else 0, # TODO
482 no_more=len(results) <= page_size,
483 )
485 def MarkLastSeenGroupChat(self, request, context, session):
486 subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
488 if not subscription:
489 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
491 if not subscription.last_seen_message_id <= request.last_seen_message_id:
492 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)
494 subscription.last_seen_message_id = request.last_seen_message_id
496 # TODO: notify
498 return empty_pb2.Empty()
500 def MuteGroupChat(self, request, context, session):
501 subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
503 if not subscription:
504 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
506 if request.unmute:
507 subscription.muted_until = DATETIME_MINUS_INFINITY
508 elif request.forever:
509 subscription.muted_until = DATETIME_INFINITY
510 elif request.for_duration:
511 duration = request.for_duration.ToTimedelta()
512 if duration < timedelta(seconds=0):
513 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MUTE_PAST)
514 subscription.muted_until = now() + duration
516 return empty_pb2.Empty()
518 def SearchMessages(self, request, context, session):
519 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
520 page_size = min(page_size, MAX_PAGE_SIZE)
522 results = (
523 session.execute(
524 select(Message)
525 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
526 .where(GroupChatSubscription.user_id == context.user_id)
527 .where(Message.time >= GroupChatSubscription.joined)
528 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
529 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
530 .where(Message.text.ilike(f"%{request.query}%"))
531 .order_by(Message.id.desc())
532 .limit(page_size + 1)
533 )
534 .scalars()
535 .all()
536 )
538 return conversations_pb2.SearchMessagesRes(
539 results=[
540 conversations_pb2.MessageSearchResult(
541 group_chat_id=message.conversation_id,
542 message=_message_to_pb(message),
543 )
544 for message in results[:page_size]
545 ],
546 last_message_id=results[-2].id if len(results) > 1 else 0,
547 no_more=len(results) <= page_size,
548 )
550 def CreateGroupChat(self, request, context, session):
551 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
552 if not user.has_completed_profile:
553 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_MESSAGE)
555 recipient_user_ids = list(
556 session.execute(select(User.id).where_users_visible(context).where(User.id.in_(request.recipient_user_ids)))
557 .scalars()
558 .all()
559 )
561 # make sure all requested users are visible
562 if len(recipient_user_ids) != len(request.recipient_user_ids):
563 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_FOUND)
565 if not recipient_user_ids:
566 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_RECIPIENTS)
568 if len(recipient_user_ids) != len(set(recipient_user_ids)):
569 # make sure there's no duplicate users
570 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_RECIPIENTS)
572 if context.user_id in recipient_user_ids:
573 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_ADD_SELF)
575 if len(recipient_user_ids) == 1:
576 # can only have one DM at a time between any two users
577 other_user_id = recipient_user_ids[0]
579 # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have
580 # user_id either this user or the recipient user. If you find two subscriptions to the same DM group
581 # chat, you know they already have a shared group chat
582 count = func.count(GroupChatSubscription.id).label("count")
583 if session.execute(
584 select(count)
585 .where(
586 or_(
587 GroupChatSubscription.user_id == context.user_id,
588 GroupChatSubscription.user_id == other_user_id,
589 )
590 )
591 .where(GroupChatSubscription.left == None)
592 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
593 .where(GroupChat.is_dm == True)
594 .group_by(GroupChatSubscription.group_chat_id)
595 .having(count == 2)
596 ).scalar_one_or_none():
597 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_HAVE_DM)
599 # Check if user has been initiating chats excessively
600 if process_rate_limits_and_check_abort(
601 session=session, user_id=context.user_id, action=RateLimitAction.chat_initiation
602 ):
603 context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.CHAT_INITIATION_RATE_LIMIT)
605 group_chat = _create_chat(
606 session,
607 creator_id=context.user_id,
608 recipient_ids=request.recipient_user_ids,
609 title=request.title.value,
610 )
612 your_subscription = _get_message_subscription(session, context.user_id, group_chat.conversation_id)
614 _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created)
616 session.flush()
618 return conversations_pb2.GroupChat(
619 group_chat_id=group_chat.conversation_id,
620 title=group_chat.title,
621 member_user_ids=_get_visible_members_for_subscription(your_subscription),
622 admin_user_ids=_get_visible_admins_for_subscription(your_subscription),
623 only_admins_invite=group_chat.only_admins_invite,
624 is_dm=group_chat.is_dm,
625 created=Timestamp_from_datetime(group_chat.conversation.created),
626 mute_info=_mute_info(your_subscription),
627 can_message=True,
628 )
630 def SendMessage(self, request, context, session):
631 if request.text == "":
632 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)
634 result = session.execute(
635 select(GroupChatSubscription, GroupChat)
636 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
637 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
638 .where(GroupChatSubscription.user_id == context.user_id)
639 .where(GroupChatSubscription.left == None)
640 ).one_or_none()
641 if not result:
642 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
644 subscription, group_chat = result
645 if not _user_can_message(session, context, group_chat):
646 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MESSAGE_IN_CHAT)
648 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)
650 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
651 sent_messages_counter.labels(
652 user_gender, "direct message" if subscription.group_chat.is_dm else "group chat"
653 ).inc()
655 return empty_pb2.Empty()
657 def SendDirectMessage(self, request, context, session):
658 user_id = context.user_id
659 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
661 recipient_id = request.recipient_user_id
663 if not user.has_completed_profile:
664 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_MESSAGE)
666 if not recipient_id:
667 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_RECIPIENTS)
669 recipient_user_id = session.execute(
670 select(User.id).where_users_visible(context).where(User.id == recipient_id)
671 ).scalar_one_or_none()
673 if not recipient_user_id:
674 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.USER_NOT_FOUND)
676 if user_id == recipient_id:
677 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_ADD_SELF)
679 if request.text == "":
680 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)
682 # Look for an existing direct message (DM) chat between the two users
683 dm_chat_ids = (
684 select(GroupChatSubscription.group_chat_id)
685 .where(GroupChatSubscription.user_id.in_([user_id, recipient_id]))
686 .group_by(GroupChatSubscription.group_chat_id)
687 .having(func.count(GroupChatSubscription.user_id) == 2)
688 )
690 chat = session.execute(
691 select(GroupChat).where(GroupChat.is_dm == True).where(GroupChat.conversation_id.in_(dm_chat_ids)).limit(1)
692 ).scalar_one_or_none()
694 if not chat:
695 chat = _create_chat(session, user_id, [recipient_id])
697 # Retrieve the sender's active subscription to the chat
698 subscription = _get_message_subscription(session, user_id, chat.conversation_id)
700 # Add the message to the conversation
701 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)
703 user_gender = session.execute(select(User.gender).where(User.id == user_id)).scalar_one()
704 sent_messages_counter.labels(user_gender, "direct message").inc()
706 session.flush()
708 return conversations_pb2.SendDirectMessageRes(group_chat_id=chat.conversation_id)
710 def EditGroupChat(self, request, context, session):
711 subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
713 if not subscription:
714 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
716 if subscription.role != GroupChatRole.admin:
717 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_EDIT)
719 if request.HasField("title"):
720 subscription.group_chat.title = request.title.value
722 if request.HasField("only_admins_invite"):
723 subscription.group_chat.only_admins_invite = request.only_admins_invite.value
725 _add_message_to_subscription(session, subscription, message_type=MessageType.chat_edited)
727 return empty_pb2.Empty()
729 def MakeGroupChatAdmin(self, request, context, session):
730 if not session.execute(
731 select(User).where_users_visible(context).where(User.id == request.user_id)
732 ).scalar_one_or_none():
733 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
735 your_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
737 if not your_subscription:
738 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
740 if your_subscription.role != GroupChatRole.admin:
741 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_MAKE_ADMIN)
743 if request.user_id == context.user_id:
744 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_MAKE_SELF_ADMIN)
746 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
748 if not their_subscription:
749 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT)
751 if their_subscription.role != GroupChatRole.participant:
752 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_ADMIN)
754 their_subscription.role = GroupChatRole.admin
756 _add_message_to_subscription(
757 session, your_subscription, message_type=MessageType.user_made_admin, target_id=request.user_id
758 )
760 return empty_pb2.Empty()
762 def RemoveGroupChatAdmin(self, request, context, session):
763 if not session.execute(
764 select(User).where_users_visible(context).where(User.id == request.user_id)
765 ).scalar_one_or_none():
766 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
768 your_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
770 if not your_subscription:
771 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
773 if request.user_id == context.user_id:
774 # Race condition!
775 other_admins_count = session.execute(
776 select(func.count())
777 .select_from(GroupChatSubscription)
778 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
779 .where(GroupChatSubscription.user_id != context.user_id)
780 .where(GroupChatSubscription.role == GroupChatRole.admin)
781 .where(GroupChatSubscription.left == None)
782 ).scalar_one()
783 if not other_admins_count > 0:
784 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_LAST_ADMIN)
786 if your_subscription.role != GroupChatRole.admin:
787 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_ADMIN)
789 their_subscription = session.execute(
790 select(GroupChatSubscription)
791 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
792 .where(GroupChatSubscription.user_id == request.user_id)
793 .where(GroupChatSubscription.left == None)
794 .where(GroupChatSubscription.role == GroupChatRole.admin)
795 ).scalar_one_or_none()
797 if not their_subscription:
798 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN)
800 their_subscription.role = GroupChatRole.participant
802 _add_message_to_subscription(
803 session, your_subscription, message_type=MessageType.user_removed_admin, target_id=request.user_id
804 )
806 return empty_pb2.Empty()
808 def InviteToGroupChat(self, request, context, session):
809 if not session.execute(
810 select(User).where_users_visible(context).where(User.id == request.user_id)
811 ).scalar_one_or_none():
812 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
814 result = session.execute(
815 select(GroupChatSubscription, GroupChat)
816 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
817 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
818 .where(GroupChatSubscription.user_id == context.user_id)
819 .where(GroupChatSubscription.left == None)
820 ).one_or_none()
822 if not result:
823 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
825 your_subscription, group_chat = result
827 if not your_subscription or not group_chat:
828 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
830 if request.user_id == context.user_id:
831 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_SELF)
833 if your_subscription.role != GroupChatRole.admin and your_subscription.group_chat.only_admins_invite:
834 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVITE_PERMISSION_DENIED)
836 if group_chat.is_dm:
837 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_INVITE_TO_DM)
839 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
841 if their_subscription:
842 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_CHAT)
844 # TODO: race condition!
846 subscription = GroupChatSubscription(
847 user_id=request.user_id,
848 group_chat=your_subscription.group_chat,
849 role=GroupChatRole.participant,
850 )
851 session.add(subscription)
853 _add_message_to_subscription(
854 session, your_subscription, message_type=MessageType.user_invited, target_id=request.user_id
855 )
857 return empty_pb2.Empty()
859 def RemoveGroupChatUser(self, request, context, session):
860 """
861 1. Get admin info and check it's correct
862 2. Get user data, check it's correct and remove user
863 """
864 # Admin info
865 your_subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
867 # if user info is missing
868 if not your_subscription:
869 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
871 # if user not admin
872 if your_subscription.role != GroupChatRole.admin:
873 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.ONLY_ADMIN_CAN_REMOVE_USER)
875 # if user wants to remove themselves
876 if request.user_id == context.user_id:
877 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_REMOVE_SELF)
879 # get user info
880 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
882 # user not found
883 if not their_subscription:
884 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_CHAT)
886 _add_message_to_subscription(
887 session, your_subscription, message_type=MessageType.user_removed, target_id=request.user_id
888 )
890 their_subscription.left = func.now()
892 return empty_pb2.Empty()
894 def LeaveGroupChat(self, request, context, session):
895 subscription = _get_message_subscription(session, context.user_id, request.group_chat_id)
897 if not subscription:
898 context.abort(grpc.StatusCode.NOT_FOUND, errors.CHAT_NOT_FOUND)
900 if subscription.role == GroupChatRole.admin:
901 other_admins_count = session.execute(
902 select(func.count())
903 .select_from(GroupChatSubscription)
904 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
905 .where(GroupChatSubscription.user_id != context.user_id)
906 .where(GroupChatSubscription.role == GroupChatRole.admin)
907 .where(GroupChatSubscription.left == None)
908 ).scalar_one()
909 participants_count = session.execute(
910 select(func.count())
911 .select_from(GroupChatSubscription)
912 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
913 .where(GroupChatSubscription.user_id != context.user_id)
914 .where(GroupChatSubscription.role == GroupChatRole.participant)
915 .where(GroupChatSubscription.left == None)
916 ).scalar_one()
917 if not (other_admins_count > 0 or participants_count == 0):
918 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.LAST_ADMIN_CANT_LEAVE)
920 _add_message_to_subscription(session, subscription, message_type=MessageType.user_left)
922 subscription.left = func.now()
924 return empty_pb2.Empty()