Coverage for app / backend / src / tests / test_admin.py: 100%
749 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
1from datetime import date, datetime, timedelta
3import grpc
4import pytest
5from sqlalchemy import select
6from sqlalchemy.sql import func
8from couchers.db import session_scope
9from couchers.models import (
10 AccountDeletionToken,
11 ContentReport,
12 EventOccurrence,
13 FriendRelationship,
14 FriendStatus,
15 ModerationObjectType,
16 ModerationState,
17 ModerationUserList,
18 ModerationVisibility,
19 Reference,
20 User,
21 UserActivity,
22 UserSession,
23)
24from couchers.proto import account_pb2, admin_pb2, auth_pb2, events_pb2, references_pb2, reporting_pb2
25from couchers.utils import Timestamp_from_datetime, now, parse_date
26from tests.fixtures.db import add_users_to_new_moderation_list, generate_user, make_friends
27from tests.fixtures.misc import PushCollector, email_fields, mock_notification_email
28from tests.fixtures.sessions import (
29 account_session,
30 auth_api_session,
31 events_session,
32 real_admin_session,
33 references_session,
34 reporting_session,
35)
36from tests.test_communities import create_community
@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""
def test_access_by_normal_user(db):
    """The admin servicer refuses a call made with a non-superuser token."""
    plain_user, plain_token = generate_user()
    request = admin_pb2.GetUserDetailsReq(user=str(plain_user.id))

    with real_admin_session(plain_token) as api:
        # every admin RPC should be refused for a non-superuser; GetUserDetails is the one probed here
        with pytest.raises(grpc.RpcError) as err:
            api.GetUserDetails(request)
        assert err.value.code() == grpc.StatusCode.PERMISSION_DENIED
def test_GetUser(db):
    """GetUser returns the same id and username before and after the user is banned."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    def fetch_and_check():
        # look the user up by id in a fresh admin session and verify the identity fields
        with real_admin_session(admin_token) as api:
            found = api.GetUser(admin_pb2.GetUserReq(user=str(target.id)))
            assert found.user_id == target.id
            assert found.username == target.username

    fetch_and_check()

    with real_admin_session(admin_token) as api:
        api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note="Testing banning"))

    fetch_and_check()
def test_GetUserDetails(db):
    """GetUserDetails accepts a user id, a username, or an email as the lookup key."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    # one fresh admin session per lookup key: id (as string), then username, then email
    for lookup_key in (str(target.id), target.username, target.email):
        with real_admin_session(admin_token) as api:
            details = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=lookup_key))
            assert details.user_id == target.id
            assert details.username == target.username
            assert details.email == target.email
            assert details.gender == target.gender
            assert parse_date(details.birthdate) == target.birthdate
            assert not details.banned
            assert not details.deleted
def test_ChangeUserGender(db, push_collector: PushCollector):
    """Changing a user's gender updates the returned details and notifies the user by email and push."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()
    request = admin_pb2.ChangeUserGenderReq(user=target.username, gender="Machine")

    with real_admin_session(admin_token) as api, mock_notification_email() as email_mock:
        details = api.ChangeUserGender(request)

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == "Machine"
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted

    # exactly one notification email, addressed to the affected user
    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your gender was changed"
    assert sent.recipient == target.email
    assert "Machine" in sent.plain
    assert "Machine" in sent.html

    notification = push_collector.pop_for_user(target.id, last=True)
    assert notification.content.title == "Gender changed"
    assert notification.content.body == "An admin changed your gender to Machine."
def test_ChangeUserBirthdate(db, push_collector: PushCollector):
    """Changing a user's birthdate updates the returned details and notifies the user by email and push."""
    original_birthdate = date(2000, 1, 1)
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user(birthdate=original_birthdate)

    with real_admin_session(admin_token) as api:
        before = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=target.username))
        assert parse_date(before.birthdate) == original_birthdate

        with mock_notification_email() as email_mock:
            after = api.ChangeUserBirthdate(
                admin_pb2.ChangeUserBirthdateReq(user=target.username, birthdate="1990-05-25")
            )

        assert after.user_id == target.id
        assert after.username == target.username
        assert after.email == target.email
        assert after.birthdate == "1990-05-25"
        assert after.gender == target.gender
        assert not after.banned
        assert not after.deleted

    # exactly one notification email, addressed to the affected user
    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your date of birth was changed"
    assert sent.recipient == target.email
    assert "1990" in sent.plain
    assert "1990" in sent.html

    notification = push_collector.pop_for_user(target.id, last=True)
    assert notification.content.title == "Birthdate changed"
    assert notification.content.body == "An admin changed your date of birth to May 25, 1990."
def test_BanUser(db):
    """BanUser marks the user as banned and records one high-level "ban" admin action."""
    admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()
    reason = "A good reason"

    with real_admin_session(admin_token) as api:
        details = api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note=reason))

        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert details.banned
        assert not details.deleted

        # the ban is logged as a single admin action attributed to the acting admin
        assert len(details.admin_actions) == 1
        logged = details.admin_actions[0]
        assert logged.action_type == "ban"
        assert logged.level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH
        assert logged.note == reason
        assert logged.admin_user_id == admin.id
        assert logged.admin_username == admin.username
def test_UnbanUser(db):
    """UnbanUser leaves the user not banned and records one high-level "unban" admin action."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()
    reason = "A good reason"

    with real_admin_session(admin_token) as api:
        details = api.UnbanUser(admin_pb2.UnbanUserReq(user=target.username, admin_note=reason))

        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert not details.deleted

        assert len(details.admin_actions) == 1
        logged = details.admin_actions[0]
        assert logged.action_type == "unban"
        assert logged.level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH
def test_AddAdminNote(db):
    """Admin notes are stored as normal-level "note" actions and accumulate in insertion order."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()
    first_note = "User reported strange behavior"
    second_note = "Insert private information here"

    with real_admin_session(admin_token) as api:
        details = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note=first_note))

        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert not details.deleted

        assert len(details.admin_actions) == 1
        logged = details.admin_actions[0]
        assert logged.action_type == "note"
        assert logged.level == admin_pb2.ADMIN_ACTION_LEVEL_NORMAL
        assert logged.note == first_note

    with real_admin_session(admin_token) as api:
        details = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note=second_note))

        # the second note is appended after the first
        assert len(details.admin_actions) == 2
        earlier, later = details.admin_actions
        assert earlier.note == first_note
        assert later.note == second_note
