Coverage for src / couchers / servicers / conversations.py: 88%
302 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-09 02:44 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-09 02:44 +0000
1import logging
2from collections.abc import Sequence
3from datetime import timedelta
4from typing import Any, cast
6import grpc
7from google.protobuf import empty_pb2
8from sqlalchemy import select
9from sqlalchemy.orm import Session
10from sqlalchemy.sql import func, not_, or_
12from couchers.constants import DATETIME_INFINITY, DATETIME_MINUS_INFINITY
13from couchers.context import CouchersContext, make_background_user_context
14from couchers.db import session_scope
15from couchers.jobs.enqueue import queue_job
16from couchers.metrics import sent_messages_counter
17from couchers.models import (
18 Conversation,
19 GroupChat,
20 GroupChatRole,
21 GroupChatSubscription,
22 Message,
23 MessageType,
24 ModerationObjectType,
25 RateLimitAction,
26 User,
27)
28from couchers.models.notifications import NotificationTopicAction
29from couchers.moderation.utils import create_moderation
30from couchers.notifications.notify import notify
31from couchers.proto import conversations_pb2, conversations_pb2_grpc, notification_data_pb2
32from couchers.proto.internal import jobs_pb2
33from couchers.rate_limits.check import process_rate_limits_and_check_abort
34from couchers.rate_limits.definitions import RATE_LIMIT_HOURS
35from couchers.servicers.api import user_model_to_pb
36from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible
37from couchers.utils import Timestamp_from_datetime, now
39logger = logging.getLogger(__name__)
41# TODO: Still needs custom pagination: GetUpdates
42DEFAULT_PAGINATION_LENGTH = 20
43MAX_PAGE_SIZE = 50
46def _message_to_pb(message: Message) -> conversations_pb2.Message:
47 """
48 Turns the given message to a protocol buffer
49 """
50 if message.is_normal_message:
51 return conversations_pb2.Message(
52 message_id=message.id,
53 author_user_id=message.author_id,
54 time=Timestamp_from_datetime(message.time),
55 text=conversations_pb2.MessageContentText(text=message.text),
56 )
57 else:
58 return conversations_pb2.Message(
59 message_id=message.id,
60 author_user_id=message.author_id,
61 time=Timestamp_from_datetime(message.time),
62 chat_created=(
63 conversations_pb2.MessageContentChatCreated()
64 if message.message_type == MessageType.chat_created
65 else None
66 ),
67 chat_edited=(
68 conversations_pb2.MessageContentChatEdited()
69 if message.message_type == MessageType.chat_edited
70 else None
71 ),
72 user_invited=(
73 conversations_pb2.MessageContentUserInvited(target_user_id=message.target_id)
74 if message.message_type == MessageType.user_invited
75 else None
76 ),
77 user_left=(
78 conversations_pb2.MessageContentUserLeft() if message.message_type == MessageType.user_left else None
79 ),
80 user_made_admin=(
81 conversations_pb2.MessageContentUserMadeAdmin(target_user_id=message.target_id)
82 if message.message_type == MessageType.user_made_admin
83 else None
84 ),
85 user_removed_admin=(
86 conversations_pb2.MessageContentUserRemovedAdmin(target_user_id=message.target_id)
87 if message.message_type == MessageType.user_removed_admin
88 else None
89 ),
90 group_chat_user_removed=(
91 conversations_pb2.MessageContentUserRemoved(target_user_id=message.target_id)
92 if message.message_type == MessageType.user_removed
93 else None
94 ),
95 )
98def _get_visible_members_for_subscription(subscription: GroupChatSubscription) -> list[int]:
99 """
100 If a user leaves a group chat, they shouldn't be able to see who's added
101 after they left
102 """
103 if not subscription.left:
104 # still in the chat, we see everyone with a current subscription
105 return [sub.user_id for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None)]
106 else:
107 # not in chat anymore, see everyone who was in chat when we left
108 return [
109 sub.user_id
110 for sub in subscription.group_chat.subscriptions.where(
111 GroupChatSubscription.joined <= subscription.left
112 ).where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None))
113 ]
116def _get_visible_admins_for_subscription(subscription: GroupChatSubscription) -> list[int]:
117 """
118 If a user leaves a group chat, they shouldn't be able to see who's added
119 after they left
120 """
121 if not subscription.left:
122 # still in the chat, we see everyone with a current subscription
123 return [
124 sub.user_id
125 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.left == None).where(
126 GroupChatSubscription.role == GroupChatRole.admin
127 )
128 ]
129 else:
130 # not in chat anymore, see everyone who was in chat when we left
131 return [
132 sub.user_id
133 for sub in subscription.group_chat.subscriptions.where(GroupChatSubscription.role == GroupChatRole.admin)
134 .where(GroupChatSubscription.joined <= subscription.left)
135 .where(or_(GroupChatSubscription.left >= subscription.left, GroupChatSubscription.left == None))
136 ]
139def _user_can_message(session: Session, context: CouchersContext, group_chat: GroupChat) -> bool:
140 """
141 If it is a true group chat (not a DM), user can always message. For a DM, user can message if the other participant
142 - Is not deleted/banned
143 - Has not been blocked by the user or is blocking the user
144 - Has not left the chat
145 """
146 if not group_chat.is_dm:
147 return True
149 query = select(
150 where_users_column_visible(
151 select(GroupChatSubscription)
152 .where(GroupChatSubscription.user_id != context.user_id)
153 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
154 .where(GroupChatSubscription.left == None),
155 context=context,
156 column=GroupChatSubscription.user_id,
157 ).exists()
158 )
159 return session.execute(query).scalar_one()
162def generate_message_notifications(payload: jobs_pb2.GenerateMessageNotificationsPayload) -> None:
163 """
164 Background job to generate notifications for a message sent to a group chat
165 """
166 logger.info(f"Fanning notifications for message_id = {payload.message_id}")
168 with session_scope() as session:
169 message, group_chat = session.execute(
170 select(Message, GroupChat)
171 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
172 .where(Message.id == payload.message_id)
173 ).one()
175 if message.message_type != MessageType.text:
176 logger.info(f"Not a text message, not notifying. message_id = {payload.message_id}")
177 return
179 context = make_background_user_context(user_id=message.author_id)
180 user_ids_to_notify = (
181 session.execute(
182 where_users_column_visible(
183 select(GroupChatSubscription.user_id)
184 .where(GroupChatSubscription.group_chat_id == message.conversation_id)
185 .where(GroupChatSubscription.user_id != message.author_id)
186 .where(GroupChatSubscription.joined <= message.time)
187 .where(or_(GroupChatSubscription.left == None, GroupChatSubscription.left >= message.time))
188 .where(not_(GroupChatSubscription.is_muted)),
189 context=context,
190 column=GroupChatSubscription.user_id,
191 )
192 )
193 .scalars()
194 .all()
195 )
197 if group_chat.is_dm:
198 msg = f"{message.author.name} sent you a message"
199 else:
200 msg = f"{message.author.name} sent a message in {group_chat.title}"
202 for user_id in user_ids_to_notify:
203 notify(
204 session,
205 user_id=user_id,
206 topic_action=NotificationTopicAction.chat__message,
207 key=str(message.conversation_id),
208 data=notification_data_pb2.ChatMessage(
209 author=user_model_to_pb(
210 message.author,
211 session,
212 make_background_user_context(user_id=user_id),
213 ),
214 message=msg,
215 text=message.text,
216 group_chat_id=message.conversation_id,
217 ),
218 moderation_state_id=group_chat.moderation_state_id,
219 )
222def _add_message_to_subscription(session: Session, subscription: GroupChatSubscription, **kwargs: Any) -> Message:
223 """
224 Creates a new message for a subscription, from the user whose subscription that is. Updates last seen message id
226 Specify the keyword args for Message
227 """
228 message = Message(conversation=subscription.group_chat.conversation, author_id=subscription.user_id, **kwargs)
230 session.add(message)
231 session.flush()
233 subscription.last_seen_message_id = message.id
235 queue_job(
236 session,
237 job=generate_message_notifications,
238 payload=jobs_pb2.GenerateMessageNotificationsPayload(
239 message_id=message.id,
240 ),
241 )
243 return message
246def _create_chat(
247 session: Session,
248 creator_id: int,
249 recipient_ids: Sequence[int],
250 title: str | None = None,
251 only_admins_invite: bool = True,
252) -> GroupChat:
253 conversation = Conversation()
254 session.add(conversation)
255 session.flush()
257 # Create moderation state for UMS (starts as SHADOWED)
258 moderation_state = create_moderation(
259 session=session,
260 object_type=ModerationObjectType.GROUP_CHAT,
261 object_id=conversation.id,
262 creator_user_id=creator_id,
263 )
265 chat = GroupChat(
266 conversation_id=conversation.id,
267 title=title,
268 creator_id=creator_id,
269 is_dm=True if len(recipient_ids) == 1 else False,
270 only_admins_invite=only_admins_invite,
271 moderation_state_id=moderation_state.id,
272 )
273 session.add(chat)
274 session.flush()
276 creator_subscription = GroupChatSubscription(
277 user_id=creator_id,
278 group_chat=chat,
279 role=GroupChatRole.admin,
280 )
281 session.add(creator_subscription)
283 for uid in recipient_ids:
284 session.add(
285 GroupChatSubscription(
286 user_id=uid,
287 group_chat=chat,
288 role=GroupChatRole.participant,
289 )
290 )
292 return chat
295def _get_message_subscription(session: Session, user_id: int, conversation_id: int) -> GroupChatSubscription:
296 subscription = session.execute(
297 select(GroupChatSubscription)
298 .where(GroupChatSubscription.group_chat_id == conversation_id)
299 .where(GroupChatSubscription.user_id == user_id)
300 .where(GroupChatSubscription.left == None)
301 ).scalar_one_or_none()
303 return cast(GroupChatSubscription, subscription)
306def _get_visible_message_subscription(
307 session: Session, context: CouchersContext, conversation_id: int
308) -> GroupChatSubscription:
309 """Get subscription with visibility filtering"""
310 subscription = session.execute(
311 where_moderated_content_visible(
312 select(GroupChatSubscription)
313 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
314 .where(GroupChatSubscription.group_chat_id == conversation_id)
315 .where(GroupChatSubscription.user_id == context.user_id)
316 .where(GroupChatSubscription.left == None),
317 context,
318 GroupChat,
319 is_list_operation=False,
320 )
321 ).scalar_one_or_none()
323 return cast(GroupChatSubscription, subscription)
326def _unseen_message_count(session: Session, subscription_id: int) -> int:
327 query = (
328 select(func.count())
329 .select_from(Message)
330 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
331 .where(GroupChatSubscription.id == subscription_id)
332 .where(Message.id > GroupChatSubscription.last_seen_message_id)
333 )
334 return session.execute(query).scalar_one()
337def _mute_info(subscription: GroupChatSubscription) -> conversations_pb2.MuteInfo:
338 (muted, muted_until) = subscription.muted_display()
339 return conversations_pb2.MuteInfo(
340 muted=muted,
341 muted_until=Timestamp_from_datetime(muted_until) if muted_until else None,
342 )
345class Conversations(conversations_pb2_grpc.ConversationsServicer):
346 def ListGroupChats(
347 self, request: conversations_pb2.ListGroupChatsReq, context: CouchersContext, session: Session
348 ) -> conversations_pb2.ListGroupChatsRes:
349 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
350 page_size = min(page_size, MAX_PAGE_SIZE)
352 # select group chats where you have a subscription, and for each of
353 # these, the latest message from them
355 t = (
356 select(
357 GroupChatSubscription.group_chat_id.label("group_chat_id"),
358 func.max(GroupChatSubscription.id).label("group_chat_subscriptions_id"),
359 func.max(Message.id).label("message_id"),
360 )
361 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
362 .where(GroupChatSubscription.user_id == context.user_id)
363 .where(Message.time >= GroupChatSubscription.joined)
364 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
365 .group_by(GroupChatSubscription.group_chat_id)
366 .order_by(func.max(Message.id).desc())
367 .subquery()
368 )
370 results = session.execute(
371 where_moderated_content_visible(
372 select(t, GroupChat, GroupChatSubscription, Message)
373 .join(Message, Message.id == t.c.message_id)
374 .join(GroupChatSubscription, GroupChatSubscription.id == t.c.group_chat_subscriptions_id)
375 .join(GroupChat, GroupChat.conversation_id == t.c.group_chat_id)
376 .where(or_(t.c.message_id < request.last_message_id, to_bool(request.last_message_id == 0)))
377 .order_by(t.c.message_id.desc())
378 .limit(page_size + 1),
379 context,
380 GroupChat,
381 is_list_operation=True,
382 )
383 ).all()
385 return conversations_pb2.ListGroupChatsRes(
386 group_chats=[
387 conversations_pb2.GroupChat(
388 group_chat_id=result.GroupChat.conversation_id,
389 title=result.GroupChat.title, # TODO: proper title for DMs, etc
390 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
391 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
392 only_admins_invite=result.GroupChat.only_admins_invite,
393 is_dm=result.GroupChat.is_dm,
394 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
395 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
396 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
397 latest_message=_message_to_pb(result.Message) if result.Message else None,
398 mute_info=_mute_info(result.GroupChatSubscription),
399 can_message=_user_can_message(session, context, result.GroupChat),
400 )
401 for result in results[:page_size]
402 ],
403 last_message_id=(
404 min(g.Message.id if g.Message else 1 for g in results[:page_size]) if len(results) > 0 else 0
405 ), # TODO
406 no_more=len(results) <= page_size,
407 )
409 def GetGroupChat(
410 self, request: conversations_pb2.GetGroupChatReq, context: CouchersContext, session: Session
411 ) -> conversations_pb2.GroupChat:
412 result = session.execute(
413 where_moderated_content_visible(
414 select(GroupChat, GroupChatSubscription, Message)
415 .join(Message, Message.conversation_id == GroupChatSubscription.group_chat_id)
416 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
417 .where(GroupChatSubscription.user_id == context.user_id)
418 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
419 .where(Message.time >= GroupChatSubscription.joined)
420 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
421 .order_by(Message.id.desc())
422 .limit(1),
423 context,
424 GroupChat,
425 is_list_operation=False,
426 )
427 ).one_or_none()
429 if not result:
430 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
432 return conversations_pb2.GroupChat(
433 group_chat_id=result.GroupChat.conversation_id,
434 title=result.GroupChat.title,
435 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
436 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
437 only_admins_invite=result.GroupChat.only_admins_invite,
438 is_dm=result.GroupChat.is_dm,
439 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
440 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
441 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
442 latest_message=_message_to_pb(result.Message) if result.Message else None,
443 mute_info=_mute_info(result.GroupChatSubscription),
444 can_message=_user_can_message(session, context, result.GroupChat),
445 )
447 def GetDirectMessage(
448 self, request: conversations_pb2.GetDirectMessageReq, context: CouchersContext, session: Session
449 ) -> conversations_pb2.GroupChat:
450 count = func.count(GroupChatSubscription.id).label("count")
451 subquery = (
452 select(GroupChatSubscription.group_chat_id)
453 .where(
454 or_(
455 GroupChatSubscription.user_id == context.user_id,
456 GroupChatSubscription.user_id == request.user_id,
457 )
458 )
459 .where(GroupChatSubscription.left == None)
460 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
461 .where(GroupChat.is_dm == True)
462 .group_by(GroupChatSubscription.group_chat_id)
463 .having(count == 2)
464 .subquery()
465 )
467 result = session.execute(
468 where_moderated_content_visible(
469 select(subquery, GroupChat, GroupChatSubscription, Message)
470 .join(subquery, subquery.c.group_chat_id == GroupChat.conversation_id)
471 .join(Message, Message.conversation_id == GroupChat.conversation_id)
472 .where(GroupChatSubscription.user_id == context.user_id)
473 .where(GroupChatSubscription.group_chat_id == GroupChat.conversation_id)
474 .where(Message.time >= GroupChatSubscription.joined)
475 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
476 .order_by(Message.id.desc())
477 .limit(1),
478 context,
479 GroupChat,
480 is_list_operation=False,
481 )
482 ).one_or_none()
484 if not result:
485 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
487 return conversations_pb2.GroupChat(
488 group_chat_id=result.GroupChat.conversation_id,
489 title=result.GroupChat.title,
490 member_user_ids=_get_visible_members_for_subscription(result.GroupChatSubscription),
491 admin_user_ids=_get_visible_admins_for_subscription(result.GroupChatSubscription),
492 only_admins_invite=result.GroupChat.only_admins_invite,
493 is_dm=result.GroupChat.is_dm,
494 created=Timestamp_from_datetime(result.GroupChat.conversation.created),
495 unseen_message_count=_unseen_message_count(session, result.GroupChatSubscription.id),
496 last_seen_message_id=result.GroupChatSubscription.last_seen_message_id,
497 latest_message=_message_to_pb(result.Message) if result.Message else None,
498 mute_info=_mute_info(result.GroupChatSubscription),
499 can_message=_user_can_message(session, context, result.GroupChat),
500 )
502 def GetUpdates(
503 self, request: conversations_pb2.GetUpdatesReq, context: CouchersContext, session: Session
504 ) -> conversations_pb2.GetUpdatesRes:
505 results = (
506 session.execute(
507 where_moderated_content_visible(
508 select(Message)
509 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
510 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
511 .where(GroupChatSubscription.user_id == context.user_id)
512 .where(Message.time >= GroupChatSubscription.joined)
513 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
514 .where(Message.id > request.newest_message_id)
515 .order_by(Message.id.asc())
516 .limit(DEFAULT_PAGINATION_LENGTH + 1),
517 context,
518 GroupChat,
519 is_list_operation=False,
520 )
521 )
522 .scalars()
523 .all()
524 )
526 return conversations_pb2.GetUpdatesRes(
527 updates=[
528 conversations_pb2.Update(
529 group_chat_id=message.conversation_id,
530 message=_message_to_pb(message),
531 )
532 for message in sorted(results, key=lambda message: message.id)[:DEFAULT_PAGINATION_LENGTH]
533 ],
534 no_more=len(results) <= DEFAULT_PAGINATION_LENGTH,
535 )
537 def GetGroupChatMessages(
538 self, request: conversations_pb2.GetGroupChatMessagesReq, context: CouchersContext, session: Session
539 ) -> conversations_pb2.GetGroupChatMessagesRes:
540 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
541 page_size = min(page_size, MAX_PAGE_SIZE)
543 results = (
544 session.execute(
545 where_moderated_content_visible(
546 select(Message)
547 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
548 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
549 .where(GroupChatSubscription.user_id == context.user_id)
550 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
551 .where(Message.time >= GroupChatSubscription.joined)
552 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
553 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
554 .where(
555 or_(Message.id > GroupChatSubscription.last_seen_message_id, to_bool(request.only_unseen == 0))
556 )
557 .order_by(Message.id.desc())
558 .limit(page_size + 1),
559 context,
560 GroupChat,
561 is_list_operation=False,
562 )
563 )
564 .scalars()
565 .all()
566 )
568 return conversations_pb2.GetGroupChatMessagesRes(
569 messages=[_message_to_pb(message) for message in results[:page_size]],
570 last_message_id=results[-2].id if len(results) > 1 else 0, # TODO
571 no_more=len(results) <= page_size,
572 )
574 def MarkLastSeenGroupChat(
575 self, request: conversations_pb2.MarkLastSeenGroupChatReq, context: CouchersContext, session: Session
576 ) -> empty_pb2.Empty:
577 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
579 if not subscription: 579 ↛ 580line 579 didn't jump to line 580 because the condition on line 579 was never true
580 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
582 if not subscription.last_seen_message_id <= request.last_seen_message_id: 582 ↛ 583line 582 didn't jump to line 583 because the condition on line 582 was never true
583 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
585 subscription.last_seen_message_id = request.last_seen_message_id
587 return empty_pb2.Empty()
589 def MuteGroupChat(
590 self, request: conversations_pb2.MuteGroupChatReq, context: CouchersContext, session: Session
591 ) -> empty_pb2.Empty:
592 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
594 if not subscription: 594 ↛ 595line 594 didn't jump to line 595 because the condition on line 594 was never true
595 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
597 if request.unmute:
598 subscription.muted_until = DATETIME_MINUS_INFINITY
599 elif request.forever:
600 subscription.muted_until = DATETIME_INFINITY
601 elif request.for_duration: 601 ↛ 607line 601 didn't jump to line 607 because the condition on line 601 was always true
602 duration = request.for_duration.ToTimedelta()
603 if duration < timedelta(seconds=0): 603 ↛ 604line 603 didn't jump to line 604 because the condition on line 603 was never true
604 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_mute_past")
605 subscription.muted_until = now() + duration
607 return empty_pb2.Empty()
609 def SearchMessages(
610 self, request: conversations_pb2.SearchMessagesReq, context: CouchersContext, session: Session
611 ) -> conversations_pb2.SearchMessagesRes:
612 page_size = request.number if request.number != 0 else DEFAULT_PAGINATION_LENGTH
613 page_size = min(page_size, MAX_PAGE_SIZE)
615 results = (
616 session.execute(
617 where_moderated_content_visible(
618 select(Message)
619 .join(GroupChatSubscription, GroupChatSubscription.group_chat_id == Message.conversation_id)
620 .join(GroupChat, GroupChat.conversation_id == Message.conversation_id)
621 .where(GroupChatSubscription.user_id == context.user_id)
622 .where(Message.time >= GroupChatSubscription.joined)
623 .where(or_(Message.time <= GroupChatSubscription.left, GroupChatSubscription.left == None))
624 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
625 .where(Message.text.ilike(f"%{request.query}%"))
626 .order_by(Message.id.desc())
627 .limit(page_size + 1),
628 context,
629 GroupChat,
630 is_list_operation=True,
631 )
632 )
633 .scalars()
634 .all()
635 )
637 return conversations_pb2.SearchMessagesRes(
638 results=[
639 conversations_pb2.MessageSearchResult(
640 group_chat_id=message.conversation_id,
641 message=_message_to_pb(message),
642 )
643 for message in results[:page_size]
644 ],
645 last_message_id=results[-2].id if len(results) > 1 else 0,
646 no_more=len(results) <= page_size,
647 )
649 def CreateGroupChat(
650 self, request: conversations_pb2.CreateGroupChatReq, context: CouchersContext, session: Session
651 ) -> conversations_pb2.GroupChat:
652 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
653 if not user.has_completed_profile:
654 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")
656 recipient_user_ids = list(
657 session.execute(
658 select(User.id).where(users_visible(context)).where(User.id.in_(request.recipient_user_ids))
659 )
660 .scalars()
661 .all()
662 )
664 # make sure all requested users are visible
665 if len(recipient_user_ids) != len(request.recipient_user_ids): 665 ↛ 666line 665 didn't jump to line 666 because the condition on line 665 was never true
666 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")
668 if not recipient_user_ids: 668 ↛ 669line 668 didn't jump to line 669 because the condition on line 668 was never true
669 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")
671 if len(recipient_user_ids) != len(set(recipient_user_ids)): 671 ↛ 673line 671 didn't jump to line 673 because the condition on line 671 was never true
672 # make sure there's no duplicate users
673 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_recipients")
675 if context.user_id in recipient_user_ids: 675 ↛ 676line 675 didn't jump to line 676 because the condition on line 675 was never true
676 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")
678 if len(recipient_user_ids) == 1:
679 # can only have one DM at a time between any two users
680 other_user_id = recipient_user_ids[0]
682 # the following sql statement selects subscriptions that are DMs and have the same group_chat_id, and have
683 # user_id either this user or the recipient user. If you find two subscriptions to the same DM group
684 # chat, you know they already have a shared group chat
685 count = func.count(GroupChatSubscription.id).label("count")
686 if session.execute(
687 where_moderated_content_visible(
688 select(count)
689 .where(
690 or_(
691 GroupChatSubscription.user_id == context.user_id,
692 GroupChatSubscription.user_id == other_user_id,
693 )
694 )
695 .where(GroupChatSubscription.left == None)
696 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
697 .where(GroupChat.is_dm == True)
698 .group_by(GroupChatSubscription.group_chat_id)
699 .having(count == 2),
700 context,
701 GroupChat,
702 is_list_operation=False,
703 )
704 ).scalar_one_or_none():
705 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_have_dm")
707 # Check if user has been initiating chats excessively
708 if process_rate_limits_and_check_abort(
709 session=session, user_id=context.user_id, action=RateLimitAction.chat_initiation
710 ):
711 context.abort_with_error_code(
712 grpc.StatusCode.RESOURCE_EXHAUSTED,
713 "chat_initiation_rate_limit",
714 substitutions={"hours": str(RATE_LIMIT_HOURS)},
715 )
717 group_chat = _create_chat(
718 session,
719 creator_id=context.user_id,
720 recipient_ids=request.recipient_user_ids,
721 title=request.title.value,
722 )
724 your_subscription = _get_message_subscription(session, context.user_id, group_chat.conversation_id)
726 _add_message_to_subscription(session, your_subscription, message_type=MessageType.chat_created)
728 session.flush()
730 return conversations_pb2.GroupChat(
731 group_chat_id=group_chat.conversation_id,
732 title=group_chat.title,
733 member_user_ids=_get_visible_members_for_subscription(your_subscription),
734 admin_user_ids=_get_visible_admins_for_subscription(your_subscription),
735 only_admins_invite=group_chat.only_admins_invite,
736 is_dm=group_chat.is_dm,
737 created=Timestamp_from_datetime(group_chat.conversation.created),
738 mute_info=_mute_info(your_subscription),
739 can_message=True,
740 )
742 def SendMessage(
743 self, request: conversations_pb2.SendMessageReq, context: CouchersContext, session: Session
744 ) -> empty_pb2.Empty:
745 if request.text == "": 745 ↛ 746line 745 didn't jump to line 746 because the condition on line 745 was never true
746 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
748 result = session.execute(
749 where_moderated_content_visible(
750 select(GroupChatSubscription, GroupChat)
751 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
752 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
753 .where(GroupChatSubscription.user_id == context.user_id)
754 .where(GroupChatSubscription.left == None),
755 context,
756 GroupChat,
757 is_list_operation=False,
758 )
759 ).one_or_none()
760 if not result:
761 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
763 subscription, group_chat = result
764 if not _user_can_message(session, context, group_chat):
765 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_message_in_chat")
767 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)
769 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
770 sent_messages_counter.labels(
771 user_gender, "direct message" if subscription.group_chat.is_dm else "group chat"
772 ).inc()
774 return empty_pb2.Empty()
776 def SendDirectMessage(
777 self, request: conversations_pb2.SendDirectMessageReq, context: CouchersContext, session: Session
778 ) -> conversations_pb2.SendDirectMessageRes:
779 user_id = context.user_id
780 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
782 recipient_id = request.recipient_user_id
784 if not user.has_completed_profile: 784 ↛ 785line 784 didn't jump to line 785 because the condition on line 784 was never true
785 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_message")
787 if not recipient_id: 787 ↛ 788line 787 didn't jump to line 788 because the condition on line 787 was never true
788 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_recipients")
790 recipient_user_id = session.execute(
791 select(User.id).where(users_visible(context)).where(User.id == recipient_id)
792 ).scalar_one_or_none()
794 if not recipient_user_id: 794 ↛ 795line 794 didn't jump to line 795 because the condition on line 794 was never true
795 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "user_not_found")
797 if user_id == recipient_id: 797 ↛ 798line 797 didn't jump to line 798 because the condition on line 797 was never true
798 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_add_self")
800 if request.text == "": 800 ↛ 801line 800 didn't jump to line 801 because the condition on line 800 was never true
801 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
803 # Look for an existing direct message (DM) chat between the two users
804 dm_chat_ids = (
805 select(GroupChatSubscription.group_chat_id)
806 .where(GroupChatSubscription.user_id.in_([user_id, recipient_id]))
807 .group_by(GroupChatSubscription.group_chat_id)
808 .having(func.count(GroupChatSubscription.user_id) == 2)
809 )
811 chat = session.execute(
812 where_moderated_content_visible(
813 select(GroupChat)
814 .where(GroupChat.is_dm == True)
815 .where(GroupChat.conversation_id.in_(dm_chat_ids))
816 .limit(1),
817 context,
818 GroupChat,
819 is_list_operation=False,
820 )
821 ).scalar_one_or_none()
823 if not chat:
824 chat = _create_chat(session, user_id, [recipient_id])
826 # Retrieve the sender's active subscription to the chat
827 subscription = _get_message_subscription(session, user_id, chat.conversation_id)
829 # Add the message to the conversation
830 _add_message_to_subscription(session, subscription, message_type=MessageType.text, text=request.text)
832 user_gender = session.execute(select(User.gender).where(User.id == user_id)).scalar_one()
833 sent_messages_counter.labels(user_gender, "direct message").inc()
835 session.flush()
837 return conversations_pb2.SendDirectMessageRes(group_chat_id=chat.conversation_id)
839 def EditGroupChat(
840 self, request: conversations_pb2.EditGroupChatReq, context: CouchersContext, session: Session
841 ) -> empty_pb2.Empty:
842 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
844 if not subscription:
845 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
847 if subscription.role != GroupChatRole.admin:
848 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_edit")
850 if request.HasField("title"):
851 subscription.group_chat.title = request.title.value
853 if request.HasField("only_admins_invite"): 853 ↛ 856line 853 didn't jump to line 856 because the condition on line 853 was always true
854 subscription.group_chat.only_admins_invite = request.only_admins_invite.value
856 _add_message_to_subscription(session, subscription, message_type=MessageType.chat_edited)
858 return empty_pb2.Empty()
860 def MakeGroupChatAdmin(
861 self, request: conversations_pb2.MakeGroupChatAdminReq, context: CouchersContext, session: Session
862 ) -> empty_pb2.Empty:
863 if not session.execute(
864 select(User).where(users_visible(context)).where(User.id == request.user_id)
865 ).scalar_one_or_none():
866 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
868 your_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
870 if not your_subscription: 870 ↛ 871line 870 didn't jump to line 871 because the condition on line 870 was never true
871 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
873 if your_subscription.role != GroupChatRole.admin:
874 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_make_admin")
876 if request.user_id == context.user_id: 876 ↛ 877line 876 didn't jump to line 877 because the condition on line 876 was never true
877 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_make_self_admin")
879 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
881 if not their_subscription: 881 ↛ 882line 881 didn't jump to line 882 because the condition on line 881 was never true
882 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")
884 if their_subscription.role != GroupChatRole.participant:
885 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_admin")
887 their_subscription.role = GroupChatRole.admin
889 _add_message_to_subscription(
890 session, your_subscription, message_type=MessageType.user_made_admin, target_id=request.user_id
891 )
893 return empty_pb2.Empty()
895 def RemoveGroupChatAdmin(
896 self, request: conversations_pb2.RemoveGroupChatAdminReq, context: CouchersContext, session: Session
897 ) -> empty_pb2.Empty:
898 if not session.execute(
899 select(User).where(users_visible(context)).where(User.id == request.user_id)
900 ).scalar_one_or_none():
901 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
903 your_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
905 if not your_subscription: 905 ↛ 906line 905 didn't jump to line 906 because the condition on line 905 was never true
906 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
908 if request.user_id == context.user_id:
909 # Race condition!
910 other_admins_count = session.execute(
911 select(func.count())
912 .select_from(GroupChatSubscription)
913 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
914 .where(GroupChatSubscription.user_id != context.user_id)
915 .where(GroupChatSubscription.role == GroupChatRole.admin)
916 .where(GroupChatSubscription.left == None)
917 ).scalar_one()
918 if not other_admins_count > 0:
919 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_last_admin")
921 if your_subscription.role != GroupChatRole.admin:
922 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_admin")
924 their_subscription = session.execute(
925 select(GroupChatSubscription)
926 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
927 .where(GroupChatSubscription.user_id == request.user_id)
928 .where(GroupChatSubscription.left == None)
929 .where(GroupChatSubscription.role == GroupChatRole.admin)
930 ).scalar_one_or_none()
932 if not their_subscription: 932 ↛ 933line 932 didn't jump to line 933 because the condition on line 932 was never true
933 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin")
935 their_subscription.role = GroupChatRole.participant
937 _add_message_to_subscription(
938 session, your_subscription, message_type=MessageType.user_removed_admin, target_id=request.user_id
939 )
941 return empty_pb2.Empty()
943 def InviteToGroupChat(
944 self, request: conversations_pb2.InviteToGroupChatReq, context: CouchersContext, session: Session
945 ) -> empty_pb2.Empty:
946 if not session.execute(
947 select(User).where(users_visible(context)).where(User.id == request.user_id)
948 ).scalar_one_or_none():
949 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
951 result = session.execute(
952 where_moderated_content_visible(
953 select(GroupChatSubscription, GroupChat)
954 .join(GroupChat, GroupChat.conversation_id == GroupChatSubscription.group_chat_id)
955 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
956 .where(GroupChatSubscription.user_id == context.user_id)
957 .where(GroupChatSubscription.left == None),
958 context,
959 GroupChat,
960 is_list_operation=False,
961 )
962 ).one_or_none()
964 if not result:
965 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
967 your_subscription, group_chat = result
969 if request.user_id == context.user_id: 969 ↛ 970line 969 didn't jump to line 970 because the condition on line 969 was never true
970 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_self")
972 if your_subscription.role != GroupChatRole.admin and your_subscription.group_chat.only_admins_invite:
973 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invite_permission_denied")
975 if group_chat.is_dm:
976 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_invite_to_dm")
978 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
980 if their_subscription: 980 ↛ 981line 980 didn't jump to line 981 because the condition on line 980 was never true
981 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_chat")
983 # TODO: race condition!
985 subscription = GroupChatSubscription(
986 user_id=request.user_id,
987 group_chat=your_subscription.group_chat,
988 role=GroupChatRole.participant,
989 )
990 session.add(subscription)
992 _add_message_to_subscription(
993 session, your_subscription, message_type=MessageType.user_invited, target_id=request.user_id
994 )
996 return empty_pb2.Empty()
998 def RemoveGroupChatUser(
999 self, request: conversations_pb2.RemoveGroupChatUserReq, context: CouchersContext, session: Session
1000 ) -> empty_pb2.Empty:
1001 """
1002 1. Get admin info and check it's correct
1003 2. Get user data, check it's correct and remove user
1004 """
1005 # Admin info
1006 your_subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
1008 # if user info is missing
1009 if not your_subscription: 1009 ↛ 1010line 1009 didn't jump to line 1010 because the condition on line 1009 was never true
1010 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
1012 # if user not admin
1013 if your_subscription.role != GroupChatRole.admin: 1013 ↛ 1014line 1013 didn't jump to line 1014 because the condition on line 1013 was never true
1014 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "only_admin_can_remove_user")
1016 # if user wants to remove themselves
1017 if request.user_id == context.user_id: 1017 ↛ 1018line 1017 didn't jump to line 1018 because the condition on line 1017 was never true
1018 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_remove_self")
1020 # get user info
1021 their_subscription = _get_message_subscription(session, request.user_id, request.group_chat_id)
1023 # user not found
1024 if not their_subscription:
1025 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_chat")
1027 _add_message_to_subscription(
1028 session, your_subscription, message_type=MessageType.user_removed, target_id=request.user_id
1029 )
1031 their_subscription.left = func.now()
1033 return empty_pb2.Empty()
1035 def LeaveGroupChat(
1036 self, request: conversations_pb2.LeaveGroupChatReq, context: CouchersContext, session: Session
1037 ) -> empty_pb2.Empty:
1038 subscription = _get_visible_message_subscription(session, context, request.group_chat_id)
1040 if not subscription:
1041 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "chat_not_found")
1043 if subscription.role == GroupChatRole.admin:
1044 other_admins_count = session.execute(
1045 select(func.count())
1046 .select_from(GroupChatSubscription)
1047 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
1048 .where(GroupChatSubscription.user_id != context.user_id)
1049 .where(GroupChatSubscription.role == GroupChatRole.admin)
1050 .where(GroupChatSubscription.left == None)
1051 ).scalar_one()
1052 participants_count = session.execute(
1053 select(func.count())
1054 .select_from(GroupChatSubscription)
1055 .where(GroupChatSubscription.group_chat_id == request.group_chat_id)
1056 .where(GroupChatSubscription.user_id != context.user_id)
1057 .where(GroupChatSubscription.role == GroupChatRole.participant)
1058 .where(GroupChatSubscription.left == None)
1059 ).scalar_one()
1060 if not (other_admins_count > 0 or participants_count == 0):
1061 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "last_admin_cant_leave")
1063 _add_message_to_subscription(session, subscription, message_type=MessageType.user_left)
1065 subscription.left = func.now()
1067 return empty_pb2.Empty()