Coverage for app / backend / src / tests / test_admin.py: 100%
674 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1from datetime import date, datetime, timedelta
3import grpc
4import pytest
5from sqlalchemy import select
6from sqlalchemy.sql import func
8from couchers.db import session_scope
9from couchers.models import (
10 AccountDeletionToken,
11 ContentReport,
12 EventOccurrence,
13 ModerationUserList,
14 Reference,
15 User,
16 UserSession,
17)
18from couchers.proto import account_pb2, admin_pb2, auth_pb2, events_pb2, references_pb2, reporting_pb2
19from couchers.utils import Timestamp_from_datetime, now, parse_date
20from tests.fixtures.db import add_users_to_new_moderation_list, generate_user, make_friends
21from tests.fixtures.misc import PushCollector, email_fields, mock_notification_email
22from tests.fixtures.sessions import (
23 account_session,
24 auth_api_session,
25 events_session,
26 real_admin_session,
27 references_session,
28 reporting_session,
29)
30from tests.test_communities import create_community
@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture: request ``testconfig`` for every test in this module."""
    return None
def test_access_by_normal_user(db):
    """A non-superuser calling the admin servicer is rejected with PERMISSION_DENIED."""
    regular_user, regular_token = generate_user()
    request = admin_pb2.GetUserDetailsReq(user=str(regular_user.id))

    # All admin RPCs are expected to fail for a non-superuser; GetUserDetails is the probe.
    with real_admin_session(regular_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.GetUserDetails(request)
        assert exc_info.value.code() == grpc.StatusCode.PERMISSION_DENIED
def test_GetUser(db):
    """GetUser resolves a user by id both before and after that user is banned."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    def lookup_by_id():
        # Each lookup runs in its own admin session.
        with real_admin_session(admin_token) as api:
            return api.GetUser(admin_pb2.GetUserReq(user=str(target.id)))

    def check(found):
        assert found.user_id == target.id
        assert found.username == target.username

    check(lookup_by_id())

    with real_admin_session(admin_token) as api:
        api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note="Testing banning"))

    # The banned user must still be retrievable through the admin API.
    check(lookup_by_id())
def test_GetUserDetails(db):
    """GetUserDetails accepts a user id, a username, or an email and returns the same details."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    # The same user is looked up through each supported identifier, one admin session per lookup.
    for identifier in (str(target.id), target.username, target.email):
        with real_admin_session(admin_token) as api:
            details = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=identifier))
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert not details.deleted
def test_ChangeUserGender(db, push_collector: PushCollector):
    """Changing a user's gender as admin updates the details and notifies the user by email and push."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()
    new_gender = "Machine"

    with real_admin_session(admin_token) as api, mock_notification_email() as email_mock:
        details = api.ChangeUserGender(
            admin_pb2.ChangeUserGenderReq(user=target.username, gender=new_gender)
        )

    # The returned details reflect the new gender and leave everything else untouched.
    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == new_gender
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted

    # Exactly one notification email mentions the new value.
    email_mock.assert_called_once()
    email = email_fields(email_mock)
    assert email.subject == "[TEST] Your gender was changed"
    assert email.recipient == target.email
    assert new_gender in email.plain
    assert new_gender in email.html

    notification = push_collector.pop_for_user(target.id, last=True)
    assert notification.content.title == "Gender changed"
    assert notification.content.body == "An admin changed your gender to Machine."
def test_ChangeUserBirthdate(db, push_collector: PushCollector):
    """Changing a user's birthdate as admin updates the details and notifies the user by email and push."""
    _admin, admin_token = generate_user(is_superuser=True)
    original_birthdate = date(year=2000, month=1, day=1)
    target, _target_token = generate_user(birthdate=original_birthdate)

    with real_admin_session(admin_token) as api:
        before = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=target.username))
        assert parse_date(before.birthdate) == original_birthdate

        with mock_notification_email() as email_mock:
            after = api.ChangeUserBirthdate(
                admin_pb2.ChangeUserBirthdateReq(user=target.username, birthdate="1990-05-25")
            )

        assert after.user_id == target.id
        assert after.username == target.username
        assert after.email == target.email
        assert after.birthdate == "1990-05-25"
        assert after.gender == target.gender
        assert not after.banned
        assert not after.deleted

    # Exactly one notification email mentions the new year.
    email_mock.assert_called_once()
    email = email_fields(email_mock)
    assert email.subject == "[TEST] Your date of birth was changed"
    assert email.recipient == target.email
    assert "1990" in email.plain
    assert "1990" in email.html

    notification = push_collector.pop_for_user(target.id, last=True)
    assert notification.content.title == "Birthdate changed"
    assert notification.content.body == "An admin changed your date of birth to May 25, 1990."
