Coverage for app/backend/src/tests/test_admin.py: 100%

1062 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import json 

2from datetime import UTC, date, datetime, timedelta 

3from unittest.mock import patch 

4 

5import grpc 

6import pytest 

7from sqlalchemy import select 

8from sqlalchemy.sql import func 

9 

10from couchers.db import session_scope 

11from couchers.models import ( 

12 AccountDeletionToken, 

13 ContentReport, 

14 EventOccurrence, 

15 FriendRelationship, 

16 FriendStatus, 

17 ModerationObjectType, 

18 ModerationState, 

19 ModerationUserList, 

20 ModerationVisibility, 

21 NonvisibleUserAccess, 

22 NonvisibleUserAccessType, 

23 NonvisibleUserState, 

24 PhotoGallery, 

25 PhotoGalleryItem, 

26 Reference, 

27 Upload, 

28 User, 

29 UserActivity, 

30 UserSession, 

31) 

32from couchers.proto import ( 

33 account_pb2, 

34 admin_pb2, 

35 auth_pb2, 

36 events_pb2, 

37 references_pb2, 

38 reporting_pb2, 

39 requests_pb2, 

40) 

41from couchers.utils import Timestamp_from_datetime, now, parse_date 

42from tests.fixtures.db import add_users_to_new_moderation_list, generate_user, make_friends 

43from tests.fixtures.misc import EmailCollector, PushCollector 

44from tests.fixtures.sessions import ( 

45 account_session, 

46 auth_api_session, 

47 events_session, 

48 real_admin_session, 

49 references_session, 

50 reporting_session, 

51 requests_session, 

52) 

53from tests.test_communities import create_community 

54from tests.test_requests import valid_request_text 

55 

56 

57@pytest.fixture(autouse=True) 

58def _(testconfig): 

59 pass 

60 

61 

62def test_access_by_normal_user(db): 

63 normal_user, normal_token = generate_user() 

64 

65 with real_admin_session(normal_token) as api: 

66 # all requests to the admin servicer should break when done by a non-super_user 

67 with pytest.raises(grpc.RpcError) as e: 

68 api.GetUserDetails( 

69 admin_pb2.GetUserDetailsReq( 

70 user=str(normal_user.id), 

71 ) 

72 ) 

73 assert e.value.code() == grpc.StatusCode.PERMISSION_DENIED 

74 

75 

76def test_GetNonvisibleUserAccessLog(db): 

77 super_user, super_token = generate_user(is_superuser=True) 

78 target, _ = generate_user(username="target") 

79 viewer, _ = generate_user(username="viewer") 

80 

81 with session_scope() as session: 

82 session.add( 

83 NonvisibleUserAccess( 

84 access_type=NonvisibleUserAccessType.login_attempt, 

85 target_user_id=target.id, 

86 target_state=NonvisibleUserState.banned, 

87 actor_user_id=target.id, 

88 ip_address="1.2.3.4", 

89 sofa="device-cookie", 

90 ) 

91 ) 

92 session.add( 

93 NonvisibleUserAccess( 

94 access_type=NonvisibleUserAccessType.ghost_served, 

95 target_user_id=target.id, 

96 target_state=NonvisibleUserState.banned, 

97 actor_user_id=viewer.id, 

98 ) 

99 ) 

100 session.add( 

101 NonvisibleUserAccess( 

102 access_type=NonvisibleUserAccessType.ghost_served, 

103 target_user_id=target.id, 

104 target_state=NonvisibleUserState.banned, 

105 actor_user_id=None, 

106 ) 

107 ) 

108 

109 with real_admin_session(super_token) as api: 

110 res = api.GetNonvisibleUserAccessLog(admin_pb2.GetNonvisibleUserAccessLogReq(user="target")) 

111 

112 assert len(res.entries) == 3 

113 for entry in res.entries: 

114 assert entry.target_user_id == target.id 

115 assert entry.target_state == admin_pb2.NONVISIBLE_USER_STATE_BANNED 

116 

117 login = [e for e in res.entries if e.access_type == admin_pb2.NONVISIBLE_USER_ACCESS_TYPE_LOGIN_ATTEMPT] 

118 views = [e for e in res.entries if e.access_type == admin_pb2.NONVISIBLE_USER_ACCESS_TYPE_GHOST_SERVED] 

119 assert len(login) == 1 

120 assert len(views) == 2 

121 

122 assert login[0].actor_user_id.value == target.id 

123 assert login[0].actor_username == "target" 

124 assert login[0].ip_address == "1.2.3.4" 

125 assert login[0].sofa == "device-cookie" 

126 

127 logged_in_view = [e for e in views if e.actor_username == "viewer"] 

128 logged_out_view = [e for e in views if not e.actor_username] 

129 assert len(logged_in_view) == 1 

130 assert logged_in_view[0].actor_user_id.value == viewer.id 

131 assert len(logged_out_view) == 1 

132 assert not logged_out_view[0].HasField("actor_user_id") 

133 

134 

135def test_GetUser(db): 

136 super_user, super_token = generate_user(is_superuser=True) 

137 normal_user, normal_token = generate_user() 

138 

139 with real_admin_session(super_token) as api: 

140 res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id))) 

141 assert res.user_id == normal_user.id 

142 assert res.username == normal_user.username 

143 

144 with real_admin_session(super_token) as api: 

145 res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note="Testing banning")) 

146 

147 with real_admin_session(super_token) as api: 

148 res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id))) 

149 assert res.user_id == normal_user.id 

150 assert res.username == normal_user.username 

151 

152 

153def test_GetUserDetails(db): 

154 super_user, super_token = generate_user(is_superuser=True) 

155 normal_user, normal_token = generate_user() 

156 

157 with real_admin_session(super_token) as api: 

158 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=str(normal_user.id))) 

159 assert res.user_id == normal_user.id 

160 assert res.username == normal_user.username 

161 assert res.email == normal_user.email 

162 assert res.gender == normal_user.gender 

163 assert parse_date(res.birthdate) == normal_user.birthdate 

164 assert not res.banned 

165 assert not res.deleted 

166 

167 with real_admin_session(super_token) as api: 

168 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)) 

169 assert res.user_id == normal_user.id 

170 assert res.username == normal_user.username 

171 assert res.email == normal_user.email 

172 assert res.gender == normal_user.gender 

173 assert parse_date(res.birthdate) == normal_user.birthdate 

174 assert not res.banned 

175 assert not res.deleted 

176 

177 with real_admin_session(super_token) as api: 

178 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.email)) 

179 assert res.user_id == normal_user.id 

180 assert res.username == normal_user.username 

181 assert res.email == normal_user.email 

182 assert res.gender == normal_user.gender 

183 assert parse_date(res.birthdate) == normal_user.birthdate 

184 assert not res.banned 

185 assert not res.deleted 

186 

187 

188def test_ChangeUserGender(db, email_collector: EmailCollector, push_collector: PushCollector): 

189 super_user, super_token = generate_user(is_superuser=True) 

190 normal_user, normal_token = generate_user() 

191 

192 with real_admin_session(super_token) as api: 

193 res = api.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=normal_user.username, gender="Machine")) 

194 assert res.user_id == normal_user.id 

195 assert res.username == normal_user.username 

196 assert res.email == normal_user.email 

197 assert res.gender == "Machine" 

198 assert parse_date(res.birthdate) == normal_user.birthdate 

199 assert not res.banned 

200 assert not res.deleted 

201 

202 email = email_collector.pop_for_recipient(normal_user.email, last=True) 

203 assert email.subject == "[TEST] Your gender was changed" 

204 assert email.recipient == normal_user.email 

205 assert "Machine" in email.plain 

206 assert "Machine" in email.html 

207 

208 push = push_collector.pop_for_user(normal_user.id, last=True) 

209 assert push.content.title == "Gender changed" 

210 assert push.content.body == "An admin changed your gender to Machine." 

211 

212 

213def test_ChangeUserBirthdate(db, email_collector: EmailCollector, push_collector: PushCollector): 

214 super_user, super_token = generate_user(is_superuser=True) 

215 normal_user, normal_token = generate_user(birthdate=date(year=2000, month=1, day=1)) 

216 

217 with real_admin_session(super_token) as api: 

218 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)) 

219 assert parse_date(res.birthdate) == date(year=2000, month=1, day=1) 

220 

221 res = api.ChangeUserBirthdate( 

222 admin_pb2.ChangeUserBirthdateReq(user=normal_user.username, birthdate="1990-05-25") 

223 ) 

224 

225 assert res.user_id == normal_user.id 

226 assert res.username == normal_user.username 

227 assert res.email == normal_user.email 

228 assert res.birthdate == "1990-05-25" 

229 assert res.gender == normal_user.gender 

230 assert not res.banned 

231 assert not res.deleted 

232 

233 email = email_collector.pop_for_recipient(normal_user.email, last=True) 

234 assert email.subject == "[TEST] Your date of birth was changed" 

235 assert email.recipient == normal_user.email 

236 assert "1990" in email.plain 

237 assert "1990" in email.html 

238 

239 push = push_collector.pop_for_user(normal_user.id, last=True) 

240 assert push.content.title == "Birthdate changed" 

241 assert push.content.body == "An admin changed your date of birth to May 25, 1990." 

242 

243 

244def test_BanUser(db): 

245 super_user, super_token = generate_user(is_superuser=True) 

246 normal_user, _ = generate_user() 

247 admin_note = "A good reason" 

248 

249 with real_admin_session(super_token) as api: 

250 res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note=admin_note)) 

251 assert res.user_id == normal_user.id 

252 assert res.username == normal_user.username 

253 assert res.email == normal_user.email 

254 assert res.gender == normal_user.gender 

255 assert parse_date(res.birthdate) == normal_user.birthdate 

256 assert res.banned 

257 assert not res.deleted 

258 assert len(res.admin_actions) == 1 

259 assert res.admin_actions[0].action_type == "ban" 

260 assert res.admin_actions[0].level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH 

261 assert res.admin_actions[0].note == admin_note 

262 assert res.admin_actions[0].admin_user_id == super_user.id 

263 assert res.admin_actions[0].admin_username == super_user.username 

264 

265 

266def test_UnbanUser(db): 