def test_AddAdminNote_blank(db):
    """A whitespace-only admin note is rejected with INVALID_ARGUMENT."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()
    whitespace_only = " \t \n "
    request = admin_pb2.AddAdminNoteReq(user=target.username, admin_note=whitespace_only)

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.AddAdminNote(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == "The admin note cannot be empty."
def test_admin_content_reports(db):
    """Admins can fetch a single content report by id and list the reports filed against an author."""
    _admin, admin_token = generate_user(is_superuser=True)
    reporter, reporter_token = generate_user()
    offender_a, _ = generate_user()
    offender_b, _ = generate_user()

    # (reason, description, content_ref, reported author, page) in filing order
    filed = [
        ("spam", "r1", "comment/123", offender_a, "https://couchers.org/comment/123"),
        ("spam", "r2", "comment/124", offender_b, "https://couchers.org/comment/124"),
        ("something else", "r3", "page/321", offender_a, "https://couchers.org/page/321"),
    ]
    with reporting_session(reporter_token) as api:
        for reason, description, content_ref, author, page in filed:
            api.Report(
                reporting_pb2.ReportReq(
                    reason=reason,
                    description=description,
                    content_ref=content_ref,
                    author_user=author.username,
                    user_agent="n/a",
                    page=page,
                )
            )

    # descriptions are unique per report above, so they serve as lookup keys for the generated ids
    with session_scope() as session:
        rows = session.execute(select(ContentReport.description, ContentReport.id)).all()
        report_ids: dict[str, int] = {description: report_id for description, report_id in rows}

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert err.value.code() == grpc.StatusCode.NOT_FOUND
        assert err.value.details() == "Content report not found."

        fetched = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=report_ids["r2"]))
        report = fetched.content_report
        assert report.content_report_id == report_ids["r2"]
        assert report.reporting_user_id == reporter.id
        assert report.author_user_id == offender_b.id
        assert report.reason == "spam"
        assert report.description == "r2"
        assert report.content_ref == "comment/124"
        assert report.user_agent == "n/a"
        assert report.page == "https://couchers.org/comment/124"

        # the later report (r3) is expected ahead of the earlier one (r1)
        by_author = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=offender_a.username))
        assert by_author.content_reports[0].content_report_id == report_ids["r3"]
        assert by_author.content_reports[1].content_report_id == report_ids["r1"]
def test_DeleteUser(db):
    """DeleteUser flags the user as deleted and RecoverDeletedUser clears the flag again."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    def assert_same_unbanned_user(details):
        # fields that must be identical in both the delete and the recover response
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned

    with real_admin_session(admin_token) as api:
        deleted = api.DeleteUser(admin_pb2.DeleteUserReq(user=target.username))
        assert_same_unbanned_user(deleted)
        assert deleted.deleted

    with real_admin_session(admin_token) as api:
        recovered = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=target.username))
        assert_same_unbanned_user(recovered)
        assert not recovered.deleted
def test_RecoverDeletedUser_after_user_initiated_deletion(db, push_collector: PushCollector):
    """
    Admin recovery of an account that the user deleted themselves.

    The user-driven flow (DeleteAccount followed by ConfirmDeleteAccount) leaves
    undelete_token and undelete_until populated. RecoverDeletedUser has to reset
    both of them together with deleted_at so that the undelete_nullity database
    constraint holds.
    """
    _admin, admin_token = generate_user(is_superuser=True)
    target, target_token = generate_user()
    target_id = target.id

    def load_deletion_state():
        # returns (deleted_at, undelete_token, undelete_until) as currently stored for the target
        with session_scope() as session:
            row = session.execute(select(User).where(User.id == target_id)).scalar_one()
            return row.deleted_at, row.undelete_token, row.undelete_until

    # step 1: the user requests deletion of their own account
    with account_session(target_token) as account:
        account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    # step 2: pick up the confirmation token that request produced
    with session_scope() as session:
        confirmation_token = session.execute(select(AccountDeletionToken)).scalar_one().token

    # step 3: the user confirms, which populates undelete_token and undelete_until
    with auth_api_session() as (auth_api, _interceptor):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=confirmation_token))

    # the account is now deleted and carries its undelete fields
    deleted_at, undelete_token, undelete_until = load_deletion_state()
    assert deleted_at is not None
    assert undelete_token is not None
    assert undelete_until is not None

    # step 4: an admin restores the account
    with real_admin_session(admin_token) as api:
        recovered = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=target.username))
        assert recovered.user_id == target_id
        assert not recovered.deleted

    # all three deletion-related fields must be reset
    deleted_at, undelete_token, undelete_until = load_deletion_state()
    assert deleted_at is None
    assert undelete_token is None
    assert undelete_until is None
def test_CreateApiKey(db, push_collector: PushCollector):
    """CreateApiKey issues an API-key session for the user and delivers the key by email plus a push."""
    with session_scope() as session:
        _admin, admin_token = generate_user(is_superuser=True)
        recipient, _recipient_token = generate_user()

        # a freshly generated user starts without any API key sessions
        existing_keys = session.execute(
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)  # noqa: E712 -- SQL expression, not a Python comparison
            .where(UserSession.user_id == recipient.id)
        ).scalar_one()
        assert existing_keys == 0

    with mock_notification_email() as email_mock:
        with real_admin_session(admin_token) as api:
            api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=recipient.username))

    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your API key for Couchers.org"

    # exactly one valid API key session must now exist for the user
    with session_scope() as session:
        api_key = session.execute(
            select(UserSession.token)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)  # noqa: E712 -- SQL expression, not a Python comparison
            .where(UserSession.user_id == recipient.id)
        ).scalar_one()

    assert api_key in sent.plain
    assert api_key in sent.html

    assert sent.recipient == recipient.email
    assert "api key" in sent.subject.lower()
    intro_line = "We've issued you with the following API key:"
    assert intro_line in sent.plain
    assert intro_line in sent.html
    assert "support@couchers.org" in sent.plain
    assert "support@couchers.org" in sent.html

    notification = push_collector.pop_for_user(recipient.id, last=True)
    assert notification.content.title == "API key created"
    assert notification.content.body == "Details were sent to you via email."
