Coverage for src/couchers/servicers/discussions.py: 92%

59 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-03-13 03:05 +0000

1import logging 

2from types import SimpleNamespace 

3 

4import grpc 

5 

6from couchers import errors 

7from couchers.db import can_moderate_node, session_scope 

8from couchers.jobs.enqueue import queue_job 

9from couchers.models import Cluster, Discussion, Thread, User 

10from couchers.notifications.notify import notify 

11from couchers.servicers.api import user_model_to_pb 

12from couchers.servicers.blocking import are_blocked 

13from couchers.servicers.threads import thread_to_pb 

14from couchers.sql import couchers_select as select 

15from couchers.utils import Timestamp_from_datetime 

16from proto import discussions_pb2, discussions_pb2_grpc, notification_data_pb2 

17from proto.internal import jobs_pb2 

18 

19logger = logging.getLogger(__name__) 

20 

21 

def discussion_to_pb(session, discussion: Discussion, context):
    """
    Builds the `discussions_pb2.Discussion` message for `discussion`.

    Exactly one of `owner_community_id` / `owner_group_id` is filled in; the other is passed as None:
    an official cluster reports its parent node id as the community id, any other cluster reports its
    own id as the group id.

    `can_moderate` is whatever `can_moderate_node` returns for `context.user_id` on the owning cluster's
    parent node.
    """
    owner = discussion.owner_cluster

    if owner.is_official_cluster:
        community_id, group_id = owner.parent_node_id, None
    else:
        community_id, group_id = None, owner.id

    # resolved before the message is assembled, same as the other lookups that need the session
    moderation_allowed = can_moderate_node(session, context.user_id, owner.parent_node_id)

    return discussions_pb2.Discussion(
        discussion_id=discussion.id,
        slug=discussion.slug,
        created=Timestamp_from_datetime(discussion.created),
        creator_user_id=discussion.creator_user_id,
        owner_community_id=community_id,
        owner_group_id=group_id,
        owner_title=owner.name,
        title=discussion.title,
        content=discussion.content,
        thread=thread_to_pb(session, discussion.thread_id),
        can_moderate=moderation_allowed,
    )

45 

46 

def generate_create_discussion_notifications(payload: jobs_pb2.GenerateCreateDiscussionNotificationsPayload):
    """
    Sends a "discussion:create" notification about the discussion with id `payload.discussion_id` to the
    visible members of the cluster that owns it.

    Members for whom `are_blocked` is true with respect to the discussion's creator are skipped. Each
    notification is rendered with a minimal context that only carries the recipient's `user_id`.

    Raises NotImplementedError if the owning cluster is not an official cluster.

    NOTE(review): presumably run by the job worker for the "generate_create_discussion_notifications" job
    type -- confirm against the job dispatch table.
    """
    with session_scope() as session:
        lookup = select(Discussion).where(Discussion.id == payload.discussion_id)
        discussion = session.execute(lookup).scalar_one()

        owner = discussion.owner_cluster
        if not owner.is_official_cluster:
            raise NotImplementedError("Shouldn't have discussions under groups, only communities")

        # materialize the recipients up front, before notify() is handed the same session in the loop
        recipients = list(owner.members.where(User.is_visible))

        for recipient in recipients:
            if not are_blocked(session, recipient.id, discussion.creator_user_id):
                # stand-in for a request context: only user_id is set
                viewer = SimpleNamespace(user_id=recipient.id)
                notify(
                    session,
                    user_id=recipient.id,
                    topic_action="discussion:create",
                    key=payload.discussion_id,
                    data=notification_data_pb2.DiscussionCreate(
                        author=user_model_to_pb(discussion.creator_user, session, viewer),
                        discussion=discussion_to_pb(session, discussion, viewer),
                    ),
                )

70 

71 

class Discussions(discussions_pb2_grpc.DiscussionsServicer):
    """
    Servicer implementing the discussion RPCs: creating a discussion and fetching one by id.
    """

    def CreateDiscussion(self, request, context, session):
        """
        Creates a discussion (with a fresh thread) owned by the requested group or community, queues the
        "generate_create_discussion_notifications" job for it, and returns the new discussion as a message.

        Aborts with INVALID_ARGUMENT if the title, the content, or both owner ids are missing, and with
        NOT_FOUND if no matching cluster exists.
        """
        if not request.title:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_DISCUSSION_TITLE)
        if not request.content:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_DISCUSSION_CONTENT)
        if not (request.owner_community_id or request.owner_group_id):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.GROUP_OR_COMMUNITY_NOT_FOUND)

        owner_kind = request.WhichOneof("owner")
        if owner_kind == "owner_group_id":
            # groups are the non-official clusters, looked up by their own id
            cluster_query = (
                select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.owner_group_id)
            )
            cluster = session.execute(cluster_query).scalar_one_or_none()
        elif owner_kind == "owner_community_id":
            # a community is addressed by node id; its official cluster is the owner
            cluster_query = (
                select(Cluster)
                .where(Cluster.parent_node_id == request.owner_community_id)
                .where(Cluster.is_official_cluster)
            )
            cluster = session.execute(cluster_query).scalar_one_or_none()

        if not cluster:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND)

        new_discussion = Discussion(
            title=request.title,
            content=request.content,
            creator_user_id=context.user_id,
            owner_cluster=cluster,
            thread=Thread(),
        )
        session.add(new_discussion)
        # flush before new_discussion.id is read for the job payload
        session.flush()

        job_payload = jobs_pb2.GenerateCreateDiscussionNotificationsPayload(
            discussion_id=new_discussion.id,
        )
        queue_job(
            session,
            job_type="generate_create_discussion_notifications",
            payload=job_payload,
        )

        return discussion_to_pb(session, new_discussion, context)

    def GetDiscussion(self, request, context, session):
        """
        Returns the discussion with id `request.discussion_id`, aborting with NOT_FOUND if there is none.
        """
        by_id = select(Discussion).where(Discussion.id == request.discussion_id)
        found = session.execute(by_id).scalar_one_or_none()
        if not found:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.DISCUSSION_NOT_FOUND)

        return discussion_to_pb(session, found, context)

120 context.abort(grpc.StatusCode.NOT_FOUND, errors.DISCUSSION_NOT_FOUND) 

121 

122 return discussion_to_pb(session, discussion, context)