Coverage for app / backend / src / tests / test_admin.py: 100%

674 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 14:14 +0000

1from datetime import date, datetime, timedelta 

2 

3import grpc 

4import pytest 

5from sqlalchemy import select 

6from sqlalchemy.sql import func 

7 

8from couchers.db import session_scope 

9from couchers.models import ( 

10 AccountDeletionToken, 

11 ContentReport, 

12 EventOccurrence, 

13 ModerationUserList, 

14 Reference, 

15 User, 

16 UserSession, 

17) 

18from couchers.proto import account_pb2, admin_pb2, auth_pb2, events_pb2, references_pb2, reporting_pb2 

19from couchers.utils import Timestamp_from_datetime, now, parse_date 

20from tests.fixtures.db import add_users_to_new_moderation_list, generate_user, make_friends 

21from tests.fixtures.misc import PushCollector, email_fields, mock_notification_email 

22from tests.fixtures.sessions import ( 

23 account_session, 

24 auth_api_session, 

25 events_session, 

26 real_admin_session, 

27 references_session, 

28 reporting_session, 

29) 

30from tests.test_communities import create_community 

31 

32 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture whose only effect is requesting ``testconfig`` for every test here."""

36 

37 

def test_access_by_normal_user(db):
    """A user created without ``is_superuser`` gets PERMISSION_DENIED from the admin API."""
    regular_user, regular_token = generate_user()
    request = admin_pb2.GetUserDetailsReq(user=str(regular_user.id))

    with real_admin_session(regular_token) as admin_api:
        # all requests to the admin servicer should break when done by a non-super_user
        with pytest.raises(grpc.RpcError) as exc_info:
            admin_api.GetUserDetails(request)
        assert exc_info.value.code() == grpc.StatusCode.PERMISSION_DENIED

50 

51 

def test_GetUser(db):
    """GetUser returns the same id/username for a user before and after BanUser is called."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    def assert_lookup_matches():
        # Each lookup runs in its own admin session.
        with real_admin_session(admin_token) as admin_api:
            reply = admin_api.GetUser(admin_pb2.GetUserReq(user=str(target.id)))
            assert reply.user_id == target.id
            assert reply.username == target.username

    assert_lookup_matches()

    with real_admin_session(admin_token) as admin_api:
        admin_api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note="Testing banning"))

    assert_lookup_matches()

68 

69 

def test_GetUserDetails(db):
    """GetUserDetails returns identical details when queried by id, username or email."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    # The same user is looked up three ways, each in a fresh admin session.
    for identifier in (str(target.id), target.username, target.email):
        with real_admin_session(admin_token) as admin_api:
            details = admin_api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=identifier))
            assert details.user_id == target.id
            assert details.username == target.username
            assert details.email == target.email
            assert details.gender == target.gender
            assert parse_date(details.birthdate) == target.birthdate
            assert not details.banned
            assert not details.deleted

103 

104 

def test_ChangeUserGender(db, push_collector: PushCollector):
    """ChangeUserGender returns the updated details and triggers one email and a push."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    with real_admin_session(admin_token) as admin_api:
        with mock_notification_email() as email_mock:
            details = admin_api.ChangeUserGender(
                admin_pb2.ChangeUserGenderReq(user=target.username, gender="Machine")
            )

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == "Machine"
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted

    # Exactly one notification email, mentioning the new gender in both renderings.
    email_mock.assert_called_once()
    email = email_fields(email_mock)
    assert email.subject == "[TEST] Your gender was changed"
    assert email.recipient == target.email
    for rendering in (email.plain, email.html):
        assert "Machine" in rendering

    notification = push_collector.pop_for_user(target.id, last=True)
    assert notification.content.title == "Gender changed"
    assert notification.content.body == "An admin changed your gender to Machine."

130 

131 

def test_ChangeUserBirthdate(db, push_collector: PushCollector):
    """ChangeUserBirthdate returns the new birthdate and triggers one email and a push."""
    original_birthdate = date(year=2000, month=1, day=1)
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user(birthdate=original_birthdate)

    with real_admin_session(admin_token) as admin_api:
        before = admin_api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=target.username))
        assert parse_date(before.birthdate) == original_birthdate

        with mock_notification_email() as email_mock:
            after = admin_api.ChangeUserBirthdate(
                admin_pb2.ChangeUserBirthdateReq(user=target.username, birthdate="1990-05-25")
            )

    assert after.user_id == target.id
    assert after.username == target.username
    assert after.email == target.email
    assert after.birthdate == "1990-05-25"
    assert after.gender == target.gender
    assert not after.banned
    assert not after.deleted

    # Exactly one notification email, mentioning the new year in both renderings.
    email_mock.assert_called_once()
    email = email_fields(email_mock)
    assert email.subject == "[TEST] Your date of birth was changed"
    assert email.recipient == target.email
    for rendering in (email.plain, email.html):
        assert "1990" in rendering

    notification = push_collector.pop_for_user(target.id, last=True)
    assert notification.content.title == "Birthdate changed"
    assert notification.content.body == "An admin changed your date of birth to May 25, 1990."

163 

164 