def test_BanUser(db):
    """Banning a user marks them banned and records a single high-level "ban" admin action."""
    admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    reason = "A good reason"

    with real_admin_session(admin_token) as api:
        details = api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note=reason))

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == target.gender
    assert parse_date(details.birthdate) == target.birthdate
    assert details.banned
    assert not details.deleted

    # The ban is logged together with the note and the acting admin.
    assert len(details.admin_actions) == 1
    action = details.admin_actions[0]
    assert action.action_type == "ban"
    assert action.level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH
    assert action.note == reason
    assert action.admin_user_id == admin.id
    assert action.admin_username == admin.username
def test_UnbanUser(db):
    """Unbanning a user leaves them not banned and records a single high-level "unban" admin action."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    reason = "A good reason"

    with real_admin_session(admin_token) as api:
        details = api.UnbanUser(admin_pb2.UnbanUserReq(user=target.username, admin_note=reason))

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == target.gender
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted

    assert len(details.admin_actions) == 1
    action = details.admin_actions[0]
    assert action.action_type == "unban"
    assert action.level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH
def test_AddAdminNote(db):
    """Admin notes are recorded as normal-level "note" actions and accumulate in insertion order."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    first_note = "User reported strange behavior"
    second_note = "Insert private information here"

    def add_note(text):
        # Each note is added through its own admin session.
        with real_admin_session(admin_token) as api:
            return api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note=text))

    details = add_note(first_note)
    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == target.gender
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted
    assert len(details.admin_actions) == 1
    action = details.admin_actions[0]
    assert action.action_type == "note"
    assert action.level == admin_pb2.ADMIN_ACTION_LEVEL_NORMAL
    assert action.note == first_note

    # A second note is appended after the first one.
    details = add_note(second_note)
    assert len(details.admin_actions) == 2
    assert details.admin_actions[0].note == first_note
    assert details.admin_actions[1].note == second_note
def test_AddAdminNote_blank(db):
    """A whitespace-only admin note is rejected with INVALID_ARGUMENT."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    whitespace_only = " \t \n "
    request = admin_pb2.AddAdminNoteReq(user=target.username, admin_note=whitespace_only)

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddAdminNote(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == "The admin note cannot be empty."
def test_admin_content_reports(db):
    """Admins can fetch one content report by id and list all reports filed against an author."""
    _admin, admin_token = generate_user(is_superuser=True)
    reporter, reporter_token = generate_user()
    first_author, _ = generate_user()
    second_author, _ = generate_user()

    # (reason, description, content_ref, reported author, page) -- filed in this order.
    submissions = [
        ("spam", "r1", "comment/123", first_author, "https://couchers.org/comment/123"),
        ("spam", "r2", "comment/124", second_author, "https://couchers.org/comment/124"),
        ("something else", "r3", "page/321", first_author, "https://couchers.org/page/321"),
    ]
    with reporting_session(reporter_token) as api:
        for reason, description, content_ref, author, page in submissions:
            api.Report(
                reporting_pb2.ReportReq(
                    reason=reason,
                    description=description,
                    content_ref=content_ref,
                    author_user=author.username,
                    user_agent="n/a",
                    page=page,
                )
            )

    # Map each report's description to its database id so reports can be referred to by name.
    with session_scope() as session:
        rows = session.execute(select(ContentReport.description, ContentReport.id)).all()
        report_ids: dict[str, int] = {description: report_id for description, report_id in rows}

    with real_admin_session(admin_token) as api:
        # Unknown ids are reported as NOT_FOUND.
        with pytest.raises(grpc.RpcError) as exc_info:
            api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == "Content report not found."

        single = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=report_ids["r2"]))
        report = single.content_report
        assert report.content_report_id == report_ids["r2"]
        assert report.reporting_user_id == reporter.id
        assert report.author_user_id == second_author.id
        assert report.reason == "spam"
        assert report.description == "r2"
        assert report.content_ref == "comment/124"
        assert report.user_agent == "n/a"
        assert report.page == "https://couchers.org/comment/124"

        # Reports against one author come back with the later report (r3) ahead of the earlier one (r1).
        by_author = api.GetContentReportsForAuthor(
            admin_pb2.GetContentReportsForAuthorReq(user=first_author.username)
        )
        assert by_author.content_reports[0].content_report_id == report_ids["r3"]
        assert by_author.content_reports[1].content_report_id == report_ids["r1"]
def test_DeleteUser(db):
    """An admin can delete a user and then recover them; the details reflect each state."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    def check(details, *, deleted):
        # Everything except the deleted flag is expected to stay the same.
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert bool(details.deleted) is deleted

    with real_admin_session(admin_token) as api:
        after_delete = api.DeleteUser(admin_pb2.DeleteUserReq(user=target.username))
    check(after_delete, deleted=True)

    with real_admin_session(admin_token) as api:
        after_recover = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=target.username))
    check(after_recover, deleted=False)
