Coverage for app / backend / src / tests / test_admin.py: 100%

504 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1from datetime import date, datetime, timedelta 

2from re import match 

3 

4import grpc 

5import pytest 

6from sqlalchemy import select 

7from sqlalchemy.sql import func 

8 

9from couchers.db import session_scope 

10from couchers.models import ( 

11 AccountDeletionToken, 

12 ContentReport, 

13 EventOccurrence, 

14 ModerationUserList, 

15 Reference, 

16 User, 

17 UserSession, 

18) 

19from couchers.proto import admin_pb2, auth_pb2, events_pb2, references_pb2, reporting_pb2 

20from couchers.utils import Timestamp_from_datetime, now, parse_date 

21from tests.fixtures.db import add_users_to_new_moderation_list, generate_user, make_friends 

22from tests.fixtures.misc import PushCollector, email_fields, mock_notification_email 

23from tests.fixtures.sessions import ( 

24 auth_api_session, 

25 events_session, 

26 real_admin_session, 

27 references_session, 

28 reporting_session, 

29) 

30from tests.test_communities import create_community 

31 

32 

33@pytest.fixture(autouse=True) 

34def _(testconfig): 

35 pass 

36 

37 

38def test_access_by_normal_user(db): 

39 normal_user, normal_token = generate_user() 

40 

41 with real_admin_session(normal_token) as api: 

42 # all requests to the admin servicer should break when done by a non-super_user 

43 with pytest.raises(grpc.RpcError) as e: 

44 api.GetUserDetails( 

45 admin_pb2.GetUserDetailsReq( 

46 user=str(normal_user.id), 

47 ) 

48 ) 

49 assert e.value.code() == grpc.StatusCode.PERMISSION_DENIED 

50 

51 

52def test_GetUser(db): 

53 super_user, super_token = generate_user(is_superuser=True) 

54 normal_user, normal_token = generate_user() 

55 

56 with real_admin_session(super_token) as api: 

57 res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id))) 

58 assert res.user_id == normal_user.id 

59 assert res.username == normal_user.username 

60 

61 with real_admin_session(super_token) as api: 

62 res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note="Testing banning")) 

63 

64 with real_admin_session(super_token) as api: 

65 res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id))) 

66 assert res.user_id == normal_user.id 

67 assert res.username == normal_user.username 

68 

69 

70def test_GetUserDetails(db): 

71 super_user, super_token = generate_user(is_superuser=True) 

72 normal_user, normal_token = generate_user() 

73 

74 with real_admin_session(super_token) as api: 

75 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=str(normal_user.id))) 

76 assert res.user_id == normal_user.id 

77 assert res.username == normal_user.username 

78 assert res.email == normal_user.email 

79 assert res.gender == normal_user.gender 

80 assert parse_date(res.birthdate) == normal_user.birthdate 

81 assert not res.banned 

82 assert not res.deleted 

83 

84 with real_admin_session(super_token) as api: 

85 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)) 

86 assert res.user_id == normal_user.id 

87 assert res.username == normal_user.username 

88 assert res.email == normal_user.email 

89 assert res.gender == normal_user.gender 

90 assert parse_date(res.birthdate) == normal_user.birthdate 

91 assert not res.banned 

92 assert not res.deleted 

93 

94 with real_admin_session(super_token) as api: 

95 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.email)) 

96 assert res.user_id == normal_user.id 

97 assert res.username == normal_user.username 

98 assert res.email == normal_user.email 

99 assert res.gender == normal_user.gender 

100 assert parse_date(res.birthdate) == normal_user.birthdate 

101 assert not res.banned 

102 assert not res.deleted 

103 

104 

105def test_ChangeUserGender(db, push_collector: PushCollector): 

106 super_user, super_token = generate_user(is_superuser=True) 

107 normal_user, normal_token = generate_user() 

108 

109 with real_admin_session(super_token) as api: 

110 with mock_notification_email() as mock: 

111 res = api.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=normal_user.username, gender="Machine")) 

112 assert res.user_id == normal_user.id 

113 assert res.username == normal_user.username 

114 assert res.email == normal_user.email 

115 assert res.gender == "Machine" 

