Coverage for src/couchers/servicers/conversations.py: 91%
296 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-24 15:28 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-24 15:28 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy.orm import Session
7from sqlalchemy.sql import func, not_, or_
9from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY
10from couchers.context import CouchersContext, make_background_user_context
11from couchers.db import session_scope
12from couchers.jobs.enqueue import queue_job
13from couchers.metrics import sent_messages_counter
14from couchers.models import (
15 Conversation,
16 GroupChat,
17 GroupChatRole,
18 GroupChatSubscription,
19 Message,
20 MessageType,
21 ModerationObjectType,
22 RateLimitAction,
23 User,
24)
25from couchers.moderation.utils import create_moderation
26from couchers.notifications.notify import notify
27from couchers.proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2
28from couchers.proto.internal import jobs_pb2
29from couchers.rate_limits.check import process_rate_limits_and_check_abort
30from couchers.rate_limits.definitions import RATE_LIMIT_HOURS
31from couchers.servicers.api import user_model_to_pb
32from couchers.sql import couchers_select as select
33from couchers.utils import Timestamp_from_datetime, now
# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# TODO: Still needs custom pagination: GetUpdates
# Page size used when a request leaves `number` at 0; also the fixed page size of GetUpdates
DEFAULT_PAGINATION_LENGTH = 20
# Upper bound applied to any client-requested page size
MAX_PAGE_SIZE = 50
def _message_to_pb(message: Message) -> conversations_pb2.Message:
    """
    Converts a Message model into its protocol buffer representation

    Normal messages carry their text. Any other message sets only the content field that matches its message type
    (and the target user id for the types that have one).
    """
    common = dict(
        message_id=message.id,
        author_user_id=message.author_id,
        time=Timestamp_from_datetime(message.time),
    )

    if message.is_normal_message:
        return conversations_pb2.Message(
            text=conversations_pb2.MessageContentText(text=message.text),
            **common,
        )

    # control message: at most one of the content fields below gets populated
    kind = message.message_type
    content = {}
    if kind == MessageType.chat_created:
        content["chat_created"] = conversations_pb2.MessageContentChatCreated()
    elif kind == MessageType.chat_edited:
        content["chat_edited"] = conversations_pb2.MessageContentChatEdited()
    elif kind == MessageType.user_invited:
        content["user_invited"] = conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id)
    elif kind == MessageType.user_left:
        content["user_left"] = conversations_pb2.MessageContentUserLeft()
    elif kind == MessageType.user_made_admin:
        content["user_made_admin"] = conversations_pb2.MessageContentUserMadeAdmin(target_user_id=message.target_id)
    elif kind == MessageType.user_removed_admin:
        content["user_removed_admin"] = conversations_pb2.MessageContentUserRemovedAdmin(
            target_user_id=message.target_id
        )
    elif kind == MessageType.user_removed:
        content["group_chat_user_removed"] = conversations_pb2.MessageContentUserRemoved(
            target_user_id=message.target_id
        )

    return conversations_pb2.Message(**common, **content)
def _get_visible_members_for_subscription(subscription):
    """
    Returns the user ids of the chat members this subscription is allowed to see

    Someone who has left the chat only sees the people who were members at the moment they left, not anyone added
    afterwards.
    """
    all_subscriptions = subscription.group_chat.subscriptions
    left_at = subscription.left

    if left_at:
        # no longer in the chat: everyone whose membership overlapped the moment we left
        visible = all_subscriptions.where(GroupChatSubscription.joined <= left_at).where(
            or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None)
        )
    else:
        # still a member: everyone who currently has an active subscription
        visible = all_subscriptions.where(GroupChatSubscription.left == None)

    return [member.user_id for member in visible]
def _get_visible_admins_for_subscription(subscription):
    """
    Returns the user ids of the chat admins this subscription is allowed to see

    Someone who has left the chat only sees the admins who were members at the moment they left, not anyone added
    afterwards.
    """
    all_subscriptions = subscription.group_chat.subscriptions
    left_at = subscription.left

    if left_at:
        # no longer in the chat: admins whose membership overlapped the moment we left
        visible = (
            all_subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin)
            .where(GroupChatSubscription.joined <= left_at)
            .where(or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None))
        )
    else:
        # still a member: admins who currently have an active subscription
        visible = all_subscriptions.where(GroupChatSubscription.left == None).where(
            GroupChatSubscription.role == GroupChatRole.admin
        )

    return [admin.user_id for admin in visible]