def test_GetChats(db):
    """GetChats identifies the requested user and returns no chats for a newly created user."""
    _admin, admin_token = generate_user(is_superuser=True)
    newcomer, _newcomer_token = generate_user()

    with real_admin_session(admin_token) as api:
        chats = api.GetChats(admin_pb2.GetChatsReq(user=newcomer.username))

        subject = chats.user
        assert subject.user_id == newcomer.id
        assert subject.username == newcomer.username
        assert subject.name == newcomer.name

        # nothing has been sent or received yet
        assert len(chats.host_requests) == 0
        assert len(chats.group_chats) == 0
def test_badges(db, push_collector: PushCollector):
    """Admins can add and remove an ordinary badge; invalid badge operations are rejected."""
    _admin, admin_token = generate_user(is_superuser=True)
    member, _member_token = generate_user()

    def add_request(badge_id):
        return admin_pb2.AddBadgeReq(user=member.username, badge_id=badge_id)

    def remove_request(badge_id):
        return admin_pb2.RemoveBadgeReq(user=member.username, badge_id=badge_id)

    def assert_rejected(rpc, request, code, message):
        # the call must fail with exactly this gRPC status code and details string
        with pytest.raises(grpc.RpcError) as err:
            rpc(request)
        assert err.value.code() == code
        assert err.value.details() == message

    with real_admin_session(admin_token) as api:

        def current_badges():
            return api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=member.username)).badges

        # adding a badge works
        assert "swagster" not in current_badges()
        with mock_notification_email() as email_mock:
            details = api.AddBadge(add_request("swagster"))
        assert "swagster" in details.badges

        # no email is sent for badge changes, only a push
        email_mock.assert_not_called()

        notification = push_collector.pop_for_user(member.id, last=True)
        assert notification.content.title == "New profile badge: Swagster"
        assert notification.content.body == "The Swagster badge was added to your profile."

        # special badges are off limits to admins
        assert_rejected(
            api.AddBadge,
            add_request("founder"),
            grpc.StatusCode.FAILED_PRECONDITION,
            "Admins cannot edit that badge.",
        )

        # adding the same badge twice fails
        assert_rejected(
            api.AddBadge,
            add_request("swagster"),
            grpc.StatusCode.FAILED_PRECONDITION,
            "The user already has that badge.",
        )

        # removing a badge works
        assert "swagster" in current_badges()
        with mock_notification_email() as email_mock:
            details = api.RemoveBadge(remove_request("swagster"))
        assert "swagster" not in details.badges

        # no email is sent for badge changes, only a push
        email_mock.assert_not_called()

        notification = push_collector.pop_for_user(member.id, last=True)
        assert notification.content.title == "Profile badge removed"
        assert notification.content.body == "The Swagster badge was removed from your profile."

        # removing a badge the user no longer has fails
        assert_rejected(
            api.RemoveBadge,
            remove_request("swagster"),
            grpc.StatusCode.FAILED_PRECONDITION,
            "The user does not have that badge.",
        )

        # unknown badge ids are reported as not found
        assert_rejected(
            api.AddBadge,
            add_request("nonexistentbadge"),
            grpc.StatusCode.NOT_FOUND,
            "Badge not found.",
        )
def test_DeleteEvent(db):
    """An event created by a regular user is flagged as deleted after the admin DeleteEvent call."""
    _admin, admin_token = generate_user(is_superuser=True)
    organizer, organizer_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [organizer], [], None)

    # the event runs for three hours, starting two hours from now
    starts_at = now() + timedelta(hours=2)
    ends_at = starts_at + timedelta(hours=3)
    venue = events_pb2.OfflineEventInformation(
        address="Near Null Island",
        lat=0.1,
        lng=0.2,
    )
    with events_session(organizer_token) as api:
        created = api.CreateEvent(
            events_pb2.CreateEventReq(
                title="Dummy Title",
                content="Dummy content.",
                photo_key=None,
                offline_information=venue,
                start_time=Timestamp_from_datetime(starts_at),
                end_time=Timestamp_from_datetime(ends_at),
                timezone="UTC",
            )
        )
        event_id = created.event_id
        assert not created.is_deleted

    with session_scope() as session:
        with real_admin_session(admin_token) as api:
            api.DeleteEvent(admin_pb2.DeleteEventReq(event_id=event_id))

        # read the occurrence back from the database rather than trusting the RPC response
        stored = session.get_one(EventOccurrence, ident=event_id)
        assert stored.is_deleted
def test_ListUserIds(db):
    """ListUserIds returns both users for a wide time window and nothing for a window starting now."""
    admin, admin_token = generate_user(is_superuser=True)
    member, _member_token = generate_user()

    with real_admin_session(admin_token) as api:
        # window from the year 2000 until now covers both users generated above
        wide_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(datetime(2000, 1, 1)),
            end_time=Timestamp_from_datetime(now()),
        )
        listed = api.ListUserIds(wide_window)
        assert len(listed.user_ids) == 2
        assert sorted(listed.user_ids) == sorted([admin.id, member.id])

    with real_admin_session(admin_token) as api:
        # window that only opens now, i.e. after both users were generated
        window_start = Timestamp_from_datetime(now())
        window_end = Timestamp_from_datetime(now())
        listed = api.ListUserIds(admin_pb2.ListUserIdsReq(start_time=window_start, end_time=window_end))
        assert listed.user_ids == []
