Coverage for src/couchers/servicers/discussions.py: 93%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import grpc
3from couchers import errors
4from couchers.db import can_moderate_node, session_scope
5from couchers.models import Cluster, Discussion, Thread
6from couchers.servicers.threads import thread_to_pb
7from couchers.sql import couchers_select as select
8from couchers.utils import Timestamp_from_datetime
9from proto import discussions_pb2, discussions_pb2_grpc
def discussion_to_pb(discussion: Discussion, context):
    """Convert a ``Discussion`` model object into a ``discussions_pb2.Discussion`` message.

    Exactly one of ``owner_community_id`` / ``owner_group_id`` is populated:
    when the owning cluster is an official cluster the community id is taken
    from the cluster's ``parent_node_id``; otherwise the group id is the
    cluster's own ``id``. The other field is passed as ``None``.

    ``can_moderate`` is computed with ``can_moderate_node`` for
    ``context.user_id`` against the owning cluster's parent node, using a
    session opened here via ``session_scope``.
    """
    owner = discussion.owner_cluster
    is_official = owner.is_official_cluster

    # An official cluster stands in for its community; any other cluster is a group.
    owner_community_id = owner.parent_node_id if is_official else None
    owner_group_id = None if is_official else owner.id

    with session_scope() as session:
        can_moderate = can_moderate_node(session, context.user_id, owner.parent_node_id)

    return discussions_pb2.Discussion(
        discussion_id=discussion.id,
        slug=discussion.slug,
        created=Timestamp_from_datetime(discussion.created),
        creator_user_id=discussion.creator_user_id,
        owner_community_id=owner_community_id,
        owner_group_id=owner_group_id,
        title=discussion.title,
        content=discussion.content,
        thread=thread_to_pb(discussion.thread_id),
        can_moderate=can_moderate,
    )
class Discussions(discussions_pb2_grpc.DiscussionsServicer):
    """gRPC servicer implementing creation and lookup of discussions."""

    def CreateDiscussion(self, request, context):
        """Create a discussion owned by a group or a community and return it as a protobuf.

        Aborts with ``INVALID_ARGUMENT`` when the title or content is empty, or
        when neither ``owner_community_id`` nor ``owner_group_id`` is set.
        Aborts with ``NOT_FOUND`` when no matching cluster exists.
        """
        if not request.title:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_DISCUSSION_TITLE)
        if not request.content:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_DISCUSSION_CONTENT)
        if not (request.owner_community_id or request.owner_group_id):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.GROUP_OR_COMMUNITY_NOT_FOUND)

        with session_scope() as session:
            owner_kind = request.WhichOneof("owner")
            if owner_kind == "owner_group_id":
                # Groups are the non-official clusters, looked up by their own id.
                cluster_query = (
                    select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.owner_group_id)
                )
            elif owner_kind == "owner_community_id":
                # A community is addressed through the official cluster of its node.
                cluster_query = (
                    select(Cluster)
                    .where(Cluster.parent_node_id == request.owner_community_id)
                    .where(Cluster.is_official_cluster)
                )
            cluster = session.execute(cluster_query).scalar_one_or_none()

            if not cluster:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND)

            new_discussion = Discussion(
                title=request.title,
                content=request.content,
                creator_user_id=context.user_id,
                owner_cluster=cluster,
                thread=Thread(),
            )
            session.add(new_discussion)
            session.commit()
            return discussion_to_pb(new_discussion, context)

    def GetDiscussion(self, request, context):
        """Return the discussion with ``request.discussion_id`` as a protobuf.

        Aborts with ``NOT_FOUND`` when no discussion has that id.
        """
        with session_scope() as session:
            lookup = select(Discussion).where(Discussion.id == request.discussion_id)
            found = session.execute(lookup).scalar_one_or_none()
            if not found:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.DISCUSSION_NOT_FOUND)

            return discussion_to_pb(found, context)