Coverage for app/backend/src/tests/test_admin.py: 100%
1062 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1import json
2from datetime import UTC, date, datetime, timedelta
3from unittest.mock import patch
5import grpc
6import pytest
7from sqlalchemy import select
8from sqlalchemy.sql import func
10from couchers.db import session_scope
11from couchers.models import (
12 AccountDeletionToken,
13 ContentReport,
14 EventOccurrence,
15 FriendRelationship,
16 FriendStatus,
17 ModerationObjectType,
18 ModerationState,
19 ModerationUserList,
20 ModerationVisibility,
21 NonvisibleUserAccess,
22 NonvisibleUserAccessType,
23 NonvisibleUserState,
24 PhotoGallery,
25 PhotoGalleryItem,
26 Reference,
27 Upload,
28 User,
29 UserActivity,
30 UserSession,
31)
32from couchers.proto import (
33 account_pb2,
34 admin_pb2,
35 auth_pb2,
36 events_pb2,
37 references_pb2,
38 reporting_pb2,
39 requests_pb2,
40)
41from couchers.utils import Timestamp_from_datetime, now, parse_date
42from tests.fixtures.db import add_users_to_new_moderation_list, generate_user, make_friends
43from tests.fixtures.misc import EmailCollector, PushCollector
44from tests.fixtures.sessions import (
45 account_session,
46 auth_api_session,
47 events_session,
48 real_admin_session,
49 references_session,
50 reporting_session,
51 requests_session,
52)
53from tests.test_communities import create_community
54from tests.test_requests import valid_request_text
@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""
def test_access_by_normal_user(db):
    """A user without superuser rights gets PERMISSION_DENIED from the admin servicer."""
    plain_user, plain_token = generate_user()
    lookup = admin_pb2.GetUserDetailsReq(user=str(plain_user.id))

    with real_admin_session(plain_token) as admin:
        # all requests to the admin servicer should break when done by a non-super_user
        with pytest.raises(grpc.RpcError) as exc_info:
            admin.GetUserDetails(lookup)
    assert exc_info.value.code() == grpc.StatusCode.PERMISSION_DENIED
def test_GetNonvisibleUserAccessLog(db):
    """The access log returns every row stored for the target, with actor details where an actor exists."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user(username="target")
    viewer, _ = generate_user(username="viewer")

    # one login attempt by the target itself, one ghost view by a known user, one ghost view with no actor
    recorded = (
        {
            "access_type": NonvisibleUserAccessType.login_attempt,
            "actor_user_id": target.id,
            "ip_address": "1.2.3.4",
            "sofa": "device-cookie",
        },
        {"access_type": NonvisibleUserAccessType.ghost_served, "actor_user_id": viewer.id},
        {"access_type": NonvisibleUserAccessType.ghost_served, "actor_user_id": None},
    )
    with session_scope() as session:
        for fields in recorded:
            session.add(
                NonvisibleUserAccess(target_user_id=target.id, target_state=NonvisibleUserState.banned, **fields)
            )

    with real_admin_session(admin_token) as api:
        log = api.GetNonvisibleUserAccessLog(admin_pb2.GetNonvisibleUserAccessLogReq(user="target"))

    assert len(log.entries) == 3
    for row in log.entries:
        assert row.target_user_id == target.id
        assert row.target_state == admin_pb2.NONVISIBLE_USER_STATE_BANNED

    def rows_of_type(access_type):
        return [row for row in log.entries if row.access_type == access_type]

    login_rows = rows_of_type(admin_pb2.NONVISIBLE_USER_ACCESS_TYPE_LOGIN_ATTEMPT)
    view_rows = rows_of_type(admin_pb2.NONVISIBLE_USER_ACCESS_TYPE_GHOST_SERVED)
    assert len(login_rows) == 1
    assert len(view_rows) == 2

    login_row = login_rows[0]
    assert login_row.actor_user_id.value == target.id
    assert login_row.actor_username == "target"
    assert login_row.ip_address == "1.2.3.4"
    assert login_row.sofa == "device-cookie"

    # split the ghost views by whether an actor username came back
    with_actor = [row for row in view_rows if row.actor_username == "viewer"]
    without_actor = [row for row in view_rows if not row.actor_username]
    assert len(with_actor) == 1
    assert with_actor[0].actor_user_id.value == viewer.id
    assert len(without_actor) == 1
    assert not without_actor[0].HasField("actor_user_id")
def test_GetUser(db):
    """GetUser returns the same id and username before and after the user is banned."""
    _admin, admin_token = generate_user(is_superuser=True)
    member, _member_token = generate_user()

    def lookup_and_check():
        with real_admin_session(admin_token) as api:
            found = api.GetUser(admin_pb2.GetUserReq(user=str(member.id)))
        assert found.user_id == member.id
        assert found.username == member.username

    lookup_and_check()

    with real_admin_session(admin_token) as api:
        api.BanUser(admin_pb2.BanUserReq(user=member.username, admin_note="Testing banning"))

    # the lookup must still succeed once the user is banned
    lookup_and_check()
def test_GetUserDetails(db):
    """GetUserDetails resolves the same user by id, by username and by email."""
    _admin, admin_token = generate_user(is_superuser=True)
    member, _member_token = generate_user()

    # each identifier is looked up in its own admin session
    for identifier in (str(member.id), member.username, member.email):
        with real_admin_session(admin_token) as api:
            details = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=identifier))
        assert details.user_id == member.id
        assert details.username == member.username
        assert details.email == member.email
        assert details.gender == member.gender
        assert parse_date(details.birthdate) == member.birthdate
        assert not details.banned
        assert not details.deleted
def test_ChangeUserGender(db, email_collector: EmailCollector, push_collector: PushCollector):
    """Changing a user's gender updates the details and notifies the user by email and push."""
    _admin, admin_token = generate_user(is_superuser=True)
    member, _member_token = generate_user()
    new_gender = "Machine"

    change = admin_pb2.ChangeUserGenderReq(user=member.username, gender=new_gender)
    with real_admin_session(admin_token) as api:
        details = api.ChangeUserGender(change)
    assert details.user_id == member.id
    assert details.username == member.username
    assert details.email == member.email
    assert details.gender == new_gender
    assert parse_date(details.birthdate) == member.birthdate
    assert not details.banned
    assert not details.deleted

    # email notification
    mail = email_collector.pop_for_recipient(member.email, last=True)
    assert mail.subject == "[TEST] Your gender was changed"
    assert mail.recipient == member.email
    for rendering in (mail.plain, mail.html):
        assert new_gender in rendering

    # push notification
    notification = push_collector.pop_for_user(member.id, last=True)
    assert notification.content.title == "Gender changed"
    assert notification.content.body == "An admin changed your gender to Machine."
def test_ChangeUserBirthdate(db, email_collector: EmailCollector, push_collector: PushCollector):
    """Changing a user's birthdate updates the details and notifies the user by email and push."""
    _admin, admin_token = generate_user(is_superuser=True)
    original_birthdate = date(year=2000, month=1, day=1)
    member, _member_token = generate_user(birthdate=original_birthdate)

    with real_admin_session(admin_token) as api:
        before = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=member.username))
        assert parse_date(before.birthdate) == date(year=2000, month=1, day=1)

        change = admin_pb2.ChangeUserBirthdateReq(user=member.username, birthdate="1990-05-25")
        after = api.ChangeUserBirthdate(change)

        assert after.user_id == member.id
        assert after.username == member.username
        assert after.email == member.email
        assert after.birthdate == "1990-05-25"
        assert after.gender == member.gender
        assert not after.banned
        assert not after.deleted

    # email notification
    mail = email_collector.pop_for_recipient(member.email, last=True)
    assert mail.subject == "[TEST] Your date of birth was changed"
    assert mail.recipient == member.email
    for rendering in (mail.plain, mail.html):
        assert "1990" in rendering

    # push notification
    notification = push_collector.pop_for_user(member.id, last=True)
    assert notification.content.title == "Birthdate changed"
    assert notification.content.body == "An admin changed your date of birth to May 25, 1990."
def test_BanUser(db):
    """Banning a user marks them banned and records one high-level "ban" action by the acting admin."""
    admin, admin_token = generate_user(is_superuser=True)
    member, _ = generate_user()
    reason = "A good reason"

    with real_admin_session(admin_token) as api:
        details = api.BanUser(admin_pb2.BanUserReq(user=member.username, admin_note=reason))
    assert details.user_id == member.id
    assert details.username == member.username
    assert details.email == member.email
    assert details.gender == member.gender
    assert parse_date(details.birthdate) == member.birthdate
    assert details.banned
    assert not details.deleted

    assert len(details.admin_actions) == 1
    action = details.admin_actions[0]
    assert action.action_type == "ban"
    assert action.level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH
    assert action.note == reason
    assert action.admin_user_id == admin.id
    assert action.admin_username == admin.username
