Coverage for app / backend / src / tests / test_admin.py: 100%
504 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1from datetime import date, datetime, timedelta
2from re import match
4import grpc
5import pytest
6from sqlalchemy import select
7from sqlalchemy.sql import func
9from couchers.db import session_scope
10from couchers.models import (
11 AccountDeletionToken,
12 ContentReport,
13 EventOccurrence,
14 ModerationUserList,
15 Reference,
16 User,
17 UserSession,
18)
19from couchers.proto import admin_pb2, auth_pb2, events_pb2, references_pb2, reporting_pb2
20from couchers.utils import Timestamp_from_datetime, now, parse_date
21from tests.fixtures.db import add_users_to_new_moderation_list, generate_user, make_friends
22from tests.fixtures.misc import PushCollector, email_fields, mock_notification_email
23from tests.fixtures.sessions import (
24 auth_api_session,
25 events_session,
26 real_admin_session,
27 references_session,
28 reporting_session,
29)
30from tests.test_communities import create_community
@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture whose only job is to request ``testconfig`` for each test here."""
def test_access_by_normal_user(db):
    """A user created without ``is_superuser`` gets PERMISSION_DENIED from the admin API."""
    plain_user, plain_token = generate_user()
    request = admin_pb2.GetUserDetailsReq(user=str(plain_user.id))

    # all requests to the admin servicer should break when done by a non-super_user
    with real_admin_session(plain_token) as admin_api:
        with pytest.raises(grpc.RpcError) as exc_info:
            admin_api.GetUserDetails(request)

    assert exc_info.value.code() == grpc.StatusCode.PERMISSION_DENIED
def test_GetUser(db):
    """GetUser resolves a user by id, both before and after that user has been banned."""
    super_user, super_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    def assert_lookup_by_id_matches():
        # each lookup runs in its own admin session
        with real_admin_session(super_token) as api:
            found = api.GetUser(admin_pb2.GetUserReq(user=str(target.id)))
        assert found.user_id == target.id
        assert found.username == target.username

    assert_lookup_by_id_matches()

    with real_admin_session(super_token) as api:
        api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note="Testing banning"))

    # the same lookup must keep working once the user is banned
    assert_lookup_by_id_matches()
def test_GetUserDetails(db):
    """GetUserDetails accepts a user id, a username, or an email in the ``user`` field."""
    super_user, super_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    # one fresh admin session per identifier kind: id, then username, then email
    for identifier in (str(target.id), target.username, target.email):
        with real_admin_session(super_token) as api:
            details = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=identifier))

        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert not details.deleted
def test_ChangeUserGender(db, push_collector: PushCollector):
    """ChangeUserGender returns the updated details and triggers one email plus a push."""
    super_user, super_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()
    request = admin_pb2.ChangeUserGenderReq(user=target.username, gender="Machine")

    with real_admin_session(super_token) as api, mock_notification_email() as email_mock:
        details = api.ChangeUserGender(request)

    # response reflects the new gender, everything else is untouched
    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == "Machine"
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted

    # exactly one notification email went out, addressed to the affected user
    email_mock.assert_called_once()
    email = email_fields(email_mock)
    assert email.subject == "[TEST] Your gender was changed"
    assert email.recipient == target.email
    assert "Machine" in email.plain
    assert "Machine" in email.html

    notification = push_collector.pop_for_user(target.id, last=True)
    assert notification.content.title == "Gender changed"
    assert notification.content.body == "An admin changed your gender to Machine."
def test_ChangeUserBirthdate(db, push_collector: PushCollector):
    """ChangeUserBirthdate returns the updated details and triggers one email plus a push."""
    original_birthdate = date(year=2000, month=1, day=1)
    super_user, super_token = generate_user(is_superuser=True)
    target, _target_token = generate_user(birthdate=original_birthdate)

    with real_admin_session(super_token) as api:
        # sanity check: the user starts with the birthdate they were generated with
        before = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=target.username))
        assert parse_date(before.birthdate) == original_birthdate

        change_request = admin_pb2.ChangeUserBirthdateReq(user=target.username, birthdate="1990-05-25")
        with mock_notification_email() as email_mock:
            after = api.ChangeUserBirthdate(change_request)

        assert after.user_id == target.id
        assert after.username == target.username
        assert after.email == target.email
        assert after.birthdate == "1990-05-25"
        assert after.gender == target.gender
        assert not after.banned
        assert not after.deleted

    # exactly one notification email went out, addressed to the affected user
    email_mock.assert_called_once()
    email = email_fields(email_mock)
    assert email.subject == "[TEST] Your date of birth was changed"
    assert email.recipient == target.email
    assert "1990" in email.plain
    assert "1990" in email.html

    notification = push_collector.pop_for_user(target.id, last=True)
    assert notification.content.title == "Birthdate changed"
    assert notification.content.body == "An admin changed your date of birth to Friday 25 May 1990."
def test_BanUser(db):
    """BanUser marks the user as banned and records an attributed, timestamped admin note."""
    super_user, super_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    admin_note = "A good reason"

    # the stored note is expected to be: newline, [UTC timestamp], (admin id/username), note, newline
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"
    expected_note_pattern = rf"^{prefix_regex} {admin_note}\n$"

    with real_admin_session(super_token) as api:
        details = api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note=admin_note))

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == target.gender
    assert parse_date(details.birthdate) == target.birthdate
    assert details.banned
    assert not details.deleted
    assert match(expected_note_pattern, details.admin_note)
def test_UnbanUser(db):
    """UnbanUser leaves the user unbanned and records an attributed, timestamped admin note."""
    super_user, super_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    admin_note = "A good reason"

    # the stored note is expected to be: newline, [UTC timestamp], (admin id/username), note, newline
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"
    expected_note_pattern = rf"^{prefix_regex} {admin_note}\n$"

    with real_admin_session(super_token) as api:
        details = api.UnbanUser(admin_pb2.UnbanUserReq(user=target.username, admin_note=admin_note))

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == target.gender
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted
    assert match(expected_note_pattern, details.admin_note)
def test_AddAdminNote(db):
    """AddAdminNote appends notes; a second note is added after the first, not in place of it."""
    super_user, super_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    admin_note1 = "User reported strange behavior"
    admin_note2 = "Insert private information here"

    # every note line is prefixed with a UTC timestamp and the acting admin's id/username
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"
    single_note_pattern = rf"^{prefix_regex} {admin_note1}\n$"
    both_notes_pattern = rf"^{prefix_regex} {admin_note1}\n{prefix_regex} {admin_note2}\n$"

    with real_admin_session(super_token) as api:
        first = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note=admin_note1))

    assert first.user_id == target.id
    assert first.username == target.username
    assert first.email == target.email
    assert first.gender == target.gender
    assert parse_date(first.birthdate) == target.birthdate
    assert not first.banned
    assert not first.deleted
    assert match(single_note_pattern, first.admin_note)

    with real_admin_session(super_token) as api:
        second = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note=admin_note2))

    assert match(both_notes_pattern, second.admin_note)
def test_AddAdminNote_blank(db):
    """A whitespace-only admin note is rejected with INVALID_ARGUMENT."""
    super_user, super_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.AddAdminNoteReq(user=target.username, admin_note=" \t \n ")

    with real_admin_session(super_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddAdminNote(request)

    error = exc_info.value
    assert error.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert error.details() == "The admin note cannot be empty."
def test_admin_content_reports(db):
    """Admins can fetch a content report by id and list the reports filed against an author."""
    super_user, super_token = generate_user(is_superuser=True)
    reporter, reporter_token = generate_user()
    bad_user1, _ = generate_user()
    bad_user2, _ = generate_user()

    # (reason, description, content_ref, author, page) -- filed in exactly this order
    reports_to_file = (
        ("spam", "r1", "comment/123", bad_user1, "https://couchers.org/comment/123"),
        ("spam", "r2", "comment/124", bad_user2, "https://couchers.org/comment/124"),
        ("something else", "r3", "page/321", bad_user1, "https://couchers.org/page/321"),
    )
    with reporting_session(reporter_token) as api:
        for reason, description, content_ref, author, page in reports_to_file:
            api.Report(
                reporting_pb2.ReportReq(
                    reason=reason,
                    description=description,
                    content_ref=content_ref,
                    author_user=author.username,
                    user_agent="n/a",
                    page=page,
                )
            )

    # descriptions are unique above, so they serve as lookup keys for the generated ids
    with session_scope() as session:
        rows = session.execute(select(ContentReport.description, ContentReport.id)).all()
        id_by_description: dict[str, int] = {description: report_id for description, report_id in rows}

    with real_admin_session(super_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == "Content report not found."

        r2_id = id_by_description["r2"]
        report = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=r2_id)).content_report
        assert report.content_report_id == r2_id
        assert report.reporting_user_id == reporter.id
        assert report.author_user_id == bad_user2.id
        assert report.reason == "spam"
        assert report.description == "r2"
        assert report.content_ref == "comment/124"
        assert report.user_agent == "n/a"
        assert report.page == "https://couchers.org/comment/124"

        # for bad_user1 the later report (r3) is expected ahead of the earlier one (r1)
        by_author = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username))
        assert by_author.content_reports[0].content_report_id == id_by_description["r3"]
        assert by_author.content_reports[1].content_report_id == id_by_description["r1"]
def test_DeleteUser(db):
    """DeleteUser flags the user as deleted and RecoverDeletedUser clears that flag again."""
    super_user, super_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    def assert_identity_unchanged(details):
        # fields that neither deletion nor recovery is expected to touch
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned

    with real_admin_session(super_token) as api:
        deleted = api.DeleteUser(admin_pb2.DeleteUserReq(user=target.username))
    assert_identity_unchanged(deleted)
    assert deleted.deleted

    with real_admin_session(super_token) as api:
        recovered = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=target.username))
    assert_identity_unchanged(recovered)
    assert not recovered.deleted
def test_CreateApiKey(db, push_collector: PushCollector):
    """CreateApiKey stores a valid API-key session and sends the key by email, plus a push."""
    with session_scope() as session:
        super_user, super_token = generate_user(is_superuser=True)
        target, _target_token = generate_user()

        # the user starts out with no API-key sessions at all
        # (`== True` builds the SQL comparison here; it is not a Python truth test)
        existing_key_count = (
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        )
        assert session.execute(existing_key_count).scalar_one() == 0

    with mock_notification_email() as email_mock, real_admin_session(super_token) as api:
        api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=target.username))

    email_mock.assert_called_once()
    email = email_fields(email_mock)
    assert email.subject == "[TEST] Your API key for Couchers.org"

    # scalar_one() also asserts that exactly one valid API key now exists for the user
    with session_scope() as session:
        issued_key_query = (
            select(UserSession.token)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        )
        issued_key = session.execute(issued_key_query).scalar_one()

    assert issued_key in email.plain
    assert issued_key in email.html

    assert email.recipient == target.email
    assert "api key" in email.subject.lower()
    unique_string = "We've issued you with the following API key:"
    assert unique_string in email.plain
    assert unique_string in email.html
    assert "support@couchers.org" in email.plain
    assert "support@couchers.org" in email.html

    notification = push_collector.pop_for_user(target.id, last=True)
    assert notification.content.title == "API key created"
    assert notification.content.body == "Details were sent to you via email."
def test_GetChats(db):
    """GetChats returns the user's details and empty chat collections for a fresh user."""
    super_user, super_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    with real_admin_session(super_token) as api:
        chats = api.GetChats(admin_pb2.GetChatsReq(user=target.username))

    # Check the structured response fields - user field contains full UserDetails
    details = chats.user
    assert details.user_id == target.id
    assert details.username == target.username
    assert details.name == target.name
    assert details.email == target.email

    # New user should have no chats
    assert len(chats.host_requests) == 0
    assert len(chats.group_chats) == 0
