Coverage for src/tests/test_threads.py: 100%
93 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-17 18:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-17 18:19 +0000
1import string
3import grpc
4import pytest
6from couchers.db import session_scope
7from couchers.models import Thread
8from couchers.proto import threads_pb2
9from couchers.servicers.threads import pack_thread_id
10from tests.test_fixtures import db, generate_user, testconfig, threads_session # noqa
@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for the tests in this module.

    The body is intentionally empty: the fixture exists only so that, being
    ``autouse``, it pulls ``testconfig`` in without each test naming it.
    """
def test_threads_basic(db):
    """Build a small two-level reply tree and check what GetThread returns for it.

    Three replies ("bat", "cat", "dog") are posted on a fresh parent thread,
    then three nested replies each on "dog" and on "cat". The assertions
    expect every listing to come back in reverse posting order, unpaginated.
    """
    user1, token1 = generate_user()

    # Create a dummy Thread (should be replaced by pages later on)
    with session_scope() as session:
        root = Thread()
        session.add(root)
        # Flush before reading root.id (presumably that is when the database assigns it).
        session.flush()
        root_id = pack_thread_id(database_id=root.id, depth=0)

    with threads_session(token1) as api:

        def post(parent_id, text):
            # Post one reply and hand back the id of the newly created reply.
            return api.PostReply(threads_pb2.PostReplyReq(thread_id=parent_id, content=text)).thread_id

        def fetch(thread_id):
            # Fetch the replies of a thread using the default paging.
            return api.GetThread(threads_pb2.GetThreadReq(thread_id=thread_id))

        bat_id = post(root_id, "bat")
        cat_id = post(root_id, "cat")
        dog_id = post(root_id, "dog")

        dogs = [post(dog_id, animal) for animal in ("hyena", "wolf", "prariewolf")]
        cats = [post(cat_id, animal) for animal in ("cheetah", "lynx", "panther")]

        # Make some queries
        top = fetch(root_id)
        assert len(top.replies) == 3
        assert top.next_page_token == ""

        # (thread id, content, number of nested replies), in the expected listing order
        expected_top = [
            (dog_id, "dog", 3),
            (cat_id, "cat", 3),
            (bat_id, "bat", 0),
        ]
        for reply, (thread_id, content, num_replies) in zip(top.replies, expected_top):
            assert reply.thread_id == thread_id
            assert reply.content == content
            assert reply.author_user_id == user1.id
            assert reply.num_replies == num_replies

        # Nested listings: same reverse-posting order one level down.
        for parent_id, children in ((cat_id, cats), (dog_id, dogs)):
            nested = fetch(parent_id)
            assert len(nested.replies) == 3
            assert nested.next_page_token == ""
            assert [reply.thread_id for reply in nested.replies] == list(reversed(children))
def test_threads_errors(db):
    """Check the gRPC status code and message for invalid GetThread/PostReply calls.

    No thread is created beforehand, so every thread id used here is expected
    to be rejected; empty and whitespace-only content is expected to be
    rejected as an invalid argument.
    """
    _user, token1 = generate_user()

    with threads_session(token1) as api:
        not_found = (grpc.StatusCode.NOT_FOUND, "Discussion thread not found.")
        empty_comment = (grpc.StatusCode.INVALID_ARGUMENT, "You cannot post an empty comment.")

        # (rpc to call, request to send, (expected status code, expected details))
        scenarios = [
            # request non-existing comment
            (api.GetThread, threads_pb2.GetThreadReq(thread_id=11), not_found),
            # request non-existing depth digit
            (api.GetThread, threads_pb2.GetThreadReq(thread_id=19), not_found),
            # post on non-existing comment
            (api.PostReply, threads_pb2.PostReplyReq(thread_id=11, content="foo"), not_found),
            # post on non-existing depth
            (api.PostReply, threads_pb2.PostReplyReq(thread_id=19, content="foo"), not_found),
            # post empty content
            (api.PostReply, threads_pb2.PostReplyReq(thread_id=19, content=""), empty_comment),
            # post whitespace only content
            (api.PostReply, threads_pb2.PostReplyReq(thread_id=19, content=" "), empty_comment),
        ]

        for rpc, req, (status, message) in scenarios:
            with pytest.raises(grpc.RpcError) as excinfo:
                rpc(req)
            # Checked after the block: the raised error is only available once it has exited.
            assert excinfo.value.code() == status
            assert excinfo.value.details() == message
def pagination_test(api, parent_id):
    """Post 26 one-letter replies under ``parent_id`` and page through them.

    The letters are posted from "z" down to "a" and then read back five at a
    time; the pages are expected to spell the alphabet in order, and the last
    page is expected to carry an empty ``next_page_token``.

    Returns the thread id of the first reply on the final page, so the caller
    can repeat the same check one level deeper.
    """
    alphabet = string.ascii_lowercase
    per_page = 5

    # Post some data
    for letter in alphabet[::-1]:
        api.PostReply(threads_pb2.PostReplyReq(thread_id=parent_id, content=letter))

    # Consecutive chunks of five letters; the last chunk is just "z".
    expected_pages = [alphabet[start : start + per_page] for start in range(0, len(alphabet), per_page)]

    # Get it with pagination
    page_token = ""
    for expected in expected_pages:
        page = api.GetThread(
            threads_pb2.GetThreadReq(thread_id=parent_id, page_size=per_page, page_token=page_token)
        )
        received = "".join(reply.content for reply in page.replies)
        assert received == expected
        page_token = page.next_page_token

    # Nothing should be left after the final page.
    assert page_token == ""

    return page.replies[0].thread_id  # to be used as a test one level deeper
def test_threads_pagination(db):
    """Run the pagination check on a top-level thread, then on one of its replies."""
    _user, token = generate_user()

    # Thread id passed to the API for the thread created below
    # (presumably the packed form of database id 1 at depth 0 -- confirm against pack_thread_id).
    root_thread_id = 10

    # Create a dummy Thread (should be replaced by pages later on)
    with session_scope() as session:
        root = Thread(id=1)
        session.add(root)

    with threads_session(token) as api:
        # The first pass returns a reply id, which becomes the parent for the second pass.
        nested_id = pagination_test(api, root_thread_id)
        pagination_test(api, nested_id)