def test_EditReferenceText(db):
    """EditReferenceText replaces the stored text of an existing friend reference."""
    _admin, admin_token = generate_user(is_superuser=True)
    replacement_text = "New Text"

    author, author_token = generate_user()
    recipient, _recipient_token = generate_user()
    make_friends(author, recipient)

    with session_scope() as session:
        with references_session(author_token) as api:
            written = api.WriteFriendReference(
                references_pb2.WriteFriendReferenceReq(
                    to_user_id=recipient.id,
                    text="Old Text",
                    private_text="",
                    was_appropriate=True,
                    rating=1,
                )
            )

        with real_admin_session(admin_token) as admin_api:
            admin_api.EditReferenceText(
                admin_pb2.EditReferenceTextReq(reference_id=written.reference_id, new_text=replacement_text)
            )

        # expire everything held by this session before reading the reference back
        session.expire_all()

        lookup = select(Reference).where(Reference.id == written.reference_id)
        stored = session.execute(lookup).scalar_one()
        assert stored.text == replacement_text
def test_DeleteReference(db):
    """DeleteReference hides a reference from ListReferences and flags its row as deleted."""
    _admin, admin_token = generate_user(is_superuser=True)

    author, author_token = generate_user()
    recipient, _recipient_token = generate_user()
    make_friends(author, recipient)

    def references_written_by_author():
        # what the author sees when listing their own outgoing references
        with references_session(author_token) as api:
            return api.ListReferences(references_pb2.ListReferencesReq(from_user_id=author.id)).references

    with references_session(author_token) as api:
        written = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=recipient.id,
                text="Old Text",
                private_text="",
                was_appropriate=True,
                rating=1,
            )
        )

    assert references_written_by_author()

    with real_admin_session(admin_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=written.reference_id))

    assert not references_written_by_author()

    # the row itself is kept and only flagged
    with session_scope() as session:
        lookup = select(Reference).where(Reference.id == written.reference_id)
        stored = session.execute(lookup).scalar_one()
        assert stored.is_deleted
def test_GetUserReferences(db):
    """GetUserReferences lists references written by and about a user, deleted ones included."""
    _admin, admin_token = generate_user(is_superuser=True)

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()
    user3, user3_token = generate_user()
    for left, right in ((user1, user2), (user1, user3), (user2, user3)):
        make_friends(left, right)

    def write_reference(author_token, recipient, text, private_text, was_appropriate, rating):
        # file a friend reference as the owner of author_token and hand back the RPC response
        with references_session(author_token) as api:
            return api.WriteFriendReference(
                references_pb2.WriteFriendReferenceReq(
                    to_user_id=recipient.id,
                    text=text,
                    private_text=private_text,
                    was_appropriate=was_appropriate,
                    rating=rating,
                )
            )

    # user1 -> user2
    ref1 = write_reference(user1_token, user2, "Reference from user1 to user2", "", True, 1)
    # user2 -> user1
    ref2 = write_reference(user2_token, user1, "Reference from user2 to user1", "Private note", True, 0.8)
    # user3 -> user1
    ref3 = write_reference(user3_token, user1, "Reference from user3 to user1", "", False, 0.5)

    # remove the reference written by user3
    with real_admin_session(admin_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=ref3.reference_id))

    # query everything involving user1
    with real_admin_session(admin_token) as admin_api:
        result = admin_api.GetUserReferences(admin_pb2.GetUserReferencesReq(user=user1.username))

        # exactly one reference was written by user1
        assert len(result.references_from) == 1
        outgoing = result.references_from[0]
        assert outgoing.reference_id == ref1.reference_id
        assert outgoing.from_user_id == user1.id
        assert outgoing.to_user_id == user2.id
        assert outgoing.text == "Reference from user1 to user2"
        assert outgoing.is_deleted is False

        # two references were written about user1; the deleted one is still listed
        assert len(result.references_to) == 2
        # newest id first, so the reference from user3 precedes the one from user2
        newest, older = result.references_to
        assert newest.reference_id == ref3.reference_id
        assert newest.is_deleted is True
        assert newest.was_appropriate is False

        assert older.reference_id == ref2.reference_id
        assert older.private_text == "Private note"
        assert older.rating == 0.8
        assert older.is_deleted is False
def test_GetUserReferences_not_found(db):
    """GetUserReferences answers NOT_FOUND for an unknown user."""
    _admin, admin_token = generate_user(is_superuser=True)
    request = admin_pb2.GetUserReferencesReq(user="nonexistent")

    with real_admin_session(admin_token) as admin_api:
        with pytest.raises(grpc.RpcError) as err:
            admin_api.GetUserReferences(request)
        assert err.value.code() == grpc.StatusCode.NOT_FOUND
