Coverage for src/tests/test_admin.py: 100%
479 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
from datetime import date, datetime
from re import escape, match

import grpc
import pytest
from sqlalchemy.sql import func

from couchers import errors
from couchers.db import session_scope
from couchers.models import (
    AccountDeletionToken,
    Cluster,
    ContentReport,
    EventOccurrence,
    ModerationUserList,
    Node,
    Reference,
    UserSession,
)
from couchers.sql import couchers_select as select
from couchers.utils import Timestamp_from_datetime, now, parse_date, timedelta
from proto import admin_pb2, auth_pb2, events_pb2, references_pb2, reporting_pb2
from tests.test_communities import create_community
from tests.test_fixtures import (  # noqa
    add_users_to_new_moderation_list,
    auth_api_session,
    db,
    email_fields,
    events_session,
    generate_user,
    get_user_id_and_token,
    mock_notification_email,
    push_collector,
    real_admin_session,
    references_session,
    reporting_session,
    testconfig,
)
@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture for every test in this module."""
def test_access_by_normal_user(db):
    """A caller without superuser rights gets PERMISSION_DENIED from the admin servicer."""
    regular_user, regular_token = generate_user()
    request = admin_pb2.GetUserDetailsReq(user=str(regular_user.id))

    with real_admin_session(regular_token) as api:
        # all requests to the admin servicer should break when done by a non-super_user
        with pytest.raises(grpc.RpcError) as exc_info:
            api.GetUserDetails(request)
        assert exc_info.value.code() == grpc.StatusCode.PERMISSION_DENIED
def test_GetUser(db):
    """GetUser resolves a user by ID both before and after that user has been banned."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()
    lookup = admin_pb2.GetUserReq(user=str(target.id))

    with real_admin_session(admin_token) as api:
        found = api.GetUser(lookup)
        assert found.user_id == target.id
        assert found.username == target.username

    with real_admin_session(admin_token) as api:
        api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note="Testing banning"))

    # the lookup must keep working once the user is banned
    with real_admin_session(admin_token) as api:
        found = api.GetUser(lookup)
        assert found.user_id == target.id
        assert found.username == target.username
def test_GetUserDetails(db):
    """GetUserDetails returns the same record whether looked up by ID, username or email."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    # each identifier is tried in a fresh admin session, in this order
    for identifier in (str(target.id), target.username, target.email):
        with real_admin_session(admin_token) as api:
            details = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=identifier))
            assert details.user_id == target.id
            assert details.username == target.username
            assert details.email == target.email
            assert details.gender == target.gender
            assert parse_date(details.birthdate) == target.birthdate
            assert not details.banned
            assert not details.deleted
def test_ChangeUserGender(db, push_collector):
    """Changing a user's gender updates the record and notifies the user by email and push."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()
    request = admin_pb2.ChangeUserGenderReq(user=target.username, gender="Machine")

    with real_admin_session(admin_token) as api, mock_notification_email() as mock:
        details = api.ChangeUserGender(request)

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == "Machine"
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted

    # the email is inspected only after the mock context has been left
    mock.assert_called_once()
    email = email_fields(mock)
    assert email.subject == "[TEST] Your gender was changed"
    assert email.recipient == target.email
    assert "Machine" in email.plain
    assert "Machine" in email.html

    push_collector.assert_user_has_single_matching(
        target.id,
        title="Your gender was changed",
        body="Your gender on Couchers.org was changed to Machine by an admin.",
    )
def test_ChangeUserBirthdate(db, push_collector):
    """Changing a user's birthdate updates the record and notifies the user by email and push."""
    _admin, admin_token = generate_user(is_superuser=True)
    initial_birthdate = date(year=2000, month=1, day=1)
    target, _target_token = generate_user(birthdate=initial_birthdate)

    with real_admin_session(admin_token) as api:
        before = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=target.username))
        assert parse_date(before.birthdate) == initial_birthdate

        with mock_notification_email() as mock:
            after = api.ChangeUserBirthdate(
                admin_pb2.ChangeUserBirthdateReq(user=target.username, birthdate="1990-05-25")
            )

    assert after.user_id == target.id
    assert after.username == target.username
    assert after.email == target.email
    assert after.birthdate == "1990-05-25"
    assert after.gender == target.gender
    assert not after.banned
    assert not after.deleted

    # the email is inspected only after the mock context has been left
    mock.assert_called_once()
    email = email_fields(mock)
    assert email.subject == "[TEST] Your date of birth was changed"
    assert email.recipient == target.email
    assert "1990" in email.plain
    assert "1990" in email.html

    push_collector.assert_user_has_single_matching(
        target.id,
        title="Your date of birth was changed",
        body="Your date of birth on Couchers.org was changed to Friday 25 May 1990 by an admin.",
    )