def test_BanUser(db):
    """BanUser marks the user banned and records one high-level "ban" admin action."""
    admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    reason = "A good reason"

    with real_admin_session(admin_token) as admin_api:
        details = admin_api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note=reason))

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == target.gender
    assert parse_date(details.birthdate) == target.birthdate
    assert details.banned
    assert not details.deleted

    # The single recorded action carries the note and identifies the acting admin.
    assert len(details.admin_actions) == 1
    action = details.admin_actions[0]
    assert action.action_type == "ban"
    assert action.level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH
    assert action.note == reason
    assert action.admin_user_id == admin.id
    assert action.admin_username == admin.username

185 

186 

def test_UnbanUser(db):
    """UnbanUser leaves the user not banned and records one high-level "unban" admin action."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    reason = "A good reason"

    with real_admin_session(admin_token) as admin_api:
        details = admin_api.UnbanUser(admin_pb2.UnbanUserReq(user=target.username, admin_note=reason))

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == target.gender
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted

    assert len(details.admin_actions) == 1
    action = details.admin_actions[0]
    assert action.action_type == "unban"
    assert action.level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH

204 

205 

def test_AddAdminNote(db):
    """AddAdminNote appends normal-level "note" actions, returned in insertion order."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    first_note = "User reported strange behavior"
    second_note = "Insert private information here"

    with real_admin_session(admin_token) as admin_api:
        details = admin_api.AddAdminNote(
            admin_pb2.AddAdminNoteReq(user=target.username, admin_note=first_note)
        )
    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == target.gender
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted
    assert len(details.admin_actions) == 1
    action = details.admin_actions[0]
    assert action.action_type == "note"
    assert action.level == admin_pb2.ADMIN_ACTION_LEVEL_NORMAL
    assert action.note == first_note

    # A second note, added from a new admin session, is listed after the first.
    with real_admin_session(admin_token) as admin_api:
        details = admin_api.AddAdminNote(
            admin_pb2.AddAdminNoteReq(user=target.username, admin_note=second_note)
        )
    assert len(details.admin_actions) == 2
    assert details.admin_actions[0].note == first_note
    assert details.admin_actions[1].note == second_note

231 

232 

def test_AddAdminNote_blank(db):
    """A whitespace-only admin note is rejected with INVALID_ARGUMENT."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.AddAdminNoteReq(user=target.username, admin_note=" \t \n ")

    with real_admin_session(admin_token) as admin_api:
        with pytest.raises(grpc.RpcError) as exc_info:
            admin_api.AddAdminNote(request)

    error = exc_info.value
    assert error.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert error.details() == "The admin note cannot be empty."

243 

244 

def test_admin_content_reports(db):
    """Admins can fetch one content report by id and list the reports filed against an author."""
    _, admin_token = generate_user(is_superuser=True)
    reporter, reporter_token = generate_user()
    offender_a, _ = generate_user()
    offender_b, _ = generate_user()

    # (reason, description, content_ref, reported author, page), filed in this order
    filed_reports = [
        ("spam", "r1", "comment/123", offender_a, "https://couchers.org/comment/123"),
        ("spam", "r2", "comment/124", offender_b, "https://couchers.org/comment/124"),
        ("something else", "r3", "page/321", offender_a, "https://couchers.org/page/321"),
    ]
    with reporting_session(reporter_token) as reporting_api:
        for reason, description, content_ref, author, page in filed_reports:
            reporting_api.Report(
                reporting_pb2.ReportReq(
                    reason=reason,
                    description=description,
                    content_ref=content_ref,
                    author_user=author.username,
                    user_agent="n/a",
                    page=page,
                )
            )

    # Map each report's description to its database id so reports can be addressed below.
    with session_scope() as session:
        rows = session.execute(select(ContentReport.description, ContentReport.id)).all()
        report_ids: dict[str, int] = {description: report_id for description, report_id in rows}

    with real_admin_session(admin_token) as admin_api:
        with pytest.raises(grpc.RpcError) as exc_info:
            admin_api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == "Content report not found."

        single = admin_api.GetContentReport(
            admin_pb2.GetContentReportReq(content_report_id=report_ids["r2"])
        )
        report = single.content_report
        assert report.content_report_id == report_ids["r2"]
        assert report.reporting_user_id == reporter.id
        assert report.author_user_id == offender_b.id
        assert report.reason == "spam"
        assert report.description == "r2"
        assert report.content_ref == "comment/124"
        assert report.user_agent == "n/a"
        assert report.page == "https://couchers.org/comment/124"

        # For the author with two reports, the later-filed one (r3) is expected first.
        by_author = admin_api.GetContentReportsForAuthor(
            admin_pb2.GetContentReportsForAuthorReq(user=offender_a.username)
        )
        assert by_author.content_reports[0].content_report_id == report_ids["r3"]
        assert by_author.content_reports[1].content_report_id == report_ids["r1"]

308 

309 

def test_DeleteUser(db):
    """DeleteUser flags the user as deleted and RecoverDeletedUser clears that flag again."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    def assert_identity_unchanged(details):
        # Fields that neither deleting nor recovering is expected to touch.
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned

    with real_admin_session(admin_token) as admin_api:
        deleted = admin_api.DeleteUser(admin_pb2.DeleteUserReq(user=target.username))
        assert_identity_unchanged(deleted)
        assert deleted.deleted

    with real_admin_session(admin_token) as admin_api:
        recovered = admin_api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=target.username))
        assert_identity_unchanged(recovered)
        assert not recovered.deleted