267 super_user, super_token = generate_user(is_superuser=True) 

268 normal_user, _ = generate_user() 

269 admin_note = "A good reason" 

270 

271 with real_admin_session(super_token) as api: 

272 res = api.UnbanUser(admin_pb2.UnbanUserReq(user=normal_user.username, admin_note=admin_note)) 

273 assert res.user_id == normal_user.id 

274 assert res.username == normal_user.username 

275 assert res.email == normal_user.email 

276 assert res.gender == normal_user.gender 

277 assert parse_date(res.birthdate) == normal_user.birthdate 

278 assert not res.banned 

279 assert not res.deleted 

280 assert len(res.admin_actions) == 1 

281 assert res.admin_actions[0].action_type == "unban" 

282 assert res.admin_actions[0].level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH 

283 

284 

285def test_ShadowUser(db): 

286 super_user, super_token = generate_user(is_superuser=True) 

287 surfer, surfer_token = generate_user() 

288 host, _ = generate_user() 

289 admin_note = "Spammer" 

290 

291 # Create a host request from `surfer` and approve its moderation state to VISIBLE so we can verify the cascade 

292 today_plus_2 = (date.today() + timedelta(days=2)).isoformat() 

293 today_plus_3 = (date.today() + timedelta(days=3)).isoformat() 

294 with requests_session(surfer_token) as api: 

295 host_request_id = api.CreateHostRequest( 

296 requests_pb2.CreateHostRequestReq( 

297 host_user_id=host.id, 

298 from_date=today_plus_2, 

299 to_date=today_plus_3, 

300 text=valid_request_text(), 

301 ) 

302 ).host_request_id 

303 with session_scope() as session: 

304 state = session.execute( 

305 select(ModerationState) 

306 .where(ModerationState.object_type == ModerationObjectType.host_request) 

307 .where(ModerationState.object_id == host_request_id) 

308 ).scalar_one() 

309 state.visibility = ModerationVisibility.visible 

310 

311 with real_admin_session(super_token) as api: 

312 res = api.ShadowUser(admin_pb2.ShadowUserReq(user=surfer.username, admin_note=admin_note)) 

313 assert res.user_id == surfer.id 

314 assert res.shadowed 

315 assert not res.banned 

316 assert not res.deleted 

317 assert len(res.admin_actions) == 1 

318 assert res.admin_actions[0].action_type == "shadow" 

319 assert res.admin_actions[0].level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH 

320 assert res.admin_actions[0].note == admin_note 

321 

322 # The previously-visible host request is now shadowed 

323 with session_scope() as session: 

324 state = session.execute( 

325 select(ModerationState) 

326 .where(ModerationState.object_type == ModerationObjectType.host_request) 

327 .where(ModerationState.object_id == host_request_id) 

328 ).scalar_one() 

329 assert state.visibility == ModerationVisibility.shadowed 

330 

331 

332def test_UnshadowUser(db): 

333 super_user, super_token = generate_user(is_superuser=True) 

334 surfer, surfer_token = generate_user() 

335 host, _ = generate_user() 

336 

337 today_plus_2 = (date.today() + timedelta(days=2)).isoformat() 

338 today_plus_3 = (date.today() + timedelta(days=3)).isoformat() 

339 with requests_session(surfer_token) as api: 

340 shadow_cascade_request_id = api.CreateHostRequest( 

341 requests_pb2.CreateHostRequestReq( 

342 host_user_id=host.id, 

343 from_date=today_plus_2, 

344 to_date=today_plus_3, 

345 text=valid_request_text(), 

346 ) 

347 ).host_request_id 

348 admin_hidden_request_id = api.CreateHostRequest( 

349 requests_pb2.CreateHostRequestReq( 

350 host_user_id=host.id, 

351 from_date=today_plus_2, 

352 to_date=today_plus_3, 

353 text=valid_request_text(), 

354 ) 

355 ).host_request_id 

356 

357 with session_scope() as session: 

358 session.execute(select(User).where(User.id == surfer.id)).scalar_one().shadowed_at = now() 

359 session.execute( 

360 select(ModerationState) 

361 .where(ModerationState.object_type == ModerationObjectType.host_request) 

362 .where(ModerationState.object_id == shadow_cascade_request_id) 

363 ).scalar_one().visibility = ModerationVisibility.shadowed 

364 session.execute( 

365 select(ModerationState) 

366 .where(ModerationState.object_type == ModerationObjectType.host_request) 

367 .where(ModerationState.object_id == admin_hidden_request_id) 

368 ).scalar_one().visibility = ModerationVisibility.hidden 

369 

370 with real_admin_session(super_token) as api: 

371 res = api.UnshadowUser(admin_pb2.UnshadowUserReq(user=surfer.username, admin_note="rehabilitated")) 

372 assert not res.shadowed 

373 assert len(res.admin_actions) == 1 

374 assert res.admin_actions[0].action_type == "unshadow" 

375 assert res.admin_actions[0].level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH 

376 

377 with session_scope() as session: 

378 assert ( 

379 session.execute( 

380 select(ModerationState) 

381 .where(ModerationState.object_type == ModerationObjectType.host_request) 

382 .where(ModerationState.object_id == shadow_cascade_request_id) 

383 ) 

384 .scalar_one() 

385 .visibility 

386 == ModerationVisibility.visible 

387 ) 

388 assert ( 

389 session.execute( 

390 select(ModerationState) 

391 .where(ModerationState.object_type == ModerationObjectType.host_request) 

392 .where(ModerationState.object_id == admin_hidden_request_id) 

393 ) 

394 .scalar_one() 

395 .visibility 

396 == ModerationVisibility.hidden 

397 ) 

398 

399 

400def test_ShadowUser_blank_note(db): 

401 super_user, super_token = generate_user(is_superuser=True) 

402 normal_user, _ = generate_user() 

403 

404 with real_admin_session(super_token) as api: 

405 with pytest.raises(grpc.RpcError) as e: 

406 api.ShadowUser(admin_pb2.ShadowUserReq(user=normal_user.username, admin_note=" \t ")) 

407 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

408 

409 

410def test_AddAdminNote(db): 

411 super_user, super_token = generate_user(is_superuser=True) 

412 normal_user, _ = generate_user() 

413 admin_note1 = "User reported strange behavior" 

414 admin_note2 = "Insert private information here" 

415 

416 with real_admin_session(super_token) as api: 

417 res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note1)) 

418 assert res.user_id == normal_user.id 

419 assert res.username == normal_user.username 

420 assert res.email == normal_user.email 

421 assert res.gender == normal_user.gender 

422 assert parse_date(res.birthdate) == normal_user.birthdate 

423 assert not res.banned 

424 assert not res.deleted 

425 assert len(res.admin_actions) == 1 

426 assert res.admin_actions[0].action_type == "note" 

427 assert res.admin_actions[0].level == admin_pb2.ADMIN_ACTION_LEVEL_NORMAL 

428 assert res.admin_actions[0].note == admin_note1 

429 

430 with real_admin_session(super_token) as api: 

431 res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note2)) 

432 assert len(res.admin_actions) == 2 

433 assert res.admin_actions[0].note == admin_note1 

434 assert res.admin_actions[1].note == admin_note2 

435 

436 

437def test_AddAdminNote_blank(db): 

438 super_user, super_token = generate_user(is_superuser=True) 

439 normal_user, _ = generate_user() 

440 empty_admin_note = " \t \n " 

441 

442 with real_admin_session(super_token) as api: 

443 with pytest.raises(grpc.RpcError) as e: 

444 api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=empty_admin_note)) 

445 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

446 assert e.value.details() == "Provide exactly one of admin_note or data." 

447 

448 

449def test_AddAdminNote_data(db): 

450 super_user, super_token = generate_user(is_superuser=True) 

451 normal_user, _ = generate_user() 

452 payload = '{"kind": "flag", "score": 0.87, "reasons": ["spam", "burst"]}' 

453 

454 with real_admin_session(super_token) as api: 

455 res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, data=payload)) 

456 assert len(res.admin_actions) == 1 

457 assert res.admin_actions[0].action_type == "note" 

458 assert res.admin_actions[0].note == "" 

459 assert json.loads(res.admin_actions[0].data) == {"kind": "flag", "score": 0.87, "reasons": ["spam", "burst"]} 

460 

461 

462def test_AddAdminNote_both_note_and_data(db): 

463 super_user, super_token = generate_user(is_superuser=True) 

464 normal_user, _ = generate_user() 

465 

466 with real_admin_session(super_token) as api: 

467 with pytest.raises(grpc.RpcError) as e: 

468 api.AddAdminNote( 

469 admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note="note text", data='{"x": 1}') 

470 ) 

471 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

472 assert e.value.details() == "Provide exactly one of admin_note or data." 

473 

474 

475def test_AddAdminNote_neither(db): 

476 super_user, super_token = generate_user(is_superuser=True) 

477 normal_user, _ = generate_user() 

478 

479 with real_admin_session(super_token) as api: 

480 with pytest.raises(grpc.RpcError) as e: 

481 api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username)) 

482 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

483 assert e.value.details() == "Provide exactly one of admin_note or data." 

484 

485 

486def test_AddAdminNote_invalid_json(db): 

487 super_user, super_token = generate_user(is_superuser=True) 

488 normal_user, _ = generate_user() 

489 

490 with real_admin_session(super_token) as api: 

491 with pytest.raises(grpc.RpcError) as e: 

492 api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, data="{not valid json")) 

493 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

494 assert e.value.details() == "The admin note data must be valid JSON." 

495 

496 

497def test_admin_content_reports(db): 

498 super_user, super_token = generate_user(is_superuser=True) 

499 normal_user, token = generate_user() 

500 bad_user1, _ = generate_user() 

501 bad_user2, _ = generate_user() 

502 

503 with reporting_session(token) as api: 

504 api.Report( 

505 reporting_pb2.ReportReq( 

506 reason="spam", 

507 description="r1", 

508 content_ref="comment/123", 

509 author_user=bad_user1.username, 

510 user_agent="n/a", 

511 page="https://couchers.org/comment/123", 

512 ) 

513 ) 