116 assert parse_date(res.birthdate) == normal_user.birthdate 

117 assert not res.banned 

118 assert not res.deleted 

119 

120 mock.assert_called_once() 

121 e = email_fields(mock) 

122 assert e.subject == "[TEST] Your gender was changed" 

123 assert e.recipient == normal_user.email 

124 assert "Machine" in e.plain 

125 assert "Machine" in e.html 

126 

127 push = push_collector.pop_for_user(normal_user.id, last=True) 

128 assert push.content.title == "Gender changed" 

129 assert push.content.body == "An admin changed your gender to Machine." 

130 

131 

132def test_ChangeUserBirthdate(db, push_collector: PushCollector): 

133 super_user, super_token = generate_user(is_superuser=True) 

134 normal_user, normal_token = generate_user(birthdate=date(year=2000, month=1, day=1)) 

135 

136 with real_admin_session(super_token) as api: 

137 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)) 

138 assert parse_date(res.birthdate) == date(year=2000, month=1, day=1) 

139 

140 with mock_notification_email() as mock: 

141 res = api.ChangeUserBirthdate( 

142 admin_pb2.ChangeUserBirthdateReq(user=normal_user.username, birthdate="1990-05-25") 

143 ) 

144 

145 assert res.user_id == normal_user.id 

146 assert res.username == normal_user.username 

147 assert res.email == normal_user.email 

148 assert res.birthdate == "1990-05-25" 

149 assert res.gender == normal_user.gender 

150 assert not res.banned 

151 assert not res.deleted 

152 

153 mock.assert_called_once() 

154 e = email_fields(mock) 

155 assert e.subject == "[TEST] Your date of birth was changed" 

156 assert e.recipient == normal_user.email 

157 assert "1990" in e.plain 

158 assert "1990" in e.html 

159 

160 push = push_collector.pop_for_user(normal_user.id, last=True) 

161 assert push.content.title == "Birthdate changed" 

162 assert push.content.body == "An admin changed your date of birth to Friday 25 May 1990." 

163 

164 

165def test_BanUser(db): 

166 super_user, super_token = generate_user(is_superuser=True) 

167 normal_user, _ = generate_user() 

168 admin_note = "A good reason" 

169 utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00" 

170 prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)" 

171 

172 with real_admin_session(super_token) as api: 

173 res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note=admin_note)) 

174 assert res.user_id == normal_user.id 

175 assert res.username == normal_user.username 

176 assert res.email == normal_user.email 

177 assert res.gender == normal_user.gender 

178 assert parse_date(res.birthdate) == normal_user.birthdate 

179 assert res.banned 

180 assert not res.deleted 

181 assert match(rf"^{prefix_regex} {admin_note}\n$", res.admin_note) 

182 

183 

184def test_UnbanUser(db): 

185 super_user, super_token = generate_user(is_superuser=True) 

186 normal_user, _ = generate_user() 

187 admin_note = "A good reason" 

188 utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00" 

189 prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)" 

190 

191 with real_admin_session(super_token) as api: 

192 res = api.UnbanUser(admin_pb2.UnbanUserReq(user=normal_user.username, admin_note=admin_note)) 

193 assert res.user_id == normal_user.id 

194 assert res.username == normal_user.username 

195 assert res.email == normal_user.email 

196 assert res.gender == normal_user.gender 

197 assert parse_date(res.birthdate) == normal_user.birthdate 

198 assert not res.banned 

199 assert not res.deleted 

200 assert match(rf"^{prefix_regex} {admin_note}\n$", res.admin_note) 

201 

202 

203def test_AddAdminNote(db): 

204 super_user, super_token = generate_user(is_superuser=True) 

205 normal_user, _ = generate_user() 

206 admin_note1 = "User reported strange behavior" 

207 admin_note2 = "Insert private information here" 

208 utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00" 

209 prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)" 

210 

211 with real_admin_session(super_token) as api: 

212 res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note1)) 

213 assert res.user_id == normal_user.id 

214 assert res.username == normal_user.username 

215 assert res.email == normal_user.email 

216 assert res.gender == normal_user.gender 

217 assert parse_date(res.birthdate) == normal_user.birthdate 

218 assert not res.banned 

