Coverage for src/couchers/servicers/discussions.py: 90%
61 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
1import logging
3import grpc
5from couchers import errors
6from couchers.context import make_background_user_context
7from couchers.db import can_moderate_node, session_scope
8from couchers.jobs.enqueue import queue_job
9from couchers.models import Cluster, Discussion, Thread, User
10from couchers.notifications.notify import notify
11from couchers.servicers.api import user_model_to_pb
12from couchers.servicers.blocking import is_not_visible
13from couchers.servicers.threads import thread_to_pb
14from couchers.sql import couchers_select as select
15from couchers.utils import Timestamp_from_datetime
16from proto import discussions_pb2, discussions_pb2_grpc, notification_data_pb2
17from proto.internal import jobs_pb2
19logger = logging.getLogger(__name__)
def discussion_to_pb(session, discussion: Discussion, context):
    """Build the ``discussions_pb2.Discussion`` message for ``discussion``.

    Exactly one of ``owner_community_id`` / ``owner_group_id`` is populated:
    for an official cluster the community id is the cluster's parent node id,
    otherwise the group id is the cluster's own id. ``can_moderate`` is the
    result of ``can_moderate_node`` for ``context.user_id`` on the owning
    cluster's parent node.
    """
    cluster = discussion.owner_cluster

    if cluster.is_official_cluster:
        community_id, group_id = cluster.parent_node_id, None
    else:
        community_id, group_id = None, cluster.id

    # Moderation rights are always checked against the cluster's parent node,
    # regardless of whether the owner is a community or a group.
    viewer_can_moderate = can_moderate_node(session, context.user_id, cluster.parent_node_id)

    return discussions_pb2.Discussion(
        discussion_id=discussion.id,
        slug=discussion.slug,
        created=Timestamp_from_datetime(discussion.created),
        creator_user_id=discussion.creator_user_id,
        owner_community_id=community_id,
        owner_group_id=group_id,
        owner_title=cluster.name,
        title=discussion.title,
        content=discussion.content,
        thread=thread_to_pb(session, discussion.thread_id),
        can_moderate=viewer_can_moderate,
    )
def generate_create_discussion_notifications(payload: jobs_pb2.GenerateCreateDiscussionNotificationsPayload):
    """Send a ``discussion:create`` notification to members of the owning cluster.

    Loads the discussion identified by ``payload.discussion_id`` and, for every
    member of its owner cluster matching ``User.is_visible``, emits a
    notification unless ``is_not_visible`` reports true for that member and the
    discussion's creator. Each notification is rendered with a background
    context built for the receiving member.

    Raises:
        NotImplementedError: if the owner cluster is not an official cluster.
    """
    with session_scope() as session:
        lookup = select(Discussion).where(Discussion.id == payload.discussion_id)
        discussion = session.execute(lookup).scalar_one()

        owner = discussion.owner_cluster
        if not owner.is_official_cluster:
            raise NotImplementedError("Shouldn't have discussions under groups, only communities")

        # Materialise the member list up front, before any notifications are
        # issued against the same session.
        recipients = list(owner.members.where(User.is_visible))

        for member in recipients:
            if not is_not_visible(session, member.id, discussion.creator_user_id):
                member_context = make_background_user_context(user_id=member.id)
                notify(
                    session,
                    user_id=member.id,
                    topic_action="discussion:create",
                    key=payload.discussion_id,
                    data=notification_data_pb2.DiscussionCreate(
                        author=user_model_to_pb(discussion.creator_user, session, member_context),
                        discussion=discussion_to_pb(session, discussion, member_context),
                    ),
                )
class Discussions(discussions_pb2_grpc.DiscussionsServicer):
    """gRPC servicer exposing discussion creation and lookup."""

    def CreateDiscussion(self, request, context, session):
        """Create a discussion owned by a group or community cluster.

        Aborts with INVALID_ARGUMENT when the title, the content, or both owner
        ids are missing; with NOT_FOUND when no matching cluster exists; and
        with FAILED_PRECONDITION when the cluster has discussions disabled.
        On success the discussion is flushed, a
        ``generate_create_discussion_notifications`` job is queued, and the
        new discussion is returned as a protobuf message.
        """
        if not request.title:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_DISCUSSION_TITLE)
        if not request.content:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_DISCUSSION_CONTENT)
        if not (request.owner_community_id or request.owner_group_id):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.GROUP_OR_COMMUNITY_NOT_FOUND)

        # The guard above leaves exactly one member of the "owner" oneof set.
        owner_kind = request.WhichOneof("owner")
        if owner_kind == "owner_group_id":
            # Groups are non-official clusters addressed by their own id.
            cluster_query = (
                select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.owner_group_id)
            )
        elif owner_kind == "owner_community_id":
            # Communities are addressed by node id; pick the node's official cluster.
            cluster_query = (
                select(Cluster)
                .where(Cluster.parent_node_id == request.owner_community_id)
                .where(Cluster.is_official_cluster)
            )
        cluster = session.execute(cluster_query).scalar_one_or_none()

        if not cluster:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND)

        if not cluster.discussions_enabled:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_CREATE_DISCUSSION)

        new_discussion = Discussion(
            title=request.title,
            content=request.content,
            creator_user_id=context.user_id,
            owner_cluster=cluster,
            thread=Thread(),
        )
        session.add(new_discussion)
        # Flush so that new_discussion.id is available for the job payload below.
        session.flush()

        job_payload = jobs_pb2.GenerateCreateDiscussionNotificationsPayload(
            discussion_id=new_discussion.id,
        )
        queue_job(
            session,
            job_type="generate_create_discussion_notifications",
            payload=job_payload,
        )

        return discussion_to_pb(session, new_discussion, context)

    def GetDiscussion(self, request, context, session):
        """Return the discussion with ``request.discussion_id``.

        Aborts with NOT_FOUND when no such discussion exists.
        """
        lookup = select(Discussion).where(Discussion.id == request.discussion_id)
        found = session.execute(lookup).scalar_one_or_none()
        if not found:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.DISCUSSION_NOT_FOUND)

        return discussion_to_pb(session, found, context)