def test_BanUser(db):
    """Banning a user flags them as banned and records an attributed admin note.

    The stored note is expected to consist of a newline, a bracketed UTC
    timestamp, the acting admin's id and username in parentheses, the note
    text, and a trailing newline.
    """
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    admin_note = "A good reason"
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    # Escape the interpolated literals so that any regex metacharacter in the
    # username or the note is matched verbatim instead of acting as pattern syntax.
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {escape(super_user.username)}\)"

    with real_admin_session(super_token) as api:
        res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note=admin_note))
        assert res.user_id == normal_user.id
        assert res.username == normal_user.username
        assert res.email == normal_user.email
        assert res.gender == normal_user.gender
        assert parse_date(res.birthdate) == normal_user.birthdate
        assert res.banned
        assert not res.deleted
        assert match(rf"^{prefix_regex} {escape(admin_note)}\n$", res.admin_note)
def test_UnbanUser(db):
    """Unbanning a user leaves them not banned and records an attributed admin note.

    The stored note is expected to consist of a newline, a bracketed UTC
    timestamp, the acting admin's id and username in parentheses, the note
    text, and a trailing newline.
    """
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    admin_note = "A good reason"
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    # Escape the interpolated literals so that any regex metacharacter in the
    # username or the note is matched verbatim instead of acting as pattern syntax.
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {escape(super_user.username)}\)"

    with real_admin_session(super_token) as api:
        res = api.UnbanUser(admin_pb2.UnbanUserReq(user=normal_user.username, admin_note=admin_note))
        assert res.user_id == normal_user.id
        assert res.username == normal_user.username
        assert res.email == normal_user.email
        assert res.gender == normal_user.gender
        assert parse_date(res.birthdate) == normal_user.birthdate
        assert not res.banned
        assert not res.deleted
        assert match(rf"^{prefix_regex} {escape(admin_note)}\n$", res.admin_note)
def test_AddAdminNote(db):
    """Admin notes are appended in order, each prefixed with a timestamp and the acting admin.

    After the second call the stored note must contain both entries, the
    first one followed by the second.
    """
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    admin_note1 = "User reported strange behavior"
    admin_note2 = "Insert private information here"
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    # Escape the interpolated literals so that any regex metacharacter in the
    # username or the notes is matched verbatim instead of acting as pattern syntax.
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {escape(super_user.username)}\)"
    first_entry = rf"{prefix_regex} {escape(admin_note1)}\n"
    second_entry = rf"{prefix_regex} {escape(admin_note2)}\n"

    with real_admin_session(super_token) as api:
        res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note1))
        assert res.user_id == normal_user.id
        assert res.username == normal_user.username
        assert res.email == normal_user.email
        assert res.gender == normal_user.gender
        assert parse_date(res.birthdate) == normal_user.birthdate
        assert not res.banned
        assert not res.deleted
        assert match(rf"^{first_entry}$", res.admin_note)

    with real_admin_session(super_token) as api:
        res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note2))
        assert match(rf"^{first_entry}{second_entry}$", res.admin_note)
