Coverage for app / backend / src / tests / test_threads.py: 100%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import string 

2import textwrap 

3 

4import grpc 

5import pytest 

6 

7from couchers.db import session_scope 

8from couchers.models import Thread 

9from couchers.proto import threads_pb2 

10from couchers.servicers.threads import pack_thread_id 

11from tests.fixtures.db import generate_user 

12from tests.fixtures.sessions import threads_session 

13 

14 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""

18 

19 

def test_threads_basic(db):
    """Post a two-level tree of replies and check what GetThread returns for each level.

    Three replies are posted on a dummy parent thread, then three more on two of
    those replies. Every GetThread call is expected to return a single page with
    the replies in reverse order of posting.
    """
    author, token = generate_user()

    # Create a dummy Thread (should be replaced by pages later on)
    with session_scope() as session:
        root = Thread()
        session.add(root)
        # Flush before reading root.id (presumably so the database has assigned it).
        session.flush()
        parent_id = pack_thread_id(database_id=root.id, depth=0)

    with threads_session(token) as api:

        def post(thread_id, content):
            """Post ``content`` as a reply to ``thread_id`` and return the new reply's thread id."""
            return api.PostReply(threads_pb2.PostReplyReq(thread_id=thread_id, content=content)).thread_id

        def fetch(thread_id):
            """Return the GetThread response for ``thread_id``."""
            return api.GetThread(threads_pb2.GetThreadReq(thread_id=thread_id))

        # Posting order matters: the assertions below rely on it.
        bat_id = post(parent_id, "bat")
        cat_id = post(parent_id, "cat")
        dog_id = post(parent_id, "dog")

        dogs = [post(dog_id, animal) for animal in ["hyena", "wolf", "prariewolf"]]
        cats = [post(cat_id, animal) for animal in ["cheetah", "lynx", "panther"]]

        # Make some queries
        top_level = fetch(parent_id)
        assert len(top_level.replies) == 3
        assert top_level.next_page_token == ""

        # (thread id, content, number of nested replies), last posted first
        expected_top_level = [
            (dog_id, "dog", 3),
            (cat_id, "cat", 3),
            (bat_id, "bat", 0),
        ]
        for reply, (thread_id, content, num_replies) in zip(top_level.replies, expected_top_level):
            assert reply.thread_id == thread_id
            assert reply.content == content
            assert reply.author_user_id == author.id
            assert reply.num_replies == num_replies

        # Nested replies come back as one page, in reverse order of posting.
        for branch_id, children in ((cat_id, cats), (dog_id, dogs)):
            nested = fetch(branch_id)
            assert len(nested.replies) == 3
            assert nested.next_page_token == ""
            assert [reply.thread_id for reply in nested.replies] == children[::-1]

74 

75 

def test_threads_errors(db):
    """Check the gRPC status code and message for each invalid GetThread/PostReply request."""
    _, token = generate_user()

    not_found = (grpc.StatusCode.NOT_FOUND, "Discussion thread not found.")
    empty_comment = (grpc.StatusCode.INVALID_ARGUMENT, "You cannot post an empty comment.")

    with threads_session(token) as api:
        # (rpc, request, (expected status code, expected details)), exercised in order
        failing_calls = [
            # request non-existing comment
            (api.GetThread, threads_pb2.GetThreadReq(thread_id=11), not_found),
            # request non-existing depth digit
            (api.GetThread, threads_pb2.GetThreadReq(thread_id=19), not_found),
            # post on non-existing comment
            (api.PostReply, threads_pb2.PostReplyReq(thread_id=11, content="foo"), not_found),
            # post on non-existing depth
            (api.PostReply, threads_pb2.PostReplyReq(thread_id=19, content="foo"), not_found),
            # post empty content
            (api.PostReply, threads_pb2.PostReplyReq(thread_id=19, content=""), empty_comment),
            # post whitespace only content
            (api.PostReply, threads_pb2.PostReplyReq(thread_id=19, content=" "), empty_comment),
        ]

        for rpc, request, (expected_code, expected_details) in failing_calls:
            with pytest.raises(grpc.RpcError) as exc_info:
                rpc(request)
            assert exc_info.value.code() == expected_code
            assert exc_info.value.details() == expected_details

114 

115 

def pagination_test(api, parent_id):
    """Post 26 single-letter replies under ``parent_id`` and page through them five at a time.

    The letters are posted from "z" down to "a", and the pages are expected to
    read "abcde", "fghij", ... "z", with an empty next-page token after the last
    page.

    Returns the thread id of the first reply on the last page, so the caller can
    repeat the same check one level deeper.
    """
    alphabet = string.ascii_lowercase
    page_size = 5

    # Post some data
    for letter in alphabet[::-1]:
        api.PostReply(threads_pb2.PostReplyReq(thread_id=parent_id, content=letter))

    # Get it with pagination
    page_token = ""

    for start in range(0, len(alphabet), page_size):
        page = api.GetThread(
            threads_pb2.GetThreadReq(thread_id=parent_id, page_size=page_size, page_token=page_token)
        )
        received = "".join(reply.content for reply in page.replies)
        assert received == alphabet[start : start + page_size]
        page_token = page.next_page_token

    # The last page must not point at a further page.
    assert page_token == ""

    return page.replies[0].thread_id  # to be used as a test one level deeper

132 

133 

def test_threads_pagination(db):
    """Run the pagination check on a top-level thread and again one level deeper.

    The parent thread id is derived from the dummy Thread's database id with
    ``pack_thread_id``, the same way ``test_threads_basic`` does it, instead of
    hard-coding a value that only holds when the row happens to get id 1.
    """
    _, token = generate_user()

    # Create a dummy Thread (should be replaced by pages later on)
    with session_scope() as session:
        dummy_thread = Thread()
        session.add(dummy_thread)
        # Flush before reading dummy_thread.id (presumably so the database has assigned it).
        session.flush()
        parent_thread_id = pack_thread_id(database_id=dummy_thread.id, depth=0)

    with threads_session(token) as api:
        # The first run returns a reply id, which becomes the parent for the nested run.
        comment_id = pagination_test(api, parent_thread_id)
        pagination_test(api, comment_id)