def test_badges(db, push_collector: PushCollector):
    """Walk through adding and removing a badge, including every rejected variation."""
    super_user, super_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    with real_admin_session(super_token) as api:

        def current_badges():
            return api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=target.username)).badges

        def assert_rejected(rpc, request, code, details):
            # call the RPC and check both the status code and the human-readable message
            with pytest.raises(grpc.RpcError) as exc_info:
                rpc(request)
            assert exc_info.value.code() == code
            assert exc_info.value.details() == details

        add_swagster = admin_pb2.AddBadgeReq(user=target.username, badge_id="swagster")
        remove_swagster = admin_pb2.RemoveBadgeReq(user=target.username, badge_id="swagster")

        # can add a badge
        assert "swagster" not in current_badges()
        with mock_notification_email() as email_mock:
            added = api.AddBadge(add_swagster)
        assert "swagster" in added.badges

        # badge emails are disabled by default
        email_mock.assert_not_called()

        notification = push_collector.pop_for_user(target.id, last=True)
        assert notification.content.title == "New profile badge: Swagster"
        assert notification.content.body == "The Swagster badge was added to your profile."

        # can't add/edit special tags
        assert_rejected(
            api.AddBadge,
            admin_pb2.AddBadgeReq(user=target.username, badge_id="founder"),
            grpc.StatusCode.FAILED_PRECONDITION,
            "Admins cannot edit that badge.",
        )

        # double add badge
        assert_rejected(
            api.AddBadge,
            add_swagster,
            grpc.StatusCode.FAILED_PRECONDITION,
            "The user already has that badge.",
        )

        # can remove badge
        assert "swagster" in current_badges()
        with mock_notification_email() as email_mock:
            removed = api.RemoveBadge(remove_swagster)
        assert "swagster" not in removed.badges

        # badge emails are disabled by default
        email_mock.assert_not_called()

        notification = push_collector.pop_for_user(target.id, last=True)
        assert notification.content.title == "Profile badge removed"
        assert notification.content.body == "The Swagster badge was removed from your profile."

        # not found on user
        assert_rejected(
            api.RemoveBadge,
            remove_swagster,
            grpc.StatusCode.FAILED_PRECONDITION,
            "The user does not have that badge.",
        )

        # not found in general
        assert_rejected(
            api.AddBadge,
            admin_pb2.AddBadgeReq(user=target.username, badge_id="nonexistentbadge"),
            grpc.StatusCode.NOT_FOUND,
            "Badge not found.",
        )
