Coverage for app / backend / src / tests / test_admin.py: 100%

749 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 09:44 +0000

1from datetime import date, datetime, timedelta 

2 

3import grpc 

4import pytest 

5from sqlalchemy import select 

6from sqlalchemy.sql import func 

7 

8from couchers.db import session_scope 

9from couchers.models import ( 

10 AccountDeletionToken, 

11 ContentReport, 

12 EventOccurrence, 

13 FriendRelationship, 

14 FriendStatus, 

15 ModerationObjectType, 

16 ModerationState, 

17 ModerationUserList, 

18 ModerationVisibility, 

19 Reference, 

20 User, 

21 UserActivity, 

22 UserSession, 

23) 

24from couchers.proto import account_pb2, admin_pb2, auth_pb2, events_pb2, references_pb2, reporting_pb2 

25from couchers.utils import Timestamp_from_datetime, now, parse_date 

26from tests.fixtures.db import add_users_to_new_moderation_list, generate_user, make_friends 

27from tests.fixtures.misc import PushCollector, email_fields, mock_notification_email 

28from tests.fixtures.sessions import ( 

29 account_session, 

30 auth_api_session, 

31 events_session, 

32 real_admin_session, 

33 references_session, 

34 reporting_session, 

35) 

36from tests.test_communities import create_community 

37 

38 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture whose only job is to request ``testconfig`` for every test in this module."""

42 

43 

def test_access_by_normal_user(db):
    """A user generated without ``is_superuser`` is refused by the admin API with PERMISSION_DENIED."""
    normal_user, normal_token = generate_user()
    request = admin_pb2.GetUserDetailsReq(
        user=str(normal_user.id),
    )

    # all requests to the admin servicer should break when done by a non-super_user
    with real_admin_session(normal_token) as api, pytest.raises(grpc.RpcError) as e:
        api.GetUserDetails(request)
    assert e.value.code() == grpc.StatusCode.PERMISSION_DENIED

56 

57 

def test_GetUser(db):
    """GetUser returns the same id/username for a user before and after BanUser is called on them."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    def _fetch_and_check():
        # Look the user up by (stringified) id in a fresh admin session.
        with real_admin_session(super_token) as api:
            res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id)))
        assert res.user_id == normal_user.id
        assert res.username == normal_user.username

    _fetch_and_check()

    with real_admin_session(super_token) as api:
        api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note="Testing banning"))

    # The lookup is repeated after the ban and must give the same result.
    _fetch_and_check()

74 

75 

def test_GetUserDetails(db):
    """GetUserDetails returns the same details whether the user is addressed by id, username or email."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    # Same order as the lookups were originally written: id, then username, then email.
    identifiers = (str(normal_user.id), normal_user.username, normal_user.email)

    for identifier in identifiers:
        with real_admin_session(super_token) as api:
            res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=identifier))
        assert res.user_id == normal_user.id
        assert res.username == normal_user.username
        assert res.email == normal_user.email
        assert res.gender == normal_user.gender
        assert parse_date(res.birthdate) == normal_user.birthdate
        assert not res.banned
        assert not res.deleted

109 

110 

def test_ChangeUserGender(db, push_collector: PushCollector):
    """ChangeUserGender returns updated details and produces one email plus a push notification."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    with real_admin_session(super_token) as api, mock_notification_email() as mock:
        res = api.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=normal_user.username, gender="Machine"))

    # Returned details: only the gender differs from the generated user.
    assert res.user_id == normal_user.id
    assert res.username == normal_user.username
    assert res.email == normal_user.email
    assert res.gender == "Machine"
    assert parse_date(res.birthdate) == normal_user.birthdate
    assert not res.banned
    assert not res.deleted

    # Exactly one notification email, mentioning the new gender in both bodies.
    mock.assert_called_once()
    e = email_fields(mock)
    assert e.subject == "[TEST] Your gender was changed"
    assert e.recipient == normal_user.email
    for body in (e.plain, e.html):
        assert "Machine" in body

    push = push_collector.pop_for_user(normal_user.id, last=True)
    assert push.content.title == "Gender changed"
    assert push.content.body == "An admin changed your gender to Machine."

136 

137 

def test_ChangeUserBirthdate(db, push_collector: PushCollector):
    """ChangeUserBirthdate returns the new birthdate and produces one email plus a push notification."""
    super_user, super_token = generate_user(is_superuser=True)
    original_birthdate = date(year=2000, month=1, day=1)
    normal_user, normal_token = generate_user(birthdate=original_birthdate)

    with real_admin_session(super_token) as api:
        # Sanity check: the starting birthdate is the one the user was generated with.
        before = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username))
        assert parse_date(before.birthdate) == date(year=2000, month=1, day=1)

        with mock_notification_email() as mock:
            res = api.ChangeUserBirthdate(
                admin_pb2.ChangeUserBirthdateReq(user=normal_user.username, birthdate="1990-05-25")
            )

        assert res.user_id == normal_user.id
        assert res.username == normal_user.username
        assert res.email == normal_user.email
        assert res.birthdate == "1990-05-25"
        assert res.gender == normal_user.gender
        assert not res.banned
        assert not res.deleted

        # Exactly one notification email, mentioning the new year in both bodies.
        mock.assert_called_once()
        e = email_fields(mock)
        assert e.subject == "[TEST] Your date of birth was changed"
        assert e.recipient == normal_user.email
        for body in (e.plain, e.html):
            assert "1990" in body

        push = push_collector.pop_for_user(normal_user.id, last=True)
        assert push.content.title == "Birthdate changed"
        assert push.content.body == "An admin changed your date of birth to May 25, 1990."

169 

170 