514 api.Report( 

515 reporting_pb2.ReportReq( 

516 reason="spam", 

517 description="r2", 

518 content_ref="comment/124", 

519 author_user=bad_user2.username, 

520 user_agent="n/a", 

521 page="https://couchers.org/comment/124", 

522 ) 

523 ) 

524 api.Report( 

525 reporting_pb2.ReportReq( 

526 reason="something else", 

527 description="r3", 

528 content_ref="page/321", 

529 author_user=bad_user1.username, 

530 user_agent="n/a", 

531 page="https://couchers.org/page/321", 

532 ) 

533 ) 

534 

535 with session_scope() as session: 

536 id_by_description: dict[str, int] = dict( 

537 session.execute(select(ContentReport.description, ContentReport.id)).all() # type: ignore[arg-type] 

538 ) 

539 

540 with real_admin_session(super_token) as api: 

541 with pytest.raises(grpc.RpcError) as e: 

542 api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1)) 

543 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

544 assert e.value.details() == "Content report not found." 

545 

546 res = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=id_by_description["r2"])) 

547 rep = res.content_report 

548 assert rep.content_report_id == id_by_description["r2"] 

549 assert rep.reporting_user_id == normal_user.id 

550 assert rep.author_user_id == bad_user2.id 

551 assert rep.reason == "spam" 

552 assert rep.description == "r2" 

553 assert rep.content_ref == "comment/124" 

554 assert rep.user_agent == "n/a" 

555 assert rep.page == "https://couchers.org/comment/124" 

556 

557 res = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username)) 

558 assert res.content_reports[0].content_report_id == id_by_description["r3"] 

559 assert res.content_reports[1].content_report_id == id_by_description["r1"] 

560 

561 

562def test_DeleteUser(db): 

563 super_user, super_token = generate_user(is_superuser=True) 

564 normal_user, normal_token = generate_user() 

565 

566 with real_admin_session(super_token) as api: 

567 res = api.DeleteUser(admin_pb2.DeleteUserReq(user=normal_user.username)) 

568 assert res.user_id == normal_user.id 

569 assert res.username == normal_user.username 

570 assert res.email == normal_user.email 

571 assert res.gender == normal_user.gender 

572 assert parse_date(res.birthdate) == normal_user.birthdate 

573 assert not res.banned 

574 assert res.deleted 

575 

576 with real_admin_session(super_token) as api: 

577 res = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=normal_user.username)) 

578 assert res.user_id == normal_user.id 

579 assert res.username == normal_user.username 

580 assert res.email == normal_user.email 

581 assert res.gender == normal_user.gender 

582 assert parse_date(res.birthdate) == normal_user.birthdate 

583 assert not res.banned 

584 assert not res.deleted 

585 

586 

587def test_RecoverDeletedUser_after_user_initiated_deletion(db, push_collector: PushCollector): 

588 """ 

589 When a user deletes their account through the normal flow (ConfirmDeleteAccount), 

590 undelete_token and undelete_until are set. The admin RecoverDeletedUser must clear 

591 these fields to satisfy the undelete_nullity database constraint. 

592 """ 

593 super_user, super_token = generate_user(is_superuser=True) 

594 normal_user, normal_token = generate_user() 

595 user_id = normal_user.id 

596 

597 # User initiates account deletion 

598 with account_session(normal_token) as account: 

599 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True)) 

600 

601 # Get the deletion confirmation token 

602 with session_scope() as session: 

603 deletion_token = session.execute(select(AccountDeletionToken)).scalar_one().token 

604 

605 # User confirms account deletion (this sets undelete_token and undelete_until) 

606 with auth_api_session() as (auth_api, metadata_interceptor): 

607 auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token)) 

608 

609 # Verify the user is deleted and has undelete fields set 

610 with session_scope() as session: 

611 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

612 assert user.deleted_at is not None 

613 assert user.undelete_token is not None 

614 assert user.undelete_until is not None 

615 

616 # Admin recovers the user 

617 with real_admin_session(super_token) as api: 

618 res = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=normal_user.username)) 

619 assert res.user_id == user_id 

620 assert not res.deleted 

621 

622 # Verify undelete fields are cleared 

623 with session_scope() as session: 

624 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

625 assert user.deleted_at is None 

626 assert user.undelete_token is None 

627 assert user.undelete_until is None 

628 

629 

630def test_CreateApiKey(db, email_collector: EmailCollector, push_collector: PushCollector): 

631 with session_scope() as session: 

632 super_user, super_token = generate_user(is_superuser=True) 

633 normal_user, normal_token = generate_user() 

634 

635 assert ( 

636 session.execute( 

637 select(func.count()) 

638 .select_from(UserSession) 

639 .where(UserSession.is_api_key == True) 

640 .where(UserSession.user_id == normal_user.id) 

641 ).scalar_one() 

642 == 0 

643 ) 

644 

645 with real_admin_session(super_token) as api: 

646 res = api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=normal_user.username)) 

647 

648 email = email_collector.pop_for_recipient(normal_user.email, last=True) 

649 assert email.subject == "[TEST] Your API key for Couchers.org" 

650 

651 with session_scope() as session: 

652 token = session.execute( 

653 select(UserSession.token) 

654 .where(UserSession.is_valid) 

655 .where(UserSession.is_api_key == True) 

656 .where(UserSession.user_id == normal_user.id) 

657 ).scalar_one() 

658 

659 assert token in email.plain 

660 assert token in email.html 

661 

662 assert email.recipient == normal_user.email 

663 assert "api key" in email.subject.lower() 

664 unique_string = "We've issued you with the following API key:" 

665 assert unique_string in email.plain 

666 assert unique_string in email.html 

667 assert "support@couchers.org" in email.plain 

668 assert "support@couchers.org" in email.html 

669 

670 push = push_collector.pop_for_user(normal_user.id, last=True) 

671 assert push.content.title == "API key created" 

672 assert push.content.body == "Details were sent to you via email." 

673 

674 

675def test_GetChats(db): 

676 super_user, super_token = generate_user(is_superuser=True) 

677 normal_user, normal_token = generate_user() 

678 

679 with real_admin_session(super_token) as api: 

680 res = api.GetChats(admin_pb2.GetChatsReq(user=normal_user.username)) 

681 assert res.user.user_id == normal_user.id 

682 assert res.user.username == normal_user.username 

683 assert res.user.name == normal_user.name 

684 # New user should have no chats 

685 assert len(res.host_requests) == 0 

686 assert len(res.group_chats) == 0 

687 

688 

689def test_badges(db, email_collector: EmailCollector, push_collector: PushCollector): 

690 super_user, super_token = generate_user(is_superuser=True) 

691 normal_user, normal_token = generate_user() 

692 

693 with real_admin_session(super_token) as api: 

694 # can add a badge 

695 assert "swagster" not in api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)).badges 

696 res = api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="swagster")) 

697 assert "swagster" in res.badges 

698 

699 # badge emails are disabled by default 

700 assert email_collector.count_for_recipient(normal_user.email) == 0 

701 

702 push = push_collector.pop_for_user(normal_user.id, last=True) 

703 assert push.content.title == "New profile badge: Swagster" 

704 assert push.content.body == "The Swagster badge was added to your profile." 

705 

706 # can't add/edit special tags 

707 with pytest.raises(grpc.RpcError) as e: 

708 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="founder")) 

709 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

710 assert e.value.details() == "Admins cannot edit that badge." 

711 

712 # double add badge 

713 with pytest.raises(grpc.RpcError) as e: 

714 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="swagster")) 

715 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

716 assert e.value.details() == "The user already has that badge." 

717 

718 # can remove badge 

719 assert "swagster" in api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)).badges 

720 res = api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=normal_user.username, badge_id="swagster")) 

721 assert "swagster" not in res.badges 

722 

723 # badge emails are disabled by default 

724 assert email_collector.count_for_recipient(normal_user.email) == 0 

725 

726 push = push_collector.pop_for_user(normal_user.id, last=True) 

727 assert push.content.title == "Profile badge removed" 

728 assert push.content.body == "The Swagster badge was removed from your profile." 

729 

730 # not found on user 

731 with pytest.raises(grpc.RpcError) as e: 

732 api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=normal_user.username, badge_id="swagster")) 

733 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

734 assert e.value.details() == "The user does not have that badge." 

735 

736 # not found in general 

737 with pytest.raises(grpc.RpcError) as e: 

738 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="nonexistentbadge")) 

739 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

740 assert e.value.details() == "Badge not found." 

741 

742 

743def test_DeleteEvent(db): 

744 super_user, super_token = generate_user(is_superuser=True) 

745 normal_user, normal_token = generate_user() 

746 

747 with session_scope() as session: 

748 create_community(session, 0, 2, "Community", [normal_user], [], None) 

749 

750 start_time = now() + timedelta(hours=2) 

751 end_time = start_time + timedelta(hours=3) 

752 with events_session(normal_token) as api: 

753 res = api.CreateEvent( 

754 events_pb2.CreateEventReq( 

755 title="Dummy Title", 

756 content="Dummy content.", 

757 photo_key=None, 

758 offline_information=events_pb2.OfflineEventInformation( 

759 address="Near Null Island", 

760 lat=0.1, 

761 lng=0.2, 

762 ), 

763 start_time=Timestamp_from_datetime(start_time), 

764 end_time=Timestamp_from_datetime(end_time), 

765 timezone="UTC", 

766 ) 

767 ) 

768 event_id = res.event_id 

769 assert not res.is_deleted 

770 

771 with session_scope() as session: 

772 with real_admin_session(super_token) as api: 

773 api.DeleteEvent( 

774 admin_pb2.DeleteEventReq( 

775 event_id=event_id, 

776 ) 

777 ) 

778 occurrence = session.get_one(EventOccurrence, ident=event_id) 

779 assert occurrence.is_deleted 

780 

781 

782def test_ListUserIds(db): 

783 super_user, super_token = generate_user(is_superuser=True) 

784 normal_user, normal_token = generate_user() 

785 

786 with real_admin_session(super_token) as api: 

787 res = api.ListUserIds( 

788 admin_pb2.ListUserIdsReq( 

789 start_time=Timestamp_from_datetime(datetime(2000, 1, 1, tzinfo=UTC)), 

790 end_time=Timestamp_from_datetime(now()), 

791 ) 

792 ) 