def test_GetFriendRequests(db):
    """GetFriendRequests reports a user's sent and received requests with status and moderation visibility."""
    _admin, admin_token = generate_user(is_superuser=True)

    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    user4, _ = generate_user()

    # Rows are inserted directly so that status and visibility are fully under the test's control
    def _add_friend_request(from_user_id, to_user_id, status, visibility, time_responded=None):
        with session_scope() as session:
            # the moderation state needs an id before the relationship can point at it
            moderation = ModerationState(
                object_type=ModerationObjectType.friend_request,
                object_id=0,
                visibility=visibility,
            )
            session.add(moderation)
            session.flush()

            relationship = FriendRelationship(
                from_user_id=from_user_id,
                to_user_id=to_user_id,
                status=status,
                moderation_state_id=moderation.id,
                time_responded=time_responded,
            )
            session.add(relationship)
            session.flush()

            # replace the placeholder 0 with the relationship's real id
            moderation.object_id = relationship.id

    # pending + shadowed: user1 -> user2
    _add_friend_request(user1.id, user2.id, FriendStatus.pending, ModerationVisibility.shadowed)
    # accepted + visible: user1 -> user3
    _add_friend_request(user1.id, user3.id, FriendStatus.accepted, ModerationVisibility.visible, time_responded=now())
    # rejected + visible: user4 -> user1
    _add_friend_request(user4.id, user1.id, FriendStatus.rejected, ModerationVisibility.visible, time_responded=now())

    with real_admin_session(admin_token) as admin_api:
        result = admin_api.GetFriendRequests(admin_pb2.GetFriendRequestsReq(user=user1.username))

        # two sent by user1, newest id first: the accepted one to user3, then the pending one to user2
        assert len(result.sent) == 2
        to_user3, to_user2 = result.sent

        assert to_user3.from_user.user_id == user1.id
        assert to_user3.to_user.user_id == user3.id
        assert to_user3.status == "accepted"
        assert to_user3.HasField("time_responded")
        assert to_user3.moderation_visibility == "visible"

        assert to_user2.from_user.user_id == user1.id
        assert to_user2.to_user.user_id == user2.id
        assert to_user2.status == "pending"
        assert not to_user2.HasField("time_responded")
        assert to_user2.moderation_visibility == "shadowed"

        # one received by user1: the rejected request from user4
        assert len(result.received) == 1
        from_user4 = result.received[0]
        assert from_user4.from_user.user_id == user4.id
        assert from_user4.to_user.user_id == user1.id
        assert from_user4.status == "rejected"
def test_GetFriendRequests_not_found(db):
    """GetFriendRequests answers NOT_FOUND for an unknown user."""
    _admin, admin_token = generate_user(is_superuser=True)
    request = admin_pb2.GetFriendRequestsReq(user="nonexistent")

    with real_admin_session(admin_token) as admin_api:
        with pytest.raises(grpc.RpcError) as err:
            admin_api.GetFriendRequests(request)
        assert err.value.code() == grpc.StatusCode.NOT_FOUND
def test_AddUsersToModerationUserList(db):
    """
    AddUsersToModerationUserList creates a new list or extends an existing one.

    Covers: unknown list id, unknown user, creating a list when no id is given,
    a user belonging to several lists, extending an existing list, and the
    ListModerationUserLists view of each outcome.
    """
    super_user, super_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    user4, _ = generate_user()
    user5, _ = generate_user()
    # pre-existing list that contains only user1
    moderation_list_id = add_users_to_new_moderation_list([user1])

    # No database session is opened around the whole test: each verification
    # below opens its own short-lived session_scope(), so an enclosing one
    # would only hold an unused transaction open and be shadowed by them.
    with real_admin_session(super_token) as api:
        # Test adding users to a non-existent moderation list (should raise an error)
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user2.username], moderation_list_id=999),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Moderation user list not found."

        # Test with non-existent user (should raise an error)
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, "nonexistent"]),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Couldn't find that user."

        # Test successful creation of new moderation list (no moderation_list_id provided)
        res = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, user2.username, user3.username]),
        )
        assert res.moderation_list_id > 0
        with session_scope() as session:
            moderation_user_list = session.get(ModerationUserList, res.moderation_list_id)
            assert moderation_user_list is not None
            assert len(moderation_user_list.users) == 3
            assert {user1.id, user2.id, user3.id}.issubset({user.id for user in moderation_user_list.users})

        # Test list endpoint returns same moderation list with same members not repeated
        lists_for_user2 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username))
        assert len(lists_for_user2.moderation_lists) == 1
        assert lists_for_user2.moderation_lists[0].moderation_list_id == res.moderation_list_id
        assert len(lists_for_user2.moderation_lists[0].members) == 3
        assert {user1.id, user2.id, user3.id}.issubset(
            {m.user_id for m in lists_for_user2.moderation_lists[0].members}
        )

        # Test user can be in multiple moderation lists (the pre-existing one and the new one)
        lists_for_user1 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username))
        assert len(lists_for_user1.moderation_lists) == 2

        # Test adding users to an existing moderation list
        res2 = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(
                users=[user4.username, user5.username], moderation_list_id=moderation_list_id
            ),
        )
        assert res2.moderation_list_id == moderation_list_id
        with session_scope() as session:
            moderation_user_list = session.get_one(ModerationUserList, moderation_list_id)
            assert len(moderation_user_list.users) == 3
            assert {user1.id, user4.id, user5.id}.issubset({user.id for user in moderation_user_list.users})

        # Test list user moderation lists endpoint returns the right moderation list
        lists_for_user5 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user5.username))
        assert len(lists_for_user5.moderation_lists) == 1
        assert lists_for_user5.moderation_lists[0].moderation_list_id == moderation_list_id
        assert len(lists_for_user5.moderation_lists[0].members) == 3
        assert {user1.id, user4.id, user5.id}.issubset(
            {m.user_id for m in lists_for_user5.moderation_lists[0].members}
        )