def test_BanUser(db):
    """BanUser marks the user as banned and records a single high-level "ban" admin action."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    admin_note = "A good reason"

    with real_admin_session(super_token) as api:
        res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note=admin_note))

    # Identity fields are untouched; only the banned flag flips.
    assert res.user_id == normal_user.id
    assert res.username == normal_user.username
    assert res.email == normal_user.email
    assert res.gender == normal_user.gender
    assert parse_date(res.birthdate) == normal_user.birthdate
    assert res.banned
    assert not res.deleted

    # The ban is logged with the note and attributed to the acting superuser.
    assert len(res.admin_actions) == 1
    action = res.admin_actions[0]
    assert action.action_type == "ban"
    assert action.level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH
    assert action.note == admin_note
    assert action.admin_user_id == super_user.id
    assert action.admin_username == super_user.username

191 

192 

def test_UnbanUser(db):
    """UnbanUser leaves the user not banned and records a single high-level "unban" admin action."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    admin_note = "A good reason"

    request = admin_pb2.UnbanUserReq(user=normal_user.username, admin_note=admin_note)
    with real_admin_session(super_token) as api:
        res = api.UnbanUser(request)

    assert res.user_id == normal_user.id
    assert res.username == normal_user.username
    assert res.email == normal_user.email
    assert res.gender == normal_user.gender
    assert parse_date(res.birthdate) == normal_user.birthdate
    assert not res.banned
    assert not res.deleted

    assert len(res.admin_actions) == 1
    action = res.admin_actions[0]
    assert action.action_type == "unban"
    assert action.level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH

210 

211 

def test_AddAdminNote(db):
    """AddAdminNote appends normal-level "note" actions, returned in the order they were added."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    admin_note1 = "User reported strange behavior"
    admin_note2 = "Insert private information here"

    def _add_note(text):
        # Each note is added through its own admin session.
        with real_admin_session(super_token) as api:
            return api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=text))

    res = _add_note(admin_note1)
    assert res.user_id == normal_user.id
    assert res.username == normal_user.username
    assert res.email == normal_user.email
    assert res.gender == normal_user.gender
    assert parse_date(res.birthdate) == normal_user.birthdate
    assert not res.banned
    assert not res.deleted
    assert len(res.admin_actions) == 1
    first_action = res.admin_actions[0]
    assert first_action.action_type == "note"
    assert first_action.level == admin_pb2.ADMIN_ACTION_LEVEL_NORMAL
    assert first_action.note == admin_note1

    res = _add_note(admin_note2)
    assert len(res.admin_actions) == 2
    assert res.admin_actions[0].note == admin_note1
    assert res.admin_actions[1].note == admin_note2

237 

238 

def test_AddAdminNote_blank(db):
    """A whitespace-only admin note is rejected with INVALID_ARGUMENT."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    empty_admin_note = " \t \n "
    request = admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=empty_admin_note)

    with real_admin_session(super_token) as api, pytest.raises(grpc.RpcError) as e:
        api.AddAdminNote(request)
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "The admin note cannot be empty."

249 

250 

def test_admin_content_reports(db):
    """Content reports filed by a user can be fetched by id and listed per author through the admin API."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, token = generate_user()
    bad_user1, _ = generate_user()
    bad_user2, _ = generate_user()

    # (reason, description, content_ref, author, page) -- filed in this order.
    reports_to_file = [
        ("spam", "r1", "comment/123", bad_user1, "https://couchers.org/comment/123"),
        ("spam", "r2", "comment/124", bad_user2, "https://couchers.org/comment/124"),
        ("something else", "r3", "page/321", bad_user1, "https://couchers.org/page/321"),
    ]

    with reporting_session(token) as api:
        for reason, description, content_ref, author, page in reports_to_file:
            api.Report(
                reporting_pb2.ReportReq(
                    reason=reason,
                    description=description,
                    content_ref=content_ref,
                    author_user=author.username,
                    user_agent="n/a",
                    page=page,
                )
            )

    # Map each report's description to its database id so the assertions can refer to reports by name.
    with session_scope() as session:
        rows = session.execute(select(ContentReport.description, ContentReport.id)).all()
        id_by_description: dict[str, int] = {description: report_id for description, report_id in rows}

    with real_admin_session(super_token) as api:
        # Unknown id
        with pytest.raises(grpc.RpcError) as e:
            api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Content report not found."

        # Single report by id
        r2_id = id_by_description["r2"]
        rep = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=r2_id)).content_report
        assert rep.content_report_id == id_by_description["r2"]
        assert rep.reporting_user_id == normal_user.id
        assert rep.author_user_id == bad_user2.id
        assert rep.reason == "spam"
        assert rep.description == "r2"
        assert rep.content_ref == "comment/124"
        assert rep.user_agent == "n/a"
        assert rep.page == "https://couchers.org/comment/124"

        # Reports about one author: the later report (r3) is expected before the earlier one (r1).
        res = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username))
        assert res.content_reports[0].content_report_id == id_by_description["r3"]
        assert res.content_reports[1].content_report_id == id_by_description["r1"]

314 

315 

def test_DeleteUser(db):
    """DeleteUser flags the user as deleted and RecoverDeletedUser clears the flag again."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    def _assert_same_user(res):
        # Fields that neither deletion nor recovery is expected to touch.
        assert res.user_id == normal_user.id
        assert res.username == normal_user.username
        assert res.email == normal_user.email
        assert res.gender == normal_user.gender
        assert parse_date(res.birthdate) == normal_user.birthdate
        assert not res.banned

    with real_admin_session(super_token) as api:
        deleted = api.DeleteUser(admin_pb2.DeleteUserReq(user=normal_user.username))
    _assert_same_user(deleted)
    assert deleted.deleted

    with real_admin_session(super_token) as api:
        recovered = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=normal_user.username))
    _assert_same_user(recovered)
    assert not recovered.deleted

339 

340 