def _user_can_message(session, context, group_chat: GroupChat) -> bool:
    """
    Whether the current user may send messages in the given chat

    Real group chats (non-DMs) can always be messaged. A DM can only be messaged while the other participant
    - is visible to the user (not deleted/banned, no block in either direction)
    - still has an active subscription to the chat
    """
    if group_chat.is_dm:
        other_active_subscription = (
            select(GroupChatSubscription)
            .where_users_column_visible(context=context, column=GroupChatSubscription.user_id)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
            .where(GroupChatSubscription.left == None)
        )
        return session.execute(func.exists(other_active_subscription)).scalar_one()

    return True
def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload):
    """
    Background job that fans a group chat message out as notifications to the other chat members

    Only text messages notify. Recipients are the users visible to the author who were subscribed at the time the
    message was sent and have not muted the chat.
    """
    logger.info(f"Fanning notifications for message_id = {payload.message_id}")

    with session_scope() as session:
        row = session.execute(
            select(Message, GroupChat)
            .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
            .where(Message.id == payload.message_id)
        ).one()
        message, group_chat = row

        if message.message_type != MessageType.text:
            logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
            return []

        # visibility is evaluated from the author's point of view
        author_context = make_background_user_context(user_id=message.author_id)
        recipients_query = (
            select(GroupChatSubscription.user_id)
            .where_users_column_visible(context=author_context, column=GroupChatSubscription.user_id)
            .where(GroupChatSubscription.group_chat_id == message.conversation_id)
            .where(GroupChatSubscription.user_id != message.author_id)
            .where(GroupChatSubscription.joined <= message.time)
            .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time))
            .where(not_(GroupChatSubscription.is_muted))
        )
        recipient_ids = session.execute(recipients_query).scalars().all()

        msg = (
            f"{message.author.name} sent you a message"
            if group_chat.is_dm
            else f"{message.author.name} sent a message in {group_chat.title}"
        )

        for recipient_id in recipient_ids:
            # the author is rendered from each recipient's own perspective
            author_pb = user_model_to_pb(
                message.author,
                session,
                make_background_user_context(user_id=recipient_id),
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action="chat:message",
                key=str(message.conversation_id),
                data=notification_data_pb2.ChatMessage(
                    author=author_pb,
                    message=msg,
                    text=message.text,
                    group_chat_id=message.conversation_id,
                ),
                moderation_state_id=group_chat.moderation_state_id,
            )
def _add_message_to_subscription(session, subscription, **kwargs):
    """
    Posts a new message to the subscription's chat, authored by the subscription's user

    The message is flushed to obtain its id, the subscription's last seen message id is moved to it, and a
    notification job is queued for it.

    Any keyword args are passed on to Message
    """
    new_message = Message(
        conversation=subscription.group_chat.conversation,
        author_id=subscription.user_id,
        **kwargs,
    )
    session.add(new_message)
    # flush so that the message id is available below
    session.flush()

    # the author has trivially seen their own message
    subscription.last_seen_message_id = new_message.id

    job_payload = jobs_pb2.GenerateMessageNotificationsPayload(message_id=new_message.id)
    queue_job(session, job_type="generate_message_notifications", payload=job_payload)

    return new_message
def _create_chat(session, creator_id, recipient_ids, title=None, only_admins_invite=True):
    """
    Creates a conversation, its moderation state, the group chat and all subscriptions

    The creator is subscribed as admin, every recipient as participant. A chat with exactly one recipient is a DM.
    Returns the new GroupChat.
    """
    conversation = Conversation()
    session.add(conversation)
    # flush to get the conversation id
    session.flush()

    # Create moderation state for UMS (starts as SHADOWED)
    moderation_state = create_moderation(
        session=session,
        object_type=ModerationObjectType.GROUP_CHAT,
        object_id=conversation.id,
        creator_user_id=creator_id,
    )

    group_chat = GroupChat(
        conversation_id=conversation.id,
        title=title,
        creator_id=creator_id,
        is_dm=len(recipient_ids) == 1,
        only_admins_invite=only_admins_invite,
        moderation_state_id=moderation_state.id,
    )
    session.add(group_chat)
    session.flush()

    session.add(GroupChatSubscription(user_id=creator_id, group_chat=group_chat, role=GroupChatRole.admin))
    session.add_all(
        [
            GroupChatSubscription(user_id=recipient_id, group_chat=group_chat, role=GroupChatRole.participant)
            for recipient_id in recipient_ids
        ]
    )

    return group_chat
