Coverage for src/tests/test_admin.py: 100%
478 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-18 14:01 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-18 14:01 +0000
1from datetime import date, datetime
2from re import match
4import grpc
5import pytest
6from sqlalchemy.sql import func
8from couchers.db import session_scope
9from couchers.models import (
10 AccountDeletionToken,
11 Cluster,
12 ContentReport,
13 EventOccurrence,
14 ModerationUserList,
15 Node,
16 Reference,
17 UserSession,
18)
19from couchers.proto import admin_pb2, auth_pb2, events_pb2, references_pb2, reporting_pb2
20from couchers.sql import couchers_select as select
21from couchers.utils import Timestamp_from_datetime, now, parse_date, timedelta
22from tests.test_communities import create_community
23from tests.test_fixtures import ( # noqa
24 add_users_to_new_moderation_list,
25 auth_api_session,
26 db,
27 email_fields,
28 events_session,
29 generate_user,
30 get_user_id_and_token,
31 mock_notification_email,
32 push_collector,
33 real_admin_session,
34 references_session,
35 reporting_session,
36 testconfig,
37)
@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture whose only job is to request ``testconfig`` for every test in this module."""
def test_access_by_normal_user(db):
    """A token belonging to a non-superuser is rejected with PERMISSION_DENIED."""
    regular_user, regular_token = generate_user()
    request = admin_pb2.GetUserDetailsReq(user=str(regular_user.id))

    with real_admin_session(regular_token) as api:
        # all requests to the admin servicer should break when done by a non-super_user
        with pytest.raises(grpc.RpcError) as e:
            api.GetUserDetails(request)
        assert e.value.code() == grpc.StatusCode.PERMISSION_DENIED
def test_GetUser(db):
    """GetUser returns the same id and username for a user before and after that user is banned."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    with real_admin_session(admin_token) as api:
        before_ban = api.GetUser(admin_pb2.GetUserReq(user=str(target.id)))
        assert before_ban.user_id == target.id
        assert before_ban.username == target.username

    with real_admin_session(admin_token) as api:
        api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note="Testing banning"))

    with real_admin_session(admin_token) as api:
        after_ban = api.GetUser(admin_pb2.GetUserReq(user=str(target.id)))
        assert after_ban.user_id == target.id
        assert after_ban.username == target.username
def test_GetUserDetails(db):
    """GetUserDetails finds the same user when given a user id, a username, or an email address."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    # each identifier is looked up in its own admin session
    for identifier in (str(target.id), target.username, target.email):
        with real_admin_session(admin_token) as api:
            details = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=identifier))
            assert details.user_id == target.id
            assert details.username == target.username
            assert details.email == target.email
            assert details.gender == target.gender
            assert parse_date(details.birthdate) == target.birthdate
            assert not details.banned
            assert not details.deleted
def test_ChangeUserGender(db, push_collector):
    """An admin gender change is reflected in the returned details and announced by email and push."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    with real_admin_session(admin_token) as api:
        with mock_notification_email() as mock:
            details = api.ChangeUserGender(
                admin_pb2.ChangeUserGenderReq(user=target.username, gender="Machine")
            )
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == "Machine"
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert not details.deleted

    # exactly one notification email, addressed to the affected user
    mock.assert_called_once()
    e = email_fields(mock)
    assert e.subject == "[TEST] Your gender was changed"
    assert e.recipient == target.email
    for rendered in (e.plain, e.html):
        assert "Machine" in rendered

    push_collector.assert_user_has_single_matching(
        target.id,
        title="Your gender was changed",
        body="Your gender on Couchers.org was changed to Machine by an admin.",
    )