def test_RecoverDeletedUser_after_user_initiated_deletion(db, push_collector: PushCollector):
    """
    Deleting an account through the normal user flow (DeleteAccount followed by
    ConfirmDeleteAccount) sets undelete_token and undelete_until. The admin
    RecoverDeletedUser call has to clear those fields again so that the
    undelete_nullity database constraint is satisfied.
    """
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()
    user_id = normal_user.id

    def _deletion_state():
        # Read the three deletion-related columns for the user in a fresh session.
        with session_scope() as session:
            user = session.execute(select(User).where(User.id == user_id)).scalar_one()
            return user.deleted_at, user.undelete_token, user.undelete_until

    # Step 1: the user asks for their account to be deleted
    with account_session(normal_token) as account:
        account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    # Step 2: pick up the confirmation token created by that request
    with session_scope() as session:
        deletion_token = session.execute(select(AccountDeletionToken)).scalar_one().token

    # Step 3: the user confirms (this sets undelete_token and undelete_until)
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    # The user is now deleted and has the undelete fields populated
    deleted_at, undelete_token, undelete_until = _deletion_state()
    assert deleted_at is not None
    assert undelete_token is not None
    assert undelete_until is not None

    # Step 4: an admin recovers the account
    with real_admin_session(super_token) as api:
        res = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=normal_user.username))
        assert res.user_id == user_id
        assert not res.deleted

    # All three fields are cleared afterwards
    deleted_at, undelete_token, undelete_until = _deletion_state()
    assert deleted_at is None
    assert undelete_token is None
    assert undelete_until is None

382 

383 

def test_CreateApiKey(db, push_collector: PushCollector):
    """CreateApiKey stores an API-key session for the user and sends the key by email, plus a push."""
    with session_scope() as session:
        super_user, super_token = generate_user(is_superuser=True)
        normal_user, normal_token = generate_user()

        # The user starts out without any API-key sessions.
        api_key_count = (
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == normal_user.id)
        )
        assert session.execute(api_key_count).scalar_one() == 0

    with mock_notification_email() as mock, real_admin_session(super_token) as api:
        api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=normal_user.username))

    mock.assert_called_once()
    e = email_fields(mock)
    assert e.subject == "[TEST] Your API key for Couchers.org"

    # Fetch the token that was stored, to check it is the one that was emailed.
    with session_scope() as session:
        valid_api_key_token = (
            select(UserSession.token)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == normal_user.id)
        )
        token = session.execute(valid_api_key_token).scalar_one()

    assert token in e.plain
    assert token in e.html

    assert e.recipient == normal_user.email
    assert "api key" in e.subject.lower()
    unique_string = "We've issued you with the following API key:"
    assert unique_string in e.plain
    assert unique_string in e.html
    assert "support@couchers.org" in e.plain
    assert "support@couchers.org" in e.html

    push = push_collector.pop_for_user(normal_user.id, last=True)
    assert push.content.title == "API key created"
    assert push.content.body == "Details were sent to you via email."

429 

430 

def test_GetChats(db):
    """GetChats for a freshly generated user identifies the user and returns empty chat lists."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    with real_admin_session(super_token) as api:
        res = api.GetChats(admin_pb2.GetChatsReq(user=normal_user.username))

    described_user = res.user
    assert described_user.user_id == normal_user.id
    assert described_user.username == normal_user.username
    assert described_user.name == normal_user.name
    # New user should have no chats
    assert len(res.host_requests) == 0
    assert len(res.group_chats) == 0

443 

444 

def test_badges(db, push_collector: PushCollector):
    """Walks through adding and removing a badge via the admin API, including each rejected case."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()
    username = normal_user.username

    with real_admin_session(super_token) as api:

        def _current_badges():
            return api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=username)).badges

        def _assert_rejected(rpc, request, code, details):
            # The call must fail with exactly this status code and detail message.
            with pytest.raises(grpc.RpcError) as e:
                rpc(request)
            assert e.value.code() == code
            assert e.value.details() == details

        # can add a badge
        assert "swagster" not in _current_badges()
        with mock_notification_email() as mock:
            res = api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="swagster"))
            assert "swagster" in res.badges

        # badge emails are disabled by default
        mock.assert_not_called()

        push = push_collector.pop_for_user(normal_user.id, last=True)
        assert push.content.title == "New profile badge: Swagster"
        assert push.content.body == "The Swagster badge was added to your profile."

        # can't add/edit special tags
        _assert_rejected(
            api.AddBadge,
            admin_pb2.AddBadgeReq(user=username, badge_id="founder"),
            grpc.StatusCode.FAILED_PRECONDITION,
            "Admins cannot edit that badge.",
        )

        # double add badge
        _assert_rejected(
            api.AddBadge,
            admin_pb2.AddBadgeReq(user=username, badge_id="swagster"),
            grpc.StatusCode.FAILED_PRECONDITION,
            "The user already has that badge.",
        )

        # can remove badge
        assert "swagster" in _current_badges()
        with mock_notification_email() as mock:
            res = api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="swagster"))
            assert "swagster" not in res.badges

        # badge emails are disabled by default
        mock.assert_not_called()

        push = push_collector.pop_for_user(normal_user.id, last=True)
        assert push.content.title == "Profile badge removed"
        assert push.content.body == "The Swagster badge was removed from your profile."

        # not found on user
        _assert_rejected(
            api.RemoveBadge,
            admin_pb2.RemoveBadgeReq(user=username, badge_id="swagster"),
            grpc.StatusCode.FAILED_PRECONDITION,
            "The user does not have that badge.",
        )

        # not found in general
        _assert_rejected(
            api.AddBadge,
            admin_pb2.AddBadgeReq(user=username, badge_id="nonexistentbadge"),
            grpc.StatusCode.NOT_FOUND,
            "Badge not found.",
        )

499 

500 

def test_DeleteEvent(db):
    """An event created by a normal user is flagged as deleted after the admin DeleteEvent call."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [normal_user], [], None)

    # The event starts two hours from now and lasts three hours.
    start_time = now() + timedelta(hours=2)
    end_time = start_time + timedelta(hours=3)
    location = events_pb2.OfflineEventInformation(
        address="Near Null Island",
        lat=0.1,
        lng=0.2,
    )
    create_request = events_pb2.CreateEventReq(
        title="Dummy Title",
        content="Dummy content.",
        photo_key=None,
        offline_information=location,
        start_time=Timestamp_from_datetime(start_time),
        end_time=Timestamp_from_datetime(end_time),
        timezone="UTC",
    )

    with events_session(normal_token) as api:
        res = api.CreateEvent(create_request)
        event_id = res.event_id
        assert not res.is_deleted

    with session_scope() as session:
        with real_admin_session(super_token) as api:
            api.DeleteEvent(
                admin_pb2.DeleteEventReq(
                    event_id=event_id,
                )
            )
        # Load the occurrence only after the admin call has returned.
        occurrence = session.get_one(EventOccurrence, ident=event_id)
        assert occurrence.is_deleted

538 

539 

def test_ListUserIds(db):
    """ListUserIds returns both generated users for a wide time window and nothing for a window at "now"."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    # Window from 2000-01-01 up to now: both users are expected.
    with real_admin_session(super_token) as api:
        wide_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(datetime(2000, 1, 1)),
            end_time=Timestamp_from_datetime(now()),
        )
        res = api.ListUserIds(wide_window)
        assert len(res.user_ids) == 2
        assert sorted(res.user_ids) == sorted([super_user.id, normal_user.id])

    # Window that both starts and ends "now": no users are expected.
    with real_admin_session(super_token) as api:
        window_at_now = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(now()),
            end_time=Timestamp_from_datetime(now()),
        )
        res = api.ListUserIds(window_at_now)
        assert res.user_ids == []