def test_DeleteEvent(db):
    """DeleteEvent flips ``is_deleted`` on the stored event occurrence."""
    super_user, super_token = generate_user(is_superuser=True)
    organizer, organizer_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [organizer], [], None)

    # an event starting two hours from now and lasting three hours
    start_time = now() + timedelta(hours=2)
    end_time = start_time + timedelta(hours=3)
    venue = events_pb2.OfflineEventInformation(
        address="Near Null Island",
        lat=0.1,
        lng=0.2,
    )
    create_request = events_pb2.CreateEventReq(
        title="Dummy Title",
        content="Dummy content.",
        photo_key=None,
        offline_information=venue,
        start_time=Timestamp_from_datetime(start_time),
        end_time=Timestamp_from_datetime(end_time),
        timezone="UTC",
    )
    with events_session(organizer_token) as api:
        created = api.CreateEvent(create_request)
    event_id = created.event_id
    assert not created.is_deleted

    with session_scope() as session:
        with real_admin_session(super_token) as api:
            api.DeleteEvent(admin_pb2.DeleteEventReq(event_id=event_id))

        # read the row back through the ORM rather than trusting an API response
        occurrence = session.get_one(EventOccurrence, ident=event_id)
        assert occurrence.is_deleted
def test_ListUserIds(db):
    """ListUserIds returns both users for a wide time window and nothing for a window at 'now'."""
    super_user, super_token = generate_user(is_superuser=True)
    other_user, _other_token = generate_user()

    # window from the year 2000 until now: both users generated above must be included
    with real_admin_session(super_token) as api:
        wide_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(datetime(2000, 1, 1)),
            end_time=Timestamp_from_datetime(now()),
        )
        listed = api.ListUserIds(wide_window)
        assert len(listed.user_ids) == 2
        assert sorted(listed.user_ids) == sorted([super_user.id, other_user.id])

    # window that starts and ends at (roughly) the current instant: nobody was created in it
    with real_admin_session(super_token) as api:
        instant_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(now()),
            end_time=Timestamp_from_datetime(now()),
        )
        listed = api.ListUserIds(instant_window)
        assert listed.user_ids == []