def test_RecoverDeletedUser_after_user_initiated_deletion(db, push_collector: PushCollector):
    """
    Recovering a user who deleted their own account must reset the undelete fields.

    The normal deletion flow (DeleteAccount followed by ConfirmDeleteAccount) sets
    undelete_token and undelete_until. The admin RecoverDeletedUser call has to clear
    them again so that the undelete_nullity database constraint is satisfied.
    """
    _admin, admin_token = generate_user(is_superuser=True)
    target, target_token = generate_user()
    target_id = target.id

    def deletion_state():
        # Read the deletion-related columns of the target user in a fresh session.
        with session_scope() as session:
            row = session.execute(select(User).where(User.id == target_id)).scalar_one()
            return row.deleted_at, row.undelete_token, row.undelete_until

    # Step 1: the user asks for their account to be deleted.
    with account_session(target_token) as account:
        account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    # Step 2: pick up the confirmation token that request created.
    with session_scope() as session:
        confirmation_token = session.execute(select(AccountDeletionToken)).scalar_one().token

    # Step 3: the user confirms, which sets undelete_token and undelete_until.
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=confirmation_token))

    deleted_at, undelete_token, undelete_until = deletion_state()
    assert deleted_at is not None
    assert undelete_token is not None
    assert undelete_until is not None

    # Step 4: the admin recovers the account.
    with real_admin_session(admin_token) as api:
        recovered = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=target.username))
        assert recovered.user_id == target_id
        assert not recovered.deleted

    # All deletion-related fields must be cleared again.
    deleted_at, undelete_token, undelete_until = deletion_state()
    assert deleted_at is None
    assert undelete_token is None
    assert undelete_until is None
def test_CreateApiKey(db, push_collector: PushCollector):
    """Creating an API key stores a valid API-key session and sends it to the user by email, plus a push."""
    with session_scope() as session:
        _admin, admin_token = generate_user(is_superuser=True)
        target, _target_token = generate_user()

        # A fresh user starts without any API keys.
        existing_keys = session.execute(
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)  # noqa: E712 -- SQL expression, not a Python bool
            .where(UserSession.user_id == target.id)
        ).scalar_one()
        assert existing_keys == 0

    with mock_notification_email() as email_mock, real_admin_session(admin_token) as api:
        api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=target.username))

    email_mock.assert_called_once()
    email = email_fields(email_mock)
    assert email.subject == "[TEST] Your API key for Couchers.org"

    # Exactly one valid API-key session must now exist for the user.
    with session_scope() as session:
        api_key = session.execute(
            select(UserSession.token)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)  # noqa: E712
            .where(UserSession.user_id == target.id)
        ).scalar_one()

    assert api_key in email.plain
    assert api_key in email.html

    assert email.recipient == target.email
    assert "api key" in email.subject.lower()
    marker = "We've issued you with the following API key:"
    assert marker in email.plain
    assert marker in email.html
    assert "support@couchers.org" in email.plain
    assert "support@couchers.org" in email.html

    notification = push_collector.pop_for_user(target.id, last=True)
    assert notification.content.title == "API key created"
    assert notification.content.body == "Details were sent to you via email."
def test_GetChats(db):
    """GetChats identifies the requested user and returns no chats for a freshly created one."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    with real_admin_session(admin_token) as api:
        chats = api.GetChats(admin_pb2.GetChatsReq(user=target.username))

    subject = chats.user
    assert subject.user_id == target.id
    assert subject.username == target.username
    assert subject.name == target.name
    # A brand-new user has neither host requests nor group chats.
    assert len(chats.host_requests) == 0
    assert len(chats.group_chats) == 0
def test_badges(db, push_collector: PushCollector):
    """Admins can add and remove regular badges (push only, no email); invalid badge edits are rejected."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()
    username = target.username

    def assert_rejected(rpc, request, code, details):
        # Call the RPC and require it to fail with exactly this status code and message.
        with pytest.raises(grpc.RpcError) as exc_info:
            rpc(request)
        assert exc_info.value.code() == code
        assert exc_info.value.details() == details

    with real_admin_session(admin_token) as api:

        def current_badges():
            return api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=username)).badges

        # Adding a badge.
        assert "swagster" not in current_badges()
        with mock_notification_email() as email_mock:
            added = api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="swagster"))
            assert "swagster" in added.badges

        # Badge emails are disabled by default, so only a push is expected.
        email_mock.assert_not_called()

        notification = push_collector.pop_for_user(target.id, last=True)
        assert notification.content.title == "New profile badge: Swagster"
        assert notification.content.body == "The Swagster badge was added to your profile."

        # The "founder" badge cannot be edited by admins.
        assert_rejected(
            api.AddBadge,
            admin_pb2.AddBadgeReq(user=username, badge_id="founder"),
            grpc.StatusCode.FAILED_PRECONDITION,
            "Admins cannot edit that badge.",
        )

        # Adding the same badge twice fails.
        assert_rejected(
            api.AddBadge,
            admin_pb2.AddBadgeReq(user=username, badge_id="swagster"),
            grpc.StatusCode.FAILED_PRECONDITION,
            "The user already has that badge.",
        )

        # Removing the badge.
        assert "swagster" in current_badges()
        with mock_notification_email() as email_mock:
            removed = api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="swagster"))
            assert "swagster" not in removed.badges

        # Badge emails are disabled by default, so only a push is expected.
        email_mock.assert_not_called()

        notification = push_collector.pop_for_user(target.id, last=True)
        assert notification.content.title == "Profile badge removed"
        assert notification.content.body == "The Swagster badge was removed from your profile."

        # Removing a badge the user no longer has fails.
        assert_rejected(
            api.RemoveBadge,
            admin_pb2.RemoveBadgeReq(user=username, badge_id="swagster"),
            grpc.StatusCode.FAILED_PRECONDITION,
            "The user does not have that badge.",
        )

        # Unknown badge ids are reported as NOT_FOUND.
        assert_rejected(
            api.AddBadge,
            admin_pb2.AddBadgeReq(user=username, badge_id="nonexistentbadge"),
            grpc.StatusCode.NOT_FOUND,
            "Badge not found.",
        )