558 

559 

def test_EditReferenceText(db):
    """EditReferenceText replaces the stored text of an existing friend reference."""
    super_user, super_token = generate_user(is_superuser=True)
    test_new_text = "New Text"

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()
    make_friends(user1, user2)

    with session_scope() as session:
        # user1 writes the reference that will be edited
        with references_session(user1_token) as api:
            write_request = references_pb2.WriteFriendReferenceReq(
                to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
            )
            reference = api.WriteFriendReference(write_request)

        # the admin overwrites its text
        with real_admin_session(super_token) as admin_api:
            edit_request = admin_pb2.EditReferenceTextReq(reference_id=reference.reference_id, new_text=test_new_text)
            admin_api.EditReferenceText(edit_request)

        # Expire anything this session has loaded before reading the reference back.
        session.expire_all()

        lookup = select(Reference).where(Reference.id == reference.reference_id)
        modified_reference = session.execute(lookup).scalar_one()
        assert modified_reference.text == test_new_text

587 

588 

def test_DeleteReference(db):
    """DeleteReference removes a reference from ListReferences and flags the database row as deleted."""
    super_user, super_token = generate_user(is_superuser=True)

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()
    make_friends(user1, user2)

    def _references_written_by_user1():
        # What user1 sees when listing the references they have written.
        with references_session(user1_token) as api:
            return api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references

    with references_session(user1_token) as api:
        reference = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
            )
        )

    # listed before the admin deletes it
    assert _references_written_by_user1()

    with real_admin_session(super_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=reference.reference_id))

    # no longer listed afterwards
    assert not _references_written_by_user1()

    # The row still exists, flagged as deleted.
    with session_scope() as session:
        lookup = select(Reference).where(Reference.id == reference.reference_id)
        modified_reference = session.execute(lookup).scalar_one()
        assert modified_reference.is_deleted

617 

618 

def test_GetUserReferences(db):
    """GetUserReferences returns references written by and about a user, deleted ones included."""
    super_user, super_token = generate_user(is_superuser=True)

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()
    user3, user3_token = generate_user()
    for friend_a, friend_b in ((user1, user2), (user1, user3), (user2, user3)):
        make_friends(friend_a, friend_b)

    def _write_reference(author_token, **fields):
        # Write one friend reference as the user owning ``author_token``.
        with references_session(author_token) as api:
            return api.WriteFriendReference(references_pb2.WriteFriendReferenceReq(**fields))

    # user1 writes reference about user2
    ref1 = _write_reference(
        user1_token,
        to_user_id=user2.id,
        text="Reference from user1 to user2",
        private_text="",
        was_appropriate=True,
        rating=1,
    )

    # user2 writes reference about user1
    ref2 = _write_reference(
        user2_token,
        to_user_id=user1.id,
        text="Reference from user2 to user1",
        private_text="Private note",
        was_appropriate=True,
        rating=0.8,
    )

    # user3 writes reference about user1
    ref3 = _write_reference(
        user3_token,
        to_user_id=user1.id,
        text="Reference from user3 to user1",
        private_text="",
        was_appropriate=False,
        rating=0.5,
    )

    # Delete ref3
    with real_admin_session(super_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=ref3.reference_id))

    # Test GetUserReferences for user1
    with real_admin_session(super_token) as admin_api:
        res = admin_api.GetUserReferences(admin_pb2.GetUserReferencesReq(user=user1.username))

    # user1 wrote 1 reference
    assert len(res.references_from) == 1
    written = res.references_from[0]
    assert written.reference_id == ref1.reference_id
    assert written.from_user_id == user1.id
    assert written.to_user_id == user2.id
    assert written.text == "Reference from user1 to user2"
    assert written.is_deleted is False

    # user1 received 2 references (including the deleted one)
    assert len(res.references_to) == 2
    # Ordered by id descending, so ref3 comes first
    newest, oldest = res.references_to[0], res.references_to[1]
    assert newest.reference_id == ref3.reference_id
    assert newest.is_deleted is True
    assert newest.was_appropriate is False

    assert oldest.reference_id == ref2.reference_id
    assert oldest.private_text == "Private note"
    assert oldest.rating == 0.8
    assert oldest.is_deleted is False

692 

693 

def test_GetUserReferences_not_found(db):
    """GetUserReferences answers NOT_FOUND for the user string "nonexistent"."""
    super_user, super_token = generate_user(is_superuser=True)
    request = admin_pb2.GetUserReferencesReq(user="nonexistent")

    with real_admin_session(super_token) as admin_api, pytest.raises(grpc.RpcError) as e:
        admin_api.GetUserReferences(request)
    assert e.value.code() == grpc.StatusCode.NOT_FOUND

701 

702 