333 

334 

def test_RecoverDeletedUser_after_user_initiated_deletion(db, push_collector: PushCollector):
    """
    Admin recovery of an account that the user deleted through the normal flow.

    Confirming a deletion (ConfirmDeleteAccount) sets undelete_token and undelete_until.
    The admin RecoverDeletedUser must clear these fields to satisfy the undelete_nullity
    database constraint.
    """
    _, admin_token = generate_user(is_superuser=True)
    target, target_token = generate_user()
    target_id = target.id

    def load_target(session):
        return session.execute(select(User).where(User.id == target_id)).scalar_one()

    # The user asks for their account to be deleted
    with account_session(target_token) as account_api:
        account_api.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    # Fetch the confirmation token that request produced
    with session_scope() as session:
        confirmation_token = session.execute(select(AccountDeletionToken)).scalar_one().token

    # The user confirms the deletion (this sets undelete_token and undelete_until)
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=confirmation_token))

    # The account is now deleted and carries the undelete fields
    with session_scope() as session:
        row = load_target(session)
        assert row.deleted_at is not None
        assert row.undelete_token is not None
        assert row.undelete_until is not None

    # An admin recovers the account
    with real_admin_session(admin_token) as admin_api:
        recovered = admin_api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=target.username))
        assert recovered.user_id == target_id
        assert not recovered.deleted

    # The deletion timestamp and both undelete fields are cleared
    with session_scope() as session:
        row = load_target(session)
        assert row.deleted_at is None
        assert row.undelete_token is None
        assert row.undelete_until is None

376 

377 

def test_CreateApiKey(db, push_collector: PushCollector):
    """CreateApiKey stores a valid API-key session and sends it to the user by email and push."""
    with session_scope() as session:
        _, admin_token = generate_user(is_superuser=True)
        target, _ = generate_user()

        # A freshly generated user starts without any API key sessions.
        existing_keys = session.execute(
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        ).scalar_one()
        assert existing_keys == 0

    with mock_notification_email() as email_mock:
        with real_admin_session(admin_token) as admin_api:
            admin_api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=target.username))

    email_mock.assert_called_once()
    email = email_fields(email_mock)
    assert email.subject == "[TEST] Your API key for Couchers.org"

    # scalar_one() also asserts that exactly one valid API key session now exists.
    with session_scope() as session:
        api_key = session.execute(
            select(UserSession.token)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        ).scalar_one()

    assert api_key in email.plain
    assert api_key in email.html

    assert email.recipient == target.email
    assert "api key" in email.subject.lower()
    intro_line = "We've issued you with the following API key:"
    for rendering in (email.plain, email.html):
        assert intro_line in rendering
    for rendering in (email.plain, email.html):
        assert "support@couchers.org" in rendering

    notification = push_collector.pop_for_user(target.id, last=True)
    assert notification.content.title == "API key created"
    assert notification.content.body == "Details were sent to you via email."

423 

424 

def test_GetChats(db):
    """GetChats for a freshly generated user returns that user and empty chat lists."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    with real_admin_session(admin_token) as admin_api:
        chats = admin_api.GetChats(admin_pb2.GetChatsReq(user=target.username))

    subject = chats.user
    assert subject.user_id == target.id
    assert subject.username == target.username
    assert subject.name == target.name
    # New user should have no chats
    assert len(chats.host_requests) == 0
    assert len(chats.group_chats) == 0

437 

438 

def test_badges(db, push_collector: PushCollector):
    """Adding and removing a badge works and pushes; invalid badge operations are rejected."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    username = target.username

    def add_req(badge_id):
        return admin_pb2.AddBadgeReq(user=username, badge_id=badge_id)

    def remove_req(badge_id):
        return admin_pb2.RemoveBadgeReq(user=username, badge_id=badge_id)

    def expect_rejection(rpc, request, code, details):
        # The call must fail with exactly this gRPC status code and message.
        with pytest.raises(grpc.RpcError) as exc_info:
            rpc(request)
        assert exc_info.value.code() == code
        assert exc_info.value.details() == details

    with real_admin_session(admin_token) as admin_api:

        def current_badges():
            return admin_api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=username)).badges

        # can add a badge
        assert "swagster" not in current_badges()
        with mock_notification_email() as email_mock:
            added = admin_api.AddBadge(add_req("swagster"))
            assert "swagster" in added.badges

        # badge emails are disabled by default
        email_mock.assert_not_called()

        notification = push_collector.pop_for_user(target.id, last=True)
        assert notification.content.title == "New profile badge: Swagster"
        assert notification.content.body == "The Swagster badge was added to your profile."

        # can't add/edit special tags
        expect_rejection(
            admin_api.AddBadge,
            add_req("founder"),
            grpc.StatusCode.FAILED_PRECONDITION,
            "Admins cannot edit that badge.",
        )

        # double add badge
        expect_rejection(
            admin_api.AddBadge,
            add_req("swagster"),
            grpc.StatusCode.FAILED_PRECONDITION,
            "The user already has that badge.",
        )

        # can remove badge
        assert "swagster" in current_badges()
        with mock_notification_email() as email_mock:
            removed = admin_api.RemoveBadge(remove_req("swagster"))
            assert "swagster" not in removed.badges

        # badge emails are disabled by default
        email_mock.assert_not_called()

        notification = push_collector.pop_for_user(target.id, last=True)
        assert notification.content.title == "Profile badge removed"
        assert notification.content.body == "The Swagster badge was removed from your profile."

        # not found on user
        expect_rejection(
            admin_api.RemoveBadge,
            remove_req("swagster"),
            grpc.StatusCode.FAILED_PRECONDITION,
            "The user does not have that badge.",
        )

        # not found in general
        expect_rejection(
            admin_api.AddBadge,
            add_req("nonexistentbadge"),
            grpc.StatusCode.NOT_FOUND,
            "Badge not found.",
        )

