Coverage for src / couchers / servicers / conversations.py: 88%
309 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-26 03:52 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-26 03:52 +0000
1import logging
2from collections.abc import Sequence
3from datetime import timedelta
4from typing import Any, cast
6import grpc
7from google.protobuf import empty_pb2
8from sqlalchemy import select
9from sqlalchemy.orm import Session
10from sqlalchemy.sql import func, not_, or_
12from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY
13from couchers.context import CouchersContext, make_background_user_context
14from couchers.db import session_scope
15from couchers.helpers.completed_profile import has_completed_profile
16from couchers.jobs.enqueue import queue_job
17from couchers.metrics import sent_messages_counter
18from couchers.models import (
19 Conversation,
20 GroupChat,
21 GroupChatRole,
22 GroupChatSubscription,
23 Message,
24 MessageType,
25 ModerationObjectType,
26 RateLimitAction,
27 User,
28)
29from couchers.models.notifications import NotificationTopicAction
30from couchers.moderation.utils import create_moderation
31from couchers.notifications.notify import notify
32from couchers.proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2
33from couchers.proto.internal import jobs_pb2
34from couchers.rate_limits.check import process_rate_limits_and_check_abort
35from couchers.rate_limits.definitions import RATE_LIMIT_HOURS
36from couchers.servicers.api import user_model_to_pb
37from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible
38from couchers.utils import Timestamp_from_datetime, now
40logger = logging.getLogger(__name__)
42# TODO: Still needs custom pagination: GetUpdates
43DEFAULT_PAGINATION_LENGTH = 20
44MAX_PAGE_SIZE = 50
47def _message_to_pb(message: Message) -> conversations_pb2.Message:
48 """
49 Turns the given message to a protocol buffer
50 """
51 if message.is_normal_message:
52 return conversations_pb2.Message(
53 message_id=message.id,
54 author_user_id=message.author_id,
55 time=Timestamp_from_datetime(message.time),
56 text=conversations_pb2.MessageContentText(text=message.text),
57 )
58 else:
59 return conversations_pb2.Message(
60 message_id=message.id,
61 author_user_id=message.author_id,
62 time=Timestamp_from_datetime(message.time),
63 chat_created=(
64 conversations_pb2.MessageContentChatCreated()
65 if message.message_type == MessageType.chat_created
66 else None
67 ),
68 chat_edited=(
69 conversations_pb2.MessageContentChatEdited()
70 if message.message_type == MessageType.chat_edited
71 else None
72 ),
73 user_invited=(
74 conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id)
75 if message.message_type == MessageType.user_invited
76 else None
77 ),
78 user_left=(
79 conversations_pb2.MessageContentUserLeft() if message.message_type == MessageType.user_left else None
80 ),
81 user_made_admin=(
82 conversations_pb2.MessageContentUserMadeAdmin(target_user_id=message.target_id)
83 if message.message_type == MessageType.user_made_admin
84 else None
85 ),
86 user_removed_admin=(
87 conversations_pb2.MessageContentUserRemovedAdmin(target_user_id=message.target_id)
88 if message.message_type == MessageType.user_removed_admin
89 else None
90 ),
91 group_chat_user_removed=(
92 conversations_pb2.MessageContentUserRemoved(target_user_id=message.target_id)
93 if message.message_type == MessageType.user_removed
94 else None
95 ),
96 )
99def _get_visible_members_for_subscription(subscription: GroupChatSubscription) -> list[int]:
100 """
101 If a user leaves a group chat, they shouldn't be able to see who's added
102 after they left
103 """
104 if not subscription.left:
105 # still in the chat, we see everyone with a current subscription
106 return [sub.user_id for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None)]
107 else:
108 # not in chat anymore, see everyone who was in chat when we left
109 return [
110 sub.user_id
111 for sub in subscription.group_chat.subscriptions.where(
112 GroupChatSubscription.joined <= subscription.left
113 ).where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None))
114 ]
117def _get_visible_admins_for_subscription(subscription: GroupChatSubscription) -> list[int]:
118 """
119 If a user leaves a group chat, they shouldn't be able to see who's added
120 after they left
121 """
122 if not subscription.left:
123 # still in the chat, we see everyone with a current subscription
124 return [
125 sub.user_id
126 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None).where(
127 GroupChatSubscription.role == GroupChatRole.admin
128 )
129 ]
130 else:
131 # not in chat anymore, see everyone who was in chat when we left
132 return [
133 sub.user_id
134 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin)
135 .where(GroupChatSubscription.joined <= subscription.left)
136 .where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None))
137 ]
140def _user_can_message(session: Session, context: CouchersContext, group_chat: GroupChat) -> bool:
141 """
142 If it is a true group chat (not a DM), user can always message. For a DM, user can message if the other participant
143 - Is not deleted/banned
144 - Has not been blocked by the user or is blocking the user
145 - Has not left the chat
146 """
147 if not group_chat.is_dm:
148 return True
150 query = select(
151 where_users_column_visible(
152 select(GroupChatSubscription)
153 .where(GroupChatSubscription.user_id != context.user_id)
154 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
155 .where(GroupChatSubscription.left == None),
156 context=context,
157 column=GroupChatSubscription.user_id,
158 ).exists()
159 )
160 return session.execute(query).scalar_one()
163def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload) -> None:
164 """
165 Background job to generate notifications for a message sent to a group chat
166 """
167 logger.info(f"Fanning notifications for message_id = {payload.message_id}")
169 with session_scope() as session:
170 message, group_chat = session.execute(
171 select(Message, GroupChat)
172 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
173 .where(Message.id == payload.message_id)
174 ).one()
176 if message.message_type != MessageType.text:
177 logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
178 return
180 context = make_background_user_context(user_id=message.author_id)
181 user_ids_to_notify = (
182 session.execute(
183 where_users_column_visible(
184 select(GroupChatSubscription.user_id)
185 .where(GroupChatSubscription.group_chat_id == message.conversation_id)
186 .where(GroupChatSubscription.user_id != message.author_id)
187 .where(GroupChatSubscription.joined <= message.time)
188 .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time))
189 .where(not_(GroupChatSubscription.is_muted)),
190 context=context,
191 column=GroupChatSubscription.user_id,
192 )
193 )
194 .scalars()
195 .all()
196 )
198 if group_chat.is_dm:
199 msg = f"{message.author.name} sent you a message"
200 else:
201 msg = f"{message.author.name} sent a message in {group_chat.title}"
203 for user_id in user_ids_to_notify:
204 notify(
205 session,
206 user_id=user_id,
207 topic_action=NotificationTopicAction.chat__message,
208 key=str(message.conversation_id),
209 data=notification_data_pb2.ChatMessage(
210 author=user_model_to_pb(
211 message.author,
212 session,
213 make_background_user_context(user_id=user_id),
214 ),
215 message=msg,
216 text=message.text,
217 group_chat_id=message.conversation_id,
218 ),
219 moderation_state_id=group_chat.moderation_state_id,
220 )
223def _add_message_to_subscription(session: Session, subscription: GroupChatSubscription, **kwargs: Any) -> Message:
224 """
225 Creates a new message for a subscription, from the user whose subscription that is. Updates last seen message id
227 Specify the keyword args for Message
228 """
229 message = Message(conversation_id=subscription.group_chat.conversation.id, author_id=subscription.user_id, **kwargs)
231 session.add(message)
232 session.flush()
234 subscription.last_seen_message_id = message.id
236 queue_job(
237 session,
238 job=generate_message_notifications,
239 payload=jobs_pb2.GenerateMessageNotificationsPayload(
240 message_id=message.id,
241 ),
242 )
244 return message
247def _create_chat(
248 session: Session,
249 creator_id: int,
250 recipient_ids: Sequence[int],
251 title: str | None = None,
252 only_admins_invite: bool = True,
253) -> GroupChat:
254 conversation = Conversation()
255 session.add(conversation)
256 session.flush()
258 # Create moderation state for UMS (starts as SHADOWED)
259 moderation_state = create_moderation(
260 session=session,
261 object_type=ModerationObjectType.GROUP_CHAT,
262 object_id=conversation.id,
263 creator_user_id=creator_id,
264 )
266 chat = GroupChat(
267 conversation_id=conversation.id,
268 title=title,
269 creator_id=creator_id,
270 is_dm=True if len(recipient_ids) == 1 else False,
271 only_admins_invite=only_admins_invite,
272 moderation_state_id=moderation_state.id,
273 )
274 session.add(chat)
275 session.flush()
277 creator_subscription = GroupChatSubscription(
278 user_id=creator_id,
279 group_chat_id=chat.conversation_id,
280 role=GroupChatRole.admin,
281 )
282 session.add(creator_subscription)
284 for uid in recipient_ids:
285 session.add(
286 GroupChatSubscription(
287 user_id=uid,
288 group_chat_id=chat.conversation_id,
289 role=GroupChatRole.participant,
290 )
291 )
293 return chat
296def _get_message_subscription(session: Session, user_id: int, conversation_id: int) -> GroupChatSubscription:
297 subscription = session.execute(
298 select(GroupChatSubscription)
299 .where(GroupChatSubscription.group_chat_id == conversation_id)
300 .where(GroupChatSubscription.user_id == user_id)
301 .where(GroupChatSubscription.left == None)
302 ).scalar_one_or_none()
304 return cast(GroupChatSubscription, subscription)
307def _get_visible_message_subscription(
308 session: Session, context: CouchersContext, conversation_id: int
309) -> GroupChatSubscription:
310 """Get subscription with visibility filtering"""
311 subscription = session.execute(
312 where_moderated_content_visible(
313 select(GroupChatSubscription)
314 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
315 .where(GroupChatSubscription.group_chat_id == conversation_id)
316 .where(GroupChatSubscription.user_id == context.user_id)
317 .where(GroupChatSubscription.left == None),
318 context,
319 GroupChat,
320 is_list_operation=False,
321 )
322 ).scalar_one_or_none()
324 return cast(GroupChatSubscription, subscription)
327def _unseen_message_count(session: Session, subscription_id: int) -> int:
328 query = (
329 select(func.count())
330 .select_from(Message)
331 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
332 .where(GroupChatSubscription.id == subscription_id)
333 .where(Message.id > GroupChatSubscription.last_seen_message_id)
334 )
335 return session.execute(query).scalar_one()
338def _mute_info(subscription: GroupChatSubscription) -> conversations_pb2.MuteInfo:
339 (muted, muted_until) = subscription.muted_display()
340 return conversations_pb2.MuteInfo(
341 muted=muted,
342 muted_until=Timestamp_from_datetime(muted_until) if muted_until else None,
343 )
346class Conversations(conversations_pb2_grpc.ConversationsServicer):
347 def ListGroupChats(
348 self, request: conversations_pb2.ListGroupChatsReq, context: CouchersContext, session: Session
349 ) -> conversations_pb2.ListGroupChatsRes:
350 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
351 page_size = min(page_size, MAX_PAGE_SIZE)
353 # select group chats where you have a subscription, and for each of
354 # these, the latest message from them
356 t = (
357 select(
358 GroupChatSubscription.group_chat_id.label("group_chat_id"),
359 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
360 func.max(Message.id).label("message_id"),
361 )
362 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
363 .where(GroupChatSubscription.user_id == context.user_id)
364 .where(Message.time >= GroupChatSubscription.joined)
365 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
366 .where(
367 or_(
368 to_bool(request.HasField("only_archived") == False),
369 GroupChatSubscription.is_archived == request.only_archived,
370 )
371 )
372 .group_by(GroupChatSubscription.group_chat_id)
373 .order_by(func.max(Message.id).desc())
374 .subquery()
375 )
377 results = session.execute(
378 where_moderated_content_visible(
379 select(t, GroupChat, GroupChatSubscription, Message)
380 .join(Message, Message.id == t.c.message_id)
381 .join(GroupChatSubscription, GroupChatSubscription.id == t.c.group_chat_subscriptions_id)
382 .join(GroupChat, GroupChat.conversation_id == t.c.group_chat_id)
383 .where(or_(t.c.message_id < request.last_message_id, to_bool(request.last_message_id == 0)))
384 .order_by(t.c.message_id.desc())
385 .limit(page_size + 1),
386 context,
387 GroupChat,
388 is_list_operation=True,
389 )
390 ).all()
392 return conversations_pb2.ListGroupChatsRes(
393 group_chats=[
394 conversations_pb2.GroupChat(
395 group_chat_id=result.GroupChat.conversation_id,
396 title=result.GroupChat.title, # TODO: proper title for DMs, etc
397 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
398 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
399 only_admins_invite=result.GroupChat.only_admins_invite,
400 is_dm=result.GroupChat.is_dm,
401 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
402 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
403 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
404 latest_message=_message_to_pb(result.Message) if result.Message else None,
405 mute_info=_mute_info(result.GroupChatSubscription),
406 can_message=_user_can_message(session, context, result.GroupChat),
407 is_archived=result.GroupChatSubscription.is_archived,
408 )
409 for result in results[:page_size]
410 ],
411 last_message_id=(
412 min(g.Message.id if g.Message else 1 for g in results[:page_size]) if len(results) > 0 else 0
413 ), # TODO
414 no_more=len(results) <= page_size,
415 )
417 def GetGroupChat(
418 self, request: conversations_pb2.GetGroupChatReq, context: CouchersContext, session: Session
419 ) -> conversations_pb2.GroupChat:
420 result = session.execute(
421 where_moderated_content_visible(
422 select(GroupChat, GroupChatSubscription, Message)
423 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
424 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
425 .where(GroupChatSubscription.user_id == context.user_id)
426 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
427 .where(Message.time >= GroupChatSubscription.joined)
428 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
429 .order_by(Message.id.desc())
430 .limit(1),
431 context,
432 GroupChat,
433 is_list_operation=False,
434 )
435 ).one_or_none()
437 if not result:
438 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
440 return conversations_pb2.GroupChat(
441 group_chat_id=result.GroupChat.conversation_id,
442 title=result.GroupChat.title,
443 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
444 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
445 only_admins_invite=result.GroupChat.only_admins_invite,
446 is_dm=result.GroupChat.is_dm,
447 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
448 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
449 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
450 latest_message=_message_to_pb(result.Message) if result.Message else None,
451 mute_info=_mute_info(result.GroupChatSubscription),
452 can_message=_user_can_message(session, context, result.GroupChat),
453 is_archived=result.GroupChatSubscription.is_archived,
454 )
456 def GetDirectMessage(
457 self, request: conversations_pb2.GetDirectMessageReq, context: CouchersContext, session: Session
458 ) -> conversations_pb2.GroupChat:
459 count = func.count(GroupChatSubscription.id).label("count")
460 subquery = (
461 select(GroupChatSubscription.group_chat_id)
462 .where(
463 or_(
464 GroupChatSubscription.user_id == context.user_id,
465 GroupChatSubscription.user_id == request.user_id,
466 )
467 )
468 .where(GroupChatSubscription.left == None)
469 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
470 .where(GroupChat.is_dm == True)
471 .group_by(GroupChatSubscription.group_chat_id)
472 .having(count == 2)
473 .subquery()
474 )
476 result = session.execute(
477 where_moderated_content_visible(
478 select(subquery, GroupChat, GroupChatSubscription, Message)
479 .join(subquery, subquery.c.group_chat_id == GroupChat.conversation_id)
480 .join(Message, Message.conversation_id == GroupChat.conversation_id)
481 .where(GroupChatSubscription.user_id == context.user_id)
482 .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id)
483 .where(Message.time >= GroupChatSubscription.joined)
484 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
485 .order_by(Message.id.desc())
486 .limit(1),
487 context,
488 GroupChat,
489 is_list_operation=False,
490 )
491 ).one_or_none()
493 if not result:
494 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
496 return conversations_pb2.GroupChat(
497 group_chat_id=result.GroupChat.conversation_id,
498 title=result.GroupChat.title,
499 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
500 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
501 only_admins_invite=result.GroupChat.only_admins_invite,
502 is_dm=result.GroupChat.is_dm,
503 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
504 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
505 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
506 latest_message=_message_to_pb(result.Message) if result.Message else None,
507 mute_info=_mute_info(result.GroupChatSubscription),
508 can_message=_user_can_message(session, context, result.GroupChat),
509 is_archived=result.GroupChatSubscription.is_archived,
510 )
512 def GetUpdates(
513 self, request: conversations_pb2.GetUpdatesReq, context: CouchersContext, session: Session
514 ) -> conversations_pb2.GetUpdatesRes:
515 results = (
516 session.execute(
517 where_moderated_content_visible(
518 select(Message)
519 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
520 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
521 .where(GroupChatSubscription.user_id == context.user_id)
522 .where(Message.time >= GroupChatSubscription.joined)
523 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
524 .where(Message.id > request.newest_message_id)
525 .order_by(Message.id.asc())
526 .limit(DEFAULT_PAGINATION_LENGTH + 1),
527 context,
528 GroupChat,
529 is_list_operation=False,
530 )
531 )
532 .scalars()
533 .all()
534 )
536 return conversations_pb2.GetUpdatesRes(
537 updates=[
538 conversations_pb2.Update(
539 group_chat_id=message.conversation_id,
540 message=_message_to_pb(message),
541 )
542 for message in sorted(results, key=lambda message: message.id)[:DEFAULT_PAGINATION_LENGTH]
543 ],
544 no_more=len(results) <= DEFAULT_PAGINATION_LENGTH,
545 )
547 def GetGroupChatMessages(
548 self, request: conversations_pb2.GetGroupChatMessagesReq, context: CouchersContext, session: Session
549 ) -> conversations_pb2.GetGroupChatMessagesRes:
550 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
551 page_size = min(page_size, MAX_PAGE_SIZE)
553 results = (
554 session.execute(
555 where_moderated_content_visible(
556 select(Message)
557 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
558 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
559 .where(GroupChatSubscription.user_id == context.user_id)
560 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
561 .where(Message.time >= GroupChatSubscription.joined)
562 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
563 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
564 .where(
565 or_(Message.id > GroupChatSubscription.last_seen_message_id, to_bool(request.only_unseen == 0))
566 )
567 .order_by(Message.id.desc())
568 .limit(page_size + 1),
569 context,
570 GroupChat,
571 is_list_operation=False,
572 )
573 )
574 .scalars()
575 .all()
576 )
578 return conversations_pb2.GetGroupChatMessagesRes(
579 messages=[_message_to_pb(message) for message in results[:page_size]],
580 last_message_id=results[-2].id if len(results) > 1 else 0, # TODO
581 no_more=len(results) <= page_size,
582 )
584 def MarkLastSeenGroupChat(
585 self, request: conversations_pb2.MarkLastSeenGroupChatReq, context: CouchersContext, session: Session
586 ) -> empty_pb2.Empty:
587 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
589 if not subscription: 589 ↛ 590line 589 didn't jump to line 590 because the condition on line 589 was never true
590 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
592 if not subscription.last_seen_message_id <= request.last_seen_message_id: 592 ↛ 593line 592 didn't jump to line 593 because the condition on line 592 was never true
593 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
595 subscription.last_seen_message_id = request.last_seen_message_id
597 return empty_pb2.Empty()
599 def MuteGroupChat(
600 self, request: conversations_pb2.MuteGroupChatReq, context: CouchersContext, session: Session
601 ) -> empty_pb2.Empty:
602 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
604 if not subscription: 604 ↛ 605line 604 didn't jump to line 605 because the condition on line 604 was never true
605 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
607 if request.unmute:
608 subscription.muted_until = DATETIME_MINUS_INFINITY
609 elif request.forever:
610 subscription.muted_until = DATETIME_INFINITY
611 elif request.for_duration: 611 ↛ 617line 611 didn't jump to line 617 because the condition on line 611 was always true
612 duration = request.for_duration.ToTimedelta()
613 if duration < timedelta(seconds=0): 613 ↛ 614line 613 didn't jump to line 614 because the condition on line 613 was never true
614 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_mute_past")
615 subscription.muted_until = now() + duration
617 return empty_pb2.Empty()
619 def SetGroupChatArchiveStatus(
620 self, request: conversations_pb2.SetGroupChatArchiveStatusReq, context: CouchersContext, session: Session
621 ) -> conversations_pb2.SetGroupChatArchiveStatusRes:
622 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
624 if not subscription:
625 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
627 subscription.is_archived = request.is_archived
629 return conversations_pb2.SetGroupChatArchiveStatusRes(
630 group_chat_id=request.group_chat_id,
631 is_archived=request.is_archived,
632 )
634 def SearchMessages(
635 self, request: conversations_pb2.SearchMessagesReq, context: CouchersContext, session: Session
636 ) -> conversations_pb2.SearchMessagesRes:
637 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
638 page_size = min(page_size, MAX_PAGE_SIZE)
640 results = (
641 session.execute(
642 where_moderated_content_visible(
643 select(Message)
644 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
645 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
646 .where(GroupChatSubscription.user_id == context.user_id)
647 .where(Message.time >= GroupChatSubscription.joined)
648 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
649 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
650 .where(Message.text.ilike(f"%{request.query}%"))
651 .order_by(Message.id.desc())
652 .limit(page_size + 1),
653 context,
654 GroupChat,
655 is_list_operation=True,
656 )
657 )
658 .scalars()
659 .all()
660 )
662 return conversations_pb2.SearchMessagesRes(
663 results=[
664 conversations_pb2.MessageSearchResult(
665 group_chat_id=message.conversation_id,
666 message=_message_to_pb(message),
667 )
668 for message in results[:page_size]
669 ],
670 last_message_id=results[-2].id if len(results) > 1 else 0,
671 no_more=len(results) <= page_size,
672 )
674 def CreateGroupChat(
675 self, request: conversations_pb2.CreateGroupChatReq, context: CouchersContext, session: Session
676 ) -> conversations_pb2.GroupChat:
677 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
678 if not has_completed_profile(session, user):
679 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")
681 recipient_user_ids = list(
682 session.execute(
683 select(User.id).where(users_visible(context)).where(User.id.in_(request.recipient_user_ids))
684 )
685 .scalars()
686 .all()
687 )
689 # make sure all requested users are visible
690 if len(recipient_user_ids) != len(request.recipient_user_ids): 690 ↛ 691line 690 didn't jump to line 691 because the condition on line 690 was never true
691 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")
693 if not recipient_user_ids: 693 ↛ 694line 693 didn't jump to line 694 because the condition on line 693 was never true
694 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")
696 if len(recipient_user_ids) != len(set(recipient_user_ids)): 696 ↛ 698line 696 didn't jump to line 698 because the condition on line 696 was never true
697 # make sure there's no duplicate users
698 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_recipients")
700 if context.user_id in recipient_user_ids: 700 ↛ 701line 700 didn't jump to line 701 because the condition on line 700 was never true
701 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")
703 if len(recipient_user_ids) == 1:
704 # can only have one DM at a time between any two users
705 other_user_id = recipient_user_ids[0]
707 # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have
708 # user_id either this user or the recipient user. If you find two subscriptions to the same DM group
709 # chat, you know they already have a shared group chat
710 count = func.count(GroupChatSubscription.id).label("count")
711 if session.execute(
712 where_moderated_content_visible(
713 select(count)
714 .where(
715 or_(
716 GroupChatSubscription.user_id == context.user_id,
717 GroupChatSubscription.user_id == other_user_id,
718 )
719 )
720 .where(GroupChatSubscription.left == None)
721 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
722 .where(GroupChat.is_dm == True)
723 .group_by(GroupChatSubscription.group_chat_id)
724 .having(count == 2),
725 context,
726 GroupChat,
727 is_list_operation=False,
728 )
729 ).scalar_one_or_none():
730 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_have_dm")
732 # Check if user has been initiating chats excessively
733 if process_rate_limits_and_check_abort(
734 session=session, user_id=context.user_id, action=RateLimitAction.chat_initiation
735 ):
736 context.abort_with_error_code(
737 grpc.StatusCode.RESOURCE_EXHAUSTED,
738 "chat_initiation_rate_limit",
739 substitutions={"hours": str(RATE_LIMIT_HOURS)},
740 )
742 group_chat = _create_chat(
743 session,
744 creator_id=context.user_id,
745 recipient_ids=request.recipient_user_ids,
746 title=request.title.value,
747 )
749 your_subscription = _get_message_subscription(session, context.user_id, group_chat.conversation_id)
751 _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created)
753 session.flush()
755 return conversations_pb2.GroupChat(
756 group_chat_id=group_chat.conversation_id,
757 title=group_chat.title,
758 member_user_ids=_get_visible_members_for_subscription(your_subscription),
759 admin_user_ids=_get_visible_admins_for_subscription(your_subscription),
760 only_admins_invite=group_chat.only_admins_invite,
761 is_dm=group_chat.is_dm,
762 created=Timestamp_from_datetime(group_chat.conversation.created),
763 mute_info=_mute_info(your_subscription),
764 can_message=True,
765 )
767 def SendMessage(
768 self, request: conversations_pb2.SendMessageReq, context: CouchersContext, session: Session
769 ) -> empty_pb2.Empty:
770 if request.text == "": 770 ↛ 771line 770 didn't jump to line 771 because the condition on line 770 was never true
771 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
773 result = session.execute(
774 where_moderated_content_visible(
775 select(GroupChatSubscription, GroupChat)
776 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
777 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
778 .where(GroupChatSubscription.user_id == context.user_id)
779 .where(GroupChatSubscription.left == None),
780 context,
781 GroupChat,
782 is_list_operation=False,
783 )
784 ).one_or_none()
785 if not result:
786 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
788 subscription, group_chat = result._tuple()
789 if not _user_can_message(session, context, group_chat):
790 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_message_in_chat")
792 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)
794 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
795 sent_messages_counter.labels(
796 user_gender, "direct message" if subscription.group_chat.is_dm else "group chat"
797 ).inc()
799 return empty_pb2.Empty()
801 def SendDirectMessage(
802 self, request: conversations_pb2.SendDirectMessageReq, context: CouchersContext, session: Session
803 ) -> conversations_pb2.SendDirectMessageRes:
804 user_id = context.user_id
805 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
807 recipient_id = request.recipient_user_id
809 if not has_completed_profile(session, user): 809 ↛ 810line 809 didn't jump to line 810 because the condition on line 809 was never true
810 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")
812 if not recipient_id: 812 ↛ 813line 812 didn't jump to line 813 because the condition on line 812 was never true
813 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")
815 recipient_user_id = session.execute(
816 select(User.id).where(users_visible(context)).where(User.id == recipient_id)
817 ).scalar_one_or_none()
819 if not recipient_user_id: 819 ↛ 820line 819 didn't jump to line 820 because the condition on line 819 was never true
820 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")
822 if user_id == recipient_id: 822 ↛ 823line 822 didn't jump to line 823 because the condition on line 822 was never true
823 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")
825 if request.text == "": 825 ↛ 826line 825 didn't jump to line 826 because the condition on line 825 was never true
826 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
828 # Look for an existing direct message (DM) chat between the two users
829 dm_chat_ids = (
830 select(GroupChatSubscription.group_chat_id)
831 .where(GroupChatSubscription.user_id.in_([user_id, recipient_id]))
832 .group_by(GroupChatSubscription.group_chat_id)
833 .having(func.count(GroupChatSubscription.user_id) == 2)
834 )
836 chat = session.execute(
837 where_moderated_content_visible(
838 select(GroupChat)
839 .where(GroupChat.is_dm == True)
840 .where(GroupChat.conversation_id.in_(dm_chat_ids))
841 .limit(1),
842 context,
843 GroupChat,
844 is_list_operation=False,
845 )
846 ).scalar_one_or_none()
848 if not chat:
849 chat = _create_chat(session, user_id, [recipient_id])
851 # Retrieve the sender's active subscription to the chat
852 subscription = _get_message_subscription(session, user_id, chat.conversation_id)
854 # Add the message to the conversation
855 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)
857 user_gender = session.execute(select(User.gender).where(User.id == user_id)).scalar_one()
858 sent_messages_counter.labels(user_gender, "direct message").inc()
860 session.flush()
862 return conversations_pb2.SendDirectMessageRes(group_chat_id=chat.conversation_id)
864 def EditGroupChat(
865 self, request: conversations_pb2.EditGroupChatReq, context: CouchersContext, session: Session
866 ) -> empty_pb2.Empty:
867 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
869 if not subscription:
870 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
872 if subscription.role != GroupChatRole.admin:
873 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_edit")
875 if request.HasField("title"):
876 subscription.group_chat.title = request.title.value
878 if request.HasField("only_admins_invite"): 878 ↛ 881line 878 didn't jump to line 881 because the condition on line 878 was always true
879 subscription.group_chat.only_admins_invite = request.only_admins_invite.value
881 _add_message_to_subscription(session, subscription, message_type=MessageType.chat_edited)
883 return empty_pb2.Empty()
885 def MakeGroupChatAdmin(
886 self, request: conversations_pb2.MakeGroupChatAdminReq, context: CouchersContext, session: Session
887 ) -> empty_pb2.Empty:
888 if not session.execute(
889 select(User).where(users_visible(context)).where(User.id == request.user_id)
890 ).scalar_one_or_none():
891 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
893 your_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
895 if not your_subscription: 895 ↛ 896line 895 didn't jump to line 896 because the condition on line 895 was never true
896 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
898 if your_subscription.role != GroupChatRole.admin:
899 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_make_admin")
901 if request.user_id == context.user_id: 901 ↛ 902line 901 didn't jump to line 902 because the condition on line 901 was never true
902 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_make_self_admin")
904 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
906 if not their_subscription: 906 ↛ 907line 906 didn't jump to line 907 because the condition on line 906 was never true
907 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")
909 if their_subscription.role != GroupChatRole.participant:
910 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_admin")
912 their_subscription.role = GroupChatRole.admin
914 _add_message_to_subscription(
915 session, your_subscription, message_type=MessageType.user_made_admin, target_id=request.user_id
916 )
918 return empty_pb2.Empty()
920 def RemoveGroupChatAdmin(
921 self, request: conversations_pb2.RemoveGroupChatAdminReq, context: CouchersContext, session: Session
922 ) -> empty_pb2.Empty:
923 if not session.execute(
924 select(User).where(users_visible(context)).where(User.id == request.user_id)
925 ).scalar_one_or_none():
926 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
928 your_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
930 if not your_subscription: 930 ↛ 931line 930 didn't jump to line 931 because the condition on line 930 was never true
931 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
933 if request.user_id == context.user_id:
934 # Race condition!
935 other_admins_count = session.execute(
936 select(func.count())
937 .select_from(GroupChatSubscription)
938 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
939 .where(GroupChatSubscription.user_id != context.user_id)
940 .where(GroupChatSubscription.role == GroupChatRole.admin)
941 .where(GroupChatSubscription.left == None)
942 ).scalar_one()
943 if not other_admins_count > 0:
944 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_last_admin")
946 if your_subscription.role != GroupChatRole.admin:
947 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_admin")
949 their_subscription = session.execute(
950 select(GroupChatSubscription)
951 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
952 .where(GroupChatSubscription.user_id == request.user_id)
953 .where(GroupChatSubscription.left == None)
954 .where(GroupChatSubscription.role == GroupChatRole.admin)
955 ).scalar_one_or_none()
957 if not their_subscription: 957 ↛ 958line 957 didn't jump to line 958 because the condition on line 957 was never true
958 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin")
960 their_subscription.role = GroupChatRole.participant
962 _add_message_to_subscription(
963 session, your_subscription, message_type=MessageType.user_removed_admin, target_id=request.user_id
964 )
966 return empty_pb2.Empty()
968 def InviteToGroupChat(
969 self, request: conversations_pb2.InviteToGroupChatReq, context: CouchersContext, session: Session
970 ) -> empty_pb2.Empty:
971 if not session.execute(
972 select(User).where(users_visible(context)).where(User.id == request.user_id)
973 ).scalar_one_or_none():
974 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
976 result = session.execute(
977 where_moderated_content_visible(
978 select(GroupChatSubscription, GroupChat)
979 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
980 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
981 .where(GroupChatSubscription.user_id == context.user_id)
982 .where(GroupChatSubscription.left == None),
983 context,
984 GroupChat,
985 is_list_operation=False,
986 )
987 ).one_or_none()
989 if not result:
990 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
992 your_subscription, group_chat = result._tuple()
994 if request.user_id == context.user_id: 994 ↛ 995line 994 didn't jump to line 995 because the condition on line 994 was never true
995 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_self")
997 if your_subscription.role != GroupChatRole.admin and your_subscription.group_chat.only_admins_invite:
998 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invite_permission_denied")
1000 if group_chat.is_dm:
1001 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_to_dm")
1003 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
1005 if their_subscription: 1005 ↛ 1006line 1005 didn't jump to line 1006 because the condition on line 1005 was never true
1006 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_chat")
1008 # TODO: race condition!
1010 subscription = GroupChatSubscription(
1011 user_id=request.user_id,
1012 group_chat_id=your_subscription.group_chat.conversation_id,
1013 role=GroupChatRole.participant,
1014 )
1015 session.add(subscription)
1017 _add_message_to_subscription(
1018 session, your_subscription, message_type=MessageType.user_invited, target_id=request.user_id
1019 )
1021 return empty_pb2.Empty()
1023 def RemoveGroupChatUser(
1024 self, request: conversations_pb2.RemoveGroupChatUserReq, context: CouchersContext, session: Session
1025 ) -> empty_pb2.Empty:
1026 """
1027 1. Get admin info and check it's correct
1028 2. Get user data, check it's correct and remove user
1029 """
1030 # Admin info
1031 your_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
1033 # if user info is missing
1034 if not your_subscription: 1034 ↛ 1035line 1034 didn't jump to line 1035 because the condition on line 1034 was never true
1035 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
1037 # if user not admin
1038 if your_subscription.role != GroupChatRole.admin: 1038 ↛ 1039line 1038 didn't jump to line 1039 because the condition on line 1038 was never true
1039 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_user")
1041 # if user wants to remove themselves
1042 if request.user_id == context.user_id: 1042 ↛ 1043line 1042 didn't jump to line 1043 because the condition on line 1042 was never true
1043 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_self")
1045 # get user info
1046 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
1048 # user not found
1049 if not their_subscription:
1050 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")
1052 _add_message_to_subscription(
1053 session, your_subscription, message_type=MessageType.user_removed, target_id=request.user_id
1054 )
1056 their_subscription.left = func.now()
1058 return empty_pb2.Empty()
1060 def LeaveGroupChat(
1061 self, request: conversations_pb2.LeaveGroupChatReq, context: CouchersContext, session: Session
1062 ) -> empty_pb2.Empty:
1063 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
1065 if not subscription:
1066 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
1068 if subscription.role == GroupChatRole.admin:
1069 other_admins_count = session.execute(
1070 select(func.count())
1071 .select_from(GroupChatSubscription)
1072 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
1073 .where(GroupChatSubscription.user_id != context.user_id)
1074 .where(GroupChatSubscription.role == GroupChatRole.admin)
1075 .where(GroupChatSubscription.left == None)
1076 ).scalar_one()
1077 participants_count = session.execute(
1078 select(func.count())
1079 .select_from(GroupChatSubscription)
1080 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
1081 .where(GroupChatSubscription.user_id != context.user_id)
1082 .where(GroupChatSubscription.role == GroupChatRole.participant)
1083 .where(GroupChatSubscription.left == None)
1084 ).scalar_one()
1085 if not (other_admins_count > 0 or participants_count == 0):
1086 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "last_admin_cant_leave")
1088 _add_message_to_subscription(session, subscription, message_type=MessageType.user_left)
1090 subscription.left = func.now()
1092 return empty_pb2.Empty()