Coverage for app/backend/src/tests/test_jail.py: 100%
233 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1import grpc
2import pytest
3from google.protobuf import empty_pb2
5from couchers.constants import TOS_VERSION
6from couchers.models import users
7from couchers.proto import admin_pb2, api_pb2, jail_pb2
8from couchers.servicers import jail as servicers_jail
9from couchers.utils import create_coordinate, to_aware_datetime
10from tests.fixtures.db import generate_user
11from tests.fixtures.misc import EmailCollector, PushCollector
12from tests.fixtures.sessions import real_account_session, real_admin_session, real_api_session, real_jail_session
15@pytest.fixture(autouse=True)
16def _(testconfig):
17 pass
20def test_jail_basic(db):
21 user1, token1 = generate_user()
23 with real_api_session(token1) as api:
24 api.Ping(api_pb2.PingReq())
26 with real_jail_session(token1) as jail:
27 res = jail.JailInfo(empty_pb2.Empty())
28 # check every field is false
29 for field in res.DESCRIPTOR.fields:
30 assert not getattr(res, field.name)
32 assert not res.jailed
34 # make the user jailed
35 user2, token2 = generate_user(accepted_tos=0)
37 with real_api_session(token2) as api, pytest.raises(grpc.RpcError) as e:
38 api.Ping(api_pb2.PingReq())
39 assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED
41 with real_jail_session(token2) as jail:
42 res = jail.JailInfo(empty_pb2.Empty())
44 assert res.jailed
46 reason_count = 0
48 # check at least one field is true
49 for field in res.DESCRIPTOR.fields:
50 reason_count += getattr(res, field.name) == True
52 assert reason_count > 0
55def test_JailInfo(db):
56 user1, token1 = generate_user(accepted_tos=0)
58 with real_jail_session(token1) as jail:
59 res = jail.JailInfo(empty_pb2.Empty())
60 assert res.jailed
61 assert res.has_not_accepted_tos
63 with real_api_session(token1) as api, pytest.raises(grpc.RpcError) as e:
64 res = api.Ping(api_pb2.PingReq())
65 assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED
67 # make the user not jailed
68 user2, token2 = generate_user()
70 with real_jail_session(token2) as jail:
71 res = jail.JailInfo(empty_pb2.Empty())
72 assert not res.jailed
73 assert not res.has_not_accepted_tos
75 with real_api_session(token2) as api:
76 res = api.Ping(api_pb2.PingReq())
79def test_AcceptTOS(db):
80 # make them have not accepted TOS
81 user1, token1 = generate_user(accepted_tos=0)
83 with real_jail_session(token1) as jail:
84 res = jail.JailInfo(empty_pb2.Empty())
85 assert res.jailed
86 assert res.has_not_accepted_tos
88 # make sure we can't unaccept
89 with pytest.raises(grpc.RpcError) as e:
90 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=False))
91 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
92 assert e.value.details() == "You cannot revoke acceptance of the Terms of Service."
94 res = jail.JailInfo(empty_pb2.Empty())
95 assert res.jailed
96 assert res.has_not_accepted_tos
98 # now accept
99 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))
101 res = jail.JailInfo(empty_pb2.Empty())
102 assert not res.jailed
103 assert not res.has_not_accepted_tos
105 # make sure we can't unaccept
106 with pytest.raises(grpc.RpcError) as e:
107 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=False))
108 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
109 assert e.value.details() == "You cannot revoke acceptance of the Terms of Service."
111 # make them have accepted TOS
112 user2, token2 = generate_user()
114 with real_jail_session(token2) as jail:
115 res = jail.JailInfo(empty_pb2.Empty())
116 assert not res.jailed
117 assert not res.has_not_accepted_tos
119 # make sure we can't unaccept
120 with pytest.raises(grpc.RpcError) as e:
121 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=False))
122 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
123 assert e.value.details() == "You cannot revoke acceptance of the Terms of Service."
125 # accepting again doesn't do anything
126 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))
128 res = jail.JailInfo(empty_pb2.Empty())
129 assert not res.jailed
130 assert not res.has_not_accepted_tos
133def test_TOS_increase(db, monkeypatch):
134 # test if the TOS version is updated
136 # not jailed yet
137 user, token = generate_user()
139 with real_jail_session(token) as jail:
140 res = jail.JailInfo(empty_pb2.Empty())
141 assert not res.jailed
142 assert not res.has_not_accepted_tos
144 with real_api_session(token) as api:
145 res = api.Ping(api_pb2.PingReq())
147 # now we pretend to update the TOS version
148 new_TOS_VERSION = TOS_VERSION + 1
150 monkeypatch.setattr(users, "TOS_VERSION", new_TOS_VERSION)
151 monkeypatch.setattr(servicers_jail, "TOS_VERSION", new_TOS_VERSION)
153 # make sure we're jailed
154 with real_api_session(token) as api, pytest.raises(grpc.RpcError) as e:
155 api.Ping(api_pb2.PingReq())
156 assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED
158 with real_jail_session(token) as jail:
159 res = jail.JailInfo(empty_pb2.Empty())
160 assert res.jailed
161 assert res.has_not_accepted_tos
163 # now accept
164 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))
166 res = jail.JailInfo(empty_pb2.Empty())
167 assert not res.jailed
168 assert not res.has_not_accepted_tos
171def test_SetLocation(db):
172 # make them need to update location
173 user1, token1 = generate_user(geom=create_coordinate(0, 0), geom_radius=0, needs_to_update_location=True)
176def test_MarkUserNeedsLocationUpdate(db):
177 user, token = generate_user()
178 super_user, super_token = generate_user(is_superuser=True)
180 with real_jail_session(token) as jail:
181 res = jail.JailInfo(empty_pb2.Empty())
182 assert not res.jailed
183 assert not res.needs_to_update_location
184 assert len(res.pending_mod_notes) == 0
186 with real_admin_session(super_token) as admin:
187 admin.MarkUserNeedsLocationUpdate(admin_pb2.MarkUserNeedsLocationUpdateReq(user=user.username))
189 with real_jail_session(token) as jail:
190 res = jail.JailInfo(empty_pb2.Empty())
191 assert res.jailed
192 assert res.needs_to_update_location
194 res = jail.SetLocation(
195 jail_pb2.SetLocationReq(
196 city="New York City",
197 lat=40.7812,
198 lng=-73.9647,
199 radius=250,
200 )
201 )
203 assert not res.jailed
204 assert not res.needs_to_update_location
206 res = jail.JailInfo(empty_pb2.Empty())
207 assert not res.jailed
208 assert not res.needs_to_update_location
211def test_AcceptCommunityGuidelines(db):
212 # make them have not accepted GC
213 user1, token1 = generate_user(accepted_community_guidelines=0)
215 with real_jail_session(token1) as jail:
216 res = jail.JailInfo(empty_pb2.Empty())
217 assert res.jailed
218 assert res.has_not_accepted_community_guidelines
220 # make sure we can't unaccept
221 with pytest.raises(grpc.RpcError) as e:
222 res = jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=False))
223 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
224 assert e.value.details() == "You cannot revoke acceptance of the Community Guidelines."
226 res = jail.JailInfo(empty_pb2.Empty())
227 assert res.jailed
228 assert res.has_not_accepted_community_guidelines
230 # now accept
231 res = jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=True))
233 res = jail.JailInfo(empty_pb2.Empty())
234 assert not res.jailed
235 assert not res.has_not_accepted_community_guidelines
237 # make sure we can't unaccept
238 with pytest.raises(grpc.RpcError) as e:
239 res = jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=False))
240 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
241 assert e.value.details() == "You cannot revoke acceptance of the Community Guidelines."
243 # make them have accepted GC
244 user2, token2 = generate_user()
246 with real_jail_session(token2) as jail:
247 res = jail.JailInfo(empty_pb2.Empty())
248 assert not res.jailed
249 assert not res.has_not_accepted_community_guidelines
251 # make sure we can't unaccept
252 with pytest.raises(grpc.RpcError) as e:
253 res = jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=False))
254 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
255 assert e.value.details() == "You cannot revoke acceptance of the Community Guidelines."
257 # accepting again doesn't do anything
258 res = jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=True))
260 res = jail.JailInfo(empty_pb2.Empty())
261 assert not res.jailed
262 assert not res.has_not_accepted_community_guidelines
265def test_modnotes(db, email_collector: EmailCollector, push_collector: PushCollector):
266 user, token = generate_user()
267 super_user, super_token = generate_user(is_superuser=True)
269 with real_jail_session(token) as jail:
270 res = jail.JailInfo(empty_pb2.Empty())
271 assert not res.jailed
272 assert not res.has_pending_mod_notes
273 assert len(res.pending_mod_notes) == 0
275 with real_account_session(token) as account:
276 res = account.ListModNotes(empty_pb2.Empty())
277 assert len(res.mod_notes) == 0
279 with real_admin_session(super_token) as admin:
280 admin.SendModNote(
281 admin_pb2.SendModNoteReq(
282 user=user.username,
283 content="# Important note\nThis is a sample mod note.",
284 internal_id="sample_note",
285 )
286 )
288 email = email_collector.pop_for_recipient(user.email, last=True)
289 assert email.subject == "[TEST] You have received a mod note"
291 push = push_collector.pop_for_user(user.id, last=True)
292 assert push.content.title == "New moderator note"
293 assert push.content.body == "You received a moderator note. Read and acknowledge it to continue using the platform."
295 with real_jail_session(token) as jail:
296 res = jail.JailInfo(empty_pb2.Empty())
297 assert res.jailed
298 assert res.has_pending_mod_notes
299 assert len(res.pending_mod_notes) == 1
300 note = res.pending_mod_notes[0]
301 assert note.note_content == "# Important note\nThis is a sample mod note."
303 note_id = note.note_id
305 with pytest.raises(grpc.RpcError) as e:
306 jail.AcknowledgePendingModNote(
307 jail_pb2.AcknowledgePendingModNoteReq(
308 note_id=note_id,
309 acknowledge=False,
310 )
311 )
312 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
313 assert e.value.details() == "You need to read and acknowledge the moderator note."
315 assert res.jailed
316 assert res.has_pending_mod_notes
317 assert len(res.pending_mod_notes) == 1
318 note = res.pending_mod_notes[0]
319 assert note.note_content == "# Important note\nThis is a sample mod note."
321 res = jail.AcknowledgePendingModNote(
322 jail_pb2.AcknowledgePendingModNoteReq(
323 note_id=note_id,
324 acknowledge=True,
325 )
326 )
327 assert not res.jailed
328 assert not res.has_pending_mod_notes
329 assert len(res.pending_mod_notes) == 0
331 with pytest.raises(grpc.RpcError) as e:
332 jail.AcknowledgePendingModNote(
333 jail_pb2.AcknowledgePendingModNoteReq(
334 note_id=note_id,
335 acknowledge=False,
336 )
337 )
338 assert e.value.code() == grpc.StatusCode.NOT_FOUND
339 assert e.value.details() == "Moderator note not found."
341 with real_account_session(token) as account:
342 res = account.ListModNotes(empty_pb2.Empty())
343 assert len(res.mod_notes) == 1
344 note = res.mod_notes[0]
345 assert note.note_id == note_id
346 assert note.note_content == "# Important note\nThis is a sample mod note."
348 assert to_aware_datetime(note.acknowledged) > to_aware_datetime(note.created)
351def test_modnotes_no_notify(db, email_collector: EmailCollector, push_collector: PushCollector):
352 user, token = generate_user()
353 super_user, super_token = generate_user(is_superuser=True)
355 with real_jail_session(token) as jail:
356 res = jail.JailInfo(empty_pb2.Empty())
357 assert not res.jailed
358 assert not res.has_pending_mod_notes
359 assert len(res.pending_mod_notes) == 0
361 with real_account_session(token) as account:
362 res = account.ListModNotes(empty_pb2.Empty())
363 assert len(res.mod_notes) == 0
365 with real_admin_session(super_token) as admin:
366 admin.SendModNote(
367 admin_pb2.SendModNoteReq(
368 user=user.username,
369 content="# Important note\nThis is a sample mod note.",
370 internal_id="sample_note",
371 do_not_notify=True,
372 )
373 )
375 assert email_collector.count_for_recipient(user.email) == 0
377 with real_jail_session(token) as jail:
378 res = jail.JailInfo(empty_pb2.Empty())
379 assert res.jailed
380 assert res.has_pending_mod_notes
381 assert len(res.pending_mod_notes) == 1
382 note = res.pending_mod_notes[0]
383 assert note.note_content == "# Important note\nThis is a sample mod note."