Coverage for app / backend / src / tests / test_threads.py: 100%
94 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1import string
2import textwrap
4import grpc
5import pytest
7from couchers.db import session_scope
8from couchers.models import Thread
9from couchers.proto import threads_pb2
10from couchers.servicers.threads import pack_thread_id
11from tests.fixtures.db import generate_user
12from tests.fixtures.sessions import threads_session
@pytest.fixture(autouse=True)
def _(testconfig):
    """Pull in the ``testconfig`` fixture automatically for the tests in this module."""
def test_threads_basic(db):
    """Post a two-level tree of replies and check what GetThread returns for it.

    Three replies ("bat", "cat", "dog") are posted on a dummy parent thread,
    then three nested replies each under "dog" and "cat". The assertions
    expect every listing to come back in reverse posting order, with
    ``num_replies`` reflecting the nested replies.
    """
    author, token = generate_user()

    # Create a dummy Thread (should be replaced by pages later on)
    with session_scope() as session:
        root = Thread()
        session.add(root)
        # Flush before reading the id that is packed into the thread id below.
        session.flush()
        root_id = pack_thread_id(database_id=root.id, depth=0)

    with threads_session(token) as api:

        def post(parent_id, text):
            # Post one reply under parent_id and hand back the new thread id.
            reply = api.PostReply(threads_pb2.PostReplyReq(thread_id=parent_id, content=text))
            return reply.thread_id

        def fetch(thread_id):
            # Fetch the replies of thread_id with default pagination.
            return api.GetThread(threads_pb2.GetThreadReq(thread_id=thread_id))

        bat_id = post(root_id, "bat")
        cat_id = post(root_id, "cat")
        dog_id = post(root_id, "dog")

        dogs = [post(dog_id, animal) for animal in ("hyena", "wolf", "prariewolf")]
        cats = [post(cat_id, animal) for animal in ("cheetah", "lynx", "panther")]

        # Top level: all three replies on a single page, last posted first.
        top_level = fetch(root_id)
        assert len(top_level.replies) == 3
        assert top_level.next_page_token == ""

        expected_top_level = [
            (dog_id, "dog", 3),
            (cat_id, "cat", 3),
            (bat_id, "bat", 0),
        ]
        for reply, (thread_id, content, num_replies) in zip(top_level.replies, expected_top_level):
            assert reply.thread_id == thread_id
            assert reply.content == content
            assert reply.author_user_id == author.id
            assert reply.num_replies == num_replies

        # Second level: the nested replies of "cat", then those of "dog".
        for parent_id, posted_ids in ((cat_id, cats), (dog_id, dogs)):
            nested = fetch(parent_id)
            assert len(nested.replies) == 3
            assert nested.next_page_token == ""
            assert [reply.thread_id for reply in nested.replies] == posted_ids[::-1]
def test_threads_errors(db):
    """Check the gRPC status code and message of each rejected threads request."""
    _, token = generate_user()

    not_found = (grpc.StatusCode.NOT_FOUND, "Discussion thread not found.")
    empty_comment = (grpc.StatusCode.INVALID_ARGUMENT, "You cannot post an empty comment.")

    with threads_session(token) as api:
        # Each entry pairs a failing call with the (code, details) it must produce.
        failing_calls = [
            # request non-existing comment
            (lambda: api.GetThread(threads_pb2.GetThreadReq(thread_id=11)), not_found),
            # request non-existing depth digit
            (lambda: api.GetThread(threads_pb2.GetThreadReq(thread_id=19)), not_found),
            # post on non-existing comment
            (lambda: api.PostReply(threads_pb2.PostReplyReq(thread_id=11, content="foo")), not_found),
            # post on non-existing depth
            (lambda: api.PostReply(threads_pb2.PostReplyReq(thread_id=19, content="foo")), not_found),
            # post empty content
            (lambda: api.PostReply(threads_pb2.PostReplyReq(thread_id=19, content="")), empty_comment),
            # post whitespace only content
            (lambda: api.PostReply(threads_pb2.PostReplyReq(thread_id=19, content=" ")), empty_comment),
        ]

        for failing_call, (expected_code, expected_details) in failing_calls:
            with pytest.raises(grpc.RpcError) as excinfo:
                failing_call()
            assert excinfo.value.code() == expected_code
            assert excinfo.value.details() == expected_details
def pagination_test(api, parent_id):
    """Post one reply per lowercase letter under ``parent_id`` and page through them.

    The letters are posted from "z" down to "a" and then read back five at a
    time. The pages are expected to spell the alphabet in order, and the token
    returned with the final page must be empty.

    Returns the thread id of the first reply on the last page, to be used as
    the parent for the same test one level deeper.
    """
    alphabet = string.ascii_lowercase
    page_size = 5

    # Post some data
    for letter in alphabet[::-1]:
        api.PostReply(threads_pb2.PostReplyReq(thread_id=parent_id, content=letter))

    # Get it with pagination
    page_token = ""
    for start in range(0, len(alphabet), page_size):
        page = api.GetThread(
            threads_pb2.GetThreadReq(thread_id=parent_id, page_size=5, page_token=page_token)
        )
        letters = "".join(reply.content for reply in page.replies)
        assert letters == alphabet[start : start + page_size]
        page_token = page.next_page_token

    assert page_token == ""

    return page.replies[0].thread_id  # to be used as a test one level deeper
def test_threads_pagination(db):
    """Run the pagination scenario on a top-level thread and on one of its replies.

    The parent thread id is derived from the dummy thread's actual database
    id via ``pack_thread_id`` (the same way ``test_threads_basic`` does it)
    instead of being hard-coded, so the test does not depend on the row
    receiving a particular id or on how thread ids are encoded.
    """
    _, token = generate_user()

    # Create a dummy Thread (should be replaced by pages later on)
    with session_scope() as session:
        dummy_thread = Thread()
        session.add(dummy_thread)
        # Flush before reading the id that is packed into the thread id below.
        session.flush()
        parent_thread_id = pack_thread_id(database_id=dummy_thread.id, depth=0)

    with threads_session(token) as api:
        # First paginate directly under the dummy thread, then one level deeper
        # under the reply returned by the first run.
        comment_id = pagination_test(api, parent_thread_id)
        pagination_test(api, comment_id)