Coverage for src/tests/test_admin.py: 100%
337 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
1from datetime import date, datetime
2from re import match
4import grpc
5import pytest
6from sqlalchemy.sql import func
8from couchers import errors
9from couchers.db import session_scope
10from couchers.models import Cluster, ContentReport, EventOccurrence, Node, UserSession
11from couchers.sql import couchers_select as select
12from couchers.utils import Timestamp_from_datetime, now, parse_date, timedelta
13from proto import admin_pb2, events_pb2, reporting_pb2
14from tests.test_communities import create_community
15from tests.test_fixtures import ( # noqa
16 db,
17 email_fields,
18 events_session,
19 generate_user,
20 get_user_id_and_token,
21 mock_notification_email,
22 push_collector,
23 real_admin_session,
24 reporting_session,
25 testconfig,
26)
@pytest.fixture(autouse=True)
def _(testconfig):
    """Pull in the ``testconfig`` fixture for every test in this module (autouse)."""
def test_access_by_normal_user(db):
    """A user created without ``is_superuser`` is refused by the admin API."""
    regular_user, regular_token = generate_user()
    request = admin_pb2.GetUserDetailsReq(user=str(regular_user.id))

    with real_admin_session(regular_token) as api:
        # all requests to the admin servicer should break when done by a non-super_user
        with pytest.raises(grpc.RpcError) as exc_info:
            api.GetUserDetails(request)
        assert exc_info.value.code() == grpc.StatusCode.PERMISSION_DENIED
def test_GetUserDetails(db):
    """GetUserDetails returns the same details whether looked up by id, username or email."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    # Same lookup three times, each in its own admin session, varying only the identifier.
    for identifier in (str(target.id), target.username, target.email):
        with real_admin_session(admin_token) as api:
            details = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=identifier))
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert not details.deleted
def test_ChangeUserGender(db, push_collector):
    """ChangeUserGender returns the updated details and triggers one email and one push."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.ChangeUserGenderReq(user=target.username, gender="Machine")

    with real_admin_session(admin_token) as api, mock_notification_email() as mock:
        details = api.ChangeUserGender(request)

    # Response carries the new gender; everything else is unchanged.
    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == "Machine"
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted

    # Exactly one notification email, addressed to the affected user.
    mock.assert_called_once()
    email = email_fields(mock)
    assert email.subject == "[TEST] Your gender was changed"
    assert email.recipient == target.email
    assert "Machine" in email.plain
    assert "Machine" in email.html

    push_collector.assert_user_has_single_matching(
        target.id,
        title="Your gender was changed",
        body="Your gender on Couchers.org was changed to Machine by an admin.",
    )
def test_ChangeUserBirthdate(db, push_collector):
    """ChangeUserBirthdate returns the new birthdate and triggers one email and one push."""
    original_birthdate = date(year=2000, month=1, day=1)
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user(birthdate=original_birthdate)

    with real_admin_session(admin_token) as api:
        # Sanity check: the admin API reports the birthdate the user was created with.
        before = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=target.username))
        assert parse_date(before.birthdate) == date(year=2000, month=1, day=1)

        with mock_notification_email() as mock:
            details = api.ChangeUserBirthdate(
                admin_pb2.ChangeUserBirthdateReq(user=target.username, birthdate="1990-05-25")
            )

        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.birthdate == "1990-05-25"
        assert details.gender == target.gender
        assert not details.banned
        assert not details.deleted

    # Exactly one notification email, mentioning the new year of birth.
    mock.assert_called_once()
    email = email_fields(mock)
    assert email.subject == "[TEST] Your date of birth was changed"
    assert email.recipient == target.email
    assert "1990" in email.plain
    assert "1990" in email.html

    push_collector.assert_user_has_single_matching(
        target.id,
        title="Your date of birth was changed",
        body="Your date of birth on Couchers.org was changed to Friday 25 May 1990 by an admin.",
    )