793 assert len(res.user_ids) == 2 

794 assert sorted(res.user_ids) == sorted([super_user.id, normal_user.id]) 

795 

796 with real_admin_session(super_token) as api: 

797 res = api.ListUserIds( 

798 admin_pb2.ListUserIdsReq(start_time=Timestamp_from_datetime(now()), end_time=Timestamp_from_datetime(now())) 

799 ) 

800 assert res.user_ids == [] 

801 

802 

803def test_EditReferenceText(db): 

804 super_user, super_token = generate_user(is_superuser=True) 

805 test_new_text = "New Text" 

806 

807 user1, user1_token = generate_user() 

808 user2, user2_token = generate_user() 

809 make_friends(user1, user2) 

810 

811 with session_scope() as session: 

812 with references_session(user1_token) as api: 

813 reference = api.WriteFriendReference( 

814 references_pb2.WriteFriendReferenceReq( 

815 to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1 

816 ) 

817 ) 

818 

819 with real_admin_session(super_token) as admin_api: 

820 admin_api.EditReferenceText( 

821 admin_pb2.EditReferenceTextReq(reference_id=reference.reference_id, new_text=test_new_text) 

822 ) 

823 

824 session.expire_all() 

825 

826 modified_reference = session.execute( 

827 select(Reference).where(Reference.id == reference.reference_id) 

828 ).scalar_one() 

829 assert modified_reference.text == test_new_text 

830 

831 

832def test_DeleteReference_deprecated(db): 

833 """DeleteReference is deprecated; admins should hide via UMS instead.""" 

834 super_user, super_token = generate_user(is_superuser=True) 

835 

836 user1, user1_token = generate_user() 

837 user2, user2_token = generate_user() 

838 make_friends(user1, user2) 

839 

840 with references_session(user1_token) as api: 

841 reference = api.WriteFriendReference( 

842 references_pb2.WriteFriendReferenceReq( 

843 to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1 

844 ) 

845 ) 

846 

847 with real_admin_session(super_token) as admin_api: 

848 with pytest.raises(grpc.RpcError) as e: 

849 admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=reference.reference_id)) 

850 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

851 

852 

853def test_GetUserReferences(db): 

854 super_user, super_token = generate_user(is_superuser=True) 

855 

856 user1, user1_token = generate_user() 

857 user2, user2_token = generate_user() 

858 user3, user3_token = generate_user() 

859 make_friends(user1, user2) 

860 make_friends(user1, user3) 

861 make_friends(user2, user3) 

862 

863 # user1 writes reference about user2 

864 with references_session(user1_token) as api: 

865 ref1 = api.WriteFriendReference( 

866 references_pb2.WriteFriendReferenceReq( 

867 to_user_id=user2.id, 

868 text="Reference from user1 to user2", 

869 private_text="", 

870 was_appropriate=True, 

871 rating=1, 

872 ) 

873 ) 

874 

875 # user2 writes reference about user1 

876 with references_session(user2_token) as api: 

877 ref2 = api.WriteFriendReference( 

878 references_pb2.WriteFriendReferenceReq( 

879 to_user_id=user1.id, 

880 text="Reference from user2 to user1", 

881 private_text="Private note", 

882 was_appropriate=True, 

883 rating=0.8, 

884 ) 

885 ) 

886 

887 # user3 writes reference about user1 

888 with references_session(user3_token) as api: 

889 ref3 = api.WriteFriendReference( 

890 references_pb2.WriteFriendReferenceReq( 

891 to_user_id=user1.id, 

892 text="Reference from user3 to user1", 

893 private_text="", 

894 was_appropriate=False, 

895 rating=0.5, 

896 ) 

897 ) 

898 

899 # Test GetUserReferences for user1 (admin view shows everything regardless of UMS state). 

900 with real_admin_session(super_token) as admin_api: 

901 res = admin_api.GetUserReferences(admin_pb2.GetUserReferencesReq(user=user1.username)) 

902 

903 # user1 wrote 1 reference 

904 assert len(res.references_from) == 1 

905 assert res.references_from[0].reference_id == ref1.reference_id 

906 assert res.references_from[0].from_user_id == user1.id 

907 assert res.references_from[0].to_user_id == user2.id 

908 assert res.references_from[0].text == "Reference from user1 to user2" 

909 

910 # user1 received 2 references 

911 assert len(res.references_to) == 2 

912 # Ordered by id descending, so ref3 comes first 

913 assert res.references_to[0].reference_id == ref3.reference_id 

914 assert res.references_to[0].was_appropriate is False 

915 

916 assert res.references_to[1].reference_id == ref2.reference_id 

917 assert res.references_to[1].private_text == "Private note" 

918 assert res.references_to[1].rating == 0.8 

919 

920 

921def test_GetUserReferences_not_found(db): 

922 super_user, super_token = generate_user(is_superuser=True) 

923 

924 with real_admin_session(super_token) as admin_api: 

925 with pytest.raises(grpc.RpcError) as e: 

926 admin_api.GetUserReferences(admin_pb2.GetUserReferencesReq(user="nonexistent")) 

927 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

928 

929 

930def test_GetFriendRequests(db): 

931 super_user, super_token = generate_user(is_superuser=True) 

932 

933 user1, _ = generate_user() 

934 user2, _ = generate_user() 

935 user3, _ = generate_user() 

936 user4, _ = generate_user() 

937 

938 # Create a mix of friend requests directly so we control the state 

939 def _add_friend_request(from_user_id, to_user_id, status, visibility, time_responded=None): 

940 with session_scope() as session: 

941 mod_state = ModerationState( 

942 object_type=ModerationObjectType.friend_request, 

943 object_id=0, 

944 visibility=visibility, 

945 ) 

946 session.add(mod_state) 

947 session.flush() 

948 rel = FriendRelationship( 

949 from_user_id=from_user_id, 

950 to_user_id=to_user_id, 

951 status=status, 

952 moderation_state_id=mod_state.id, 

953 time_responded=time_responded, 

954 ) 

955 session.add(rel) 

956 session.flush() 

957 mod_state.object_id = rel.id 

958 

959 # user1 -> user2: pending, shadowed 

960 _add_friend_request(user1.id, user2.id, FriendStatus.pending, ModerationVisibility.shadowed) 

961 # user1 -> user3: accepted, visible 

962 _add_friend_request(user1.id, user3.id, FriendStatus.accepted, ModerationVisibility.visible, time_responded=now()) 

963 # user4 -> user1: rejected, visible 

964 _add_friend_request(user4.id, user1.id, FriendStatus.rejected, ModerationVisibility.visible, time_responded=now()) 

965 

966 with real_admin_session(super_token) as admin_api: 

967 res = admin_api.GetFriendRequests(admin_pb2.GetFriendRequestsReq(user=user1.username)) 

968 

969 # user1 sent two: to user2 (pending) and to user3 (accepted), ordered by id desc 

970 assert len(res.sent) == 2 

971 assert res.sent[0].from_user.user_id == user1.id 

972 assert res.sent[0].to_user.user_id == user3.id 

973 assert res.sent[0].status == "accepted" 

974 assert res.sent[0].HasField("time_responded") 

975 assert res.sent[0].moderation_visibility == "visible" 

976 

977 assert res.sent[1].from_user.user_id == user1.id 

978 assert res.sent[1].to_user.user_id == user2.id 

979 assert res.sent[1].status == "pending" 

980 assert not res.sent[1].HasField("time_responded") 

981 assert res.sent[1].moderation_visibility == "shadowed" 

982 

983 # user1 received one: from user4 (rejected) 

984 assert len(res.received) == 1 

985 assert res.received[0].from_user.user_id == user4.id 

986 assert res.received[0].to_user.user_id == user1.id 

987 assert res.received[0].status == "rejected" 

988 

989 

990def test_GetFriendRequests_not_found(db): 

991 super_user, super_token = generate_user(is_superuser=True) 

992 

993 with real_admin_session(super_token) as admin_api: 

994 with pytest.raises(grpc.RpcError) as e: 

995 admin_api.GetFriendRequests(admin_pb2.GetFriendRequestsReq(user="nonexistent")) 

996 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

997 

998 

999def test_AddUsersToModerationUserList(db): 

1000 super_user, super_token = generate_user(is_superuser=True) 

1001 user1, _ = generate_user() 

1002 user2, _ = generate_user() 

1003 user3, _ = generate_user() 

1004 user4, _ = generate_user() 

1005 user5, _ = generate_user() 

1006 moderation_list_id = add_users_to_new_moderation_list([user1]) 

1007 

1008 with session_scope() as session: 

1009 with real_admin_session(super_token) as api: 

1010 # Test adding users to a non-existent moderation list (should raise an error) 

1011 with pytest.raises(grpc.RpcError) as e: 

1012 api.AddUsersToModerationUserList( 

1013 admin_pb2.AddUsersToModerationUserListReq(users=[user2.username], moderation_list_id=999), 

1014 ) 

1015 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

1016 assert "Moderation user list not found." == e.value.details() 

1017 

1018 # Test with non-existent user (should raise an error) 

1019 with pytest.raises(grpc.RpcError) as e: 

1020 api.AddUsersToModerationUserList( 

1021 admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, "nonexistent"]), 

1022 ) 

1023 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

1024 assert "Couldn't find that user." == e.value.details() 

1025 

1026 # Test successful creation of new moderation list (no moderation_list_id provided) 

1027 res = api.AddUsersToModerationUserList( 

1028 admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, user2.username, user3.username]), 

1029 ) 

1030 assert res.moderation_list_id > 0 

1031 with session_scope() as session: 

1032 moderation_user_list = session.get(ModerationUserList, res.moderation_list_id) 

1033 assert moderation_user_list is not None 

1034 assert len(moderation_user_list.users) == 3 

1035 assert {user1.id, user2.id, user3.id}.issubset({user.id for user in moderation_user_list.users}) 

1036 

1037 # Test list endpoint returns same moderation list with same members not repeated 

1038 listRes = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username)) 

1039 assert len(listRes.moderation_lists) == 1 

1040 assert listRes.moderation_lists[0].moderation_list_id == res.moderation_list_id 