def test_RemoveUserFromModerationUserList(db):
    """Cover RemoveUserFromModerationUserList: its error responses, a normal removal,
    and deletion of the list once its last member is removed."""
    _, super_token = generate_user(is_superuser=True)
    member_a, _ = generate_user()
    member_b, _ = generate_user()
    outsider, _ = generate_user()
    moderation_list_id = add_users_to_new_moderation_list([member_a, member_b])

    def remove_req(**fields):
        # Shorthand for building the removal request message.
        return admin_pb2.RemoveUserFromModerationUserListReq(**fields)

    def lists_req(username):
        # Shorthand for building the "lists containing this user" request message.
        return admin_pb2.ListModerationUserListsReq(user=username)

    with real_admin_session(super_token) as api:
        # Unknown username is rejected
        with pytest.raises(grpc.RpcError) as err:
            api.RemoveUserFromModerationUserList(remove_req(user="nonexistent"))
        assert err.value.code() == grpc.StatusCode.NOT_FOUND
        assert err.value.details() == "Couldn't find that user."

        # Request without a moderation list id is rejected
        with pytest.raises(grpc.RpcError) as err:
            api.RemoveUserFromModerationUserList(remove_req(user=member_b.username))
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == "Missing moderation user list id."

        # Removing somebody who is not on the given list is rejected
        with pytest.raises(grpc.RpcError) as err:
            api.RemoveUserFromModerationUserList(
                remove_req(user=outsider.username, moderation_list_id=moderation_list_id)
            )
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "User is not in the moderation user list."

        # Successful removal leaves the other member in place
        api.RemoveUserFromModerationUserList(
            remove_req(user=member_a.username, moderation_list_id=moderation_list_id)
        )
        with session_scope() as session:
            stored_list = session.get_one(ModerationUserList, moderation_list_id)
            remaining_ids = {member.id for member in stored_list.users}
            assert member_a.id not in remaining_ids
            assert member_b.id in remaining_ids

        # The list endpoint reflects the removal for both users
        lists_for_a = api.ListModerationUserLists(lists_req(member_a.username))
        assert len(lists_for_a.moderation_lists) == 0
        lists_for_b = api.ListModerationUserLists(lists_req(member_b.username))
        assert len(lists_for_b.moderation_lists) == 1

        # Removing the last remaining member deletes the moderation list itself
        api.RemoveUserFromModerationUserList(
            remove_req(user=member_b.username, moderation_list_id=moderation_list_id)
        )
        with session_scope() as session:
            assert session.get(ModerationUserList, moderation_list_id) is None
def test_admin_delete_account_url(db, push_collector: PushCollector):
    """Check the admin-generated account deletion link end to end.

    Creating the link must not push anything to the user, the returned URL must
    embed the stored AccountDeletionToken, and confirming that token through the
    auth API must produce the "Account deleted" push and a single call to the
    mocked notification email.
    """
    _, super_token = generate_user(is_superuser=True)

    # The user's own session token is not needed: the confirmation below goes
    # through an unauthenticated auth API session.
    user, _ = generate_user()
    user_id = user.id

    with real_admin_session(super_token) as admin_api:
        url = admin_api.CreateAccountDeletionLink(
            admin_pb2.CreateAccountDeletionLinkReq(user=user.username)
        ).account_deletion_confirm_url

    # Generating the link alone must not notify the user
    assert push_collector.count_for_user(user_id) == 0

    with session_scope() as session:
        token_o = session.execute(select(AccountDeletionToken)).scalar_one()
        # Named separately from any session token so the two cannot be confused
        deletion_token = token_o.token
        assert token_o.user.id == user_id
        assert url == f"http://localhost:3000/delete-account?token={deletion_token}"

    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, _metadata_interceptor):
            auth_api.ConfirmDeleteAccount(
                auth_pb2.ConfirmDeleteAccountReq(
                    token=deletion_token,
                )
            )

    push = push_collector.pop_for_user(user_id, last=True)
    assert push.content.title == "Account deleted"
    assert push.content.body == "You can restore it within 7 days using the link we emailed you."
    mock.assert_called_once()
    # Still run the field extraction on the recorded call; its result was never
    # inspected by this test, so it is no longer bound to a name.
    email_fields(mock)
def test_AccessStats(db):
    """Check AccessStats for a user with recent, old and NULL-address activity rows,
    using both the default window and an explicit start/end window."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()

    # Insert UserActivity rows: a couple inside the default 90-day window, one well
    # outside it, and one with NULL ip_address / user_agent. The INET column is
    # returned by psycopg3 as an IPv4Address/IPv6Address object, which used to
    # crash the proto string assignment.
    one_day_ago = now() - timedelta(days=1)
    ten_days_ago = now() - timedelta(days=10)
    long_ago = now() - timedelta(days=200)
    activity_rows = [
        {"period": one_day_ago, "ip_address": "1.2.3.4", "user_agent": "ua-a", "api_calls": 5},
        {"period": ten_days_ago, "ip_address": "2001:db8::1", "user_agent": "ua-b", "api_calls": 3},
        {"period": long_ago, "ip_address": "9.9.9.9", "user_agent": "ua-old", "api_calls": 99},
        # no ip_address / user_agent at all
        {"period": one_day_ago, "api_calls": 1},
    ]
    with session_scope() as session:
        for row in activity_rows:
            session.add(UserActivity(user_id=normal_user.id, **row))

    with real_admin_session(super_token) as api:
        default_res = api.AccessStats(admin_pb2.AccessStatsReq(user=normal_user.username))

    stats_by_ip = {}
    for stat in default_res.stats:
        stats_by_ip[stat.ip_address] = stat

    assert "1.2.3.4" in stats_by_ip
    ipv4_stat = stats_by_ip["1.2.3.4"]
    assert ipv4_stat.api_call_count == 5
    assert ipv4_stat.user_agent == "ua-a"
    assert "2001:db8::1" in stats_by_ip
    assert stats_by_ip["2001:db8::1"].api_call_count == 3
    # NULL ip_address row produces an empty-string ip_address in the proto
    assert "" in stats_by_ip
    assert stats_by_ip[""].api_call_count == 1
    # out-of-window row is excluded by the 90-day default
    assert "9.9.9.9" not in stats_by_ip

    # explicit end_time should bound the upper end of the window (regression: was >=)
    with real_admin_session(super_token) as api:
        bounded_req = admin_pb2.AccessStatsReq(
            user=normal_user.username,
            start_time=Timestamp_from_datetime(now() - timedelta(days=5)),
            end_time=Timestamp_from_datetime(now()),
        )
        bounded_res = api.AccessStats(bounded_req)
    assert {stat.ip_address for stat in bounded_res.stats} == {"1.2.3.4", ""}
def test_SetLastDonated(db):
    """SetLastDonated can set the stored timestamp, clear it again when the field is
    omitted, and reports NOT_FOUND for an unknown user."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user(last_donated=None)

    def stored_last_donated():
        # Read the current value straight from the database in a fresh session.
        with session_scope() as session:
            row = session.execute(select(User).where(User.id == normal_user.id)).scalar_one()
            return row.last_donated

    with real_admin_session(super_token) as api:
        # user starts with no last_donated
        assert stored_last_donated() is None

        # can set last_donated
        donation_time = now() - timedelta(days=30)
        set_req = admin_pb2.SetLastDonatedReq(
            user=normal_user.username,
            last_donated=Timestamp_from_datetime(donation_time),
        )
        api.SetLastDonated(set_req)

        saved = stored_last_donated()
        assert saved is not None
        # check timestamp is close (within a second)
        assert abs((saved - donation_time).total_seconds()) < 1

        # can clear last_donated by not setting the field
        api.SetLastDonated(admin_pb2.SetLastDonatedReq(user=normal_user.username))
        assert stored_last_donated() is None

        # user not found
        with pytest.raises(grpc.RpcError) as err:
            api.SetLastDonated(admin_pb2.SetLastDonatedReq(user="nonexistent"))
        assert err.value.code() == grpc.StatusCode.NOT_FOUND
        assert err.value.details() == "Couldn't find that user."
