Coverage for src/couchers/servicers/discussions.py: 90%
60 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-18 14:01 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-18 14:01 +0000
1import logging
3import grpc
5from couchers.context import make_background_user_context
6from couchers.db import can_moderate_node, session_scope
7from couchers.jobs.enqueue import queue_job
8from couchers.models import Cluster, Discussion, Thread, User
9from couchers.notifications.notify import notify
10from couchers.proto import discussions_pb2, discussions_pb2_grpc, notification_data_pb2
11from couchers.proto.internal import jobs_pb2
12from couchers.servicers.api import user_model_to_pb
13from couchers.servicers.blocking import is_not_visible
14from couchers.servicers.threads import thread_to_pb
15from couchers.sql import couchers_select as select
16from couchers.utils import Timestamp_from_datetime
18logger = logging.getLogger(__name__)
def discussion_to_pb(session, discussion: Discussion, context):
    """Convert a ``Discussion`` model into a ``discussions_pb2.Discussion`` message.

    Exactly one of ``owner_community_id`` / ``owner_group_id`` is populated:
    when the owning cluster is an official cluster the community id is the
    cluster's ``parent_node_id``; otherwise the group id is the cluster's own
    ``id``. The other field is passed as ``None``.

    ``can_moderate`` is the result of ``can_moderate_node`` for
    ``context.user_id`` on the owning cluster's parent node.
    """
    cluster = discussion.owner_cluster

    if cluster.is_official_cluster:
        community_id, group_id = cluster.parent_node_id, None
    else:
        community_id, group_id = None, cluster.id

    # Resolved before the message is assembled, same as the thread lookup below.
    moderation_allowed = can_moderate_node(session, context.user_id, cluster.parent_node_id)

    return discussions_pb2.Discussion(
        discussion_id=discussion.id,
        slug=discussion.slug,
        created=Timestamp_from_datetime(discussion.created),
        creator_user_id=discussion.creator_user_id,
        owner_community_id=community_id,
        owner_group_id=group_id,
        owner_title=cluster.name,
        title=discussion.title,
        content=discussion.content,
        thread=thread_to_pb(session, discussion.thread_id),
        can_moderate=moderation_allowed,
    )
def generate_create_discussion_notifications(payload: jobs_pb2.GenerateCreateDiscussionNotificationsPayload):
    """Send a ``discussion:create`` notification to members of the owning cluster.

    Loads the discussion named by ``payload.discussion_id`` in a fresh session
    scope, then calls ``notify`` once for every visible member of its owner
    cluster, skipping members for whom ``is_not_visible`` reports that they
    and the discussion's creator should not see each other.

    Raises:
        NotImplementedError: if the owner cluster is not an official cluster.
    """
    with session_scope() as session:
        lookup = select(Discussion).where(Discussion.id == payload.discussion_id)
        discussion = session.execute(lookup).scalar_one()

        owner = discussion.owner_cluster
        if not owner.is_official_cluster:
            raise NotImplementedError("Shouldn't have discussions under groups, only communities")

        # Materialize the member query up front, before any notification is sent.
        recipients = list(owner.members.where(User.is_visible))

        for member in recipients:
            if is_not_visible(session, member.id, discussion.creator_user_id):
                continue

            # Each recipient gets protos rendered from their own user context.
            member_context = make_background_user_context(user_id=member.id)
            notification_body = notification_data_pb2.DiscussionCreate(
                author=user_model_to_pb(discussion.creator_user, session, member_context),
                discussion=discussion_to_pb(session, discussion, member_context),
            )
            notify(
                session,
                user_id=member.id,
                topic_action="discussion:create",
                key=payload.discussion_id,
                data=notification_body,
            )
class Discussions(discussions_pb2_grpc.DiscussionsServicer):
    """Servicer implementing the ``CreateDiscussion`` and ``GetDiscussion`` RPCs."""

    def CreateDiscussion(self, request, context, session):
        """Create a discussion under a group or community and queue notifications.

        Validation failures are reported through
        ``context.abort_with_error_code``:

        * ``INVALID_ARGUMENT`` / ``missing_discussion_title`` - empty title
        * ``INVALID_ARGUMENT`` / ``missing_discussion_content`` - empty content
        * ``INVALID_ARGUMENT`` / ``group_or_community_not_found`` - neither
          owner id is set
        * ``NOT_FOUND`` / ``group_or_community_not_found`` - no matching cluster
        * ``FAILED_PRECONDITION`` / ``cannot_create_discussion`` - the cluster
          has discussions disabled

        On success the new discussion (with a fresh ``Thread``) is added and
        flushed, a ``generate_create_discussion_notifications`` job is queued,
        and the discussion is returned as a protobuf message.
        """
        if not request.title:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_discussion_title")
        if not request.content:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_discussion_content")
        if not (request.owner_community_id or request.owner_group_id):
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "group_or_community_not_found")

        owner_field = request.WhichOneof("owner")
        if owner_field == "owner_group_id":
            # Groups are the non-official clusters, looked up by their own id.
            group_query = select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.owner_group_id)
            cluster = session.execute(group_query).scalar_one_or_none()
        elif owner_field == "owner_community_id":
            # Communities are addressed by node id; pick that node's official cluster.
            community_query = (
                select(Cluster)
                .where(Cluster.parent_node_id == request.owner_community_id)
                .where(Cluster.is_official_cluster)
            )
            cluster = session.execute(community_query).scalar_one_or_none()

        if not cluster:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_or_community_not_found")

        if not cluster.discussions_enabled:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cannot_create_discussion")

        new_discussion = Discussion(
            title=request.title,
            content=request.content,
            creator_user_id=context.user_id,
            owner_cluster=cluster,
            thread=Thread(),
        )
        session.add(new_discussion)
        # Flush before new_discussion.id is read for the job payload below.
        session.flush()

        job_payload = jobs_pb2.GenerateCreateDiscussionNotificationsPayload(
            discussion_id=new_discussion.id,
        )
        queue_job(
            session,
            job_type="generate_create_discussion_notifications",
            payload=job_payload,
        )

        return discussion_to_pb(session, new_discussion, context)

    def GetDiscussion(self, request, context, session):
        """Return the discussion with ``request.discussion_id`` as a protobuf.

        Calls ``context.abort_with_error_code`` with ``NOT_FOUND`` /
        ``discussion_not_found`` when no such discussion exists.
        """
        lookup = select(Discussion).where(Discussion.id == request.discussion_id)
        found = session.execute(lookup).scalar_one_or_none()
        if not found:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "discussion_not_found")

        return discussion_to_pb(session, found, context)