def test_UnbanUser(db):
    """Unbanning a user leaves them not banned and records one high-level "unban" action."""
    _admin, admin_token = generate_user(is_superuser=True)
    member, _ = generate_user()
    reason = "A good reason"

    with real_admin_session(admin_token) as api:
        details = api.UnbanUser(admin_pb2.UnbanUserReq(user=member.username, admin_note=reason))
    assert details.user_id == member.id
    assert details.username == member.username
    assert details.email == member.email
    assert details.gender == member.gender
    assert parse_date(details.birthdate) == member.birthdate
    assert not details.banned
    assert not details.deleted

    assert len(details.admin_actions) == 1
    action = details.admin_actions[0]
    assert action.action_type == "unban"
    assert action.level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH
def test_ShadowUser(db):
    """Shadowing a user records a high-level "shadow" action and shadows their visible host request."""
    _admin, admin_token = generate_user(is_superuser=True)
    surfer, surfer_token = generate_user()
    host, _ = generate_user()
    reason = "Spammer"

    def request_state(session):
        # moderation row belonging to the host request created below
        return session.execute(
            select(ModerationState)
            .where(ModerationState.object_type == ModerationObjectType.host_request)
            .where(ModerationState.object_id == host_request_id)
        ).scalar_one()

    # `surfer` sends a host request whose moderation state is then forced to VISIBLE,
    # so the cascade has something to act on
    from_date = (date.today() + timedelta(days=2)).isoformat()
    to_date = (date.today() + timedelta(days=3)).isoformat()
    with requests_session(surfer_token) as requests_api:
        created = requests_api.CreateHostRequest(
            requests_pb2.CreateHostRequestReq(
                host_user_id=host.id,
                from_date=from_date,
                to_date=to_date,
                text=valid_request_text(),
            )
        )
    host_request_id = created.host_request_id
    with session_scope() as session:
        request_state(session).visibility = ModerationVisibility.visible

    with real_admin_session(admin_token) as api:
        details = api.ShadowUser(admin_pb2.ShadowUserReq(user=surfer.username, admin_note=reason))
    assert details.user_id == surfer.id
    assert details.shadowed
    assert not details.banned
    assert not details.deleted

    assert len(details.admin_actions) == 1
    action = details.admin_actions[0]
    assert action.action_type == "shadow"
    assert action.level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH
    assert action.note == reason

    # the host request that was visible before is shadowed now
    with session_scope() as session:
        assert request_state(session).visibility == ModerationVisibility.shadowed
def test_UnshadowUser(db):
    """Unshadowing turns a shadowed host request visible again but leaves a hidden one hidden."""
    _admin, admin_token = generate_user(is_superuser=True)
    surfer, surfer_token = generate_user()
    host, _ = generate_user()

    from_date = (date.today() + timedelta(days=2)).isoformat()
    to_date = (date.today() + timedelta(days=3)).isoformat()

    def new_request():
        return requests_pb2.CreateHostRequestReq(
            host_user_id=host.id,
            from_date=from_date,
            to_date=to_date,
            text=valid_request_text(),
        )

    def state_of(session, request_id):
        # moderation row belonging to the given host request
        return session.execute(
            select(ModerationState)
            .where(ModerationState.object_type == ModerationObjectType.host_request)
            .where(ModerationState.object_id == request_id)
        ).scalar_one()

    with requests_session(surfer_token) as requests_api:
        shadow_cascade_request_id = requests_api.CreateHostRequest(new_request()).host_request_id
        admin_hidden_request_id = requests_api.CreateHostRequest(new_request()).host_request_id

    # put the database into the "already shadowed" state by hand: the user is shadowed,
    # one request is shadowed and the other is hidden
    with session_scope() as session:
        shadowed_user = session.execute(select(User).where(User.id == surfer.id)).scalar_one()
        shadowed_user.shadowed_at = now()
        state_of(session, shadow_cascade_request_id).visibility = ModerationVisibility.shadowed
        state_of(session, admin_hidden_request_id).visibility = ModerationVisibility.hidden

    with real_admin_session(admin_token) as api:
        details = api.UnshadowUser(admin_pb2.UnshadowUserReq(user=surfer.username, admin_note="rehabilitated"))
    assert not details.shadowed
    assert len(details.admin_actions) == 1
    action = details.admin_actions[0]
    assert action.action_type == "unshadow"
    assert action.level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH

    with session_scope() as session:
        assert state_of(session, shadow_cascade_request_id).visibility == ModerationVisibility.visible
        assert state_of(session, admin_hidden_request_id).visibility == ModerationVisibility.hidden
def test_ShadowUser_blank_note(db):
    """ShadowUser rejects a whitespace-only admin note with INVALID_ARGUMENT."""
    _admin, admin_token = generate_user(is_superuser=True)
    member, _ = generate_user()
    blank_request = admin_pb2.ShadowUserReq(user=member.username, admin_note=" \t ")

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.ShadowUser(blank_request)
    assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
def test_AddAdminNote(db):
    """Admin notes are stored as normal-level "note" actions and returned in insertion order."""
    _admin, admin_token = generate_user(is_superuser=True)
    member, _ = generate_user()
    first_note = "User reported strange behavior"
    second_note = "Insert private information here"

    with real_admin_session(admin_token) as api:
        details = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=member.username, admin_note=first_note))
    assert details.user_id == member.id
    assert details.username == member.username
    assert details.email == member.email
    assert details.gender == member.gender
    assert parse_date(details.birthdate) == member.birthdate
    assert not details.banned
    assert not details.deleted

    assert len(details.admin_actions) == 1
    action = details.admin_actions[0]
    assert action.action_type == "note"
    assert action.level == admin_pb2.ADMIN_ACTION_LEVEL_NORMAL
    assert action.note == first_note

    # a second note is added after the first one
    with real_admin_session(admin_token) as api:
        details = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=member.username, admin_note=second_note))
    assert len(details.admin_actions) == 2
    assert details.admin_actions[0].note == first_note
    assert details.admin_actions[1].note == second_note
def test_AddAdminNote_blank(db):
    """A whitespace-only admin note is rejected with INVALID_ARGUMENT."""
    _admin, admin_token = generate_user(is_superuser=True)
    member, _ = generate_user()
    whitespace_only = " \t \n "
    request = admin_pb2.AddAdminNoteReq(user=member.username, admin_note=whitespace_only)

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddAdminNote(request)
    failure = exc_info.value
    assert failure.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert failure.details() == "Provide exactly one of admin_note or data."
def test_AddAdminNote_data(db):
    """A JSON ``data`` payload is stored on a "note" action whose text note is empty."""
    _admin, admin_token = generate_user(is_superuser=True)
    member, _ = generate_user()
    raw_json = '{"kind": "flag", "score": 0.87, "reasons": ["spam", "burst"]}'

    with real_admin_session(admin_token) as api:
        details = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=member.username, data=raw_json))

    assert len(details.admin_actions) == 1
    action = details.admin_actions[0]
    assert action.action_type == "note"
    assert action.note == ""
    # compare parsed JSON rather than the raw string
    stored = json.loads(action.data)
    assert stored == {"kind": "flag", "score": 0.87, "reasons": ["spam", "burst"]}
def test_AddAdminNote_both_note_and_data(db):
    """Supplying both ``admin_note`` and ``data`` is rejected with INVALID_ARGUMENT."""
    _admin, admin_token = generate_user(is_superuser=True)
    member, _ = generate_user()
    request = admin_pb2.AddAdminNoteReq(user=member.username, admin_note="note text", data='{"x": 1}')

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddAdminNote(request)
    failure = exc_info.value
    assert failure.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert failure.details() == "Provide exactly one of admin_note or data."
def test_AddAdminNote_neither(db):
    """Supplying neither ``admin_note`` nor ``data`` is rejected with INVALID_ARGUMENT."""
    _admin, admin_token = generate_user(is_superuser=True)
    member, _ = generate_user()
    request = admin_pb2.AddAdminNoteReq(user=member.username)

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddAdminNote(request)
    failure = exc_info.value
    assert failure.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert failure.details() == "Provide exactly one of admin_note or data."
def test_AddAdminNote_invalid_json(db):
    """A ``data`` payload that is not valid JSON is rejected with INVALID_ARGUMENT."""
    _admin, admin_token = generate_user(is_superuser=True)
    member, _ = generate_user()
    request = admin_pb2.AddAdminNoteReq(user=member.username, data="{not valid json")

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddAdminNote(request)
    failure = exc_info.value
    assert failure.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert failure.details() == "The admin note data must be valid JSON."
