Coverage for app / backend / src / couchers / servicers / conversations.py: 88%
313 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1import logging
2from collections.abc import Sequence
3from datetime import timedelta
4from typing import Any, cast
6import grpc
7from google.protobuf import empty_pb2
8from sqlalchemy import select
9from sqlalchemy.orm import Session
10from sqlalchemy.sql import func, not_, or_
12from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY
13from couchers.context import CouchersContext, make_background_user_context
14from couchers.db import session_scope
15from couchers.event_log import log_event
16from couchers.helpers.completed_profile import has_completed_profile
17from couchers.jobs.enqueue import queue_job
18from couchers.metrics import sent_messages_counter
19from couchers.models import (
20 Conversation,
21 GroupChat,
22 GroupChatRole,
23 GroupChatSubscription,
24 Message,
25 MessageType,
26 ModerationObjectType,
27 RateLimitAction,
28 User,
29)
30from couchers.models.notifications import NotificationTopicAction
31from couchers.moderation.utils import create_moderation
32from couchers.notifications.notify import notify
33from couchers.proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2
34from couchers.proto.internal import jobs_pb2
35from couchers.rate_limits.check import process_rate_limits_and_check_abort
36from couchers.rate_limits.definitions import RATE_LIMIT_HOURS
37from couchers.servicers.api import user_model_to_pb
38from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible
39from couchers.utils import Timestamp_from_datetime, now
# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# TODO: Still needs custom pagination: GetUpdates
# Page size used when a request leaves `number` at 0
DEFAULT_PAGINATION_LENGTH = 20
# Upper bound applied to any requested page size
MAX_PAGE_SIZE = 50
def _message_to_pb(message: Message) -> conversations_pb2.Message:
    """
    Builds the protocol buffer representation of the given message

    Normal messages carry their text. Any other message is treated as a control message: only the content
    field matching its message_type is populated, all other content fields stay unset.
    """
    # fields shared by every kind of message
    envelope: dict[str, Any] = {
        "message_id": message.id,
        "author_user_id": message.author_id,
        "time": Timestamp_from_datetime(message.time),
    }

    if message.is_normal_message:
        return conversations_pb2.Message(
            text=conversations_pb2.MessageContentText(text=message.text),
            **envelope,
        )

    # control messages: (protobuf field name, content builder) keyed by message type; builders are lazy so
    # message.target_id is only read for the types that need it
    control_content = {
        MessageType.chat_created: (
            "chat_created",
            lambda: conversations_pb2.MessageContentChatCreated(),
        ),
        MessageType.chat_edited: (
            "chat_edited",
            lambda: conversations_pb2.MessageContentChatEdited(),
        ),
        MessageType.user_invited: (
            "user_invited",
            lambda: conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id),
        ),
        MessageType.user_left: (
            "user_left",
            lambda: conversations_pb2.MessageContentUserLeft(),
        ),
        MessageType.user_made_admin: (
            "user_made_admin",
            lambda: conversations_pb2.MessageContentUserMadeAdmin(target_user_id=message.target_id),
        ),
        MessageType.user_removed_admin: (
            "user_removed_admin",
            lambda: conversations_pb2.MessageContentUserRemovedAdmin(target_user_id=message.target_id),
        ),
        MessageType.user_removed: (
            "group_chat_user_removed",
            lambda: conversations_pb2.MessageContentUserRemoved(target_user_id=message.target_id),
        ),
    }

    entry = control_content.get(message.message_type)
    if entry is not None:
        field_name, build_content = entry
        envelope[field_name] = build_content()

    return conversations_pb2.Message(**envelope)
def _get_visible_members_for_subscription(subscription: GroupChatSubscription) -> list[int]:
    """
    Returns the user ids of the chat members that the owner of this subscription is allowed to see

    A user who has left a group chat must not learn who was added after they left, so for them the member
    list is frozen at the moment they left.
    """
    chat_subscriptions = subscription.group_chat.subscriptions
    left_at = subscription.left

    if left_at:
        # no longer in the chat: everyone whose membership covered the moment we left
        visible = chat_subscriptions.where(GroupChatSubscription.joined <= left_at).where(
            or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None)
        )
    else:
        # still in the chat: everyone who currently has an un-left subscription
        visible = chat_subscriptions.where(GroupChatSubscription.left == None)

    return [member.user_id for member in visible]
def _get_visible_admins_for_subscription(subscription: GroupChatSubscription) -> list[int]:
    """
    Returns the user ids of the chat admins that the owner of this subscription is allowed to see

    Same visibility rule as for members: a user who has left the chat only sees the admins whose membership
    covered the moment they left.
    """
    admin_subscriptions = subscription.group_chat.subscriptions.where(
        GroupChatSubscription.role == GroupChatRole.admin
    )
    left_at = subscription.left

    if left_at:
        # no longer in the chat: admins whose membership covered the moment we left
        visible = admin_subscriptions.where(GroupChatSubscription.joined <= left_at).where(
            or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None)
        )
    else:
        # still in the chat: admins who currently have an un-left subscription
        visible = admin_subscriptions.where(GroupChatSubscription.left == None)

    return [admin.user_id for admin in visible]