493 

494 

def test_DeleteEvent(db):
    """An admin DeleteEvent call marks the stored event occurrence as deleted."""
    _, admin_token = generate_user(is_superuser=True)
    organizer, organizer_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [organizer], [], None)

    # The event starts two hours from now and lasts three hours.
    starts_at = now() + timedelta(hours=2)
    ends_at = starts_at + timedelta(hours=3)
    venue = events_pb2.OfflineEventInformation(
        address="Near Null Island",
        lat=0.1,
        lng=0.2,
    )
    with events_session(organizer_token) as events_api:
        created = events_api.CreateEvent(
            events_pb2.CreateEventReq(
                title="Dummy Title",
                content="Dummy content.",
                photo_key=None,
                offline_information=venue,
                start_time=Timestamp_from_datetime(starts_at),
                end_time=Timestamp_from_datetime(ends_at),
                timezone="UTC",
            )
        )
        event_id = created.event_id
        assert not created.is_deleted

    with session_scope() as session:
        with real_admin_session(admin_token) as admin_api:
            admin_api.DeleteEvent(admin_pb2.DeleteEventReq(event_id=event_id))
        # Read the occurrence only after the admin call has completed.
        occurrence = session.get_one(EventOccurrence, ident=event_id)
        assert occurrence.is_deleted

532 

533 

def test_ListUserIds(db):
    """ListUserIds returns both users for a wide time window and none for a now-to-now window."""
    admin, admin_token = generate_user(is_superuser=True)
    other, _ = generate_user()

    # Window from 2000-01-01 until now covers both freshly generated users.
    with real_admin_session(admin_token) as admin_api:
        wide_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(datetime(2000, 1, 1)),
            end_time=Timestamp_from_datetime(now()),
        )
        listed = admin_api.ListUserIds(wide_window)
        assert len(listed.user_ids) == 2
        assert sorted(listed.user_ids) == sorted([admin.id, other.id])

    # Window that starts and ends "now" is expected to match nobody.
    with real_admin_session(admin_token) as admin_api:
        empty_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(now()),
            end_time=Timestamp_from_datetime(now()),
        )
        listed = admin_api.ListUserIds(empty_window)
        assert listed.user_ids == []

552 

553 

def test_EditReferenceText(db):
    """An admin EditReferenceText call replaces the stored text of a friend reference."""
    super_user, super_token = generate_user(is_superuser=True)
    test_new_text = "New Text"

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()
    make_friends(user1, user2)

    with references_session(user1_token) as api:
        reference = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
            )
        )

    with real_admin_session(super_token) as admin_api:
        admin_api.EditReferenceText(
            admin_pb2.EditReferenceTextReq(reference_id=reference.reference_id, new_text=test_new_text)
        )

    # Open the verification session only after both RPCs have finished, as
    # test_DeleteReference does, so no expire_all() is needed to see the edit.
    with session_scope() as session:
        modified_reference = session.execute(
            select(Reference).where(Reference.id == reference.reference_id)
        ).scalar_one()
        assert modified_reference.text == test_new_text

581 

582 

def test_DeleteReference(db):
    """After an admin DeleteReference call the reference is no longer listed and is flagged deleted."""
    _, admin_token = generate_user(is_superuser=True)

    author, author_token = generate_user()
    recipient, _ = generate_user()
    make_friends(author, recipient)

    def references_written_by_author():
        with references_session(author_token) as references_api:
            return references_api.ListReferences(
                references_pb2.ListReferencesReq(from_user_id=author.id)
            ).references

    with references_session(author_token) as references_api:
        written = references_api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=recipient.id, text="Old Text", private_text="", was_appropriate=True, rating=1
            )
        )

    assert references_written_by_author()

    with real_admin_session(admin_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=written.reference_id))

    assert not references_written_by_author()

    # The row still exists in the database, flagged as deleted.
    with session_scope() as session:
        stored = session.execute(select(Reference).where(Reference.id == written.reference_id)).scalar_one()
        assert stored.is_deleted

611 

612 