def test_EditReferenceText(db):
    """EditReferenceText overwrites the stored text of an existing reference."""
    super_user, super_token = generate_user(is_superuser=True)
    test_new_text = "New Text"

    author, author_token = generate_user()
    recipient, _recipient_token = generate_user()
    make_friends(author, recipient)

    write_request = references_pb2.WriteFriendReferenceReq(
        to_user_id=recipient.id, text="Old Text", private_text="", was_appropriate=True, rating=1
    )

    with session_scope() as session:
        with references_session(author_token) as api:
            reference = api.WriteFriendReference(write_request)

        with real_admin_session(super_token) as admin_api:
            admin_api.EditReferenceText(
                admin_pb2.EditReferenceTextReq(reference_id=reference.reference_id, new_text=test_new_text)
            )

        # drop any state cached on this session so the query below reloads from the database
        session.expire_all()

        reference_lookup = select(Reference).where(Reference.id == reference.reference_id)
        stored_reference = session.execute(reference_lookup).scalar_one()
        assert stored_reference.text == test_new_text
def test_DeleteReference(db):
    """DeleteReference hides the reference from ListReferences and flags the row as deleted."""
    super_user, super_token = generate_user(is_superuser=True)

    author, author_token = generate_user()
    recipient, _recipient_token = generate_user()
    make_friends(author, recipient)

    def references_written_by_author():
        # what the author sees through the regular (non-admin) references API
        with references_session(author_token) as api:
            return api.ListReferences(references_pb2.ListReferencesReq(from_user_id=author.id)).references

    with references_session(author_token) as api:
        reference = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=recipient.id, text="Old Text", private_text="", was_appropriate=True, rating=1
            )
        )

    assert references_written_by_author()

    with real_admin_session(super_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=reference.reference_id))

    assert not references_written_by_author()

    # the row still exists, it is only flagged as deleted
    with session_scope() as session:
        reference_lookup = select(Reference).where(Reference.id == reference.reference_id)
        stored_reference = session.execute(reference_lookup).scalar_one()
        assert stored_reference.is_deleted