def test_DeleteEvent(db):
    """An admin deleting an event marks the corresponding EventOccurrence row as deleted."""
    _admin, admin_token = generate_user(is_superuser=True)
    organizer, organizer_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [organizer], [], None)

    starts_at = now() + timedelta(hours=2)
    ends_at = starts_at + timedelta(hours=3)
    venue = events_pb2.OfflineEventInformation(
        address="Near Null Island",
        lat=0.1,
        lng=0.2,
    )
    create_request = events_pb2.CreateEventReq(
        title="Dummy Title",
        content="Dummy content.",
        photo_key=None,
        offline_information=venue,
        start_time=Timestamp_from_datetime(starts_at),
        end_time=Timestamp_from_datetime(ends_at),
        timezone="UTC",
    )
    with events_session(organizer_token) as api:
        created = api.CreateEvent(create_request)
        event_id = created.event_id
        assert not created.is_deleted

    # The verification session is opened before the admin call, but the row is only loaded after it.
    with session_scope() as session:
        with real_admin_session(admin_token) as api:
            api.DeleteEvent(admin_pb2.DeleteEventReq(event_id=event_id))
        occurrence = session.get_one(EventOccurrence, ident=event_id)
        assert occurrence.is_deleted
def test_ListUserIds(db):
    """ListUserIds returns both users for a window from 2000 until now, and none for a window starting now."""
    admin, admin_token = generate_user(is_superuser=True)
    member, _member_token = generate_user()

    # A window reaching from 2000-01-01 until now contains both freshly created users.
    wide_window = admin_pb2.ListUserIdsReq(
        start_time=Timestamp_from_datetime(datetime(2000, 1, 1)),
        end_time=Timestamp_from_datetime(now()),
    )
    with real_admin_session(admin_token) as api:
        listed = api.ListUserIds(wide_window)
    assert len(listed.user_ids) == 2
    assert sorted(listed.user_ids) == sorted([admin.id, member.id])

    # A window that only starts now contains nobody.
    window_start = Timestamp_from_datetime(now())
    window_end = Timestamp_from_datetime(now())
    with real_admin_session(admin_token) as api:
        listed = api.ListUserIds(admin_pb2.ListUserIdsReq(start_time=window_start, end_time=window_end))
    assert listed.user_ids == []
def test_EditReferenceText(db):
    """An admin edit of a reference's text is persisted on the Reference row."""
    _admin, admin_token = generate_user(is_superuser=True)
    replacement_text = "New Text"

    author, author_token = generate_user()
    recipient, _recipient_token = generate_user()
    make_friends(author, recipient)

    with session_scope() as session:
        with references_session(author_token) as api:
            written = api.WriteFriendReference(
                references_pb2.WriteFriendReferenceReq(
                    to_user_id=recipient.id,
                    text="Old Text",
                    private_text="",
                    was_appropriate=True,
                    rating=1,
                )
            )

        edit_request = admin_pb2.EditReferenceTextReq(
            reference_id=written.reference_id,
            new_text=replacement_text,
        )
        with real_admin_session(admin_token) as admin_api:
            admin_api.EditReferenceText(edit_request)

        # Drop any cached state so the following query reads the row fresh.
        session.expire_all()

        stored = session.execute(select(Reference).where(Reference.id == written.reference_id)).scalar_one()
        assert stored.text == replacement_text
def test_DeleteReference(db):
    """An admin-deleted reference disappears from ListReferences and is flagged deleted in the database."""
    _admin, admin_token = generate_user(is_superuser=True)

    author, author_token = generate_user()
    recipient, _recipient_token = generate_user()
    make_friends(author, recipient)

    def references_by_author():
        # What the author sees when listing the references they have written.
        with references_session(author_token) as api:
            return api.ListReferences(references_pb2.ListReferencesReq(from_user_id=author.id)).references

    with references_session(author_token) as api:
        written = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=recipient.id,
                text="Old Text",
                private_text="",
                was_appropriate=True,
                rating=1,
            )
        )

    assert references_by_author()

    with real_admin_session(admin_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=written.reference_id))

    assert not references_by_author()

    # The row still exists, but it is marked as deleted.
    with session_scope() as session:
        stored = session.execute(select(Reference).where(Reference.id == written.reference_id)).scalar_one()
        assert stored.is_deleted