def test_admin_content_reports(db):
    """Admins can fetch one content report by id and list an author's reports."""
    _admin, admin_token = generate_user(is_superuser=True)
    reporter, reporter_token = generate_user()
    bad_user1, _ = generate_user()
    bad_user2, _ = generate_user()

    # (reason, description, content_ref, author username, page), filed in this order
    filed = (
        ("spam", "r1", "comment/123", bad_user1.username, "https://couchers.org/comment/123"),
        ("spam", "r2", "comment/124", bad_user2.username, "https://couchers.org/comment/124"),
        ("something else", "r3", "page/321", bad_user1.username, "https://couchers.org/page/321"),
    )
    with reporting_session(reporter_token) as reporting_api:
        for reason, description, content_ref, author, page in filed:
            reporting_api.Report(
                reporting_pb2.ReportReq(
                    reason=reason,
                    description=description,
                    content_ref=content_ref,
                    author_user=author,
                    user_agent="n/a",
                    page=page,
                )
            )

    # descriptions are unique above, so they serve as lookup keys for the generated ids
    with session_scope() as session:
        rows = session.execute(select(ContentReport.description, ContentReport.id)).all()
        id_by_description: dict[str, int] = {description: report_id for description, report_id in rows}

    with real_admin_session(admin_token) as api:
        # unknown id
        with pytest.raises(grpc.RpcError) as exc_info:
            api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == "Content report not found."

        # single report lookup
        single = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=id_by_description["r2"]))
        report = single.content_report
        assert report.content_report_id == id_by_description["r2"]
        assert report.reporting_user_id == reporter.id
        assert report.author_user_id == bad_user2.id
        assert report.reason == "spam"
        assert report.description == "r2"
        assert report.content_ref == "comment/124"
        assert report.user_agent == "n/a"
        assert report.page == "https://couchers.org/comment/124"

        # reports for one author: the later report (r3) is expected before the earlier one (r1)
        listing = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username))
        assert listing.content_reports[0].content_report_id == id_by_description["r3"]
        assert listing.content_reports[1].content_report_id == id_by_description["r1"]
def test_DeleteUser(db):
    """DeleteUser marks the user deleted and RecoverDeletedUser clears the flag again."""
    _admin, admin_token = generate_user(is_superuser=True)
    member, _member_token = generate_user()

    def check_unchanged_fields(details):
        # everything except the deletion flag is the same for both calls
        assert details.user_id == member.id
        assert details.username == member.username
        assert details.email == member.email
        assert details.gender == member.gender
        assert parse_date(details.birthdate) == member.birthdate
        assert not details.banned

    with real_admin_session(admin_token) as api:
        deleted = api.DeleteUser(admin_pb2.DeleteUserReq(user=member.username))
    check_unchanged_fields(deleted)
    assert deleted.deleted

    with real_admin_session(admin_token) as api:
        recovered = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=member.username))
    check_unchanged_fields(recovered)
    assert not recovered.deleted
def test_RecoverDeletedUser_after_user_initiated_deletion(db, push_collector: PushCollector):
    """
    Deleting an account through the regular flow (ConfirmDeleteAccount) sets
    undelete_token and undelete_until. RecoverDeletedUser has to clear both,
    otherwise the undelete_nullity database constraint is violated.
    """
    _admin, admin_token = generate_user(is_superuser=True)
    member, member_token = generate_user()
    member_id = member.id

    def load_user(session):
        return session.execute(select(User).where(User.id == member_id)).scalar_one()

    # the user asks for their account to be deleted
    with account_session(member_token) as account_api:
        account_api.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    # fetch the confirmation token created by that request
    with session_scope() as session:
        confirmation_token = session.execute(select(AccountDeletionToken)).scalar_one().token

    # the user confirms the deletion, which sets undelete_token and undelete_until
    with auth_api_session() as (auth_api, _interceptor):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=confirmation_token))

    # the user is now deleted and carries the undelete fields
    with session_scope() as session:
        row = load_user(session)
        assert row.deleted_at is not None
        assert row.undelete_token is not None
        assert row.undelete_until is not None

    # an admin recovers the account
    with real_admin_session(admin_token) as api:
        details = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=member.username))
    assert details.user_id == member_id
    assert not details.deleted

    # the deletion marker and both undelete fields are gone
    with session_scope() as session:
        row = load_user(session)
        assert row.deleted_at is None
        assert row.undelete_token is None
        assert row.undelete_until is None
def test_CreateApiKey(db, email_collector: EmailCollector, push_collector: PushCollector):
    """CreateApiKey stores an API-key session for the user and sends the key by email, plus a push."""
    with session_scope() as session:
        _admin, admin_token = generate_user(is_superuser=True)
        member, _member_token = generate_user()

        # the user starts without any API key
        existing_keys = session.execute(
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)  # noqa: E712
            .where(UserSession.user_id == member.id)
        ).scalar_one()
        assert existing_keys == 0

    with real_admin_session(admin_token) as api:
        api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=member.username))

    mail = email_collector.pop_for_recipient(member.email, last=True)
    assert mail.subject == "[TEST] Your API key for Couchers.org"

    # exactly one valid API-key session must exist now; its token is what the email has to contain
    with session_scope() as session:
        api_key = session.execute(
            select(UserSession.token)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)  # noqa: E712
            .where(UserSession.user_id == member.id)
        ).scalar_one()

    assert api_key in mail.plain
    assert api_key in mail.html

    assert mail.recipient == member.email
    assert "api key" in mail.subject.lower()
    marker = "We've issued you with the following API key:"
    assert marker in mail.plain
    assert marker in mail.html
    assert "support@couchers.org" in mail.plain
    assert "support@couchers.org" in mail.html

    notification = push_collector.pop_for_user(member.id, last=True)
    assert notification.content.title == "API key created"
    assert notification.content.body == "Details were sent to you via email."
def test_GetChats(db):
    """GetChats returns the user's identity and empty chat lists for a freshly created user."""
    _admin, admin_token = generate_user(is_superuser=True)
    member, _member_token = generate_user()

    with real_admin_session(admin_token) as api:
        chats = api.GetChats(admin_pb2.GetChatsReq(user=member.username))

    subject = chats.user
    assert subject.user_id == member.id
    assert subject.username == member.username
    assert subject.name == member.name
    # a new user has no chats of either kind
    assert len(chats.host_requests) == 0
    assert len(chats.group_chats) == 0
def test_badges(db, email_collector: EmailCollector, push_collector: PushCollector):
    """Admins can add and remove an editable badge; invalid badge operations are rejected."""
    _admin, admin_token = generate_user(is_superuser=True)
    member, _member_token = generate_user()
    username = member.username

    def assert_rejected(rpc, request, code, details):
        # the call must fail with exactly this status code and message
        with pytest.raises(grpc.RpcError) as exc_info:
            rpc(request)
        assert exc_info.value.code() == code
        assert exc_info.value.details() == details

    with real_admin_session(admin_token) as api:

        def current_badges():
            return api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=username)).badges

        # can add a badge
        assert "swagster" not in current_badges()
        added = api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="swagster"))
        assert "swagster" in added.badges

        # badge emails are disabled by default
        assert email_collector.count_for_recipient(member.email) == 0

        notification = push_collector.pop_for_user(member.id, last=True)
        assert notification.content.title == "New profile badge: Swagster"
        assert notification.content.body == "The Swagster badge was added to your profile."

        # can't add/edit special tags
        assert_rejected(
            api.AddBadge,
            admin_pb2.AddBadgeReq(user=username, badge_id="founder"),
            grpc.StatusCode.FAILED_PRECONDITION,
            "Admins cannot edit that badge.",
        )

        # double add badge
        assert_rejected(
            api.AddBadge,
            admin_pb2.AddBadgeReq(user=username, badge_id="swagster"),
            grpc.StatusCode.FAILED_PRECONDITION,
            "The user already has that badge.",
        )

        # can remove badge
        assert "swagster" in current_badges()
        removed = api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="swagster"))
        assert "swagster" not in removed.badges

        # badge emails are disabled by default
        assert email_collector.count_for_recipient(member.email) == 0

        notification = push_collector.pop_for_user(member.id, last=True)
        assert notification.content.title == "Profile badge removed"
        assert notification.content.body == "The Swagster badge was removed from your profile."

        # not found on user
        assert_rejected(
            api.RemoveBadge,
            admin_pb2.RemoveBadgeReq(user=username, badge_id="swagster"),
            grpc.StatusCode.FAILED_PRECONDITION,
            "The user does not have that badge.",
        )

        # not found in general
        assert_rejected(
            api.AddBadge,
            admin_pb2.AddBadgeReq(user=username, badge_id="nonexistentbadge"),
            grpc.StatusCode.NOT_FOUND,
            "Badge not found.",
        )
def test_DeleteEvent(db):
    """An admin deleting an event marks its occurrence as deleted."""
    _admin, admin_token = generate_user(is_superuser=True)
    organizer, organizer_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [organizer], [], None)

    starts = now() + timedelta(hours=2)
    ends = starts + timedelta(hours=3)
    venue = events_pb2.OfflineEventInformation(
        address="Near Null Island",
        lat=0.1,
        lng=0.2,
    )
    with events_session(organizer_token) as events_api:
        created = events_api.CreateEvent(
            events_pb2.CreateEventReq(
                title="Dummy Title",
                content="Dummy content.",
                photo_key=None,
                offline_information=venue,
                start_time=Timestamp_from_datetime(starts),
                end_time=Timestamp_from_datetime(ends),
                timezone="UTC",
            )
        )
    event_id = created.event_id
    assert not created.is_deleted

    with session_scope() as session:
        with real_admin_session(admin_token) as api:
            api.DeleteEvent(admin_pb2.DeleteEventReq(event_id=event_id))
        # the occurrence is first loaded after the admin call, in the session opened before it
        deleted_occurrence = session.get_one(EventOccurrence, ident=event_id)
        assert deleted_occurrence.is_deleted
