Coverage for src/tests/test_admin.py: 100%
302 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
1from datetime import date, datetime
2from re import match
4import grpc
5import pytest
6from sqlalchemy.sql import func
8from couchers import errors
9from couchers.db import session_scope
10from couchers.models import Cluster, ContentReport, EventOccurrence, UserSession
11from couchers.sql import couchers_select as select
12from couchers.utils import Timestamp_from_datetime, now, parse_date, timedelta
13from proto import admin_pb2, events_pb2, reporting_pb2
14from tests.test_communities import create_community
15from tests.test_fixtures import ( # noqa
16 db,
17 email_fields,
18 events_session,
19 generate_user,
20 get_user_id_and_token,
21 mock_notification_email,
22 push_collector,
23 real_admin_session,
24 reporting_session,
25 testconfig,
26)
@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture that does nothing itself; it only requests ``testconfig`` for every test in this module."""
def test_access_by_normal_user(db):
    """A user without the superuser flag gets PERMISSION_DENIED from the admin servicer."""
    normal_user, normal_token = generate_user()
    request = admin_pb2.GetUserDetailsReq(user=str(normal_user.id))

    with real_admin_session(normal_token) as admin_api:
        # all requests to the admin servicer should break when done by a non-super_user
        with pytest.raises(grpc.RpcError) as excinfo:
            admin_api.GetUserDetails(request)
        assert excinfo.value.code() == grpc.StatusCode.PERMISSION_DENIED
def test_GetUserDetails(db):
    """An admin can look a user up by numeric id, by username and by email; each form is checked the same way."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    # one lookup per accepted identifier form, each in its own admin session
    for identifier in (str(normal_user.id), normal_user.username, normal_user.email):
        with real_admin_session(super_token) as admin_api:
            details = admin_api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=identifier))
            assert details.user_id == normal_user.id
            assert details.username == normal_user.username
            assert details.email == normal_user.email
            assert details.gender == normal_user.gender
            assert parse_date(details.birthdate) == normal_user.birthdate
            assert not details.banned
            assert not details.deleted
def test_ChangeUserGender(db, push_collector):
    """An admin-initiated gender change shows up in the returned details and is announced by email and push."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()
    change_request = admin_pb2.ChangeUserGenderReq(user=normal_user.username, gender="Machine")

    with real_admin_session(super_token) as admin_api:
        with mock_notification_email() as email_mock:
            details = admin_api.ChangeUserGender(change_request)

    # the gender is the new value, the remaining fields still match the generated user
    assert details.user_id == normal_user.id
    assert details.username == normal_user.username
    assert details.email == normal_user.email
    assert details.gender == "Machine"
    assert parse_date(details.birthdate) == normal_user.birthdate
    assert not details.banned
    assert not details.deleted

    # exactly one notification email, addressed to the affected user and naming the new gender
    email_mock.assert_called_once()
    email = email_fields(email_mock)
    assert email.subject == "[TEST] Your gender was changed"
    assert email.recipient == normal_user.email
    assert "Machine" in email.plain
    assert "Machine" in email.html

    push_collector.assert_user_has_single_matching(
        normal_user.id,
        title="Your gender was changed",
        body="Your gender on Couchers.org was changed to Machine by an admin.",
    )
def test_ChangeUserBirthdate(db, push_collector):
    """An admin-initiated birthdate change is returned in the details and announced by email and push."""
    original_birthdate = date(year=2000, month=1, day=1)
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user(birthdate=original_birthdate)

    with real_admin_session(super_token) as admin_api:
        # sanity check: the user starts out with the birthdate they were generated with
        before = admin_api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username))
        assert parse_date(before.birthdate) == original_birthdate

        with mock_notification_email() as email_mock:
            change_request = admin_pb2.ChangeUserBirthdateReq(user=normal_user.username, birthdate="1990-05-25")
            after = admin_api.ChangeUserBirthdate(change_request)

        assert after.user_id == normal_user.id
        assert after.username == normal_user.username
        assert after.email == normal_user.email
        assert after.birthdate == "1990-05-25"
        assert after.gender == normal_user.gender
        assert not after.banned
        assert not after.deleted

    # exactly one notification email, addressed to the affected user and mentioning the new year
    email_mock.assert_called_once()
    email = email_fields(email_mock)
    assert email.subject == "[TEST] Your date of birth was changed"
    assert email.recipient == normal_user.email
    assert "1990" in email.plain
    assert "1990" in email.html

    push_collector.assert_user_has_single_matching(
        normal_user.id,
        title="Your date of birth was changed",
        body="Your date of birth on Couchers.org was changed to Friday 25 May 1990 by an admin.",
    )
def test_BanUser(db):
    """Banning a user sets the ban flag and records the admin's note with a timestamp/author prefix."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    admin_note = "A good reason"

    # every note is expected as "\n[<UTC ISO timestamp>] (id: <admin id>, username: <admin username>) <note>\n"
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"
    expected_note = rf"^{prefix_regex} {admin_note}\n$"

    ban_request = admin_pb2.BanUserReq(user=normal_user.username, admin_note=admin_note)
    with real_admin_session(super_token) as admin_api:
        details = admin_api.BanUser(ban_request)

        assert details.user_id == normal_user.id
        assert details.username == normal_user.username
        assert details.email == normal_user.email
        assert details.gender == normal_user.gender
        assert parse_date(details.birthdate) == normal_user.birthdate
        assert details.banned
        assert not details.deleted
        assert match(expected_note, details.admin_note)