def test_GetUserReferences(db):
    """GetUserReferences lists references written by and about a user, deleted ones included."""
    super_user, super_token = generate_user(is_superuser=True)

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()
    user3, user3_token = generate_user()
    make_friends(user1, user2)
    make_friends(user1, user3)
    make_friends(user2, user3)

    # user1 writes reference about user2
    with references_session(user1_token) as api:
        ref1 = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user2.id,
                text="Reference from user1 to user2",
                private_text="",
                was_appropriate=True,
                rating=1,
            )
        )

    # user2 writes reference about user1
    with references_session(user2_token) as api:
        ref2 = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user1.id,
                text="Reference from user2 to user1",
                private_text="Private note",
                was_appropriate=True,
                rating=0.8,
            )
        )

    # user3 writes reference about user1
    with references_session(user3_token) as api:
        ref3 = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user1.id,
                text="Reference from user3 to user1",
                private_text="",
                was_appropriate=False,
                rating=0.5,
            )
        )

    # Delete ref3
    with real_admin_session(super_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=ref3.reference_id))

    # Test GetUserReferences for user1
    with real_admin_session(super_token) as admin_api:
        res = admin_api.GetUserReferences(admin_pb2.GetUserReferencesReq(user=user1.username))

    # user1 wrote 1 reference
    assert len(res.references_from) == 1
    assert res.references_from[0].reference_id == ref1.reference_id
    assert res.references_from[0].from_user_id == user1.id
    assert res.references_from[0].to_user_id == user2.id
    assert res.references_from[0].text == "Reference from user1 to user2"
    assert res.references_from[0].is_deleted is False

    # user1 received 2 references (including the deleted one)
    assert len(res.references_to) == 2
    # Ordered by id descending, so ref3 comes first
    assert res.references_to[0].reference_id == ref3.reference_id
    assert res.references_to[0].is_deleted is True
    assert res.references_to[0].was_appropriate is False

    assert res.references_to[1].reference_id == ref2.reference_id
    assert res.references_to[1].private_text == "Private note"
    # The rating is a float that has been serialized and stored, so compare with a
    # tolerance rather than bit-for-bit.
    assert res.references_to[1].rating == pytest.approx(0.8)
    assert res.references_to[1].is_deleted is False

686 

687 

def test_GetUserReferences_not_found(db):
    """GetUserReferences for an unknown user identifier fails with NOT_FOUND."""
    _, admin_token = generate_user(is_superuser=True)
    request = admin_pb2.GetUserReferencesReq(user="nonexistent")

    with real_admin_session(admin_token) as admin_api:
        with pytest.raises(grpc.RpcError) as exc_info:
            admin_api.GetUserReferences(request)

    assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND

695 

696 

def test_AddUsersToModerationUserList(db):
    """AddUsersToModerationUserList creates a new list or extends an existing one.

    Also checks the NOT_FOUND errors for an unknown list id and an unknown user, and that
    ListModerationUserLists reflects the resulting memberships.
    """
    super_user, super_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    user4, _ = generate_user()
    user5, _ = generate_user()
    moderation_list_id = add_users_to_new_moderation_list([user1])

    # No outer database session here: each verification below opens its own short-lived
    # session_scope(), so a wrapping session would only be shadowed and never used.
    with real_admin_session(super_token) as api:
        # Test adding users to a non-existent moderation list (should raise an error)
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user2.username], moderation_list_id=999),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert "Moderation user list not found." == e.value.details()

        # Test with non-existent user (should raise an error)
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, "nonexistent"]),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert "Couldn't find that user." == e.value.details()

        # Test successful creation of new moderation list (no moderation_list_id provided)
        res = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, user2.username, user3.username]),
        )
        assert res.moderation_list_id > 0
        with session_scope() as session:
            moderation_user_list = session.get(ModerationUserList, res.moderation_list_id)
            assert moderation_user_list is not None
            assert len(moderation_user_list.users) == 3
            assert {user1.id, user2.id, user3.id}.issubset({user.id for user in moderation_user_list.users})

        # Test list endpoint returns same moderation list with same members not repeated
        listRes = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username))
        assert len(listRes.moderation_lists) == 1
        assert listRes.moderation_lists[0].moderation_list_id == res.moderation_list_id
        assert len(listRes.moderation_lists[0].member_ids) == 3
        assert {user1.id, user2.id, user3.id}.issubset(listRes.moderation_lists[0].member_ids)

        # Test user can be in multiple moderation lists
        listRes3 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username))
        assert len(listRes3.moderation_lists) == 2

        # Test adding users to an existing moderation list
        res2 = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(
                users=[user4.username, user5.username], moderation_list_id=moderation_list_id
            ),
        )
        assert res2.moderation_list_id == moderation_list_id
        with session_scope() as session:
            moderation_user_list = session.get_one(ModerationUserList, moderation_list_id)
            assert len(moderation_user_list.users) == 3
            assert {user1.id, user4.id, user5.id}.issubset({user.id for user in moderation_user_list.users})

        # Test list user moderation lists endpoint returns the right moderation list
        listRes2 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user5.username))
        assert len(listRes2.moderation_lists) == 1
        assert listRes2.moderation_lists[0].moderation_list_id == moderation_list_id
        assert len(listRes2.moderation_lists[0].member_ids) == 3
        assert {user1.id, user4.id, user5.id}.issubset(listRes2.moderation_lists[0].member_ids)

764 

765 