def test_ChangeUserBirthdate(db, push_collector):
    """An admin birthdate change is reflected in the returned details and announced by email and push."""
    original_birthdate = date(year=2000, month=1, day=1)
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user(birthdate=original_birthdate)

    with real_admin_session(admin_token) as api:
        # sanity check: the user starts with the birthdate given at creation
        before = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=target.username))
        assert parse_date(before.birthdate) == original_birthdate

        with mock_notification_email() as mock:
            details = api.ChangeUserBirthdate(
                admin_pb2.ChangeUserBirthdateReq(user=target.username, birthdate="1990-05-25")
            )

        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.birthdate == "1990-05-25"
        assert details.gender == target.gender
        assert not details.banned
        assert not details.deleted

    # exactly one notification email, addressed to the affected user
    mock.assert_called_once()
    e = email_fields(mock)
    assert e.subject == "[TEST] Your date of birth was changed"
    assert e.recipient == target.email
    for rendered in (e.plain, e.html):
        assert "1990" in rendered

    push_collector.assert_user_has_single_matching(
        target.id,
        title="Your date of birth was changed",
        body="Your date of birth on Couchers.org was changed to Friday 25 May 1990 by an admin.",
    )
def test_BanUser(db):
    """BanUser marks the user as banned and records the note with a timestamp and the acting admin."""
    admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    note = "A good reason"

    # each note is prefixed with "[<UTC ISO timestamp>] (id: ..., username: ...)"
    timestamp_pattern = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    header_pattern = rf"\n\[{timestamp_pattern}\] \(id: {admin.id}, username: {admin.username}\)"
    expected_note = rf"^{header_pattern} {note}\n$"

    with real_admin_session(admin_token) as api:
        details = api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note=note))
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert details.banned
        assert not details.deleted
        assert match(expected_note, details.admin_note)
def test_UnbanUser(db):
    """UnbanUser leaves the user not banned and records the note with a timestamp and the acting admin."""
    admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    note = "A good reason"

    # each note is prefixed with "[<UTC ISO timestamp>] (id: ..., username: ...)"
    timestamp_pattern = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    header_pattern = rf"\n\[{timestamp_pattern}\] \(id: {admin.id}, username: {admin.username}\)"
    expected_note = rf"^{header_pattern} {note}\n$"

    with real_admin_session(admin_token) as api:
        details = api.UnbanUser(admin_pb2.UnbanUserReq(user=target.username, admin_note=note))
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert not details.deleted
        assert match(expected_note, details.admin_note)
def test_AddAdminNote(db):
    """Admin notes are appended one after another, each with its own timestamp/author header."""
    admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    first_note = "User reported strange behavior"
    second_note = "Insert private information here"

    # each note is prefixed with "[<UTC ISO timestamp>] (id: ..., username: ...)"
    timestamp_pattern = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    header_pattern = rf"\n\[{timestamp_pattern}\] \(id: {admin.id}, username: {admin.username}\)"

    with real_admin_session(admin_token) as api:
        details = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note=first_note))
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert not details.deleted
        assert match(rf"^{header_pattern} {first_note}\n$", details.admin_note)

    with real_admin_session(admin_token) as api:
        details = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note=second_note))
        # the second note follows the first; nothing is overwritten
        both_notes = rf"^{header_pattern} {first_note}\n{header_pattern} {second_note}\n$"
        assert match(both_notes, details.admin_note)