def test_UnbanUser(db):
    """Unbanning a previously banned user clears the ban flag and records the unban note.

    The user is banned first: calling ``UnbanUser`` on a user who was never banned would let the
    ``not banned`` assertion pass even if ``UnbanUser`` did nothing at all.
    """
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    ban_note = "A bad reason"
    admin_note = "A good reason"
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"

    with real_admin_session(super_token) as api:
        # set up the state that UnbanUser is supposed to undo
        res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note=ban_note))
        assert res.banned

        res = api.UnbanUser(admin_pb2.UnbanUserReq(user=normal_user.username, admin_note=admin_note))
        assert res.user_id == normal_user.id
        assert res.username == normal_user.username
        assert res.email == normal_user.email
        assert res.gender == normal_user.gender
        assert parse_date(res.birthdate) == normal_user.birthdate
        assert not res.banned
        assert not res.deleted
        # NOTE(review): assumes ban/unban notes accumulate one after the other, the way
        # test_AddAdminNote shows for AddAdminNote -- confirm against the admin servicer
        assert match(rf"^{prefix_regex} {ban_note}\n{prefix_regex} {admin_note}\n$", res.admin_note)
def test_AddAdminNote(db):
    """Admin notes are stored with a timestamp/author prefix and a second note is appended after the first."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    admin_note1 = "User reported strange behavior"
    admin_note2 = "Insert private information here"

    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"
    # regex fragments for one stored note each, so the two-note pattern is just their concatenation
    first_entry = rf"{prefix_regex} {admin_note1}\n"
    second_entry = rf"{prefix_regex} {admin_note2}\n"

    with real_admin_session(super_token) as admin_api:
        details = admin_api.AddAdminNote(
            admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note1)
        )
        assert details.user_id == normal_user.id
        assert details.username == normal_user.username
        assert details.email == normal_user.email
        assert details.gender == normal_user.gender
        assert parse_date(details.birthdate) == normal_user.birthdate
        assert not details.banned
        assert not details.deleted
        assert match(rf"^{first_entry}$", details.admin_note)

    # second note, added from a fresh admin session
    with real_admin_session(super_token) as admin_api:
        details = admin_api.AddAdminNote(
            admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note2)
        )
        assert match(rf"^{first_entry}{second_entry}$", details.admin_note)
def test_AddAdminNote_blank(db):
    """A note consisting only of whitespace is rejected with INVALID_ARGUMENT."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    blank_request = admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=" \t \n ")

    with real_admin_session(super_token) as admin_api:
        with pytest.raises(grpc.RpcError) as excinfo:
            admin_api.AddAdminNote(blank_request)
        assert excinfo.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert excinfo.value.details() == errors.ADMIN_NOTE_CANT_BE_EMPTY
