Coverage for app / backend / src / couchers / servicers / conversations.py: 88%
315 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
1import logging
2from collections.abc import Sequence
3from datetime import timedelta
4from typing import Any, cast
6import grpc
7from google.protobuf import empty_pb2
8from sqlalchemy import select
9from sqlalchemy.orm import Session, contains_eager
10from sqlalchemy.sql import func, not_, or_
12from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY
13from couchers.context import CouchersContext, make_background_user_context
14from couchers.db import session_scope
15from couchers.event_log import log_event
16from couchers.helpers.completed_profile import has_completed_profile
17from couchers.jobs.enqueue import queue_job
18from couchers.metrics import sent_messages_counter
19from couchers.models import (
20 Conversation,
21 GroupChat,
22 GroupChatRole,
23 GroupChatSubscription,
24 Message,
25 MessageType,
26 ModerationObjectType,
27 RateLimitAction,
28 User,
29)
30from couchers.models.notifications import NotificationTopicAction
31from couchers.moderation.utils import create_moderation
32from couchers.notifications.notify import notify
33from couchers.proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2
34from couchers.proto.internal import jobs_pb2
35from couchers.rate_limits.check import process_rate_limits_and_check_abort
36from couchers.rate_limits.definitions import RATE_LIMIT_HOURS
37from couchers.servicers.api import user_model_to_pb
38from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible
39from couchers.utils import Timestamp_from_datetime, now
# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# TODO: Still needs custom pagination: GetUpdates
# Page size used when a request leaves `number` at 0 (GetUpdates always uses this size)
DEFAULT_PAGINATION_LENGTH = 20
# Upper bound applied to any page size requested by the client
MAX_PAGE_SIZE = 50
def _message_to_pb(message: Message) -> conversations_pb2.Message:
    """
    Converts a Message model into its protocol buffer representation

    Every result carries the message id, author id and timestamp. A normal message additionally gets its text as
    content. Any other message gets the control content that matches its message type, or no content at all if the
    type matches none of the control types handled here.
    """
    header: dict[str, Any] = {
        "message_id": message.id,
        "author_user_id": message.author_id,
        "time": Timestamp_from_datetime(message.time),
    }

    if message.is_normal_message:
        return conversations_pb2.Message(
            text=conversations_pb2.MessageContentText(text=message.text),
            **header,
        )

    # control message: at most one of the content fields below gets set
    kind = message.message_type
    content: dict[str, Any] = {}
    if kind == MessageType.chat_created:
        content["chat_created"] = conversations_pb2.MessageContentChatCreated()
    elif kind == MessageType.chat_edited:
        content["chat_edited"] = conversations_pb2.MessageContentChatEdited()
    elif kind == MessageType.user_invited:
        content["user_invited"] = conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id)
    elif kind == MessageType.user_left:
        content["user_left"] = conversations_pb2.MessageContentUserLeft()
    elif kind == MessageType.user_made_admin:
        content["user_made_admin"] = conversations_pb2.MessageContentUserMadeAdmin(target_user_id=message.target_id)
    elif kind == MessageType.user_removed_admin:
        content["user_removed_admin"] = conversations_pb2.MessageContentUserRemovedAdmin(
            target_user_id=message.target_id
        )
    elif kind == MessageType.user_removed:
        content["group_chat_user_removed"] = conversations_pb2.MessageContentUserRemoved(
            target_user_id=message.target_id
        )

    return conversations_pb2.Message(**header, **content)
def _get_visible_members_for_subscription(subscription: GroupChatSubscription) -> list[int]:
    """
    Lists the user ids of the chat members that the owner of this subscription may see

    Someone who has left the chat must not learn who was added afterwards, so for them the membership is frozen at
    the moment they left.
    """
    chat_subscriptions = subscription.group_chat.subscriptions
    left_at = subscription.left

    if left_at:
        # no longer in the chat: everyone whose membership covered the moment we left
        visible = chat_subscriptions.where(GroupChatSubscription.joined <= left_at).where(
            or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None)
        )
    else:
        # still in the chat: everyone who currently has an active subscription
        visible = chat_subscriptions.where(GroupChatSubscription.left == None)

    return [member.user_id for member in visible]
def _get_visible_admins_for_subscription(subscription: GroupChatSubscription) -> list[int]:
    """
    Lists the user ids of the chat admins that the owner of this subscription may see

    Applies the same visibility window as for regular members: someone who has left the chat only sees the admins
    whose membership covered the moment they left.
    """
    chat_subscriptions = subscription.group_chat.subscriptions
    left_at = subscription.left

    if left_at:
        # no longer in the chat: admins whose membership covered the moment we left
        visible = (
            chat_subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin)
            .where(GroupChatSubscription.joined <= left_at)
            .where(or_(GroupChatSubscription.left >= left_at, GroupChatSubscription.left == None))
        )
    else:
        # still in the chat: admins who currently have an active subscription
        visible = chat_subscriptions.where(GroupChatSubscription.left == None).where(
            GroupChatSubscription.role == GroupChatRole.admin
        )

    return [admin.user_id for admin in visible]
def _user_can_message(session: Session, context: CouchersContext, group_chat: GroupChat) -> bool:
    """
    Decides whether the current user may post in the given chat

    A true group chat (not a DM) is always writable. In a DM, posting requires that the other participant
    - still has an active (not left) subscription to the chat, and
    - passes the user visibility filter for the current context (i.e. is not deleted/banned, and neither side is
      blocking the other)
    """
    if not group_chat.is_dm:
        return True

    other_active_subscriptions = (
        select(GroupChatSubscription)
        .where(GroupChatSubscription.user_id != context.user_id)
        .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
        .where(GroupChatSubscription.left == None)
    )
    visible_counterpart = where_users_column_visible(
        other_active_subscriptions,
        context=context,
        column=GroupChatSubscription.user_id,
    )
    return session.execute(select(visible_counterpart.exists())).scalar_one()