def test_GetUserReferences(db):
    """GetUserReferences returns references written by and about a user, deleted ones included.

    Three mutually befriended users exchange references, one of which is then
    deleted by an admin. The admin view of user1 must list the single reference
    user1 wrote and both references user1 received, with the deleted one still
    present and flagged.
    """
    super_user, super_token = generate_user(is_superuser=True)

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()
    user3, user3_token = generate_user()
    make_friends(user1, user2)
    make_friends(user1, user3)
    make_friends(user2, user3)

    # user1 writes reference about user2
    with references_session(user1_token) as api:
        ref1 = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user2.id,
                text="Reference from user1 to user2",
                private_text="",
                was_appropriate=True,
                rating=1,
            )
        )

    # user2 writes reference about user1
    with references_session(user2_token) as api:
        ref2 = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user1.id,
                text="Reference from user2 to user1",
                private_text="Private note",
                was_appropriate=True,
                rating=0.8,
            )
        )

    # user3 writes reference about user1
    with references_session(user3_token) as api:
        ref3 = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user1.id,
                text="Reference from user3 to user1",
                private_text="",
                was_appropriate=False,
                rating=0.5,
            )
        )

    # Delete ref3
    with real_admin_session(super_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=ref3.reference_id))

    # Test GetUserReferences for user1
    with real_admin_session(super_token) as admin_api:
        res = admin_api.GetUserReferences(admin_pb2.GetUserReferencesReq(user=user1.username))

    # user1 wrote 1 reference
    assert len(res.references_from) == 1
    written = res.references_from[0]
    assert written.reference_id == ref1.reference_id
    assert written.from_user_id == user1.id
    assert written.to_user_id == user2.id
    assert written.text == "Reference from user1 to user2"
    assert written.is_deleted is False

    # user1 received 2 references (including the deleted one)
    assert len(res.references_to) == 2
    # Ordered by id descending, so ref3 comes first
    deleted_ref, kept_ref = res.references_to
    assert deleted_ref.reference_id == ref3.reference_id
    assert deleted_ref.is_deleted is True
    assert deleted_ref.was_appropriate is False

    assert kept_ref.reference_id == ref2.reference_id
    assert kept_ref.private_text == "Private note"
    # The rating is a float that travels through protobuf and the database, so compare
    # with a tolerance instead of relying on a bit-exact round trip.
    assert kept_ref.rating == pytest.approx(0.8)
    assert kept_ref.is_deleted is False
