Coverage for src/tests/test_threads.py: 100%

93 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-17 18:19 +0000

1import string 

2 

3import grpc 

4import pytest 

5 

6from couchers.db import session_scope 

7from couchers.models import Thread 

8from couchers.proto import threads_pb2 

9from couchers.servicers.threads import pack_thread_id 

10from tests.test_fixtures import db, generate_user, testconfig, threads_session # noqa 

11 

12 

13@pytest.fixture(autouse=True) 

14def _(testconfig): 

15 pass 

16 

17 

18def test_threads_basic(db): 

19 user1, token1 = generate_user() 

20 

21 # Create a dummy Thread (should be replaced by pages later on) 

22 with session_scope() as session: 

23 dummy_thread = Thread() 

24 session.add(dummy_thread) 

25 session.flush() 

26 PARENT_THREAD_ID = pack_thread_id(database_id=dummy_thread.id, depth=0) 

27 

28 with threads_session(token1) as api: 

29 bat_id = api.PostReply(threads_pb2.PostReplyReq(thread_id=PARENT_THREAD_ID, content="bat")).thread_id 

30 

31 cat_id = api.PostReply(threads_pb2.PostReplyReq(thread_id=PARENT_THREAD_ID, content="cat")).thread_id 

32 

33 dog_id = api.PostReply(threads_pb2.PostReplyReq(thread_id=PARENT_THREAD_ID, content="dog")).thread_id 

34 

35 dogs = [ 

36 api.PostReply(threads_pb2.PostReplyReq(thread_id=dog_id, content=animal)).thread_id 

37 for animal in ["hyena", "wolf", "prariewolf"] 

38 ] 

39 cats = [ 

40 api.PostReply(threads_pb2.PostReplyReq(thread_id=cat_id, content=animal)).thread_id 

41 for animal in ["cheetah", "lynx", "panther"] 

42 ] 

43 

44 # Make some queries 

45 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=PARENT_THREAD_ID)) 

46 assert len(ret.replies) == 3 

47 assert ret.next_page_token == "" 

48 assert ret.replies[0].thread_id == dog_id 

49 assert ret.replies[0].content == "dog" 

50 assert ret.replies[0].author_user_id == user1.id 

51 assert ret.replies[0].num_replies == 3 

52 

53 assert ret.replies[1].thread_id == cat_id 

54 assert ret.replies[1].content == "cat" 

55 assert ret.replies[1].author_user_id == user1.id 

56 assert ret.replies[1].num_replies == 3 

57 

58 assert ret.replies[2].thread_id == bat_id 

59 assert ret.replies[2].content == "bat" 

60 assert ret.replies[2].author_user_id == user1.id 

61 assert ret.replies[2].num_replies == 0 

62 

63 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=cat_id)) 

64 assert len(ret.replies) == 3 

65 assert ret.next_page_token == "" 

66 assert [reply.thread_id for reply in ret.replies] == cats[::-1] 

67 

68 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=dog_id)) 

69 assert len(ret.replies) == 3 

70 assert ret.next_page_token == "" 

71 assert [reply.thread_id for reply in ret.replies] == dogs[::-1] 

72 

73 

74def test_threads_errors(db): 

75 user1, token1 = generate_user() 

76 with threads_session(token1) as api: 

77 # request non-existing comment 

78 with pytest.raises(grpc.RpcError) as e: 

79 api.GetThread(threads_pb2.GetThreadReq(thread_id=11)) 

80 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

81 assert e.value.details() == "Discussion thread not found." 

82 

83 # request non-existing depth digit 

84 with pytest.raises(grpc.RpcError) as e: 

85 api.GetThread(threads_pb2.GetThreadReq(thread_id=19)) 

86 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

87 assert e.value.details() == "Discussion thread not found." 

88 

89 # post on non-existing comment 

90 with pytest.raises(grpc.RpcError) as e: 

91 api.PostReply(threads_pb2.PostReplyReq(thread_id=11, content="foo")) 

92 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

93 assert e.value.details() == "Discussion thread not found." 

94 

95 # post on non-existing depth 

96 with pytest.raises(grpc.RpcError) as e: 

97 api.PostReply(threads_pb2.PostReplyReq(thread_id=19, content="foo")) 

98 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

99 assert e.value.details() == "Discussion thread not found." 

100 

101 # post empty content 

102 with pytest.raises(grpc.RpcError) as e: 

103 api.PostReply(threads_pb2.PostReplyReq(thread_id=19, content="")) 

104 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

105 assert e.value.details() == "You cannot post an empty comment." 

106 

107 # post whitespace only content 

108 with pytest.raises(grpc.RpcError) as e: 

109 api.PostReply(threads_pb2.PostReplyReq(thread_id=19, content=" ")) 

110 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

111 assert e.value.details() == "You cannot post an empty comment." 

112 

113 

114def pagination_test(api, parent_id): 

115 # Post some data 

116 for c in reversed(string.ascii_lowercase): 

117 api.PostReply(threads_pb2.PostReplyReq(thread_id=parent_id, content=c)) 

118 

119 # Get it with pagination 

120 token = "" 

121 import textwrap 

122 

123 for expected_page in textwrap.wrap(string.ascii_lowercase, 5): 

124 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=parent_id, page_size=5, page_token=token)) 

125 assert "".join(x.content for x in ret.replies) == expected_page 

126 token = ret.next_page_token 

127 

128 assert token == "" 

129 

130 return ret.replies[0].thread_id # to be used as a test one level deeper 

131 

132 

133def test_threads_pagination(db): 

134 user1, token1 = generate_user() 

135 

136 PARENT_THREAD_ID = 10 

137 

138 # Create a dummy Thread (should be replaced by pages later on) 

139 with session_scope() as session: 

140 session.add(Thread(id=1)) 

141 

142 with threads_session(token1) as api: 

143 comment_id = pagination_test(api, PARENT_THREAD_ID) 

144 pagination_test(api, comment_id)