def test_ListUserIds(db):
    """ListUserIds returns both users for a wide time window and nothing for a window starting now."""
    admin, admin_token = generate_user(is_superuser=True)
    member, _member_token = generate_user()

    # wide window: from the year 2000 until now
    wide_window = admin_pb2.ListUserIdsReq(
        start_time=Timestamp_from_datetime(datetime(2000, 1, 1, tzinfo=UTC)),
        end_time=Timestamp_from_datetime(now()),
    )
    with real_admin_session(admin_token) as api:
        listing = api.ListUserIds(wide_window)
    assert len(listing.user_ids) == 2
    assert sorted(listing.user_ids) == sorted([admin.id, member.id])

    # window that both starts and ends "now"
    with real_admin_session(admin_token) as api:
        listing = api.ListUserIds(
            admin_pb2.ListUserIdsReq(
                start_time=Timestamp_from_datetime(now()),
                end_time=Timestamp_from_datetime(now()),
            )
        )
    assert listing.user_ids == []
def test_EditReferenceText(db):
    """EditReferenceText replaces the stored text of an existing reference."""
    _admin, admin_token = generate_user(is_superuser=True)
    replacement_text = "New Text"

    author, author_token = generate_user()
    recipient, _recipient_token = generate_user()
    make_friends(author, recipient)

    with session_scope() as session:
        with references_session(author_token) as references_api:
            written = references_api.WriteFriendReference(
                references_pb2.WriteFriendReferenceReq(
                    to_user_id=recipient.id, text="Old Text", private_text="", was_appropriate=True, rating=1
                )
            )

        edit = admin_pb2.EditReferenceTextReq(reference_id=written.reference_id, new_text=replacement_text)
        with real_admin_session(admin_token) as admin_api:
            admin_api.EditReferenceText(edit)

        # expire the session's loaded state before re-reading the reference
        session.expire_all()

        stored = session.execute(select(Reference).where(Reference.id == written.reference_id)).scalar_one()
        assert stored.text == replacement_text
def test_DeleteReference_deprecated(db):
    """DeleteReference is deprecated (admins should hide via UMS instead) and fails with FAILED_PRECONDITION."""
    _admin, admin_token = generate_user(is_superuser=True)

    author, author_token = generate_user()
    recipient, _recipient_token = generate_user()
    make_friends(author, recipient)

    with references_session(author_token) as references_api:
        written = references_api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=recipient.id, text="Old Text", private_text="", was_appropriate=True, rating=1
            )
        )

    removal = admin_pb2.DeleteReferenceReq(reference_id=written.reference_id)
    with real_admin_session(admin_token) as admin_api:
        with pytest.raises(grpc.RpcError) as exc_info:
            admin_api.DeleteReference(removal)
    assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
def test_GetUserReferences(db):
    """The admin view lists every reference a user wrote and received, ordered by id descending."""
    _admin, admin_token = generate_user(is_superuser=True)

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()
    user3, user3_token = generate_user()
    for left, right in ((user1, user2), (user1, user3), (user2, user3)):
        make_friends(left, right)

    def _write_reference(author_token, **fields):
        """Write a friend reference as the owner of `author_token` and return the response."""
        with references_session(author_token) as references_api:
            return references_api.WriteFriendReference(references_pb2.WriteFriendReferenceReq(**fields))

    # user1 -> user2
    ref1 = _write_reference(
        user1_token,
        to_user_id=user2.id,
        text="Reference from user1 to user2",
        private_text="",
        was_appropriate=True,
        rating=1,
    )
    # user2 -> user1, carrying a private note
    ref2 = _write_reference(
        user2_token,
        to_user_id=user1.id,
        text="Reference from user2 to user1",
        private_text="Private note",
        was_appropriate=True,
        rating=0.8,
    )
    # user3 -> user1, flagged as not appropriate
    ref3 = _write_reference(
        user3_token,
        to_user_id=user1.id,
        text="Reference from user3 to user1",
        private_text="",
        was_appropriate=False,
        rating=0.5,
    )

    # The admin view shows everything regardless of UMS state.
    with real_admin_session(admin_token) as admin_api:
        res = admin_api.GetUserReferences(admin_pb2.GetUserReferencesReq(user=user1.username))

    # Exactly one reference was written by user1.
    written = res.references_from
    assert len(written) == 1
    assert written[0].reference_id == ref1.reference_id
    assert written[0].from_user_id == user1.id
    assert written[0].to_user_id == user2.id
    assert written[0].text == "Reference from user1 to user2"

    # Two references were received by user1; descending id order puts ref3 first.
    received = res.references_to
    assert len(received) == 2
    assert received[0].reference_id == ref3.reference_id
    assert received[0].was_appropriate is False

    assert received[1].reference_id == ref2.reference_id
    assert received[1].private_text == "Private note"
    assert received[1].rating == 0.8
def test_GetUserReferences_not_found(db):
    """GetUserReferences answers NOT_FOUND for an unknown username."""
    _admin, admin_token = generate_user(is_superuser=True)

    unknown_req = admin_pb2.GetUserReferencesReq(user="nonexistent")
    with real_admin_session(admin_token) as admin_api:
        with pytest.raises(grpc.RpcError) as exc_info:
            admin_api.GetUserReferences(unknown_req)
    assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
def test_GetFriendRequests(db):
    """GetFriendRequests returns a user's sent and received requests with status and visibility."""
    _admin, admin_token = generate_user(is_superuser=True)

    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    user4, _ = generate_user()

    def _add_friend_request(from_user_id, to_user_id, status, visibility, time_responded=None):
        """Insert a friend request row directly so the test controls its state."""
        with session_scope() as session:
            # object_id is a placeholder until the relationship row has received its id
            moderation = ModerationState(
                object_type=ModerationObjectType.friend_request,
                object_id=0,
                visibility=visibility,
            )
            session.add(moderation)
            session.flush()

            relationship = FriendRelationship(
                from_user_id=from_user_id,
                to_user_id=to_user_id,
                status=status,
                moderation_state_id=moderation.id,
                time_responded=time_responded,
            )
            session.add(relationship)
            session.flush()

            moderation.object_id = relationship.id

    # user1 -> user2: pending, shadowed
    _add_friend_request(user1.id, user2.id, FriendStatus.pending, ModerationVisibility.shadowed)
    # user1 -> user3: accepted, visible
    _add_friend_request(user1.id, user3.id, FriendStatus.accepted, ModerationVisibility.visible, time_responded=now())
    # user4 -> user1: rejected, visible
    _add_friend_request(user4.id, user1.id, FriendStatus.rejected, ModerationVisibility.visible, time_responded=now())

    with real_admin_session(admin_token) as admin_api:
        res = admin_api.GetFriendRequests(admin_pb2.GetFriendRequestsReq(user=user1.username))

    # Sent by user1, ordered by id descending: the accepted request to user3, then the pending one to user2.
    assert len(res.sent) == 2
    accepted, pending = res.sent[0], res.sent[1]

    assert accepted.from_user.user_id == user1.id
    assert accepted.to_user.user_id == user3.id
    assert accepted.status == "accepted"
    assert accepted.HasField("time_responded")
    assert accepted.moderation_visibility == "visible"

    assert pending.from_user.user_id == user1.id
    assert pending.to_user.user_id == user2.id
    assert pending.status == "pending"
    assert not pending.HasField("time_responded")
    assert pending.moderation_visibility == "shadowed"

    # Received by user1: only the rejected request from user4.
    assert len(res.received) == 1
    rejected = res.received[0]
    assert rejected.from_user.user_id == user4.id
    assert rejected.to_user.user_id == user1.id
    assert rejected.status == "rejected"
def test_GetFriendRequests_not_found(db):
    """GetFriendRequests answers NOT_FOUND for an unknown username."""
    _admin, admin_token = generate_user(is_superuser=True)

    unknown_req = admin_pb2.GetFriendRequestsReq(user="nonexistent")
    with real_admin_session(admin_token) as admin_api:
        with pytest.raises(grpc.RpcError) as exc_info:
            admin_api.GetFriendRequests(unknown_req)
    assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
