Coverage for src/couchers/servicers/discussions.py: 92%

39 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-22 06:42 +0000

1import grpc 

2 

3from couchers import errors 

4from couchers.db import can_moderate_node 

5from couchers.models import Cluster, Discussion, Thread 

6from couchers.servicers.threads import thread_to_pb 

7from couchers.sql import couchers_select as select 

8from couchers.utils import Timestamp_from_datetime 

9from proto import discussions_pb2, discussions_pb2_grpc 

10 

11 

def discussion_to_pb(session, discussion: Discussion, context):
    """Build the ``discussions_pb2.Discussion`` message for ``discussion``.

    Only one of the two owner fields is given a value: when the owning cluster
    is an official cluster, ``owner_community_id`` receives the cluster's
    ``parent_node_id``; otherwise ``owner_group_id`` receives the cluster's
    own ``id``. The other field is passed as ``None``.

    ``can_moderate`` is the result of ``can_moderate_node`` for
    ``context.user_id`` on the owning cluster's parent node.
    """
    cluster = discussion.owner_cluster

    if cluster.is_official_cluster:
        community_id, group_id = cluster.parent_node_id, None
    else:
        community_id, group_id = None, cluster.id

    # Resolved before the message is assembled, matching the original call order.
    moderator = can_moderate_node(session, context.user_id, cluster.parent_node_id)

    return discussions_pb2.Discussion(
        discussion_id=discussion.id,
        slug=discussion.slug,
        created=Timestamp_from_datetime(discussion.created),
        creator_user_id=discussion.creator_user_id,
        owner_community_id=community_id,
        owner_group_id=group_id,
        title=discussion.title,
        content=discussion.content,
        thread=thread_to_pb(session, discussion.thread_id),
        can_moderate=moderator,
    )

34 

35 

class Discussions(discussions_pb2_grpc.DiscussionsServicer):
    """Servicer providing the ``CreateDiscussion`` and ``GetDiscussion`` RPCs."""

    def CreateDiscussion(self, request, context, session):
        """Create a discussion owned by a group or a community.

        Aborts with ``INVALID_ARGUMENT`` when the title, the content, or both
        owner ids are empty, and with ``NOT_FOUND`` when no matching cluster
        exists. Otherwise adds a new ``Discussion`` (with a fresh ``Thread``),
        commits the session, and returns the discussion as a protobuf message.
        """
        # Required text fields, checked in this order.
        required_fields = (
            (request.title, errors.MISSING_DISCUSSION_TITLE),
            (request.content, errors.MISSING_DISCUSSION_CONTENT),
        )
        for value, error_message in required_fields:
            if not value:
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, error_message)

        if not (request.owner_community_id or request.owner_group_id):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.GROUP_OR_COMMUNITY_NOT_FOUND)

        # NOTE(review): ``cluster`` is only bound inside the branches below, so
        # this relies on ``context.abort`` not returning — confirm.
        owner_kind = request.WhichOneof("owner")
        if owner_kind == "owner_group_id":
            # Groups: non-official clusters, matched on the cluster's own id.
            group_query = (
                select(Cluster)
                .where(~Cluster.is_official_cluster)
                .where(Cluster.id == request.owner_group_id)
            )
            cluster = session.execute(group_query).scalar_one_or_none()
        elif owner_kind == "owner_community_id":
            # Communities: the official cluster whose parent node is the community.
            community_query = (
                select(Cluster)
                .where(Cluster.parent_node_id == request.owner_community_id)
                .where(Cluster.is_official_cluster)
            )
            cluster = session.execute(community_query).scalar_one_or_none()

        if not cluster:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND)

        new_discussion = Discussion(
            title=request.title,
            content=request.content,
            creator_user_id=context.user_id,
            owner_cluster=cluster,
            thread=Thread(),
        )
        session.add(new_discussion)
        session.commit()
        return discussion_to_pb(session, new_discussion, context)

    def GetDiscussion(self, request, context, session):
        """Return the discussion with ``request.discussion_id``.

        Aborts with ``NOT_FOUND`` when the query yields no discussion.
        """
        lookup = select(Discussion).where(Discussion.id == request.discussion_id)
        discussion = session.execute(lookup).scalar_one_or_none()
        if not discussion:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.DISCUSSION_NOT_FOUND)

        return discussion_to_pb(session, discussion, context)