def test_admin_actions_level(db):
    """Admin notes record the requested action level, defaulting to NORMAL."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()

    # (note text, explicit level, number of admin actions expected afterwards)
    explicit_cases = [
        ("debug note", admin_pb2.ADMIN_ACTION_LEVEL_DEBUG, 2),
        ("high note", admin_pb2.ADMIN_ACTION_LEVEL_HIGH, 3),
    ]

    with real_admin_session(super_token) as api:
        # Default level is NORMAL
        first = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note="normal note"))
        assert first.admin_actions[0].level == admin_pb2.ADMIN_ACTION_LEVEL_NORMAL

        # Explicitly requested levels are stored on the newly appended action
        for note_text, level, expected_count in explicit_cases:
            reply = api.AddAdminNote(
                admin_pb2.AddAdminNoteReq(
                    user=normal_user.username,
                    admin_note=note_text,
                    level=level,
                )
            )
            assert len(reply.admin_actions) == expected_count
            assert reply.admin_actions[expected_count - 1].level == level
def test_admin_actions_on_mutations(db, push_collector: PushCollector):
    """Each mutating admin call should leave a matching entry in the returned admin_actions."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    username = normal_user.username

    original_gender = normal_user.gender
    original_birthdate = normal_user.birthdate

    def logged(reply, **expected):
        # True when some admin action in the reply matches every expected field.
        return any(
            all(getattr(action, field) == value for field, value in expected.items())
            for action in reply.admin_actions
        )

    with real_admin_session(super_token) as api:
        # ChangeUserGender
        reply = api.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=username, gender="Machine"))
        assert logged(reply, action_type="change_gender", note=f"Changed from '{original_gender}' to 'Machine'")

        # ChangeUserBirthdate
        reply = api.ChangeUserBirthdate(admin_pb2.ChangeUserBirthdateReq(user=username, birthdate="1990-01-01"))
        assert logged(
            reply, action_type="change_birthdate", note=f"Changed from {original_birthdate} to 1990-01-01"
        )

        # SetPassportSexGenderException
        reply = api.SetPassportSexGenderException(
            admin_pb2.SetPassportSexGenderExceptionReq(user=username, passport_sex_gender_exception=True)
        )
        assert logged(reply, action_type="set_passport_sex_gender_exception", note="Changed from False to True")

        # SendModNote with notify
        notify_req = admin_pb2.SendModNoteReq(user=username, content="Please update your profile", internal_id="test1")
        reply = api.SendModNote(notify_req)
        assert logged(reply, action_type="send_mod_note", note="Notify user: Yes\n\nPlease update your profile")

        # SendModNote with do_not_notify
        silent_req = admin_pb2.SendModNoteReq(
            user=username,
            content="Silent note",
            internal_id="test2",
            do_not_notify=True,
        )
        reply = api.SendModNote(silent_req)
        assert logged(reply, action_type="send_mod_note", note="Notify user: No\n\nSilent note")

        # DeleteUser
        reply = api.DeleteUser(admin_pb2.DeleteUserReq(user=username))
        assert logged(reply, action_type="delete_user")
        assert logged(reply, action_type="delete_user", level=admin_pb2.ADMIN_ACTION_LEVEL_HIGH)

        # RecoverDeletedUser
        reply = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=username))
        assert logged(reply, action_type="recover_user")

        # MarkUserNeedsLocationUpdate
        reply = api.MarkUserNeedsLocationUpdate(admin_pb2.MarkUserNeedsLocationUpdateReq(user=username))
        assert logged(
            reply, action_type="mark_needs_location_update", note="Marked user as needing location update"
        )

        # SetLastDonated
        reply = api.SetLastDonated(
            admin_pb2.SetLastDonatedReq(
                user=username,
                last_donated=Timestamp_from_datetime(now()),
            )
        )
        assert logged(reply, action_type="set_last_donated")
def test_create_admin_tag(db):
    """Creating an admin tag returns the tag text and a positive id."""
    _, super_token = generate_user(is_superuser=True)

    with real_admin_session(super_token) as api:
        request = admin_pb2.CreateAdminTagReq(tag="test-tag")
        created = api.CreateAdminTag(request)
        assert created.tag == "test-tag"
        assert created.admin_tag_id > 0
def test_create_admin_tag_duplicate(db):
    """Creating the same admin tag twice fails with ALREADY_EXISTS."""
    _, super_token = generate_user(is_superuser=True)

    with real_admin_session(super_token) as api:
        # First creation succeeds
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="test-tag"))

        # Second creation of the identical tag is rejected
        with pytest.raises(grpc.RpcError) as err:
            api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="test-tag"))
        assert err.value.code() == grpc.StatusCode.ALREADY_EXISTS
        assert err.value.details() == "That admin tag already exists."