def test_RemoveUserFromModerationUserList(db):
    """RemoveUserFromModerationUserList validates its input, removes members, and drops emptied lists."""
    _, admin_token = generate_user(is_superuser=True)
    member_a, _ = generate_user()
    member_b, _ = generate_user()
    outsider, _ = generate_user()
    list_id = add_users_to_new_moderation_list([member_a, member_b])

    with real_admin_session(admin_token) as admin_api:

        def expect_rejection(request, code, details):
            # The removal must fail with exactly this gRPC status code and message.
            with pytest.raises(grpc.RpcError) as exc_info:
                admin_api.RemoveUserFromModerationUserList(request)
            assert exc_info.value.code() == code
            assert details == exc_info.value.details()

        def list_count_for(user):
            listing = admin_api.ListModerationUserLists(
                admin_pb2.ListModerationUserListsReq(user=user.username)
            )
            return len(listing.moderation_lists)

        # Test with non-existent user (should raise error)
        expect_rejection(
            admin_pb2.RemoveUserFromModerationUserListReq(user="nonexistent"),
            grpc.StatusCode.NOT_FOUND,
            "Couldn't find that user.",
        )

        # Test without providing moderation list id (should raise error)
        expect_rejection(
            admin_pb2.RemoveUserFromModerationUserListReq(user=member_b.username),
            grpc.StatusCode.INVALID_ARGUMENT,
            "Missing moderation user list id.",
        )

        # Test removing user that's not in the provided moderation list (should raise error)
        expect_rejection(
            admin_pb2.RemoveUserFromModerationUserListReq(user=outsider.username, moderation_list_id=list_id),
            grpc.StatusCode.FAILED_PRECONDITION,
            "User is not in the moderation user list.",
        )

        # Test successful removal
        admin_api.RemoveUserFromModerationUserList(
            admin_pb2.RemoveUserFromModerationUserListReq(user=member_a.username, moderation_list_id=list_id)
        )
        with session_scope() as session:
            stored_list = session.get_one(ModerationUserList, list_id)
            remaining_ids = {user.id for user in stored_list.users}
            assert member_a.id not in remaining_ids
            assert member_b.id in remaining_ids

        # Test list user moderation lists endpoint returns right number of moderation lists
        assert list_count_for(member_a) == 0
        assert list_count_for(member_b) == 1

        # Test removing all users from moderation list should also delete the moderation list
        admin_api.RemoveUserFromModerationUserList(
            admin_pb2.RemoveUserFromModerationUserListReq(user=member_b.username, moderation_list_id=list_id)
        )
        with session_scope() as session:
            assert session.get(ModerationUserList, list_id) is None

817 

818 

def test_admin_delete_account_url(db, push_collector: PushCollector):
    """An admin-created account deletion link embeds the stored deletion token,
    and confirming that token triggers the "Account deleted" push and one email.

    Creating the link by itself must not send any push to the user.
    """
    _, super_token = generate_user(is_superuser=True)

    # the user's own session token is not needed here, so it is not bound
    user, _ = generate_user()
    user_id = user.id

    with real_admin_session(super_token) as admin_api:
        url = admin_api.CreateAccountDeletionLink(
            admin_pb2.CreateAccountDeletionLinkReq(user=user.username)
        ).account_deletion_confirm_url

    assert push_collector.count_for_user(user_id) == 0

    with session_scope() as session:
        token_o = session.execute(select(AccountDeletionToken)).scalar_one()
        # named distinctly so it cannot be confused with a session token
        deletion_token = token_o.token
        assert token_o.user.id == user_id
        assert url == f"http://localhost:3000/delete-account?token={deletion_token}"

    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, _metadata_interceptor):
            auth_api.ConfirmDeleteAccount(
                auth_pb2.ConfirmDeleteAccountReq(
                    token=deletion_token,
                )
            )

    push = push_collector.pop_for_user(user_id, last=True)
    assert push.content.title == "Account deleted"
    assert push.content.body == "You can restore it within 7 days using the link we emailed you."
    mock.assert_called_once()
    # NOTE(review): the extracted fields are never inspected; the call is kept so the
    # recorded email call is still parsed — consider asserting on its contents.
    email_fields(mock)

851 

852 

def test_SetLastDonated(db):
    """SetLastDonated stores the supplied timestamp, clears it when the field is
    omitted, and answers NOT_FOUND for an unknown user."""
    _, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user(last_donated=None)

    def stored_last_donated():
        # read the value back in a fresh session on every call
        with session_scope() as session:
            user = session.execute(select(User).where(User.id == normal_user.id)).scalar_one()
            return user.last_donated

    with real_admin_session(super_token) as api:
        # user starts with no last_donated
        assert stored_last_donated() is None

        # can set last_donated
        donation_time = now() - timedelta(days=30)
        api.SetLastDonated(
            admin_pb2.SetLastDonatedReq(
                user=normal_user.username,
                last_donated=Timestamp_from_datetime(donation_time),
            )
        )

        last_donated = stored_last_donated()
        assert last_donated is not None
        # check timestamp is close (within a second)
        assert abs((last_donated - donation_time).total_seconds()) < 1

        # can clear last_donated by not setting the field
        api.SetLastDonated(admin_pb2.SetLastDonatedReq(user=normal_user.username))
        assert stored_last_donated() is None

        # user not found
        with pytest.raises(grpc.RpcError) as e:
            api.SetLastDonated(admin_pb2.SetLastDonatedReq(user="nonexistent"))
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Couldn't find that user."