def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload) -> None:
    """
    Background job that fans out notifications for one message sent to a group chat

    Only text messages produce notifications. Recipients are the subscribers of the chat, other than the author,
    whose membership covers the time of the message, who have not muted the chat, and who pass the user visibility
    filter evaluated from the author's point of view.
    """
    logger.info(f"Fanning notifications for message_id = {payload.message_id}")

    with session_scope() as session:
        message_and_chat = (
            select(Message, GroupChat)
            .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
            .where(Message.id == payload.message_id)
        )
        message, group_chat = session.execute(message_and_chat).one()

        if message.message_type != MessageType.text:
            logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
            return

        # visibility is checked from the author's perspective
        context = make_background_user_context(user_id=message.author_id)
        recipients_query = where_users_column_visible(
            select(GroupChatSubscription.user_id)
            .where(GroupChatSubscription.group_chat_id == message.conversation_id)
            .where(GroupChatSubscription.user_id != message.author_id)
            .where(GroupChatSubscription.joined <= message.time)
            .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time))
            .where(not_(GroupChatSubscription.is_muted)),
            context=context,
            column=GroupChatSubscription.user_id,
        )
        user_ids_to_notify = session.execute(recipients_query).scalars().all()

        msg = (
            f"{message.author.name} sent you a message"
            if group_chat.is_dm
            else f"{message.author.name} sent a message in {group_chat.title}"
        )

        for user_id in user_ids_to_notify:
            # the author is rendered from the recipient's point of view
            author_pb = user_model_to_pb(
                message.author,
                session,
                make_background_user_context(user_id=user_id),
            )
            notify(
                session,
                user_id=user_id,
                topic_action=NotificationTopicAction.chat__message,
                key=str(message.conversation_id),
                data=notification_data_pb2.ChatMessage(
                    author=author_pb,
                    message=msg,
                    text=message.text,
                    group_chat_id=message.conversation_id,
                ),
                moderation_state_id=group_chat.moderation_state_id,
            )
def _add_message_to_subscription(session: Session, subscription: GroupChatSubscription, **kwargs: Any) -> Message:
    """
    Posts a new message into the subscription's chat, authored by the subscription's user

    The keyword args are passed straight through to Message. After the message has been flushed (so that it has an
    id), the author's last seen message id is moved to it and a notification job is queued for it.
    """
    conversation_id = subscription.group_chat.conversation.id
    message = Message(conversation_id=conversation_id, author_id=subscription.user_id, **kwargs)

    session.add(message)
    # flush so that message.id is populated before it's used below
    session.flush()

    subscription.last_seen_message_id = message.id

    notification_payload = jobs_pb2.GenerateMessageNotificationsPayload(message_id=message.id)
    queue_job(
        session,
        job=generate_message_notifications,
        payload=notification_payload,
    )

    return message
def _create_chat(
    session: Session,
    creator_id: int,
    recipient_ids: Sequence[int],
    title: str | None = None,
    only_admins_invite: bool = True,
) -> GroupChat:
    """
    Creates a conversation, its moderation state, the group chat, and one subscription per member

    The creator is subscribed as admin and every recipient as participant. The chat is flagged as a DM exactly when
    there is a single recipient. The subscriptions are added to the session but not flushed here.
    """
    conversation = Conversation()
    session.add(conversation)
    session.flush()

    # Create moderation state for UMS (starts as SHADOWED)
    moderation_state = create_moderation(
        session=session,
        object_type=ModerationObjectType.group_chat,
        object_id=conversation.id,
        creator_user_id=creator_id,
    )

    chat = GroupChat(
        conversation_id=conversation.id,
        title=title,
        creator_id=creator_id,
        is_dm=len(recipient_ids) == 1,
        only_admins_invite=only_admins_invite,
        moderation_state_id=moderation_state.id,
    )
    session.add(chat)
    session.flush()

    # creator first, then the recipients in the order given
    memberships = [
        GroupChatSubscription(
            user_id=creator_id,
            group_chat_id=chat.conversation_id,
            role=GroupChatRole.admin,
        )
    ]
    memberships.extend(
        GroupChatSubscription(
            user_id=recipient_id,
            group_chat_id=chat.conversation_id,
            role=GroupChatRole.participant,
        )
        for recipient_id in recipient_ids
    )
    session.add_all(memberships)

    return chat
def _get_message_subscription(session: Session, user_id: int, conversation_id: int) -> GroupChatSubscription:
    """
    Looks up the user's active (not left) subscription to the given chat

    NOTE: the query uses scalar_one_or_none, so despite the annotated return type the result is None at runtime
    when no such subscription exists; the cast only satisfies the type checker.
    """
    active_subscription_query = (
        select(GroupChatSubscription)
        .where(GroupChatSubscription.group_chat_id == conversation_id)
        .where(GroupChatSubscription.user_id == user_id)
        .where(GroupChatSubscription.left == None)
    )
    found = session.execute(active_subscription_query).scalar_one_or_none()

    return cast(GroupChatSubscription, found)
def _get_visible_message_subscription(
    session: Session, context: CouchersContext, conversation_id: int
) -> GroupChatSubscription:
    """
    Looks up the current user's active subscription to the given chat, with moderation visibility filtering

    NOTE: the query uses scalar_one_or_none, so despite the annotated return type the result is None at runtime
    when nothing matches (no active subscription, or the chat is filtered out); callers check for that.
    """
    own_active_subscription = (
        select(GroupChatSubscription)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.group_chat_id == conversation_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.left == None)
    )
    visible_query = where_moderated_content_visible(
        own_active_subscription,
        context,
        GroupChat,
        is_list_operation=False,
    )
    found = session.execute(visible_query).scalar_one_or_none()

    return cast(GroupChatSubscription, found)