def _user_can_message(session: Session, context: CouchersContext, group_chat: GroupChat) -> bool:
    """
    Tells whether the requesting user may send messages in the given chat

    In a true group chat (not a DM) messaging is always allowed. In a DM it is allowed only while the other
    participant still has an un-left subscription and passes the user visibility filter (i.e. is not
    deleted/banned and there is no block in either direction).
    """
    if not group_chat.is_dm:
        return True

    # the other participant's active subscription to this DM
    other_participant = (
        select(GroupChatSubscription)
        .where(GroupChatSubscription.user_id != context.user_id)
        .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
        .where(GroupChatSubscription.left == None)
    )
    visible_other_participant = where_users_column_visible(
        other_participant,
        context=context,
        column=GroupChatSubscription.user_id,
    )

    return session.execute(select(visible_other_participant.exists())).scalar_one()
def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload) -> None:
    """
    Background job that fans out chat notifications for a single message

    Only text messages produce notifications. Recipients are the subscribers (other than the author) whose
    membership covered the time the message was sent, who have not muted the chat, and who pass the user
    visibility filter evaluated from the author's point of view.
    """
    logger.info(f"Fanning notifications for message_id = {payload.message_id}")

    with session_scope() as session:
        lookup = (
            select(Message, GroupChat)
            .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
            .where(Message.id == payload.message_id)
        )
        message, group_chat = session.execute(lookup).one()

        if message.message_type != MessageType.text:
            logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
            return

        author_context = make_background_user_context(user_id=message.author_id)

        # subscribers other than the author who were in the chat at message time and haven't muted it
        candidate_recipients = (
            select(GroupChatSubscription.user_id)
            .where(GroupChatSubscription.group_chat_id == message.conversation_id)
            .where(GroupChatSubscription.user_id != message.author_id)
            .where(GroupChatSubscription.joined <= message.time)
            .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time))
            .where(not_(GroupChatSubscription.is_muted))
        )
        recipient_ids = (
            session.execute(
                where_users_column_visible(
                    candidate_recipients,
                    context=author_context,
                    column=GroupChatSubscription.user_id,
                )
            )
            .scalars()
            .all()
        )

        summary = (
            f"{message.author.name} sent you a message"
            if group_chat.is_dm
            else f"{message.author.name} sent a message in {group_chat.title}"
        )

        for recipient_id in recipient_ids:
            # the author is rendered from each recipient's own point of view
            recipient_context = make_background_user_context(user_id=recipient_id)
            notification = notification_data_pb2.ChatMessage(
                author=user_model_to_pb(message.author, session, recipient_context),
                message=summary,
                text=message.text,
                group_chat_id=message.conversation_id,
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action=NotificationTopicAction.chat__message,
                key=str(message.conversation_id),
                data=notification,
                moderation_state_id=group_chat.moderation_state_id,
            )
def _add_message_to_subscription(session: Session, subscription: GroupChatSubscription, **kwargs: Any) -> Message:
    """
    Posts a new message to the subscription's chat, authored by the user who owns the subscription

    The author's last seen message id is moved to the new message, and a background job is queued to
    generate notifications for it.

    Any keyword args are passed straight through to Message
    """
    new_message = Message(
        conversation_id=subscription.group_chat.conversation.id,
        author_id=subscription.user_id,
        **kwargs,
    )
    session.add(new_message)
    # flush before new_message.id is read below
    session.flush()

    subscription.last_seen_message_id = new_message.id

    notification_payload = jobs_pb2.GenerateMessageNotificationsPayload(message_id=new_message.id)
    queue_job(session, job=generate_message_notifications, payload=notification_payload)

    return new_message
def _create_chat(
    session: Session,
    creator_id: int,
    recipient_ids: Sequence[int],
    title: str | None = None,
    only_admins_invite: bool = True,
) -> GroupChat:
    """
    Creates a conversation, its moderation state, the group chat and one subscription per participant

    The creator is subscribed as admin and every recipient as participant. The chat is flagged as a DM
    exactly when there is a single recipient. No message is added to the chat here.
    """
    conversation = Conversation()
    session.add(conversation)
    session.flush()

    # Create moderation state for UMS (starts as SHADOWED)
    moderation_state = create_moderation(
        session=session,
        object_type=ModerationObjectType.group_chat,
        object_id=conversation.id,
        creator_user_id=creator_id,
    )

    group_chat = GroupChat(
        conversation_id=conversation.id,
        title=title,
        creator_id=creator_id,
        is_dm=len(recipient_ids) == 1,
        only_admins_invite=only_admins_invite,
        moderation_state_id=moderation_state.id,
    )
    session.add(group_chat)
    session.flush()

    creator_subscription = GroupChatSubscription(
        user_id=creator_id,
        group_chat_id=group_chat.conversation_id,
        role=GroupChatRole.admin,
    )
    participant_subscriptions = [
        GroupChatSubscription(
            user_id=recipient_id,
            group_chat_id=group_chat.conversation_id,
            role=GroupChatRole.participant,
        )
        for recipient_id in recipient_ids
    ]
    session.add_all([creator_subscription, *participant_subscriptions])

    return group_chat