def test_GetUserReferences(db):
    """GetUserReferences returns references written by and about a user, including deleted ones."""
    _admin, admin_token = generate_user(is_superuser=True)

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()
    user3, user3_token = generate_user()
    for left, right in ((user1, user2), (user1, user3), (user2, user3)):
        make_friends(left, right)

    def write_reference(author_token, recipient, text, private_text, was_appropriate, rating):
        # Write one friend reference on behalf of the owner of author_token.
        with references_session(author_token) as api:
            return api.WriteFriendReference(
                references_pb2.WriteFriendReferenceReq(
                    to_user_id=recipient.id,
                    text=text,
                    private_text=private_text,
                    was_appropriate=was_appropriate,
                    rating=rating,
                )
            )

    # user1 -> user2, then user2 -> user1, then user3 -> user1
    ref1 = write_reference(user1_token, user2, "Reference from user1 to user2", "", True, 1)
    ref2 = write_reference(user2_token, user1, "Reference from user2 to user1", "Private note", True, 0.8)
    ref3 = write_reference(user3_token, user1, "Reference from user3 to user1", "", False, 0.5)

    # Remove the reference written by user3.
    with real_admin_session(admin_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=ref3.reference_id))

    with real_admin_session(admin_token) as admin_api:
        res = admin_api.GetUserReferences(admin_pb2.GetUserReferencesReq(user=user1.username))

    # user1 is the author of exactly one reference.
    assert len(res.references_from) == 1
    authored = res.references_from[0]
    assert authored.reference_id == ref1.reference_id
    assert authored.from_user_id == user1.id
    assert authored.to_user_id == user2.id
    assert authored.text == "Reference from user1 to user2"
    assert authored.is_deleted is False

    # user1 is the subject of two references; the deleted one is still reported.
    assert len(res.references_to) == 2
    # Results are ordered by id descending, which puts ref3 ahead of ref2.
    newest, oldest = res.references_to[0], res.references_to[1]
    assert newest.reference_id == ref3.reference_id
    assert newest.is_deleted is True
    assert newest.was_appropriate is False

    assert oldest.reference_id == ref2.reference_id
    assert oldest.private_text == "Private note"
    assert oldest.rating == 0.8
    assert oldest.is_deleted is False
def test_GetUserReferences_not_found(db):
    """GetUserReferences answers NOT_FOUND for a user that does not exist."""
    _admin, admin_token = generate_user(is_superuser=True)
    request = admin_pb2.GetUserReferencesReq(user="nonexistent")

    with real_admin_session(admin_token) as admin_api:
        with pytest.raises(grpc.RpcError) as exc_info:
            admin_api.GetUserReferences(request)
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
def test_AddUsersToModerationUserList(db):
    """
    AddUsersToModerationUserList creates a new list or extends an existing one.

    Covers: an unknown list id (NOT_FOUND), an unknown user (NOT_FOUND), creating a
    list when no id is given, a user belonging to several lists, and adding users to
    an existing list. ListModerationUserLists is used to cross-check the results.
    """
    super_user, super_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    user4, _ = generate_user()
    user5, _ = generate_user()
    # Pre-existing list that initially contains only user1.
    moderation_list_id = add_users_to_new_moderation_list([user1])

    # No surrounding session_scope(): database checks below open their own short-lived
    # sessions, so an outer session would be unused and its name shadowed.
    with real_admin_session(super_token) as api:
        # Adding users to a non-existent moderation list fails.
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user2.username], moderation_list_id=999),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Moderation user list not found."

        # A non-existent user among the requested users fails the whole call.
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, "nonexistent"]),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Couldn't find that user."

        # Without a moderation_list_id a new list is created.
        res = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, user2.username, user3.username]),
        )
        assert res.moderation_list_id > 0
        with session_scope() as session:
            moderation_user_list = session.get(ModerationUserList, res.moderation_list_id)
            assert moderation_user_list is not None
            assert len(moderation_user_list.users) == 3
            assert {user1.id, user2.id, user3.id}.issubset({user.id for user in moderation_user_list.users})

        # The list endpoint returns that same list, with each member listed once.
        lists_for_user2 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username))
        assert len(lists_for_user2.moderation_lists) == 1
        assert lists_for_user2.moderation_lists[0].moderation_list_id == res.moderation_list_id
        assert len(lists_for_user2.moderation_lists[0].member_ids) == 3
        assert {user1.id, user2.id, user3.id}.issubset(lists_for_user2.moderation_lists[0].member_ids)

        # A user can be in multiple moderation lists (user1: the pre-existing one and the new one).
        lists_for_user1 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username))
        assert len(lists_for_user1.moderation_lists) == 2

        # Adding users to an existing moderation list.
        res2 = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(
                users=[user4.username, user5.username], moderation_list_id=moderation_list_id
            ),
        )
        assert res2.moderation_list_id == moderation_list_id
        with session_scope() as session:
            moderation_user_list = session.get_one(ModerationUserList, moderation_list_id)
            assert len(moderation_user_list.users) == 3
            assert {user1.id, user4.id, user5.id}.issubset({user.id for user in moderation_user_list.users})

        # The list endpoint reports the extended list for one of the newly added users.
        lists_for_user5 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user5.username))
        assert len(lists_for_user5.moderation_lists) == 1
        assert lists_for_user5.moderation_lists[0].moderation_list_id == moderation_list_id
        assert len(lists_for_user5.moderation_lists[0].member_ids) == 3
        assert {user1.id, user4.id, user5.id}.issubset(lists_for_user5.moderation_lists[0].member_ids)