1041 assert len(listRes.moderation_lists[0].members) == 3 

1042 assert {user1.id, user2.id, user3.id}.issubset({m.user_id for m in listRes.moderation_lists[0].members}) 

1043 

1044 # Test user can be in multiple moderation lists 

1045 listRes3 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username)) 

1046 assert len(listRes3.moderation_lists) == 2 

1047 

1048 # Test adding users to an existing moderation list 

1049 res2 = api.AddUsersToModerationUserList( 

1050 admin_pb2.AddUsersToModerationUserListReq( 

1051 users=[user4.username, user5.username], moderation_list_id=moderation_list_id 

1052 ), 

1053 ) 

1054 assert res2.moderation_list_id == moderation_list_id 

1055 with session_scope() as session: 

1056 moderation_user_list = session.get_one(ModerationUserList, moderation_list_id) 

1057 assert len(moderation_user_list.users) == 3 

1058 assert {user1.id, user4.id, user5.id}.issubset({user.id for user in moderation_user_list.users}) 

1059 

1060 # Test list user moderation lists endpoint returns the right moderation list 

1061 listRes2 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user5.username)) 

1062 assert len(listRes2.moderation_lists) == 1 

1063 assert listRes2.moderation_lists[0].moderation_list_id == moderation_list_id 

1064 assert len(listRes2.moderation_lists[0].members) == 3 

1065 assert {user1.id, user4.id, user5.id}.issubset({m.user_id for m in listRes2.moderation_lists[0].members}) 

1066 

1067 

1068def test_RemoveUserFromModerationUserList(db): 

1069 super_user, super_token = generate_user(is_superuser=True) 

1070 user1, _ = generate_user() 

1071 user2, _ = generate_user() 

1072 user3, _ = generate_user() 

1073 moderation_list_id = add_users_to_new_moderation_list([user1, user2]) 

1074 

1075 with real_admin_session(super_token) as api: 

1076 # Test with non-existent user (should raise error) 

1077 with pytest.raises(grpc.RpcError) as e: 

1078 api.RemoveUserFromModerationUserList(admin_pb2.RemoveUserFromModerationUserListReq(user="nonexistent")) 

1079 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

1080 assert "Couldn't find that user." == e.value.details() 

1081 

1082 # Test without providing moderation list id (should raise error) 

1083 with pytest.raises(grpc.RpcError) as e: 

1084 api.RemoveUserFromModerationUserList(admin_pb2.RemoveUserFromModerationUserListReq(user=user2.username)) 

1085 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

1086 assert "Missing moderation user list id." == e.value.details() 

1087 

1088 # Test removing user that's not in the provided moderation list (should raise error) 

1089 with pytest.raises(grpc.RpcError) as e: 

1090 api.RemoveUserFromModerationUserList( 

1091 admin_pb2.RemoveUserFromModerationUserListReq( 

1092 user=user3.username, moderation_list_id=moderation_list_id 

1093 ) 

1094 ) 

1095 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

1096 assert "User is not in the moderation user list." == e.value.details() 

1097 

1098 # Test successful removal 

1099 api.RemoveUserFromModerationUserList( 

1100 admin_pb2.RemoveUserFromModerationUserListReq(user=user1.username, moderation_list_id=moderation_list_id) 

1101 ) 

1102 with session_scope() as session: 

1103 moderation_user_list = session.get_one(ModerationUserList, moderation_list_id) 

1104 assert user1.id not in {user.id for user in moderation_user_list.users} 

1105 assert user2.id in {user.id for user in moderation_user_list.users} 

1106 

1107 # Test list user moderation lists endpoint returns right number of moderation lists 

1108 listRes = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username)) 

1109 assert len(listRes.moderation_lists) == 0 

1110 listRes2 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username)) 

1111 assert len(listRes2.moderation_lists) == 1 

1112 

1113 # Test removing all users from moderation list should also delete the moderation list 

1114 api.RemoveUserFromModerationUserList( 

1115 admin_pb2.RemoveUserFromModerationUserListReq(user=user2.username, moderation_list_id=moderation_list_id) 

1116 ) 

1117 with session_scope() as session: 

1118 assert session.get(ModerationUserList, moderation_list_id) is None 

1119 

1120 

1121def test_admin_delete_account_url(db, email_collector: EmailCollector, push_collector: PushCollector): 

1122 super_user, super_token = generate_user(is_superuser=True) 

1123 

1124 user, token = generate_user() 

1125 user_id = user.id 

1126 

1127 with real_admin_session(super_token) as admin_api: 

1128 url = admin_api.CreateAccountDeletionLink( 

1129 admin_pb2.CreateAccountDeletionLinkReq(user=user.username) 

1130 ).account_deletion_confirm_url 

1131 

1132 assert push_collector.count_for_user(user_id) == 0 

1133 

1134 with session_scope() as session: 

1135 token_o = session.execute(select(AccountDeletionToken)).scalar_one() 

1136 token = token_o.token 

1137 assert token_o.user.id == user_id 

1138 assert url == f"http://localhost:3000/delete-account?token={token}" 

1139 

1140 with auth_api_session() as (auth_api, metadata_interceptor): 

1141 auth_api.ConfirmDeleteAccount( 

1142 auth_pb2.ConfirmDeleteAccountReq( 

1143 token=token, 

1144 ) 

1145 ) 

1146 

1147 push = push_collector.pop_for_user(user_id, last=True) 

1148 assert push.content.title == "Account deleted" 

1149 assert push.content.body == "You can restore it within 7 days using the link we emailed you." 

1150 email_collector.pop_for_recipient(user.email, last=True) 

1151 

1152 

1153def test_AccessStats(db): 

1154 super_user, super_token = generate_user(is_superuser=True) 

1155 normal_user, normal_token = generate_user() 

1156 

1157 # Insert UserActivity rows: a couple inside the default 90-day window, one well 

1158 # outside it, and one with NULL ip_address / user_agent. The INET column is 

1159 # returned by psycopg3 as an IPv4Address/IPv6Address object, which used to 

1160 # crash the proto string assignment. 

1161 in_window_1 = now() - timedelta(days=1) 

1162 in_window_2 = now() - timedelta(days=10) 

1163 out_of_window = now() - timedelta(days=200) 

1164 with session_scope() as session: 

1165 session.add( 

1166 UserActivity( 

1167 user_id=normal_user.id, period=in_window_1, ip_address="1.2.3.4", user_agent="ua-a", api_calls=5 

1168 ) 

1169 ) 

1170 session.add( 

1171 UserActivity( 

1172 user_id=normal_user.id, period=in_window_2, ip_address="2001:db8::1", user_agent="ua-b", api_calls=3 

1173 ) 

1174 ) 

1175 session.add( 

1176 UserActivity( 

1177 user_id=normal_user.id, period=out_of_window, ip_address="9.9.9.9", user_agent="ua-old", api_calls=99 

1178 ) 

1179 ) 

1180 session.add(UserActivity(user_id=normal_user.id, period=in_window_1, api_calls=1)) 

1181 

1182 with real_admin_session(super_token) as api: 

1183 res = api.AccessStats(admin_pb2.AccessStatsReq(user=normal_user.username)) 

1184 

1185 by_ip = {s.ip_address: s for s in res.stats} 

1186 assert "1.2.3.4" in by_ip 

1187 assert by_ip["1.2.3.4"].api_call_count == 5 

1188 assert by_ip["1.2.3.4"].user_agent == "ua-a" 

1189 assert "2001:db8::1" in by_ip 

1190 assert by_ip["2001:db8::1"].api_call_count == 3 

1191 # NULL ip_address row produces an empty-string ip_address in the proto 

1192 assert "" in by_ip 

1193 assert by_ip[""].api_call_count == 1 

1194 # out-of-window row is excluded by the 90-day default 

1195 assert "9.9.9.9" not in by_ip 

1196 

1197 # explicit end_time should bound the upper end of the window (regression: was >=) 

1198 with real_admin_session(super_token) as api: 

1199 res = api.AccessStats( 

1200 admin_pb2.AccessStatsReq( 

1201 user=normal_user.username, 

1202 start_time=Timestamp_from_datetime(now() - timedelta(days=5)), 

1203 end_time=Timestamp_from_datetime(now()), 

1204 ) 

1205 ) 

1206 ips = {s.ip_address for s in res.stats} 

1207 assert ips == {"1.2.3.4", ""} 

1208 

1209 

1210def test_SetLastDonated(db): 

1211 super_user, super_token = generate_user(is_superuser=True) 

1212 normal_user, normal_token = generate_user(last_donated=None) 

1213 

1214 with real_admin_session(super_token) as api: 

1215 # user starts with no last_donated 

1216 with session_scope() as session: 

1217 user = session.execute(select(User).where(User.id == normal_user.id)).scalar_one() 

1218 assert user.last_donated is None 

1219 

1220 # can set last_donated 

1221 donation_time = now() - timedelta(days=30) 

1222 res = api.SetLastDonated( 

1223 admin_pb2.SetLastDonatedReq( 

1224 user=normal_user.username, 

1225 last_donated=Timestamp_from_datetime(donation_time), 

1226 ) 

1227 ) 

1228 

1229 with session_scope() as session: 

1230 user = session.execute(select(User).where(User.id == normal_user.id)).scalar_one() 

1231 assert user.last_donated is not None 

1232 # check timestamp is close (within a second) 

1233 assert abs((user.last_donated - donation_time).total_seconds()) < 1 

1234 

1235 # can clear last_donated by not setting the field 

1236 res = api.SetLastDonated(admin_pb2.SetLastDonatedReq(user=normal_user.username)) 

1237 

1238 with session_scope() as session: 

1239 user = session.execute(select(User).where(User.id == normal_user.id)).scalar_one() 

1240 assert user.last_donated is None 

1241 

1242 # user not found 

1243 with pytest.raises(grpc.RpcError) as e: 

1244 api.SetLastDonated(admin_pb2.SetLastDonatedReq(user="nonexistent")) 

1245 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

1246 assert e.value.details() == "Couldn't find that user." 

1247 

1248 

1249def test_admin_actions_level(db): 

1250 super_user, super_token = generate_user(is_superuser=True) 

1251 normal_user, _ = generate_user() 

1252 