def test_AddAdminNote_blank(db):
    """A note made up only of whitespace is rejected with INVALID_ARGUMENT."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    whitespace_only = " \t \n "
    request = admin_pb2.AddAdminNoteReq(user=target.username, admin_note=whitespace_only)

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddAdminNote(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.ADMIN_NOTE_CANT_BE_EMPTY
def test_admin_content_reports(db):
    """Admins can fetch one content report by ID and list the reports against an author."""
    _admin, admin_token = generate_user(is_superuser=True)
    reporter, reporter_token = generate_user()
    first_author, _ = generate_user()
    second_author, _ = generate_user()

    # (reason, description, content_ref, reported author), in filing order
    filed_reports = [
        ("spam", "r1", "comment/123", first_author),
        ("spam", "r2", "comment/124", second_author),
        ("something else", "r3", "page/321", first_author),
    ]
    with reporting_session(reporter_token) as api:
        for reason, description, content_ref, author in filed_reports:
            api.Report(
                reporting_pb2.ReportReq(
                    reason=reason,
                    description=description,
                    content_ref=content_ref,
                    author_user=author.username,
                    user_agent="n/a",
                    page=f"https://couchers.org/{content_ref}",
                )
            )

    # descriptions are unique here, so they serve as lookup keys for the generated IDs
    with session_scope() as session:
        rows = session.execute(select(ContentReport.description, ContentReport.id)).all()
        id_by_description = dict(rows)

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == errors.CONTENT_REPORT_NOT_FOUND

        single = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=id_by_description["r2"]))
        report = single.content_report
        assert report.content_report_id == id_by_description["r2"]
        assert report.reporting_user_id == reporter.id
        assert report.author_user_id == second_author.id
        assert report.reason == "spam"
        assert report.description == "r2"
        assert report.content_ref == "comment/124"
        assert report.user_agent == "n/a"
        assert report.page == "https://couchers.org/comment/124"

        by_author = api.GetContentReportsForAuthor(
            admin_pb2.GetContentReportsForAuthorReq(user=first_author.username)
        )
        assert by_author.content_reports[0].content_report_id == id_by_description["r3"]
        assert by_author.content_reports[1].content_report_id == id_by_description["r1"]
def test_DeleteUser(db):
    """DeleteUser marks the account deleted and RecoverDeletedUser clears that flag again."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    with real_admin_session(admin_token) as api:
        deleted = api.DeleteUser(admin_pb2.DeleteUserReq(user=target.username))
        assert deleted.user_id == target.id
        assert deleted.username == target.username
        assert deleted.email == target.email
        assert deleted.gender == target.gender
        assert parse_date(deleted.birthdate) == target.birthdate
        assert not deleted.banned
        assert deleted.deleted

    with real_admin_session(admin_token) as api:
        recovered = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=target.username))
        assert recovered.user_id == target.id
        assert recovered.username == target.username
        assert recovered.email == target.email
        assert recovered.gender == target.gender
        assert parse_date(recovered.birthdate) == target.birthdate
        assert not recovered.banned
        assert not recovered.deleted
def test_CreateApiKey(db, push_collector):
    """CreateApiKey stores one valid API-key session and tells the user by email and push."""
    with session_scope() as session:
        _admin, admin_token = generate_user(is_superuser=True)
        target, _target_token = generate_user()

        # the user starts out without any API key
        existing_keys = session.execute(
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        ).scalar_one()
        assert existing_keys == 0

    with mock_notification_email() as mock, real_admin_session(admin_token) as api:
        api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=target.username))

    mock.assert_called_once()
    email = email_fields(mock)
    assert email.subject == "[TEST] Your API key for Couchers.org"

    with session_scope() as session:
        api_key = session.execute(
            select(UserSession)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        ).scalar_one()

        # the token is read while the row is still attached to the session
        assert api_key.token in email.plain
        assert api_key.token in email.html

    assert email.recipient == target.email
    assert "api key" in email.subject.lower()
    unique_string = "We've issued you with the following API key:"
    assert unique_string in email.plain
    assert unique_string in email.html
    assert "support@couchers.org" in email.plain
    assert "support@couchers.org" in email.html

    push_collector.assert_user_has_single_matching(
        target.id, title="An API key was created for your account", body="Details were sent to you via email."
    )
# A well-formed GeoJSON MultiPolygon made of a single polygon with one closed
# ring (the first and last positions are identical).
VALID_GEOJSON_MULTIPOLYGON = """
    {
      "type": "MultiPolygon",
      "coordinates":
       [
        [
          [
            [
              -73.98114904754641,
              40.7470284264813
            ],
            [
              -73.98314135177611,
              40.73416844413217
            ],
            [
              -74.00538969848634,
              40.734314779027144
            ],
            [
              -74.00479214294432,
              40.75027851544338
            ],
            [
              -73.98114904754641,
              40.7470284264813
            ]
          ]
        ]
      ]
    }
"""

