Coverage for src/tests/test_jail.py: 100%
222 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
1import grpc
2import pytest
3from google.protobuf import empty_pb2
5from couchers import errors, models
6from couchers.constants import TOS_VERSION
7from couchers.servicers import jail as servicers_jail
8from couchers.utils import to_aware_datetime
9from proto import admin_pb2, api_pb2, jail_pb2
10from tests.test_fixtures import ( # noqa # noqa
11 db,
12 email_fields,
13 fast_passwords,
14 generate_user,
15 mock_notification_email,
16 push_collector,
17 real_account_session,
18 real_admin_session,
19 real_api_session,
20 real_jail_session,
21 testconfig,
22)
25@pytest.fixture(autouse=True)
26def _(testconfig):
27 pass
30def test_jail_basic(db):
31 user1, token1 = generate_user()
33 with real_api_session(token1) as api:
34 res = api.Ping(api_pb2.PingReq())
36 with real_jail_session(token1) as jail:
37 res = jail.JailInfo(empty_pb2.Empty())
38 # check every field is false
39 for field in res.DESCRIPTOR.fields:
40 assert not getattr(res, field.name)
42 assert not res.jailed
44 # make the user jailed
45 user2, token2 = generate_user(accepted_tos=0)
47 with real_api_session(token2) as api, pytest.raises(grpc.RpcError) as e:
48 res = api.Ping(api_pb2.PingReq())
49 assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED
51 with real_jail_session(token2) as jail:
52 res = jail.JailInfo(empty_pb2.Empty())
54 assert res.jailed
56 reason_count = 0
58 # check at least one field is true
59 for field in res.DESCRIPTOR.fields:
60 reason_count += getattr(res, field.name) == True
62 assert reason_count > 0
65def test_JailInfo(db):
66 user1, token1 = generate_user(accepted_tos=0)
68 with real_jail_session(token1) as jail:
69 res = jail.JailInfo(empty_pb2.Empty())
70 assert res.jailed
71 assert res.has_not_accepted_tos
73 with real_api_session(token1) as api, pytest.raises(grpc.RpcError) as e:
74 res = api.Ping(api_pb2.PingReq())
75 assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED
77 # make the user not jailed
78 user2, token2 = generate_user()
80 with real_jail_session(token2) as jail:
81 res = jail.JailInfo(empty_pb2.Empty())
82 assert not res.jailed
83 assert not res.has_not_accepted_tos
85 with real_api_session(token2) as api:
86 res = api.Ping(api_pb2.PingReq())
89def test_AcceptTOS(db):
90 # make them have not accepted TOS
91 user1, token1 = generate_user(accepted_tos=0)
93 with real_jail_session(token1) as jail:
94 res = jail.JailInfo(empty_pb2.Empty())
95 assert res.jailed
96 assert res.has_not_accepted_tos
98 # make sure we can't unaccept
99 with pytest.raises(grpc.RpcError) as e:
100 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=False))
101 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
102 assert e.value.details() == errors.CANT_UNACCEPT_TOS
104 res = jail.JailInfo(empty_pb2.Empty())
105 assert res.jailed
106 assert res.has_not_accepted_tos
108 # now accept
109 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))
111 res = jail.JailInfo(empty_pb2.Empty())
112 assert not res.jailed
113 assert not res.has_not_accepted_tos
115 # make sure we can't unaccept
116 with pytest.raises(grpc.RpcError) as e:
117 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=False))
118 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
119 assert e.value.details() == errors.CANT_UNACCEPT_TOS
121 # make them have accepted TOS
122 user2, token2 = generate_user()
124 with real_jail_session(token2) as jail:
125 res = jail.JailInfo(empty_pb2.Empty())
126 assert not res.jailed
127 assert not res.has_not_accepted_tos
129 # make sure we can't unaccept
130 with pytest.raises(grpc.RpcError) as e:
131 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=False))
132 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
133 assert e.value.details() == errors.CANT_UNACCEPT_TOS
135 # accepting again doesn't do anything
136 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))
138 res = jail.JailInfo(empty_pb2.Empty())
139 assert not res.jailed
140 assert not res.has_not_accepted_tos
143def test_TOS_increase(db, monkeypatch):
144 # test if the TOS version is updated
146 # not jailed yet
147 user, token = generate_user()
149 with real_jail_session(token) as jail:
150 res = jail.JailInfo(empty_pb2.Empty())
151 assert not res.jailed
152 assert not res.has_not_accepted_tos
154 with real_api_session(token) as api:
155 res = api.Ping(api_pb2.PingReq())
157 # now we pretend to update the TOS version
158 new_TOS_VERSION = TOS_VERSION + 1
160 monkeypatch.setattr(models, "TOS_VERSION", new_TOS_VERSION)
161 monkeypatch.setattr(servicers_jail, "TOS_VERSION", new_TOS_VERSION)
163 # make sure we're jailed
164 with real_api_session(token) as api, pytest.raises(grpc.RpcError) as e:
165 res = api.Ping(api_pb2.PingReq())
166 assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED
168 with real_jail_session(token) as jail:
169 res = jail.JailInfo(empty_pb2.Empty())
170 assert res.jailed
171 assert res.has_not_accepted_tos
173 # now accept
174 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))
176 res = jail.JailInfo(empty_pb2.Empty())
177 assert not res.jailed
178 assert not res.has_not_accepted_tos
181def test_SetLocation(db):
182 # make them have not added a location
183 user1, token1 = generate_user(geom=None, geom_radius=None)
185 with real_jail_session(token1) as jail:
186 res = jail.JailInfo(empty_pb2.Empty())
187 assert res.jailed
188 assert res.has_not_added_location
190 res = jail.SetLocation(
191 jail_pb2.SetLocationReq(
192 city="New York City",
193 lat=40.7812,
194 lng=-73.9647,
195 radius=250,
196 )
197 )
199 assert not res.jailed
200 assert not res.has_not_added_location
202 res = jail.JailInfo(empty_pb2.Empty())
203 assert not res.jailed
204 assert not res.has_not_added_location
207def test_AcceptCommunityGuidelines(db):
208 # make them have not accepted GC
209 user1, token1 = generate_user(accepted_community_guidelines=0)
211 with real_jail_session(token1) as jail:
212 res = jail.JailInfo(empty_pb2.Empty())
213 assert res.jailed
214 assert res.has_not_accepted_community_guidelines
216 # make sure we can't unaccept
217 with pytest.raises(grpc.RpcError) as e:
218 res = jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=False))
219 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
220 assert e.value.details() == errors.CANT_UNACCEPT_COMMUNITY_GUIDELINES
222 res = jail.JailInfo(empty_pb2.Empty())
223 assert res.jailed
224 assert res.has_not_accepted_community_guidelines
226 # now accept
227 res = jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=True))
229 res = jail.JailInfo(empty_pb2.Empty())
230 assert not res.jailed
231 assert not res.has_not_accepted_community_guidelines
233 # make sure we can't unaccept
234 with pytest.raises(grpc.RpcError) as e:
235 res = jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=False))
236 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
237 assert e.value.details() == errors.CANT_UNACCEPT_COMMUNITY_GUIDELINES
239 # make them have accepted GC
240 user2, token2 = generate_user()
242 with real_jail_session(token2) as jail:
243 res = jail.JailInfo(empty_pb2.Empty())
244 assert not res.jailed
245 assert not res.has_not_accepted_community_guidelines
247 # make sure we can't unaccept
248 with pytest.raises(grpc.RpcError) as e:
249 res = jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=False))
250 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
251 assert e.value.details() == errors.CANT_UNACCEPT_COMMUNITY_GUIDELINES
253 # accepting again doesn't do anything
254 res = jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=True))
256 res = jail.JailInfo(empty_pb2.Empty())
257 assert not res.jailed
258 assert not res.has_not_accepted_community_guidelines
261def test_modnotes(db, push_collector):
262 user, token = generate_user()
263 super_user, super_token = generate_user(is_superuser=True)
265 with real_jail_session(token) as jail:
266 res = jail.JailInfo(empty_pb2.Empty())
267 assert not res.jailed
268 assert not res.has_pending_mod_notes
269 assert len(res.pending_mod_notes) == 0
271 with real_account_session(token) as account:
272 res = account.ListModNotes(empty_pb2.Empty())
273 assert len(res.mod_notes) == 0
275 with real_admin_session(super_token) as admin:
276 with mock_notification_email() as mock:
277 admin.SendModNote(
278 admin_pb2.SendModNoteReq(
279 user=user.username,
280 content="# Important note\nThis is a sample mod note.",
281 internal_id="sample_note",
282 )
283 )
284 mock.assert_called_once()
285 e = email_fields(mock)
287 assert e.subject == "[TEST] You have received a mod note"
288 push_collector.assert_user_has_single_matching(
289 user.id,
290 title="You received a mod note",
291 body="You need to read and acknowledge the note before continuing to use the platform.",
292 )
294 with real_jail_session(token) as jail:
295 res = jail.JailInfo(empty_pb2.Empty())
296 assert res.jailed
297 assert res.has_pending_mod_notes
298 assert len(res.pending_mod_notes) == 1
299 note = res.pending_mod_notes[0]
300 assert note.note_content == "# Important note\nThis is a sample mod note."
302 note_id = note.note_id
304 with pytest.raises(grpc.RpcError) as e:
305 jail.AcknowledgePendingModNote(
306 jail_pb2.AcknowledgePendingModNoteReq(
307 note_id=note_id,
308 acknowledge=False,
309 )
310 )
311 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
312 assert e.value.details() == errors.MOD_NOTE_NEED_TO_ACKNOWELDGE
314 assert res.jailed
315 assert res.has_pending_mod_notes
316 assert len(res.pending_mod_notes) == 1
317 note = res.pending_mod_notes[0]
318 assert note.note_content == "# Important note\nThis is a sample mod note."
320 res = jail.AcknowledgePendingModNote(
321 jail_pb2.AcknowledgePendingModNoteReq(
322 note_id=note_id,
323 acknowledge=True,
324 )
325 )
326 assert not res.jailed
327 assert not res.has_pending_mod_notes
328 assert len(res.pending_mod_notes) == 0
330 with pytest.raises(grpc.RpcError) as e:
331 jail.AcknowledgePendingModNote(
332 jail_pb2.AcknowledgePendingModNoteReq(
333 note_id=note_id,
334 acknowledge=False,
335 )
336 )
337 assert e.value.code() == grpc.StatusCode.NOT_FOUND
338 assert e.value.details() == errors.MOD_NOTE_NOT_FOUND
340 with real_account_session(token) as account:
341 res = account.ListModNotes(empty_pb2.Empty())
342 assert len(res.mod_notes) == 1
343 note = res.mod_notes[0]
344 assert note.note_id == note_id
345 assert note.note_content == "# Important note\nThis is a sample mod note."
347 assert to_aware_datetime(note.acknowledged) > to_aware_datetime(note.created)
350def test_modnotes_no_notify(db, push_collector):
351 user, token = generate_user()
352 super_user, super_token = generate_user(is_superuser=True)
354 with real_jail_session(token) as jail:
355 res = jail.JailInfo(empty_pb2.Empty())
356 assert not res.jailed
357 assert not res.has_pending_mod_notes
358 assert len(res.pending_mod_notes) == 0
360 with real_account_session(token) as account:
361 res = account.ListModNotes(empty_pb2.Empty())
362 assert len(res.mod_notes) == 0
364 with real_admin_session(super_token) as admin:
365 with mock_notification_email() as mock:
366 admin.SendModNote(
367 admin_pb2.SendModNoteReq(
368 user=user.username,
369 content="# Important note\nThis is a sample mod note.",
370 internal_id="sample_note",
371 do_not_notify=True,
372 )
373 )
374 mock.assert_not_called()
376 with real_jail_session(token) as jail:
377 res = jail.JailInfo(empty_pb2.Empty())
378 assert res.jailed
379 assert res.has_pending_mod_notes
380 assert len(res.pending_mod_notes) == 1
381 note = res.pending_mod_notes[0]
382 assert note.note_content == "# Important note\nThis is a sample mod note."