219 assert not res.deleted 

220 assert match(rf"^{prefix_regex} {admin_note1}\n$", res.admin_note) 

221 

222 with real_admin_session(super_token) as api: 

223 res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note2)) 

224 assert match(rf"^{prefix_regex} {admin_note1}\n{prefix_regex} {admin_note2}\n$", res.admin_note) 

225 

226 

227def test_AddAdminNote_blank(db): 

228 super_user, super_token = generate_user(is_superuser=True) 

229 normal_user, _ = generate_user() 

230 empty_admin_note = " \t \n " 

231 

232 with real_admin_session(super_token) as api: 

233 with pytest.raises(grpc.RpcError) as e: 

234 api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=empty_admin_note)) 

235 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

236 assert e.value.details() == "The admin note cannot be empty." 

237 

238 

239def test_admin_content_reports(db): 

240 super_user, super_token = generate_user(is_superuser=True) 

241 normal_user, token = generate_user() 

242 bad_user1, _ = generate_user() 

243 bad_user2, _ = generate_user() 

244 

245 with reporting_session(token) as api: 

246 api.Report( 

247 reporting_pb2.ReportReq( 

248 reason="spam", 

249 description="r1", 

250 content_ref="comment/123", 

251 author_user=bad_user1.username, 

252 user_agent="n/a", 

253 page="https://couchers.org/comment/123", 

254 ) 

255 ) 

256 api.Report( 

257 reporting_pb2.ReportReq( 

258 reason="spam", 

259 description="r2", 

260 content_ref="comment/124", 

261 author_user=bad_user2.username, 

262 user_agent="n/a", 

263 page="https://couchers.org/comment/124", 

264 ) 

265 ) 

266 api.Report( 

267 reporting_pb2.ReportReq( 

268 reason="something else", 

269 description="r3", 

270 content_ref="page/321", 

271 author_user=bad_user1.username, 

272 user_agent="n/a", 

273 page="https://couchers.org/page/321", 

274 ) 

275 ) 

276 

277 with session_scope() as session: 

278 id_by_description: dict[str, int] = dict( 

279 session.execute(select(ContentReport.description, ContentReport.id)).all() # type: ignore[arg-type] 

280 ) 

281 

282 with real_admin_session(super_token) as api: 

283 with pytest.raises(grpc.RpcError) as e: 

284 api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1)) 

285 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

286 assert e.value.details() == "Content report not found." 

287 

288 res = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=id_by_description["r2"])) 

289 rep = res.content_report 

290 assert rep.content_report_id == id_by_description["r2"] 

291 assert rep.reporting_user_id == normal_user.id 

292 assert rep.author_user_id == bad_user2.id 

293 assert rep.reason == "spam" 

294 assert rep.description == "r2" 

295 assert rep.content_ref == "comment/124" 

296 assert rep.user_agent == "n/a" 

297 assert rep.page == "https://couchers.org/comment/124" 

298 

299 res = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username)) 

300 assert res.content_reports[0].content_report_id == id_by_description["r3"] 

301 assert res.content_reports[1].content_report_id == id_by_description["r1"] 

302 

303 

304def test_DeleteUser(db): 

305 super_user, super_token = generate_user(is_superuser=True) 

306 normal_user, normal_token = generate_user() 

307 

308 with real_admin_session(super_token) as api: 

309 res = api.DeleteUser(admin_pb2.DeleteUserReq(user=normal_user.username)) 

310 assert res.user_id == normal_user.id 

311 assert res.username == normal_user.username 

312 assert res.email == normal_user.email 

313 assert res.gender == normal_user.gender 

314 assert parse_date(res.birthdate) == normal_user.birthdate 

315 assert not res.banned 

316 assert res.deleted 

317 

318 with real_admin_session(super_token) as api: 

319 res = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=normal_user.username)) 

320 assert res.user_id == normal_user.id 

321 assert res.username == normal_user.username 

322 assert res.email == normal_user.email 

323 assert res.gender == normal_user.gender 

324 assert parse_date(res.birthdate) == normal_user.birthdate 

325 assert not res.banned 

326 assert not res.deleted 

327 

328 

329def test_CreateApiKey(db, push_collector: PushCollector): 