def test_BanUser(db):
    """BanUser flags the user as banned and returns an admin note attributed to the admin."""
    admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    admin_note = "A good reason"

    # Expected note shape: newline, "[<UTC timestamp>] (id: ..., username: ...) <note>", newline.
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {admin.id}, username: {admin.username}\)"
    expected_note = rf"^{prefix_regex} {admin_note}\n$"

    with real_admin_session(admin_token) as api:
        details = api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note=admin_note))

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == target.gender
    assert parse_date(details.birthdate) == target.birthdate
    assert details.banned
    assert not details.deleted
    assert match(expected_note, details.admin_note)
def test_UnbanUser(db):
    """UnbanUser returns an unbanned user and an admin note attributed to the admin."""
    admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    admin_note = "A good reason"

    # Expected note shape: newline, "[<UTC timestamp>] (id: ..., username: ...) <note>", newline.
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {admin.id}, username: {admin.username}\)"
    expected_note = rf"^{prefix_regex} {admin_note}\n$"

    # NOTE(review): the user is never banned in this test, so ``not banned`` below
    # does not by itself show that UnbanUser lifts an existing ban.
    with real_admin_session(admin_token) as api:
        details = api.UnbanUser(admin_pb2.UnbanUserReq(user=target.username, admin_note=admin_note))

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == target.gender
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted
    assert match(expected_note, details.admin_note)
def test_AddAdminNote(db):
    """AddAdminNote returns the note, and a second call returns both notes in order."""
    admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    admin_note1 = "User reported strange behavior"
    admin_note2 = "Insert private information here"

    # Each entry: newline, "[<UTC timestamp>] (id: ..., username: ...) <note>", newline.
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {admin.id}, username: {admin.username}\)"

    with real_admin_session(admin_token) as api:
        first = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note=admin_note1))
    assert first.user_id == target.id
    assert first.username == target.username
    assert first.email == target.email
    assert first.gender == target.gender
    assert parse_date(first.birthdate) == target.birthdate
    assert not first.banned
    assert not first.deleted
    assert match(rf"^{prefix_regex} {admin_note1}\n$", first.admin_note)

    # Second note, added from a fresh admin session, must follow the first one.
    with real_admin_session(admin_token) as api:
        second = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note=admin_note2))
    assert match(rf"^{prefix_regex} {admin_note1}\n{prefix_regex} {admin_note2}\n$", second.admin_note)