def _unseen_message_count(session: Session, subscription_id: int) -> int:
    """
    Counts the messages in the subscription's chat whose id is greater than the subscription's last seen message id
    """
    same_chat = GroupChatSubscription.group_chat_id == Message.conversation_id
    newer_than_last_seen = Message.id > GroupChatSubscription.last_seen_message_id

    count_query = (
        select(func.count())
        .select_from(Message)
        .join(GroupChatSubscription, same_chat)
        .where(GroupChatSubscription.id == subscription_id)
        .where(newer_than_last_seen)
    )
    return session.execute(count_query).scalar_one()
def _mute_info(subscription: GroupChatSubscription) -> conversations_pb2.MuteInfo:
    """
    Builds the MuteInfo protocol buffer from the subscription's muted_display() result

    The muted_until timestamp is only filled in when muted_display() reports a truthy end time.
    """
    is_muted, until = subscription.muted_display()
    until_pb = Timestamp_from_datetime(until) if until else None
    return conversations_pb2.MuteInfo(muted=is_muted, muted_until=until_pb)
347class Conversations(conversations_pb2_grpc.ConversationsServicer):
def ListGroupChats(
    self, request: conversations_pb2.ListGroupChatsReq, context: CouchersContext, session: Session
) -> conversations_pb2.ListGroupChatsRes:
    """
    Lists the current user's group chats, most recently active first, one page at a time

    Chats are ordered by their latest message that falls within the user's membership window, and paginated by
    that message id via request.last_message_id (0 means start from the newest). One extra row is fetched to
    work out whether more pages exist. If request.only_archived is set, only chats whose archived flag equals it
    are returned.
    """
    page_size = min(request.number or DEFAULT_PAGINATION_LENGTH, MAX_PAGE_SIZE)

    # no filtering on archive status unless the optional field was explicitly set
    archive_filter = or_(
        to_bool(not request.HasField("only_archived")),
        GroupChatSubscription.is_archived == request.only_archived,
    )

    # for each chat the user has a subscription to: the latest message visible to them
    latest_per_chat = (
        select(
            GroupChatSubscription.group_chat_id.label("group_chat_id"),
            func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
            func.max(Message.id).label("message_id"),
        )
        .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(Message.time >= GroupChatSubscription.joined)
        .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
        .where(archive_filter)
        .group_by(GroupChatSubscription.group_chat_id)
        .order_by(func.max(Message.id).desc())
        .subquery()
    )

    chats_query = where_moderated_content_visible(
        select(latest_per_chat, GroupChat, GroupChatSubscription, Message)
        .join(Message, Message.id == latest_per_chat.c.message_id)
        .join(GroupChatSubscription, GroupChatSubscription.id == latest_per_chat.c.group_chat_subscriptions_id)
        .join(GroupChat, GroupChat.conversation_id == latest_per_chat.c.group_chat_id)
        .join(Conversation, Conversation.id == GroupChat.conversation_id)
        .options(contains_eager(GroupChat.conversation))
        .where(
            or_(
                latest_per_chat.c.message_id < request.last_message_id,
                to_bool(request.last_message_id == 0),
            )
        )
        .order_by(latest_per_chat.c.message_id.desc())
        .limit(page_size + 1),
        context,
        GroupChat,
        is_list_operation=True,
    )
    results = session.execute(chats_query).all()
    page = results[:page_size]

    # Batch: unseen message counts in one query instead of N individual queries
    subscription_ids = [row.GroupChatSubscription.id for row in page]
    unseen_rows = session.execute(
        select(GroupChatSubscription.id, func.count(Message.id))
        .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.id.in_(subscription_ids))
        .where(Message.id > GroupChatSubscription.last_seen_message_id)
        .group_by(GroupChatSubscription.id)
    ).all()
    unseen_counts: dict[int, int] = {subscription_id: unseen for subscription_id, unseen in unseen_rows}

    group_chats = []
    for row in page:
        chat = row.GroupChat
        own_subscription = row.GroupChatSubscription
        group_chats.append(
            conversations_pb2.GroupChat(
                group_chat_id=chat.conversation_id,
                title=chat.title,  # TODO: proper title for DMs, etc
                member_user_ids=_get_visible_members_for_subscription(own_subscription),
                admin_user_ids=_get_visible_admins_for_subscription(own_subscription),
                only_admins_invite=chat.only_admins_invite,
                is_dm=chat.is_dm,
                created=Timestamp_from_datetime(chat.conversation.created),
                unseen_message_count=unseen_counts.get(own_subscription.id, 0),
                last_seen_message_id=own_subscription.last_seen_message_id,
                latest_message=_message_to_pb(row.Message) if row.Message else None,
                mute_info=_mute_info(own_subscription),
                can_message=_user_can_message(session, context, chat),
                is_archived=own_subscription.is_archived,
            )
        )

    # TODO
    if results:
        last_message_id = min(row.Message.id if row.Message else 1 for row in page)
    else:
        last_message_id = 0

    return conversations_pb2.ListGroupChatsRes(
        group_chats=group_chats,
        last_message_id=last_message_id,
        no_more=len(results) <= page_size,
    )