330 with session_scope() as session: 

331 super_user, super_token = generate_user(is_superuser=True) 

332 normal_user, normal_token = generate_user() 

333 

334 assert ( 

335 session.execute( 

336 select(func.count()) 

337 .select_from(UserSession) 

338 .where(UserSession.is_api_key == True) 

339 .where(UserSession.user_id == normal_user.id) 

340 ).scalar_one() 

341 == 0 

342 ) 

343 

344 with mock_notification_email() as mock: 

345 with real_admin_session(super_token) as api: 

346 res = api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=normal_user.username)) 

347 

348 mock.assert_called_once() 

349 e = email_fields(mock) 

350 assert e.subject == "[TEST] Your API key for Couchers.org" 

351 

352 with session_scope() as session: 

353 token = session.execute( 

354 select(UserSession.token) 

355 .where(UserSession.is_valid) 

356 .where(UserSession.is_api_key == True) 

357 .where(UserSession.user_id == normal_user.id) 

358 ).scalar_one() 

359 

360 assert token in e.plain 

361 assert token in e.html 

362 

363 assert e.recipient == normal_user.email 

364 assert "api key" in e.subject.lower() 

365 unique_string = "We've issued you with the following API key:" 

366 assert unique_string in e.plain 

367 assert unique_string in e.html 

368 assert "support@couchers.org" in e.plain 

369 assert "support@couchers.org" in e.html 

370 

371 push = push_collector.pop_for_user(normal_user.id, last=True) 

372 assert push.content.title == "API key created" 

373 assert push.content.body == "Details were sent to you via email." 

374 

375 

376def test_GetChats(db): 

377 super_user, super_token = generate_user(is_superuser=True) 

378 normal_user, normal_token = generate_user() 

379 

380 with real_admin_session(super_token) as api: 

381 res = api.GetChats(admin_pb2.GetChatsReq(user=normal_user.username)) 

382 # Check the structured response fields - user field contains full UserDetails 

383 assert res.user.user_id == normal_user.id 

384 assert res.user.username == normal_user.username 

385 assert res.user.name == normal_user.name 

386 assert res.user.email == normal_user.email 

387 # New user should have no chats 

388 assert len(res.host_requests) == 0 

389 assert len(res.group_chats) == 0 

390 

391 

392def test_badges(db, push_collector: PushCollector): 

393 super_user, super_token = generate_user(is_superuser=True) 

394 normal_user, normal_token = generate_user() 

395 

396 with real_admin_session(super_token) as api: 

397 # can add a badge 

398 assert "swagster" not in api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)).badges 

399 with mock_notification_email() as mock: 

400 res = api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="swagster")) 

401 assert "swagster" in res.badges 

402 

403 # badge emails are disabled by default 

404 mock.assert_not_called() 

405 

406 push = push_collector.pop_for_user(normal_user.id, last=True) 

407 assert push.content.title == "New profile badge: Swagster" 

408 assert push.content.body == "The Swagster badge was added to your profile." 

409 

410 # can't add/edit special tags 

411 with pytest.raises(grpc.RpcError) as e: 

412 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="founder")) 

413 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

414 assert e.value.details() == "Admins cannot edit that badge." 

415 

416 # double add badge 

417 with pytest.raises(grpc.RpcError) as e: 

418 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="swagster")) 

419 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

420 assert e.value.details() == "The user already has that badge." 

421 

422 # can remove badge 

423 assert "swagster" in api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)).badges 

424 with mock_notification_email() as mock: 

425 res = api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=normal_user.username, badge_id="swagster")) 

426 assert "swagster" not in res.badges 

427 

428 # badge emails are disabled by default 

429 mock.assert_not_called() 

430 

431 push = push_collector.pop_for_user(normal_user.id, last=True) 

432 assert push.content.title == "Profile badge removed" 

433 assert push.content.body == "The Swagster badge was removed from your profile." 

434 

435 # not found on user 

436 with pytest.raises(grpc.RpcError) as e: 

437 api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=normal_user.username, badge_id="swagster")) 

438 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

439 assert e.value.details() == "The user does not have that badge." 

440 

441 # not found in general 