def test_GetUserReferences_not_found(db):
    """GetUserReferences answers NOT_FOUND for an identifier that matches no user."""
    super_user, super_token = generate_user(is_superuser=True)
    request = admin_pb2.GetUserReferencesReq(user="nonexistent")

    with real_admin_session(super_token) as admin_api:
        with pytest.raises(grpc.RpcError) as exc_info:
            admin_api.GetUserReferences(request)

    assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
def test_AddUsersToModerationUserList(db):
    """AddUsersToModerationUserList creates new lists, extends existing ones and rejects bad input.

    Starts from one pre-existing moderation list containing only user1, then covers:
    an unknown list id, an unknown user, creating a new list when no id is given,
    a user belonging to several lists, and adding users to the pre-existing list.
    """
    super_user, super_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    user4, _ = generate_user()
    user5, _ = generate_user()
    moderation_list_id = add_users_to_new_moderation_list([user1])

    # Database sessions are opened only around the ORM reads below; nothing else needs one.
    with real_admin_session(super_token) as api:
        # Test adding users to a non-existent moderation list (should raise an error)
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user2.username], moderation_list_id=999),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Moderation user list not found."

        # Test with non-existent user (should raise an error)
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, "nonexistent"]),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Couldn't find that user."

        # Test successful creation of new moderation list (no moderation_list_id provided)
        res = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, user2.username, user3.username]),
        )
        assert res.moderation_list_id > 0
        with session_scope() as session:
            moderation_user_list = session.get(ModerationUserList, res.moderation_list_id)
            assert moderation_user_list is not None
            assert len(moderation_user_list.users) == 3
            assert {user1.id, user2.id, user3.id}.issubset({user.id for user in moderation_user_list.users})

        # Test list endpoint returns same moderation list with same members not repeated
        list_res = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username))
        assert len(list_res.moderation_lists) == 1
        assert list_res.moderation_lists[0].moderation_list_id == res.moderation_list_id
        assert len(list_res.moderation_lists[0].member_ids) == 3
        assert {user1.id, user2.id, user3.id}.issubset(list_res.moderation_lists[0].member_ids)

        # Test user can be in multiple moderation lists
        list_res_user1 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username))
        assert len(list_res_user1.moderation_lists) == 2

        # Test adding users to an existing moderation list
        res2 = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(
                users=[user4.username, user5.username], moderation_list_id=moderation_list_id
            ),
        )
        assert res2.moderation_list_id == moderation_list_id
        with session_scope() as session:
            moderation_user_list = session.get_one(ModerationUserList, moderation_list_id)
            assert len(moderation_user_list.users) == 3
            assert {user1.id, user4.id, user5.id}.issubset({user.id for user in moderation_user_list.users})

        # Test list user moderation lists endpoint returns the right moderation list
        list_res_user5 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user5.username))
        assert len(list_res_user5.moderation_lists) == 1
        assert list_res_user5.moderation_lists[0].moderation_list_id == moderation_list_id
        assert len(list_res_user5.moderation_lists[0].member_ids) == 3
        assert {user1.id, user4.id, user5.id}.issubset(list_res_user5.moderation_lists[0].member_ids)