def test_AddAdminNote_blank(db):
    """A whitespace-only admin note is rejected with INVALID_ARGUMENT."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.AddAdminNoteReq(user=target.username, admin_note=" \t \n ")

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as e:
            api.AddAdminNote(request)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "The admin note cannot be empty."
def test_admin_content_reports(db):
    """Admins can fetch a single content report by id and list the reports filed against an author."""
    _, admin_token = generate_user(is_superuser=True)
    reporter, reporter_token = generate_user()
    bad_user1, _ = generate_user()
    bad_user2, _ = generate_user()

    # (reason, description, content_ref, author username, page) for each report, in filing order
    filed_reports = (
        ("spam", "r1", "comment/123", bad_user1.username, "https://couchers.org/comment/123"),
        ("spam", "r2", "comment/124", bad_user2.username, "https://couchers.org/comment/124"),
        ("something else", "r3", "page/321", bad_user1.username, "https://couchers.org/page/321"),
    )
    with reporting_session(reporter_token) as api:
        for reason, description, content_ref, author, page in filed_reports:
            api.Report(
                reporting_pb2.ReportReq(
                    reason=reason,
                    description=description,
                    content_ref=content_ref,
                    author_user=author,
                    user_agent="n/a",
                    page=page,
                )
            )

    # descriptions are unique here, so they serve as keys for looking up the generated ids
    with session_scope() as session:
        rows = session.execute(select(ContentReport.description, ContentReport.id)).all()
        id_by_description = dict(rows)

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as e:
            api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Content report not found."

        r2_id = id_by_description["r2"]
        rep = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=r2_id)).content_report
        assert rep.content_report_id == r2_id
        assert rep.reporting_user_id == reporter.id
        assert rep.author_user_id == bad_user2.id
        assert rep.reason == "spam"
        assert rep.description == "r2"
        assert rep.content_ref == "comment/124"
        assert rep.user_agent == "n/a"
        assert rep.page == "https://couchers.org/comment/124"

        # the later report (r3) is expected ahead of the earlier one (r1)
        by_author = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username))
        assert by_author.content_reports[0].content_report_id == id_by_description["r3"]
        assert by_author.content_reports[1].content_report_id == id_by_description["r1"]
def test_DeleteUser(db):
    """DeleteUser flags the user as deleted; RecoverDeletedUser clears the flag again."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    def assert_identity_unchanged(details):
        # everything except the deleted flag must be identical in both responses
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned

    with real_admin_session(admin_token) as api:
        deleted = api.DeleteUser(admin_pb2.DeleteUserReq(user=target.username))
        assert_identity_unchanged(deleted)
        assert deleted.deleted

    with real_admin_session(admin_token) as api:
        recovered = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=target.username))
        assert_identity_unchanged(recovered)
        assert not recovered.deleted
def test_CreateApiKey(db, push_collector):
    """CreateApiKey stores a valid API-key session for the user and sends the token by email plus a push."""
    with session_scope() as session:
        _, admin_token = generate_user(is_superuser=True)
        target, _ = generate_user()

        # the user starts out without any API keys
        api_key_count = (
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        )
        assert session.execute(api_key_count).scalar_one() == 0

    with mock_notification_email() as mock, real_admin_session(admin_token) as api:
        api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=target.username))

    mock.assert_called_once()
    e = email_fields(mock)
    assert e.subject == "[TEST] Your API key for Couchers.org"

    with session_scope() as session:
        valid_api_key = (
            select(UserSession)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        )
        api_key = session.execute(valid_api_key).scalar_one()

        # the stored token is the one that was mailed out
        assert api_key.token in e.plain
        assert api_key.token in e.html

    assert e.recipient == target.email
    assert "api key" in e.subject.lower()
    unique_string = "We've issued you with the following API key:"
    for rendered in (e.plain, e.html):
        assert unique_string in rendered
    for rendered in (e.plain, e.html):
        assert "support@couchers.org" in rendered

    push_collector.assert_user_has_single_matching(
        target.id, title="An API key was created for your account", body="Details were sent to you via email."
    )
# GeoJSON text of type "MultiPolygon" holding one polygon with a single closed ring
# (the first and last coordinate pairs are identical).
VALID_GEOJSON_MULTIPOLYGON = """
    {
      "type": "MultiPolygon",
      "coordinates":
       [
        [
          [
            [
              -73.98114904754641,
              40.7470284264813
            ],
            [
              -73.98314135177611,
              40.73416844413217
            ],
            [
              -74.00538969848634,
              40.734314779027144
            ],
            [
              -74.00479214294432,
              40.75027851544338
            ],
            [
              -73.98114904754641,
              40.7470284264813
            ]
          ]
        ]
      ]
    }
"""

# GeoJSON text of type "Point"; the tests in this module expect the community RPCs to
# reject it with "GeoJson was not of type MultiPolygon."
POINT_GEOJSON = """
{ "type": "Point", "coordinates": [100.0, 0.0] }
"""
def test_CreateCommunity_invalid_geojson(db):
    """CreateCommunity rejects GeoJSON that is not a MultiPolygon with INVALID_ARGUMENT."""
    _, admin_token = generate_user(is_superuser=True)
    generate_user()

    request = admin_pb2.CreateCommunityReq(
        name="test community",
        description="community for testing",
        admin_ids=[],
        geojson=POINT_GEOJSON,
    )
    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as e:
            api.CreateCommunity(request)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "GeoJson was not of type MultiPolygon."