def test_AddUsersToModerationUserList(db):
    """Exercise AddUsersToModerationUserList together with ListModerationUserLists.

    Covers an unknown moderation list id, an unknown username, creating a new list when
    no ``moderation_list_id`` is supplied, a user belonging to several lists, and adding
    users to an already existing list.
    """
    super_user, super_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    user4, _ = generate_user()
    user5, _ = generate_user()
    # user1 starts out as the only member of a pre-existing list
    moderation_list_id = add_users_to_new_moderation_list([user1])

    # No database session wraps the API calls; every verification step below opens its
    # own short-lived session_scope().
    with real_admin_session(super_token) as api:
        # Test adding users to a non-existent moderation list (should raise an error)
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user2.username], moderation_list_id=999),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Moderation user list not found."

        # Test with non-existent user (should raise an error)
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, "nonexistent"]),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Couldn't find that user."

        # Test successful creation of new moderation list (no moderation_list_id provided)
        res = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, user2.username, user3.username]),
        )
        assert res.moderation_list_id > 0
        with session_scope() as session:
            moderation_user_list = session.get(ModerationUserList, res.moderation_list_id)
            assert moderation_user_list is not None
            assert len(moderation_user_list.users) == 3
            assert {user1.id, user2.id, user3.id}.issubset({user.id for user in moderation_user_list.users})

        # Test list endpoint returns same moderation list with same members not repeated
        listRes = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username))
        assert len(listRes.moderation_lists) == 1
        assert listRes.moderation_lists[0].moderation_list_id == res.moderation_list_id
        assert len(listRes.moderation_lists[0].members) == 3
        assert {user1.id, user2.id, user3.id}.issubset({m.user_id for m in listRes.moderation_lists[0].members})

        # Test user can be in multiple moderation lists (the fixture list plus the new one)
        listRes3 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username))
        assert len(listRes3.moderation_lists) == 2

        # Test adding users to an existing moderation list
        res2 = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(
                users=[user4.username, user5.username], moderation_list_id=moderation_list_id
            ),
        )
        assert res2.moderation_list_id == moderation_list_id
        with session_scope() as session:
            moderation_user_list = session.get_one(ModerationUserList, moderation_list_id)
            # user1 from the fixture plus the two users just added
            assert len(moderation_user_list.users) == 3
            assert {user1.id, user4.id, user5.id}.issubset({user.id for user in moderation_user_list.users})

        # Test list user moderation lists endpoint returns the right moderation list
        listRes2 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user5.username))
        assert len(listRes2.moderation_lists) == 1
        assert listRes2.moderation_lists[0].moderation_list_id == moderation_list_id
        assert len(listRes2.moderation_lists[0].members) == 3
        assert {user1.id, user4.id, user5.id}.issubset({m.user_id for m in listRes2.moderation_lists[0].members})
def test_RemoveUserFromModerationUserList(db):
    """Removing users from a moderation list validates its input and deletes the list once it is empty."""
    _admin, admin_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    moderation_list_id = add_users_to_new_moderation_list([user1, user2])

    def _removal_req(**fields):
        """Build a RemoveUserFromModerationUserListReq from keyword fields."""
        return admin_pb2.RemoveUserFromModerationUserListReq(**fields)

    with real_admin_session(admin_token) as api:
        # Unknown username is rejected
        with pytest.raises(grpc.RpcError) as exc_info:
            api.RemoveUserFromModerationUserList(_removal_req(user="nonexistent"))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == "Couldn't find that user."

        # Leaving out the moderation list id is rejected
        with pytest.raises(grpc.RpcError) as exc_info:
            api.RemoveUserFromModerationUserList(_removal_req(user=user2.username))
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == "Missing moderation user list id."

        # A user who is not a member of the given list cannot be removed from it
        with pytest.raises(grpc.RpcError) as exc_info:
            api.RemoveUserFromModerationUserList(
                _removal_req(user=user3.username, moderation_list_id=moderation_list_id)
            )
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == "User is not in the moderation user list."

        # Successful removal of user1 leaves user2 on the list
        api.RemoveUserFromModerationUserList(_removal_req(user=user1.username, moderation_list_id=moderation_list_id))
        with session_scope() as session:
            moderation_user_list = session.get_one(ModerationUserList, moderation_list_id)
            remaining_ids = {member.id for member in moderation_user_list.users}
            assert user1.id not in remaining_ids
            assert user2.id in remaining_ids

        # The list endpoint reflects the removal for both users
        lists_for_user1 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username))
        assert len(lists_for_user1.moderation_lists) == 0
        lists_for_user2 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username))
        assert len(lists_for_user2.moderation_lists) == 1

        # Removing the last member also deletes the moderation list itself
        api.RemoveUserFromModerationUserList(_removal_req(user=user2.username, moderation_list_id=moderation_list_id))
        with session_scope() as session:
            assert session.get(ModerationUserList, moderation_list_id) is None
def test_admin_delete_account_url(db, email_collector: EmailCollector, push_collector: PushCollector):
    """An admin-created deletion link can be confirmed to delete the account, which notifies the user."""
    _admin, admin_token = generate_user(is_superuser=True)

    user, _session_token = generate_user()
    user_id = user.id

    link_req = admin_pb2.CreateAccountDeletionLinkReq(user=user.username)
    with real_admin_session(admin_token) as admin_api:
        url = admin_api.CreateAccountDeletionLink(link_req).account_deletion_confirm_url

    # Creating the link on its own sends no push notification.
    assert push_collector.count_for_user(user_id) == 0

    with session_scope() as session:
        deletion_token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token = deletion_token_row.token
        assert deletion_token_row.user.id == user_id
        assert url == f"http://localhost:3000/delete-account?token={deletion_token}"

    confirm_req = auth_pb2.ConfirmDeleteAccountReq(token=deletion_token)
    with auth_api_session() as (auth_api, _metadata_interceptor):
        auth_api.ConfirmDeleteAccount(confirm_req)

    push = push_collector.pop_for_user(user_id, last=True)
    assert push.content.title == "Account deleted"
    assert push.content.body == "You can restore it within 7 days using the link we emailed you."
    email_collector.pop_for_recipient(user.email, last=True)
def test_AccessStats(db):
    """AccessStats reports UserActivity rows inside the requested window (90 days by default)."""
    _admin, admin_token = generate_user(is_superuser=True)
    normal_user, _normal_token = generate_user()

    # Two rows fall inside the default 90-day window, one lies far outside it, and one
    # has NULL ip_address / user_agent. psycopg3 hands the INET column back as an
    # IPv4Address/IPv6Address object, which used to crash the proto string assignment.
    in_window_1 = now() - timedelta(days=1)
    in_window_2 = now() - timedelta(days=10)
    out_of_window = now() - timedelta(days=200)

    activity_rows = [
        UserActivity(
            user_id=normal_user.id, period=in_window_1, ip_address="1.2.3.4", user_agent="ua-a", api_calls=5
        ),
        UserActivity(
            user_id=normal_user.id, period=in_window_2, ip_address="2001:db8::1", user_agent="ua-b", api_calls=3
        ),
        UserActivity(
            user_id=normal_user.id, period=out_of_window, ip_address="9.9.9.9", user_agent="ua-old", api_calls=99
        ),
        UserActivity(user_id=normal_user.id, period=in_window_1, api_calls=1),
    ]
    with session_scope() as session:
        for row in activity_rows:
            session.add(row)

    with real_admin_session(admin_token) as api:
        res = api.AccessStats(admin_pb2.AccessStatsReq(user=normal_user.username))

    stats_by_ip = {entry.ip_address: entry for entry in res.stats}

    assert "1.2.3.4" in stats_by_ip
    ipv4_entry = stats_by_ip["1.2.3.4"]
    assert ipv4_entry.api_call_count == 5
    assert ipv4_entry.user_agent == "ua-a"

    assert "2001:db8::1" in stats_by_ip
    assert stats_by_ip["2001:db8::1"].api_call_count == 3

    # The row with a NULL ip_address shows up under an empty-string ip_address
    assert "" in stats_by_ip
    assert stats_by_ip[""].api_call_count == 1

    # The 200-day-old row falls outside the default window
    assert "9.9.9.9" not in stats_by_ip

    # An explicit end_time bounds the upper end of the window (regression: was >=)
    bounded_req = admin_pb2.AccessStatsReq(
        user=normal_user.username,
        start_time=Timestamp_from_datetime(now() - timedelta(days=5)),
        end_time=Timestamp_from_datetime(now()),
    )
    with real_admin_session(admin_token) as api:
        res = api.AccessStats(bounded_req)
    seen_ips = {entry.ip_address for entry in res.stats}
    assert seen_ips == {"1.2.3.4", ""}
def test_SetLastDonated(db):
    """SetLastDonated stores the timestamp, clears it when the field is omitted, and rejects unknown users."""
    _admin, admin_token = generate_user(is_superuser=True)
    normal_user, _normal_token = generate_user(last_donated=None)

    def _stored_last_donated():
        """Read the user's current last_donated value straight from the database."""
        with session_scope() as session:
            row = session.execute(select(User).where(User.id == normal_user.id)).scalar_one()
            return row.last_donated

    with real_admin_session(admin_token) as api:
        # Nothing is recorded to begin with
        assert _stored_last_donated() is None

        # Setting a donation time
        donation_time = now() - timedelta(days=30)
        api.SetLastDonated(
            admin_pb2.SetLastDonatedReq(
                user=normal_user.username,
                last_donated=Timestamp_from_datetime(donation_time),
            )
        )

        stored = _stored_last_donated()
        assert stored is not None
        # The stored timestamp matches to within a second
        assert abs((stored - donation_time).total_seconds()) < 1

        # Leaving last_donated unset clears the value
        api.SetLastDonated(admin_pb2.SetLastDonatedReq(user=normal_user.username))
        assert _stored_last_donated() is None

        # Unknown user
        with pytest.raises(grpc.RpcError) as exc_info:
            api.SetLastDonated(admin_pb2.SetLastDonatedReq(user="nonexistent"))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == "Couldn't find that user."