442 with pytest.raises(grpc.RpcError) as e: 

443 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="nonexistentbadge")) 

444 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

445 assert e.value.details() == "Badge not found." 

446 

447 

448def test_DeleteEvent(db): 

449 super_user, super_token = generate_user(is_superuser=True) 

450 normal_user, normal_token = generate_user() 

451 

452 with session_scope() as session: 

453 create_community(session, 0, 2, "Community", [normal_user], [], None) 

454 

455 start_time = now() + timedelta(hours=2) 

456 end_time = start_time + timedelta(hours=3) 

457 with events_session(normal_token) as api: 

458 res = api.CreateEvent( 

459 events_pb2.CreateEventReq( 

460 title="Dummy Title", 

461 content="Dummy content.", 

462 photo_key=None, 

463 offline_information=events_pb2.OfflineEventInformation( 

464 address="Near Null Island", 

465 lat=0.1, 

466 lng=0.2, 

467 ), 

468 start_time=Timestamp_from_datetime(start_time), 

469 end_time=Timestamp_from_datetime(end_time), 

470 timezone="UTC", 

471 ) 

472 ) 

473 event_id = res.event_id 

474 assert not res.is_deleted 

475 

476 with session_scope() as session: 

477 with real_admin_session(super_token) as api: 

478 api.DeleteEvent( 

479 admin_pb2.DeleteEventReq( 

480 event_id=event_id, 

481 ) 

482 ) 

483 occurrence = session.get_one(EventOccurrence, ident=event_id) 

484 assert occurrence.is_deleted 

485 

486 

487def test_ListUserIds(db): 

488 super_user, super_token = generate_user(is_superuser=True) 

489 normal_user, normal_token = generate_user() 

490 

491 with real_admin_session(super_token) as api: 

492 res = api.ListUserIds( 

493 admin_pb2.ListUserIdsReq( 

494 start_time=Timestamp_from_datetime(datetime(2000, 1, 1)), end_time=Timestamp_from_datetime(now()) 

495 ) 

496 ) 

497 assert len(res.user_ids) == 2 

498 assert sorted(res.user_ids) == sorted([super_user.id, normal_user.id]) 

499 

500 with real_admin_session(super_token) as api: 

501 res = api.ListUserIds( 

502 admin_pb2.ListUserIdsReq(start_time=Timestamp_from_datetime(now()), end_time=Timestamp_from_datetime(now())) 

503 ) 

504 assert res.user_ids == [] 

505 

506 

507def test_EditReferenceText(db): 

508 super_user, super_token = generate_user(is_superuser=True) 

509 test_new_text = "New Text" 

510 

511 user1, user1_token = generate_user() 

512 user2, user2_token = generate_user() 

513 make_friends(user1, user2) 

514 

515 with session_scope() as session: 

516 with references_session(user1_token) as api: 

517 reference = api.WriteFriendReference( 

518 references_pb2.WriteFriendReferenceReq( 

519 to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1 

520 ) 

521 ) 

522 

523 with real_admin_session(super_token) as admin_api: 

524 admin_api.EditReferenceText( 

525 admin_pb2.EditReferenceTextReq(reference_id=reference.reference_id, new_text=test_new_text) 

526 ) 

527 

528 session.expire_all() 

529 

530 modified_reference = session.execute( 

531 select(Reference).where(Reference.id == reference.reference_id) 

532 ).scalar_one() 

533 assert modified_reference.text == test_new_text 

534 

535 

536def test_DeleteReference(db): 

537 super_user, super_token = generate_user(is_superuser=True) 

538 

539 user1, user1_token = generate_user() 

540 user2, user2_token = generate_user() 

541 make_friends(user1, user2) 

542 

543 with references_session(user1_token) as api: 

544 reference = api.WriteFriendReference( 

545 references_pb2.WriteFriendReferenceReq( 

546 to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1 

547 ) 

548 ) 

549 

550 with references_session(user1_token) as api: 

551 assert api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references 

552 

553 with real_admin_session(super_token) as admin_api: 

554 admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=reference.reference_id)) 

555 

556 with references_session(user1_token) as api: 

557 assert not api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references 

558 

559 with session_scope() as session: 