def test_CreateCommunity(db):
    """CreateCommunity stores a cluster with the given description and a slug derived from the name."""
    with session_scope() as session:
        _, admin_token = generate_user(is_superuser=True)
        generate_user()

        request = admin_pb2.CreateCommunityReq(
            name="test community",
            description="community for testing",
            admin_ids=[],
            geojson=VALID_GEOJSON_MULTIPOLYGON,
        )
        with real_admin_session(admin_token) as api:
            api.CreateCommunity(request)
            by_name = select(Cluster).where(Cluster.name == "test community")
            community = session.execute(by_name).scalar_one()
            assert community.description == "community for testing"
            assert community.slug == "test-community"
def test_UpdateCommunity_invalid_geojson(db):
    """UpdateCommunity rejects GeoJSON that is not a MultiPolygon with INVALID_ARGUMENT."""
    _, admin_token = generate_user(is_superuser=True)

    with session_scope() as session:
        with real_admin_session(admin_token) as api:
            # set up an existing community to update
            api.CreateCommunity(
                admin_pb2.CreateCommunityReq(
                    name="test community",
                    description="community for testing",
                    admin_ids=[],
                    geojson=VALID_GEOJSON_MULTIPOLYGON,
                )
            )
            by_name = select(Cluster).where(Cluster.name == "test community")
            community = session.execute(by_name).scalar_one()

            bad_update = admin_pb2.UpdateCommunityReq(
                community_id=community.parent_node_id,
                name="test community 2",
                description="community for testing 2",
                geojson=POINT_GEOJSON,
            )
            with pytest.raises(grpc.RpcError) as e:
                api.UpdateCommunity(bad_update)
            assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
            assert e.value.details() == "GeoJson was not of type MultiPolygon."
