Coverage for src/tests/test_jail.py: 100%
233 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
1import grpc
2import pytest
3from google.protobuf import empty_pb2
5from couchers import errors, models
6from couchers.constants import TOS_VERSION
7from couchers.servicers import jail as servicers_jail
8from couchers.utils import create_coordinate, to_aware_datetime
9from proto import admin_pb2, api_pb2, jail_pb2
10from tests.test_fixtures import ( # noqa # noqa
11 db,
12 email_fields,
13 fast_passwords,
14 generate_user,
15 mock_notification_email,
16 push_collector,
17 real_account_session,
18 real_admin_session,
19 real_api_session,
20 real_jail_session,
21 testconfig,
22)
@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture that requests ``testconfig`` for every test in this module."""
def test_jail_basic(db):
    """Check the jail end to end for a default user and a user with ``accepted_tos=0``.

    The default user must be able to call the main API and get an all-falsy
    ``JailInfo`` response. The ``accepted_tos=0`` user must be rejected by the
    main API, be reported as jailed, and have at least one reason flag set.
    """
    user1, token1 = generate_user()

    # a user created with the defaults can use the main API
    with real_api_session(token1) as api:
        api.Ping(api_pb2.PingReq())

    with real_jail_session(token1) as jail:
        res = jail.JailInfo(empty_pb2.Empty())
        # check every field is false
        for field in res.DESCRIPTOR.fields:
            assert not getattr(res, field.name), f"unexpected jail field set: {field.name}"

        assert not res.jailed

    # make the user jailed
    user2, token2 = generate_user(accepted_tos=0)

    with real_api_session(token2) as api, pytest.raises(grpc.RpcError) as e:
        api.Ping(api_pb2.PingReq())
    assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED

    with real_jail_session(token2) as jail:
        res = jail.JailInfo(empty_pb2.Empty())

        assert res.jailed

        # check at least one *reason* field is set; `jailed` itself is one of
        # the message fields, so counting it would make this check vacuous
        reasons = [
            field.name
            for field in res.DESCRIPTOR.fields
            if field.name != "jailed" and getattr(res, field.name)
        ]

        assert len(reasons) > 0
def test_JailInfo(db):
    """JailInfo reports the TOS flag and the main API rejects the jailed user."""
    _, jailed_token = generate_user(accepted_tos=0)

    with real_jail_session(jailed_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_tos

    # the main API must turn the jailed user away
    with real_api_session(jailed_token) as api:
        with pytest.raises(grpc.RpcError) as excinfo:
            api.Ping(api_pb2.PingReq())
    assert excinfo.value.code() == grpc.StatusCode.UNAUTHENTICATED

    # a user created with the defaults is not jailed
    _, free_token = generate_user()

    with real_jail_session(free_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

    with real_api_session(free_token) as api:
        api.Ping(api_pb2.PingReq())
def test_AcceptTOS(db):
    """Accepting the TOS clears the jail flag; un-accepting is always rejected."""

    def check_tos_flags(jail, *, outstanding):
        # both `jailed` and `has_not_accepted_tos` must match `outstanding`
        info = jail.JailInfo(empty_pb2.Empty())
        assert bool(info.jailed) is outstanding
        assert bool(info.has_not_accepted_tos) is outstanding

    def check_unaccept_rejected(jail):
        # sending accept=False must fail with CANT_UNACCEPT_TOS
        with pytest.raises(grpc.RpcError) as excinfo:
            jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=False))
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.CANT_UNACCEPT_TOS

    # a user who has not accepted the TOS
    _, pending_token = generate_user(accepted_tos=0)

    with real_jail_session(pending_token) as jail:
        check_tos_flags(jail, outstanding=True)

        # the rejected call must leave the flags as they were
        check_unaccept_rejected(jail)
        check_tos_flags(jail, outstanding=True)

        # now accept
        jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))
        check_tos_flags(jail, outstanding=False)

        # still can't unaccept afterwards
        check_unaccept_rejected(jail)

    # a user who has already accepted the TOS
    _, accepted_token = generate_user()

    with real_jail_session(accepted_token) as jail:
        check_tos_flags(jail, outstanding=False)
        check_unaccept_rejected(jail)

        # accepting again doesn't do anything
        jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))
        check_tos_flags(jail, outstanding=False)
def test_TOS_increase(db, monkeypatch):
    """Bumping ``TOS_VERSION`` jails a user until they accept the TOS again."""
    _, token = generate_user()

    # before the bump: not jailed, main API reachable
    with real_jail_session(token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

    with real_api_session(token) as api:
        api.Ping(api_pb2.PingReq())

    # pretend the TOS version was updated, in both modules patched by this test
    bumped_version = TOS_VERSION + 1
    for module in (models, servicers_jail):
        monkeypatch.setattr(module, "TOS_VERSION", bumped_version)

    # the main API must now reject the user
    with real_api_session(token) as api:
        with pytest.raises(grpc.RpcError) as excinfo:
            api.Ping(api_pb2.PingReq())
    assert excinfo.value.code() == grpc.StatusCode.UNAUTHENTICATED

    with real_jail_session(token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_tos

        # accepting clears the jail flags
        jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))

        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos
def test_SetLocation(db):
    """A user created with ``needs_to_update_location=True`` is released by ``SetLocation``.

    NOTE(review): assumes ``generate_user`` passes ``needs_to_update_location``
    through to the user record so that ``JailInfo`` reports it - confirm
    against the fixture.
    """
    # make them need to update location
    user1, token1 = generate_user(
        geom=create_coordinate(0, 0),
        geom_radius=0,
        needs_to_update_location=True,
    )

    with real_jail_session(token1) as jail:
        res = jail.JailInfo(empty_pb2.Empty())
        assert res.jailed
        assert res.needs_to_update_location

        # setting a location should clear the flag in the direct response...
        res = jail.SetLocation(
            jail_pb2.SetLocationReq(
                city="New York City",
                lat=40.7812,
                lng=-73.9647,
                radius=250,
            )
        )
        assert not res.jailed
        assert not res.needs_to_update_location

        # ...and in a fresh JailInfo lookup
        res = jail.JailInfo(empty_pb2.Empty())
        assert not res.jailed
        assert not res.needs_to_update_location
def test_MarkUserNeedsLocationUpdate(db):
    """An admin flags a user for a location update; ``SetLocation`` releases them."""
    user, token = generate_user()
    _, super_token = generate_user(is_superuser=True)

    # nothing is flagged at the start
    with real_jail_session(token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.needs_to_update_location
        assert len(info.pending_mod_notes) == 0

    # the admin flags the user, with notification emails mocked
    flag_req = admin_pb2.MarkUserNeedsLocationUpdateReq(user=user.username)
    with real_admin_session(super_token) as admin, mock_notification_email():
        admin.MarkUserNeedsLocationUpdate(flag_req)

    with real_jail_session(token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.needs_to_update_location

        new_location = jail_pb2.SetLocationReq(
            city="New York City",
            lat=40.7812,
            lng=-73.9647,
            radius=250,
        )
        info = jail.SetLocation(new_location)

        assert not info.jailed
        assert not info.needs_to_update_location

        # a fresh lookup agrees with the SetLocation response
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.needs_to_update_location
def test_AcceptCommunityGuidelines(db):
    """Accepting the guidelines clears the jail flag; un-accepting is always rejected."""

    def check_guideline_flags(jail, *, outstanding):
        # both `jailed` and `has_not_accepted_community_guidelines` must match
        info = jail.JailInfo(empty_pb2.Empty())
        assert bool(info.jailed) is outstanding
        assert bool(info.has_not_accepted_community_guidelines) is outstanding

    def check_unaccept_rejected(jail):
        # sending accept=False must fail with CANT_UNACCEPT_COMMUNITY_GUIDELINES
        with pytest.raises(grpc.RpcError) as excinfo:
            jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=False))
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.CANT_UNACCEPT_COMMUNITY_GUIDELINES

    # a user who has not accepted the community guidelines
    _, pending_token = generate_user(accepted_community_guidelines=0)

    with real_jail_session(pending_token) as jail:
        check_guideline_flags(jail, outstanding=True)

        # the rejected call must leave the flags as they were
        check_unaccept_rejected(jail)
        check_guideline_flags(jail, outstanding=True)

        # now accept
        jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=True))
        check_guideline_flags(jail, outstanding=False)

        # still can't unaccept afterwards
        check_unaccept_rejected(jail)

    # a user who has already accepted the community guidelines
    _, accepted_token = generate_user()

    with real_jail_session(accepted_token) as jail:
        check_guideline_flags(jail, outstanding=False)
        check_unaccept_rejected(jail)

        # accepting again doesn't do anything
        jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=True))
        check_guideline_flags(jail, outstanding=False)
def test_modnotes(db, push_collector):
    """Walk a mod note through its lifecycle: sent, pending, acknowledged, listed.

    Covers the notification email and push, the jail state while the note is
    pending, rejection of ``acknowledge=False``, release after acknowledging,
    and the note appearing in the account's mod note list.
    """
    note_content = "# Important note\nThis is a sample mod note."

    user, token = generate_user()
    super_user, super_token = generate_user(is_superuser=True)

    # nothing pending at the start
    with real_jail_session(token) as jail:
        res = jail.JailInfo(empty_pb2.Empty())
        assert not res.jailed
        assert not res.has_pending_mod_notes
        assert len(res.pending_mod_notes) == 0

    with real_account_session(token) as account:
        res = account.ListModNotes(empty_pb2.Empty())
        assert len(res.mod_notes) == 0

    with real_admin_session(super_token) as admin:
        with mock_notification_email() as mock:
            admin.SendModNote(
                admin_pb2.SendModNoteReq(
                    user=user.username,
                    content=note_content,
                    internal_id="sample_note",
                )
            )
        mock.assert_called_once()
        e = email_fields(mock)

        assert e.subject == "[TEST] You have received a mod note"
        push_collector.assert_user_has_single_matching(
            user.id,
            title="You received a mod note",
            body="You need to read and acknowledge the note before continuing to use the platform.",
        )

    with real_jail_session(token) as jail:
        res = jail.JailInfo(empty_pb2.Empty())
        assert res.jailed
        assert res.has_pending_mod_notes
        assert len(res.pending_mod_notes) == 1
        note = res.pending_mod_notes[0]
        assert note.note_content == note_content

        note_id = note.note_id

        # refusing to acknowledge is rejected
        with pytest.raises(grpc.RpcError) as e:
            jail.AcknowledgePendingModNote(
                jail_pb2.AcknowledgePendingModNoteReq(
                    note_id=note_id,
                    acknowledge=False,
                )
            )
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == errors.MOD_NOTE_NEED_TO_ACKNOWELDGE

        # re-fetch so these assertions reflect the state *after* the rejected
        # call, rather than re-checking the response obtained before it
        res = jail.JailInfo(empty_pb2.Empty())
        assert res.jailed
        assert res.has_pending_mod_notes
        assert len(res.pending_mod_notes) == 1
        note = res.pending_mod_notes[0]
        assert note.note_content == note_content

        # acknowledging releases the user
        res = jail.AcknowledgePendingModNote(
            jail_pb2.AcknowledgePendingModNoteReq(
                note_id=note_id,
                acknowledge=True,
            )
        )
        assert not res.jailed
        assert not res.has_pending_mod_notes
        assert len(res.pending_mod_notes) == 0

        # the note is no longer pending, so a second request is NOT_FOUND
        with pytest.raises(grpc.RpcError) as e:
            jail.AcknowledgePendingModNote(
                jail_pb2.AcknowledgePendingModNoteReq(
                    note_id=note_id,
                    acknowledge=False,
                )
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == errors.MOD_NOTE_NOT_FOUND

    # the acknowledged note appears in the account's mod note list
    with real_account_session(token) as account:
        res = account.ListModNotes(empty_pb2.Empty())
        assert len(res.mod_notes) == 1
        note = res.mod_notes[0]
        assert note.note_id == note_id
        assert note.note_content == note_content

        assert to_aware_datetime(note.acknowledged) > to_aware_datetime(note.created)
def test_modnotes_no_notify(db, push_collector):
    """A mod note sent with ``do_not_notify=True`` jails the user without an email."""
    sample_content = "# Important note\nThis is a sample mod note."

    user, token = generate_user()
    _, super_token = generate_user(is_superuser=True)

    # nothing pending at the start
    with real_jail_session(token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_pending_mod_notes
        assert len(info.pending_mod_notes) == 0

    with real_account_session(token) as account:
        listing = account.ListModNotes(empty_pb2.Empty())
        assert len(listing.mod_notes) == 0

    silent_note = admin_pb2.SendModNoteReq(
        user=user.username,
        content=sample_content,
        internal_id="sample_note",
        do_not_notify=True,
    )
    with real_admin_session(super_token) as admin:
        with mock_notification_email() as email_mock:
            admin.SendModNote(silent_note)
        # no notification email may have been queued
        email_mock.assert_not_called()

    # the note is still pending and still jails the user
    with real_jail_session(token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_pending_mod_notes
        assert len(info.pending_mod_notes) == 1
        pending = info.pending_mod_notes[0]
        assert pending.note_content == sample_content