def test_GetFriendRequests(db):
    """GetFriendRequests splits a user's friend requests into sent and received, with status and visibility."""
    super_user, super_token = generate_user(is_superuser=True)

    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    user4, _ = generate_user()

    # Create a mix of friend requests directly so we control the state
    def _add_friend_request(from_user_id, to_user_id, status, visibility, time_responded=None):
        with session_scope() as session:
            # The moderation state is flushed first so that its id exists for the relationship;
            # object_id is a placeholder (0) until the relationship has an id of its own.
            moderation = ModerationState(
                object_type=ModerationObjectType.friend_request,
                object_id=0,
                visibility=visibility,
            )
            session.add(moderation)
            session.flush()

            relationship = FriendRelationship(
                from_user_id=from_user_id,
                to_user_id=to_user_id,
                status=status,
                moderation_state_id=moderation.id,
                time_responded=time_responded,
            )
            session.add(relationship)
            session.flush()

            # Point the moderation state back at the relationship it belongs to.
            moderation.object_id = relationship.id

    # user1 -> user2: pending, shadowed
    _add_friend_request(user1.id, user2.id, FriendStatus.pending, ModerationVisibility.shadowed)
    # user1 -> user3: accepted, visible
    _add_friend_request(user1.id, user3.id, FriendStatus.accepted, ModerationVisibility.visible, time_responded=now())
    # user4 -> user1: rejected, visible
    _add_friend_request(user4.id, user1.id, FriendStatus.rejected, ModerationVisibility.visible, time_responded=now())

    with real_admin_session(super_token) as admin_api:
        res = admin_api.GetFriendRequests(admin_pb2.GetFriendRequestsReq(user=user1.username))

    # user1 sent two: to user2 (pending) and to user3 (accepted), ordered by id desc
    assert len(res.sent) == 2
    accepted, pending = res.sent[0], res.sent[1]

    assert accepted.from_user.user_id == user1.id
    assert accepted.to_user.user_id == user3.id
    assert accepted.status == "accepted"
    assert accepted.HasField("time_responded")
    assert accepted.moderation_visibility == "visible"

    assert pending.from_user.user_id == user1.id
    assert pending.to_user.user_id == user2.id
    assert pending.status == "pending"
    assert not pending.HasField("time_responded")
    assert pending.moderation_visibility == "shadowed"

    # user1 received one: from user4 (rejected)
    assert len(res.received) == 1
    rejected = res.received[0]
    assert rejected.from_user.user_id == user4.id
    assert rejected.to_user.user_id == user1.id
    assert rejected.status == "rejected"

761 

762 

def test_GetFriendRequests_not_found(db):
    """GetFriendRequests answers NOT_FOUND for the user string "nonexistent"."""
    super_user, super_token = generate_user(is_superuser=True)
    request = admin_pb2.GetFriendRequestsReq(user="nonexistent")

    with real_admin_session(super_token) as admin_api, pytest.raises(grpc.RpcError) as e:
        admin_api.GetFriendRequests(request)
    assert e.value.code() == grpc.StatusCode.NOT_FOUND

770 

771 

def test_AddUsersToModerationUserList(db):
    """Cover AddUsersToModerationUserList together with ListModerationUserLists.

    Checks, in order: an unknown list id and an unknown username are both
    rejected with NOT_FOUND, omitting ``moderation_list_id`` creates a new list,
    a user can belong to several lists, and users can be appended to an
    existing list.
    """
    super_user, super_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    user4, _ = generate_user()
    user5, _ = generate_user()
    # user1 starts out as the only member of a pre-existing list
    moderation_list_id = add_users_to_new_moderation_list([user1])

    # No database session is held open around the API calls; each verification
    # step below opens its own short-lived session_scope() instead.
    with real_admin_session(super_token) as api:
        # Test adding users to a non-existent moderation list (should raise an error)
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user2.username], moderation_list_id=999),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert "Moderation user list not found." == e.value.details()

        # Test with non-existent user (should raise an error)
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, "nonexistent"]),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert "Couldn't find that user." == e.value.details()

        # Test successful creation of new moderation list (no moderation_list_id provided)
        res = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, user2.username, user3.username]),
        )
        assert res.moderation_list_id > 0
        with session_scope() as session:
            moderation_user_list = session.get(ModerationUserList, res.moderation_list_id)
            assert moderation_user_list is not None
            assert len(moderation_user_list.users) == 3
            assert {user1.id, user2.id, user3.id}.issubset({user.id for user in moderation_user_list.users})

        # Test list endpoint returns same moderation list with same members not repeated
        list_res = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username))
        assert len(list_res.moderation_lists) == 1
        assert list_res.moderation_lists[0].moderation_list_id == res.moderation_list_id
        assert len(list_res.moderation_lists[0].members) == 3
        assert {user1.id, user2.id, user3.id}.issubset({m.user_id for m in list_res.moderation_lists[0].members})

        # Test user can be in multiple moderation lists (the fixture list plus the new one)
        list_res_user1 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username))
        assert len(list_res_user1.moderation_lists) == 2

        # Test adding users to an existing moderation list
        res2 = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(
                users=[user4.username, user5.username], moderation_list_id=moderation_list_id
            ),
        )
        assert res2.moderation_list_id == moderation_list_id
        with session_scope() as session:
            moderation_user_list = session.get_one(ModerationUserList, moderation_list_id)
            assert len(moderation_user_list.users) == 3
            assert {user1.id, user4.id, user5.id}.issubset({user.id for user in moderation_user_list.users})

        # Test list user moderation lists endpoint returns the right moderation list
        list_res_user5 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user5.username))
        assert len(list_res_user5.moderation_lists) == 1
        assert list_res_user5.moderation_lists[0].moderation_list_id == moderation_list_id
        assert len(list_res_user5.moderation_lists[0].members) == 3
        assert {user1.id, user4.id, user5.id}.issubset(
            {m.user_id for m in list_res_user5.moderation_lists[0].members}
        )

839 

840 