def test_UpdateCommunity_invalid_id(db):
    """UpdateCommunity answers NOT_FOUND when the community id does not exist."""
    _, admin_token = generate_user(is_superuser=True)

    with session_scope() as session:
        with real_admin_session(admin_token) as api:
            # one real community exists, but the update below targets a different id
            api.CreateCommunity(
                admin_pb2.CreateCommunityReq(
                    name="test community",
                    description="community for testing",
                    admin_ids=[],
                    geojson=VALID_GEOJSON_MULTIPOLYGON,
                )
            )

            unknown_update = admin_pb2.UpdateCommunityReq(
                community_id=1000,
                name="test community 1000",
                description="community for testing 1000",
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
            with pytest.raises(grpc.RpcError) as e:
                api.UpdateCommunity(unknown_update)
            assert e.value.code() == grpc.StatusCode.NOT_FOUND
            assert e.value.details() == "Community not found."
def test_UpdateCommunity(db):
    """UpdateCommunity changes name, description and slug, and re-parents the community's node."""
    _, admin_token = generate_user(is_superuser=True)

    with session_scope() as session:
        with real_admin_session(admin_token) as api:
            # first community: the one that gets updated
            api.CreateCommunity(
                admin_pb2.CreateCommunityReq(
                    name="test community",
                    description="community for testing",
                    admin_ids=[],
                    geojson=VALID_GEOJSON_MULTIPOLYGON,
                )
            )
            first_query = select(Cluster).where(Cluster.name == "test community")
            community = session.execute(first_query).scalar_one()
            assert community.description == "community for testing"

            # second community: its node becomes the new parent
            api.CreateCommunity(
                admin_pb2.CreateCommunityReq(
                    name="test community 2",
                    description="community for testing 2",
                    admin_ids=[],
                    geojson=VALID_GEOJSON_MULTIPOLYGON,
                )
            )
            second_query = select(Cluster).where(Cluster.name == "test community 2")
            community_2 = session.execute(second_query).scalar_one()

            update = admin_pb2.UpdateCommunityReq(
                community_id=community.parent_node_id,
                name="test community 2",
                description="community for testing 2",
                geojson=VALID_GEOJSON_MULTIPOLYGON,
                parent_node_id=community_2.parent_node_id,
            )
            api.UpdateCommunity(update)
            session.commit()

            updated_query = select(Cluster).where(Cluster.id == community.id)
            community_updated = session.execute(updated_query).scalar_one()
            assert community_updated.description == "community for testing 2"
            assert community_updated.slug == "test-community-2"

            node_query = select(Node).where(Node.id == community_updated.parent_node_id)
            node_updated = session.execute(node_query).scalar_one()
            assert node_updated.parent_node_id == community_2.parent_node_id
def test_GetChats(db):
    """GetChats returns a non-empty ``response`` field for an existing user."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.GetChatsReq(user=target.username)

    with real_admin_session(admin_token) as api:
        chats = api.GetChats(request)
        assert chats.response
def test_badges(db, push_collector):
    """Walk through adding and removing a badge as admin, including each rejection case."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    username = target.username

    with real_admin_session(admin_token) as api:

        def current_badges():
            # badges as currently reported by GetUserDetails
            return api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=username)).badges

        # can add a badge
        assert "volunteer" not in current_badges()
        with mock_notification_email() as mock:
            added = api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="volunteer"))
        assert "volunteer" in added.badges

        # badge emails are disabled by default
        mock.assert_not_called()

        push_collector.assert_user_has_single_matching(
            target.id,
            title="The Active Volunteer badge was added to your profile",
            body="Check out your profile to see the new badge!",
        )

        # can't add/edit special tags
        with pytest.raises(grpc.RpcError) as e:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="founder"))
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "Admins cannot edit that badge."

        # double add badge
        with pytest.raises(grpc.RpcError) as e:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="volunteer"))
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "The user already has that badge."

        # can remove badge
        assert "volunteer" in current_badges()
        with mock_notification_email() as mock:
            removed = api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="volunteer"))
        assert "volunteer" not in removed.badges

        # badge emails are disabled by default
        mock.assert_not_called()

        # the removal push is the user's second one (index 1), after the "added" push
        push_collector.assert_user_push_matches_fields(
            target.id,
            ix=1,
            title="The Active Volunteer badge was removed from your profile",
            body="You can see all your badges on your profile.",
        )

        # not found on user
        with pytest.raises(grpc.RpcError) as e:
            api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="volunteer"))
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "The user does not have that badge."

        # not found in general
        with pytest.raises(grpc.RpcError) as e:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="nonexistentbadge"))
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Badge not found."
def test_DeleteEvent(db):
    """An event created by a normal user is flagged as deleted after an admin calls DeleteEvent."""
    _, admin_token = generate_user(is_superuser=True)
    organizer, organizer_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [organizer], [], None)

    # the event runs from two hours from now until five hours from now
    start_time = now() + timedelta(hours=2)
    end_time = start_time + timedelta(hours=3)
    location = events_pb2.OfflineEventInformation(
        address="Near Null Island",
        lat=0.1,
        lng=0.2,
    )
    with events_session(organizer_token) as api:
        created = api.CreateEvent(
            events_pb2.CreateEventReq(
                title="Dummy Title",
                content="Dummy content.",
                photo_key=None,
                offline_information=location,
                start_time=Timestamp_from_datetime(start_time),
                end_time=Timestamp_from_datetime(end_time),
                timezone="UTC",
            )
        )
        event_id = created.event_id
        assert not created.is_deleted

    with session_scope() as session:
        with real_admin_session(admin_token) as api:
            api.DeleteEvent(admin_pb2.DeleteEventReq(event_id=event_id))
            # the occurrence row is looked up by the same id the events API returned
            occurrence = session.get(EventOccurrence, ident=event_id)
            assert occurrence.is_deleted