def _get_message_subscription(session: Session, user_id: int, conversation_id: int) -> GroupChatSubscription:
    """
    Fetches the user's current (un-left) subscription to the given chat, without any visibility filtering

    NOTE: the query may find nothing, in which case None is returned despite the annotated return type
    """
    active_subscription = (
        select(GroupChatSubscription)
        .where(GroupChatSubscription.group_chat_id == conversation_id)
        .where(GroupChatSubscription.user_id == user_id)
        .where(GroupChatSubscription.left == None)
    )
    found = session.execute(active_subscription).scalar_one_or_none()
    return cast(GroupChatSubscription, found)
def _get_visible_message_subscription(
    session: Session, context: CouchersContext, conversation_id: int
) -> GroupChatSubscription:
    """
    Fetches the requesting user's current (un-left) subscription to the given chat, subject to the
    moderation visibility filter on the group chat

    NOTE: the query may find nothing, in which case None is returned despite the annotated return type
    """
    active_subscription = (
        select(GroupChatSubscription)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.group_chat_id == conversation_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.left == None)
    )
    visible_subscription = where_moderated_content_visible(
        active_subscription,
        context,
        GroupChat,
        is_list_operation=False,
    )
    found = session.execute(visible_subscription).scalar_one_or_none()
    return cast(GroupChatSubscription, found)
def _unseen_message_count(session: Session, subscription_id: int) -> int:
    """
    Counts the messages in the subscription's chat whose id is greater than the subscription's
    last seen message id
    """
    unseen_messages = (
        select(func.count())
        .select_from(Message)
        .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
        .where(GroupChatSubscription.id == subscription_id)
        .where(Message.id > GroupChatSubscription.last_seen_message_id)
    )
    return session.execute(unseen_messages).scalar_one()
def _mute_info(subscription: GroupChatSubscription) -> conversations_pb2.MuteInfo:
    """
    Turns the subscription's mute state, as reported by muted_display(), into a MuteInfo protocol buffer

    muted_until is left unset when muted_display() reports no end time
    """
    is_muted, until = subscription.muted_display()
    until_pb = Timestamp_from_datetime(until) if until else None
    return conversations_pb2.MuteInfo(muted=is_muted, muted_until=until_pb)
