Coverage for src/couchers/servicers/discussions.py: 93%

42 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-07-22 16:44 +0000

1import grpc 

2 

3from couchers import errors 

4from couchers.db import can_moderate_node, session_scope 

5from couchers.models import Cluster, Discussion, Thread 

6from couchers.servicers.threads import thread_to_pb 

7from couchers.sql import couchers_select as select 

8from couchers.utils import Timestamp_from_datetime 

9from proto import discussions_pb2, discussions_pb2_grpc 

10 

11 

12def discussion_to_pb(discussion: Discussion, context): 

13 owner_community_id = None 

14 owner_group_id = None 

15 if discussion.owner_cluster.is_official_cluster: 

16 owner_community_id = discussion.owner_cluster.parent_node_id 

17 else: 

18 owner_group_id = discussion.owner_cluster.id 

19 

20 with session_scope() as session: 

21 can_moderate = can_moderate_node(session, context.user_id, discussion.owner_cluster.parent_node_id) 

22 

23 return discussions_pb2.Discussion( 

24 discussion_id=discussion.id, 

25 slug=discussion.slug, 

26 created=Timestamp_from_datetime(discussion.created), 

27 creator_user_id=discussion.creator_user_id, 

28 owner_community_id=owner_community_id, 

29 owner_group_id=owner_group_id, 

30 title=discussion.title, 

31 content=discussion.content, 

32 thread=thread_to_pb(discussion.thread_id), 

33 can_moderate=can_moderate, 

34 ) 

35 

36 

37class Discussions(discussions_pb2_grpc.DiscussionsServicer): 

38 def CreateDiscussion(self, request, context): 

39 if not request.title: 

40 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_DISCUSSION_TITLE) 

41 if not request.content: 

42 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_DISCUSSION_CONTENT) 

43 if not request.owner_community_id and not request.owner_group_id: 

44 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.GROUP_OR_COMMUNITY_NOT_FOUND) 

45 

46 with session_scope() as session: 

47 if request.WhichOneof("owner") == "owner_group_id": 

48 cluster = session.execute( 

49 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.owner_group_id) 

50 ).scalar_one_or_none() 

51 elif request.WhichOneof("owner") == "owner_community_id": 

52 cluster = session.execute( 

53 select(Cluster) 

54 .where(Cluster.parent_node_id == request.owner_community_id) 

55 .where(Cluster.is_official_cluster) 

56 ).scalar_one_or_none() 

57 

58 if not cluster: 

59 context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND) 

60 

61 discussion = Discussion( 

62 title=request.title, 

63 content=request.content, 

64 creator_user_id=context.user_id, 

65 owner_cluster=cluster, 

66 thread=Thread(), 

67 ) 

68 session.add(discussion) 

69 session.commit() 

70 return discussion_to_pb(discussion, context) 

71 

72 def GetDiscussion(self, request, context): 

73 with session_scope() as session: 

74 discussion = session.execute( 

75 select(Discussion).where(Discussion.id == request.discussion_id) 

76 ).scalar_one_or_none() 

77 if not discussion: 

78 context.abort(grpc.StatusCode.NOT_FOUND, errors.DISCUSSION_NOT_FOUND) 

79 

80 return discussion_to_pb(discussion, context)