Coverage for src/couchers/servicers/discussions.py: 91%

58 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-06-01 02:39 +0000

1import logging 

2 

3import grpc 

4 

5from couchers import errors 

6from couchers.db import can_moderate_node, session_scope 

7from couchers.jobs.enqueue import queue_job 

8from couchers.models import Cluster, Discussion, Thread, User 

9from couchers.notifications.notify import notify 

10from couchers.servicers.api import user_model_to_pb 

11from couchers.servicers.blocking import are_blocked 

12from couchers.servicers.threads import thread_to_pb 

13from couchers.sql import couchers_select as select 

14from couchers.utils import Timestamp_from_datetime, make_user_context 

15from proto import discussions_pb2, discussions_pb2_grpc, notification_data_pb2 

16from proto.internal import jobs_pb2 

17 

18logger = logging.getLogger(__name__) 

19 

20 

21def discussion_to_pb(session, discussion: Discussion, context): 

22 owner_community_id = None 

23 owner_group_id = None 

24 if discussion.owner_cluster.is_official_cluster: 

25 owner_community_id = discussion.owner_cluster.parent_node_id 

26 else: 

27 owner_group_id = discussion.owner_cluster.id 

28 

29 can_moderate = can_moderate_node(session, context.user_id, discussion.owner_cluster.parent_node_id) 

30 

31 return discussions_pb2.Discussion( 

32 discussion_id=discussion.id, 

33 slug=discussion.slug, 

34 created=Timestamp_from_datetime(discussion.created), 

35 creator_user_id=discussion.creator_user_id, 

36 owner_community_id=owner_community_id, 

37 owner_group_id=owner_group_id, 

38 owner_title=discussion.owner_cluster.name, 

39 title=discussion.title, 

40 content=discussion.content, 

41 thread=thread_to_pb(session, discussion.thread_id), 

42 can_moderate=can_moderate, 

43 ) 

44 

45 

46def generate_create_discussion_notifications(payload: jobs_pb2.GenerateCreateDiscussionNotificationsPayload): 

47 with session_scope() as session: 

48 discussion = session.execute(select(Discussion).where(Discussion.id == payload.discussion_id)).scalar_one() 

49 

50 cluster = discussion.owner_cluster 

51 

52 if not cluster.is_official_cluster: 

53 raise NotImplementedError("Shouldn't have discussions under groups, only communities") 

54 

55 for user in list(cluster.members.where(User.is_visible)): 

56 if are_blocked(session, user.id, discussion.creator_user_id): 

57 continue 

58 context = make_user_context(user_id=user.id) 

59 notify( 

60 session, 

61 user_id=user.id, 

62 topic_action="discussion:create", 

63 key=payload.discussion_id, 

64 data=notification_data_pb2.DiscussionCreate( 

65 author=user_model_to_pb(discussion.creator_user, session, context), 

66 discussion=discussion_to_pb(session, discussion, context), 

67 ), 

68 ) 

69 

70 

71class Discussions(discussions_pb2_grpc.DiscussionsServicer): 

72 def CreateDiscussion(self, request, context, session): 

73 if not request.title: 

74 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_DISCUSSION_TITLE) 

75 if not request.content: 

76 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_DISCUSSION_CONTENT) 

77 if not request.owner_community_id and not request.owner_group_id: 

78 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.GROUP_OR_COMMUNITY_NOT_FOUND) 

79 

80 if request.WhichOneof("owner") == "owner_group_id": 

81 cluster = session.execute( 

82 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.owner_group_id) 

83 ).scalar_one_or_none() 

84 elif request.WhichOneof("owner") == "owner_community_id": 

85 cluster = session.execute( 

86 select(Cluster) 

87 .where(Cluster.parent_node_id == request.owner_community_id) 

88 .where(Cluster.is_official_cluster) 

89 ).scalar_one_or_none() 

90 

91 if not cluster: 

92 context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND) 

93 

94 discussion = Discussion( 

95 title=request.title, 

96 content=request.content, 

97 creator_user_id=context.user_id, 

98 owner_cluster=cluster, 

99 thread=Thread(), 

100 ) 

101 session.add(discussion) 

102 session.flush() 

103 

104 queue_job( 

105 session, 

106 job_type="generate_create_discussion_notifications", 

107 payload=jobs_pb2.GenerateCreateDiscussionNotificationsPayload( 

108 discussion_id=discussion.id, 

109 ), 

110 ) 

111 

112 return discussion_to_pb(session, discussion, context) 

113 

114 def GetDiscussion(self, request, context, session): 

115 discussion = session.execute( 

116 select(Discussion).where(Discussion.id == request.discussion_id) 

117 ).scalar_one_or_none() 

118 if not discussion: 

119 context.abort(grpc.StatusCode.NOT_FOUND, errors.DISCUSSION_NOT_FOUND) 

120 

121 return discussion_to_pb(session, discussion, context)