347class Conversations(conversations_pb2_grpc.ConversationsServicer):
348 def ListGroupChats(
349 self, request: conversations_pb2.ListGroupChatsReq, context: CouchersContext, session: Session
350 ) -> conversations_pb2.ListGroupChatsRes:
351 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
352 page_size = min(page_size, MAX_PAGE_SIZE)
354 # select group chats where you have a subscription, and for each of
355 # these, the latest message from them
357 t = (
358 select(
359 GroupChatSubscription.group_chat_id.label("group_chat_id"),
360 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
361 func.max(Message.id).label("message_id"),
362 )
363 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
364 .where(GroupChatSubscription.user_id == context.user_id)
365 .where(Message.time >= GroupChatSubscription.joined)
366 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
367 .where(
368 or_(
369 to_bool(request.HasField("only_archived") == False),
370 GroupChatSubscription.is_archived == request.only_archived,
371 )
372 )
373 .group_by(GroupChatSubscription.group_chat_id)
374 .order_by(func.max(Message.id).desc())
375 .subquery()
376 )
378 results = session.execute(
379 where_moderated_content_visible(
380 select(t, GroupChat, GroupChatSubscription, Message)
381 .join(Message, Message.id == t.c.message_id)
382 .join(GroupChatSubscription, GroupChatSubscription.id == t.c.group_chat_subscriptions_id)
383 .join(GroupChat, GroupChat.conversation_id == t.c.group_chat_id)
384 .where(or_(t.c.message_id < request.last_message_id, to_bool(request.last_message_id == 0)))
385 .order_by(t.c.message_id.desc())
386 .limit(page_size + 1),
387 context,
388 GroupChat,
389 is_list_operation=True,
390 )
391 ).all()
393 return conversations_pb2.ListGroupChatsRes(
394 group_chats=[
395 conversations_pb2.GroupChat(
396 group_chat_id=result.GroupChat.conversation_id,
397 title=result.GroupChat.title, # TODO: proper title for DMs, etc
398 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
399 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
400 only_admins_invite=result.GroupChat.only_admins_invite,
401 is_dm=result.GroupChat.is_dm,
402 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
403 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
404 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
405 latest_message=_message_to_pb(result.Message) if result.Message else None,
406 mute_info=_mute_info(result.GroupChatSubscription),
407 can_message=_user_can_message(session, context, result.GroupChat),
408 is_archived=result.GroupChatSubscription.is_archived,
409 )
410 for result in results[:page_size]
411 ],
412 last_message_id=(
413 min(g.Message.id if g.Message else 1 for g in results[:page_size]) if len(results) > 0 else 0
414 ), # TODO
415 no_more=len(results) <= page_size,
416 )
418 def GetGroupChat(
419 self, request: conversations_pb2.GetGroupChatReq, context: CouchersContext, session: Session
420 ) -> conversations_pb2.GroupChat:
421 result = session.execute(
422 where_moderated_content_visible(
423 select(GroupChat, GroupChatSubscription, Message)
424 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
425 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
426 .where(GroupChatSubscription.user_id == context.user_id)
427 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
428 .where(Message.time >= GroupChatSubscription.joined)
429 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
430 .order_by(Message.id.desc())
431 .limit(1),
432 context,
433 GroupChat,
434 is_list_operation=False,
435 )
436 ).one_or_none()
438 if not result:
439 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
441 return conversations_pb2.GroupChat(
442 group_chat_id=result.GroupChat.conversation_id,
443 title=result.GroupChat.title,
444 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
445 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
446 only_admins_invite=result.GroupChat.only_admins_invite,
447 is_dm=result.GroupChat.is_dm,
448 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
449 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
450 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
451 latest_message=_message_to_pb(result.Message) if result.Message else None,
452 mute_info=_mute_info(result.GroupChatSubscription),
453 can_message=_user_can_message(session, context, result.GroupChat),
454 is_archived=result.GroupChatSubscription.is_archived,
455 )
457 def GetDirectMessage(
458 self, request: conversations_pb2.GetDirectMessageReq, context: CouchersContext, session: Session
459 ) -> conversations_pb2.GroupChat:
460 count = func.count(GroupChatSubscription.id).label("count")
461 subquery = (
462 select(GroupChatSubscription.group_chat_id)
463 .where(
464 or_(
465 GroupChatSubscription.user_id == context.user_id,
466 GroupChatSubscription.user_id == request.user_id,
467 )
468 )
469 .where(GroupChatSubscription.left == None)
470 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
471 .where(GroupChat.is_dm == True)
472 .group_by(GroupChatSubscription.group_chat_id)
473 .having(count == 2)
474 .subquery()
475 )
477 result = session.execute(
478 where_moderated_content_visible(
479 select(subquery, GroupChat, GroupChatSubscription, Message)
480 .join(subquery, subquery.c.group_chat_id == GroupChat.conversation_id)
481 .join(Message, Message.conversation_id == GroupChat.conversation_id)
482 .where(GroupChatSubscription.user_id == context.user_id)
483 .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id)
484 .where(Message.time >= GroupChatSubscription.joined)
485 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
486 .order_by(Message.id.desc())
487 .limit(1),
488 context,
489 GroupChat,
490 is_list_operation=False,
491 )
492 ).one_or_none()
494 if not result:
495 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
497 return conversations_pb2.GroupChat(
498 group_chat_id=result.GroupChat.conversation_id,
499 title=result.GroupChat.title,
500 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
501 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
502 only_admins_invite=result.GroupChat.only_admins_invite,
503 is_dm=result.GroupChat.is_dm,
504 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
505 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
506 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
507 latest_message=_message_to_pb(result.Message) if result.Message else None,
508 mute_info=_mute_info(result.GroupChatSubscription),
509 can_message=_user_can_message(session, context, result.GroupChat),
510 is_archived=result.GroupChatSubscription.is_archived,
511 )
513 def GetUpdates(
514 self, request: conversations_pb2.GetUpdatesReq, context: CouchersContext, session: Session
515 ) -> conversations_pb2.GetUpdatesRes:
516 results = (
517 session.execute(
518 where_moderated_content_visible(
519 select(Message)
520 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
521 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
522 .where(GroupChatSubscription.user_id == context.user_id)
523 .where(Message.time >= GroupChatSubscription.joined)
524 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
525 .where(Message.id > request.newest_message_id)
526 .order_by(Message.id.asc())
527 .limit(DEFAULT_PAGINATION_LENGTH + 1),
528 context,
529 GroupChat,
530 is_list_operation=False,
531 )
532 )
533 .scalars()
534 .all()
535 )
537 return conversations_pb2.GetUpdatesRes(
538 updates=[
539 conversations_pb2.Update(
540 group_chat_id=message.conversation_id,
541 message=_message_to_pb(message),
542 )
543 for message in sorted(results, key=lambda message: message.id)[:DEFAULT_PAGINATION_LENGTH]
544 ],
545 no_more=len(results) <= DEFAULT_PAGINATION_LENGTH,
546 )
548 def GetGroupChatMessages(
549 self, request: conversations_pb2.GetGroupChatMessagesReq, context: CouchersContext, session: Session
550 ) -> conversations_pb2.GetGroupChatMessagesRes:
551 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
552 page_size = min(page_size, MAX_PAGE_SIZE)
554 results = (
555 session.execute(
556 where_moderated_content_visible(
557 select(Message)
558 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
559 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
560 .where(GroupChatSubscription.user_id == context.user_id)
561 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
562 .where(Message.time >= GroupChatSubscription.joined)
563 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
564 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
565 .where(
566 or_(Message.id > GroupChatSubscription.last_seen_message_id, to_bool(request.only_unseen == 0))
567 )
568 .order_by(Message.id.desc())
569 .limit(page_size + 1),
570 context,
571 GroupChat,
572 is_list_operation=False,
573 )
574 )
575 .scalars()
576 .all()
577 )
579 return conversations_pb2.GetGroupChatMessagesRes(
580 messages=[_message_to_pb(message) for message in results[:page_size]],
581 last_message_id=results[-2].id if len(results) > 1 else 0, # TODO
582 no_more=len(results) <= page_size,
583 )
585 def MarkLastSeenGroupChat(
586 self, request: conversations_pb2.MarkLastSeenGroupChatReq, context: CouchersContext, session: Session
587 ) -> empty_pb2.Empty:
588 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
590 if not subscription: 590 ↛ 591line 590 didn't jump to line 591 because the condition on line 590 was never true
591 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
593 if not subscription.last_seen_message_id <= request.last_seen_message_id: 593 ↛ 594line 593 didn't jump to line 594 because the condition on line 593 was never true
594 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
596 subscription.last_seen_message_id = request.last_seen_message_id
598 return empty_pb2.Empty()
600 def MuteGroupChat(
601 self, request: conversations_pb2.MuteGroupChatReq, context: CouchersContext, session: Session
602 ) -> empty_pb2.Empty:
603 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
605 if not subscription: 605 ↛ 606line 605 didn't jump to line 606 because the condition on line 605 was never true
606 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
608 if request.unmute:
609 subscription.muted_until = DATETIME_MINUS_INFINITY
610 elif request.forever:
611 subscription.muted_until = DATETIME_INFINITY
612 elif request.for_duration: 612 ↛ 618line 612 didn't jump to line 618 because the condition on line 612 was always true
613 duration = request.for_duration.ToTimedelta()
614 if duration < timedelta(seconds=0): 614 ↛ 615line 614 didn't jump to line 615 because the condition on line 614 was never true
615 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_mute_past")
616 subscription.muted_until = now() + duration
618 return empty_pb2.Empty()
620 def SetGroupChatArchiveStatus(
621 self, request: conversations_pb2.SetGroupChatArchiveStatusReq, context: CouchersContext, session: Session
622 ) -> conversations_pb2.SetGroupChatArchiveStatusRes:
623 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
625 if not subscription:
626 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
628 subscription.is_archived = request.is_archived
630 return conversations_pb2.SetGroupChatArchiveStatusRes(
631 group_chat_id=request.group_chat_id,
632 is_archived=request.is_archived,
633 )
635 def SearchMessages(
636 self, request: conversations_pb2.SearchMessagesReq, context: CouchersContext, session: Session
637 ) -> conversations_pb2.SearchMessagesRes:
638 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
639 page_size = min(page_size, MAX_PAGE_SIZE)
641 results = (
642 session.execute(
643 where_moderated_content_visible(
644 select(Message)
645 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
646 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
647 .where(GroupChatSubscription.user_id == context.user_id)
648 .where(Message.time >= GroupChatSubscription.joined)
649 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
650 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
651 .where(Message.text.ilike(f"%{request.query}%"))
652 .order_by(Message.id.desc())
653 .limit(page_size + 1),
654 context,
655 GroupChat,
656 is_list_operation=True,
657 )
658 )
659 .scalars()
660 .all()
661 )
663 return conversations_pb2.SearchMessagesRes(
664 results=[
665 conversations_pb2.MessageSearchResult(
666 group_chat_id=message.conversation_id,
667 message=_message_to_pb(message),
668 )
669 for message in results[:page_size]
670 ],
671 last_message_id=results[-2].id if len(results) > 1 else 0,
672 no_more=len(results) <= page_size,
673 )
675 def CreateGroupChat(
676 self, request: conversations_pb2.CreateGroupChatReq, context: CouchersContext, session: Session
677 ) -> conversations_pb2.GroupChat:
678 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
679 if not has_completed_profile(session, user):
680 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")
682 recipient_user_ids = list(
683 session.execute(
684 select(User.id).where(users_visible(context)).where(User.id.in_(request.recipient_user_ids))
685 )
686 .scalars()
687 .all()
688 )
690 # make sure all requested users are visible
691 if len(recipient_user_ids) != len(request.recipient_user_ids): 691 ↛ 692line 691 didn't jump to line 692 because the condition on line 691 was never true
692 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")
694 if not recipient_user_ids: 694 ↛ 695line 694 didn't jump to line 695 because the condition on line 694 was never true
695 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")
697 if len(recipient_user_ids) != len(set(recipient_user_ids)): 697 ↛ 699line 697 didn't jump to line 699 because the condition on line 697 was never true
698 # make sure there's no duplicate users
699 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_recipients")
701 if context.user_id in recipient_user_ids: 701 ↛ 702line 701 didn't jump to line 702 because the condition on line 701 was never true
702 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")
704 if len(recipient_user_ids) == 1:
705 # can only have one DM at a time between any two users
706 other_user_id = recipient_user_ids[0]
708 # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have
709 # user_id either this user or the recipient user. If you find two subscriptions to the same DM group
710 # chat, you know they already have a shared group chat
711 count = func.count(GroupChatSubscription.id).label("count")
712 if session.execute(
713 where_moderated_content_visible(
714 select(count)
715 .where(
716 or_(
717 GroupChatSubscription.user_id == context.user_id,
718 GroupChatSubscription.user_id == other_user_id,
719 )
720 )
721 .where(GroupChatSubscription.left == None)
722 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
723 .where(GroupChat.is_dm == True)
724 .group_by(GroupChatSubscription.group_chat_id)
725 .having(count == 2),
726 context,
727 GroupChat,
728 is_list_operation=False,
729 )
730 ).scalar_one_or_none():
731 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_have_dm")
733 # Check if user has been initiating chats excessively
734 if process_rate_limits_and_check_abort(
735 session=session, user_id=context.user_id, action=RateLimitAction.chat_initiation
736 ):
737 context.abort_with_error_code(
738 grpc.StatusCode.RESOURCE_EXHAUSTED,
739 "chat_initiation_rate_limit",
740 substitutions={"hours": str(RATE_LIMIT_HOURS)},
741 )
743 group_chat = _create_chat(
744 session,
745 creator_id=context.user_id,
746 recipient_ids=request.recipient_user_ids,
747 title=request.title.value,
748 )
750 your_subscription = _get_message_subscription(session, context.user_id, group_chat.conversation_id)
752 _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created)
754 session.flush()
756 log_event(
757 context,
758 session,
759 "group_chat.created",
760 {
761 "group_chat_id": group_chat.conversation_id,
762 "is_dm": group_chat.is_dm,
763 "recipient_count": len(request.recipient_user_ids),
764 },
765 )
767 return conversations_pb2.GroupChat(
768 group_chat_id=group_chat.conversation_id,
769 title=group_chat.title,
770 member_user_ids=_get_visible_members_for_subscription(your_subscription),
771 admin_user_ids=_get_visible_admins_for_subscription(your_subscription),
772 only_admins_invite=group_chat.only_admins_invite,
773 is_dm=group_chat.is_dm,
774 created=Timestamp_from_datetime(group_chat.conversation.created),
775 mute_info=_mute_info(your_subscription),
776 can_message=True,
777 )
779 def SendMessage(
780 self, request: conversations_pb2.SendMessageReq, context: CouchersContext, session: Session
781 ) -> empty_pb2.Empty:
782 if request.text == "": 782 ↛ 783line 782 didn't jump to line 783 because the condition on line 782 was never true
783 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
785 result = session.execute(
786 where_moderated_content_visible(
787 select(GroupChatSubscription, GroupChat)
788 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
789 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
790 .where(GroupChatSubscription.user_id == context.user_id)
791 .where(GroupChatSubscription.left == None),
792 context,
793 GroupChat,
794 is_list_operation=False,
795 )
796 ).one_or_none()
797 if not result:
798 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
800 subscription, group_chat = result._tuple()
801 if not _user_can_message(session, context, group_chat):
802 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_message_in_chat")
804 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)
806 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
807 sent_messages_counter.labels(
808 user_gender, "direct message" if subscription.group_chat.is_dm else "group chat"
809 ).inc()
810 log_event(
811 context,
812 session,
813 "message.sent",
814 {"group_chat_id": request.group_chat_id, "is_dm": subscription.group_chat.is_dm},
815 )
817 return empty_pb2.Empty()
def SendDirectMessage(
    self, request: conversations_pb2.SendDirectMessageReq, context: CouchersContext, session: Session
) -> conversations_pb2.SendDirectMessageRes:
    """
    Sends a text message from the calling user to a single recipient.

    If a DM chat between the two users passes the moderation visibility filter it is reused, otherwise a new chat
    is created via `_create_chat`.

    Aborts with FAILED_PRECONDITION if the sender's profile is incomplete, and with INVALID_ARGUMENT if no
    recipient is given, the recipient is not visible to the sender, the recipient is the sender, or the text is
    empty.

    Returns a response carrying the id of the chat the message was added to.
    """
    user_id = context.user_id
    user = session.execute(select(User).where(User.id == user_id)).scalar_one()

    recipient_id = request.recipient_user_id

    if not has_completed_profile(session, user):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")

    if not recipient_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")

    # Only used to check that the recipient exists and is visible to the sender
    recipient_user_id = session.execute(
        select(User.id).where(users_visible(context)).where(User.id == recipient_id)
    ).scalar_one_or_none()

    if not recipient_user_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")

    if user_id == recipient_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")

    if request.text == "":
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    # Look for an existing direct message (DM) chat between the two users: chats in which the sender and the
    # recipient together hold exactly two subscription rows
    dm_chat_ids = (
        select(GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.user_id.in_([user_id, recipient_id]))
        .group_by(GroupChatSubscription.group_chat_id)
        .having(func.count(GroupChatSubscription.user_id) == 2)
    )

    chat = session.execute(
        where_moderated_content_visible(
            select(GroupChat)
            .where(GroupChat.is_dm == True)
            .where(GroupChat.conversation_id.in_(dm_chat_ids))
            .limit(1),
            context,
            GroupChat,
            is_list_operation=False,
        )
    ).scalar_one_or_none()

    if not chat:
        chat = _create_chat(session, user_id, [recipient_id])

    # Retrieve the sender's subscription to the chat
    subscription = _get_message_subscription(session, user_id, chat.conversation_id)

    # Add the message to the conversation
    _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)

    # The sender's row was already loaded above, so read the gender from it instead of querying again
    sent_messages_counter.labels(user.gender, "direct message").inc()
    log_event(
        context,
        session,
        "message.sent",
        {"group_chat_id": chat.conversation_id, "is_dm": True, "recipient_id": recipient_id},
    )

    session.flush()

    return conversations_pb2.SendDirectMessageRes(group_chat_id=chat.conversation_id)