def _get_message_subscription(session, user_id, conversation_id):
    """Get the user's active (not left) subscription to the chat, or None if there is none"""
    active_subscription_query = (
        select(GroupChatSubscription)
        .where(GroupChatSubscription.group_chat_id == conversation_id)
        .where(GroupChatSubscription.user_id == user_id)
        .where(GroupChatSubscription.left == None)
    )
    return session.execute(active_subscription_query).scalar_one_or_none()
def _get_visible_message_subscription(session, context, conversation_id):
    """
    Get the current user's active subscription to the chat, or None

    Same as looking the subscription up directly, except that the chat must also pass the moderated content
    visibility filter for this user.
    """
    visible_subscription_query = (
        select(GroupChatSubscription)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
        .where(GroupChatSubscription.group_chat_id == conversation_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.left == None)
    )
    return session.execute(visible_subscription_query).scalar_one_or_none()
def _unseen_message_count(session, subscription_id):
    """Count the messages in the subscription's chat with an id above the subscription's last seen message id"""
    unseen_count_query = (
        select(func.count())
        .select_from(Message)
        .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
        .where(GroupChatSubscription.id == subscription_id)
        .where(Message.id > GroupChatSubscription.last_seen_message_id)
    )
    return session.execute(unseen_count_query).scalar_one()
def _mute_info(subscription):
    """Build the MuteInfo protobuf from the subscription's muted_display() pair"""
    is_muted, until = subscription.muted_display()
    # leave muted_until unset when there is no end time to show
    until_pb = Timestamp_from_datetime(until) if until else None
    return conversations_pb2.MuteInfo(muted=is_muted, muted_until=until_pb)