560 modified_reference = session.execute( 

561 select(Reference).where(Reference.id == reference.reference_id) 

562 ).scalar_one() 

563 assert modified_reference.is_deleted 

564 

565 

566def test_GetUserReferences(db): 

567 super_user, super_token = generate_user(is_superuser=True) 

568 

569 user1, user1_token = generate_user() 

570 user2, user2_token = generate_user() 

571 user3, user3_token = generate_user() 

572 make_friends(user1, user2) 

573 make_friends(user1, user3) 

574 make_friends(user2, user3) 

575 

576 # user1 writes reference about user2 

577 with references_session(user1_token) as api: 

578 ref1 = api.WriteFriendReference( 

579 references_pb2.WriteFriendReferenceReq( 

580 to_user_id=user2.id, 

581 text="Reference from user1 to user2", 

582 private_text="", 

583 was_appropriate=True, 

584 rating=1, 

585 ) 

586 ) 

587 

588 # user2 writes reference about user1 

589 with references_session(user2_token) as api: 

590 ref2 = api.WriteFriendReference( 

591 references_pb2.WriteFriendReferenceReq( 

592 to_user_id=user1.id, 

593 text="Reference from user2 to user1", 

594 private_text="Private note", 

595 was_appropriate=True, 

596 rating=0.8, 

597 ) 

598 ) 

599 

600 # user3 writes reference about user1 

601 with references_session(user3_token) as api: 

602 ref3 = api.WriteFriendReference( 

603 references_pb2.WriteFriendReferenceReq( 

604 to_user_id=user1.id, 

605 text="Reference from user3 to user1", 

606 private_text="", 

607 was_appropriate=False, 

608 rating=0.5, 

609 ) 

610 ) 

611 

612 # Delete ref3 

613 with real_admin_session(super_token) as admin_api: 

614 admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=ref3.reference_id)) 

615 

616 # Test GetUserReferences for user1 

617 with real_admin_session(super_token) as admin_api: 

618 res = admin_api.GetUserReferences(admin_pb2.GetUserReferencesReq(user=user1.username)) 

619 

620 # user1 wrote 1 reference 

621 assert len(res.references_from) == 1 

622 assert res.references_from[0].reference_id == ref1.reference_id 

623 assert res.references_from[0].from_user_id == user1.id 

624 assert res.references_from[0].to_user_id == user2.id 

625 assert res.references_from[0].text == "Reference from user1 to user2" 

626 assert res.references_from[0].is_deleted is False 

627 

628 # user1 received 2 references (including the deleted one) 

629 assert len(res.references_to) == 2 

630 # Ordered by id descending, so ref3 comes first 

631 assert res.references_to[0].reference_id == ref3.reference_id 

632 assert res.references_to[0].is_deleted is True 

633 assert res.references_to[0].was_appropriate is False 

634 

635 assert res.references_to[1].reference_id == ref2.reference_id 

636 assert res.references_to[1].private_text == "Private note" 

637 assert res.references_to[1].rating == 0.8 

638 assert res.references_to[1].is_deleted is False 

639 

640 

641def test_GetUserReferences_not_found(db): 

642 super_user, super_token = generate_user(is_superuser=True) 

643 

644 with real_admin_session(super_token) as admin_api: 

645 with pytest.raises(grpc.RpcError) as e: 

646 admin_api.GetUserReferences(admin_pb2.GetUserReferencesReq(user="nonexistent")) 

647 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

648 

649 

650def test_AddUsersToModerationUserList(db): 

651 super_user, super_token = generate_user(is_superuser=True) 

652 user1, _ = generate_user() 

653 user2, _ = generate_user() 

654 user3, _ = generate_user() 

655 user4, _ = generate_user() 

656 user5, _ = generate_user() 

657 moderation_list_id = add_users_to_new_moderation_list([user1]) 

658 

659 with session_scope() as session: 

660 with real_admin_session(super_token) as api: 

661 # Test adding users to a non-existent moderation list (should raise an error) 

662 with pytest.raises(grpc.RpcError) as e: 

663 api.AddUsersToModerationUserList( 

664 admin_pb2.AddUsersToModerationUserListReq(users=[user2.username], moderation_list_id=999), 

665 ) 