def EditGroupChat(
    self, request: conversations_pb2.EditGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Applies the `title` and/or `only_admins_invite` values present in the request to a group chat.

    A chat_edited message is added to the chat regardless of which fields were present.

    Aborts with NOT_FOUND if the caller has no visible subscription to the chat, and with PERMISSION_DENIED if
    the caller is not an admin of it.
    """
    editor_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not editor_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if editor_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_edit")

    # Both editable fields are wrapper messages: copy the wrapped value across only when the field is set
    for field_name in ("title", "only_admins_invite"):
        if request.HasField(field_name):
            setattr(editor_subscription.group_chat, field_name, getattr(request, field_name).value)

    _add_message_to_subscription(session, editor_subscription, message_type=MessageType.chat_edited)

    return empty_pb2.Empty()
def MakeGroupChatAdmin(
    self, request: conversations_pb2.MakeGroupChatAdminReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Promotes another member of a group chat from participant to admin and records a user_made_admin message.

    Aborts with NOT_FOUND if the target user is not visible to the caller or the caller has no visible
    subscription to the chat, with PERMISSION_DENIED if the caller is not an admin, and with FAILED_PRECONDITION
    if the target is the caller, is not in the chat, or does not currently have the participant role.
    """
    target_user = session.execute(
        select(User).where(users_visible(context), User.id == request.user_id)
    ).scalar_one_or_none()
    if not target_user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    caller_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not caller_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if caller_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_make_admin")

    if request.user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_make_self_admin")

    target_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)

    if not target_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")

    if target_subscription.role != GroupChatRole.participant:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_admin")

    target_subscription.role = GroupChatRole.admin

    # The control message is posted on behalf of the caller, with the promoted user as its target
    _add_message_to_subscription(
        session,
        caller_subscription,
        message_type=MessageType.user_made_admin,
        target_id=request.user_id,
    )

    return empty_pb2.Empty()