890 

891 

def test_admin_actions_level(db):
    """Admin notes default to the NORMAL level and keep an explicitly requested level."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    explicit_levels = [
        ("debug note", admin_pb2.ADMIN_ACTION_LEVEL_DEBUG),
        ("high note", admin_pb2.ADMIN_ACTION_LEVEL_HIGH),
    ]

    with real_admin_session(admin_token) as api:
        # No level given: the recorded action is NORMAL
        first = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note="normal note"))
        assert first.admin_actions[0].level == admin_pb2.ADMIN_ACTION_LEVEL_NORMAL

        # Each further note is appended with the level that was requested
        for position, (note_text, level) in enumerate(explicit_levels, start=1):
            reply = api.AddAdminNote(
                admin_pb2.AddAdminNoteReq(
                    user=target.username,
                    admin_note=note_text,
                    level=level,
                )
            )
            assert len(reply.admin_actions) == position + 1
            assert reply.admin_actions[position].level == level

922 

923 

def test_admin_actions_on_mutations(db, push_collector: PushCollector):
    """Each mutating admin call returns user details whose admin_actions include
    an entry of the matching action type."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    username = target.username

    def recorded_types(reply):
        # action types present in the returned admin action log
        return {action.action_type for action in reply.admin_actions}

    with real_admin_session(admin_token) as api:
        # ChangeUserGender
        reply = api.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=username, gender="Machine"))
        assert "change_gender" in recorded_types(reply)

        # ChangeUserBirthdate
        birthdate_req = admin_pb2.ChangeUserBirthdateReq(user=username, birthdate="1990-01-01")
        reply = api.ChangeUserBirthdate(birthdate_req)
        assert "change_birthdate" in recorded_types(reply)

        # DeleteUser is logged, and logged at HIGH level
        reply = api.DeleteUser(admin_pb2.DeleteUserReq(user=username))
        assert "delete_user" in recorded_types(reply)
        high_level_deletes = [
            action
            for action in reply.admin_actions
            if action.action_type == "delete_user" and action.level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH
        ]
        assert len(high_level_deletes) > 0

        # RecoverDeletedUser
        reply = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=username))
        assert "recover_user" in recorded_types(reply)

        # MarkUserNeedsLocationUpdate
        reply = api.MarkUserNeedsLocationUpdate(admin_pb2.MarkUserNeedsLocationUpdateReq(user=username))
        assert "mark_needs_location_update" in recorded_types(reply)

        # SetLastDonated
        donated_req = admin_pb2.SetLastDonatedReq(
            user=username,
            last_donated=Timestamp_from_datetime(now()),
        )
        reply = api.SetLastDonated(donated_req)
        assert "set_last_donated" in recorded_types(reply)

962 

963 

def test_create_admin_tag(db):
    """Creating an admin tag echoes the tag text and returns a positive id."""
    _admin, admin_token = generate_user(is_superuser=True)
    request = admin_pb2.CreateAdminTagReq(tag="test-tag")

    with real_admin_session(admin_token) as api:
        created = api.CreateAdminTag(request)
        assert created.tag == "test-tag"
        assert created.admin_tag_id > 0

971 

972 

def test_create_admin_tag_duplicate(db):
    """Creating the same admin tag a second time fails with ALREADY_EXISTS."""
    _admin, admin_token = generate_user(is_superuser=True)
    request = admin_pb2.CreateAdminTagReq(tag="test-tag")

    with real_admin_session(admin_token) as api:
        # first creation succeeds
        api.CreateAdminTag(request)

        # an identical second request is refused
        with pytest.raises(grpc.RpcError) as excinfo:
            api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="test-tag"))
        error = excinfo.value
        assert error.code() == grpc.StatusCode.ALREADY_EXISTS
        assert error.details() == "That admin tag already exists."

982 

983 

def test_create_admin_tag_empty(db):
    """Empty and whitespace-only admin tags are rejected with INVALID_ARGUMENT."""
    _admin, admin_token = generate_user(is_superuser=True)

    with real_admin_session(admin_token) as api:
        # the empty string first, then a whitespace-only tag
        for blank_tag in ("", " "):
            with pytest.raises(grpc.RpcError) as excinfo:
                api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag=blank_tag))
            assert excinfo.value.code() == grpc.StatusCode.INVALID_ARGUMENT
            assert excinfo.value.details() == "The admin tag cannot be empty."

997 

998 

def test_list_admin_tags(db):
    """ListAdminTags starts out empty and returns created tags in alphabetical order."""
    _admin, admin_token = generate_user(is_superuser=True)

    with real_admin_session(admin_token) as api:
        # Nothing has been created yet
        initial = api.ListAdminTags(admin_pb2.ListAdminTagsReq())
        assert len(initial.tags) == 0

        # Create the tags out of alphabetical order on purpose
        for tag_name in ("bravo", "alpha"):
            api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag=tag_name))

        # Both come back, sorted by name
        listed = api.ListAdminTags(admin_pb2.ListAdminTagsReq())
        assert [entry.tag for entry in listed.tags] == ["alpha", "bravo"]