def test_admin_content_reports(db):
    """Admins can fetch a single content report by id and list the reports filed against one author."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, token = generate_user()
    bad_user1, _ = generate_user()
    bad_user2, _ = generate_user()

    # filed in this order; r1 and r3 are against bad_user1, r2 is against bad_user2
    filed_reports = [
        reporting_pb2.ReportReq(
            reason="spam",
            description="r1",
            content_ref="comment/123",
            author_user=bad_user1.username,
            user_agent="n/a",
            page="https://couchers.org/comment/123",
        ),
        reporting_pb2.ReportReq(
            reason="spam",
            description="r2",
            content_ref="comment/124",
            author_user=bad_user2.username,
            user_agent="n/a",
            page="https://couchers.org/comment/124",
        ),
        reporting_pb2.ReportReq(
            reason="something else",
            description="r3",
            content_ref="page/321",
            author_user=bad_user1.username,
            user_agent="n/a",
            page="https://couchers.org/page/321",
        ),
    ]
    with reporting_session(token) as reporting_api:
        for report in filed_reports:
            reporting_api.Report(report)

    # the descriptions are unique, so they serve as keys for finding the generated ids
    with session_scope() as session:
        rows = session.execute(select(ContentReport.description, ContentReport.id)).all()
        id_by_description = {description: report_id for description, report_id in rows}

    with real_admin_session(super_token) as admin_api:
        # an id that does not exist
        with pytest.raises(grpc.RpcError) as excinfo:
            admin_api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert excinfo.value.code() == grpc.StatusCode.NOT_FOUND
        assert excinfo.value.details() == errors.CONTENT_REPORT_NOT_FOUND

        single = admin_api.GetContentReport(
            admin_pb2.GetContentReportReq(content_report_id=id_by_description["r2"])
        )
        report = single.content_report
        assert report.content_report_id == id_by_description["r2"]
        assert report.reporting_user_id == normal_user.id
        assert report.author_user_id == bad_user2.id
        assert report.reason == "spam"
        assert report.description == "r2"
        assert report.content_ref == "comment/124"
        assert report.user_agent == "n/a"
        assert report.page == "https://couchers.org/comment/124"

        # r3, which was filed after r1, is expected ahead of it
        by_author = admin_api.GetContentReportsForAuthor(
            admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username)
        )
        assert by_author.content_reports[0].content_report_id == id_by_description["r3"]
        assert by_author.content_reports[1].content_report_id == id_by_description["r1"]
def test_DeleteUser(db):
    """Deleting a user as admin returns their details with the deleted flag set and the ban flag clear."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()
    delete_request = admin_pb2.DeleteUserReq(user=normal_user.username)

    with real_admin_session(super_token) as admin_api:
        details = admin_api.DeleteUser(delete_request)

        assert details.user_id == normal_user.id
        assert details.username == normal_user.username
        assert details.email == normal_user.email
        assert details.gender == normal_user.gender
        assert parse_date(details.birthdate) == normal_user.birthdate
        assert not details.banned
        assert details.deleted
def test_CreateApiKey(db, push_collector):
    """Creating an API key stores one valid API-key session for the user and sends them the token by email."""
    with session_scope() as session:
        super_user, super_token = generate_user(is_superuser=True)
        normal_user, normal_token = generate_user()

        # precondition: the user has no API-key sessions yet
        existing_key_count = (
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == normal_user.id)
        )
        assert session.execute(existing_key_count).scalar_one() == 0

    with mock_notification_email() as email_mock:
        with real_admin_session(super_token) as admin_api:
            admin_api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=normal_user.username))

    email_mock.assert_called_once()
    email = email_fields(email_mock)
    assert email.subject == "[TEST] Your API key for Couchers.org"

    with session_scope() as session:
        # scalar_one() also enforces that exactly one valid API key exists for the user
        valid_key_query = (
            select(UserSession)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == normal_user.id)
        )
        api_key = session.execute(valid_key_query).scalar_one()

        assert api_key.token in email.plain
        assert api_key.token in email.html

    assert email.recipient == normal_user.email
    assert "api key" in email.subject.lower()
    unique_string = "We've issued you with the following API key:"
    assert unique_string in email.plain
    assert unique_string in email.html
    assert "support@couchers.org" in email.plain
    assert "support@couchers.org" in email.html

    push_collector.assert_user_has_single_matching(
        normal_user.id, title="An API key was created for your account", body="Details were sent to you via email."
    )
# GeoJSON MultiPolygon holding one polygon with a single closed ring (first and last vertex are
# the same point); used by test_CreateCommunity as the geometry of the created community.
VALID_GEOJSON_MULTIPOLYGON = """
    {
      "type": "MultiPolygon",
      "coordinates":
      [
        [
          [
            [
              -73.98114904754641,
              40.7470284264813
            ],
            [
              -73.98314135177611,
              40.73416844413217
            ],
            [
              -74.00538969848634,
              40.734314779027144
            ],
            [
              -74.00479214294432,
              40.75027851544338
            ],
            [
              -73.98114904754641,
              40.7470284264813
            ]
          ]
        ]
      ]
    }
"""

# GeoJSON geometry of type Point; used by test_CreateCommunity_invalid_geojson as the input
# that is expected to be rejected.
POINT_GEOJSON = """
{ "type": "Point", "coordinates": [100.0, 0.0] }
"""
def test_CreateCommunity_invalid_geojson(db):
    """Creating a community from a Point geometry is rejected with INVALID_ARGUMENT / NO_MULTIPOLYGON."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()
    point_request = admin_pb2.CreateCommunityReq(
        name="test community",
        description="community for testing",
        admin_ids=[],
        geojson=POINT_GEOJSON,
    )

    with real_admin_session(super_token) as admin_api:
        with pytest.raises(grpc.RpcError) as excinfo:
            admin_api.CreateCommunity(point_request)
        assert excinfo.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert excinfo.value.details() == errors.NO_MULTIPOLYGON
def test_CreateCommunity(db):
    """Creating a community from a valid MultiPolygon stores a cluster with the given description and a slug."""
    community_name = "test community"
    community_description = "community for testing"

    with session_scope() as session:
        super_user, super_token = generate_user(is_superuser=True)
        normal_user, normal_token = generate_user()

        with real_admin_session(super_token) as admin_api:
            create_request = admin_pb2.CreateCommunityReq(
                name=community_name,
                description=community_description,
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
            admin_api.CreateCommunity(create_request)

            # read the cluster back through the session opened at the top of the test
            by_name = select(Cluster).where(Cluster.name == community_name)
            community = session.execute(by_name).scalar_one()
            assert community.description == community_description
            assert community.slug == "test-community"
def test_GetChats(db):
    """GetChats for an existing user returns a non-empty ``response`` field."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()
    chats_request = admin_pb2.GetChatsReq(user=normal_user.username)

    with real_admin_session(super_token) as admin_api:
        chats = admin_api.GetChats(chats_request)
        assert chats.response