def test_create_admin_tag_empty(db):
    """Empty and whitespace-only admin tags are rejected with INVALID_ARGUMENT."""
    _, super_token = generate_user(is_superuser=True)

    with real_admin_session(super_token) as api:
        # the empty string first, then a whitespace-only tag
        for blank_tag in ("", " "):
            with pytest.raises(grpc.RpcError) as err:
                api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag=blank_tag))
            assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
            assert err.value.details() == "The admin tag cannot be empty."
def test_list_admin_tags(db):
    """ListAdminTags starts empty and returns created tags in alphabetical order."""
    _, super_token = generate_user(is_superuser=True)

    with real_admin_session(super_token) as api:
        # Empty initially
        initial = api.ListAdminTags(admin_pb2.ListAdminTagsReq())
        assert len(initial.tags) == 0

        # Add some tags, deliberately out of alphabetical order
        for tag_text in ("bravo", "alpha"):
            api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag=tag_text))

        listed = api.ListAdminTags(admin_pb2.ListAdminTagsReq())
        # Exactly two tags, ordered alphabetically
        assert [entry.tag for entry in listed.tags] == ["alpha", "bravo"]
def test_add_admin_tag_to_user(db):
    """Tagging a user returns the tag in admin_tags and logs an add_tag admin action."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()

    with real_admin_session(super_token) as api:
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))

        tag_req = admin_pb2.AddAdminTagToUserReq(user=normal_user.username, tag="vip")
        details = api.AddAdminTagToUser(tag_req)
        assert "vip" in details.admin_tags
        assert any(action.action_type == "add_tag" and action.tag == "vip" for action in details.admin_actions)
def test_add_admin_tag_to_user_duplicate(db):
    """Adding a tag the user already has fails with FAILED_PRECONDITION."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()

    def vip_req():
        # A fresh request message for each call, as in a real client.
        return admin_pb2.AddAdminTagToUserReq(user=normal_user.username, tag="vip")

    with real_admin_session(super_token) as api:
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))
        api.AddAdminTagToUser(vip_req())

        # Second assignment of the same tag is rejected
        with pytest.raises(grpc.RpcError) as err:
            api.AddAdminTagToUser(vip_req())
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "The user already has that admin tag."
def test_add_admin_tag_to_user_tag_not_found(db):
    """Assigning a tag that was never created fails with NOT_FOUND."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()

    with real_admin_session(super_token) as api, pytest.raises(grpc.RpcError) as err:
        api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=normal_user.username, tag="nonexistent"))

    assert err.value.code() == grpc.StatusCode.NOT_FOUND
    assert err.value.details() == "Admin tag not found."
def test_remove_admin_tag_from_user(db):
    """Removing a tag drops it from admin_tags and logs a remove_tag admin action."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    username = normal_user.username

    with real_admin_session(super_token) as api:
        # Set-up: the tag exists and is assigned to the user
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))
        api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=username, tag="vip"))

        details = api.RemoveAdminTagFromUser(admin_pb2.RemoveAdminTagFromUserReq(user=username, tag="vip"))
        assert "vip" not in details.admin_tags
        assert any(action.action_type == "remove_tag" and action.tag == "vip" for action in details.admin_actions)
def test_remove_admin_tag_from_user_not_assigned(db):
    """Removing an existing tag the user does not carry fails with FAILED_PRECONDITION."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()

    with real_admin_session(super_token) as api:
        # The tag exists but is never assigned to the user
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))

        with pytest.raises(grpc.RpcError) as err:
            removal = admin_pb2.RemoveAdminTagFromUserReq(user=normal_user.username, tag="vip")
            api.RemoveAdminTagFromUser(removal)
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "The user does not have that admin tag."
def test_search_users_by_admin_tag(db):
    """SearchUsers filtered by admin_tags returns only users carrying every listed tag."""
    _, super_token = generate_user(is_superuser=True)
    vip_only, _ = generate_user()
    vip_and_flagged, _ = generate_user()
    untagged, _ = generate_user()

    with real_admin_session(super_token) as api:
        for tag_text in ("vip", "flagged"):
            api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag=tag_text))

        assignments = [
            (vip_only, "vip"),
            (vip_and_flagged, "vip"),
            (vip_and_flagged, "flagged"),
        ]
        for tagged_user, tag_text in assignments:
            api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=tagged_user.username, tag=tag_text))

        # Search for users with "vip" tag
        vip_hits = api.SearchUsers(admin_pb2.SearchUsersReq(admin_tags=["vip"]))
        found_ids = {hit.user_id for hit in vip_hits.users}
        assert vip_only.id in found_ids
        assert vip_and_flagged.id in found_ids
        assert untagged.id not in found_ids

        # Search for users with both "vip" AND "flagged" tags (AND logic)
        both_hits = api.SearchUsers(admin_pb2.SearchUsersReq(admin_tags=["vip", "flagged"]))
        found_ids = {hit.user_id for hit in both_hits.users}
        assert vip_and_flagged.id in found_ids
        assert vip_only.id not in found_ids

        # Search for non-existent tag returns no results
        no_hits = api.SearchUsers(admin_pb2.SearchUsersReq(admin_tags=["nonexistent"]))
        assert len(no_hits.users) == 0
def test_search_users_by_admin_note(db):
    """SearchUsers with an admin_action_log pattern matches users by their admin note text."""
    _, super_token = generate_user(is_superuser=True)
    flagged_user, _ = generate_user()
    ordinary_user, _ = generate_user()

    notes = [
        (flagged_user, "suspicious activity"),
        (ordinary_user, "normal user"),
    ]

    with real_admin_session(super_token) as api:
        for noted_user, note_text in notes:
            api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=noted_user.username, admin_note=note_text))

        # Search by admin action log content (ilike)
        hits = api.SearchUsers(admin_pb2.SearchUsersReq(admin_action_log="%suspicious%"))
        found_ids = {hit.user_id for hit in hits.users}
        assert flagged_user.id in found_ids
        assert ordinary_user.id not in found_ids
1306# community invite feature tested in test_events.py
1307# SendBlogPostNotification tested in test_notifications.py
1308# MarkUserNeedsLocationUpdate tested in test_jail.py