1253 with real_admin_session(super_token) as api: 

1254 # Default level is NORMAL 

1255 res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note="normal note")) 

1256 assert res.admin_actions[0].level == admin_pb2.ADMIN_ACTION_LEVEL_NORMAL 

1257 

1258 # Explicitly set to DEBUG 

1259 res = api.AddAdminNote( 

1260 admin_pb2.AddAdminNoteReq( 

1261 user=normal_user.username, 

1262 admin_note="debug note", 

1263 level=admin_pb2.ADMIN_ACTION_LEVEL_DEBUG, 

1264 ) 

1265 ) 

1266 assert len(res.admin_actions) == 2 

1267 assert res.admin_actions[1].level == admin_pb2.ADMIN_ACTION_LEVEL_DEBUG 

1268 

1269 # Explicitly set to HIGH 

1270 res = api.AddAdminNote( 

1271 admin_pb2.AddAdminNoteReq( 

1272 user=normal_user.username, 

1273 admin_note="high note", 

1274 level=admin_pb2.ADMIN_ACTION_LEVEL_HIGH, 

1275 ) 

1276 ) 

1277 assert len(res.admin_actions) == 3 

1278 assert res.admin_actions[2].level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH 

1279 

1280 # Explicitly set to TRACE 

1281 res = api.AddAdminNote( 

1282 admin_pb2.AddAdminNoteReq( 

1283 user=normal_user.username, 

1284 admin_note="trace note", 

1285 level=admin_pb2.ADMIN_ACTION_LEVEL_TRACE, 

1286 ) 

1287 ) 

1288 assert len(res.admin_actions) == 4 

1289 assert res.admin_actions[3].level == admin_pb2.ADMIN_ACTION_LEVEL_TRACE 

1290 

1291 

1292def test_admin_actions_on_mutations(db, push_collector: PushCollector): 

1293 super_user, super_token = generate_user(is_superuser=True) 

1294 normal_user, _ = generate_user() 

1295 

1296 original_gender = normal_user.gender 

1297 original_birthdate = normal_user.birthdate 

1298 

1299 with real_admin_session(super_token) as api: 

1300 # ChangeUserGender 

1301 res = api.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=normal_user.username, gender="Machine")) 

1302 assert any( 

1303 a.action_type == "change_gender" and a.note == f"Changed from '{original_gender}' to 'Machine'" 

1304 for a in res.admin_actions 

1305 ) 

1306 

1307 # ChangeUserBirthdate 

1308 res = api.ChangeUserBirthdate( 

1309 admin_pb2.ChangeUserBirthdateReq(user=normal_user.username, birthdate="1990-01-01") 

1310 ) 

1311 assert any( 

1312 a.action_type == "change_birthdate" and a.note == f"Changed from {original_birthdate} to 1990-01-01" 

1313 for a in res.admin_actions 

1314 ) 

1315 

1316 # SetPassportSexGenderException 

1317 res = api.SetPassportSexGenderException( 

1318 admin_pb2.SetPassportSexGenderExceptionReq(user=normal_user.username, passport_sex_gender_exception=True) 

1319 ) 

1320 assert any( 

1321 a.action_type == "set_passport_sex_gender_exception" and a.note == "Changed from False to True" 

1322 for a in res.admin_actions 

1323 ) 

1324 

1325 # SendModNote with notify 

1326 res = api.SendModNote( 

1327 admin_pb2.SendModNoteReq( 

1328 user=normal_user.username, content="Please update your profile", internal_id="test1" 

1329 ) 

1330 ) 

1331 assert any( 

1332 a.action_type == "send_mod_note" and a.note == "Notify user: Yes\n\nPlease update your profile" 

1333 for a in res.admin_actions 

1334 ) 

1335 

1336 # SendModNote with do_not_notify 

1337 res = api.SendModNote( 

1338 admin_pb2.SendModNoteReq( 

1339 user=normal_user.username, 

1340 content="Silent note", 

1341 internal_id="test2", 

1342 do_not_notify=True, 

1343 ) 

1344 ) 

1345 assert any( 

1346 a.action_type == "send_mod_note" and a.note == "Notify user: No\n\nSilent note" for a in res.admin_actions 

1347 ) 

1348 

1349 # DeleteUser 

1350 res = api.DeleteUser(admin_pb2.DeleteUserReq(user=normal_user.username)) 

1351 assert any(a.action_type == "delete_user" for a in res.admin_actions) 

1352 assert any( 

1353 a.action_type == "delete_user" and a.level == admin_pb2.ADMIN_ACTION_LEVEL_HIGH for a in res.admin_actions 

1354 ) 

1355 

1356 # RecoverDeletedUser 

1357 res = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=normal_user.username)) 

1358 assert any(a.action_type == "recover_user" for a in res.admin_actions) 

1359 

1360 # MarkUserNeedsLocationUpdate 

1361 res = api.MarkUserNeedsLocationUpdate(admin_pb2.MarkUserNeedsLocationUpdateReq(user=normal_user.username)) 

1362 assert any( 

1363 a.action_type == "mark_needs_location_update" and a.note == "Marked user as needing location update" 

1364 for a in res.admin_actions 

1365 ) 

1366 

1367 # SetLastDonated 

1368 res = api.SetLastDonated( 

1369 admin_pb2.SetLastDonatedReq( 

1370 user=normal_user.username, 

1371 last_donated=Timestamp_from_datetime(now()), 

1372 ) 

1373 ) 

1374 assert any(a.action_type == "set_last_donated" for a in res.admin_actions) 

1375 

1376 

1377def test_create_admin_tag(db): 

1378 super_user, super_token = generate_user(is_superuser=True) 

1379 

1380 with real_admin_session(super_token) as api: 

1381 res = api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="test-tag")) 

1382 assert res.tag == "test-tag" 

1383 assert res.admin_tag_id > 0 

1384 

1385 

1386def test_create_admin_tag_duplicate(db): 

1387 super_user, super_token = generate_user(is_superuser=True) 

1388 

1389 with real_admin_session(super_token) as api: 

1390 api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="test-tag")) 

1391 with pytest.raises(grpc.RpcError) as e: 

1392 api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="test-tag")) 

1393 assert e.value.code() == grpc.StatusCode.ALREADY_EXISTS 

1394 assert e.value.details() == "That admin tag already exists." 

1395 

1396 

1397def test_create_admin_tag_empty(db): 

1398 super_user, super_token = generate_user(is_superuser=True) 

1399 

1400 with real_admin_session(super_token) as api: 

1401 with pytest.raises(grpc.RpcError) as e: 

1402 api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="")) 

1403 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

1404 assert e.value.details() == "The admin tag cannot be empty." 

1405 

1406 with pytest.raises(grpc.RpcError) as e: 

1407 api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag=" ")) 

1408 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

1409 assert e.value.details() == "The admin tag cannot be empty." 

1410 

1411 

1412def test_list_admin_tags(db): 

1413 super_user, super_token = generate_user(is_superuser=True) 

1414 

1415 with real_admin_session(super_token) as api: 

1416 # Empty initially 

1417 res = api.ListAdminTags(admin_pb2.ListAdminTagsReq()) 

1418 assert len(res.tags) == 0 

1419 

1420 # Add some tags 

1421 api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="bravo")) 

1422 api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="alpha")) 

1423 

1424 res = api.ListAdminTags(admin_pb2.ListAdminTagsReq()) 

1425 assert len(res.tags) == 2 

1426 # Ordered alphabetically 

1427 assert res.tags[0].tag == "alpha" 

1428 assert res.tags[1].tag == "bravo" 

1429 

1430 

1431def test_add_admin_tag_to_user(db): 

1432 super_user, super_token = generate_user(is_superuser=True) 

1433 normal_user, _ = generate_user() 

1434 

1435 with real_admin_session(super_token) as api: 

1436 api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip")) 

1437 

1438 res = api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=normal_user.username, tag="vip")) 

1439 assert "vip" in res.admin_tags 

1440 assert any(a.action_type == "add_tag" and a.tag == "vip" for a in res.admin_actions) 

1441 

1442 

1443def test_add_admin_tag_to_user_duplicate(db): 

1444 super_user, super_token = generate_user(is_superuser=True) 

1445 normal_user, _ = generate_user() 

1446 

1447 with real_admin_session(super_token) as api: 

1448 api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip")) 

1449 api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=normal_user.username, tag="vip")) 

1450 

1451 with pytest.raises(grpc.RpcError) as e: 

1452 api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=normal_user.username, tag="vip")) 

1453 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

1454 assert e.value.details() == "The user already has that admin tag." 

1455 

1456 

1457def test_add_admin_tag_to_user_tag_not_found(db): 

1458 super_user, super_token = generate_user(is_superuser=True) 

1459 normal_user, _ = generate_user() 

1460 

1461 with real_admin_session(super_token) as api: 

1462 with pytest.raises(grpc.RpcError) as e: 

1463 api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=normal_user.username, tag="nonexistent")) 

1464 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

1465 assert e.value.details() == "Admin tag not found." 

1466 

1467 

1468def test_remove_admin_tag_from_user(db): 

1469 super_user, super_token = generate_user(is_superuser=True) 

1470 normal_user, _ = generate_user() 

1471 

1472 with real_admin_session(super_token) as api: 

1473 api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip")) 

1474 api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=normal_user.username, tag="vip")) 

1475 

1476 res = api.RemoveAdminTagFromUser(admin_pb2.RemoveAdminTagFromUserReq(user=normal_user.username, tag="vip")) 

1477 assert "vip" not in res.admin_tags 

1478 assert any(a.action_type == "remove_tag" and a.tag == "vip" for a in res.admin_actions) 

1479 

1480 

1481def test_remove_admin_tag_from_user_not_assigned(db): 

1482 super_user, super_token = generate_user(is_superuser=True) 

1483 normal_user, _ = generate_user() 

1484 

1485 with real_admin_session(super_token) as api: 

1486 api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip")) 

1487 

1488 with pytest.raises(grpc.RpcError) as e: 

1489 api.RemoveAdminTagFromUser(admin_pb2.RemoveAdminTagFromUserReq(user=normal_user.username, tag="vip")) 

1490 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