def test_admin_actions_level(db):
    """Admin notes carry the requested level and default to NORMAL when none is given."""
    _admin, admin_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()

    # Notes added after the first one, each with an explicit level
    explicit_levels = [
        ("debug note", admin_pb2.ADMIN_ACTION_LEVEL_DEBUG),
        ("high note", admin_pb2.ADMIN_ACTION_LEVEL_HIGH),
        ("trace note", admin_pb2.ADMIN_ACTION_LEVEL_TRACE),
    ]

    with real_admin_session(admin_token) as api:
        # Without an explicit level the action is logged as NORMAL
        res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note="normal note"))
        assert res.admin_actions[0].level == admin_pb2.ADMIN_ACTION_LEVEL_NORMAL

        for index, (note_text, level) in enumerate(explicit_levels, start=1):
            res = api.AddAdminNote(
                admin_pb2.AddAdminNoteReq(
                    user=normal_user.username,
                    admin_note=note_text,
                    level=level,
                )
            )
            # Each note is appended after the ones added before it
            assert len(res.admin_actions) == index + 1
            assert res.admin_actions[index].level == level
def test_admin_actions_on_mutations(db, push_collector: PushCollector):
    """Each mutating admin RPC records an admin action of the expected type (and note, where checked)."""
    _admin, admin_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    username = normal_user.username

    # Captured up front so the expected "Changed from ..." notes can be built
    original_gender = normal_user.gender
    original_birthdate = normal_user.birthdate

    def _logged(res, action_type, note):
        """Return True when `res.admin_actions` holds an entry with this type and note."""
        return any(a.action_type == action_type and a.note == note for a in res.admin_actions)

    def _logged_type(res, action_type):
        """Return True when `res.admin_actions` holds an entry with this type."""
        return any(a.action_type == action_type for a in res.admin_actions)

    with real_admin_session(admin_token) as api:
        # ChangeUserGender
        res = api.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=username, gender="Machine"))
        assert _logged(res, "change_gender", f"Changed from '{original_gender}' to 'Machine'")

        # ChangeUserBirthdate
        res = api.ChangeUserBirthdate(admin_pb2.ChangeUserBirthdateReq(user=username, birthdate="1990-01-01"))
        assert _logged(res, "change_birthdate", f"Changed from {original_birthdate} to 1990-01-01")

        # SetPassportSexGenderException
        res = api.SetPassportSexGenderException(
            admin_pb2.SetPassportSexGenderExceptionReq(user=username, passport_sex_gender_exception=True)
        )
        assert _logged(res, "set_passport_sex_gender_exception", "Changed from False to True")

        # SendModNote, notifying the user
        res = api.SendModNote(
            admin_pb2.SendModNoteReq(user=username, content="Please update your profile", internal_id="test1")
        )
        assert _logged(res, "send_mod_note", "Notify user: Yes\n\nPlease update your profile")

        # SendModNote with do_not_notify
        res = api.SendModNote(
            admin_pb2.SendModNoteReq(
                user=username,
                content="Silent note",
                internal_id="test2",
                do_not_notify=True,
            )
        )
        assert _logged(res, "send_mod_note", "Notify user: No\n\nSilent note")

        # DeleteUser is logged, and at HIGH level
        res = api.DeleteUser(admin_pb2.DeleteUserReq(user=username))
        assert _logged_type(res, "delete_user")
        assert any(
            a.action_type == "delete_user" and a.level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH for a in res.admin_actions
        )

        # RecoverDeletedUser
        res = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=username))
        assert _logged_type(res, "recover_user")

        # MarkUserNeedsLocationUpdate
        res = api.MarkUserNeedsLocationUpdate(admin_pb2.MarkUserNeedsLocationUpdateReq(user=username))
        assert _logged(res, "mark_needs_location_update", "Marked user as needing location update")

        # SetLastDonated
        res = api.SetLastDonated(
            admin_pb2.SetLastDonatedReq(
                user=username,
                last_donated=Timestamp_from_datetime(now()),
            )
        )
        assert _logged_type(res, "set_last_donated")
def test_create_admin_tag(db):
    """CreateAdminTag echoes the tag and assigns it a positive id."""
    _admin, admin_token = generate_user(is_superuser=True)

    create_req = admin_pb2.CreateAdminTagReq(tag="test-tag")
    with real_admin_session(admin_token) as api:
        created = api.CreateAdminTag(create_req)
        assert created.tag == "test-tag"
        assert created.admin_tag_id > 0
def test_create_admin_tag_duplicate(db):
    """Creating the same admin tag a second time fails with ALREADY_EXISTS."""
    _admin, admin_token = generate_user(is_superuser=True)

    with real_admin_session(admin_token) as api:
        # The first creation succeeds; the identical second request must not
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="test-tag"))
        with pytest.raises(grpc.RpcError) as exc_info:
            api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="test-tag"))
        assert exc_info.value.code() == grpc.StatusCode.ALREADY_EXISTS
        assert exc_info.value.details() == "That admin tag already exists."
def test_create_admin_tag_empty(db):
    """Empty and whitespace-only admin tags are rejected with INVALID_ARGUMENT."""
    _admin, admin_token = generate_user(is_superuser=True)

    with real_admin_session(admin_token) as api:
        # First the empty string, then a whitespace-only tag
        for blank_tag in ("", " "):
            with pytest.raises(grpc.RpcError) as exc_info:
                api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag=blank_tag))
            assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
            assert exc_info.value.details() == "The admin tag cannot be empty."
def test_list_admin_tags(db):
    """ListAdminTags starts out empty and returns created tags in alphabetical order."""
    _admin, admin_token = generate_user(is_superuser=True)

    with real_admin_session(admin_token) as api:
        # No tags exist yet
        listing = api.ListAdminTags(admin_pb2.ListAdminTagsReq())
        assert len(listing.tags) == 0

        # Created out of alphabetical order on purpose
        for tag_name in ("bravo", "alpha"):
            api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag=tag_name))

        listing = api.ListAdminTags(admin_pb2.ListAdminTagsReq())
        assert len(listing.tags) == 2
        # Returned alphabetically rather than in creation order
        assert listing.tags[0].tag == "alpha"
        assert listing.tags[1].tag == "bravo"
def test_add_admin_tag_to_user(db):
    """Adding an existing admin tag to a user lists the tag and logs an add_tag action."""
    _admin, admin_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()

    with real_admin_session(admin_token) as api:
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))

        add_req = admin_pb2.AddAdminTagToUserReq(user=normal_user.username, tag="vip")
        res = api.AddAdminTagToUser(add_req)
        assert "vip" in res.admin_tags
        assert any(action.action_type == "add_tag" and action.tag == "vip" for action in res.admin_actions)
def test_add_admin_tag_to_user_duplicate(db):
    """Adding a tag the user already has fails with FAILED_PRECONDITION."""
    _admin, admin_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()

    with real_admin_session(admin_token) as api:
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))
        # The first assignment succeeds
        api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=normal_user.username, tag="vip"))

        # Assigning the very same tag again is refused
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=normal_user.username, tag="vip"))
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == "The user already has that admin tag."
def test_add_admin_tag_to_user_tag_not_found(db):
    """Adding a tag that was never created fails with NOT_FOUND."""
    _admin, admin_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()

    missing_tag_req = admin_pb2.AddAdminTagToUserReq(user=normal_user.username, tag="nonexistent")
    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddAdminTagToUser(missing_tag_req)
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == "Admin tag not found."
def test_remove_admin_tag_from_user(db):
    """Removing an assigned admin tag drops it from the user and logs a remove_tag action."""
    _admin, admin_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()

    with real_admin_session(admin_token) as api:
        # Create the tag and assign it so there is something to remove
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))
        api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=normal_user.username, tag="vip"))

        remove_req = admin_pb2.RemoveAdminTagFromUserReq(user=normal_user.username, tag="vip")
        res = api.RemoveAdminTagFromUser(remove_req)
        assert "vip" not in res.admin_tags
        assert any(action.action_type == "remove_tag" and action.tag == "vip" for action in res.admin_actions)
def test_remove_admin_tag_from_user_not_assigned(db):
    """Removing a tag the user does not have fails with FAILED_PRECONDITION."""
    _admin, admin_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()

    with real_admin_session(admin_token) as api:
        # The tag exists but is never assigned to the user
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))

        remove_req = admin_pb2.RemoveAdminTagFromUserReq(user=normal_user.username, tag="vip")
        with pytest.raises(grpc.RpcError) as exc_info:
            api.RemoveAdminTagFromUser(remove_req)
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == "The user does not have that admin tag."
def test_search_users_by_admin_tag(db):
    """SearchUsers filters by admin tags, requiring all of the given tags to match."""
    _admin, admin_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()

    # user1 is "vip", user2 is "vip" and "flagged", user3 has no tags
    assignments = [
        (user1, "vip"),
        (user2, "vip"),
        (user2, "flagged"),
    ]

    with real_admin_session(admin_token) as api:
        for tag_name in ("vip", "flagged"):
            api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag=tag_name))

        for tagged_user, tag_name in assignments:
            api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=tagged_user.username, tag=tag_name))

        # Searching for "vip" finds both tagged users but not the untagged one
        res = api.SearchUsers(admin_pb2.SearchUsersReq(admin_tags=["vip"]))
        found_ids = {found.user_id for found in res.users}
        assert user1.id in found_ids
        assert user2.id in found_ids
        assert user3.id not in found_ids

        # Several tags are combined with AND logic
        res = api.SearchUsers(admin_pb2.SearchUsersReq(admin_tags=["vip", "flagged"]))
        found_ids = {found.user_id for found in res.users}
        assert user2.id in found_ids
        assert user1.id not in found_ids

        # A tag that does not exist matches nobody
        res = api.SearchUsers(admin_pb2.SearchUsersReq(admin_tags=["nonexistent"]))
        assert len(res.users) == 0