def test_RemoveUserFromModerationUserList(db):
    """Cover RemoveUserFromModerationUserList: error paths, removal, and list cleanup."""
    _, admin_token = generate_user(is_superuser=True)
    member_a, _ = generate_user()
    member_b, _ = generate_user()
    outsider, _ = generate_user()
    list_id = add_users_to_new_moderation_list([member_a, member_b])

    with real_admin_session(admin_token) as api:
        # Unknown username is rejected
        with pytest.raises(grpc.RpcError) as excinfo:
            api.RemoveUserFromModerationUserList(admin_pb2.RemoveUserFromModerationUserListReq(user="nonexistent"))
        assert excinfo.value.code() == grpc.StatusCode.NOT_FOUND
        assert excinfo.value.details() == "Couldn't find that user."

        # A request without a moderation list id is rejected
        with pytest.raises(grpc.RpcError) as excinfo:
            api.RemoveUserFromModerationUserList(
                admin_pb2.RemoveUserFromModerationUserListReq(user=member_b.username)
            )
        assert excinfo.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert excinfo.value.details() == "Missing moderation user list id."

        # Removing somebody who is not a member of the given list is rejected
        not_a_member_req = admin_pb2.RemoveUserFromModerationUserListReq(
            user=outsider.username, moderation_list_id=list_id
        )
        with pytest.raises(grpc.RpcError) as excinfo:
            api.RemoveUserFromModerationUserList(not_a_member_req)
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == "User is not in the moderation user list."

        # Successful removal: member_a leaves, member_b stays
        api.RemoveUserFromModerationUserList(
            admin_pb2.RemoveUserFromModerationUserListReq(user=member_a.username, moderation_list_id=list_id)
        )
        with session_scope() as session:
            remaining = session.get_one(ModerationUserList, list_id)
            remaining_ids = {member.id for member in remaining.users}
            assert member_a.id not in remaining_ids
            assert member_b.id in remaining_ids

        # The list endpoint reflects the removal for both users
        lists_for_a = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=member_a.username))
        assert len(lists_for_a.moderation_lists) == 0
        lists_for_b = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=member_b.username))
        assert len(lists_for_b.moderation_lists) == 1

        # Removing the last member should also delete the moderation list itself
        api.RemoveUserFromModerationUserList(
            admin_pb2.RemoveUserFromModerationUserListReq(user=member_b.username, moderation_list_id=list_id)
        )
        with session_scope() as session:
            assert session.get(ModerationUserList, list_id) is None

892 

893 

def test_admin_delete_account_url(db, push_collector: PushCollector):
    """An admin-created deletion link carries a token that can confirm account deletion.

    Creating the link must not push anything to the user; confirming the
    deletion with the token sends one push and one notification email.
    """
    _, admin_token = generate_user(is_superuser=True)

    user, _ = generate_user()
    user_id = user.id

    with real_admin_session(admin_token) as admin_api:
        link_res = admin_api.CreateAccountDeletionLink(admin_pb2.CreateAccountDeletionLinkReq(user=user.username))
        url = link_res.account_deletion_confirm_url

    # creating the link alone is silent for the user
    assert push_collector.count_for_user(user_id) == 0

    with session_scope() as session:
        # exactly one deletion token is expected to exist at this point
        deletion_row = session.execute(select(AccountDeletionToken)).scalar_one()
        token = deletion_row.token
        assert deletion_row.user.id == user_id
        assert url == f"http://localhost:3000/delete-account?token={token}"

    confirm_req = auth_pb2.ConfirmDeleteAccountReq(token=token)
    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, _):
            auth_api.ConfirmDeleteAccount(confirm_req)

    push = push_collector.pop_for_user(user_id, last=True)
    assert push.content.title == "Account deleted"
    assert push.content.body == "You can restore it within 7 days using the link we emailed you."
    mock.assert_called_once()
    # the extracted email fields are not inspected further
    email_fields(mock)

926 

927 

def test_AccessStats(db):
    """AccessStats reports per-IP stats for rows inside the window and omits older rows."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    # Fixture rows: two inside the default 90-day window, one well outside it,
    # and one with NULL ip_address / user_agent. The INET column is returned by
    # psycopg3 as an IPv4Address/IPv6Address object, which used to crash the
    # proto string assignment.
    one_day_ago = now() - timedelta(days=1)
    ten_days_ago = now() - timedelta(days=10)
    long_ago = now() - timedelta(days=200)
    with session_scope() as session:
        activity_rows = [
            UserActivity(user_id=target.id, period=one_day_ago, ip_address="1.2.3.4", user_agent="ua-a", api_calls=5),
            UserActivity(
                user_id=target.id, period=ten_days_ago, ip_address="2001:db8::1", user_agent="ua-b", api_calls=3
            ),
            UserActivity(user_id=target.id, period=long_ago, ip_address="9.9.9.9", user_agent="ua-old", api_calls=99),
            UserActivity(user_id=target.id, period=one_day_ago, api_calls=1),
        ]
        for row in activity_rows:
            session.add(row)

    with real_admin_session(admin_token) as api:
        default_window = api.AccessStats(admin_pb2.AccessStatsReq(user=target.username))

    stats_by_ip = {stat.ip_address: stat for stat in default_window.stats}
    assert "1.2.3.4" in stats_by_ip
    ipv4_stat = stats_by_ip["1.2.3.4"]
    assert ipv4_stat.api_call_count == 5
    assert ipv4_stat.user_agent == "ua-a"
    assert "2001:db8::1" in stats_by_ip
    assert stats_by_ip["2001:db8::1"].api_call_count == 3
    # NULL ip_address row produces an empty-string ip_address in the proto
    assert "" in stats_by_ip
    assert stats_by_ip[""].api_call_count == 1
    # out-of-window row is excluded by the 90-day default
    assert "9.9.9.9" not in stats_by_ip

    # explicit end_time should bound the upper end of the window (regression: was >=)
    bounded_req = admin_pb2.AccessStatsReq(
        user=target.username,
        start_time=Timestamp_from_datetime(now() - timedelta(days=5)),
        end_time=Timestamp_from_datetime(now()),
    )
    with real_admin_session(admin_token) as api:
        bounded = api.AccessStats(bounded_req)
    assert {stat.ip_address for stat in bounded.stats} == {"1.2.3.4", ""}

983 

984 

def test_SetLastDonated(db):
    """SetLastDonated stores a timestamp, clears it when omitted, and rejects unknown users."""
    _, admin_token = generate_user(is_superuser=True)
    donor, _ = generate_user(last_donated=None)

    def stored_last_donated():
        """Read the donor's current last_donated value in a fresh session."""
        with session_scope() as session:
            row = session.execute(select(User).where(User.id == donor.id)).scalar_one()
            return row.last_donated

    with real_admin_session(admin_token) as api:
        # user starts with no last_donated
        assert stored_last_donated() is None

        # can set last_donated
        donation_time = now() - timedelta(days=30)
        set_req = admin_pb2.SetLastDonatedReq(
            user=donor.username,
            last_donated=Timestamp_from_datetime(donation_time),
        )
        api.SetLastDonated(set_req)

        saved = stored_last_donated()
        assert saved is not None
        # check timestamp is close (within a second)
        assert abs((saved - donation_time).total_seconds()) < 1

        # can clear last_donated by not setting the field
        api.SetLastDonated(admin_pb2.SetLastDonatedReq(user=donor.username))
        assert stored_last_donated() is None

        # user not found
        with pytest.raises(grpc.RpcError) as excinfo:
            api.SetLastDonated(admin_pb2.SetLastDonatedReq(user="nonexistent"))
        assert excinfo.value.code() == grpc.StatusCode.NOT_FOUND
        assert excinfo.value.details() == "Couldn't find that user."