def test_AddAdminNote_blank(db):
    """A whitespace-only admin note is rejected with INVALID_ARGUMENT."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.AddAdminNoteReq(user=target.username, admin_note=" \t \n ")

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddAdminNote(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.ADMIN_NOTE_CANT_BE_EMPTY
def test_admin_content_reports(db):
    """Admins can fetch a single content report by id and list the reports for an author."""
    _, admin_token = generate_user(is_superuser=True)
    reporter, reporter_token = generate_user()
    bad_user1, _ = generate_user()
    bad_user2, _ = generate_user()

    # (reason, description, content_ref, reported author), filed in this order.
    reports_to_file = [
        ("spam", "r1", "comment/123", bad_user1),
        ("spam", "r2", "comment/124", bad_user2),
        ("something else", "r3", "page/321", bad_user1),
    ]

    with reporting_session(reporter_token) as api:
        for reason, description, content_ref, author in reports_to_file:
            api.Report(
                reporting_pb2.ReportReq(
                    reason=reason,
                    description=description,
                    content_ref=content_ref,
                    author_user=author.username,
                    user_agent="n/a",
                    page=f"https://couchers.org/{content_ref}",
                )
            )

    # Look up the database ids so reports can be addressed by their description.
    with session_scope() as session:
        rows = session.execute(select(ContentReport.description, ContentReport.id)).all()
        id_by_description = {description: report_id for description, report_id in rows}

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == errors.CONTENT_REPORT_NOT_FOUND

        single = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=id_by_description["r2"]))
        report = single.content_report
        assert report.content_report_id == id_by_description["r2"]
        assert report.reporting_user_id == reporter.id
        assert report.author_user_id == bad_user2.id
        assert report.reason == "spam"
        assert report.description == "r2"
        assert report.content_ref == "comment/124"
        assert report.user_agent == "n/a"
        assert report.page == "https://couchers.org/comment/124"

        # For bad_user1 the later report (r3) is expected before the earlier one (r1).
        listing = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username))
        assert listing.content_reports[0].content_report_id == id_by_description["r3"]
        assert listing.content_reports[1].content_report_id == id_by_description["r1"]
def test_DeleteUser(db):
    """DeleteUser returns the user's details with ``deleted`` set and ``banned`` unset."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.DeleteUserReq(user=target.username)

    with real_admin_session(admin_token) as api:
        details = api.DeleteUser(request)

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == target.gender
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert details.deleted
def test_CreateApiKey(db, push_collector):
    """CreateApiKey stores a valid API-key session and sends it to the user by email and push."""
    with session_scope() as session:
        _, admin_token = generate_user(is_superuser=True)
        target, _ = generate_user()

        # Precondition: the user has no API-key sessions yet.
        existing_keys = session.execute(
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        ).scalar_one()
        assert existing_keys == 0

    with mock_notification_email() as mock, real_admin_session(admin_token) as api:
        api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=target.username))

    mock.assert_called_once()
    email = email_fields(mock)
    assert email.subject == "[TEST] Your API key for Couchers.org"

    with session_scope() as session:
        # scalar_one() also enforces that exactly one valid API key exists for the user.
        api_key = session.execute(
            select(UserSession)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        ).scalar_one()

        assert api_key.token in email.plain
        assert api_key.token in email.html

    assert email.recipient == target.email
    assert "api key" in email.subject.lower()
    unique_string = "We've issued you with the following API key:"
    assert unique_string in email.plain
    assert unique_string in email.html
    assert "support@couchers.org" in email.plain
    assert "support@couchers.org" in email.html

    push_collector.assert_user_has_single_matching(
        target.id, title="An API key was created for your account", body="Details were sent to you via email."
    )
# GeoJSON text of type "MultiPolygon" holding a single ring of five coordinate
# pairs (the last pair repeats the first). The community tests in this module
# pass it as the ``geojson`` request field whenever a valid geometry is needed.
VALID_GEOJSON_MULTIPOLYGON = """
    {
      "type": "MultiPolygon",
      "coordinates":
       [
        [
          [
            [
              -73.98114904754641,
              40.7470284264813
            ],
            [
              -73.98314135177611,
              40.73416844413217
            ],
            [
              -74.00538969848634,
              40.734314779027144
            ],
            [
              -74.00479214294432,
              40.75027851544338
            ],
            [
              -73.98114904754641,
              40.7470284264813
            ]
          ]
        ]
      ]
    }
"""