def test_RemoveUserFromModerationUserList(db):
    """Removing users from a moderation list: rejected inputs, a removal, and emptying the list."""
    super_user, super_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    moderation_list_id = add_users_to_new_moderation_list([user1, user2])

    with real_admin_session(super_token) as api:

        def assert_removal_rejected(request, code, details):
            with pytest.raises(grpc.RpcError) as exc_info:
                api.RemoveUserFromModerationUserList(request)
            assert exc_info.value.code() == code
            assert details == exc_info.value.details()

        def list_count_for(user):
            listing = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user.username))
            return len(listing.moderation_lists)

        # Test with non-existent user (should raise error)
        assert_removal_rejected(
            admin_pb2.RemoveUserFromModerationUserListReq(user="nonexistent"),
            grpc.StatusCode.NOT_FOUND,
            "Couldn't find that user.",
        )

        # Test without providing moderation list id (should raise error)
        assert_removal_rejected(
            admin_pb2.RemoveUserFromModerationUserListReq(user=user2.username),
            grpc.StatusCode.INVALID_ARGUMENT,
            "Missing moderation user list id.",
        )

        # Test removing user that's not in the provided moderation list (should raise error)
        assert_removal_rejected(
            admin_pb2.RemoveUserFromModerationUserListReq(user=user3.username, moderation_list_id=moderation_list_id),
            grpc.StatusCode.FAILED_PRECONDITION,
            "User is not in the moderation user list.",
        )

        # Test successful removal
        api.RemoveUserFromModerationUserList(
            admin_pb2.RemoveUserFromModerationUserListReq(user=user1.username, moderation_list_id=moderation_list_id)
        )
        with session_scope() as session:
            moderation_user_list = session.get_one(ModerationUserList, moderation_list_id)
            remaining_ids = {user.id for user in moderation_user_list.users}
            assert user1.id not in remaining_ids
            assert user2.id in remaining_ids

        # Test list user moderation lists endpoint returns right number of moderation lists
        assert list_count_for(user1) == 0
        assert list_count_for(user2) == 1

        # Test removing all users from moderation list should also delete the moderation list
        api.RemoveUserFromModerationUserList(
            admin_pb2.RemoveUserFromModerationUserListReq(user=user2.username, moderation_list_id=moderation_list_id)
        )
        with session_scope() as session:
            assert session.get(ModerationUserList, moderation_list_id) is None
def test_admin_delete_account_url(db, push_collector: PushCollector):
    """An admin-created deletion link carries a token that can be used to delete the account.

    Creating the link must not push anything to the user. Confirming the deletion
    with the token from the link must produce an "Account deleted" push and exactly
    one notification email.
    """
    super_user, super_token = generate_user(is_superuser=True)

    # The user's own auth token is never needed: confirmation goes through the deletion token.
    user, _ = generate_user()
    user_id = user.id

    with real_admin_session(super_token) as admin_api:
        url = admin_api.CreateAccountDeletionLink(
            admin_pb2.CreateAccountDeletionLinkReq(user=user.username)
        ).account_deletion_confirm_url

    # creating the link alone must not notify the user
    assert push_collector.count_for_user(user_id) == 0

    with session_scope() as session:
        # scalar_one() also asserts that exactly one deletion token exists
        token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token = token_row.token
        assert token_row.user.id == user_id
        assert url == f"http://localhost:3000/delete-account?token={deletion_token}"

    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            auth_api.ConfirmDeleteAccount(
                auth_pb2.ConfirmDeleteAccountReq(
                    token=deletion_token,
                )
            )

    push = push_collector.pop_for_user(user_id, last=True)
    assert push.content.title == "Account deleted"
    assert push.content.body == "You can restore it within 7 days using the link we emailed you."
    mock.assert_called_once()
    # Still unpack the captured email, presumably so a malformed call would surface here.
    # NOTE(review): no field of the email is asserted -- consider checking recipient/subject.
    email_fields(mock)
def test_SetLastDonated(db):
    """SetLastDonated stores a timestamp, clears it when omitted, and rejects unknown users."""
    super_user, super_token = generate_user(is_superuser=True)
    target, _target_token = generate_user(last_donated=None)

    def stored_last_donated():
        # read the value straight from the database in a fresh session
        with session_scope() as session:
            row = session.execute(select(User).where(User.id == target.id)).scalar_one()
            return row.last_donated

    with real_admin_session(super_token) as api:
        # user starts with no last_donated
        assert stored_last_donated() is None

        # can set last_donated
        donation_time = now() - timedelta(days=30)
        api.SetLastDonated(
            admin_pb2.SetLastDonatedReq(
                user=target.username,
                last_donated=Timestamp_from_datetime(donation_time),
            )
        )

        saved = stored_last_donated()
        assert saved is not None
        # check timestamp is close (within a second)
        assert abs((saved - donation_time).total_seconds()) < 1

        # can clear last_donated by not setting the field
        api.SetLastDonated(admin_pb2.SetLastDonatedReq(user=target.username))
        assert stored_last_donated() is None

        # user not found
        with pytest.raises(grpc.RpcError) as exc_info:
            api.SetLastDonated(admin_pb2.SetLastDonatedReq(user="nonexistent"))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == "Couldn't find that user."
845# community invite feature tested in test_events.py
846# SendBlogPostNotification tested in test_notifications.py
847# MarkUserNeedsLocationUpdate tested in test_jail.py