# Valid GeoJSON, but a Point rather than a MultiPolygon; the tests in this
# module pass it where a MultiPolygon is required and expect a rejection.
POINT_GEOJSON = """
{ "type": "Point", "coordinates": [100.0, 0.0] }
"""
def test_CreateCommunity_invalid_geojson(db):
    """Creating a community from GeoJSON that is not a MultiPolygon is rejected."""
    _admin, admin_token = generate_user(is_superuser=True)
    _member, _member_token = generate_user()
    request = admin_pb2.CreateCommunityReq(
        name="test community",
        description="community for testing",
        admin_ids=[],
        geojson=POINT_GEOJSON,
    )

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.CreateCommunity(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.NO_MULTIPOLYGON
def test_CreateCommunity(db):
    """CreateCommunity persists a cluster with the given description and a slug built from the name."""
    with session_scope() as session:
        _admin, admin_token = generate_user(is_superuser=True)
        _member, _member_token = generate_user()
        request = admin_pb2.CreateCommunityReq(
            name="test community",
            description="community for testing",
            admin_ids=[],
            geojson=VALID_GEOJSON_MULTIPOLYGON,
        )

        with real_admin_session(admin_token) as api:
            api.CreateCommunity(request)
            created = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one()
            assert created.description == "community for testing"
            assert created.slug == "test-community"
def test_UpdateCommunity_invalid_geojson(db):
    """Updating a community with GeoJSON that is not a MultiPolygon is rejected."""
    _admin, admin_token = generate_user(is_superuser=True)

    with session_scope() as session, real_admin_session(admin_token) as api:
        api.CreateCommunity(
            admin_pb2.CreateCommunityReq(
                name="test community",
                description="community for testing",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
        )
        existing = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one()

        bad_update = admin_pb2.UpdateCommunityReq(
            community_id=existing.parent_node_id,
            name="test community 2",
            description="community for testing 2",
            geojson=POINT_GEOJSON,
        )
        with pytest.raises(grpc.RpcError) as exc_info:
            api.UpdateCommunity(bad_update)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.NO_MULTIPOLYGON
def test_UpdateCommunity_invalid_id(db):
    """Updating a community ID that does not exist yields NOT_FOUND."""
    _admin, admin_token = generate_user(is_superuser=True)

    with session_scope() as session, real_admin_session(admin_token) as api:
        api.CreateCommunity(
            admin_pb2.CreateCommunityReq(
                name="test community",
                description="community for testing",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
        )

        # 1000 is not the ID of the community created above
        missing_update = admin_pb2.UpdateCommunityReq(
            community_id=1000,
            name="test community 1000",
            description="community for testing 1000",
            geojson=VALID_GEOJSON_MULTIPOLYGON,
        )
        with pytest.raises(grpc.RpcError) as exc_info:
            api.UpdateCommunity(missing_update)
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == errors.COMMUNITY_NOT_FOUND
def test_UpdateCommunity(db):
    """UpdateCommunity changes description, slug and parent node of an existing community."""
    _admin, admin_token = generate_user(is_superuser=True)

    with session_scope() as session, real_admin_session(admin_token) as api:
        api.CreateCommunity(
            admin_pb2.CreateCommunityReq(
                name="test community",
                description="community for testing",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
        )
        first = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one()
        assert first.description == "community for testing"

        api.CreateCommunity(
            admin_pb2.CreateCommunityReq(
                name="test community 2",
                description="community for testing 2",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
        )
        second = session.execute(select(Cluster).where(Cluster.name == "test community 2")).scalar_one()

        # rename the first community and re-parent it under the second one
        api.UpdateCommunity(
            admin_pb2.UpdateCommunityReq(
                community_id=first.parent_node_id,
                name="test community 2",
                description="community for testing 2",
                geojson=VALID_GEOJSON_MULTIPOLYGON,
                parent_node_id=second.parent_node_id,
            )
        )
        session.commit()

        refreshed = session.execute(select(Cluster).where(Cluster.id == first.id)).scalar_one()
        assert refreshed.description == "community for testing 2"
        assert refreshed.slug == "test-community-2"

        refreshed_node = session.execute(select(Node).where(Node.id == refreshed.parent_node_id)).scalar_one()
        assert refreshed_node.parent_node_id == second.parent_node_id
def test_GetChats(db):
    """GetChats returns a non-empty response for an existing user."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()
    request = admin_pb2.GetChatsReq(user=target.username)

    with real_admin_session(admin_token) as api:
        chats = api.GetChats(request)
        assert chats.response
def test_badges(db, push_collector):
    """Admins can add and remove the volunteer badge; invalid badge operations are rejected."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()
    username = target.username

    details_req = admin_pb2.GetUserDetailsReq(user=username)
    add_volunteer = admin_pb2.AddBadgeReq(user=username, badge_id="volunteer")
    remove_volunteer = admin_pb2.RemoveBadgeReq(user=username, badge_id="volunteer")

    with real_admin_session(admin_token) as api:
        # can add a badge
        assert "volunteer" not in api.GetUserDetails(details_req).badges
        with mock_notification_email() as mock:
            added = api.AddBadge(add_volunteer)
        assert "volunteer" in added.badges

        # badge emails are disabled by default
        mock.assert_not_called()

        push_collector.assert_user_has_single_matching(
            target.id,
            title="The Active Volunteer badge was added to your profile",
            body="Check out your profile to see the new badge!",
        )

        # can't add/edit special tags
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="founder"))
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.ADMIN_CANNOT_EDIT_BADGE

        # double add badge
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddBadge(add_volunteer)
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.USER_ALREADY_HAS_BADGE

        # can remove badge
        assert "volunteer" in api.GetUserDetails(details_req).badges
        with mock_notification_email() as mock:
            removed = api.RemoveBadge(remove_volunteer)
        assert "volunteer" not in removed.badges

        # badge emails are disabled by default
        mock.assert_not_called()

        push_collector.assert_user_push_matches_fields(
            target.id,
            ix=1,
            title="The Active Volunteer badge was removed from your profile",
            body="You can see all your badges on your profile.",
        )

        # not found on user
        with pytest.raises(grpc.RpcError) as exc_info:
            api.RemoveBadge(remove_volunteer)
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.USER_DOES_NOT_HAVE_BADGE

        # not found in general
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="nonexistentbadge"))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == errors.BADGE_NOT_FOUND
def test_DeleteEvent(db):
    """An admin can delete an event created by a normal user; its occurrence is then flagged deleted."""
    _admin, admin_token = generate_user(is_superuser=True)
    organizer, organizer_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [organizer], [], None)

    starts_at = now() + timedelta(hours=2)
    ends_at = starts_at + timedelta(hours=3)
    venue = events_pb2.OfflineEventInformation(
        address="Near Null Island",
        lat=0.1,
        lng=0.2,
    )
    with events_session(organizer_token) as api:
        created = api.CreateEvent(
            events_pb2.CreateEventReq(
                title="Dummy Title",
                content="Dummy content.",
                photo_key=None,
                offline_information=venue,
                start_time=Timestamp_from_datetime(starts_at),
                end_time=Timestamp_from_datetime(ends_at),
                timezone="UTC",
            )
        )
        event_id = created.event_id
        assert not created.is_deleted

    with session_scope() as session, real_admin_session(admin_token) as api:
        api.DeleteEvent(admin_pb2.DeleteEventReq(event_id=event_id))
        occurrence = session.get(EventOccurrence, ident=event_id)
        assert occurrence.is_deleted