# GeoJSON text of type "Point". The *_invalid_geojson tests pass it as ``geojson``
# and expect the request to fail with errors.NO_MULTIPOLYGON.
POINT_GEOJSON = """
{ "type": "Point", "coordinates": [100.0, 0.0] }
"""
def test_CreateCommunity_invalid_geojson(db):
    """CreateCommunity rejects a Point geometry with INVALID_ARGUMENT / NO_MULTIPOLYGON."""
    _, admin_token = generate_user(is_superuser=True)
    generate_user()
    request = admin_pb2.CreateCommunityReq(
        name="test community",
        description="community for testing",
        admin_ids=[],
        geojson=POINT_GEOJSON,
    )

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.CreateCommunity(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.NO_MULTIPOLYGON
def test_CreateCommunity(db):
    """CreateCommunity stores a cluster with the given description and a slug from its name."""
    with session_scope() as session:
        _, admin_token = generate_user(is_superuser=True)
        generate_user()

        with real_admin_session(admin_token) as api:
            api.CreateCommunity(
                admin_pb2.CreateCommunityReq(
                    name="test community",
                    description="community for testing",
                    admin_ids=[],
                    geojson=VALID_GEOJSON_MULTIPOLYGON,
                )
            )

            # scalar_one() also enforces that exactly one cluster carries this name.
            by_name = select(Cluster).where(Cluster.name == "test community")
            community = session.execute(by_name).scalar_one()
            assert community.description == "community for testing"
            assert community.slug == "test-community"
def test_UpdateCommunity_invalid_geojson(db):
    """UpdateCommunity rejects a Point geometry with INVALID_ARGUMENT / NO_MULTIPOLYGON."""
    _, admin_token = generate_user(is_superuser=True)

    with session_scope() as session, real_admin_session(admin_token) as api:
        # Create a valid community first so there is something to update.
        api.CreateCommunity(
            admin_pb2.CreateCommunityReq(
                name="test community",
                description="community for testing",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
        )
        by_name = select(Cluster).where(Cluster.name == "test community")
        community = session.execute(by_name).scalar_one()

        with pytest.raises(grpc.RpcError) as exc_info:
            api.UpdateCommunity(
                admin_pb2.UpdateCommunityReq(
                    community_id=community.parent_node_id,
                    name="test community 2",
                    description="community for testing 2",
                    geojson=POINT_GEOJSON,
                )
            )
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.NO_MULTIPOLYGON
def test_UpdateCommunity_invalid_id(db):
    """UpdateCommunity with community_id=1000 fails with NOT_FOUND / COMMUNITY_NOT_FOUND."""
    _, admin_token = generate_user(is_superuser=True)

    # The database session is opened for the duration of the calls but not queried directly.
    with session_scope(), real_admin_session(admin_token) as api:
        api.CreateCommunity(
            admin_pb2.CreateCommunityReq(
                name="test community",
                description="community for testing",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
        )

        with pytest.raises(grpc.RpcError) as exc_info:
            api.UpdateCommunity(
                admin_pb2.UpdateCommunityReq(
                    community_id=1000,
                    name="test community 1000",
                    description="community for testing 1000",
                    geojson=VALID_GEOJSON_MULTIPOLYGON,
                )
            )
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == errors.COMMUNITY_NOT_FOUND
def test_UpdateCommunity(db):
    """UpdateCommunity changes name, description and slug, and re-parents the community's node."""
    _, admin_token = generate_user(is_superuser=True)

    with session_scope() as session, real_admin_session(admin_token) as api:
        # First community: the one that gets updated.
        api.CreateCommunity(
            admin_pb2.CreateCommunityReq(
                name="test community",
                description="community for testing",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
        )
        community = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one()
        assert community.description == "community for testing"

        # Second community: its node becomes the new parent.
        api.CreateCommunity(
            admin_pb2.CreateCommunityReq(
                name="test community 2",
                description="community for testing 2",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
        )
        community_2 = session.execute(select(Cluster).where(Cluster.name == "test community 2")).scalar_one()

        api.UpdateCommunity(
            admin_pb2.UpdateCommunityReq(
                community_id=community.parent_node_id,
                name="test community 2",
                description="community for testing 2",
                geojson=VALID_GEOJSON_MULTIPOLYGON,
                parent_node_id=community_2.parent_node_id,
            )
        )
        # Commit before re-reading; the rows are queried again by id below.
        session.commit()

        updated_cluster = session.execute(select(Cluster).where(Cluster.id == community.id)).scalar_one()
        assert updated_cluster.description == "community for testing 2"
        assert updated_cluster.slug == "test-community-2"

        updated_node = session.execute(select(Node).where(Node.id == updated_cluster.parent_node_id)).scalar_one()
        assert updated_node.parent_node_id == community_2.parent_node_id
def test_GetChats(db):
    """GetChats returns a non-empty ``response`` field for an existing user."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.GetChatsReq(user=target.username)

    with real_admin_session(admin_token) as api:
        chats = api.GetChats(request)
    assert chats.response
def test_badges(db, push_collector):
    """Admins can add and remove the "volunteer" badge; invalid badge operations fail."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    username = target.username

    with real_admin_session(admin_token) as api:
        # can add a badge
        badges_before = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=username)).badges
        assert "volunteer" not in badges_before
        with mock_notification_email() as mock:
            added = api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="volunteer"))
            assert "volunteer" in added.badges

        # badge emails are disabled by default
        mock.assert_not_called()

        push_collector.assert_user_has_single_matching(
            target.id,
            title="The Active Volunteer badge was added to your profile",
            body="Check out your profile to see the new badge!",
        )

        # can't add/edit special tags
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="founder"))
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.ADMIN_CANNOT_EDIT_BADGE

        # double add badge
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="volunteer"))
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.USER_ALREADY_HAS_BADGE

        # can remove badge
        badges_after_add = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=username)).badges
        assert "volunteer" in badges_after_add
        with mock_notification_email() as mock:
            removed = api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="volunteer"))
            assert "volunteer" not in removed.badges

        # badge emails are disabled by default
        mock.assert_not_called()

        # ix=1: the removal push is checked at index 1, after the "added" push above.
        push_collector.assert_user_push_matches_fields(
            target.id,
            ix=1,
            title="The Active Volunteer badge was removed from your profile",
            body="You can see all your badges on your profile.",
        )

        # not found on user
        with pytest.raises(grpc.RpcError) as exc_info:
            api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="volunteer"))
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.USER_DOES_NOT_HAVE_BADGE

        # not found in general
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="nonexistentbadge"))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == errors.BADGE_NOT_FOUND
def test_DeleteEvent(db):
    """An event created by a normal user is marked deleted after the admin DeleteEvent call."""
    _, admin_token = generate_user(is_superuser=True)
    organizer, organizer_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [organizer], [], None)

    # The event starts two hours from now and lasts three hours.
    start_time = now() + timedelta(hours=2)
    end_time = start_time + timedelta(hours=3)
    location = events_pb2.OfflineEventInformation(
        address="Near Null Island",
        lat=0.1,
        lng=0.2,
    )

    with events_session(organizer_token) as api:
        created = api.CreateEvent(
            events_pb2.CreateEventReq(
                title="Dummy Title",
                content="Dummy content.",
                photo_key=None,
                offline_information=location,
                start_time=Timestamp_from_datetime(start_time),
                end_time=Timestamp_from_datetime(end_time),
                timezone="UTC",
            )
        )
        event_id = created.event_id
        assert not created.is_deleted

    with session_scope() as session, real_admin_session(admin_token) as api:
        api.DeleteEvent(admin_pb2.DeleteEventReq(event_id=event_id))

        # The event id returned by CreateEvent is used as the EventOccurrence primary key here.
        occurrence = session.get(EventOccurrence, ident=event_id)
        assert occurrence.is_deleted
def test_ListUserIds(db):
    """ListUserIds returns both users for a wide time window and none for a window starting now."""
    admin, admin_token = generate_user(is_superuser=True)
    regular, _ = generate_user()

    # Window from 2000-01-01 until now: both users created above are listed.
    with real_admin_session(admin_token) as api:
        wide_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(datetime(2000, 1, 1)), end_time=Timestamp_from_datetime(now())
        )
        listed = api.ListUserIds(wide_window)
        assert len(listed.user_ids) == 2
        assert sorted(listed.user_ids) == sorted([admin.id, regular.id])

    # Window that starts and ends "now": no users are listed.
    with real_admin_session(admin_token) as api:
        empty_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(now()), end_time=Timestamp_from_datetime(now())
        )
        listed = api.ListUserIds(empty_window)
        assert listed.user_ids == []
648# community invite feature tested in test_events.py