Coverage for src/couchers/servicers/discussions.py: 92%
59 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-13 03:05 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-13 03:05 +0000
1import logging
2from types import SimpleNamespace
4import grpc
6from couchers import errors
7from couchers.db import can_moderate_node, session_scope
8from couchers.jobs.enqueue import queue_job
9from couchers.models import Cluster, Discussion, Thread, User
10from couchers.notifications.notify import notify
11from couchers.servicers.api import user_model_to_pb
12from couchers.servicers.blocking import are_blocked
13from couchers.servicers.threads import thread_to_pb
14from couchers.sql import couchers_select as select
15from couchers.utils import Timestamp_from_datetime
16from proto import discussions_pb2, discussions_pb2_grpc, notification_data_pb2
17from proto.internal import jobs_pb2
19logger = logging.getLogger(__name__)
22def discussion_to_pb(session, discussion: Discussion, context):
23 owner_community_id = None
24 owner_group_id = None
25 if discussion.owner_cluster.is_official_cluster:
26 owner_community_id = discussion.owner_cluster.parent_node_id
27 else:
28 owner_group_id = discussion.owner_cluster.id
30 can_moderate = can_moderate_node(session, context.user_id, discussion.owner_cluster.parent_node_id)
32 return discussions_pb2.Discussion(
33 discussion_id=discussion.id,
34 slug=discussion.slug,
35 created=Timestamp_from_datetime(discussion.created),
36 creator_user_id=discussion.creator_user_id,
37 owner_community_id=owner_community_id,
38 owner_group_id=owner_group_id,
39 owner_title=discussion.owner_cluster.name,
40 title=discussion.title,
41 content=discussion.content,
42 thread=thread_to_pb(session, discussion.thread_id),
43 can_moderate=can_moderate,
44 )
47def generate_create_discussion_notifications(payload: jobs_pb2.GenerateCreateDiscussionNotificationsPayload):
48 with session_scope() as session:
49 discussion = session.execute(select(Discussion).where(Discussion.id == payload.discussion_id)).scalar_one()
51 cluster = discussion.owner_cluster
53 if not cluster.is_official_cluster:
54 raise NotImplementedError("Shouldn't have discussions under groups, only communities")
56 for user in list(cluster.members.where(User.is_visible)):
57 if are_blocked(session, user.id, discussion.creator_user_id):
58 continue
59 context = SimpleNamespace(user_id=user.id)
60 notify(
61 session,
62 user_id=user.id,
63 topic_action="discussion:create",
64 key=payload.discussion_id,
65 data=notification_data_pb2.DiscussionCreate(
66 author=user_model_to_pb(discussion.creator_user, session, context),
67 discussion=discussion_to_pb(session, discussion, context),
68 ),
69 )
72class Discussions(discussions_pb2_grpc.DiscussionsServicer):
73 def CreateDiscussion(self, request, context, session):
74 if not request.title:
75 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_DISCUSSION_TITLE)
76 if not request.content:
77 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_DISCUSSION_CONTENT)
78 if not request.owner_community_id and not request.owner_group_id:
79 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.GROUP_OR_COMMUNITY_NOT_FOUND)
81 if request.WhichOneof("owner") == "owner_group_id":
82 cluster = session.execute(
83 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.owner_group_id)
84 ).scalar_one_or_none()
85 elif request.WhichOneof("owner") == "owner_community_id":
86 cluster = session.execute(
87 select(Cluster)
88 .where(Cluster.parent_node_id == request.owner_community_id)
89 .where(Cluster.is_official_cluster)
90 ).scalar_one_or_none()
92 if not cluster:
93 context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND)
95 discussion = Discussion(
96 title=request.title,
97 content=request.content,
98 creator_user_id=context.user_id,
99 owner_cluster=cluster,
100 thread=Thread(),
101 )
102 session.add(discussion)
103 session.flush()
105 queue_job(
106 session,
107 job_type="generate_create_discussion_notifications",
108 payload=jobs_pb2.GenerateCreateDiscussionNotificationsPayload(
109 discussion_id=discussion.id,
110 ),
111 )
113 return discussion_to_pb(session, discussion, context)
115 def GetDiscussion(self, request, context, session):
116 discussion = session.execute(
117 select(Discussion).where(Discussion.id == request.discussion_id)
118 ).scalar_one_or_none()
119 if not discussion:
120 context.abort(grpc.StatusCode.NOT_FOUND, errors.DISCUSSION_NOT_FOUND)
122 return discussion_to_pb(session, discussion, context)