def test_ListUserIds(db):
    """ListUserIds returns both generated users for 2000-01-01..now and nothing for a now..now window."""
    admin, admin_token = generate_user(is_superuser=True)
    other, _ = generate_user()

    with real_admin_session(admin_token) as api:
        wide_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(datetime(2000, 1, 1)),
            end_time=Timestamp_from_datetime(now()),
        )
        res = api.ListUserIds(wide_window)
        assert len(res.user_ids) == 2
        assert sorted(res.user_ids) == sorted([admin.id, other.id])

    with real_admin_session(admin_token) as api:
        window_start = Timestamp_from_datetime(now())
        window_end = Timestamp_from_datetime(now())
        res = api.ListUserIds(admin_pb2.ListUserIdsReq(start_time=window_start, end_time=window_end))
        assert res.user_ids == []
def test_EditReferenceText(db):
    """An admin can replace the text of an existing friend reference.

    user1 writes a reference for user2, the admin edits its text, and the stored
    row is then re-read to confirm it carries the new text.
    """
    super_user, super_token = generate_user(is_superuser=True)
    test_new_text = "New Text"

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()

    with session_scope() as session:
        with references_session(user1_token) as api:
            reference = api.WriteFriendReference(
                references_pb2.WriteFriendReferenceReq(
                    to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
                )
            )

        with real_admin_session(super_token) as admin_api:
            admin_api.EditReferenceText(
                admin_pb2.EditReferenceTextReq(reference_id=reference.reference_id, new_text=test_new_text)
            )

        # discard any state cached on this session before re-reading the row
        session.expire_all()

        # scalar_one() raises if the reference row is missing, so a lost row fails with a clear
        # "no result" error instead of an AttributeError on None in the assertion below
        modified_reference = session.execute(
            select(Reference).where(Reference.id == reference.reference_id)
        ).scalar_one()
        assert modified_reference.text == test_new_text
def test_DeleteReference(db):
    """An admin can delete a reference: it disappears from ListReferences and its row is flagged deleted."""
    super_user, super_token = generate_user(is_superuser=True)

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()

    with references_session(user1_token) as api:
        reference = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
            )
        )

    # before deletion the reference is listed for its author
    with references_session(user1_token) as api:
        assert api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references

    with real_admin_session(super_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=reference.reference_id))

    # after deletion it is no longer listed
    with references_session(user1_token) as api:
        assert not api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references

    with session_scope() as session:
        # scalar_one() raises if the row is gone entirely, so a hard delete fails with a clear
        # "no result" error instead of an AttributeError on None in the assertion below
        modified_reference = session.execute(
            select(Reference).where(Reference.id == reference.reference_id)
        ).scalar_one()
        assert modified_reference.is_deleted
def test_AddUsersToModerationUserList(db):
    """Exercise AddUsersToModerationUserList together with ListModerationUserLists.

    Covers the error paths (unknown list id, unknown user), creation of a new list when no
    ``moderation_list_id`` is given, and adding users to an already existing list.
    """
    super_user, super_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    user4, _ = generate_user()
    user5, _ = generate_user()
    # pre-existing list that contains only user1
    moderation_list_id = add_users_to_new_moderation_list([user1])

    # Database checks below each open their own short-lived session.
    with real_admin_session(super_token) as api:
        # Test adding users to a non-existent moderation list (should raise an error)
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user2.username], moderation_list_id=999),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Moderation user list not found."

        # Test with non-existent user (should raise an error)
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, "nonexistent"]),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Couldn't find that user."

        # Test successful creation of new moderation list (no moderation_list_id provided)
        res = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, user2.username, user3.username]),
        )
        assert res.moderation_list_id > 0
        with session_scope() as session:
            moderation_user_list = session.get(ModerationUserList, res.moderation_list_id)
            assert moderation_user_list is not None
            assert len(moderation_user_list.users) == 3
            assert {user1.id, user2.id, user3.id}.issubset({user.id for user in moderation_user_list.users})

        # Test list endpoint returns same moderation list with same members not repeated
        list_res = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username))
        assert len(list_res.moderation_lists) == 1
        assert list_res.moderation_lists[0].moderation_list_id == res.moderation_list_id
        assert len(list_res.moderation_lists[0].member_ids) == 3
        assert {user1.id, user2.id, user3.id}.issubset(list_res.moderation_lists[0].member_ids)

        # Test user can be in multiple moderation lists
        list_res_user1 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username))
        assert len(list_res_user1.moderation_lists) == 2

        # Test adding users to an existing moderation list
        res2 = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(
                users=[user4.username, user5.username], moderation_list_id=moderation_list_id
            ),
        )
        assert res2.moderation_list_id == moderation_list_id
        with session_scope() as session:
            moderation_user_list = session.get(ModerationUserList, moderation_list_id)
            assert len(moderation_user_list.users) == 3
            assert {user1.id, user4.id, user5.id}.issubset({user.id for user in moderation_user_list.users})

        # Test list user moderation lists endpoint returns the right moderation list
        list_res_user5 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user5.username))
        assert len(list_res_user5.moderation_lists) == 1
        assert list_res_user5.moderation_lists[0].moderation_list_id == moderation_list_id
        assert len(list_res_user5.moderation_lists[0].member_ids) == 3
        assert {user1.id, user4.id, user5.id}.issubset(list_res_user5.moderation_lists[0].member_ids)