def test_RemoveUserFromModerationUserList(db):
    """Users can be removed from a moderation list; removing the last member deletes the list."""
    _admin, admin_token = generate_user(is_superuser=True)
    member_a, _ = generate_user()
    member_b, _ = generate_user()
    outsider, _ = generate_user()
    list_id = add_users_to_new_moderation_list([member_a, member_b])

    with real_admin_session(admin_token) as api:

        def assert_rejected(request, code, details):
            # The removal must fail with exactly this status code and message.
            with pytest.raises(grpc.RpcError) as exc_info:
                api.RemoveUserFromModerationUserList(request)
            assert exc_info.value.code() == code
            assert details == exc_info.value.details()

        def list_count_for(user):
            response = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user.username))
            return len(response.moderation_lists)

        # Unknown user.
        assert_rejected(
            admin_pb2.RemoveUserFromModerationUserListReq(user="nonexistent"),
            grpc.StatusCode.NOT_FOUND,
            "Couldn't find that user.",
        )

        # Known user, but no moderation list id supplied.
        assert_rejected(
            admin_pb2.RemoveUserFromModerationUserListReq(user=member_b.username),
            grpc.StatusCode.INVALID_ARGUMENT,
            "Missing moderation user list id.",
        )

        # User who is not a member of the given list.
        assert_rejected(
            admin_pb2.RemoveUserFromModerationUserListReq(user=outsider.username, moderation_list_id=list_id),
            grpc.StatusCode.FAILED_PRECONDITION,
            "User is not in the moderation user list.",
        )

        # Successful removal of one member; the other one stays.
        api.RemoveUserFromModerationUserList(
            admin_pb2.RemoveUserFromModerationUserListReq(user=member_a.username, moderation_list_id=list_id)
        )
        with session_scope() as session:
            stored_list = session.get_one(ModerationUserList, list_id)
            remaining_ids = {user.id for user in stored_list.users}
            assert member_a.id not in remaining_ids
            assert member_b.id in remaining_ids

        # The list endpoint agrees: no lists for the removed user, one for the remaining user.
        assert list_count_for(member_a) == 0
        assert list_count_for(member_b) == 1

        # Removing the last member also deletes the moderation list itself.
        api.RemoveUserFromModerationUserList(
            admin_pb2.RemoveUserFromModerationUserListReq(user=member_b.username, moderation_list_id=list_id)
        )
        with session_scope() as session:
            assert session.get(ModerationUserList, list_id) is None
def test_admin_delete_account_url(db, push_collector: PushCollector):
    """An admin-created account deletion link carries a token that the user can confirm to delete the account."""
    _admin, admin_token = generate_user(is_superuser=True)

    target, _session_token = generate_user()
    target_id = target.id

    with real_admin_session(admin_token) as admin_api:
        link_request = admin_pb2.CreateAccountDeletionLinkReq(user=target.username)
        confirm_url = admin_api.CreateAccountDeletionLink(link_request).account_deletion_confirm_url

    # Creating the link alone does not push anything to the user.
    assert push_collector.count_for_user(target_id) == 0

    with session_scope() as session:
        token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token = token_row.token
        assert token_row.user.id == target_id
        assert confirm_url == f"http://localhost:3000/delete-account?token={deletion_token}"

    with mock_notification_email() as email_mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    notification = push_collector.pop_for_user(target_id, last=True)
    assert notification.content.title == "Account deleted"
    assert notification.content.body == "You can restore it within 7 days using the link we emailed you."
    email_mock.assert_called_once()
    # The email is parsed but its fields are not inspected further.
    email_fields(email_mock)