def GetGroupChat(
    self, request: conversations_pb2.GetGroupChatReq, context: CouchersContext, session: Session
) -> conversations_pb2.GroupChat:
    """
    Returns one group chat the current user has a subscription to, together with its latest message

    Only messages inside the user's membership window are considered when picking the latest message. Aborts
    with NOT_FOUND ("chat_not_found") when the query yields nothing.
    """
    within_membership = or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None)

    latest_message_query = (
        select(GroupChat, GroupChatSubscription, Message)
        .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .join(Conversation, Conversation.id == GroupChat.conversation_id)
        .options(contains_eager(GroupChat.conversation))
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
        .where(Message.time >= GroupChatSubscription.joined)
        .where(within_membership)
        .order_by(Message.id.desc())
        .limit(1)
    )
    result = session.execute(
        where_moderated_content_visible(latest_message_query, context, GroupChat, is_list_operation=False)
    ).one_or_none()

    if not result:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    chat = result.GroupChat
    own_subscription = result.GroupChatSubscription
    latest_message = result.Message

    return conversations_pb2.GroupChat(
        group_chat_id=chat.conversation_id,
        title=chat.title,
        member_user_ids=_get_visible_members_for_subscription(own_subscription),
        admin_user_ids=_get_visible_admins_for_subscription(own_subscription),
        only_admins_invite=chat.only_admins_invite,
        is_dm=chat.is_dm,
        created=Timestamp_from_datetime(chat.conversation.created),
        unseen_message_count=_unseen_message_count(session, own_subscription.id),
        last_seen_message_id=own_subscription.last_seen_message_id,
        latest_message=_message_to_pb(latest_message) if latest_message else None,
        mute_info=_mute_info(own_subscription),
        can_message=_user_can_message(session, context, chat),
        is_archived=own_subscription.is_archived,
    )
def GetDirectMessage(
    self, request: conversations_pb2.GetDirectMessageReq, context: CouchersContext, session: Session
) -> conversations_pb2.GroupChat:
    """
    Returns the DM chat shared by the current user and request.user_id, together with its latest message

    A shared DM is a chat flagged is_dm in which both users hold an active (not left) subscription. Aborts with
    NOT_FOUND ("chat_not_found") when the query yields nothing.
    """
    count = func.count(GroupChatSubscription.id).label("count")
    either_user = or_(
        GroupChatSubscription.user_id == context.user_id,
        GroupChatSubscription.user_id == request.user_id,
    )

    # DM chats where the active subscriptions of the two users add up to exactly 2
    shared_dm_ids = (
        select(GroupChatSubscription.group_chat_id)
        .where(either_user)
        .where(GroupChatSubscription.left == None)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChat.is_dm == True)
        .group_by(GroupChatSubscription.group_chat_id)
        .having(count == 2)
        .subquery()
    )

    latest_message_query = (
        select(shared_dm_ids, GroupChat, GroupChatSubscription, Message)
        .join(shared_dm_ids, shared_dm_ids.c.group_chat_id == GroupChat.conversation_id)
        .join(Message, Message.conversation_id == GroupChat.conversation_id)
        .join(Conversation, Conversation.id == GroupChat.conversation_id)
        .options(contains_eager(GroupChat.conversation))
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id)
        .where(Message.time >= GroupChatSubscription.joined)
        .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
        .order_by(Message.id.desc())
        .limit(1)
    )
    result = session.execute(
        where_moderated_content_visible(latest_message_query, context, GroupChat, is_list_operation=False)
    ).one_or_none()

    if not result:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    chat = result.GroupChat
    own_subscription = result.GroupChatSubscription
    latest_message = result.Message

    return conversations_pb2.GroupChat(
        group_chat_id=chat.conversation_id,
        title=chat.title,
        member_user_ids=_get_visible_members_for_subscription(own_subscription),
        admin_user_ids=_get_visible_admins_for_subscription(own_subscription),
        only_admins_invite=chat.only_admins_invite,
        is_dm=chat.is_dm,
        created=Timestamp_from_datetime(chat.conversation.created),
        unseen_message_count=_unseen_message_count(session, own_subscription.id),
        last_seen_message_id=own_subscription.last_seen_message_id,
        latest_message=_message_to_pb(latest_message) if latest_message else None,
        mute_info=_mute_info(own_subscription),
        can_message=_user_can_message(session, context, chat),
        is_archived=own_subscription.is_archived,
    )
def GetUpdates(
    self, request: conversations_pb2.GetUpdatesReq, context: CouchersContext, session: Session
) -> conversations_pb2.GetUpdatesRes:
    """
    Returns messages newer than request.newest_message_id across all of the current user's chats, oldest first

    Only messages inside the user's membership window of each chat are included. At most
    DEFAULT_PAGINATION_LENGTH updates are returned; one extra row is fetched to work out no_more.
    """
    newer_messages_query = (
        select(Message)
        .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
        .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(Message.time >= GroupChatSubscription.joined)
        .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
        .where(Message.id > request.newest_message_id)
        .order_by(Message.id.asc())
        .limit(DEFAULT_PAGINATION_LENGTH + 1)
    )
    visible_query = where_moderated_content_visible(
        newer_messages_query,
        context,
        GroupChat,
        is_list_operation=False,
    )
    results = session.execute(visible_query).scalars().all()

    oldest_first = sorted(results, key=lambda message: message.id)
    updates = [
        conversations_pb2.Update(
            group_chat_id=message.conversation_id,
            message=_message_to_pb(message),
        )
        for message in oldest_first[:DEFAULT_PAGINATION_LENGTH]
    ]

    return conversations_pb2.GetUpdatesRes(
        updates=updates,
        no_more=len(results) <= DEFAULT_PAGINATION_LENGTH,
    )