def test_ListUserIds(db):
    """ListUserIds returns both users for a wide time window and none for a window starting now."""
    admin, admin_token = generate_user(is_superuser=True)
    member, _member_token = generate_user()

    with real_admin_session(admin_token) as api:
        wide_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(datetime(2000, 1, 1)),
            end_time=Timestamp_from_datetime(now()),
        )
        everyone = api.ListUserIds(wide_window)
        assert len(everyone.user_ids) == 2
        assert sorted(everyone.user_ids) == sorted([admin.id, member.id])

    with real_admin_session(admin_token) as api:
        # both users were created before this window opens
        empty_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(now()),
            end_time=Timestamp_from_datetime(now()),
        )
        nobody = api.ListUserIds(empty_window)
        assert nobody.user_ids == []
def test_EditReferenceText(db):
    """An admin can overwrite the public text of an existing reference.

    user1 writes a friend reference for user2, the admin replaces its text,
    and the stored row is then re-read to confirm the new text.
    """
    super_user, super_token = generate_user(is_superuser=True)
    test_new_text = "New Text"

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()

    with session_scope() as session:
        with references_session(user1_token) as api:
            reference = api.WriteFriendReference(
                references_pb2.WriteFriendReferenceReq(
                    to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
                )
            )

        with real_admin_session(super_token) as admin_api:
            admin_api.EditReferenceText(
                admin_pb2.EditReferenceTextReq(reference_id=reference.reference_id, new_text=test_new_text)
            )

        # drop any state cached in this session so the next query re-reads the row
        session.expire_all()

        # scalar_one() fails loudly if the reference row is missing, rather than
        # returning None and surfacing as an AttributeError on the assertion below
        modified_reference = session.execute(
            select(Reference).where(Reference.id == reference.reference_id)
        ).scalar_one()
        assert modified_reference.text == test_new_text