1022 

1023 

def test_admin_actions_level(db):
    """AddAdminNote records the requested level, with NORMAL when none is given."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    with real_admin_session(admin_token) as api:

        def add_note(text, **extra):
            """Add an admin note for the target user and return the response."""
            return api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note=text, **extra))

        # Default level is NORMAL
        first = add_note("normal note")
        assert first.admin_actions[0].level == admin_pb2.ADMIN_ACTION_LEVEL_NORMAL

        # Explicitly set to DEBUG
        second = add_note("debug note", level=admin_pb2.ADMIN_ACTION_LEVEL_DEBUG)
        assert len(second.admin_actions) == 2
        assert second.admin_actions[1].level == admin_pb2.ADMIN_ACTION_LEVEL_DEBUG

        # Explicitly set to HIGH
        third = add_note("high note", level=admin_pb2.ADMIN_ACTION_LEVEL_HIGH)
        assert len(third.admin_actions) == 3
        assert third.admin_actions[2].level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH

1054 

1055 

def test_admin_actions_on_mutations(db, push_collector: PushCollector):
    """Each mutating admin RPC returns an admin action log containing its own entry."""
    _, admin_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    username = normal_user.username

    # remembered so the "Changed from ..." notes can be checked below
    original_gender = normal_user.gender
    original_birthdate = normal_user.birthdate

    def logged(response, action_type, note):
        """Return True if the response's action log has an entry with this type and note."""
        return any(a.action_type == action_type and a.note == note for a in response.admin_actions)

    with real_admin_session(admin_token) as api:
        # ChangeUserGender
        res = api.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=username, gender="Machine"))
        assert logged(res, "change_gender", f"Changed from '{original_gender}' to 'Machine'")

        # ChangeUserBirthdate
        res = api.ChangeUserBirthdate(admin_pb2.ChangeUserBirthdateReq(user=username, birthdate="1990-01-01"))
        assert logged(res, "change_birthdate", f"Changed from {original_birthdate} to 1990-01-01")

        # SetPassportSexGenderException
        res = api.SetPassportSexGenderException(
            admin_pb2.SetPassportSexGenderExceptionReq(user=username, passport_sex_gender_exception=True)
        )
        assert logged(res, "set_passport_sex_gender_exception", "Changed from False to True")

        # SendModNote with notify
        res = api.SendModNote(
            admin_pb2.SendModNoteReq(user=username, content="Please update your profile", internal_id="test1")
        )
        assert logged(res, "send_mod_note", "Notify user: Yes\n\nPlease update your profile")

        # SendModNote with do_not_notify
        silent_req = admin_pb2.SendModNoteReq(
            user=username,
            content="Silent note",
            internal_id="test2",
            do_not_notify=True,
        )
        res = api.SendModNote(silent_req)
        assert logged(res, "send_mod_note", "Notify user: No\n\nSilent note")

        # DeleteUser: an entry exists, and it is recorded at HIGH level
        res = api.DeleteUser(admin_pb2.DeleteUserReq(user=username))
        assert any(a.action_type == "delete_user" for a in res.admin_actions)
        assert any(
            a.action_type == "delete_user" and a.level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH
            for a in res.admin_actions
        )

        # RecoverDeletedUser
        res = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=username))
        assert any(a.action_type == "recover_user" for a in res.admin_actions)

        # MarkUserNeedsLocationUpdate
        res = api.MarkUserNeedsLocationUpdate(admin_pb2.MarkUserNeedsLocationUpdateReq(user=username))
        assert logged(res, "mark_needs_location_update", "Marked user as needing location update")

        # SetLastDonated
        res = api.SetLastDonated(
            admin_pb2.SetLastDonatedReq(user=username, last_donated=Timestamp_from_datetime(now()))
        )
        assert any(a.action_type == "set_last_donated" for a in res.admin_actions)

1139 

1140 

def test_create_admin_tag(db):
    """CreateAdminTag echoes the tag name and returns a positive id."""
    _, admin_token = generate_user(is_superuser=True)
    request = admin_pb2.CreateAdminTagReq(tag="test-tag")

    with real_admin_session(admin_token) as api:
        created = api.CreateAdminTag(request)
        assert created.tag == "test-tag"
        assert created.admin_tag_id > 0

1148 

1149 

def test_create_admin_tag_duplicate(db):
    """Creating the same admin tag twice fails with ALREADY_EXISTS."""
    _, admin_token = generate_user(is_superuser=True)

    with real_admin_session(admin_token) as api:
        # the first creation succeeds
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="test-tag"))

        # the second one with the same name is refused
        with pytest.raises(grpc.RpcError) as excinfo:
            api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="test-tag"))
        error = excinfo.value
        assert error.code() == grpc.StatusCode.ALREADY_EXISTS
        assert error.details() == "That admin tag already exists."

1159 

1160 