def GetGroupChatMessages(
    self, request: conversations_pb2.GetGroupChatMessagesReq, context: CouchersContext, session: Session
) -> conversations_pb2.GetGroupChatMessagesRes:
    """
    Returns one page of messages from a group chat, newest first

    Only messages inside the current user's membership window are included. request.last_message_id is the
    pagination cursor (0 means start from the newest message); request.only_unseen restricts the result to
    messages newer than the user's last seen message. One extra row is fetched to work out no_more.

    The returned last_message_id is the id of the last (oldest) message in the returned page, or 0 if the page
    is empty, so that passing it back as the cursor always continues strictly after what was already delivered.
    """
    page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
    page_size = min(page_size, MAX_PAGE_SIZE)

    results = (
        session.execute(
            where_moderated_content_visible(
                select(Message)
                .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
                .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
                .where(GroupChatSubscription.user_id == context.user_id)
                .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
                .where(
                    or_(Message.id > GroupChatSubscription.last_seen_message_id, to_bool(request.only_unseen == 0))
                )
                .order_by(Message.id.desc())
                .limit(page_size + 1),
                context,
                GroupChat,
                is_list_operation=False,
            )
        )
        .scalars()
        .all()
    )

    # the extra look-ahead row (if any) only feeds no_more and is never returned
    page = results[:page_size]

    return conversations_pb2.GetGroupChatMessagesRes(
        messages=[_message_to_pb(message) for message in page],
        # results[-2] was only correct when the look-ahead row existed; on a short page it pointed at the
        # second-to-last message (or gave 0 for a single message), so the cursor would re-deliver messages
        last_message_id=page[-1].id if page else 0,
        no_more=len(results) <= page_size,
    )
def MarkLastSeenGroupChat(
    self, request: conversations_pb2.MarkLastSeenGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Moves the current user's last seen message id in a chat forward to request.last_seen_message_id

    Aborts with NOT_FOUND ("chat_not_found") if the user has no visible active subscription to the chat, and
    with FAILED_PRECONDITION ("cant_unsee_messages") if the new id is lower than the stored one.
    """
    subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    new_last_seen = request.last_seen_message_id
    # the marker may only move forward (or stay where it is)
    if subscription.last_seen_message_id > new_last_seen:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")

    subscription.last_seen_message_id = new_last_seen

    return empty_pb2.Empty()
def MuteGroupChat(
    self, request: conversations_pb2.MuteGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Changes how long the current user has a chat muted for

    request.unmute sets muted_until to DATETIME_MINUS_INFINITY, request.forever sets it to DATETIME_INFINITY,
    and request.for_duration sets it to now plus that duration. They are checked in that order and the first one
    that is truthy wins; if none is, nothing changes. Aborts with NOT_FOUND ("chat_not_found") if the user has
    no visible active subscription, and with FAILED_PRECONDITION ("cant_mute_past") for a negative duration.
    """
    subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if request.unmute:
        subscription.muted_until = DATETIME_MINUS_INFINITY
        return empty_pb2.Empty()

    if request.forever:
        subscription.muted_until = DATETIME_INFINITY
        return empty_pb2.Empty()

    if request.for_duration:
        mute_for = request.for_duration.ToTimedelta()
        if mute_for < timedelta(seconds=0):
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_mute_past")
        subscription.muted_until = now() + mute_for

    return empty_pb2.Empty()
def SetGroupChatArchiveStatus(
    self, request: conversations_pb2.SetGroupChatArchiveStatusReq, context: CouchersContext, session: Session
) -> conversations_pb2.SetGroupChatArchiveStatusRes:
    """
    Sets the archived flag on the current user's subscription to a chat and echoes the new state back

    Aborts with NOT_FOUND ("chat_not_found") if the user has no visible active subscription to the chat.
    """
    chat_id = request.group_chat_id
    archived = request.is_archived

    subscription = _get_visible_message_subscription(session, context, chat_id)
    if not subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    subscription.is_archived = archived

    return conversations_pb2.SetGroupChatArchiveStatusRes(group_chat_id=chat_id, is_archived=archived)
def SearchMessages(
    self, request: conversations_pb2.SearchMessagesReq, context: CouchersContext, session: Session
) -> conversations_pb2.SearchMessagesRes:
    """
    Searches the text of the messages in the current user's chats for request.query, newest match first

    The match is a case-insensitive substring match. Only messages inside the user's membership window of each
    chat are searched. request.last_message_id is the pagination cursor (0 means start from the newest message).
    One extra row is fetched to work out no_more.

    The returned last_message_id is the id of the last (oldest) message in the returned page, or 0 if the page
    is empty, so that passing it back as the cursor always continues strictly after what was already delivered.
    """
    page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
    page_size = min(page_size, MAX_PAGE_SIZE)

    # The query is user input: escape the LIKE metacharacters so that "%" and "_" are matched literally instead
    # of acting as wildcards. The escape character itself has to be escaped first.
    like_escape = "\\"
    escaped_query = (
        request.query.replace(like_escape, like_escape * 2)
        .replace("%", like_escape + "%")
        .replace("_", like_escape + "_")
    )

    results = (
        session.execute(
            where_moderated_content_visible(
                select(Message)
                .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
                .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
                .where(GroupChatSubscription.user_id == context.user_id)
                .where(Message.time >= GroupChatSubscription.joined)
                .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
                .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
                .where(Message.text.ilike(f"%{escaped_query}%", escape=like_escape))
                .order_by(Message.id.desc())
                .limit(page_size + 1),
                context,
                GroupChat,
                is_list_operation=True,
            )
        )
        .scalars()
        .all()
    )

    # the extra look-ahead row (if any) only feeds no_more and is never returned
    page = results[:page_size]

    return conversations_pb2.SearchMessagesRes(
        results=[
            conversations_pb2.MessageSearchResult(
                group_chat_id=message.conversation_id,
                message=_message_to_pb(message),
            )
            for message in page
        ],
        # results[-2] was only correct when the look-ahead row existed; on a short page it pointed at the
        # second-to-last match (or gave 0 for a single match), so the cursor would re-deliver results
        last_message_id=page[-1].id if page else 0,
        no_more=len(results) <= page_size,
    )
def CreateGroupChat(
    self, request: conversations_pb2.CreateGroupChatReq, context: CouchersContext, session: Session
) -> conversations_pb2.GroupChat:
    """
    Creates a new group chat (a DM when there is exactly one recipient) and posts its chat_created message

    Aborts with
    - FAILED_PRECONDITION "incomplete_profile_send_message" if the creator's profile is not complete
    - INVALID_ARGUMENT "user_not_found" if any requested recipient is not visible to the creator
    - INVALID_ARGUMENT "no_recipients" / "invalid_recipients" / "cant_add_self" for an empty recipient list,
      duplicate recipients, or the creator listed as a recipient
    - FAILED_PRECONDITION "already_have_dm" if a visible DM between the two users already exists
    - RESOURCE_EXHAUSTED "chat_initiation_rate_limit2" if the chat initiation rate limit check says to abort
    """
    creator = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
    if not has_completed_profile(session, creator):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")

    visible_recipients_query = (
        select(User.id).where(users_visible(context)).where(User.id.in_(request.recipient_user_ids))
    )
    recipient_user_ids = list(session.execute(visible_recipients_query).scalars().all())

    # make sure all requested users are visible
    if len(recipient_user_ids) != len(request.recipient_user_ids):
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")

    if not recipient_user_ids:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")

    # make sure there's no duplicate users
    if len(recipient_user_ids) != len(set(recipient_user_ids)):
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_recipients")

    if context.user_id in recipient_user_ids:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")

    if len(recipient_user_ids) == 1:
        # can only have one DM at a time between any two users
        other_user_id = recipient_user_ids[0]

        # Select the active subscriptions to DM chats that belong to either this user or the recipient, grouped
        # by chat. A chat with two such subscriptions is a DM the two users already share.
        count = func.count(GroupChatSubscription.id).label("count")
        either_user = or_(
            GroupChatSubscription.user_id == context.user_id,
            GroupChatSubscription.user_id == other_user_id,
        )
        existing_dm_query = where_moderated_content_visible(
            select(count)
            .where(either_user)
            .where(GroupChatSubscription.left == None)
            .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
            .where(GroupChat.is_dm == True)
            .group_by(GroupChatSubscription.group_chat_id)
            .having(count == 2),
            context,
            GroupChat,
            is_list_operation=False,
        )
        if session.execute(existing_dm_query).scalar_one_or_none():
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_have_dm")

    # Check if user has been initiating chats excessively
    rate_limited = process_rate_limits_and_check_abort(
        session=session, user_id=context.user_id, action=RateLimitAction.chat_initiation
    )
    if rate_limited:
        context.abort_with_error_code(
            grpc.StatusCode.RESOURCE_EXHAUSTED,
            "chat_initiation_rate_limit2",
            substitutions={"count": RATE_LIMIT_HOURS},
        )

    group_chat = _create_chat(
        session,
        creator_id=context.user_id,
        recipient_ids=request.recipient_user_ids,
        title=request.title.value,
    )

    your_subscription = _get_message_subscription(session, context.user_id, group_chat.conversation_id)

    _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created)

    session.flush()

    event_details = {
        "group_chat_id": group_chat.conversation_id,
        "is_dm": group_chat.is_dm,
        "recipient_count": len(request.recipient_user_ids),
    }
    log_event(context, session, "group_chat.created", event_details)

    return conversations_pb2.GroupChat(
        group_chat_id=group_chat.conversation_id,
        title=group_chat.title,
        member_user_ids=_get_visible_members_for_subscription(your_subscription),
        admin_user_ids=_get_visible_admins_for_subscription(your_subscription),
        only_admins_invite=group_chat.only_admins_invite,
        is_dm=group_chat.is_dm,
        created=Timestamp_from_datetime(group_chat.conversation.created),
        mute_info=_mute_info(your_subscription),
        can_message=True,
    )