def test_DeleteReference(db):
    """An admin can delete a reference: it leaves the listing and its row is flagged deleted.

    user1 writes a friend reference for user2. After the admin deletes it,
    listing references written by user1 returns nothing, while the row itself
    still exists with ``is_deleted`` set.
    """
    super_user, super_token = generate_user(is_superuser=True)

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()

    with references_session(user1_token) as api:
        reference = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
            )
        )

    with references_session(user1_token) as api:
        assert api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references

    with real_admin_session(super_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=reference.reference_id))

    with references_session(user1_token) as api:
        assert not api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references

    with session_scope() as session:
        # scalar_one() fails loudly if the row is gone altogether, rather than
        # returning None and surfacing as an AttributeError on the assertion below
        modified_reference = session.execute(
            select(Reference).where(Reference.id == reference.reference_id)
        ).scalar_one()
        assert modified_reference.is_deleted
def test_AddUsersToModerationUserList(db):
    """Users can be added to a new or an existing moderation list; bad lists and users are rejected."""
    _admin, admin_token = generate_user(is_superuser=True)
    alice, _ = generate_user()
    bob, _ = generate_user()
    carol, _ = generate_user()
    dave, _ = generate_user()
    erin, _ = generate_user()
    existing_list_id = add_users_to_new_moderation_list([alice])

    with session_scope() as _outer_session, real_admin_session(admin_token) as api:
        # Test adding users to a non-existent moderation list (should raise an error)
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[bob.username], moderation_list_id=999),
            )
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert errors.MODERATION_USER_LIST_NOT_FOUND == exc_info.value.details()

        # Test with non-existent user (should raise an error)
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[alice.username, "nonexistent"]),
            )
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert errors.USER_NOT_FOUND == exc_info.value.details()

        # Test successful creation of new moderation list (no moderation_list_id provided)
        new_list_members = {alice.id, bob.id, carol.id}
        created = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(users=[alice.username, bob.username, carol.username]),
        )
        assert created.moderation_list_id > 0
        with session_scope() as session:
            stored_list = session.get(ModerationUserList, created.moderation_list_id)
            assert stored_list is not None
            assert len(stored_list.users) == 3
            assert new_list_members.issubset({user.id for user in stored_list.users})

        # Test list endpoint returns same moderation list with same members not repeated
        lists_for_bob = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=bob.username))
        assert len(lists_for_bob.moderation_lists) == 1
        assert lists_for_bob.moderation_lists[0].moderation_list_id == created.moderation_list_id
        assert len(lists_for_bob.moderation_lists[0].member_ids) == 3
        assert new_list_members.issubset(lists_for_bob.moderation_lists[0].member_ids)

        # Test user can be in multiple moderation lists
        lists_for_alice = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=alice.username))
        assert len(lists_for_alice.moderation_lists) == 2

        # Test adding users to an existing moderation list
        existing_list_members = {alice.id, dave.id, erin.id}
        extended = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(
                users=[dave.username, erin.username], moderation_list_id=existing_list_id
            ),
        )
        assert extended.moderation_list_id == existing_list_id
        with session_scope() as session:
            stored_list = session.get(ModerationUserList, existing_list_id)
            assert len(stored_list.users) == 3
            assert existing_list_members.issubset({user.id for user in stored_list.users})

        # Test list user moderation lists endpoint returns the right moderation list
        lists_for_erin = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=erin.username))
        assert len(lists_for_erin.moderation_lists) == 1
        assert lists_for_erin.moderation_lists[0].moderation_list_id == existing_list_id
        assert len(lists_for_erin.moderation_lists[0].member_ids) == 3
        assert existing_list_members.issubset(lists_for_erin.moderation_lists[0].member_ids)