def test_SetLastDonated(db):
    """SetLastDonated can set and clear last_donated and rejects unknown users."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user(last_donated=None)

    def stored_last_donated():
        # read the current value back from the database in its own session
        with session_scope() as session:
            row = session.execute(select(User).where(User.id == normal_user.id)).scalar_one()
            return row.last_donated

    with real_admin_session(super_token) as api:
        # user starts with no last_donated
        assert stored_last_donated() is None

        # can set last_donated
        donation_time = now() - timedelta(days=30)
        set_req = admin_pb2.SetLastDonatedReq(
            user=normal_user.username,
            last_donated=Timestamp_from_datetime(donation_time),
        )
        api.SetLastDonated(set_req)

        saved = stored_last_donated()
        assert saved is not None
        # check timestamp is close (within a second)
        assert abs((saved - donation_time).total_seconds()) < 1

        # can clear last_donated by not setting the field
        api.SetLastDonated(admin_pb2.SetLastDonatedReq(user=normal_user.username))
        assert stored_last_donated() is None

        # user not found
        with pytest.raises(grpc.RpcError) as exc_info:
            api.SetLastDonated(admin_pb2.SetLastDonatedReq(user="nonexistent"))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == "Couldn't find that user."
def test_admin_actions_level(db):
    """Admin notes record the requested level, defaulting to NORMAL."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()

    with real_admin_session(super_token) as api:

        def add_note(**fields):
            # every note targets the same user; only note text / level vary
            return api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, **fields))

        # Default level is NORMAL
        res = add_note(admin_note="normal note")
        assert res.admin_actions[0].level == admin_pb2.ADMIN_ACTION_LEVEL_NORMAL

        # Explicitly set to DEBUG
        res = add_note(admin_note="debug note", level=admin_pb2.ADMIN_ACTION_LEVEL_DEBUG)
        assert len(res.admin_actions) == 2
        assert res.admin_actions[1].level == admin_pb2.ADMIN_ACTION_LEVEL_DEBUG

        # Explicitly set to HIGH
        res = add_note(admin_note="high note", level=admin_pb2.ADMIN_ACTION_LEVEL_HIGH)
        assert len(res.admin_actions) == 3
        assert res.admin_actions[2].level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH
def test_admin_actions_on_mutations(db, push_collector: PushCollector):
    """Each mutating admin RPC reports its action type in the returned admin_actions."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    username = normal_user.username

    def has_action(res, action_type, level=None):
        # True if some logged action has this type (and level, when given)
        return any(
            a.action_type == action_type and (level is None or a.level == level) for a in res.admin_actions
        )

    with real_admin_session(super_token) as api:
        # ChangeUserGender
        res = api.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=username, gender="Machine"))
        assert has_action(res, "change_gender")

        # ChangeUserBirthdate
        res = api.ChangeUserBirthdate(admin_pb2.ChangeUserBirthdateReq(user=username, birthdate="1990-01-01"))
        assert has_action(res, "change_birthdate")

        # DeleteUser (the assertions expect it to be logged at HIGH level)
        res = api.DeleteUser(admin_pb2.DeleteUserReq(user=username))
        assert has_action(res, "delete_user")
        assert has_action(res, "delete_user", level=admin_pb2.ADMIN_ACTION_LEVEL_HIGH)

        # RecoverDeletedUser
        res = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=username))
        assert has_action(res, "recover_user")

        # MarkUserNeedsLocationUpdate
        res = api.MarkUserNeedsLocationUpdate(admin_pb2.MarkUserNeedsLocationUpdateReq(user=username))
        assert has_action(res, "mark_needs_location_update")

        # SetLastDonated
        donated_req = admin_pb2.SetLastDonatedReq(
            user=username,
            last_donated=Timestamp_from_datetime(now()),
        )
        res = api.SetLastDonated(donated_req)
        assert has_action(res, "set_last_donated")
def test_create_admin_tag(db):
    """Creating an admin tag echoes the tag text and returns a positive id."""
    _, super_token = generate_user(is_superuser=True)
    create_req = admin_pb2.CreateAdminTagReq(tag="test-tag")

    with real_admin_session(super_token) as api:
        created = api.CreateAdminTag(create_req)
        assert created.tag == "test-tag"
        assert created.admin_tag_id > 0
def test_create_admin_tag_duplicate(db):
    """Creating the same admin tag twice fails with ALREADY_EXISTS."""
    _, super_token = generate_user(is_superuser=True)

    with real_admin_session(super_token) as api:
        # first creation succeeds, the identical second one must be rejected
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="test-tag"))
        with pytest.raises(grpc.RpcError) as exc_info:
            api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="test-tag"))
        assert exc_info.value.code() == grpc.StatusCode.ALREADY_EXISTS
        assert exc_info.value.details() == "That admin tag already exists."
def test_create_admin_tag_empty(db):
    """Empty and whitespace-only admin tags are rejected with INVALID_ARGUMENT."""
    _, super_token = generate_user(is_superuser=True)

    with real_admin_session(super_token) as api:
        # the empty string first, then a whitespace-only tag
        for blank_tag in ("", " "):
            with pytest.raises(grpc.RpcError) as exc_info:
                api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag=blank_tag))
            assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
            assert exc_info.value.details() == "The admin tag cannot be empty."
def test_list_admin_tags(db):
    """ListAdminTags starts empty and returns created tags in alphabetical order."""
    _, super_token = generate_user(is_superuser=True)

    with real_admin_session(super_token) as api:
        # Empty initially
        initial = api.ListAdminTags(admin_pb2.ListAdminTagsReq())
        assert len(initial.tags) == 0

        # Add some tags, deliberately out of alphabetical order
        for name in ("bravo", "alpha"):
            api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag=name))

        listed = api.ListAdminTags(admin_pb2.ListAdminTagsReq())
        # exactly two tags, ordered alphabetically
        assert [entry.tag for entry in listed.tags] == ["alpha", "bravo"]
def test_add_admin_tag_to_user(db):
    """Adding an existing tag to a user lists it on the user and logs an add_tag action."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()

    with real_admin_session(super_token) as api:
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))

        add_req = admin_pb2.AddAdminTagToUserReq(user=normal_user.username, tag="vip")
        details = api.AddAdminTagToUser(add_req)
        assert "vip" in details.admin_tags
        assert any(
            action.action_type == "add_tag" and action.tag == "vip" for action in details.admin_actions
        )