def SendMessage(
    self, request: conversations_pb2.SendMessageReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Posts a text message from the current user into a chat they have an active subscription to

    Aborts with INVALID_ARGUMENT ("invalid_message") for empty text, NOT_FOUND ("chat_not_found") if the user has
    no visible active subscription to the chat, and FAILED_PRECONDITION ("cant_message_in_chat") if
    _user_can_message says no. On success, the sent messages metric is incremented and a "message.sent" event is
    logged.
    """
    if request.text == "":
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    own_subscription_query = where_moderated_content_visible(
        select(GroupChatSubscription, GroupChat)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.left == None),
        context,
        GroupChat,
        is_list_operation=False,
    )
    result = session.execute(own_subscription_query).one_or_none()
    if not result:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    subscription, group_chat = result._tuple()
    if not _user_can_message(session, context, group_chat):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_message_in_chat")

    _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)

    user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
    chat_kind = "direct message" if subscription.group_chat.is_dm else "group chat"
    sent_messages_counter.labels(user_gender, chat_kind).inc()

    event_details = {"group_chat_id": request.group_chat_id, "is_dm": subscription.group_chat.is_dm}
    log_event(context, session, "message.sent", event_details)

    return empty_pb2.Empty()
def SendDirectMessage(
    self, request: conversations_pb2.SendDirectMessageReq, context: CouchersContext, session: Session
) -> conversations_pb2.SendDirectMessageRes:
    """
    Sends a text message to a single other user, re-using the DM chat between the two users if one is found and
    creating a new chat otherwise.

    Aborts with:
    - FAILED_PRECONDITION "incomplete_profile_send_message" if the sender fails `has_completed_profile`
    - INVALID_ARGUMENT "no_recipients" if no recipient id was given
    - INVALID_ARGUMENT "user_not_found" if the recipient is not among the users visible to the caller
    - INVALID_ARGUMENT "cant_add_self" if the recipient is the sender
    - INVALID_ARGUMENT "invalid_message" if the text is empty

    Returns the id of the chat the message was added to.
    """
    user_id = context.user_id
    user = session.execute(select(User).where(User.id == user_id)).scalar_one()

    recipient_id = request.recipient_user_id

    if not has_completed_profile(session, user):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")

    if not recipient_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")

    recipient_user_id = session.execute(
        select(User.id).where(users_visible(context)).where(User.id == recipient_id)
    ).scalar_one_or_none()

    if not recipient_user_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")

    if user_id == recipient_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")

    if request.text == "":
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    # Look for an existing direct message (DM) chat between the two users: chats in which exactly two
    # subscription rows belong to either the sender or the recipient
    dm_chat_ids = (
        select(GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.user_id.in_([user_id, recipient_id]))
        .group_by(GroupChatSubscription.group_chat_id)
        .having(func.count(GroupChatSubscription.user_id) == 2)
    )

    chat = session.execute(
        where_moderated_content_visible(
            select(GroupChat)
            .where(GroupChat.is_dm == True)
            .where(GroupChat.conversation_id.in_(dm_chat_ids))
            .limit(1),
            context,
            GroupChat,
            is_list_operation=False,
        )
    ).scalar_one_or_none()

    if not chat:
        chat = _create_chat(session, user_id, [recipient_id])

    # Retrieve the sender's subscription to the chat
    # NOTE(review): the DM lookup above does not filter on `left`; if `_get_message_subscription` only returns
    # not-yet-left subscriptions, a sender who previously left the DM would get None here -- confirm
    subscription = _get_message_subscription(session, user_id, chat.conversation_id)

    # Add the message to the conversation
    _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)

    # The sender's `User` row was already loaded above, so read the gender from it instead of running a second
    # query for the same row
    sent_messages_counter.labels(user.gender, "direct message").inc()
    log_event(
        context,
        session,
        "message.sent",
        {"group_chat_id": chat.conversation_id, "is_dm": True, "recipient_id": recipient_id},
    )

    session.flush()

    return conversations_pb2.SendDirectMessageRes(group_chat_id=chat.conversation_id)
def EditGroupChat(
    self, request: conversations_pb2.EditGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Updates the title and/or the "only admins invite" setting of a group chat, then adds a `chat_edited` message.

    Aborts with NOT_FOUND "chat_not_found" if `_get_visible_message_subscription` finds no subscription for the
    caller, and with PERMISSION_DENIED "only_admin_can_edit" if the caller is not an admin of the chat.
    """
    subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_edit")

    # Both editable settings arrive as optional wrapper fields named like the GroupChat attribute they update;
    # only the ones actually present in the request are applied
    for field_name in ("title", "only_admins_invite"):
        if request.HasField(field_name):
            setattr(subscription.group_chat, field_name, getattr(request, field_name).value)

    _add_message_to_subscription(session, subscription, message_type=MessageType.chat_edited)

    return empty_pb2.Empty()
def MakeGroupChatAdmin(
    self, request: conversations_pb2.MakeGroupChatAdminReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Promotes another member of a group chat to admin and adds a `user_made_admin` message to the chat.

    Aborts with:
    - NOT_FOUND "user_not_found" if the target user is not among the users visible to the caller
    - NOT_FOUND "chat_not_found" if `_get_visible_message_subscription` finds no subscription for the caller
    - PERMISSION_DENIED "only_admin_can_make_admin" if the caller is not an admin
    - FAILED_PRECONDITION "cant_make_self_admin" if the caller targets themselves
    - FAILED_PRECONDITION "user_not_in_chat" if the target has no subscription to the chat
    - FAILED_PRECONDITION "already_admin" if the target's role is anything other than participant
    """
    target_user_id = request.user_id
    chat_id = request.group_chat_id

    target_user = session.execute(
        select(User).where(users_visible(context), User.id == target_user_id)
    ).scalar_one_or_none()
    if not target_user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # The caller's side: must be in the chat, and must be an admin
    your_subscription = _get_visible_message_subscription(session, context, chat_id)
    if not your_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
    if your_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_make_admin")

    if target_user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_make_self_admin")

    # The target's side: must be in the chat, and must currently be a plain participant
    their_subscription = _get_message_subscription(session, target_user_id, chat_id)
    if not their_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")
    if their_subscription.role != GroupChatRole.participant:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_admin")

    their_subscription.role = GroupChatRole.admin

    _add_message_to_subscription(
        session, your_subscription, message_type=MessageType.user_made_admin, target_id=target_user_id
    )

    return empty_pb2.Empty()
def RemoveGroupChatAdmin(
    self, request: conversations_pb2.RemoveGroupChatAdminReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Demotes an admin of a group chat (possibly the caller) to participant and adds a `user_removed_admin` message.

    Aborts with:
    - NOT_FOUND "user_not_found" if the target user is not among the users visible to the caller
    - NOT_FOUND "chat_not_found" if `_get_visible_message_subscription` finds no subscription for the caller
    - FAILED_PRECONDITION "cant_remove_last_admin" if the caller targets themselves and no other admin remains
    - PERMISSION_DENIED "only_admin_can_remove_admin" if the caller is not an admin
    - FAILED_PRECONDITION "user_not_admin" if the target has no active admin subscription to the chat
    """
    target_user_id = request.user_id
    chat_id = request.group_chat_id

    target_user = session.execute(
        select(User).where(users_visible(context), User.id == target_user_id)
    ).scalar_one_or_none()
    if not target_user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    your_subscription = _get_visible_message_subscription(session, context, chat_id)
    if not your_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if target_user_id == context.user_id:
        # Stepping down yourself is only allowed while some other active admin stays behind.
        # Race condition! (presumably the count can change before the role update below is committed)
        other_admins_query = (
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == chat_id)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.role == GroupChatRole.admin)
            .where(GroupChatSubscription.left == None)
        )
        other_admins_count = session.execute(other_admins_query).scalar_one()
        if other_admins_count <= 0:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_last_admin")

    if your_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_admin")

    # Only an active subscription that currently holds the admin role can be demoted
    target_admin_query = (
        select(GroupChatSubscription)
        .where(GroupChatSubscription.group_chat_id == chat_id)
        .where(GroupChatSubscription.user_id == target_user_id)
        .where(GroupChatSubscription.left == None)
        .where(GroupChatSubscription.role == GroupChatRole.admin)
    )
    their_subscription = session.execute(target_admin_query).scalar_one_or_none()
    if not their_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin")

    their_subscription.role = GroupChatRole.participant

    _add_message_to_subscription(
        session, your_subscription, message_type=MessageType.user_removed_admin, target_id=target_user_id
    )

    return empty_pb2.Empty()