def RemoveGroupChatAdmin(
    self, request: conversations_pb2.RemoveGroupChatAdminReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Demotes an admin of a group chat to participant and records a user_removed_admin message.

    The caller may demote themselves only if at least one other admin who has not left remains in the chat.

    Aborts with NOT_FOUND if the target user is not visible to the caller or the caller has no visible
    subscription to the chat, with PERMISSION_DENIED if the caller is not an admin, and with FAILED_PRECONDITION
    if the caller would be removing the last admin or the target is not a current admin of the chat.
    """
    target_user = session.execute(
        select(User).where(users_visible(context), User.id == request.user_id)
    ).scalar_one_or_none()
    if not target_user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    caller_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not caller_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if request.user_id == context.user_id:
        # Race condition! (flagged in the original code; presumably the count can change before the demotion
        # below is applied -- TODO confirm)
        count_other_admins = (
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(
                GroupChatSubscription.group_chat_id == request.group_chat_id,
                GroupChatSubscription.user_id != context.user_id,
                GroupChatSubscription.role == GroupChatRole.admin,
                GroupChatSubscription.left.is_(None),
            )
        )
        other_admins_count = session.execute(count_other_admins).scalar_one()
        if other_admins_count <= 0:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_last_admin")

    if caller_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_admin")

    # Only matches a subscription that has not been left and currently holds the admin role
    find_target_admin = select(GroupChatSubscription).where(
        GroupChatSubscription.group_chat_id == request.group_chat_id,
        GroupChatSubscription.user_id == request.user_id,
        GroupChatSubscription.left.is_(None),
        GroupChatSubscription.role == GroupChatRole.admin,
    )
    target_subscription = session.execute(find_target_admin).scalar_one_or_none()

    if not target_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin")

    target_subscription.role = GroupChatRole.participant

    _add_message_to_subscription(
        session,
        caller_subscription,
        message_type=MessageType.user_removed_admin,
        target_id=request.user_id,
    )

    return empty_pb2.Empty()
def InviteToGroupChat(
    self, request: conversations_pb2.InviteToGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Adds another user to a group chat as a participant and records a user_invited message.

    Aborts with NOT_FOUND if the invited user is not visible to the caller or the caller has no subscription to
    a chat passing the moderation visibility filter, with PERMISSION_DENIED if the chat only lets admins invite
    and the caller is not one, and with FAILED_PRECONDITION if the caller invites themselves, the chat is a DM,
    or the invited user already has a subscription returned by `_get_message_subscription`.
    """
    invitee = session.execute(
        select(User).where(users_visible(context), User.id == request.user_id)
    ).scalar_one_or_none()
    if not invitee:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # Load the caller's un-left subscription together with the chat it belongs to
    caller_membership_query = (
        select(GroupChatSubscription, GroupChat)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(
            GroupChatSubscription.group_chat_id == request.group_chat_id,
            GroupChatSubscription.user_id == context.user_id,
            GroupChatSubscription.left.is_(None),
        )
    )
    row = session.execute(
        where_moderated_content_visible(caller_membership_query, context, GroupChat, is_list_operation=False)
    ).one_or_none()

    if not row:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    inviter_subscription, group_chat = row._tuple()

    if request.user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_self")

    inviter_is_admin = inviter_subscription.role == GroupChatRole.admin
    if not inviter_is_admin and inviter_subscription.group_chat.only_admins_invite:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invite_permission_denied")

    if group_chat.is_dm:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_to_dm")

    existing_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)

    if existing_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_chat")

    # TODO: race condition!

    session.add(
        GroupChatSubscription(
            user_id=request.user_id,
            group_chat_id=inviter_subscription.group_chat.conversation_id,
            role=GroupChatRole.participant,
        )
    )

    _add_message_to_subscription(
        session,
        inviter_subscription,
        message_type=MessageType.user_invited,
        target_id=request.user_id,
    )

    return empty_pb2.Empty()