1016 

1017 

def test_add_admin_tag_to_user(db):
    """Tagging a user returns the tag in admin_tags and logs an add_tag action."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    with real_admin_session(admin_token) as api:
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))

        tag_req = admin_pb2.AddAdminTagToUserReq(user=target.username, tag="vip")
        details = api.AddAdminTagToUser(tag_req)

        assert "vip" in details.admin_tags
        add_tag_entries = [
            action for action in details.admin_actions if action.action_type == "add_tag" and action.tag == "vip"
        ]
        assert len(add_tag_entries) > 0

1028 

1029 

def test_add_admin_tag_to_user_duplicate(db):
    """Adding a tag the user already has fails with FAILED_PRECONDITION."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    username = target.username

    with real_admin_session(admin_token) as api:
        # set up: the tag exists and is already attached to the user
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))
        api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=username, tag="vip"))

        # attaching it again is refused
        with pytest.raises(grpc.RpcError) as excinfo:
            api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=username, tag="vip"))
        error = excinfo.value
        assert error.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert error.details() == "The user already has that admin tag."

1042 

1043 

def test_add_admin_tag_to_user_tag_not_found(db):
    """Adding a tag that was never created fails with NOT_FOUND."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.AddAdminTagToUserReq(user=target.username, tag="nonexistent")

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as excinfo:
            api.AddAdminTagToUser(request)
        error = excinfo.value
        assert error.code() == grpc.StatusCode.NOT_FOUND
        assert error.details() == "Admin tag not found."

1053 

1054 

def test_remove_admin_tag_from_user(db):
    """Removing a tag drops it from admin_tags and logs a remove_tag action."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    username = target.username

    with real_admin_session(admin_token) as api:
        # set up: create the tag and attach it to the user
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))
        api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=username, tag="vip"))

        removal_req = admin_pb2.RemoveAdminTagFromUserReq(user=username, tag="vip")
        details = api.RemoveAdminTagFromUser(removal_req)

        assert "vip" not in details.admin_tags
        remove_tag_entries = [
            action for action in details.admin_actions if action.action_type == "remove_tag" and action.tag == "vip"
        ]
        assert len(remove_tag_entries) > 0

1066 

1067 

def test_remove_admin_tag_from_user_not_assigned(db):
    """Removing an existing tag the user does not have fails with FAILED_PRECONDITION."""
    _admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    removal_req = admin_pb2.RemoveAdminTagFromUserReq(user=target.username, tag="vip")

    with real_admin_session(admin_token) as api:
        # the tag exists but is never attached to the user
        api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip"))

        with pytest.raises(grpc.RpcError) as excinfo:
            api.RemoveAdminTagFromUser(removal_req)
        error = excinfo.value
        assert error.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert error.details() == "The user does not have that admin tag."

1079 

1080 

def test_search_users_by_admin_tag(db):
    """SearchUsers filters by admin tags; several tags are combined with AND."""
    _admin, admin_token = generate_user(is_superuser=True)
    vip_only, _ = generate_user()
    vip_and_flagged, _ = generate_user()
    untagged, _ = generate_user()

    # (user, tag) pairs, applied in this order
    assignments = [
        (vip_only, "vip"),
        (vip_and_flagged, "vip"),
        (vip_and_flagged, "flagged"),
    ]

    with real_admin_session(admin_token) as api:

        def matching_ids(tags):
            # ids of the users returned when searching by the given admin tags
            found = api.SearchUsers(admin_pb2.SearchUsersReq(admin_tags=tags))
            return {hit.user_id for hit in found.users}

        for tag_name in ("vip", "flagged"):
            api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag=tag_name))

        for tagged_user, tag_name in assignments:
            api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=tagged_user.username, tag=tag_name))

        # A single tag matches every user carrying it
        vip_ids = matching_ids(["vip"])
        assert vip_only.id in vip_ids
        assert vip_and_flagged.id in vip_ids
        assert untagged.id not in vip_ids

        # Two tags: only users carrying both are returned (AND logic)
        both_ids = matching_ids(["vip", "flagged"])
        assert vip_and_flagged.id in both_ids
        assert vip_only.id not in both_ids

        # A tag that does not exist matches nobody
        assert matching_ids(["nonexistent"]) == set()

1111 

1112 

def test_search_users_by_admin_note(db):
    """SearchUsers can filter on admin action log text using a %-wildcard pattern."""
    _admin, admin_token = generate_user(is_superuser=True)
    suspect, _ = generate_user()
    bystander, _ = generate_user()

    notes = [
        (suspect, "suspicious activity"),
        (bystander, "normal user"),
    ]

    with real_admin_session(admin_token) as api:
        for noted_user, text in notes:
            api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=noted_user.username, admin_note=text))

        # Only the user whose note matches the pattern is returned
        found = api.SearchUsers(admin_pb2.SearchUsersReq(admin_action_log="%suspicious%"))
        found_ids = {hit.user_id for hit in found.users}
        assert suspect.id in found_ids
        assert bystander.id not in found_ids

1127 

1128 

1129# community invite feature tested in test_events.py 

1130# SendBlogPostNotification tested in test_notifications.py 

1131# MarkUserNeedsLocationUpdate tested in test_jail.py