def test_RemoveUserFromModerationUserList(db):
    """Exercise RemoveUserFromModerationUserList: error paths, a normal removal, and removal of the last member."""
    _, admin_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    list_id = add_users_to_new_moderation_list([user1, user2])

    def removal_request(username):
        # request that removes ``username`` from the list created above
        return admin_pb2.RemoveUserFromModerationUserListReq(user=username, moderation_list_id=list_id)

    with real_admin_session(admin_token) as api:
        # Test with non-existent user (should raise error)
        with pytest.raises(grpc.RpcError) as e:
            api.RemoveUserFromModerationUserList(admin_pb2.RemoveUserFromModerationUserListReq(user="nonexistent"))
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Couldn't find that user."

        # Test without providing moderation list id (should raise error)
        with pytest.raises(grpc.RpcError) as e:
            api.RemoveUserFromModerationUserList(admin_pb2.RemoveUserFromModerationUserListReq(user=user2.username))
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "Missing moderation user list id."

        # Test removing user that's not in the provided moderation list (should raise error)
        with pytest.raises(grpc.RpcError) as e:
            api.RemoveUserFromModerationUserList(removal_request(user3.username))
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "User is not in the moderation user list."

        # Test successful removal
        api.RemoveUserFromModerationUserList(removal_request(user1.username))
        with session_scope() as session:
            stored_list = session.get(ModerationUserList, list_id)
            assert user1.id not in {member.id for member in stored_list.users}
            assert user2.id in {member.id for member in stored_list.users}

        # Test list user moderation lists endpoint returns right number of moderation lists
        lists_for_user1 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username))
        assert len(lists_for_user1.moderation_lists) == 0
        lists_for_user2 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username))
        assert len(lists_for_user2.moderation_lists) == 1

        # Test removing all users from moderation list should also delete the moderation list
        api.RemoveUserFromModerationUserList(removal_request(user2.username))
        with session_scope() as session:
            assert session.get(ModerationUserList, list_id) is None
def test_admin_delete_account_url(db, push_collector):
    """An admin-created account deletion link carries a token that confirms the deletion via the auth API."""
    _, admin_token = generate_user(is_superuser=True)

    user, _ = generate_user()
    user_id = user.id

    with real_admin_session(admin_token) as admin_api:
        link = admin_api.CreateAccountDeletionLink(admin_pb2.CreateAccountDeletionLinkReq(user=user.username))
        url = link.account_deletion_confirm_url

    # creating the link alone sends the user no push notification
    push_collector.assert_user_has_count(user_id, 0)

    with session_scope() as session:
        token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token = token_row.token
        assert token_row.user.id == user_id
        assert url == f"http://localhost:3000/delete-account?token={deletion_token}"

    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=0,
        title="Your Couchers.org account has been deleted",
        body="You can still undo this by following the link we emailed to you within 7 days.",
    )

    mock.assert_called_once()
    # the email fields are extracted but not asserted on
    e = email_fields(mock)
904# community invite feature tested in test_events.py
905# SendBlogPostNotification tested in test_notifications.py
906# MarkUserNeedsLocationUpdate tested in test_jail.py