def RemoveGroupChatUser(
    self, request: conversations_pb2.RemoveGroupChatUserReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Removes another user from a group chat on behalf of an admin.

    1. Load the caller's subscription and check they are allowed to remove users
    2. Load the target's subscription, record a user_removed message and mark the target as having left

    Aborts with NOT_FOUND if the caller has no visible subscription to the chat, with PERMISSION_DENIED if the
    caller is not an admin, and with FAILED_PRECONDITION if the caller targets themselves or the target has no
    subscription returned by `_get_message_subscription`.
    """
    # Step 1: the caller
    admin_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    # the caller has no visible subscription to this chat
    if not admin_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    # the caller is not an admin
    if admin_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_user")

    # the caller is trying to remove themselves
    if request.user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_self")

    # Step 2: the user being removed
    removed_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)

    # the target has no subscription to this chat
    if not removed_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")

    # The control message is added before the target's subscription is marked as left
    _add_message_to_subscription(
        session,
        admin_subscription,
        message_type=MessageType.user_removed,
        target_id=request.user_id,
    )

    removed_subscription.left = func.now()

    return empty_pb2.Empty()
def LeaveGroupChat(
    self, request: conversations_pb2.LeaveGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Removes the calling user from a group chat and records a user_left message.

    An admin may leave only if another admin remains or no participants remain in the chat.

    Aborts with NOT_FOUND if the caller has no visible subscription to the chat, and with FAILED_PRECONDITION if
    the caller is the last admin while participants are still present.
    """
    leaving_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not leaving_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if leaving_subscription.role == GroupChatRole.admin:
        # Counts the other users' subscriptions to this chat that have not been left; narrowed by role below
        count_remaining_others = (
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(
                GroupChatSubscription.group_chat_id == request.group_chat_id,
                GroupChatSubscription.user_id != context.user_id,
                GroupChatSubscription.left.is_(None),
            )
        )
        other_admins_count = session.execute(
            count_remaining_others.where(GroupChatSubscription.role == GroupChatRole.admin)
        ).scalar_one()
        participants_count = session.execute(
            count_remaining_others.where(GroupChatSubscription.role == GroupChatRole.participant)
        ).scalar_one()

        # Leaving would strand the remaining participants without any admin
        if other_admins_count <= 0 and participants_count != 0:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "last_admin_cant_leave")

    # The user_left message is added before the subscription is marked as left
    _add_message_to_subscription(session, leaving_subscription, message_type=MessageType.user_left)

    leaving_subscription.left = func.now()

    return empty_pb2.Empty()