Coverage for app/backend/src/couchers/servicers/conversations.py: 88%
315 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1import logging
2from collections.abc import Sequence
3from datetime import timedelta
4from typing import Any, cast
6import grpc
7from google.protobuf import empty_pb2
8from sqlalchemy import select
9from sqlalchemy.orm import Session, contains_eager
10from sqlalchemy.sql import func, not_, or_
12from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY
13from couchers.context import CouchersContext, make_background_user_context, make_notification_user_context
14from couchers.db import session_scope
15from couchers.event_log import log_event
16from couchers.helpers.completed_profile import has_completed_profile
17from couchers.jobs.enqueue import queue_job
18from couchers.metrics import sent_messages_counter
19from couchers.models import (
20 Conversation,
21 GroupChat,
22 GroupChatRole,
23 GroupChatSubscription,
24 Message,
25 MessageType,
26 ModerationObjectType,
27 RateLimitAction,
28 User,
29)
30from couchers.models.notifications import NotificationTopicAction
31from couchers.moderation.utils import create_moderation
32from couchers.notifications.notify import mark_notifications_seen, notify
33from couchers.proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2
34from couchers.proto.internal import jobs_pb2
35from couchers.rate_limits.check import process_rate_limits_and_check_abort
36from couchers.rate_limits.definitions import RATE_LIMIT_HOURS
37from couchers.servicers.api import user_model_to_pb
38from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible
39from couchers.utils import Timestamp_from_datetime, now
41logger = logging.getLogger(__name__)
43# TODO: Still needs custom pagination: GetUpdates
44DEFAULT_PAGINATION_LENGTH = 20
45MAX_PAGE_SIZE = 50
48def _message_to_pb(message: Message) -> conversations_pb2.Message:
49 """
50 Turns the given message to a protocol buffer
51 """
52 if message.is_normal_message:
53 return conversations_pb2.Message(
54 message_id=message.id,
55 author_user_id=message.author_id,
56 time=Timestamp_from_datetime(message.time),
57 text=conversations_pb2.MessageContentText(text=message.text),
58 )
59 else:
60 return conversations_pb2.Message(
61 message_id=message.id,
62 author_user_id=message.author_id,
63 time=Timestamp_from_datetime(message.time),
64 chat_created=(
65 conversations_pb2.MessageContentChatCreated()
66 if message.message_type == MessageType.chat_created
67 else None
68 ),
69 chat_edited=(
70 conversations_pb2.MessageContentChatEdited()
71 if message.message_type == MessageType.chat_edited
72 else None
73 ),
74 user_invited=(
75 conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id)
76 if message.message_type == MessageType.user_invited
77 else None
78 ),
79 user_left=(
80 conversations_pb2.MessageContentUserLeft() if message.message_type == MessageType.user_left else None
81 ),
82 user_made_admin=(
83 conversations_pb2.MessageContentUserMadeAdmin(target_user_id=message.target_id)
84 if message.message_type == MessageType.user_made_admin
85 else None
86 ),
87 user_removed_admin=(
88 conversations_pb2.MessageContentUserRemovedAdmin(target_user_id=message.target_id)
89 if message.message_type == MessageType.user_removed_admin
90 else None
91 ),
92 group_chat_user_removed=(
93 conversations_pb2.MessageContentUserRemoved(target_user_id=message.target_id)
94 if message.message_type == MessageType.user_removed
95 else None
96 ),
97 )
100def _get_visible_members_for_subscription(subscription: GroupChatSubscription) -> list[int]:
101 """
102 If a user leaves a group chat, they shouldn't be able to see who's added
103 after they left
104 """
105 if not subscription.left:
106 # still in the chat, we see everyone with a current subscription
107 return [sub.user_id for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None)]
108 else:
109 # not in chat anymore, see everyone who was in chat when we left
110 return [
111 sub.user_id
112 for sub in subscription.group_chat.subscriptions.where(
113 GroupChatSubscription.joined <= subscription.left
114 ).where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None))
115 ]
118def _get_visible_admins_for_subscription(subscription: GroupChatSubscription) -> list[int]:
119 """
120 If a user leaves a group chat, they shouldn't be able to see who's added
121 after they left
122 """
123 if not subscription.left:
124 # still in the chat, we see everyone with a current subscription
125 return [
126 sub.user_id
127 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None).where(
128 GroupChatSubscription.role == GroupChatRole.admin
129 )
130 ]
131 else:
132 # not in chat anymore, see everyone who was in chat when we left
133 return [
134 sub.user_id
135 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin)
136 .where(GroupChatSubscription.joined <= subscription.left)
137 .where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None))
138 ]
141def _user_can_message(session: Session, context: CouchersContext, group_chat: GroupChat) -> bool:
142 """
143 If it is a true group chat (not a DM), user can always message. For a DM, user can message if the other participant
144 - Is not deleted/banned
145 - Has not been blocked by the user or is blocking the user
146 - Has not left the chat
147 """
148 if not group_chat.is_dm:
149 return True
151 query = select(
152 where_users_column_visible(
153 select(GroupChatSubscription)
154 .where(GroupChatSubscription.user_id != context.user_id)
155 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
156 .where(GroupChatSubscription.left == None),
157 context=context,
158 column=GroupChatSubscription.user_id,
159 ).exists()
160 )
161 return session.execute(query).scalar_one()
164def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload) -> None:
165 """
166 Background job to generate notifications for a message sent to a group chat
167 """
168 logger.info(f"Fanning notifications for message_id = {payload.message_id}")
170 with session_scope() as session:
171 message, group_chat = session.execute(
172 select(Message, GroupChat)
173 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
174 .where(Message.id == payload.message_id)
175 ).one()
177 if message.message_type != MessageType.text:
178 logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
179 return
181 context = make_background_user_context(user_id=message.author_id)
182 user_ids_to_notify = (
183 session.execute(
184 where_users_column_visible(
185 select(GroupChatSubscription.user_id)
186 .where(GroupChatSubscription.group_chat_id == message.conversation_id)
187 .where(GroupChatSubscription.user_id != message.author_id)
188 .where(GroupChatSubscription.joined <= message.time)
189 .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time))
190 .where(not_(GroupChatSubscription.is_muted)),
191 context=context,
192 column=GroupChatSubscription.user_id,
193 )
194 )
195 .scalars()
196 .all()
197 )
199 for user_id in user_ids_to_notify:
200 notify(
201 session,
202 user_id=user_id,
203 topic_action=NotificationTopicAction.chat__message,
204 key=str(message.conversation_id),
205 data=notification_data_pb2.ChatMessage(
206 author=user_model_to_pb(
207 message.author,
208 session,
209 make_notification_user_context(user_id=user_id),
210 ),
211 text=message.text,
212 group_chat_id=message.conversation_id,
213 group_chat_title=group_chat.title or None,
214 # unseen_count irrelevant for this notification
215 ),
216 moderation_state_id=group_chat.moderation_state_id,
217 )
220def _add_message_to_subscription(session: Session, subscription: GroupChatSubscription, **kwargs: Any) -> Message:
221 """
222 Creates a new message for a subscription, from the user whose subscription that is. Updates last seen message id
224 Specify the keyword args for Message
225 """
226 message = Message(conversation_id=subscription.group_chat.conversation.id, author_id=subscription.user_id, **kwargs)
228 session.add(message)
229 session.flush()
231 subscription.last_seen_message_id = message.id
233 queue_job(
234 session,
235 job=generate_message_notifications,
236 payload=jobs_pb2.GenerateMessageNotificationsPayload(
237 message_id=message.id,
238 ),
239 )
241 return message
244def _create_chat(
245 session: Session,
246 creator_id: int,
247 recipient_ids: Sequence[int],
248 title: str | None = None,
249 only_admins_invite: bool = True,
250) -> GroupChat:
251 conversation = Conversation()
252 session.add(conversation)
253 session.flush()
255 # Create moderation state for UMS (starts as SHADOWED)
256 moderation_state = create_moderation(
257 session=session,
258 object_type=ModerationObjectType.group_chat,
259 object_id=conversation.id,
260 creator_user_id=creator_id,
261 )
263 chat = GroupChat(
264 conversation_id=conversation.id,
265 title=title,
266 creator_id=creator_id,
267 is_dm=True if len(recipient_ids) == 1 else False,
268 only_admins_invite=only_admins_invite,
269 moderation_state_id=moderation_state.id,
270 )
271 session.add(chat)
272 session.flush()
274 creator_subscription = GroupChatSubscription(
275 user_id=creator_id,
276 group_chat_id=chat.conversation_id,
277 role=GroupChatRole.admin,
278 )
279 session.add(creator_subscription)
281 for uid in recipient_ids:
282 session.add(
283 GroupChatSubscription(
284 user_id=uid,
285 group_chat_id=chat.conversation_id,
286 role=GroupChatRole.participant,
287 )
288 )
290 return chat
293def _get_message_subscription(session: Session, user_id: int, conversation_id: int) -> GroupChatSubscription:
294 subscription = session.execute(
295 select(GroupChatSubscription)
296 .where(GroupChatSubscription.group_chat_id == conversation_id)
297 .where(GroupChatSubscription.user_id == user_id)
298 .where(GroupChatSubscription.left == None)
299 ).scalar_one_or_none()
301 return cast(GroupChatSubscription, subscription)
304def _get_visible_message_subscription(
305 session: Session, context: CouchersContext, conversation_id: int
306) -> GroupChatSubscription:
307 """Get subscription with visibility filtering"""
308 subscription = session.execute(
309 where_moderated_content_visible(
310 select(GroupChatSubscription)
311 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
312 .where(GroupChatSubscription.group_chat_id == conversation_id)
313 .where(GroupChatSubscription.user_id == context.user_id)
314 .where(GroupChatSubscription.left == None),
315 context,
316 GroupChat,
317 is_list_operation=False,
318 )
319 ).scalar_one_or_none()
321 return cast(GroupChatSubscription, subscription)
324def _unseen_message_count(session: Session, subscription_id: int) -> int:
325 query = (
326 select(func.count())
327 .select_from(Message)
328 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
329 .where(GroupChatSubscription.id == subscription_id)
330 .where(Message.id > GroupChatSubscription.last_seen_message_id)
331 )
332 return session.execute(query).scalar_one()
335def _mute_info(subscription: GroupChatSubscription) -> conversations_pb2.MuteInfo:
336 (muted, muted_until) = subscription.muted_display()
337 return conversations_pb2.MuteInfo(
338 muted=muted,
339 muted_until=Timestamp_from_datetime(muted_until) if muted_until else None,
340 )
343class Conversations(conversations_pb2_grpc.ConversationsServicer):
344 def ListGroupChats(
345 self, request: conversations_pb2.ListGroupChatsReq, context: CouchersContext, session: Session
346 ) -> conversations_pb2.ListGroupChatsRes:
347 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
348 page_size = min(page_size, MAX_PAGE_SIZE)
350 # select group chats where you have a subscription, and for each of
351 # these, the latest message from them
353 t = (
354 select(
355 GroupChatSubscription.group_chat_id.label("group_chat_id"),
356 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
357 func.max(Message.id).label("message_id"),
358 )
359 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
360 .where(GroupChatSubscription.user_id == context.user_id)
361 .where(Message.time >= GroupChatSubscription.joined)
362 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
363 .where(
364 or_(
365 to_bool(request.HasField("only_archived") == False),
366 GroupChatSubscription.is_archived == request.only_archived,
367 )
368 )
369 .group_by(GroupChatSubscription.group_chat_id)
370 .order_by(func.max(Message.id).desc())
371 .subquery()
372 )
374 results = session.execute(
375 where_moderated_content_visible(
376 select(t, GroupChat, GroupChatSubscription, Message)
377 .join(Message, Message.id == t.c.message_id)
378 .join(GroupChatSubscription, GroupChatSubscription.id == t.c.group_chat_subscriptions_id)
379 .join(GroupChat, GroupChat.conversation_id == t.c.group_chat_id)
380 .join(Conversation, Conversation.id == GroupChat.conversation_id)
381 .options(contains_eager(GroupChat.conversation))
382 .where(or_(t.c.message_id < request.last_message_id, to_bool(request.last_message_id == 0)))
383 .order_by(t.c.message_id.desc())
384 .limit(page_size + 1),
385 context,
386 GroupChat,
387 is_list_operation=True,
388 )
389 ).all()
391 # Batch: unseen message counts in one query instead of N individual queries
392 subscription_ids = [r.GroupChatSubscription.id for r in results[:page_size]]
393 unseen_counts: dict[int, int] = dict(
394 session.execute( # type: ignore[arg-type]
395 select(GroupChatSubscription.id, func.count(Message.id))
396 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
397 .where(GroupChatSubscription.id.in_(subscription_ids))
398 .where(Message.id > GroupChatSubscription.last_seen_message_id)
399 .group_by(GroupChatSubscription.id)
400 ).all()
401 )
403 return conversations_pb2.ListGroupChatsRes(
404 group_chats=[
405 conversations_pb2.GroupChat(
406 group_chat_id=result.GroupChat.conversation_id,
407 title=result.GroupChat.title, # TODO: proper title for DMs, etc
408 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
409 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
410 only_admins_invite=result.GroupChat.only_admins_invite,
411 is_dm=result.GroupChat.is_dm,
412 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
413 unseen_message_count=unseen_counts.get(result.GroupChatSubscription.id, 0),
414 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
415 latest_message=_message_to_pb(result.Message) if result.Message else None,
416 mute_info=_mute_info(result.GroupChatSubscription),
417 can_message=_user_can_message(session, context, result.GroupChat),
418 is_archived=result.GroupChatSubscription.is_archived,
419 )
420 for result in results[:page_size]
421 ],
422 last_message_id=(
423 min(g.Message.id if g.Message else 1 for g in results[:page_size]) if len(results) > 0 else 0
424 ), # TODO
425 no_more=len(results) <= page_size,
426 )
428 def GetGroupChat(
429 self, request: conversations_pb2.GetGroupChatReq, context: CouchersContext, session: Session
430 ) -> conversations_pb2.GroupChat:
431 result = session.execute(
432 where_moderated_content_visible(
433 select(GroupChat, GroupChatSubscription, Message)
434 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
435 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
436 .join(Conversation, Conversation.id == GroupChat.conversation_id)
437 .options(contains_eager(GroupChat.conversation))
438 .where(GroupChatSubscription.user_id == context.user_id)
439 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
440 .where(Message.time >= GroupChatSubscription.joined)
441 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
442 .order_by(Message.id.desc())
443 .limit(1),
444 context,
445 GroupChat,
446 is_list_operation=False,
447 )
448 ).one_or_none()
450 if not result:
451 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
453 return conversations_pb2.GroupChat(
454 group_chat_id=result.GroupChat.conversation_id,
455 title=result.GroupChat.title,
456 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
457 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
458 only_admins_invite=result.GroupChat.only_admins_invite,
459 is_dm=result.GroupChat.is_dm,
460 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
461 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
462 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
463 latest_message=_message_to_pb(result.Message) if result.Message else None,
464 mute_info=_mute_info(result.GroupChatSubscription),
465 can_message=_user_can_message(session, context, result.GroupChat),
466 is_archived=result.GroupChatSubscription.is_archived,
467 )
469 def GetDirectMessage(
470 self, request: conversations_pb2.GetDirectMessageReq, context: CouchersContext, session: Session
471 ) -> conversations_pb2.GroupChat:
472 count = func.count(GroupChatSubscription.id).label("count")
473 subquery = (
474 select(GroupChatSubscription.group_chat_id)
475 .where(
476 or_(
477 GroupChatSubscription.user_id == context.user_id,
478 GroupChatSubscription.user_id == request.user_id,
479 )
480 )
481 .where(GroupChatSubscription.left == None)
482 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
483 .where(GroupChat.is_dm == True)
484 .group_by(GroupChatSubscription.group_chat_id)
485 .having(count == 2)
486 .subquery()
487 )
489 result = session.execute(
490 where_moderated_content_visible(
491 select(subquery, GroupChat, GroupChatSubscription, Message)
492 .join(subquery, subquery.c.group_chat_id == GroupChat.conversation_id)
493 .join(Message, Message.conversation_id == GroupChat.conversation_id)
494 .join(Conversation, Conversation.id == GroupChat.conversation_id)
495 .options(contains_eager(GroupChat.conversation))
496 .where(GroupChatSubscription.user_id == context.user_id)
497 .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id)
498 .where(Message.time >= GroupChatSubscription.joined)
499 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
500 .order_by(Message.id.desc())
501 .limit(1),
502 context,
503 GroupChat,
504 is_list_operation=False,
505 )
506 ).one_or_none()
508 if not result:
509 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
511 return conversations_pb2.GroupChat(
512 group_chat_id=result.GroupChat.conversation_id,
513 title=result.GroupChat.title,
514 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
515 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
516 only_admins_invite=result.GroupChat.only_admins_invite,
517 is_dm=result.GroupChat.is_dm,
518 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
519 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
520 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
521 latest_message=_message_to_pb(result.Message) if result.Message else None,
522 mute_info=_mute_info(result.GroupChatSubscription),
523 can_message=_user_can_message(session, context, result.GroupChat),
524 is_archived=result.GroupChatSubscription.is_archived,
525 )
527 def GetUpdates(
528 self, request: conversations_pb2.GetUpdatesReq, context: CouchersContext, session: Session
529 ) -> conversations_pb2.GetUpdatesRes:
530 results = (
531 session.execute(
532 where_moderated_content_visible(
533 select(Message)
534 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
535 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
536 .where(GroupChatSubscription.user_id == context.user_id)
537 .where(Message.time >= GroupChatSubscription.joined)
538 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
539 .where(Message.id > request.newest_message_id)
540 .order_by(Message.id.asc())
541 .limit(DEFAULT_PAGINATION_LENGTH + 1),
542 context,
543 GroupChat,
544 is_list_operation=False,
545 )
546 )
547 .scalars()
548 .all()
549 )
551 return conversations_pb2.GetUpdatesRes(
552 updates=[
553 conversations_pb2.Update(
554 group_chat_id=message.conversation_id,
555 message=_message_to_pb(message),
556 )
557 for message in sorted(results, key=lambda message: message.id)[:DEFAULT_PAGINATION_LENGTH]
558 ],
559 no_more=len(results) <= DEFAULT_PAGINATION_LENGTH,
560 )
562 def GetGroupChatMessages(
563 self, request: conversations_pb2.GetGroupChatMessagesReq, context: CouchersContext, session: Session
564 ) -> conversations_pb2.GetGroupChatMessagesRes:
565 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
566 page_size = min(page_size, MAX_PAGE_SIZE)
568 results = (
569 session.execute(
570 where_moderated_content_visible(
571 select(Message)
572 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
573 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
574 .where(GroupChatSubscription.user_id == context.user_id)
575 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
576 .where(Message.time >= GroupChatSubscription.joined)
577 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
578 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
579 .where(
580 or_(Message.id > GroupChatSubscription.last_seen_message_id, to_bool(request.only_unseen == 0))
581 )
582 .order_by(Message.id.desc())
583 .limit(page_size + 1),
584 context,
585 GroupChat,
586 is_list_operation=False,
587 )
588 )
589 .scalars()
590 .all()
591 )
593 return conversations_pb2.GetGroupChatMessagesRes(
594 messages=[_message_to_pb(message) for message in results[:page_size]],
595 last_message_id=results[-2].id if len(results) > 1 else 0, # TODO
596 no_more=len(results) <= page_size,
597 )
599 def MarkLastSeenGroupChat(
600 self, request: conversations_pb2.MarkLastSeenGroupChatReq, context: CouchersContext, session: Session
601 ) -> empty_pb2.Empty:
602 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
604 if not subscription: 604 ↛ 605line 604 didn't jump to line 605 because the condition on line 604 was never true
605 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
607 if not subscription.last_seen_message_id <= request.last_seen_message_id: 607 ↛ 608line 607 didn't jump to line 608 because the condition on line 607 was never true
608 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
610 subscription.last_seen_message_id = request.last_seen_message_id
612 mark_notifications_seen(
613 session,
614 user_id=context.user_id,
615 key=str(request.group_chat_id),
616 topic_actions=[
617 NotificationTopicAction.chat__message,
618 NotificationTopicAction.chat__missed_messages,
619 ],
620 )
622 return empty_pb2.Empty()
624 def MuteGroupChat(
625 self, request: conversations_pb2.MuteGroupChatReq, context: CouchersContext, session: Session
626 ) -> empty_pb2.Empty:
627 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
629 if not subscription: 629 ↛ 630line 629 didn't jump to line 630 because the condition on line 629 was never true
630 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
632 if request.unmute:
633 subscription.muted_until = DATETIME_MINUS_INFINITY
634 elif request.forever:
635 subscription.muted_until = DATETIME_INFINITY
636 elif request.for_duration: 636 ↛ 642line 636 didn't jump to line 642 because the condition on line 636 was always true
637 duration = request.for_duration.ToTimedelta()
638 if duration < timedelta(seconds=0): 638 ↛ 639line 638 didn't jump to line 639 because the condition on line 638 was never true
639 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_mute_past")
640 subscription.muted_until = now() + duration
642 return empty_pb2.Empty()
644 def SetGroupChatArchiveStatus(
645 self, request: conversations_pb2.SetGroupChatArchiveStatusReq, context: CouchersContext, session: Session
646 ) -> conversations_pb2.SetGroupChatArchiveStatusRes:
647 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
649 if not subscription:
650 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
652 subscription.is_archived = request.is_archived
654 return conversations_pb2.SetGroupChatArchiveStatusRes(
655 group_chat_id=request.group_chat_id,
656 is_archived=request.is_archived,
657 )
659 def SearchMessages(
660 self, request: conversations_pb2.SearchMessagesReq, context: CouchersContext, session: Session
661 ) -> conversations_pb2.SearchMessagesRes:
662 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
663 page_size = min(page_size, MAX_PAGE_SIZE)
665 results = (
666 session.execute(
667 where_moderated_content_visible(
668 select(Message)
669 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
670 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
671 .where(GroupChatSubscription.user_id == context.user_id)
672 .where(Message.time >= GroupChatSubscription.joined)
673 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
674 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
675 .where(Message.text.ilike(f"%{request.query}%"))
676 .order_by(Message.id.desc())
677 .limit(page_size + 1),
678 context,
679 GroupChat,
680 is_list_operation=True,
681 )
682 )
683 .scalars()
684 .all()
685 )
687 return conversations_pb2.SearchMessagesRes(
688 results=[
689 conversations_pb2.MessageSearchResult(
690 group_chat_id=message.conversation_id,
691 message=_message_to_pb(message),
692 )
693 for message in results[:page_size]
694 ],
695 last_message_id=results[-2].id if len(results) > 1 else 0,
696 no_more=len(results) <= page_size,
697 )
699 def CreateGroupChat(
700 self, request: conversations_pb2.CreateGroupChatReq, context: CouchersContext, session: Session
701 ) -> conversations_pb2.GroupChat:
702 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
703 if not has_completed_profile(session, user):
704 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")
706 recipient_user_ids = list(
707 session.execute(
708 select(User.id).where(users_visible(context)).where(User.id.in_(request.recipient_user_ids))
709 )
710 .scalars()
711 .all()
712 )
714 # make sure all requested users are visible
715 if len(recipient_user_ids) != len(request.recipient_user_ids): 715 ↛ 716line 715 didn't jump to line 716 because the condition on line 715 was never true
716 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")
718 if not recipient_user_ids: 718 ↛ 719line 718 didn't jump to line 719 because the condition on line 718 was never true
719 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")
721 if len(recipient_user_ids) != len(set(recipient_user_ids)): 721 ↛ 723line 721 didn't jump to line 723 because the condition on line 721 was never true
722 # make sure there's no duplicate users
723 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_recipients")
725 if context.user_id in recipient_user_ids: 725 ↛ 726line 725 didn't jump to line 726 because the condition on line 725 was never true
726 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")
728 if len(recipient_user_ids) == 1:
729 # can only have one DM at a time between any two users
730 other_user_id = recipient_user_ids[0]
732 # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have
733 # user_id either this user or the recipient user. If you find two subscriptions to the same DM group
734 # chat, you know they already have a shared group chat
735 count = func.count(GroupChatSubscription.id).label("count")
736 if session.execute(
737 where_moderated_content_visible(
738 select(count)
739 .where(
740 or_(
741 GroupChatSubscription.user_id == context.user_id,
742 GroupChatSubscription.user_id == other_user_id,
743 )
744 )
745 .where(GroupChatSubscription.left == None)
746 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
747 .where(GroupChat.is_dm == True)
748 .group_by(GroupChatSubscription.group_chat_id)
749 .having(count == 2),
750 context,
751 GroupChat,
752 is_list_operation=False,
753 )
754 ).scalar_one_or_none():
755 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_have_dm")
757 # Check if user has been initiating chats excessively
758 if process_rate_limits_and_check_abort(
759 session=session, user_id=context.user_id, action=RateLimitAction.chat_initiation
760 ):
761 context.abort_with_error_code(
762 grpc.StatusCode.RESOURCE_EXHAUSTED,
763 "chat_initiation_rate_limit2",
764 substitutions={"count": RATE_LIMIT_HOURS},
765 )
767 group_chat = _create_chat(
768 session,
769 creator_id=context.user_id,
770 recipient_ids=request.recipient_user_ids,
771 title=request.title.value,
772 )
774 your_subscription = _get_message_subscription(session, context.user_id, group_chat.conversation_id)
776 _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created)
778 session.flush()
780 log_event(
781 context,
782 session,
783 "group_chat.created",
784 {
785 "group_chat_id": group_chat.conversation_id,
786 "is_dm": group_chat.is_dm,
787 "recipient_count": len(request.recipient_user_ids),
788 },
789 )
791 return conversations_pb2.GroupChat(
792 group_chat_id=group_chat.conversation_id,
793 title=group_chat.title,
794 member_user_ids=_get_visible_members_for_subscription(your_subscription),
795 admin_user_ids=_get_visible_admins_for_subscription(your_subscription),
796 only_admins_invite=group_chat.only_admins_invite,
797 is_dm=group_chat.is_dm,
798 created=Timestamp_from_datetime(group_chat.conversation.created),
799 mute_info=_mute_info(your_subscription),
800 can_message=True,
801 )
803 def SendMessage(
804 self, request: conversations_pb2.SendMessageReq, context: CouchersContext, session: Session
805 ) -> empty_pb2.Empty:
806 if request.text == "": 806 ↛ 807line 806 didn't jump to line 807 because the condition on line 806 was never true
807 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
809 result = session.execute(
810 where_moderated_content_visible(
811 select(GroupChatSubscription, GroupChat)
812 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
813 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
814 .where(GroupChatSubscription.user_id == context.user_id)
815 .where(GroupChatSubscription.left == None),
816 context,
817 GroupChat,
818 is_list_operation=False,
819 )
820 ).one_or_none()
821 if not result:
822 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
824 subscription, group_chat = result._tuple()
825 if not _user_can_message(session, context, group_chat):
826 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_message_in_chat")
828 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)
830 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
831 sent_messages_counter.labels(
832 user_gender, "direct message" if subscription.group_chat.is_dm else "group chat"
833 ).inc()
834 log_event(
835 context,
836 session,
837 "message.sent",
838 {"group_chat_id": request.group_chat_id, "is_dm": subscription.group_chat.is_dm},
839 )
841 return empty_pb2.Empty()
843 def SendDirectMessage(
844 self, request: conversations_pb2.SendDirectMessageReq, context: CouchersContext, session: Session
845 ) -> conversations_pb2.SendDirectMessageRes:
846 user_id = context.user_id
847 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
849 recipient_id = request.recipient_user_id
851 if not has_completed_profile(session, user): 851 ↛ 852line 851 didn't jump to line 852 because the condition on line 851 was never true
852 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")
854 if not recipient_id: 854 ↛ 855line 854 didn't jump to line 855 because the condition on line 854 was never true
855 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")
857 recipient_user_id = session.execute(
858 select(User.id).where(users_visible(context)).where(User.id == recipient_id)
859 ).scalar_one_or_none()
861 if not recipient_user_id: 861 ↛ 862line 861 didn't jump to line 862 because the condition on line 861 was never true
862 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")
864 if user_id == recipient_id: 864 ↛ 865line 864 didn't jump to line 865 because the condition on line 864 was never true
865 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")
867 if request.text == "": 867 ↛ 868line 867 didn't jump to line 868 because the condition on line 867 was never true
868 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
870 # Look for an existing direct message (DM) chat between the two users
871 dm_chat_ids = (
872 select(GroupChatSubscription.group_chat_id)
873 .where(GroupChatSubscription.user_id.in_([user_id, recipient_id]))
874 .group_by(GroupChatSubscription.group_chat_id)
875 .having(func.count(GroupChatSubscription.user_id) == 2)
876 )
878 chat = session.execute(
879 where_moderated_content_visible(
880 select(GroupChat)
881 .where(GroupChat.is_dm == True)
882 .where(GroupChat.conversation_id.in_(dm_chat_ids))
883 .limit(1),
884 context,
885 GroupChat,
886 is_list_operation=False,
887 )
888 ).scalar_one_or_none()
890 if not chat:
891 if process_rate_limits_and_check_abort(
892 session=session, user_id=user_id, action=RateLimitAction.chat_initiation
893 ):
894 context.abort_with_error_code(
895 grpc.StatusCode.RESOURCE_EXHAUSTED,
896 "chat_initiation_rate_limit2",
897 substitutions={"count": RATE_LIMIT_HOURS},
898 )
899 chat = _create_chat(session, user_id, [recipient_id])
901 # Retrieve the sender's active subscription to the chat
902 subscription = _get_message_subscription(session, user_id, chat.conversation_id)
904 # Add the message to the conversation
905 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)
907 user_gender = session.execute(select(User.gender).where(User.id == user_id)).scalar_one()
908 sent_messages_counter.labels(user_gender, "direct message").inc()
909 log_event(
910 context,
911 session,
912 "message.sent",
913 {"group_chat_id": chat.conversation_id, "is_dm": True, "recipient_id": recipient_id},
914 )
916 session.flush()
918 return conversations_pb2.SendDirectMessageRes(group_chat_id=chat.conversation_id)
920 def EditGroupChat(
921 self, request: conversations_pb2.EditGroupChatReq, context: CouchersContext, session: Session
922 ) -> empty_pb2.Empty:
923 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
925 if not subscription:
926 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
928 if subscription.role != GroupChatRole.admin:
929 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_edit")
931 if request.HasField("title"):
932 subscription.group_chat.title = request.title.value
934 if request.HasField("only_admins_invite"): 934 ↛ 937line 934 didn't jump to line 937 because the condition on line 934 was always true
935 subscription.group_chat.only_admins_invite = request.only_admins_invite.value
937 _add_message_to_subscription(session, subscription, message_type=MessageType.chat_edited)
939 return empty_pb2.Empty()
941 def MakeGroupChatAdmin(
942 self, request: conversations_pb2.MakeGroupChatAdminReq, context: CouchersContext, session: Session
943 ) -> empty_pb2.Empty:
944 if not session.execute(
945 select(User).where(users_visible(context)).where(User.id == request.user_id)
946 ).scalar_one_or_none():
947 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
949 your_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
951 if not your_subscription: 951 ↛ 952line 951 didn't jump to line 952 because the condition on line 951 was never true
952 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
954 if your_subscription.role != GroupChatRole.admin:
955 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_make_admin")
957 if request.user_id == context.user_id: 957 ↛ 958line 957 didn't jump to line 958 because the condition on line 957 was never true
958 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_make_self_admin")
960 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
962 if not their_subscription: 962 ↛ 963line 962 didn't jump to line 963 because the condition on line 962 was never true
963 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")
965 if their_subscription.role != GroupChatRole.participant:
966 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_admin")
968 their_subscription.role = GroupChatRole.admin
970 _add_message_to_subscription(
971 session, your_subscription, message_type=MessageType.user_made_admin, target_id=request.user_id
972 )
974 return empty_pb2.Empty()
976 def RemoveGroupChatAdmin(
977 self, request: conversations_pb2.RemoveGroupChatAdminReq, context: CouchersContext, session: Session
978 ) -> empty_pb2.Empty:
979 if not session.execute(
980 select(User).where(users_visible(context)).where(User.id == request.user_id)
981 ).scalar_one_or_none():
982 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
984 your_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
986 if not your_subscription: 986 ↛ 987line 986 didn't jump to line 987 because the condition on line 986 was never true
987 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
989 if request.user_id == context.user_id:
990 # Race condition!
991 other_admins_count = session.execute(
992 select(func.count())
993 .select_from(GroupChatSubscription)
994 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
995 .where(GroupChatSubscription.user_id != context.user_id)
996 .where(GroupChatSubscription.role == GroupChatRole.admin)
997 .where(GroupChatSubscription.left == None)
998 ).scalar_one()
999 if not other_admins_count > 0:
1000 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_last_admin")
1002 if your_subscription.role != GroupChatRole.admin:
1003 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_admin")
1005 their_subscription = session.execute(
1006 select(GroupChatSubscription)
1007 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
1008 .where(GroupChatSubscription.user_id == request.user_id)
1009 .where(GroupChatSubscription.left == None)
1010 .where(GroupChatSubscription.role == GroupChatRole.admin)
1011 ).scalar_one_or_none()
1013 if not their_subscription: 1013 ↛ 1014line 1013 didn't jump to line 1014 because the condition on line 1013 was never true
1014 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin")
1016 their_subscription.role = GroupChatRole.participant
1018 _add_message_to_subscription(
1019 session, your_subscription, message_type=MessageType.user_removed_admin, target_id=request.user_id
1020 )
1022 return empty_pb2.Empty()
1024 def InviteToGroupChat(
1025 self, request: conversations_pb2.InviteToGroupChatReq, context: CouchersContext, session: Session
1026 ) -> empty_pb2.Empty:
1027 if not session.execute(
1028 select(User).where(users_visible(context)).where(User.id == request.user_id)
1029 ).scalar_one_or_none():
1030 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
1032 result = session.execute(
1033 where_moderated_content_visible(
1034 select(GroupChatSubscription, GroupChat)
1035 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
1036 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
1037 .where(GroupChatSubscription.user_id == context.user_id)
1038 .where(GroupChatSubscription.left == None),
1039 context,
1040 GroupChat,
1041 is_list_operation=False,
1042 )
1043 ).one_or_none()
1045 if not result:
1046 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
1048 your_subscription, group_chat = result._tuple()
1050 if request.user_id == context.user_id: 1050 ↛ 1051line 1050 didn't jump to line 1051 because the condition on line 1050 was never true
1051 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_self")
1053 if your_subscription.role != GroupChatRole.admin and your_subscription.group_chat.only_admins_invite:
1054 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invite_permission_denied")
1056 if group_chat.is_dm:
1057 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_to_dm")
1059 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
1061 if their_subscription: 1061 ↛ 1062line 1061 didn't jump to line 1062 because the condition on line 1061 was never true
1062 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_chat")
1064 # TODO: race condition!
1066 subscription = GroupChatSubscription(
1067 user_id=request.user_id,
1068 group_chat_id=your_subscription.group_chat.conversation_id,
1069 role=GroupChatRole.participant,
1070 )
1071 session.add(subscription)
1073 _add_message_to_subscription(
1074 session, your_subscription, message_type=MessageType.user_invited, target_id=request.user_id
1075 )
1077 return empty_pb2.Empty()
1079 def RemoveGroupChatUser(
1080 self, request: conversations_pb2.RemoveGroupChatUserReq, context: CouchersContext, session: Session
1081 ) -> empty_pb2.Empty:
1082 """
1083 1. Get admin info and check it's correct
1084 2. Get user data, check it's correct and remove user
1085 """
1086 # Admin info
1087 your_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
1089 # if user info is missing
1090 if not your_subscription: 1090 ↛ 1091line 1090 didn't jump to line 1091 because the condition on line 1090 was never true
1091 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
1093 # if user not admin
1094 if your_subscription.role != GroupChatRole.admin: 1094 ↛ 1095line 1094 didn't jump to line 1095 because the condition on line 1094 was never true
1095 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_user")
1097 # if user wants to remove themselves
1098 if request.user_id == context.user_id: 1098 ↛ 1099line 1098 didn't jump to line 1099 because the condition on line 1098 was never true
1099 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_self")
1101 # get user info
1102 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
1104 # user not found
1105 if not their_subscription:
1106 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")
1108 _add_message_to_subscription(
1109 session, your_subscription, message_type=MessageType.user_removed, target_id=request.user_id
1110 )
1112 their_subscription.left = func.now()
1114 return empty_pb2.Empty()
1116 def LeaveGroupChat(
1117 self, request: conversations_pb2.LeaveGroupChatReq, context: CouchersContext, session: Session
1118 ) -> empty_pb2.Empty:
1119 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
1121 if not subscription:
1122 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
1124 if subscription.role == GroupChatRole.admin:
1125 other_admins_count = session.execute(
1126 select(func.count())
1127 .select_from(GroupChatSubscription)
1128 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
1129 .where(GroupChatSubscription.user_id != context.user_id)
1130 .where(GroupChatSubscription.role == GroupChatRole.admin)
1131 .where(GroupChatSubscription.left == None)
1132 ).scalar_one()
1133 participants_count = session.execute(
1134 select(func.count())
1135 .select_from(GroupChatSubscription)
1136 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
1137 .where(GroupChatSubscription.user_id != context.user_id)
1138 .where(GroupChatSubscription.role == GroupChatRole.participant)
1139 .where(GroupChatSubscription.left == None)
1140 ).scalar_one()
1141 if not (other_admins_count > 0 or participants_count == 0):
1142 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "last_admin_cant_leave")
1144 _add_message_to_subscription(session, subscription, message_type=MessageType.user_left)
1146 subscription.left = func.now()
1148 return empty_pb2.Empty()