Coverage for src/couchers/servicers/discussions.py: 92%
39 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
1import grpc
3from couchers import errors
4from couchers.db import can_moderate_node
5from couchers.models import Cluster, Discussion, Thread
6from couchers.servicers.threads import thread_to_pb
7from couchers.sql import couchers_select as select
8from couchers.utils import Timestamp_from_datetime
9from proto import discussions_pb2, discussions_pb2_grpc
12def discussion_to_pb(session, discussion: Discussion, context):
13 owner_community_id = None
14 owner_group_id = None
15 if discussion.owner_cluster.is_official_cluster:
16 owner_community_id = discussion.owner_cluster.parent_node_id
17 else:
18 owner_group_id = discussion.owner_cluster.id
20 can_moderate = can_moderate_node(session, context.user_id, discussion.owner_cluster.parent_node_id)
22 return discussions_pb2.Discussion(
23 discussion_id=discussion.id,
24 slug=discussion.slug,
25 created=Timestamp_from_datetime(discussion.created),
26 creator_user_id=discussion.creator_user_id,
27 owner_community_id=owner_community_id,
28 owner_group_id=owner_group_id,
29 title=discussion.title,
30 content=discussion.content,
31 thread=thread_to_pb(session, discussion.thread_id),
32 can_moderate=can_moderate,
33 )
36class Discussions(discussions_pb2_grpc.DiscussionsServicer):
37 def CreateDiscussion(self, request, context, session):
38 if not request.title:
39 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_DISCUSSION_TITLE)
40 if not request.content:
41 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_DISCUSSION_CONTENT)
42 if not request.owner_community_id and not request.owner_group_id:
43 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.GROUP_OR_COMMUNITY_NOT_FOUND)
45 if request.WhichOneof("owner") == "owner_group_id":
46 cluster = session.execute(
47 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.owner_group_id)
48 ).scalar_one_or_none()
49 elif request.WhichOneof("owner") == "owner_community_id":
50 cluster = session.execute(
51 select(Cluster)
52 .where(Cluster.parent_node_id == request.owner_community_id)
53 .where(Cluster.is_official_cluster)
54 ).scalar_one_or_none()
56 if not cluster:
57 context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND)
59 discussion = Discussion(
60 title=request.title,
61 content=request.content,
62 creator_user_id=context.user_id,
63 owner_cluster=cluster,
64 thread=Thread(),
65 )
66 session.add(discussion)
67 session.commit()
68 return discussion_to_pb(session, discussion, context)
70 def GetDiscussion(self, request, context, session):
71 discussion = session.execute(
72 select(Discussion).where(Discussion.id == request.discussion_id)
73 ).scalar_one_or_none()
74 if not discussion:
75 context.abort(grpc.StatusCode.NOT_FOUND, errors.DISCUSSION_NOT_FOUND)
77 return discussion_to_pb(session, discussion, context)