1491 assert e.value.details() == "The user does not have that admin tag." 

1492 

1493 

1494def test_search_users_by_admin_tag(db): 

1495 super_user, super_token = generate_user(is_superuser=True) 

1496 user1, _ = generate_user() 

1497 user2, _ = generate_user() 

1498 user3, _ = generate_user() 

1499 

1500 with real_admin_session(super_token) as api: 

1501 api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="vip")) 

1502 api.CreateAdminTag(admin_pb2.CreateAdminTagReq(tag="flagged")) 

1503 

1504 api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=user1.username, tag="vip")) 

1505 api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=user2.username, tag="vip")) 

1506 api.AddAdminTagToUser(admin_pb2.AddAdminTagToUserReq(user=user2.username, tag="flagged")) 

1507 

1508 # Search for users with "vip" tag 

1509 res = api.SearchUsers(admin_pb2.SearchUsersReq(admin_tags=["vip"])) 

1510 user_ids = {u.user_id for u in res.users} 

1511 assert user1.id in user_ids 

1512 assert user2.id in user_ids 

1513 assert user3.id not in user_ids 

1514 

1515 # Search for users with both "vip" AND "flagged" tags (AND logic) 

1516 res = api.SearchUsers(admin_pb2.SearchUsersReq(admin_tags=["vip", "flagged"])) 

1517 user_ids = {u.user_id for u in res.users} 

1518 assert user2.id in user_ids 

1519 assert user1.id not in user_ids 

1520 

1521 # Search for non-existent tag returns no results 

1522 res = api.SearchUsers(admin_pb2.SearchUsersReq(admin_tags=["nonexistent"])) 

1523 assert len(res.users) == 0 

1524 

1525 

1526def test_search_users_by_admin_note(db): 

1527 super_user, super_token = generate_user(is_superuser=True) 

1528 user1, _ = generate_user() 

1529 user2, _ = generate_user() 

1530 

1531 with real_admin_session(super_token) as api: 

1532 api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=user1.username, admin_note="suspicious activity")) 

1533 api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=user2.username, admin_note="normal user")) 

1534 

1535 # Search by admin action log content (ilike) 

1536 res = api.SearchUsers(admin_pb2.SearchUsersReq(admin_action_log="%suspicious%")) 

1537 user_ids = {u.user_id for u in res.users} 

1538 assert user1.id in user_ids 

1539 assert user2.id not in user_ids 

1540 

1541 

1542def test_ListAdminActions_empty(db): 

1543 super_user, super_token = generate_user(is_superuser=True) 

1544 

1545 with real_admin_session(super_token) as api: 

1546 res = api.ListAdminActions(admin_pb2.ListAdminActionsReq()) 

1547 assert len(res.admin_actions) == 0 

1548 assert res.next_page_token == "" 

1549 

1550 

1551def test_ListAdminActions_returns_newest_first_with_target_info(db): 

1552 super_user, super_token = generate_user(is_superuser=True) 

1553 user1, _ = generate_user() 

1554 user2, _ = generate_user() 

1555 

1556 with real_admin_session(super_token) as api: 

1557 api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=user1.username, admin_note="first note")) 

1558 api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=user2.username, admin_note="second note")) 

1559 api.BanUser(admin_pb2.BanUserReq(user=user1.username, admin_note="ban reason")) 

1560 

1561 res = api.ListAdminActions(admin_pb2.ListAdminActionsReq()) 

1562 

1563 assert len(res.admin_actions) == 3 

1564 # Newest first 

1565 assert res.admin_actions[0].action_type == "ban" 

1566 assert res.admin_actions[0].target_user_id == user1.id 

1567 assert res.admin_actions[0].target_username == user1.username 

1568 assert res.admin_actions[0].admin_user_id == super_user.id 

1569 assert res.admin_actions[0].admin_username == super_user.username 

1570 assert res.admin_actions[1].action_type == "note" 

1571 assert res.admin_actions[1].target_user_id == user2.id 

1572 assert res.admin_actions[2].action_type == "note" 

1573 assert res.admin_actions[2].target_user_id == user1.id 

1574 

1575 

1576def test_ListAdminActions_filter_by_admin_and_target(db): 

1577 super1, super1_token = generate_user(is_superuser=True) 

1578 super2, super2_token = generate_user(is_superuser=True) 

1579 user1, _ = generate_user() 

1580 user2, _ = generate_user() 

1581 

1582 with real_admin_session(super1_token) as api: 

1583 api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=user1.username, admin_note="from super1 to user1")) 

1584 api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=user2.username, admin_note="from super1 to user2")) 

1585 with real_admin_session(super2_token) as api: 

1586 api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=user1.username, admin_note="from super2 to user1")) 

1587 

1588 with real_admin_session(super1_token) as api: 

1589 res = api.ListAdminActions(admin_pb2.ListAdminActionsReq(admin_user_id=super1.id)) 

1590 assert {a.note for a in res.admin_actions} == {"from super1 to user1", "from super1 to user2"} 

1591 

1592 res = api.ListAdminActions(admin_pb2.ListAdminActionsReq(target_user_id=user1.id)) 

1593 assert {a.note for a in res.admin_actions} == {"from super1 to user1", "from super2 to user1"} 

1594 

1595 res = api.ListAdminActions(admin_pb2.ListAdminActionsReq(admin_user_id=super1.id, target_user_id=user1.id)) 

1596 assert [a.note for a in res.admin_actions] == ["from super1 to user1"] 

1597 

1598 

1599def test_ListAdminActions_pagination(db): 

1600 super_user, super_token = generate_user(is_superuser=True) 

1601 user, _ = generate_user() 

1602 

1603 with real_admin_session(super_token) as api: 

1604 for i in range(3): 

1605 api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=user.username, admin_note=f"note {i}")) 

1606 

1607 res = api.ListAdminActions(admin_pb2.ListAdminActionsReq(page_size=2)) 

1608 assert len(res.admin_actions) == 2 

1609 assert res.next_page_token != "" 

1610 first_page_notes = [a.note for a in res.admin_actions] 

1611 

1612 res2 = api.ListAdminActions(admin_pb2.ListAdminActionsReq(page_size=2, page_token=res.next_page_token)) 

1613 assert len(res2.admin_actions) == 1 

1614 assert res2.next_page_token == "" 

1615 

1616 all_notes = first_page_notes + [a.note for a in res2.admin_actions] 

1617 assert set(all_notes) == {"note 0", "note 1", "note 2"} 

1618 

1619 

1620def test_ListUserUploads(db): 

1621 super_user, super_token = generate_user(is_superuser=True) 

1622 user, _ = generate_user(complete_profile=False) 

1623 other_user, _ = generate_user() 

1624 

1625 with session_scope() as session: 

1626 for i in range(3): 

1627 session.add( 

1628 Upload( 

1629 key=f"key{i}", 

1630 filename=f"photo{i}.jpg", 

1631 creator_user_id=user.id, 

1632 credit=f"credit {i}" if i == 0 else None, 

1633 ) 

1634 ) 

1635 session.add(Upload(key="other_key", filename="other.jpg", creator_user_id=other_user.id)) 

1636 

1637 with real_admin_session(super_token) as api: 

1638 res = api.ListUserUploads(admin_pb2.ListUserUploadsReq(user=user.username)) 

1639 

1640 assert len(res.uploads) == 3 

1641 assert res.next_page_token == "" 

1642 assert {u.filename for u in res.uploads} == {"photo0.jpg", "photo1.jpg", "photo2.jpg"} 

1643 

1644 upload0 = next(u for u in res.uploads if u.key == "key0") 

1645 assert upload0.credit == "credit 0" 

1646 assert upload0.full_url.endswith("/img/full/photo0.jpg") 

1647 assert upload0.thumbnail_url.endswith("/img/thumbnail/photo0.jpg") 

1648 assert upload0.HasField("created") 

1649 

1650 

1651def test_ListUserUploads_pagination(db): 

1652 super_user, super_token = generate_user(is_superuser=True) 

1653 user, _ = generate_user(complete_profile=False) 

1654 

1655 with session_scope() as session: 

1656 for i in range(3): 

1657 session.add(Upload(key=f"key{i}", filename=f"photo{i}.jpg", creator_user_id=user.id)) 

1658 

1659 with real_admin_session(super_token) as api: 

1660 res = api.ListUserUploads(admin_pb2.ListUserUploadsReq(user=user.username, page_size=2)) 

1661 assert len(res.uploads) == 2 

1662 assert res.next_page_token != "" 

1663 first_page_keys = [u.key for u in res.uploads] 

1664 

1665 res2 = api.ListUserUploads( 

1666 admin_pb2.ListUserUploadsReq(user=user.username, page_size=2, page_token=res.next_page_token) 

1667 ) 

1668 assert len(res2.uploads) == 1 

1669 assert res2.next_page_token == "" 

1670 

1671 all_keys = first_page_keys + [u.key for u in res2.uploads] 

1672 assert set(all_keys) == {"key0", "key1", "key2"} 

1673 

1674 

1675def test_ListUserUploads_uses(db): 

1676 super_user, super_token = generate_user(is_superuser=True) 

1677 user, _ = generate_user(complete_profile=False) 

1678 

1679 with session_scope() as session: 

1680 session.add(Upload(key="used_key", filename="used.jpg", creator_user_id=user.id)) 

1681 session.add(Upload(key="unused_key", filename="unused.jpg", creator_user_id=user.id)) 

1682 gallery = PhotoGallery(owner_user_id=user.id) 

1683 session.add(gallery) 

1684 session.flush() 

1685 session.add(PhotoGalleryItem(gallery_id=gallery.id, upload_key="used_key", position=1.0)) 

1686 

1687 with real_admin_session(super_token) as api: 

1688 res = api.ListUserUploads(admin_pb2.ListUserUploadsReq(user=user.username)) 

1689 

1690 uploads = {u.key: u for u in res.uploads} 

1691 assert list(uploads["unused_key"].uses) == [] 

1692 

1693 used_uses = uploads["used_key"].uses 

1694 assert len(used_uses) == 1 

1695 assert used_uses[0].type == admin_pb2.UPLOAD_USE_TYPE_PROFILE_GALLERY_PHOTO_AVATAR 