def test_RemoveUserFromModerationUserList(db):
    """Users can be removed from a moderation list; removing the last member deletes the list."""
    _admin, admin_token = generate_user(is_superuser=True)
    alice, _ = generate_user()
    bob, _ = generate_user()
    outsider, _ = generate_user()
    list_id = add_users_to_new_moderation_list([alice, bob])

    with real_admin_session(admin_token) as api:
        # Test with non-existent user (should raise error)
        with pytest.raises(grpc.RpcError) as exc_info:
            api.RemoveUserFromModerationUserList(admin_pb2.RemoveUserFromModerationUserListReq(user="nonexistent"))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert errors.USER_NOT_FOUND == exc_info.value.details()

        # Test without providing moderation list id (should raise error)
        with pytest.raises(grpc.RpcError) as exc_info:
            api.RemoveUserFromModerationUserList(admin_pb2.RemoveUserFromModerationUserListReq(user=bob.username))
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert errors.MISSING_MODERATION_USER_LIST_ID == exc_info.value.details()

        # Test removing user that's not in the provided moderation list (should raise error)
        with pytest.raises(grpc.RpcError) as exc_info:
            api.RemoveUserFromModerationUserList(
                admin_pb2.RemoveUserFromModerationUserListReq(user=outsider.username, moderation_list_id=list_id)
            )
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert errors.USER_NOT_IN_THE_MODERATION_USER_LIST == exc_info.value.details()

        # Test successful removal
        api.RemoveUserFromModerationUserList(
            admin_pb2.RemoveUserFromModerationUserListReq(user=alice.username, moderation_list_id=list_id)
        )
        with session_scope() as session:
            stored_list = session.get(ModerationUserList, list_id)
            assert alice.id not in {user.id for user in stored_list.users}
            assert bob.id in {user.id for user in stored_list.users}

        # Test list user moderation lists endpoint returns right number of moderation lists
        lists_for_alice = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=alice.username))
        assert len(lists_for_alice.moderation_lists) == 0
        lists_for_bob = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=bob.username))
        assert len(lists_for_bob.moderation_lists) == 1

        # Test removing all users from moderation list should also delete the moderation list
        api.RemoveUserFromModerationUserList(
            admin_pb2.RemoveUserFromModerationUserListReq(user=bob.username, moderation_list_id=list_id)
        )
        with session_scope() as session:
            assert session.get(ModerationUserList, list_id) is None
def test_admin_delete_account_url(db, push_collector):
    """An admin-generated deletion link carries a token that the user can confirm to delete the account."""
    _admin, admin_token = generate_user(is_superuser=True)

    target, _target_token = generate_user()
    target_id = target.id

    with real_admin_session(admin_token) as admin_api:
        link_res = admin_api.CreateAccountDeletionLink(admin_pb2.CreateAccountDeletionLinkReq(user=target.username))
        url = link_res.account_deletion_confirm_url

    # creating the link alone sends no push notification
    push_collector.assert_user_has_count(target_id, 0)

    with session_scope() as session:
        deletion_token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token = deletion_token_row.token
        assert deletion_token_row.user.id == target_id
        assert url == f"http://localhost:3000/delete-account?token={deletion_token}"

    with mock_notification_email() as mock, auth_api_session() as (auth_api, _metadata_interceptor):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    push_collector.assert_user_push_matches_fields(
        target_id,
        ix=0,
        title="Your Couchers.org account has been deleted",
        body="You can still undo this by following the link we emailed to you within 7 days.",
    )

    mock.assert_called_once()
    # the parsed email is not inspected further; the call only checks that it can be extracted
    email_fields(mock)
# community invite feature tested in test_events.py
# SendBlogPostNotification tested in test_notifications.py
# MarkUserNeedsLocationUpdate tested in test_jail.py