def test_add_admin_tag_to_user_duplicate(db):
    """Adding a tag the user already has fails with FAILED_PRECONDITION."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    username = normal_user.username

    with real_admin_session(super_token) as api:
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))
        api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=username, tag="vip"))

        # the second, identical assignment must be refused
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=username, tag="vip"))
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == "The user already has that admin tag."
def test_add_admin_tag_to_user_tag_not_found(db):
    """Assigning a tag that was never created fails with NOT_FOUND."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    missing_tag_req = admin_pb2.AddAdminTagToUserReq(user=normal_user.username, tag="nonexistent")

    with real_admin_session(super_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddAdminTagToUser(missing_tag_req)
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == "Admin tag not found."
def test_remove_admin_tag_from_user(db):
    """Removing an assigned tag drops it from the user and logs a remove_tag action."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    username = normal_user.username

    with real_admin_session(super_token) as api:
        # set up: the tag exists and is assigned to the user
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))
        api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=username, tag="vip"))

        details = api.RemoveAdminTagFromUser(admin_pb2.RemoveAdminTagFromUserReq(user=username, tag="vip"))
        assert "vip" not in details.admin_tags
        assert any(
            action.action_type == "remove_tag" and action.tag == "vip" for action in details.admin_actions
        )
def test_remove_admin_tag_from_user_not_assigned(db):
    """Removing a tag the user does not have fails with FAILED_PRECONDITION."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    remove_req = admin_pb2.RemoveAdminTagFromUserReq(user=normal_user.username, tag="vip")

    with real_admin_session(super_token) as api:
        # the tag exists but is never assigned to the user
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))

        with pytest.raises(grpc.RpcError) as exc_info:
            api.RemoveAdminTagFromUser(remove_req)
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == "The user does not have that admin tag."
def test_search_users_by_admin_tag(db):
    """SearchUsers filters by admin tags, requiring every listed tag to match."""
    _, super_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()

    with real_admin_session(super_token) as api:

        def matching_ids(*tags):
            # ids of all users returned by a search on the given admin tags
            found = api.SearchUsers(admin_pb2.SearchUsersReq(admin_tags=list(tags)))
            return {u.user_id for u in found.users}

        for tag in ("vip", "flagged"):
            api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag=tag))

        # user1: vip only; user2: vip + flagged; user3: no tags
        assignments = [
            (user1.username, "vip"),
            (user2.username, "vip"),
            (user2.username, "flagged"),
        ]
        for username, tag in assignments:
            api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=username, tag=tag))

        # Search for users with "vip" tag
        vip_ids = matching_ids("vip")
        assert user1.id in vip_ids
        assert user2.id in vip_ids
        assert user3.id not in vip_ids

        # Search for users with both "vip" AND "flagged" tags (AND logic)
        both_ids = matching_ids("vip", "flagged")
        assert user2.id in both_ids
        assert user1.id not in both_ids

        # Search for non-existent tag returns no results
        assert matching_ids("nonexistent") == set()
def test_search_users_by_admin_note(db):
    """SearchUsers can filter users by a pattern over their admin action log."""
    _, super_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()

    with real_admin_session(super_token) as api:
        notes = [
            (user1.username, "suspicious activity"),
            (user2.username, "normal user"),
        ]
        for username, text in notes:
            api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=username, admin_note=text))

        # Search by admin action log content (ilike)
        found = api.SearchUsers(admin_pb2.SearchUsersReq(admin_action_log="%suspicious%"))
        found_ids = {u.user_id for u in found.users}
        assert user1.id in found_ids
        assert user2.id not in found_ids
1129# community invite feature tested in test_events.py
1130# SendBlogPostNotification tested in test_notifications.py
1131# MarkUserNeedsLocationUpdate tested in test_jail.py