322class Conversations(conversations_pb2_grpc.ConversationsServicer):
323 def ListGroupChats(
324 self, request: conversations_pb2.ListGroupChatsReq, context: CouchersContext, session: Session
325 ) -> conversations_pb2.ListGroupChatsRes:
326 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
327 page_size = min(page_size, MAX_PAGE_SIZE)
329 # select group chats where you have a subscription, and for each of
330 # these, the latest message from them
332 t = (
333 select(
334 GroupChatSubscription.group_chat_id.label("group_chat_id"),
335 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
336 func.max(Message.id).label("message_id"),
337 )
338 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
339 .where(GroupChatSubscription.user_id == context.user_id)
340 .where(Message.time >= GroupChatSubscription.joined)
341 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
342 .group_by(GroupChatSubscription.group_chat_id)
343 .order_by(func.max(Message.id).desc())
344 .subquery()
345 )
347 results = session.execute(
348 select(t, GroupChat, GroupChatSubscription, Message)
349 .join(Message, Message.id == t.c.message_id)
350 .join(GroupChatSubscription, GroupChatSubscription.id == t.c.group_chat_subscriptions_id)
351 .join(GroupChat, GroupChat.conversation_id == t.c.group_chat_id)
352 .where_moderated_content_visible(context, GroupChat, is_list_operation=True)
353 .where(or_(t.c.message_id < request.last_message_id, request.last_message_id == 0))
354 .order_by(t.c.message_id.desc())
355 .limit(page_size + 1)
356 ).all()
358 return conversations_pb2.ListGroupChatsRes(
359 group_chats=[
360 conversations_pb2.GroupChat(
361 group_chat_id=result.GroupChat.conversation_id,
362 title=result.GroupChat.title, # TODO: proper title for DMs, etc
363 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
364 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
365 only_admins_invite=result.GroupChat.only_admins_invite,
366 is_dm=result.GroupChat.is_dm,
367 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
368 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
369 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
370 latest_message=_message_to_pb(result.Message) if result.Message else None,
371 mute_info=_mute_info(result.GroupChatSubscription),
372 can_message=_user_can_message(session, context, result.GroupChat),
373 )
374 for result in results[:page_size]
375 ],
376 last_message_id=(
377 min(g.Message.id if g.Message else 1 for g in results[:page_size]) if len(results) > 0 else 0
378 ), # TODO
379 no_more=len(results) <= page_size,
380 )
382 def GetGroupChat(
383 self, request: conversations_pb2.GetGroupChatReq, context: CouchersContext, session: Session
384 ) -> conversations_pb2.GroupChat:
385 result = session.execute(
386 select(GroupChat, GroupChatSubscription, Message)
387 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
388 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
389 .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
390 .where(GroupChatSubscription.user_id == context.user_id)
391 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
392 .where(Message.time >= GroupChatSubscription.joined)
393 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
394 .order_by(Message.id.desc())
395 .limit(1)
396 ).one_or_none()
398 if not result:
399 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
401 return conversations_pb2.GroupChat(
402 group_chat_id=result.GroupChat.conversation_id,
403 title=result.GroupChat.title,
404 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
405 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
406 only_admins_invite=result.GroupChat.only_admins_invite,
407 is_dm=result.GroupChat.is_dm,
408 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
409 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
410 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
411 latest_message=_message_to_pb(result.Message) if result.Message else None,
412 mute_info=_mute_info(result.GroupChatSubscription),
413 can_message=_user_can_message(session, context, result.GroupChat),
414 )
416 def GetDirectMessage(
417 self, request: conversations_pb2.GetDirectMessageReq, context: CouchersContext, session: Session
418 ) -> conversations_pb2.GroupChat:
419 count = func.count(GroupChatSubscription.id).label("count")
420 subquery = (
421 select(GroupChatSubscription.group_chat_id)
422 .where(
423 or_(
424 GroupChatSubscription.user_id == context.user_id,
425 GroupChatSubscription.user_id == request.user_id,
426 )
427 )
428 .where(GroupChatSubscription.left == None)
429 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
430 .where(GroupChat.is_dm == True)
431 .group_by(GroupChatSubscription.group_chat_id)
432 .having(count == 2)
433 .subquery()
434 )
436 result = session.execute(
437 select(subquery, GroupChat, GroupChatSubscription, Message)
438 .join(subquery, subquery.c.group_chat_id == GroupChat.conversation_id)
439 .join(Message, Message.conversation_id == GroupChat.conversation_id)
440 .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
441 .where(GroupChatSubscription.user_id == context.user_id)
442 .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id)
443 .where(Message.time >= GroupChatSubscription.joined)
444 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
445 .order_by(Message.id.desc())
446 .limit(1)
447 ).one_or_none()
449 if not result:
450 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
452 return conversations_pb2.GroupChat(
453 group_chat_id=result.GroupChat.conversation_id,
454 title=result.GroupChat.title,
455 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
456 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
457 only_admins_invite=result.GroupChat.only_admins_invite,
458 is_dm=result.GroupChat.is_dm,
459 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
460 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
461 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
462 latest_message=_message_to_pb(result.Message) if result.Message else None,
463 mute_info=_mute_info(result.GroupChatSubscription),
464 can_message=_user_can_message(session, context, result.GroupChat),
465 )
467 def GetUpdates(
468 self, request: conversations_pb2.GetUpdatesReq, context: CouchersContext, session: Session
469 ) -> conversations_pb2.GetUpdatesRes:
470 results = (
471 session.execute(
472 select(Message)
473 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
474 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
475 .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
476 .where(GroupChatSubscription.user_id == context.user_id)
477 .where(Message.time >= GroupChatSubscription.joined)
478 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
479 .where(Message.id > request.newest_message_id)
480 .order_by(Message.id.asc())
481 .limit(DEFAULT_PAGINATION_LENGTH + 1)
482 )
483 .scalars()
484 .all()
485 )
487 return conversations_pb2.GetUpdatesRes(
488 updates=[
489 conversations_pb2.Update(
490 group_chat_id=message.conversation_id,
491 message=_message_to_pb(message),
492 )
493 for message in sorted(results, key=lambda message: message.id)[:DEFAULT_PAGINATION_LENGTH]
494 ],
495 no_more=len(results) <= DEFAULT_PAGINATION_LENGTH,
496 )
498 def GetGroupChatMessages(
499 self, request: conversations_pb2.GetGroupChatMessagesReq, context: CouchersContext, session: Session
500 ) -> conversations_pb2.GetGroupChatMessagesRes:
501 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
502 page_size = min(page_size, MAX_PAGE_SIZE)
504 results = (
505 session.execute(
506 select(Message)
507 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
508 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
509 .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
510 .where(GroupChatSubscription.user_id == context.user_id)
511 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
512 .where(Message.time >= GroupChatSubscription.joined)
513 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
514 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
515 .where(or_(Message.id > GroupChatSubscription.last_seen_message_id, request.only_unseen == 0))
516 .order_by(Message.id.desc())
517 .limit(page_size + 1)
518 )
519 .scalars()
520 .all()
521 )
523 return conversations_pb2.GetGroupChatMessagesRes(
524 messages=[_message_to_pb(message) for message in results[:page_size]],
525 last_message_id=results[-2].id if len(results) > 1 else 0, # TODO
526 no_more=len(results) <= page_size,
527 )
529 def MarkLastSeenGroupChat(
530 self, request: conversations_pb2.MarkLastSeenGroupChatReq, context: CouchersContext, session: Session
531 ) -> empty_pb2.Empty:
532 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
534 if not subscription:
535 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
537 if not subscription.last_seen_message_id <= request.last_seen_message_id:
538 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
540 subscription.last_seen_message_id = request.last_seen_message_id
542 return empty_pb2.Empty()
544 def MuteGroupChat(
545 self, request: conversations_pb2.MuteGroupChatReq, context: CouchersContext, session: Session
546 ) -> empty_pb2.Empty:
547 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
549 if not subscription:
550 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
552 if request.unmute:
553 subscription.muted_until = DATETIME_MINUS_INFINITY
554 elif request.forever:
555 subscription.muted_until = DATETIME_INFINITY
556 elif request.for_duration:
557 duration = request.for_duration.ToTimedelta()
558 if duration < timedelta(seconds=0):
559 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_mute_past")
560 subscription.muted_until = now() + duration
562 return empty_pb2.Empty()
564 def SearchMessages(
565 self, request: conversations_pb2.SearchMessagesReq, context: CouchersContext, session: Session
566 ) -> conversations_pb2.SearchMessagesRes:
567 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
568 page_size = min(page_size, MAX_PAGE_SIZE)
570 results = (
571 session.execute(
572 select(Message)
573 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
574 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
575 .where_moderated_content_visible(context, GroupChat, is_list_operation=True)
576 .where(GroupChatSubscription.user_id == context.user_id)
577 .where(Message.time >= GroupChatSubscription.joined)
578 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
579 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
580 .where(Message.text.ilike(f"%{request.query}%"))
581 .order_by(Message.id.desc())
582 .limit(page_size + 1)
583 )
584 .scalars()
585 .all()
586 )
588 return conversations_pb2.SearchMessagesRes(
589 results=[
590 conversations_pb2.MessageSearchResult(
591 group_chat_id=message.conversation_id,
592 message=_message_to_pb(message),
593 )
594 for message in results[:page_size]
595 ],
596 last_message_id=results[-2].id if len(results) > 1 else 0,
597 no_more=len(results) <= page_size,
598 )
600 def CreateGroupChat(
601 self, request: conversations_pb2.CreateGroupChatReq, context: CouchersContext, session: Session
602 ) -> conversations_pb2.GroupChat:
603 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
604 if not user.has_completed_profile:
605 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")
607 recipient_user_ids = list(
608 session.execute(select(User.id).where_users_visible(context).where(User.id.in_(request.recipient_user_ids)))
609 .scalars()
610 .all()
611 )
613 # make sure all requested users are visible
614 if len(recipient_user_ids) != len(request.recipient_user_ids):
615 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")
617 if not recipient_user_ids:
618 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")
620 if len(recipient_user_ids) != len(set(recipient_user_ids)):
621 # make sure there's no duplicate users
622 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_recipients")
624 if context.user_id in recipient_user_ids:
625 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")
627 if len(recipient_user_ids) == 1:
628 # can only have one DM at a time between any two users
629 other_user_id = recipient_user_ids[0]
631 # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have
632 # user_id either this user or the recipient user. If you find two subscriptions to the same DM group
633 # chat, you know they already have a shared group chat
634 count = func.count(GroupChatSubscription.id).label("count")
635 if session.execute(
636 select(count)
637 .where(
638 or_(
639 GroupChatSubscription.user_id == context.user_id,
640 GroupChatSubscription.user_id == other_user_id,
641 )
642 )
643 .where(GroupChatSubscription.left == None)
644 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
645 .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
646 .where(GroupChat.is_dm == True)
647 .group_by(GroupChatSubscription.group_chat_id)
648 .having(count == 2)
649 ).scalar_one_or_none():
650 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_have_dm")
652 # Check if user has been initiating chats excessively
653 if process_rate_limits_and_check_abort(
654 session=session, user_id=context.user_id, action=RateLimitAction.chat_initiation
655 ):
656 context.abort_with_error_code(
657 grpc.StatusCode.RESOURCE_EXHAUSTED,
658 "chat_initiation_rate_limit",
659 substitutions={"hours": RATE_LIMIT_HOURS},
660 )
662 group_chat = _create_chat(
663 session,
664 creator_id=context.user_id,
665 recipient_ids=request.recipient_user_ids,
666 title=request.title.value,
667 )
669 your_subscription = _get_message_subscription(session, context.user_id, group_chat.conversation_id)
671 _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created)
673 session.flush()
675 return conversations_pb2.GroupChat(
676 group_chat_id=group_chat.conversation_id,
677 title=group_chat.title,
678 member_user_ids=_get_visible_members_for_subscription(your_subscription),
679 admin_user_ids=_get_visible_admins_for_subscription(your_subscription),
680 only_admins_invite=group_chat.only_admins_invite,
681 is_dm=group_chat.is_dm,
682 created=Timestamp_from_datetime(group_chat.conversation.created),
683 mute_info=_mute_info(your_subscription),
684 can_message=True,
685 )
687 def SendMessage(
688 self, request: conversations_pb2.SendMessageReq, context: CouchersContext, session: Session
689 ) -> empty_pb2.Empty:
690 if request.text == "":
691 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
693 result = session.execute(
694 select(GroupChatSubscription, GroupChat)
695 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
696 .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
697 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
698 .where(GroupChatSubscription.user_id == context.user_id)
699 .where(GroupChatSubscription.left == None)
700 ).one_or_none()
701 if not result:
702 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
704 subscription, group_chat = result
705 if not _user_can_message(session, context, group_chat):
706 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_message_in_chat")
708 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)
710 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
711 sent_messages_counter.labels(
712 user_gender, "direct message" if subscription.group_chat.is_dm else "group chat"
713 ).inc()
715 return empty_pb2.Empty()
717 def SendDirectMessage(
718 self, request: conversations_pb2.SendDirectMessageReq, context: CouchersContext, session: Session
719 ) -> conversations_pb2.SendDirectMessageRes:
720 user_id = context.user_id
721 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
723 recipient_id = request.recipient_user_id
725 if not user.has_completed_profile:
726 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")
728 if not recipient_id:
729 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")
731 recipient_user_id = session.execute(
732 select(User.id).where_users_visible(context).where(User.id == recipient_id)
733 ).scalar_one_or_none()
735 if not recipient_user_id:
736 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")
738 if user_id == recipient_id:
739 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")
741 if request.text == "":
742 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
744 # Look for an existing direct message (DM) chat between the two users
745 dm_chat_ids = (
746 select(GroupChatSubscription.group_chat_id)
747 .where(GroupChatSubscription.user_id.in_([user_id, recipient_id]))
748 .group_by(GroupChatSubscription.group_chat_id)
749 .having(func.count(GroupChatSubscription.user_id) == 2)
750 )
752 chat = session.execute(
753 select(GroupChat)
754 .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
755 .where(GroupChat.is_dm == True)
756 .where(GroupChat.conversation_id.in_(dm_chat_ids))
757 .limit(1)
758 ).scalar_one_or_none()
760 if not chat:
761 chat = _create_chat(session, user_id, [recipient_id])
763 # Retrieve the sender's active subscription to the chat
764 subscription = _get_message_subscription(session, user_id, chat.conversation_id)
766 # Add the message to the conversation
767 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)
769 user_gender = session.execute(select(User.gender).where(User.id == user_id)).scalar_one()
770 sent_messages_counter.labels(user_gender, "direct message").inc()
772 session.flush()
774 return conversations_pb2.SendDirectMessageRes(group_chat_id=chat.conversation_id)
776 def EditGroupChat(
777 self, request: conversations_pb2.EditGroupChatReq, context: CouchersContext, session: Session
778 ) -> empty_pb2.Empty:
779 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
781 if not subscription:
782 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
784 if subscription.role != GroupChatRole.admin:
785 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_edit")
787 if request.HasField("title"):
788 subscription.group_chat.title = request.title.value
790 if request.HasField("only_admins_invite"):
791 subscription.group_chat.only_admins_invite = request.only_admins_invite.value
793 _add_message_to_subscription(session, subscription, message_type=MessageType.chat_edited)
795 return empty_pb2.Empty()
def MakeGroupChatAdmin(
    self, request: conversations_pb2.MakeGroupChatAdminReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Promotes another member of a group chat from participant to admin

    The checks run in this order: target user visible, chat found for the caller, caller is admin, target is
    not the caller, target is in the chat, target is currently a participant.
    """
    target_user_id = request.user_id

    visible_target_query = select(User).where_users_visible(context).where(User.id == target_user_id)
    if not session.execute(visible_target_query).scalar_one_or_none():
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    caller_sub = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not caller_sub:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if caller_sub.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_make_admin")

    if target_user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_make_self_admin")

    target_sub = _get_message_subscription(session, target_user_id, request.group_chat_id)

    if not target_sub:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")

    # any role other than participant is reported as "already admin"
    if target_sub.role != GroupChatRole.participant:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_admin")

    target_sub.role = GroupChatRole.admin

    # the control message is authored through the caller's subscription and points at the promoted user
    _add_message_to_subscription(
        session, caller_sub, message_type=MessageType.user_made_admin, target_id=target_user_id
    )

    return empty_pb2.Empty()
def RemoveGroupChatAdmin(
    self, request: conversations_pb2.RemoveGroupChatAdminReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Demotes an admin of a group chat (possibly the caller themselves) back to participant

    A caller demoting themselves is rejected when no other active admin remains in the chat.
    """
    target_user_id = request.user_id
    chat_id = request.group_chat_id

    visible_target_query = select(User).where_users_visible(context).where(User.id == target_user_id)
    if not session.execute(visible_target_query).scalar_one_or_none():
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    caller_sub = _get_visible_message_subscription(session, context, chat_id)

    if not caller_sub:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if target_user_id == context.user_id:
        # TODO: race condition! The count below and the role change further down are separate steps
        remaining_admins_query = (
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(
                GroupChatSubscription.group_chat_id == chat_id,
                GroupChatSubscription.user_id != context.user_id,
                GroupChatSubscription.role == GroupChatRole.admin,
                GroupChatSubscription.left.is_(None),
            )
        )
        remaining_admins = session.execute(remaining_admins_query).scalar_one()
        if remaining_admins <= 0:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_last_admin")

    # note: this permission check intentionally stays after the last-admin check above
    if caller_sub.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_admin")

    # look up the target's subscription only if it is still active and currently an admin
    target_admin_query = select(GroupChatSubscription).where(
        GroupChatSubscription.group_chat_id == chat_id,
        GroupChatSubscription.user_id == target_user_id,
        GroupChatSubscription.left.is_(None),
        GroupChatSubscription.role == GroupChatRole.admin,
    )
    target_sub = session.execute(target_admin_query).scalar_one_or_none()

    if not target_sub:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin")

    target_sub.role = GroupChatRole.participant

    _add_message_to_subscription(
        session, caller_sub, message_type=MessageType.user_removed_admin, target_id=target_user_id
    )

    return empty_pb2.Empty()
def InviteToGroupChat(
    self, request: conversations_pb2.InviteToGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Adds another user to a group chat as a participant

    Non-admins may only invite when the chat does not have only_admins_invite set; DMs never accept invites.
    """
    invitee_id = request.user_id
    chat_id = request.group_chat_id

    visible_invitee_query = select(User).where_users_visible(context).where(User.id == invitee_id)
    if not session.execute(visible_invitee_query).scalar_one_or_none():
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # fetch the caller's active subscription together with the chat, subject to the moderation visibility filter
    caller_membership_query = (
        select(GroupChatSubscription, GroupChat)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where_moderated_content_visible(context, GroupChat, is_list_operation=False)
        .where(
            GroupChatSubscription.group_chat_id == chat_id,
            GroupChatSubscription.user_id == context.user_id,
            GroupChatSubscription.left.is_(None),
        )
    )
    row = session.execute(caller_membership_query).one_or_none()

    if not row:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    inviter_sub, chat = row

    if invitee_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_self")

    if inviter_sub.role != GroupChatRole.admin and inviter_sub.group_chat.only_admins_invite:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invite_permission_denied")

    if chat.is_dm:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_to_dm")

    existing_sub = _get_message_subscription(session, invitee_id, chat_id)

    if existing_sub:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_chat")

    # TODO: race condition! The membership check above and the insert below are separate steps

    session.add(
        GroupChatSubscription(
            user_id=invitee_id,
            group_chat=inviter_sub.group_chat,
            role=GroupChatRole.participant,
        )
    )

    _add_message_to_subscription(
        session, inviter_sub, message_type=MessageType.user_invited, target_id=invitee_id
    )

    return empty_pb2.Empty()
def RemoveGroupChatUser(
    self, request: conversations_pb2.RemoveGroupChatUserReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Removes another user from a group chat on behalf of an admin

    1. Load the caller's subscription and verify they are an admin who is not targeting themselves
    2. Load the target's subscription, record the removal message and mark the target as having left
    """
    target_user_id = request.user_id

    # step 1: the caller
    admin_sub = _get_visible_message_subscription(session, context, request.group_chat_id)

    # no visible subscription for the caller in this chat
    if not admin_sub:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    # only admins are allowed to remove people
    if admin_sub.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_user")

    # removing yourself is not done through this call
    if target_user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_self")

    # step 2: the user being removed
    removed_sub = _get_message_subscription(session, target_user_id, request.group_chat_id)

    # the target has no subscription in this chat
    if not removed_sub:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")

    # the removal message is added first, then the target's subscription is stamped as left
    _add_message_to_subscription(
        session, admin_sub, message_type=MessageType.user_removed, target_id=target_user_id
    )

    removed_sub.left = func.now()

    return empty_pb2.Empty()
def LeaveGroupChat(
    self, request: conversations_pb2.LeaveGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Marks the caller as having left a group chat

    An admin may only leave if at least one other active admin remains, or if no active participants remain.
    """
    own_sub = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not own_sub:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    def _count_other_active_members(role: GroupChatRole) -> int:
        # number of active subscriptions in this chat with the given role, excluding the caller
        count_query = (
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(
                GroupChatSubscription.group_chat_id == request.group_chat_id,
                GroupChatSubscription.user_id != context.user_id,
                GroupChatSubscription.role == role,
                GroupChatSubscription.left.is_(None),
            )
        )
        return session.execute(count_query).scalar_one()

    if own_sub.role == GroupChatRole.admin:
        remaining_admins = _count_other_active_members(GroupChatRole.admin)
        remaining_participants = _count_other_active_members(GroupChatRole.participant)
        # leaving is refused only when participants would be left behind without any admin
        if remaining_admins <= 0 and remaining_participants != 0:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "last_admin_cant_leave")

    # the "user left" message is added before the subscription is stamped as left
    _add_message_to_subscription(session, own_sub, message_type=MessageType.user_left)

    own_sub.left = func.now()

    return empty_pb2.Empty()