def InviteToGroupChat(
    self, request: conversations_pb2.InviteToGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Adds another user to a group chat as a participant and adds a `user_invited` message to the chat.

    Aborts with:
    - NOT_FOUND "user_not_found" if the invitee is not among the users visible to the caller
    - NOT_FOUND "chat_not_found" if no not-yet-left subscription of the caller to this chat passes the
      `where_moderated_content_visible` filter
    - FAILED_PRECONDITION "cant_invite_self" if the caller invites themselves
    - PERMISSION_DENIED "invite_permission_denied" if the chat is admin-invite-only and the caller is not an admin
    - FAILED_PRECONDITION "cant_invite_to_dm" if the chat is a DM
    - FAILED_PRECONDITION "already_in_chat" if the invitee already has a subscription to the chat
    """
    invitee_id = request.user_id

    invitee = session.execute(
        select(User).where(users_visible(context), User.id == invitee_id)
    ).scalar_one_or_none()
    if not invitee:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # The caller's own active subscription, together with the chat it belongs to
    membership_query = (
        select(GroupChatSubscription, GroupChat)
        .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
        .where(GroupChatSubscription.user_id == context.user_id)
        .where(GroupChatSubscription.left == None)
    )
    row = session.execute(
        where_moderated_content_visible(membership_query, context, GroupChat, is_list_operation=False)
    ).one_or_none()
    if row is None:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    your_subscription, group_chat = row._tuple()

    if invitee_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_self")

    caller_is_admin = your_subscription.role == GroupChatRole.admin
    if not caller_is_admin and your_subscription.group_chat.only_admins_invite:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invite_permission_denied")

    if group_chat.is_dm:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_to_dm")

    their_subscription = _get_message_subscription(session, invitee_id, request.group_chat_id)
    if their_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_chat")

    # TODO: race condition!

    session.add(
        GroupChatSubscription(
            user_id=invitee_id,
            group_chat_id=your_subscription.group_chat.conversation_id,
            role=GroupChatRole.participant,
        )
    )

    _add_message_to_subscription(
        session, your_subscription, message_type=MessageType.user_invited, target_id=invitee_id
    )

    return empty_pb2.Empty()
def RemoveGroupChatUser(
    self, request: conversations_pb2.RemoveGroupChatUserReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Removes another user from a group chat on behalf of an admin.

    1. Load the caller's subscription and check that they are an admin of the chat
    2. Load the target's subscription, add a `user_removed` message, and mark the target as having left

    Aborts with NOT_FOUND "chat_not_found", PERMISSION_DENIED "only_admin_can_remove_user",
    FAILED_PRECONDITION "cant_remove_self" or FAILED_PRECONDITION "user_not_in_chat" when a check fails.
    """
    target_user_id = request.user_id
    chat_id = request.group_chat_id

    # Step 1: the caller
    your_subscription = _get_visible_message_subscription(session, context, chat_id)

    # the caller has no subscription to this chat
    if not your_subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    # the caller is not an admin of the chat
    if your_subscription.role != GroupChatRole.admin:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_user")

    # the caller is trying to remove themselves
    if target_user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_self")

    # Step 2: the user being removed
    their_subscription = _get_message_subscription(session, target_user_id, chat_id)

    # the target has no subscription to this chat
    if not their_subscription:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")

    # The removal message is added before the target's subscription is marked as left
    _add_message_to_subscription(
        session, your_subscription, message_type=MessageType.user_removed, target_id=target_user_id
    )

    their_subscription.left = func.now()

    return empty_pb2.Empty()
def LeaveGroupChat(
    self, request: conversations_pb2.LeaveGroupChatReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Removes the calling user from a group chat, adding a `user_left` message first.

    An admin may only leave if another active admin remains or if no active participants remain.

    Aborts with NOT_FOUND "chat_not_found" if `_get_visible_message_subscription` finds no subscription for the
    caller, and with FAILED_PRECONDITION "last_admin_cant_leave" if the caller is the last admin while
    participants are still in the chat.
    """
    subscription = _get_visible_message_subscription(session, context, request.group_chat_id)

    if not subscription:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")

    if subscription.role == GroupChatRole.admin:
        # Counts everybody else who has not left the chat; narrowed down by role below
        others_still_in_chat = (
            select(func.count())
            .select_from(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
            .where(GroupChatSubscription.user_id != context.user_id)
            .where(GroupChatSubscription.left == None)
        )
        other_admins_count = session.execute(
            others_still_in_chat.where(GroupChatSubscription.role == GroupChatRole.admin)
        ).scalar_one()
        participants_count = session.execute(
            others_still_in_chat.where(GroupChatSubscription.role == GroupChatRole.participant)
        ).scalar_one()

        # Don't leave participants behind in a chat without any admin
        if other_admins_count <= 0 and participants_count != 0:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "last_admin_cant_leave")

    _add_message_to_subscription(session, subscription, message_type=MessageType.user_left)

    subscription.left = func.now()

    return empty_pb2.Empty()