def test_search_users_by_admin_note(db):
    """SearchUsers can match users on the content of their admin action log."""
    _admin, admin_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()

    notes = [
        (user1, "suspicious activity"),
        (user2, "normal user"),
    ]

    with real_admin_session(admin_token) as api:
        for noted_user, note_text in notes:
            api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=noted_user.username, admin_note=note_text))

        # The pattern is matched against the admin action log content (ilike)
        res = api.SearchUsers(admin_pb2.SearchUsersReq(admin_action_log="%suspicious%"))
        found_ids = {found.user_id for found in res.users}
        assert user1.id in found_ids
        assert user2.id not in found_ids
def test_ListAdminActions_empty(db):
    """With nothing logged, ListAdminActions returns no actions and no next page token."""
    _admin, admin_token = generate_user(is_superuser=True)

    with real_admin_session(admin_token) as api:
        listing = api.ListAdminActions(admin_pb2.ListAdminActionsReq())
        assert len(listing.admin_actions) == 0
        assert listing.next_page_token == ""
def test_ListAdminActions_returns_newest_first_with_target_info(db):
    """ListAdminActions returns the newest action first and names both the admin and the target user."""
    super_user, super_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()

    with real_admin_session(super_token) as api:
        # Three actions in chronological order: note on user1, note on user2, ban of user1
        api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=user1.username, admin_note="first note"))
        api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=user2.username, admin_note="second note"))
        api.BanUser(admin_pb2.BanUserReq(user=user1.username, admin_note="ban reason"))

        res = api.ListAdminActions(admin_pb2.ListAdminActionsReq())

    assert len(res.admin_actions) == 3
    newest, middle, oldest = res.admin_actions[0], res.admin_actions[1], res.admin_actions[2]

    # The ban was performed last, so it is listed first
    assert newest.action_type == "ban"
    assert newest.target_user_id == user1.id
    assert newest.target_username == user1.username
    assert newest.admin_user_id == super_user.id
    assert newest.admin_username == super_user.username

    assert middle.action_type == "note"
    assert middle.target_user_id == user2.id

    assert oldest.action_type == "note"
    assert oldest.target_user_id == user1.id
def test_ListAdminActions_filter_by_admin_and_target(db):
    """ListAdminActions can be filtered by acting admin, by target user, or by both together."""
    super1, super1_token = generate_user(is_superuser=True)
    _super2, super2_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()

    def _note_req(target, text):
        """Build an AddAdminNoteReq for `target` carrying `text`."""
        return admin_pb2.AddAdminNoteReq(user=target.username, admin_note=text)

    # super1 leaves a note on each user, super2 only on user1
    with real_admin_session(super1_token) as api:
        api.AddAdminNote(_note_req(user1, "from super1 to user1"))
        api.AddAdminNote(_note_req(user2, "from super1 to user2"))
    with real_admin_session(super2_token) as api:
        api.AddAdminNote(_note_req(user1, "from super2 to user1"))

    with real_admin_session(super1_token) as api:
        by_admin = api.ListAdminActions(admin_pb2.ListAdminActionsReq(admin_user_id=super1.id))
        assert {action.note for action in by_admin.admin_actions} == {"from super1 to user1", "from super1 to user2"}

        by_target = api.ListAdminActions(admin_pb2.ListAdminActionsReq(target_user_id=user1.id))
        assert {action.note for action in by_target.admin_actions} == {"from super1 to user1", "from super2 to user1"}

        by_both = api.ListAdminActions(admin_pb2.ListAdminActionsReq(admin_user_id=super1.id, target_user_id=user1.id))
        assert [action.note for action in by_both.admin_actions] == ["from super1 to user1"]
def test_ListAdminActions_pagination(db):
    """Three admin actions paged two at a time arrive as a full page followed by a final page of one."""
    _admin, admin_token = generate_user(is_superuser=True)
    user, _ = generate_user()

    with real_admin_session(admin_token) as api:
        for index in range(3):
            api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=user.username, admin_note=f"note {index}"))

        first_page = api.ListAdminActions(admin_pb2.ListAdminActionsReq(page_size=2))
        assert len(first_page.admin_actions) == 2
        assert first_page.next_page_token != ""
        collected_notes = [action.note for action in first_page.admin_actions]

        # The token from the first page fetches the remainder
        second_page = api.ListAdminActions(
            admin_pb2.ListAdminActionsReq(page_size=2, page_token=first_page.next_page_token)
        )
        assert len(second_page.admin_actions) == 1
        assert second_page.next_page_token == ""

        # Between them the two pages contain every note
        collected_notes = collected_notes + [action.note for action in second_page.admin_actions]
        assert set(collected_notes) == {"note 0", "note 1", "note 2"}
def test_ListUserUploads(db):
    """ListUserUploads returns only the requested user's uploads, with credit, URLs and creation time."""
    _admin, admin_token = generate_user(is_superuser=True)
    user, _ = generate_user(complete_profile=False)
    other_user, _ = generate_user()

    with session_scope() as session:
        # Three uploads for the user under test; only the first carries a credit
        for index in range(3):
            credit = f"credit {index}" if index == 0 else None
            session.add(
                Upload(
                    key=f"key{index}",
                    filename=f"photo{index}.jpg",
                    creator_user_id=user.id,
                    credit=credit,
                )
            )
        # An upload by somebody else, which must not appear in the listing
        session.add(Upload(key="other_key", filename="other.jpg", creator_user_id=other_user.id))

    with real_admin_session(admin_token) as api:
        res = api.ListUserUploads(admin_pb2.ListUserUploadsReq(user=user.username))

    assert len(res.uploads) == 3
    assert res.next_page_token == ""
    assert {upload.filename for upload in res.uploads} == {"photo0.jpg", "photo1.jpg", "photo2.jpg"}

    credited = next(upload for upload in res.uploads if upload.key == "key0")
    assert credited.credit == "credit 0"
    assert credited.full_url.endswith("/img/full/photo0.jpg")
    assert credited.thumbnail_url.endswith("/img/thumbnail/photo0.jpg")
    assert credited.HasField("created")
def test_ListUserUploads_pagination(db):
    """ListUserUploads splits three uploads into a page of two and a final page of one."""
    _, super_token = generate_user(is_superuser=True)
    user, _ = generate_user(complete_profile=False)

    with session_scope() as session:
        for idx in range(3):
            upload = Upload(key=f"key{idx}", filename=f"photo{idx}.jpg", creator_user_id=user.id)
            session.add(upload)

    with real_admin_session(super_token) as api:
        first_page = api.ListUserUploads(admin_pb2.ListUserUploadsReq(user=user.username, page_size=2))
        assert len(first_page.uploads) == 2
        assert first_page.next_page_token != ""
        seen_keys = [upload.key for upload in first_page.uploads]

        second_req = admin_pb2.ListUserUploadsReq(
            user=user.username, page_size=2, page_token=first_page.next_page_token
        )
        second_page = api.ListUserUploads(second_req)
        assert len(second_page.uploads) == 1
        # an empty token signals that there are no further pages
        assert second_page.next_page_token == ""

        seen_keys.extend(upload.key for upload in second_page.uploads)
        assert set(seen_keys) == {"key0", "key1", "key2"}
def test_ListUserUploads_uses(db):
    """An upload referenced by a gallery item reports one current use; an unreferenced upload reports none."""
    _, super_token = generate_user(is_superuser=True)
    user, _ = generate_user(complete_profile=False)

    with session_scope() as session:
        for key, filename in (("used_key", "used.jpg"), ("unused_key", "unused.jpg")):
            session.add(Upload(key=key, filename=filename, creator_user_id=user.id))
        gallery = PhotoGallery(owner_user_id=user.id)
        session.add(gallery)
        # flush so gallery.id is populated before the item references it
        session.flush()
        gallery_item = PhotoGalleryItem(gallery_id=gallery.id, upload_key="used_key", position=1.0)
        session.add(gallery_item)

    with real_admin_session(super_token) as api:
        listing = api.ListUserUploads(admin_pb2.ListUserUploadsReq(user=user.username))

        by_key = {upload.key: upload for upload in listing.uploads}
        assert list(by_key["unused_key"].uses) == []

        uses = by_key["used_key"].uses
        assert len(uses) == 1
        only_use = uses[0]
        assert only_use.type == admin_pb2.UPLOAD_USE_TYPE_PROFILE_GALLERY_PHOTO_AVATAR
        assert only_use.is_current
        assert only_use.user_id == user.id