666 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

667 assert "Moderation user list not found." == e.value.details() 

668 

669 # Test with non-existent user (should raise an error) 

670 with pytest.raises(grpc.RpcError) as e: 

671 api.AddUsersToModerationUserList( 

672 admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, "nonexistent"]), 

673 ) 

674 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

675 assert "Couldn't find that user." == e.value.details() 

676 

677 # Test successful creation of new moderation list (no moderation_list_id provided) 

678 res = api.AddUsersToModerationUserList( 

679 admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, user2.username, user3.username]), 

680 ) 

681 assert res.moderation_list_id > 0 

682 with session_scope() as session: 

683 moderation_user_list = session.get(ModerationUserList, res.moderation_list_id) 

684 assert moderation_user_list is not None 

685 assert len(moderation_user_list.users) == 3 

686 assert {user1.id, user2.id, user3.id}.issubset({user.id for user in moderation_user_list.users}) 

687 

688 # Test list endpoint returns same moderation list with same members not repeated 

689 listRes = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username)) 

690 assert len(listRes.moderation_lists) == 1 

691 assert listRes.moderation_lists[0].moderation_list_id == res.moderation_list_id 

692 assert len(listRes.moderation_lists[0].member_ids) == 3 

693 assert {user1.id, user2.id, user3.id}.issubset(listRes.moderation_lists[0].member_ids) 

694 

695 # Test user can be in multiple moderation lists 

696 listRes3 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username)) 

697 assert len(listRes3.moderation_lists) == 2 

698 

699 # Test adding users to an existing moderation list 

700 res2 = api.AddUsersToModerationUserList( 

701 admin_pb2.AddUsersToModerationUserListReq( 

702 users=[user4.username, user5.username], moderation_list_id=moderation_list_id 

703 ), 

704 ) 

705 assert res2.moderation_list_id == moderation_list_id 

706 with session_scope() as session: 

707 moderation_user_list = session.get_one(ModerationUserList, moderation_list_id) 

708 assert len(moderation_user_list.users) == 3 

709 assert {user1.id, user4.id, user5.id}.issubset({user.id for user in moderation_user_list.users}) 

710 

711 # Test list user moderation lists endpoint returns the right moderation list 

712 listRes2 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user5.username)) 

713 assert len(listRes2.moderation_lists) == 1 

714 assert listRes2.moderation_lists[0].moderation_list_id == moderation_list_id 

715 assert len(listRes2.moderation_lists[0].member_ids) == 3 

716 assert {user1.id, user4.id, user5.id}.issubset(listRes2.moderation_lists[0].member_ids) 

717 

718 

719def test_RemoveUserFromModerationUserList(db): 

720 super_user, super_token = generate_user(is_superuser=True) 

721 user1, _ = generate_user() 

722 user2, _ = generate_user() 

723 user3, _ = generate_user() 

724 moderation_list_id = add_users_to_new_moderation_list([user1, user2]) 

725 

726 with real_admin_session(super_token) as api: 

727 # Test with non-existent user (should raise error) 

728 with pytest.raises(grpc.RpcError) as e: 

729 api.RemoveUserFromModerationUserList(admin_pb2.RemoveUserFromModerationUserListReq(user="nonexistent")) 

730 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

731 assert "Couldn't find that user." == e.value.details() 

732 

733 # Test without providing moderation list id (should raise error) 

734 with pytest.raises(grpc.RpcError) as e: 

735 api.RemoveUserFromModerationUserList(admin_pb2.RemoveUserFromModerationUserListReq(user=user2.username)) 

736 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

737 assert "Missing moderation user list id." == e.value.details() 

738 

739 # Test removing user that's not in the provided moderation list (should raise error) 

740 with pytest.raises(grpc.RpcError) as e: 

741 api.RemoveUserFromModerationUserList( 

742 admin_pb2.RemoveUserFromModerationUserListReq( 

743 user=user3.username, moderation_list_id=moderation_list_id 

744 ) 

745 ) 

746 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

747 assert "User is not in the moderation user list." == e.value.details() 

748 

749 # Test successful removal 