def test_badges(db, push_collector):
    """Exercise badge add/remove as admin, including the special-badge, duplicate, missing and unknown cases."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()
    username = normal_user.username

    with real_admin_session(super_token) as admin_api:

        def current_badges():
            # badges as reported by a fresh GetUserDetails call
            return admin_api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=username)).badges

        # can add a badge
        assert "volunteer" not in current_badges()
        with mock_notification_email() as email_mock:
            added = admin_api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="volunteer"))
        assert "volunteer" in added.badges

        # badge emails are disabled by default
        email_mock.assert_not_called()

        push_collector.assert_user_has_single_matching(
            normal_user.id,
            title="The Active Volunteer badge was added to your profile",
            body="Check out your profile to see the new badge!",
        )

        # can't add/edit special tags
        with pytest.raises(grpc.RpcError) as excinfo:
            admin_api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="founder"))
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.ADMIN_CANNOT_EDIT_BADGE

        # double add badge
        with pytest.raises(grpc.RpcError) as excinfo:
            admin_api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="volunteer"))
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.USER_ALREADY_HAS_BADGE

        # can remove badge
        assert "volunteer" in current_badges()
        with mock_notification_email() as email_mock:
            removed = admin_api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="volunteer"))
        assert "volunteer" not in removed.badges

        # badge emails are disabled by default
        email_mock.assert_not_called()

        # the removal push is checked at index 1 (the "added" push was matched earlier)
        push_collector.assert_user_push_matches_fields(
            normal_user.id,
            ix=1,
            title="The Active Volunteer badge was removed from your profile",
            body="You can see all your badges on your profile.",
        )

        # not found on user
        with pytest.raises(grpc.RpcError) as excinfo:
            admin_api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="volunteer"))
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.USER_DOES_NOT_HAVE_BADGE

        # not found in general
        with pytest.raises(grpc.RpcError) as excinfo:
            admin_api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="nonexistentbadge"))
        assert excinfo.value.code() == grpc.StatusCode.NOT_FOUND
        assert excinfo.value.details() == errors.BADGE_NOT_FOUND
def test_DeleteEvent(db):
    """An event created by a normal user is marked deleted after an admin calls DeleteEvent on it."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [normal_user], [], None)

    # the event starts two hours from now and lasts three hours
    start_time = now() + timedelta(hours=2)
    end_time = start_time + timedelta(hours=3)

    with events_session(normal_token) as events_api:
        location = events_pb2.OfflineEventInformation(
            address="Near Null Island",
            lat=0.1,
            lng=0.2,
        )
        created = events_api.CreateEvent(
            events_pb2.CreateEventReq(
                title="Dummy Title",
                content="Dummy content.",
                photo_key=None,
                offline_information=location,
                start_time=Timestamp_from_datetime(start_time),
                end_time=Timestamp_from_datetime(end_time),
                timezone="UTC",
            )
        )
        event_id = created.event_id
        assert not created.is_deleted

    with session_scope() as session:
        with real_admin_session(super_token) as admin_api:
            admin_api.DeleteEvent(admin_pb2.DeleteEventReq(event_id=event_id))

        # the returned event_id is used directly as the EventOccurrence primary key
        occurrence = session.get(EventOccurrence, ident=event_id)
        assert occurrence.is_deleted
def test_ListUserIds(db):
    """ListUserIds returns both generated users for a wide time window and nothing for a window starting now."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    # window from 2000-01-01 until now: both users are expected
    with real_admin_session(super_token) as admin_api:
        wide_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(datetime(2000, 1, 1)),
            end_time=Timestamp_from_datetime(now()),
        )
        listed = admin_api.ListUserIds(wide_window)
        assert len(listed.user_ids) == 2
        assert sorted(listed.user_ids) == sorted([super_user.id, normal_user.id])

    # window that only opens now: no user ids are expected
    with real_admin_session(super_token) as admin_api:
        empty_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(now()),
            end_time=Timestamp_from_datetime(now()),
        )
        listed = admin_api.ListUserIds(empty_window)
        assert listed.user_ids == []
548# community invite feature tested in test_events.py