1696 assert used_uses[0].is_current 

1697 assert used_uses[0].user_id == user.id 

1698 

1699 

1700def test_ListUserUploads_not_found(db): 

1701 super_user, super_token = generate_user(is_superuser=True) 

1702 

1703 with real_admin_session(super_token) as api: 

1704 with pytest.raises(grpc.RpcError) as e: 

1705 api.ListUserUploads(admin_pb2.ListUserUploadsReq(user="nonexistent")) 

1706 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

1707 

1708 

1709# community invite feature tested in test_events.py 

1710# SendBlogPostNotification tested in test_notifications.py 

1711# MarkUserNeedsLocationUpdate tested in test_jail.py 

1712 

1713 

1714def _ota_manifest(*, version, fingerprint, created_at="2026-05-31T00:00:00.000Z"): 

1715 return { 

1716 "id": f"id-{version}", 

1717 "createdAt": created_at, 

1718 "runtimeVersion": fingerprint, 

1719 "launchAsset": {"key": "bundle", "url": f"https://cdn.testing.invalid/{version}/bundle.hbc"}, 

1720 "assets": [], 

1721 "metadata": {}, 

1722 "extra": {}, 

1723 } 

1724 

1725 

1726def _ota_signed_multipart(manifest): 

1727 # Mimics the signed multipart body the CDN holds (signature header omitted; we only read the JSON). 

1728 boundary = "COUCHERS_OTA_BOUNDARY" 

1729 

1730 def part(name, body, content_type): 

1731 return f'--{boundary}\r\ncontent-disposition: form-data; name="{name}"\r\ncontent-type: {content_type}\r\n\r\n{body}\r\n' 

1732 

1733 body = ( 

1734 part("manifest", json.dumps(manifest), "application/json; charset=utf-8") 

1735 + part("extensions", "{}", "application/json") 

1736 + f"--{boundary}--\r\n" 

1737 ) 

1738 return f"multipart/mixed; boundary={boundary}", body.encode() 

1739 

1740 

1741def _patch_ota_cdn(manifests): 

1742 # manifests: {version: manifest_dict}. URL is {cdn_root}/{version}/{platform}/manifest. 

1743 def fake(url): 

1744 version = url.split("/")[-3] 

1745 if version not in manifests: 

1746 return "multipart/mixed; boundary=COUCHERS_OTA_BOUNDARY", b"" 

1747 return _ota_signed_multipart(manifests[version]) 

1748 

1749 return patch("couchers.servicers.admin._fetch_signed_manifest", side_effect=fake) 

1750 

1751 

1752def test_CreateOTAPackage(db): 

1753 super_user, super_token = generate_user(is_superuser=True) 

1754 

1755 manifests = {"v1.3.1.aaaa": _ota_manifest(version="v1.3.1.aaaa", fingerprint="ios-fp")} 

1756 with _patch_ota_cdn(manifests), real_admin_session(super_token) as api: 

1757 res = api.CreateOTAPackage( 

1758 admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version="v1.3.1.aaaa") 

1759 ) 

1760 

1761 assert res.platform == admin_pb2.OTA_PLATFORM_IOS 

1762 assert res.fingerprint == "ios-fp" 

1763 assert res.version == "v1.3.1.aaaa" 

1764 assert res.manifest_id == "id-v1.3.1.aaaa" 

1765 assert res.banned is False 

1766 assert res.live is True 

1767 assert res.creator_user_id == super_user.id 

1768 

1769 

1770def test_CreateOTAPackage_invalid(db): 

1771 _, super_token = generate_user(is_superuser=True) 

1772 

1773 manifests = {"v-incomplete": {"id": "x"}} # on the CDN but missing runtimeVersion / createdAt 

1774 with _patch_ota_cdn(manifests), real_admin_session(super_token) as api: 

1775 # missing version 

1776 with pytest.raises(grpc.RpcError) as e: 

1777 api.CreateOTAPackage(admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS)) 

1778 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

1779 

1780 # nothing published at this version 

1781 with pytest.raises(grpc.RpcError) as e: 

1782 api.CreateOTAPackage( 

1783 admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version="v-missing") 

1784 ) 

1785 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

1786 

1787 # manifest present but missing required fields 

1788 with pytest.raises(grpc.RpcError) as e: 

1789 api.CreateOTAPackage( 

1790 admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version="v-incomplete") 

1791 ) 

1792 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

1793 

1794 

1795def test_CreateOTAPackage_rejects_duplicate_version(db): 

1796 _, super_token = generate_user(is_superuser=True) 

1797 

1798 manifests = {"v1.3.1.aaaa": _ota_manifest(version="v1.3.1.aaaa", fingerprint="ios-fp")} 

1799 with _patch_ota_cdn(manifests), real_admin_session(super_token) as api: 

1800 api.CreateOTAPackage(admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version="v1.3.1.aaaa")) 

1801 with pytest.raises(grpc.RpcError) as e: 

1802 api.CreateOTAPackage( 

1803 admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version="v1.3.1.aaaa") 

1804 ) 

1805 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

1806 

1807 

1808def test_ListOTAPackages(db): 

1809 _, super_token = generate_user(is_superuser=True) 

1810 

1811 manifests = { 

1812 "v1.3.1.ios": _ota_manifest(version="v1.3.1.ios", fingerprint="ios-fp", created_at="2026-05-30T00:00:00.000Z"), 

1813 "v1.3.2.ios": _ota_manifest(version="v1.3.2.ios", fingerprint="ios-fp", created_at="2026-05-31T00:00:00.000Z"), 

1814 "v1.3.2.android": _ota_manifest( 

1815 version="v1.3.2.android", fingerprint="android-fp", created_at="2026-06-01T00:00:00.000Z" 

1816 ), 

1817 } 

1818 with _patch_ota_cdn(manifests), real_admin_session(super_token) as api: 

1819 api.CreateOTAPackage(admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version="v1.3.1.ios")) 

1820 api.CreateOTAPackage(admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version="v1.3.2.ios")) 

1821 api.CreateOTAPackage( 

1822 admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_ANDROID, version="v1.3.2.android") 

1823 ) 

1824 

1825 res = api.ListOTAPackages(admin_pb2.ListOTAPackagesReq()) 

1826 # newest (by manifest createdAt) first 

1827 assert [p.version for p in res.packages] == ["v1.3.2.android", "v1.3.2.ios", "v1.3.1.ios"] 

1828 # only the newest per (platform, fingerprint) is live 

1829 live = {p.version: p.live for p in res.packages} 

1830 assert live == {"v1.3.2.android": True, "v1.3.2.ios": True, "v1.3.1.ios": False} 

1831 

1832 ios = api.ListOTAPackages(admin_pb2.ListOTAPackagesReq(platform=admin_pb2.OTA_PLATFORM_IOS)) 

1833 assert [p.version for p in ios.packages] == ["v1.3.2.ios", "v1.3.1.ios"] 

1834 

1835 

1836def test_BanOTAPackage(db): 

1837 super_user, super_token = generate_user(is_superuser=True) 

1838 

1839 manifests = { 

1840 "v1.3.1.good": _ota_manifest( 

1841 version="v1.3.1.good", fingerprint="ios-fp", created_at="2026-05-30T00:00:00.000Z" 

1842 ), 

1843 "v1.3.2.bad": _ota_manifest(version="v1.3.2.bad", fingerprint="ios-fp", created_at="2026-05-31T00:00:00.000Z"), 

1844 } 

1845 with _patch_ota_cdn(manifests), real_admin_session(super_token) as api: 

1846 api.CreateOTAPackage(admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version="v1.3.1.good")) 

1847 second = api.CreateOTAPackage( 

1848 admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version="v1.3.2.bad") 

1849 ) 

1850 assert second.live is True 

1851 

1852 banned = api.BanOTAPackage( 

1853 admin_pb2.BanOTAPackageReq(ota_package_id=second.ota_package_id, reason="bad bundle") 

1854 ) 

1855 assert banned.banned is True 

1856 assert banned.banned_reason == "bad bundle" 

1857 assert banned.banned_by_user_id == super_user.id 

1858 assert banned.live is False 

1859 

1860 # banning the newest stops new check-ins getting it; the previous one becomes live again 

1861 res = api.ListOTAPackages(admin_pb2.ListOTAPackagesReq(include_banned=True)) 

1862 live = {p.version: p.live for p in res.packages} 

1863 assert live == {"v1.3.2.bad": False, "v1.3.1.good": True} 

1864 

1865 # banned packages are excluded by default 

1866 non_banned = api.ListOTAPackages(admin_pb2.ListOTAPackagesReq()) 

1867 assert [p.version for p in non_banned.packages] == ["v1.3.1.good"] 

1868 

1869 

1870def test_BanOTAPackage_requires_reason(db): 

1871 _, super_token = generate_user(is_superuser=True) 

1872 

1873 manifests = { 

1874 "v1.3.1": _ota_manifest(version="v1.3.1", fingerprint="ios-fp", created_at="2026-05-30T00:00:00.000Z"), 

1875 } 

1876 with _patch_ota_cdn(manifests), real_admin_session(super_token) as api: 

1877 pkg = api.CreateOTAPackage(admin_pb2.CreateOTAPackageReq(platform=admin_pb2.OTA_PLATFORM_IOS, version="v1.3.1")) 

1878 with pytest.raises(grpc.RpcError) as e: 

1879 api.BanOTAPackage(admin_pb2.BanOTAPackageReq(ota_package_id=pkg.ota_package_id)) 

1880 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

1881 with pytest.raises(grpc.RpcError) as e: 

1882 api.BanOTAPackage(admin_pb2.BanOTAPackageReq(ota_package_id=pkg.ota_package_id, reason=" ")) 

1883 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

1884 

1885 

1886def test_BanOTAPackage_not_found(db): 

1887 _, super_token = generate_user(is_superuser=True) 

1888 

1889 with real_admin_session(super_token) as api: 

1890 with pytest.raises(grpc.RpcError) as e: 

1891 api.BanOTAPackage(admin_pb2.BanOTAPackageReq(ota_package_id=123456, reason="never mind")) 

1892 assert e.value.code() == grpc.StatusCode.NOT_FOUND