750 api.RemoveUserFromModerationUserList( 

751 admin_pb2.RemoveUserFromModerationUserListReq(user=user1.username, moderation_list_id=moderation_list_id) 

752 ) 

753 with session_scope() as session: 

754 moderation_user_list = session.get_one(ModerationUserList, moderation_list_id) 

755 assert user1.id not in {user.id for user in moderation_user_list.users} 

756 assert user2.id in {user.id for user in moderation_user_list.users} 

757 

758 # Test list user moderation lists endpoint returns right number of moderation lists 

759 listRes = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username)) 

760 assert len(listRes.moderation_lists) == 0 

761 listRes2 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username)) 

762 assert len(listRes2.moderation_lists) == 1 

763 

764 # Test removing all users from moderation list should also delete the moderation list 

765 api.RemoveUserFromModerationUserList( 

766 admin_pb2.RemoveUserFromModerationUserListReq(user=user2.username, moderation_list_id=moderation_list_id) 

767 ) 

768 with session_scope() as session: 

769 assert session.get(ModerationUserList, moderation_list_id) is None 

770 

771 

772def test_admin_delete_account_url(db, push_collector: PushCollector): 

773 super_user, super_token = generate_user(is_superuser=True) 

774 

775 user, token = generate_user() 

776 user_id = user.id 

777 

778 with real_admin_session(super_token) as admin_api: 

779 url = admin_api.CreateAccountDeletionLink( 

780 admin_pb2.CreateAccountDeletionLinkReq(user=user.username) 

781 ).account_deletion_confirm_url 

782 

783 assert push_collector.count_for_user(user_id) == 0 

784 

785 with session_scope() as session: 

786 token_o = session.execute(select(AccountDeletionToken)).scalar_one() 

787 token = token_o.token 

788 assert token_o.user.id == user_id 

789 assert url == f"http://localhost:3000/delete-account?token={token}" 

790 

791 with mock_notification_email() as mock: 

792 with auth_api_session() as (auth_api, metadata_interceptor): 

793 auth_api.ConfirmDeleteAccount( 

794 auth_pb2.ConfirmDeleteAccountReq( 

795 token=token, 

796 ) 

797 ) 

798 

799 push = push_collector.pop_for_user(user_id, last=True) 

800 assert push.content.title == "Account deleted" 

801 assert push.content.body == "You can restore it within 7 days using the link we emailed you." 

802 mock.assert_called_once() 

803 e = email_fields(mock) 

804 

805 

806def test_SetLastDonated(db): 

807 super_user, super_token = generate_user(is_superuser=True) 

808 normal_user, normal_token = generate_user(last_donated=None) 

809 

810 with real_admin_session(super_token) as api: 

811 # user starts with no last_donated 

812 with session_scope() as session: 

813 user = session.execute(select(User).where(User.id == normal_user.id)).scalar_one() 

814 assert user.last_donated is None 

815 

816 # can set last_donated 

817 donation_time = now() - timedelta(days=30) 

818 res = api.SetLastDonated( 

819 admin_pb2.SetLastDonatedReq( 

820 user=normal_user.username, 

821 last_donated=Timestamp_from_datetime(donation_time), 

822 ) 

823 ) 

824 

825 with session_scope() as session: 

826 user = session.execute(select(User).where(User.id == normal_user.id)).scalar_one() 

827 assert user.last_donated is not None 

828 # check timestamp is close (within a second) 

829 assert abs((user.last_donated - donation_time).total_seconds()) < 1 

830 

831 # can clear last_donated by not setting the field 

832 res = api.SetLastDonated(admin_pb2.SetLastDonatedReq(user=normal_user.username)) 

833 

834 with session_scope() as session: 

835 user = session.execute(select(User).where(User.id == normal_user.id)).scalar_one() 

836 assert user.last_donated is None 

837 

838 # user not found 

839 with pytest.raises(grpc.RpcError) as e: 

840 api.SetLastDonated(admin_pb2.SetLastDonatedReq(user="nonexistent")) 

841 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

842 assert e.value.details() == "Couldn't find that user." 

843 

844 

845# community invite feature tested in test_events.py 

846# SendBlogPostNotification tested in test_notifications.py 

847# MarkUserNeedsLocationUpdate tested in test_jail.py