def test_ListUserUploads_not_found(db):
    """ListUserUploads for an unknown username fails with NOT_FOUND."""
    _, super_token = generate_user(is_superuser=True)

    with real_admin_session(super_token) as api:
        request = admin_pb2.ListUserUploadsReq(user="nonexistent")
        with pytest.raises(grpc.RpcError) as exc_info:
            api.ListUserUploads(request)
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
1709# community invite feature tested in test_events.py
1710# SendBlogPostNotification tested in test_notifications.py
1711# MarkUserNeedsLocationUpdate tested in test_jail.py
1714def _ota_manifest(*, version, fingerprint, created_at="2026-05-31T00:00:00.000Z"):
1715 return {
1716 "id": f"id-{version}",
1717 "createdAt": created_at,
1718 "runtimeVersion": fingerprint,
1719 "launchAsset": {"key": "bundle", "url": f"https://cdn.testing.invalid/{version}/bundle.hbc"},
1720 "assets": [],
1721 "metadata": {},
1722 "extra": {},
1723 }
1726def _ota_signed_multipart(manifest):
1727 # Mimics the signed multipart body the CDN holds (signature header omitted; we only read the JSON).
1728 boundary = "COUCHERS_OTA_BOUNDARY"
1730 def part(name, body, content_type):
1731 return f'--{boundary}\r\ncontent-disposition: form-data; name="{name}"\r\ncontent-type: {content_type}\r\n\r\n{body}\r\n'
1733 body = (
1734 part("manifest", json.dumps(manifest), "application/json; charset=utf-8")
1735 + part("extensions", "{}", "application/json")
1736 + f"--{boundary}--\r\n"
1737 )
1738 return f"multipart/mixed; boundary={boundary}", body.encode()
1741def _patch_ota_cdn(manifests):
1742 # manifests: {version: manifest_dict}. URL is {cdn_root}/{version}/{platform}/manifest.
1743 def fake(url):
1744 version = url.split("/")[-3]
1745 if version not in manifests:
1746 return "multipart/mixed; boundary=COUCHERS_OTA_BOUNDARY", b""
1747 return _ota_signed_multipart(manifests[version])
1749 return patch("couchers.servicers.admin._fetch_signed_manifest", side_effect=fake)
def test_CreateOTAPackage(db):
    """Creating a package from a manifest on the (patched) CDN returns its details, live and not banned."""
    super_user, super_token = generate_user(is_superuser=True)

    version = "v1.3.1.aaaa"
    manifests = {version: _ota_manifest(version=version, fingerprint="ios-fp")}
    request = admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version=version)

    with _patch_ota_cdn(manifests), real_admin_session(super_token) as api:
        package = api.CreateOTAPackage(request)

        assert package.platform == admin_pb2.OTA_PLATFORM_IOS
        # fingerprint and manifest id are read from the manifest
        assert package.fingerprint == "ios-fp"
        assert package.version == "v1.3.1.aaaa"
        assert package.manifest_id == "id-v1.3.1.aaaa"
        assert package.banned is False
        assert package.live is True
        assert package.creator_user_id == super_user.id
def test_CreateOTAPackage_invalid(db):
    """CreateOTAPackage rejects each kind of bad request with INVALID_ARGUMENT."""
    _, super_token = generate_user(is_superuser=True)

    # on the CDN but missing runtimeVersion / createdAt
    manifests = {"v-incomplete": {"id": "x"}}

    invalid_requests = [
        # missing version
        admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS),
        # nothing published at this version
        admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version="v-missing"),
        # manifest present but missing required fields
        admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version="v-incomplete"),
    ]

    with _patch_ota_cdn(manifests), real_admin_session(super_token) as api:
        for request in invalid_requests:
            with pytest.raises(grpc.RpcError) as exc_info:
                api.CreateOTAPackage(request)
            assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
def test_CreateOTAPackage_rejects_duplicate_version(db):
    """Creating the same platform/version a second time fails with FAILED_PRECONDITION."""
    _, super_token = generate_user(is_superuser=True)

    version = "v1.3.1.aaaa"
    manifests = {version: _ota_manifest(version=version, fingerprint="ios-fp")}

    with _patch_ota_cdn(manifests), real_admin_session(super_token) as api:
        # first creation succeeds
        api.CreateOTAPackage(admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version=version))

        # an identical second request is refused
        duplicate = admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version=version)
        with pytest.raises(grpc.RpcError) as exc_info:
            api.CreateOTAPackage(duplicate)
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
def test_ListOTAPackages(db):
    """ListOTAPackages orders newest-first, marks only the newest per (platform, fingerprint) live, and filters by platform."""
    _, super_token = generate_user(is_superuser=True)

    # (version, fingerprint, manifest createdAt)
    published = [
        ("v1.3.1.ios", "ios-fp", "2026-05-30T00:00:00.000Z"),
        ("v1.3.2.ios", "ios-fp", "2026-05-31T00:00:00.000Z"),
        ("v1.3.2.android", "android-fp", "2026-06-01T00:00:00.000Z"),
    ]
    manifests = {
        version: _ota_manifest(version=version, fingerprint=fingerprint, created_at=created_at)
        for version, fingerprint, created_at in published
    }

    with _patch_ota_cdn(manifests), real_admin_session(super_token) as api:
        to_create = [
            (admin_pb2.OTA_PLATFORM_IOS, "v1.3.1.ios"),
            (admin_pb2.OTA_PLATFORM_IOS, "v1.3.2.ios"),
            (admin_pb2.OTA_PLATFORM_ANDROID, "v1.3.2.android"),
        ]
        for platform, version in to_create:
            api.CreateOTAPackage(admin_pb2.CreateOTAPackageReq(platform=platform, version=version))

        everything = api.ListOTAPackages(admin_pb2.ListOTAPackagesReq())
        # newest (by manifest createdAt) first
        versions = [pkg.version for pkg in everything.packages]
        assert versions == ["v1.3.2.android", "v1.3.2.ios", "v1.3.1.ios"]
        # only the newest per (platform, fingerprint) is live
        live_by_version = {pkg.version: pkg.live for pkg in everything.packages}
        assert live_by_version == {"v1.3.2.android": True, "v1.3.2.ios": True, "v1.3.1.ios": False}

        ios_only = api.ListOTAPackages(admin_pb2.ListOTAPackagesReq(platform=admin_pb2.OTA_PLATFORM_IOS))
        assert [pkg.version for pkg in ios_only.packages] == ["v1.3.2.ios", "v1.3.1.ios"]
def test_BanOTAPackage(db):
    """Banning the newest package records reason and admin, un-lives it, and the previous package becomes live."""
    super_user, super_token = generate_user(is_superuser=True)

    good_version, bad_version = "v1.3.1.good", "v1.3.2.bad"
    manifests = {
        good_version: _ota_manifest(
            version=good_version, fingerprint="ios-fp", created_at="2026-05-30T00:00:00.000Z"
        ),
        bad_version: _ota_manifest(version=bad_version, fingerprint="ios-fp", created_at="2026-05-31T00:00:00.000Z"),
    }

    with _patch_ota_cdn(manifests), real_admin_session(super_token) as api:
        api.CreateOTAPackage(
            admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version=good_version)
        )
        newest = api.CreateOTAPackage(
            admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version=bad_version)
        )
        assert newest.live is True

        ban_req = admin_pb2.BanOTAPackageReq(ota_package_id=newest.ota_package_id, reason="bad bundle")
        banned = api.BanOTAPackage(ban_req)
        assert banned.banned is True
        assert banned.banned_reason == "bad bundle"
        assert banned.banned_by_user_id == super_user.id
        assert banned.live is False

        # banning the newest stops new check-ins getting it; the previous one becomes live again
        with_banned = api.ListOTAPackages(admin_pb2.ListOTAPackagesReq(include_banned=True))
        live_by_version = {pkg.version: pkg.live for pkg in with_banned.packages}
        assert live_by_version == {"v1.3.2.bad": False, "v1.3.1.good": True}

        # banned packages are excluded by default
        without_banned = api.ListOTAPackages(admin_pb2.ListOTAPackagesReq())
        assert [pkg.version for pkg in without_banned.packages] == ["v1.3.1.good"]
def test_BanOTAPackage_requires_reason(db):
    """BanOTAPackage rejects a missing or whitespace-only reason with INVALID_ARGUMENT."""
    _, super_token = generate_user(is_superuser=True)

    manifests = {
        "v1.3.1": _ota_manifest(version="v1.3.1", fingerprint="ios-fp", created_at="2026-05-30T00:00:00.000Z"),
    }

    with _patch_ota_cdn(manifests), real_admin_session(super_token) as api:
        package = api.CreateOTAPackage(
            admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version="v1.3.1")
        )

        bad_requests = [
            # no reason at all
            admin_pb2.BanOTAPackageReq(ota_package_id=package.ota_package_id),
            # whitespace-only reason
            admin_pb2.BanOTAPackageReq(ota_package_id=package.ota_package_id, reason=" "),
        ]
        for request in bad_requests:
            with pytest.raises(grpc.RpcError) as exc_info:
                api.BanOTAPackage(request)
            assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
def test_BanOTAPackage_not_found(db):
    """Banning a package id that was never created fails with NOT_FOUND."""
    _, super_token = generate_user(is_superuser=True)

    with real_admin_session(super_token) as api:
        request = admin_pb2.BanOTAPackageReq(ota_package_id=123456, reason="never mind")
        with pytest.raises(grpc.RpcError) as exc_info:
            api.BanOTAPackage(request)
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND