Coverage for src/tests/test_admin.py: 100%
452 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-17 23:52 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-17 23:52 +0000
1from datetime import date, datetime
2from re import match
4import grpc
5import pytest
6from sqlalchemy.sql import func
8from couchers.db import session_scope
9from couchers.models import (
10 AccountDeletionToken,
11 ContentReport,
12 EventOccurrence,
13 ModerationUserList,
14 Reference,
15 User,
16 UserSession,
17)
18from couchers.proto import admin_pb2, auth_pb2, events_pb2, references_pb2, reporting_pb2
19from couchers.sql import couchers_select as select
20from couchers.utils import Timestamp_from_datetime, now, parse_date, timedelta
21from tests.test_communities import create_community
22from tests.test_fixtures import ( # noqa
23 add_users_to_new_moderation_list,
24 auth_api_session,
25 db,
26 email_fields,
27 events_session,
28 generate_user,
29 get_user_id_and_token,
30 make_friends,
31 mock_notification_email,
32 push_collector,
33 real_admin_session,
34 references_session,
35 reporting_session,
36 requests_session,
37 testconfig,
38)
@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""
def test_access_by_normal_user(db):
    """A user created without ``is_superuser`` gets PERMISSION_DENIED from the admin servicer."""
    regular_user, regular_token = generate_user()
    request = admin_pb2.GetUserDetailsReq(user=str(regular_user.id))

    # all requests to the admin servicer should break when done by a non-super_user
    with real_admin_session(regular_token) as api, pytest.raises(grpc.RpcError) as excinfo:
        api.GetUserDetails(request)

    assert excinfo.value.code() == grpc.StatusCode.PERMISSION_DENIED
def test_GetUser(db):
    """GetUser resolves a user by id, both before and after that user has been banned."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    with real_admin_session(super_token) as api:
        res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id)))
    assert res.user_id == normal_user.id
    assert res.username == normal_user.username

    with real_admin_session(super_token) as api:
        ban_res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note="Testing banning"))
    # Confirm the ban actually took effect; otherwise the lookup below would not
    # be exercising the "banned user is still retrievable" path at all.
    assert ban_res.banned

    with real_admin_session(super_token) as api:
        res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id)))
    assert res.user_id == normal_user.id
    assert res.username == normal_user.username
def test_GetUserDetails(db):
    """GetUserDetails accepts a user id, a username or an email address as the lookup key."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    # The same expectations hold whichever identifier is used for the lookup.
    lookup_keys = (str(normal_user.id), normal_user.username, normal_user.email)

    for lookup_key in lookup_keys:
        with real_admin_session(super_token) as api:
            details = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=lookup_key))

        assert details.user_id == normal_user.id
        assert details.username == normal_user.username
        assert details.email == normal_user.email
        assert details.gender == normal_user.gender
        assert parse_date(details.birthdate) == normal_user.birthdate
        assert not details.banned
        assert not details.deleted
def test_ChangeUserGender(db, push_collector):
    """ChangeUserGender returns the updated details and notifies the user by email and push."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()
    new_gender = "Machine"

    with real_admin_session(super_token) as api, mock_notification_email() as email_mock:
        request = admin_pb2.ChangeUserGenderReq(user=normal_user.username, gender=new_gender)
        details = api.ChangeUserGender(request)

    # returned user details reflect the change
    assert details.user_id == normal_user.id
    assert details.username == normal_user.username
    assert details.email == normal_user.email
    assert details.gender == new_gender
    assert parse_date(details.birthdate) == normal_user.birthdate
    assert not details.banned
    assert not details.deleted

    # exactly one notification email, mentioning the new gender
    email_mock.assert_called_once()
    email = email_fields(email_mock)
    assert email.subject == "[TEST] Your gender was changed"
    assert email.recipient == normal_user.email
    assert new_gender in email.plain
    assert new_gender in email.html

    # and a matching push notification
    push_collector.assert_user_has_single_matching(
        normal_user.id,
        title="Your gender was changed",
        body="Your gender on Couchers.org was changed to Machine by an admin.",
    )
def test_ChangeUserBirthdate(db, push_collector):
    """ChangeUserBirthdate returns the updated details and notifies the user by email and push."""
    original_birthdate = date(year=2000, month=1, day=1)
    new_birthdate = "1990-05-25"

    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user(birthdate=original_birthdate)

    with real_admin_session(super_token) as api:
        before = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username))
        assert parse_date(before.birthdate) == original_birthdate

        with mock_notification_email() as email_mock:
            request = admin_pb2.ChangeUserBirthdateReq(user=normal_user.username, birthdate=new_birthdate)
            after = api.ChangeUserBirthdate(request)

        # returned user details reflect the change
        assert after.user_id == normal_user.id
        assert after.username == normal_user.username
        assert after.email == normal_user.email
        assert after.birthdate == new_birthdate
        assert after.gender == normal_user.gender
        assert not after.banned
        assert not after.deleted

    # exactly one notification email, mentioning the new year of birth
    email_mock.assert_called_once()
    email = email_fields(email_mock)
    assert email.subject == "[TEST] Your date of birth was changed"
    assert email.recipient == normal_user.email
    assert "1990" in email.plain
    assert "1990" in email.html

    # and a matching push notification
    push_collector.assert_user_has_single_matching(
        normal_user.id,
        title="Your date of birth was changed",
        body="Your date of birth on Couchers.org was changed to Friday 25 May 1990 by an admin.",
    )
def test_BanUser(db):
    """BanUser marks the user as banned and records a timestamped, attributed admin note."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    admin_note = "A good reason"
    # Each note is prefixed with a UTC ISO timestamp and the acting admin's id/username.
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"

    with real_admin_session(super_token) as api:
        res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note=admin_note))
    assert res.user_id == normal_user.id
    assert res.username == normal_user.username
    assert res.email == normal_user.email
    assert res.gender == normal_user.gender
    assert parse_date(res.birthdate) == normal_user.birthdate
    assert res.banned
    assert not res.deleted
    # `match` already anchors at the start; `\Z` anchors strictly at the end of the
    # string, whereas `$` would also accept one extra trailing newline.
    assert match(rf"{prefix_regex} {admin_note}\n\Z", res.admin_note)
def test_UnbanUser(db):
    """UnbanUser lifts an existing ban and appends its own admin note after the ban note."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    ban_note = "A good reason to ban"
    admin_note = "A good reason"
    # Each note is prefixed with a UTC ISO timestamp and the acting admin's id/username.
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"

    with real_admin_session(super_token) as api:
        # Ban first: unbanning a user who was never banned would make the
        # `not res.banned` check below pass regardless of what UnbanUser does.
        ban_res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note=ban_note))
        assert ban_res.banned

        res = api.UnbanUser(admin_pb2.UnbanUserReq(user=normal_user.username, admin_note=admin_note))
    assert res.user_id == normal_user.id
    assert res.username == normal_user.username
    assert res.email == normal_user.email
    assert res.gender == normal_user.gender
    assert parse_date(res.birthdate) == normal_user.birthdate
    assert not res.banned
    assert not res.deleted
    # NOTE(review): assumes ban and unban notes are appended in order, the way
    # test_AddAdminNote shows for AddAdminNote — confirm against the servicer.
    # `\Z` anchors strictly at the end; `$` would tolerate an extra trailing newline.
    assert match(rf"{prefix_regex} {ban_note}\n{prefix_regex} {admin_note}\n\Z", res.admin_note)
def test_AddAdminNote(db):
    """AddAdminNote appends timestamped, attributed notes to the user's admin note in order."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    admin_note1 = "User reported strange behavior"
    admin_note2 = "Insert private information here"
    # Each note is prefixed with a UTC ISO timestamp and the acting admin's id/username.
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"

    with real_admin_session(super_token) as api:
        res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note1))
    assert res.user_id == normal_user.id
    assert res.username == normal_user.username
    assert res.email == normal_user.email
    assert res.gender == normal_user.gender
    assert parse_date(res.birthdate) == normal_user.birthdate
    assert not res.banned
    assert not res.deleted
    # `match` already anchors at the start; `\Z` anchors strictly at the end of the
    # string, whereas `$` would also accept one extra trailing newline.
    assert match(rf"{prefix_regex} {admin_note1}\n\Z", res.admin_note)

    with real_admin_session(super_token) as api:
        res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note2))
    # The second note follows the first one rather than replacing it.
    assert match(rf"{prefix_regex} {admin_note1}\n{prefix_regex} {admin_note2}\n\Z", res.admin_note)
def test_AddAdminNote_blank(db):
    """A whitespace-only admin note is rejected with INVALID_ARGUMENT."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    whitespace_only_note = " \t \n "
    request = admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=whitespace_only_note)

    with real_admin_session(super_token) as api, pytest.raises(grpc.RpcError) as excinfo:
        api.AddAdminNote(request)

    error = excinfo.value
    assert error.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert error.details() == "The admin note cannot be empty."
def test_admin_content_reports(db):
    """Admins can fetch a single content report by id and list all reports about an author."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, token = generate_user()
    bad_user1, _ = generate_user()
    bad_user2, _ = generate_user()

    # (reason, description, content_ref, author, page) for each report filed by normal_user
    reports_to_file = [
        ("spam", "r1", "comment/123", bad_user1, "https://couchers.org/comment/123"),
        ("spam", "r2", "comment/124", bad_user2, "https://couchers.org/comment/124"),
        ("something else", "r3", "page/321", bad_user1, "https://couchers.org/page/321"),
    ]

    with reporting_session(token) as api:
        for reason, description, content_ref, author, page in reports_to_file:
            api.Report(
                reporting_pb2.ReportReq(
                    reason=reason,
                    description=description,
                    content_ref=content_ref,
                    author_user=author.username,
                    user_agent="n/a",
                    page=page,
                )
            )

    # The descriptions are unique, so they serve as stable handles for the generated ids.
    with session_scope() as session:
        id_by_description = dict(session.execute(select(ContentReport.description, ContentReport.id)).all())

    with real_admin_session(super_token) as api:
        with pytest.raises(grpc.RpcError) as e:
            api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Content report not found."

        res = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=id_by_description["r2"]))
        rep = res.content_report
        assert rep.content_report_id == id_by_description["r2"]
        assert rep.reporting_user_id == normal_user.id
        assert rep.author_user_id == bad_user2.id
        assert rep.reason == "spam"
        assert rep.description == "r2"
        assert rep.content_ref == "comment/124"
        assert rep.user_agent == "n/a"
        assert rep.page == "https://couchers.org/comment/124"

        res = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username))
        # Only r1 and r3 are about bad_user1; without the length check a report
        # about another author leaking into the list would go unnoticed.
        assert len(res.content_reports) == 2
        # The later report (r3) is expected before the earlier one (r1).
        assert res.content_reports[0].content_report_id == id_by_description["r3"]
        assert res.content_reports[1].content_report_id == id_by_description["r1"]
def test_DeleteUser(db):
    """DeleteUser flags the user as deleted and RecoverDeletedUser clears the flag again."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    def assert_same_unbanned_user(details):
        # Checks shared by both responses: same identity fields, not banned.
        assert details.user_id == normal_user.id
        assert details.username == normal_user.username
        assert details.email == normal_user.email
        assert details.gender == normal_user.gender
        assert parse_date(details.birthdate) == normal_user.birthdate
        assert not details.banned

    with real_admin_session(super_token) as api:
        deleted = api.DeleteUser(admin_pb2.DeleteUserReq(user=normal_user.username))
    assert_same_unbanned_user(deleted)
    assert deleted.deleted

    with real_admin_session(super_token) as api:
        recovered = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=normal_user.username))
    assert_same_unbanned_user(recovered)
    assert not recovered.deleted
def test_CreateApiKey(db, push_collector):
    """CreateApiKey stores an API-key session for the user and sends the key by email plus a push."""
    with session_scope() as session:
        super_user, super_token = generate_user(is_superuser=True)
        normal_user, normal_token = generate_user()

        # the user starts out without any API key sessions
        api_key_count_query = (
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == normal_user.id)
        )
        assert session.execute(api_key_count_query).scalar_one() == 0

    with mock_notification_email() as email_mock, real_admin_session(super_token) as api:
        api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=normal_user.username))

    email_mock.assert_called_once()
    email = email_fields(email_mock)
    assert email.subject == "[TEST] Your API key for Couchers.org"

    # exactly one valid API key session must now exist for the user
    with session_scope() as session:
        api_key_query = (
            select(UserSession.token)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == normal_user.id)
        )
        api_key = session.execute(api_key_query).scalar_one()

    # the email carries the key itself, in both renderings
    for rendering in (email.plain, email.html):
        assert api_key in rendering

    assert email.recipient == normal_user.email
    assert "api key" in email.subject.lower()
    unique_string = "We've issued you with the following API key:"
    assert unique_string in email.plain
    assert unique_string in email.html
    assert "support@couchers.org" in email.plain
    assert "support@couchers.org" in email.html

    push_collector.assert_user_has_single_matching(
        normal_user.id, title="An API key was created for your account", body="Details were sent to you via email."
    )
def test_GetChats(db):
    """GetChats returns the user's details and empty chat collections for a freshly created user."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    with real_admin_session(super_token) as api:
        chats = api.GetChats(admin_pb2.GetChatsReq(user=normal_user.username))

    # Check the structured response fields - user field contains full UserDetails
    returned_user = chats.user
    assert returned_user.user_id == normal_user.id
    assert returned_user.username == normal_user.username
    assert returned_user.name == normal_user.name
    assert returned_user.email == normal_user.email

    # New user should have no chats
    assert len(chats.host_requests) == 0
    assert len(chats.group_chats) == 0
def test_badges(db, push_collector):
    """Admins can add and remove an ordinary badge; invalid badge operations are rejected."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()
    username = normal_user.username

    def current_badges(api):
        # Badges as currently reported by GetUserDetails.
        return api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=username)).badges

    with real_admin_session(super_token) as api:
        # can add a badge
        assert "swagster" not in current_badges(api)
        with mock_notification_email() as add_mock:
            added = api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="swagster"))
        assert "swagster" in added.badges

        # badge emails are disabled by default
        add_mock.assert_not_called()

        push_collector.assert_user_has_single_matching(
            normal_user.id,
            title="The Swagster badge was added to your profile",
            body="Check out your profile to see the new badge!",
        )

        # can't add/edit special tags
        with pytest.raises(grpc.RpcError) as special_err:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="founder"))
        assert special_err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert special_err.value.details() == "Admins cannot edit that badge."

        # double add badge
        with pytest.raises(grpc.RpcError) as duplicate_err:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="swagster"))
        assert duplicate_err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert duplicate_err.value.details() == "The user already has that badge."

        # can remove badge
        assert "swagster" in current_badges(api)
        with mock_notification_email() as remove_mock:
            removed = api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="swagster"))
        assert "swagster" not in removed.badges

        # badge emails are disabled by default
        remove_mock.assert_not_called()

        # the removal push is checked at index 1, after the "added" push
        push_collector.assert_user_push_matches_fields(
            normal_user.id,
            ix=1,
            title="The Swagster badge was removed from your profile",
            body="You can see all your badges on your profile.",
        )

        # not found on user
        with pytest.raises(grpc.RpcError) as missing_on_user_err:
            api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="swagster"))
        assert missing_on_user_err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert missing_on_user_err.value.details() == "The user does not have that badge."

        # not found in general
        with pytest.raises(grpc.RpcError) as unknown_badge_err:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="nonexistentbadge"))
        assert unknown_badge_err.value.code() == grpc.StatusCode.NOT_FOUND
        assert unknown_badge_err.value.details() == "Badge not found."
def test_DeleteEvent(db):
    """DeleteEvent marks the event's occurrence row as deleted."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [normal_user], [], None)

    # a three hour event starting two hours from now
    start_time = now() + timedelta(hours=2)
    end_time = start_time + timedelta(hours=3)
    location = events_pb2.OfflineEventInformation(
        address="Near Null Island",
        lat=0.1,
        lng=0.2,
    )
    create_request = events_pb2.CreateEventReq(
        title="Dummy Title",
        content="Dummy content.",
        photo_key=None,
        offline_information=location,
        start_time=Timestamp_from_datetime(start_time),
        end_time=Timestamp_from_datetime(end_time),
        timezone="UTC",
    )

    with events_session(normal_token) as api:
        created = api.CreateEvent(create_request)
        event_id = created.event_id
        assert not created.is_deleted

    with session_scope() as session:
        with real_admin_session(super_token) as api:
            api.DeleteEvent(admin_pb2.DeleteEventReq(event_id=event_id))

        # the occurrence is first loaded after the admin call has completed
        occurrence = session.get(EventOccurrence, ident=event_id)
        assert occurrence.is_deleted
def test_ListUserIds(db):
    """ListUserIds returns the users inside the requested time window and nothing outside it."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    # window from the year 2000 up to now: both generated users are listed
    with real_admin_session(super_token) as api:
        window_start = Timestamp_from_datetime(datetime(2000, 1, 1))
        window_end = Timestamp_from_datetime(now())
        listed = api.ListUserIds(admin_pb2.ListUserIdsReq(start_time=window_start, end_time=window_end))
        assert len(listed.user_ids) == 2
        assert sorted(listed.user_ids) == sorted([super_user.id, normal_user.id])

    # window that only opens now: no users are listed
    with real_admin_session(super_token) as api:
        window_start = Timestamp_from_datetime(now())
        window_end = Timestamp_from_datetime(now())
        listed = api.ListUserIds(admin_pb2.ListUserIdsReq(start_time=window_start, end_time=window_end))
        assert listed.user_ids == []
def test_EditReferenceText(db):
    """EditReferenceText replaces the stored text of an existing reference."""
    super_user, super_token = generate_user(is_superuser=True)
    test_new_text = "New Text"

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()
    make_friends(user1, user2)

    with references_session(user1_token) as api:
        reference = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
            )
        )

    with real_admin_session(super_token) as admin_api:
        admin_api.EditReferenceText(
            admin_pb2.EditReferenceTextReq(reference_id=reference.reference_id, new_text=test_new_text)
        )

    # Read back in a session opened only after the edit, so there is no earlier
    # session state that would need to be expired first.
    with session_scope() as session:
        # scalar_one() raises if the row is missing, instead of yielding None
        # that would surface as an AttributeError on `.text`.
        modified_reference = session.execute(
            select(Reference).where(Reference.id == reference.reference_id)
        ).scalar_one()
        assert modified_reference.text == test_new_text
def test_DeleteReference(db):
    """DeleteReference hides the reference from listings and flags its row as deleted."""
    super_user, super_token = generate_user(is_superuser=True)

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()
    make_friends(user1, user2)

    with references_session(user1_token) as api:
        reference = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
            )
        )

    # the reference is listed before the deletion ...
    with references_session(user1_token) as api:
        assert api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references

    with real_admin_session(super_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=reference.reference_id))

    # ... and no longer listed afterwards
    with references_session(user1_token) as api:
        assert not api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references

    with session_scope() as session:
        # The row is expected to still exist, only flagged; scalar_one() raises
        # if it is missing instead of yielding None that would surface as an
        # AttributeError on `.is_deleted`.
        modified_reference = session.execute(
            select(Reference).where(Reference.id == reference.reference_id)
        ).scalar_one()
        assert modified_reference.is_deleted
def test_AddUsersToModerationUserList(db):
    """AddUsersToModerationUserList creates a new list or extends an existing one, and rejects bad input."""
    super_user, super_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    user4, _ = generate_user()
    user5, _ = generate_user()
    moderation_list_id = add_users_to_new_moderation_list([user1])

    # No enclosing database session here: each verification step below opens
    # its own short-lived session_scope().
    with real_admin_session(super_token) as api:
        # Test adding users to a non-existent moderation list (should raise an error)
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user2.username], moderation_list_id=999),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert "Moderation user list not found." == e.value.details()

        # Test with non-existent user (should raise an error)
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, "nonexistent"]),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert "Couldn't find that user." == e.value.details()

        # Test successful creation of new moderation list (no moderation_list_id provided)
        res = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, user2.username, user3.username]),
        )
        assert res.moderation_list_id > 0
        with session_scope() as session:
            moderation_user_list = session.get(ModerationUserList, res.moderation_list_id)
            assert moderation_user_list is not None
            assert len(moderation_user_list.users) == 3
            assert {user1.id, user2.id, user3.id}.issubset({user.id for user in moderation_user_list.users})

        # Test list endpoint returns same moderation list with same members not repeated
        list_res = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username))
        assert len(list_res.moderation_lists) == 1
        assert list_res.moderation_lists[0].moderation_list_id == res.moderation_list_id
        assert len(list_res.moderation_lists[0].member_ids) == 3
        assert {user1.id, user2.id, user3.id}.issubset(list_res.moderation_lists[0].member_ids)

        # Test user can be in multiple moderation lists
        list_res_user1 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username))
        assert len(list_res_user1.moderation_lists) == 2

        # Test adding users to an existing moderation list
        res2 = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(
                users=[user4.username, user5.username], moderation_list_id=moderation_list_id
            ),
        )
        assert res2.moderation_list_id == moderation_list_id
        with session_scope() as session:
            moderation_user_list = session.get(ModerationUserList, moderation_list_id)
            assert len(moderation_user_list.users) == 3
            assert {user1.id, user4.id, user5.id}.issubset({user.id for user in moderation_user_list.users})

        # Test list user moderation lists endpoint returns the right moderation list
        list_res_user5 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user5.username))
        assert len(list_res_user5.moderation_lists) == 1
        assert list_res_user5.moderation_lists[0].moderation_list_id == moderation_list_id
        assert len(list_res_user5.moderation_lists[0].member_ids) == 3
        assert {user1.id, user4.id, user5.id}.issubset(list_res_user5.moderation_lists[0].member_ids)
def test_RemoveUserFromModerationUserList(db):
    """RemoveUserFromModerationUserList removes members, rejects bad input and drops a list once it is empty."""
    super_user, super_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    moderation_list_id = add_users_to_new_moderation_list([user1, user2])

    def count_lists_for(api, user):
        # Number of moderation lists the list endpoint reports for the given user.
        listing = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user.username))
        return len(listing.moderation_lists)

    with real_admin_session(super_token) as api:
        # Test with non-existent user (should raise error)
        with pytest.raises(grpc.RpcError) as unknown_user_err:
            api.RemoveUserFromModerationUserList(admin_pb2.RemoveUserFromModerationUserListReq(user="nonexistent"))
        assert unknown_user_err.value.code() == grpc.StatusCode.NOT_FOUND
        assert "Couldn't find that user." == unknown_user_err.value.details()

        # Test without providing moderation list id (should raise error)
        with pytest.raises(grpc.RpcError) as missing_id_err:
            api.RemoveUserFromModerationUserList(admin_pb2.RemoveUserFromModerationUserListReq(user=user2.username))
        assert missing_id_err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert "Missing moderation user list id." == missing_id_err.value.details()

        # Test removing user that's not in the provided moderation list (should raise error)
        not_member_request = admin_pb2.RemoveUserFromModerationUserListReq(
            user=user3.username, moderation_list_id=moderation_list_id
        )
        with pytest.raises(grpc.RpcError) as not_member_err:
            api.RemoveUserFromModerationUserList(not_member_request)
        assert not_member_err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert "User is not in the moderation user list." == not_member_err.value.details()

        # Test successful removal
        api.RemoveUserFromModerationUserList(
            admin_pb2.RemoveUserFromModerationUserListReq(user=user1.username, moderation_list_id=moderation_list_id)
        )
        with session_scope() as session:
            remaining = session.get(ModerationUserList, moderation_list_id)
            assert user1.id not in {user.id for user in remaining.users}
            assert user2.id in {user.id for user in remaining.users}

        # Test list user moderation lists endpoint returns right number of moderation lists
        assert count_lists_for(api, user1) == 0
        assert count_lists_for(api, user2) == 1

        # Test removing all users from moderation list should also delete the moderation list
        api.RemoveUserFromModerationUserList(
            admin_pb2.RemoveUserFromModerationUserListReq(user=user2.username, moderation_list_id=moderation_list_id)
        )
        with session_scope() as session:
            assert session.get(ModerationUserList, moderation_list_id) is None
def test_admin_delete_account_url(db, push_collector):
    """An admin-created deletion link carries a token that confirms the account deletion."""
    super_user, super_token = generate_user(is_superuser=True)

    user, token = generate_user()
    user_id = user.id

    with real_admin_session(super_token) as admin_api:
        link_res = admin_api.CreateAccountDeletionLink(admin_pb2.CreateAccountDeletionLinkReq(user=user.username))
        url = link_res.account_deletion_confirm_url

    # creating the link alone does not push anything to the user
    push_collector.assert_user_has_count(user_id, 0)

    with session_scope() as session:
        deletion_token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        token = deletion_token_row.token
        assert deletion_token_row.user.id == user_id
        assert url == f"http://localhost:3000/delete-account?token={token}"

    with mock_notification_email() as email_mock:
        with auth_api_session() as (auth_api, _interceptor):
            auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=token))

        push_collector.assert_user_push_matches_fields(
            user_id,
            ix=0,
            title="Your Couchers.org account has been deleted",
            body="You can still undo this by following the link we emailed to you within 7 days.",
        )

    email_mock.assert_called_once()
    # the extracted email fields themselves are not inspected further
    email_fields(email_mock)
def test_SetLastDonated(db):
    """SetLastDonated stores the given timestamp, clears it when omitted and rejects unknown users."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user(last_donated=None)

    def stored_last_donated():
        # Read the user's current last_donated value in a fresh session.
        with session_scope() as session:
            row = session.execute(select(User).where(User.id == normal_user.id)).scalar_one()
            return row.last_donated

    with real_admin_session(super_token) as api:
        # user starts with no last_donated
        assert stored_last_donated() is None

        # can set last_donated
        donation_time = now() - timedelta(days=30)
        api.SetLastDonated(
            admin_pb2.SetLastDonatedReq(
                user=normal_user.username,
                last_donated=Timestamp_from_datetime(donation_time),
            )
        )

        stored = stored_last_donated()
        assert stored is not None
        # check timestamp is close (within a second)
        assert abs((stored - donation_time).total_seconds()) < 1

        # can clear last_donated by not setting the field
        api.SetLastDonated(admin_pb2.SetLastDonatedReq(user=normal_user.username))
        assert stored_last_donated() is None

        # user not found
        with pytest.raises(grpc.RpcError) as excinfo:
            api.SetLastDonated(admin_pb2.SetLastDonatedReq(user="nonexistent"))
        assert excinfo.value.code() == grpc.StatusCode.NOT_FOUND
        assert excinfo.value.details() == "Couldn't find that user."
780# community invite feature tested in test_events.py
781# SendBlogPostNotification tested in test_notifications.py
782# MarkUserNeedsLocationUpdate tested in test_jail.py