Coverage for src/tests/test_threads.py: 100%

94 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-03-11 15:27 +0000

1import string 

2 

3import grpc 

4import pytest 

5 

6from couchers import errors 

7from couchers.db import session_scope 

8from couchers.models import Thread 

9from couchers.servicers.threads import pack_thread_id 

10from proto import threads_pb2 

11from tests.test_fixtures import db, generate_user, testconfig, threads_session # noqa 

12 

13 

14@pytest.fixture(autouse=True) 

15def _(testconfig): 

16 pass 

17 

18 

19def test_threads_basic(db): 

20 user1, token1 = generate_user() 

21 

22 # Create a dummy Thread (should be replaced by pages later on) 

23 with session_scope() as session: 

24 dummy_thread = Thread() 

25 session.add(dummy_thread) 

26 session.flush() 

27 PARENT_THREAD_ID = pack_thread_id(database_id=dummy_thread.id, depth=0) 

28 

29 with threads_session(token1) as api: 

30 bat_id = api.PostReply(threads_pb2.PostReplyReq(thread_id=PARENT_THREAD_ID, content="bat")).thread_id 

31 

32 cat_id = api.PostReply(threads_pb2.PostReplyReq(thread_id=PARENT_THREAD_ID, content="cat")).thread_id 

33 

34 dog_id = api.PostReply(threads_pb2.PostReplyReq(thread_id=PARENT_THREAD_ID, content="dog")).thread_id 

35 

36 dogs = [ 

37 api.PostReply(threads_pb2.PostReplyReq(thread_id=dog_id, content=animal)).thread_id 

38 for animal in ["hyena", "wolf", "prariewolf"] 

39 ] 

40 cats = [ 

41 api.PostReply(threads_pb2.PostReplyReq(thread_id=cat_id, content=animal)).thread_id 

42 for animal in ["cheetah", "lynx", "panther"] 

43 ] 

44 

45 # Make some queries 

46 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=PARENT_THREAD_ID)) 

47 assert len(ret.replies) == 3 

48 assert ret.next_page_token == "" 

49 assert ret.replies[0].thread_id == dog_id 

50 assert ret.replies[0].content == "dog" 

51 assert ret.replies[0].author_user_id == user1.id 

52 assert ret.replies[0].num_replies == 3 

53 

54 assert ret.replies[1].thread_id == cat_id 

55 assert ret.replies[1].content == "cat" 

56 assert ret.replies[1].author_user_id == user1.id 

57 assert ret.replies[1].num_replies == 3 

58 

59 assert ret.replies[2].thread_id == bat_id 

60 assert ret.replies[2].content == "bat" 

61 assert ret.replies[2].author_user_id == user1.id 

62 assert ret.replies[2].num_replies == 0 

63 

64 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=cat_id)) 

65 assert len(ret.replies) == 3 

66 assert ret.next_page_token == "" 

67 assert [reply.thread_id for reply in ret.replies] == cats[::-1] 

68 

69 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=dog_id)) 

70 assert len(ret.replies) == 3 

71 assert ret.next_page_token == "" 

72 assert [reply.thread_id for reply in ret.replies] == dogs[::-1] 

73 

74 

75def test_threads_errors(db): 

76 user1, token1 = generate_user() 

77 with threads_session(token1) as api: 

78 # request non-existing comment 

79 with pytest.raises(grpc.RpcError) as e: 

80 api.GetThread(threads_pb2.GetThreadReq(thread_id=11)) 

81 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

82 assert e.value.details() == errors.THREAD_NOT_FOUND 

83 

84 # request non-existing depth digit 

85 with pytest.raises(grpc.RpcError) as e: 

86 api.GetThread(threads_pb2.GetThreadReq(thread_id=19)) 

87 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

88 assert e.value.details() == errors.THREAD_NOT_FOUND 

89 

90 # post on non-existing comment 

91 with pytest.raises(grpc.RpcError) as e: 

92 api.PostReply(threads_pb2.PostReplyReq(thread_id=11, content="foo")) 

93 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

94 assert e.value.details() == errors.THREAD_NOT_FOUND 

95 

96 # post on non-existing depth 

97 with pytest.raises(grpc.RpcError) as e: 

98 api.PostReply(threads_pb2.PostReplyReq(thread_id=19, content="foo")) 

99 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

100 assert e.value.details() == errors.THREAD_NOT_FOUND 

101 

102 # post empty content 

103 with pytest.raises(grpc.RpcError) as e: 

104 api.PostReply(threads_pb2.PostReplyReq(thread_id=19, content="")) 

105 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

106 assert e.value.details() == errors.INVALID_COMMENT 

107 

108 # post whitespace only content 

109 with pytest.raises(grpc.RpcError) as e: 

110 api.PostReply(threads_pb2.PostReplyReq(thread_id=19, content=" ")) 

111 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

112 assert e.value.details() == errors.INVALID_COMMENT 

113 

114 

115def pagination_test(api, parent_id): 

116 # Post some data 

117 for c in reversed(string.ascii_lowercase): 

118 api.PostReply(threads_pb2.PostReplyReq(thread_id=parent_id, content=c)) 

119 

120 # Get it with pagination 

121 token = "" 

122 import textwrap 

123 

124 for expected_page in textwrap.wrap(string.ascii_lowercase, 5): 

125 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=parent_id, page_size=5, page_token=token)) 

126 assert "".join(x.content for x in ret.replies) == expected_page 

127 token = ret.next_page_token 

128 

129 assert token == "" 

130 

131 return ret.replies[0].thread_id # to be used as a test one level deeper 

132 

133 

134def test_threads_pagination(db): 

135 user1, token1 = generate_user() 

136 

137 PARENT_THREAD_ID = 10 

138 

139 # Create a dummy Thread (should be replaced by pages later on) 

140 with session_scope() as session: 

141 session.add(Thread(id=1)) 

142 

143 with threads_session(token1) as api: 

144 comment_id = pagination_test(api, PARENT_THREAD_ID) 

145 pagination_test(api, comment_id)