def test_create_admin_tag_empty(db):
    """Empty and whitespace-only tag names are rejected with INVALID_ARGUMENT."""
    _, admin_token = generate_user(is_superuser=True)

    with real_admin_session(admin_token) as api:
        # first the empty string, then a whitespace-only tag
        for blank_tag in ("", " "):
            with pytest.raises(grpc.RpcError) as excinfo:
                api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag=blank_tag))
            assert excinfo.value.code() == grpc.StatusCode.INVALID_ARGUMENT
            assert excinfo.value.details() == "The admin tag cannot be empty."

1174 

1175 

def test_list_admin_tags(db):
    """ListAdminTags starts empty and returns created tags in alphabetical order."""
    _, admin_token = generate_user(is_superuser=True)

    with real_admin_session(admin_token) as api:
        # Empty initially
        initial = api.ListAdminTags(admin_pb2.ListAdminTagsReq())
        assert len(initial.tags) == 0

        # Add some tags, deliberately out of alphabetical order
        for name in ("bravo", "alpha"):
            api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag=name))

        listed = api.ListAdminTags(admin_pb2.ListAdminTagsReq())
        assert len(listed.tags) == 2
        # Ordered alphabetically
        first_tag, second_tag = listed.tags
        assert first_tag.tag == "alpha"
        assert second_tag.tag == "bravo"

1193 

1194 

def test_add_admin_tag_to_user(db):
    """AddAdminTagToUser attaches the tag and logs an "add_tag" admin action."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    with real_admin_session(admin_token) as api:
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))

        tagged = api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=target.username, tag="vip"))
        assert "vip" in tagged.admin_tags
        # the action log names both the action type and the tag involved
        assert any(action.action_type == "add_tag" and action.tag == "vip" for action in tagged.admin_actions)

1205 

1206 

def test_add_admin_tag_to_user_duplicate(db):
    """Adding a tag the user already has fails with FAILED_PRECONDITION."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    with real_admin_session(admin_token) as api:
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))
        # the first assignment succeeds
        api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=target.username, tag="vip"))

        # repeating the same assignment is refused
        with pytest.raises(grpc.RpcError) as excinfo:
            api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=target.username, tag="vip"))
        error = excinfo.value
        assert error.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert error.details() == "The user already has that admin tag."

1219 

1220 

def test_add_admin_tag_to_user_tag_not_found(db):
    """Assigning a tag that was never created fails with NOT_FOUND."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.AddAdminTagToUserReq(user=target.username, tag="nonexistent")

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as excinfo:
            api.AddAdminTagToUser(request)
        error = excinfo.value
        assert error.code() == grpc.StatusCode.NOT_FOUND
        assert error.details() == "Admin tag not found."

1230 

1231 

def test_remove_admin_tag_from_user(db):
    """RemoveAdminTagFromUser detaches the tag and logs a "remove_tag" admin action."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    with real_admin_session(admin_token) as api:
        # set-up: the tag exists and is assigned to the user
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))
        api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=target.username, tag="vip"))

        untagged = api.RemoveAdminTagFromUser(
            admin_pb2.RemoveAdminTagFromUserReq(user=target.username, tag="vip")
        )
        assert "vip" not in untagged.admin_tags
        assert any(action.action_type == "remove_tag" and action.tag == "vip" for action in untagged.admin_actions)

1243 

1244 

def test_remove_admin_tag_from_user_not_assigned(db):
    """Removing an existing tag the user does not carry fails with FAILED_PRECONDITION."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    with real_admin_session(admin_token) as api:
        # the tag exists but is never assigned to the user
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))

        removal = admin_pb2.RemoveAdminTagFromUserReq(user=target.username, tag="vip")
        with pytest.raises(grpc.RpcError) as excinfo:
            api.RemoveAdminTagFromUser(removal)
        error = excinfo.value
        assert error.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert error.details() == "The user does not have that admin tag."

1256 

1257 

def test_search_users_by_admin_tag(db):
    """SearchUsers filters by admin tags, requiring every requested tag to be present."""
    _, admin_token = generate_user(is_superuser=True)
    vip_only, _ = generate_user()
    vip_and_flagged, _ = generate_user()
    untagged, _ = generate_user()

    with real_admin_session(admin_token) as api:
        for tag_name in ("vip", "flagged"):
            api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag=tag_name))

        assignments = [
            (vip_only, "vip"),
            (vip_and_flagged, "vip"),
            (vip_and_flagged, "flagged"),
        ]
        for tagged_user, tag_name in assignments:
            api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=tagged_user.username, tag=tag_name))

        # Search for users with "vip" tag
        vip_res = api.SearchUsers(admin_pb2.SearchUsersReq(admin_tags=["vip"]))
        vip_ids = {found.user_id for found in vip_res.users}
        assert vip_only.id in vip_ids
        assert vip_and_flagged.id in vip_ids
        assert untagged.id not in vip_ids

        # Search for users with both "vip" AND "flagged" tags (AND logic)
        both_res = api.SearchUsers(admin_pb2.SearchUsersReq(admin_tags=["vip", "flagged"]))
        both_ids = {found.user_id for found in both_res.users}
        assert vip_and_flagged.id in both_ids
        assert vip_only.id not in both_ids

        # Search for non-existent tag returns no results
        none_res = api.SearchUsers(admin_pb2.SearchUsersReq(admin_tags=["nonexistent"]))
        assert len(none_res.users) == 0

1288 

1289 

def test_search_users_by_admin_note(db):
    """SearchUsers can match users by the content of their admin action log."""
    _, admin_token = generate_user(is_superuser=True)
    suspect, _ = generate_user()
    bystander, _ = generate_user()

    with real_admin_session(admin_token) as api:
        notes = [
            (suspect, "suspicious activity"),
            (bystander, "normal user"),
        ]
        for noted_user, text in notes:
            api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=noted_user.username, admin_note=text))

        # Search by admin action log content (ilike)
        found = api.SearchUsers(admin_pb2.SearchUsersReq(admin_action_log="%suspicious%"))
        found_ids = {hit.user_id for hit in found.users}
        assert suspect.id in found_ids
        assert bystander.id not in found_ids

1304 

1305 

1306# community invite feature tested in test_events.py 

1307# SendBlogPostNotification tested in test_notifications.py 

1308# MarkUserNeedsLocationUpdate tested in test_jail.py