Coverage for src/tests/test_admin.py: 100%

452 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-17 23:52 +0000

1from datetime import date, datetime 

2from re import match 

3 

4import grpc 

5import pytest 

6from sqlalchemy.sql import func 

7 

8from couchers.db import session_scope 

9from couchers.models import ( 

10 AccountDeletionToken, 

11 ContentReport, 

12 EventOccurrence, 

13 ModerationUserList, 

14 Reference, 

15 User, 

16 UserSession, 

17) 

18from couchers.proto import admin_pb2, auth_pb2, events_pb2, references_pb2, reporting_pb2 

19from couchers.sql import couchers_select as select 

20from couchers.utils import Timestamp_from_datetime, now, parse_date, timedelta 

21from tests.test_communities import create_community 

22from tests.test_fixtures import ( # noqa 

23 add_users_to_new_moderation_list, 

24 auth_api_session, 

25 db, 

26 email_fields, 

27 events_session, 

28 generate_user, 

29 get_user_id_and_token, 

30 make_friends, 

31 mock_notification_email, 

32 push_collector, 

33 real_admin_session, 

34 references_session, 

35 reporting_session, 

36 requests_session, 

37 testconfig, 

38) 

39 

40 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""

44 

45 

def test_access_by_normal_user(db):
    """A non-superuser calling the admin servicer is rejected with PERMISSION_DENIED."""
    regular_user, regular_token = generate_user()
    request = admin_pb2.GetUserDetailsReq(user=str(regular_user.id))

    with real_admin_session(regular_token) as api:
        # all requests to the admin servicer should break when done by a non-super_user;
        # GetUserDetails is the one exercised here
        with pytest.raises(grpc.RpcError) as err:
            api.GetUserDetails(request)
        assert err.value.code() == grpc.StatusCode.PERMISSION_DENIED

58 

59 

def test_GetUser(db):
    """GetUser returns the same id and username for a user before and after they are banned."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    def assert_lookup_matches():
        # look the user up by id in a fresh admin session
        with real_admin_session(admin_token) as api:
            found = api.GetUser(admin_pb2.GetUserReq(user=str(target.id)))
            assert found.user_id == target.id
            assert found.username == target.username

    assert_lookup_matches()

    with real_admin_session(admin_token) as api:
        api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note="Testing banning"))

    # the lookup must keep working once the user is banned
    assert_lookup_matches()

76 

77 

def test_GetUserDetails(db):
    """GetUserDetails resolves a user given their id, their username or their email."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    # each identifier form is sent in its own admin session and must yield identical details
    for identifier in (str(target.id), target.username, target.email):
        with real_admin_session(admin_token) as api:
            details = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=identifier))
            assert details.user_id == target.id
            assert details.username == target.username
            assert details.email == target.email
            assert details.gender == target.gender
            assert parse_date(details.birthdate) == target.birthdate
            assert not details.banned
            assert not details.deleted

111 

112 

def test_ChangeUserGender(db, push_collector):
    """Changing a user's gender updates their details and notifies them by email and push."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    new_gender = "Machine"

    with real_admin_session(admin_token) as api:
        with mock_notification_email() as email_mock:
            details = api.ChangeUserGender(
                admin_pb2.ChangeUserGenderReq(user=target.username, gender=new_gender)
            )
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == new_gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert not details.deleted

    # exactly one notification email, addressed to the affected user
    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your gender was changed"
    assert sent.recipient == target.email
    assert new_gender in sent.plain
    assert new_gender in sent.html

    push_collector.assert_user_has_single_matching(
        target.id,
        title="Your gender was changed",
        body="Your gender on Couchers.org was changed to Machine by an admin.",
    )

140 

141 

def test_ChangeUserBirthdate(db, push_collector):
    """Changing a user's birthdate updates their details and notifies them by email and push."""
    _, admin_token = generate_user(is_superuser=True)
    original_birthdate = date(year=2000, month=1, day=1)
    target, _ = generate_user(birthdate=original_birthdate)
    lookup = admin_pb2.GetUserDetailsReq(user=target.username)
    change = admin_pb2.ChangeUserBirthdateReq(user=target.username, birthdate="1990-05-25")

    with real_admin_session(admin_token) as api:
        # sanity check: the user starts out with the birthdate they were generated with
        before = api.GetUserDetails(lookup)
        assert parse_date(before.birthdate) == original_birthdate

        with mock_notification_email() as email_mock:
            after = api.ChangeUserBirthdate(change)

        assert after.user_id == target.id
        assert after.username == target.username
        assert after.email == target.email
        assert after.birthdate == "1990-05-25"
        assert after.gender == target.gender
        assert not after.banned
        assert not after.deleted

    # exactly one notification email, addressed to the affected user
    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your date of birth was changed"
    assert sent.recipient == target.email
    assert "1990" in sent.plain
    assert "1990" in sent.html

    push_collector.assert_user_has_single_matching(
        target.id,
        title="Your date of birth was changed",
        body="Your date of birth on Couchers.org was changed to Friday 25 May 1990 by an admin.",
    )

175 

176 

def test_BanUser(db):
    """Banning a user sets the banned flag and records a prefixed admin note."""
    admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    reason = "A good reason"
    # a note entry looks like "\n[<UTC ISO timestamp>] (id: <admin id>, username: <admin username>) <note>\n"
    iso_utc = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    expected_note = rf"^\n\[{iso_utc}\] \(id: {admin.id}, username: {admin.username}\) {reason}\n$"

    with real_admin_session(admin_token) as api:
        details = api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note=reason))
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert details.banned
        assert not details.deleted
        assert match(expected_note, details.admin_note)

194 

195 

def test_UnbanUser(db):
    """Unbanning a banned user clears the banned flag and appends a second admin note.

    The user is banned first so that the unban is a real state transition; without
    that step ``not res.banned`` would hold even if UnbanUser did nothing.
    """
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    ban_note = "A bad reason"
    admin_note = "A good reason"
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"

    with real_admin_session(super_token) as api:
        # precondition: the user really is banned before we try to unban them
        res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note=ban_note))
        assert res.banned

        res = api.UnbanUser(admin_pb2.UnbanUserReq(user=normal_user.username, admin_note=admin_note))
        assert res.user_id == normal_user.id
        assert res.username == normal_user.username
        assert res.email == normal_user.email
        assert res.gender == normal_user.gender
        assert parse_date(res.birthdate) == normal_user.birthdate
        assert not res.banned
        assert not res.deleted
        # the unban note is appended after the note written by the ban
        assert match(rf"^{prefix_regex} {ban_note}\n{prefix_regex} {admin_note}\n$", res.admin_note)

213 

214 

def test_AddAdminNote(db):
    """Admin notes are appended in order, each prefixed with a timestamp and the admin's identity."""
    admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    first_note = "User reported strange behavior"
    second_note = "Insert private information here"
    iso_utc = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    entry_prefix = rf"\n\[{iso_utc}\] \(id: {admin.id}, username: {admin.username}\)"
    first_entry = rf"{entry_prefix} {first_note}\n"
    second_entry = rf"{entry_prefix} {second_note}\n"

    with real_admin_session(admin_token) as api:
        details = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note=first_note))
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert not details.deleted
        assert match(rf"^{first_entry}$", details.admin_note)

    with real_admin_session(admin_token) as api:
        details = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note=second_note))
        # the second note follows the first rather than replacing it
        assert match(rf"^{first_entry}{second_entry}$", details.admin_note)

237 

238 

def test_AddAdminNote_blank(db):
    """A whitespace-only admin note is rejected with INVALID_ARGUMENT."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    whitespace_only = admin_pb2.AddAdminNoteReq(user=target.username, admin_note=" \t \n ")

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.AddAdminNote(whitespace_only)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == "The admin note cannot be empty."

249 

250 

def test_admin_content_reports(db):
    """Admins can fetch a single content report and list the reports against an author."""
    _, admin_token = generate_user(is_superuser=True)
    reporter, reporter_token = generate_user()
    first_author, _ = generate_user()
    second_author, _ = generate_user()

    # (reason, description, content_ref, author, page), filed in this order
    filed = [
        ("spam", "r1", "comment/123", first_author, "https://couchers.org/comment/123"),
        ("spam", "r2", "comment/124", second_author, "https://couchers.org/comment/124"),
        ("something else", "r3", "page/321", first_author, "https://couchers.org/page/321"),
    ]
    with reporting_session(reporter_token) as api:
        for reason, description, content_ref, author, page in filed:
            api.Report(
                reporting_pb2.ReportReq(
                    reason=reason,
                    description=description,
                    content_ref=content_ref,
                    author_user=author.username,
                    user_agent="n/a",
                    page=page,
                )
            )

    # the descriptions are unique, so they serve as keys to recover each report's id
    with session_scope() as session:
        rows = session.execute(select(ContentReport.description, ContentReport.id)).all()
        report_ids = {description: report_id for description, report_id in rows}

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert err.value.code() == grpc.StatusCode.NOT_FOUND
        assert err.value.details() == "Content report not found."

        fetched = api.GetContentReport(
            admin_pb2.GetContentReportReq(content_report_id=report_ids["r2"])
        ).content_report
        assert fetched.content_report_id == report_ids["r2"]
        assert fetched.reporting_user_id == reporter.id
        assert fetched.author_user_id == second_author.id
        assert fetched.reason == "spam"
        assert fetched.description == "r2"
        assert fetched.content_ref == "comment/124"
        assert fetched.user_agent == "n/a"
        assert fetched.page == "https://couchers.org/comment/124"

        # for the author with two reports, the later one (r3) is expected first
        by_author = api.GetContentReportsForAuthor(
            admin_pb2.GetContentReportsForAuthorReq(user=first_author.username)
        )
        assert by_author.content_reports[0].content_report_id == report_ids["r3"]
        assert by_author.content_reports[1].content_report_id == report_ids["r1"]

312 

313 

def test_DeleteUser(db):
    """DeleteUser marks a user as deleted and RecoverDeletedUser clears the flag again."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    def assert_same_unbanned_user(details):
        # everything except the deleted flag is expected to be unchanged by both calls
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned

    with real_admin_session(admin_token) as api:
        after_delete = api.DeleteUser(admin_pb2.DeleteUserReq(user=target.username))
        assert_same_unbanned_user(after_delete)
        assert after_delete.deleted

    with real_admin_session(admin_token) as api:
        after_recover = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=target.username))
        assert_same_unbanned_user(after_recover)
        assert not after_recover.deleted

337 

338 

def test_CreateApiKey(db, push_collector):
    """Creating an API key stores a valid API-key session and sends it to the user by email and push."""
    with session_scope() as session:
        _, admin_token = generate_user(is_superuser=True)
        target, _ = generate_user()

        # the user starts out without any API key sessions
        existing_keys = session.execute(
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        ).scalar_one()
        assert existing_keys == 0

    with mock_notification_email() as email_mock:
        with real_admin_session(admin_token) as api:
            api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=target.username))

    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your API key for Couchers.org"

    # scalar_one() also asserts that exactly one valid API key now exists for the user
    with session_scope() as session:
        api_key = session.execute(
            select(UserSession.token)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        ).scalar_one()

    assert api_key in sent.plain
    assert api_key in sent.html

    assert sent.recipient == target.email
    assert "api key" in sent.subject.lower()
    for expected_text in ("We've issued you with the following API key:", "support@couchers.org"):
        assert expected_text in sent.plain
        assert expected_text in sent.html

    push_collector.assert_user_has_single_matching(
        target.id, title="An API key was created for your account", body="Details were sent to you via email."
    )

384 

385 

def test_GetChats(db):
    """GetChats returns the user's details and, for a fresh user, no host requests or group chats."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    with real_admin_session(admin_token) as api:
        chats = api.GetChats(admin_pb2.GetChatsReq(user=target.username))
        # the structured response embeds the user's details in its `user` field
        described = chats.user
        assert described.user_id == target.id
        assert described.username == target.username
        assert described.name == target.name
        assert described.email == target.email
        # a newly generated user has not taken part in any chats
        assert len(chats.host_requests) == 0
        assert len(chats.group_chats) == 0

400 

401 

def test_badges(db, push_collector):
    """Admins can add and remove an ordinary badge; invalid badge operations fail with specific errors."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    username = target.username

    def current_badges(api):
        return api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=username)).badges

    with real_admin_session(admin_token) as api:
        # can add a badge
        assert "swagster" not in current_badges(api)
        with mock_notification_email() as email_mock:
            details = api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="swagster"))
            assert "swagster" in details.badges

        # badge emails are disabled by default
        email_mock.assert_not_called()

        push_collector.assert_user_has_single_matching(
            target.id,
            title="The Swagster badge was added to your profile",
            body="Check out your profile to see the new badge!",
        )

        # can't add/edit special tags
        with pytest.raises(grpc.RpcError) as err:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="founder"))
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "Admins cannot edit that badge."

        # double add badge
        with pytest.raises(grpc.RpcError) as err:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="swagster"))
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "The user already has that badge."

        # can remove badge
        assert "swagster" in current_badges(api)
        with mock_notification_email() as email_mock:
            details = api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="swagster"))
            assert "swagster" not in details.badges

        # badge emails are disabled by default
        email_mock.assert_not_called()

        # the removal push is the user's second push (index 1), after the one for adding
        push_collector.assert_user_push_matches_fields(
            target.id,
            ix=1,
            title="The Swagster badge was removed from your profile",
            body="You can see all your badges on your profile.",
        )

        # not found on user
        with pytest.raises(grpc.RpcError) as err:
            api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="swagster"))
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "The user does not have that badge."

        # not found in general
        with pytest.raises(grpc.RpcError) as err:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="nonexistentbadge"))
        assert err.value.code() == grpc.StatusCode.NOT_FOUND
        assert err.value.details() == "Badge not found."

461 

462 

def test_DeleteEvent(db):
    """An admin can delete an event created by a normal user; its occurrence is then flagged deleted."""
    _, admin_token = generate_user(is_superuser=True)
    organizer, organizer_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [organizer], [], None)

    # an event starting two hours from now and lasting three hours
    starts_at = now() + timedelta(hours=2)
    ends_at = starts_at + timedelta(hours=3)
    location = events_pb2.OfflineEventInformation(
        address="Near Null Island",
        lat=0.1,
        lng=0.2,
    )
    with events_session(organizer_token) as api:
        created = api.CreateEvent(
            events_pb2.CreateEventReq(
                title="Dummy Title",
                content="Dummy content.",
                photo_key=None,
                offline_information=location,
                start_time=Timestamp_from_datetime(starts_at),
                end_time=Timestamp_from_datetime(ends_at),
                timezone="UTC",
            )
        )
        event_id = created.event_id
        assert not created.is_deleted

    with session_scope() as session:
        with real_admin_session(admin_token) as api:
            api.DeleteEvent(admin_pb2.DeleteEventReq(event_id=event_id))
        # the id returned by CreateEvent is used as the EventOccurrence primary key
        deleted_occurrence = session.get(EventOccurrence, ident=event_id)
        assert deleted_occurrence.is_deleted

500 

501 

def test_ListUserIds(db):
    """ListUserIds returns both users for a window from 2000-01-01 until now, and none for a window starting now."""
    admin, admin_token = generate_user(is_superuser=True)
    other, _ = generate_user()

    with real_admin_session(admin_token) as api:
        wide_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(datetime(2000, 1, 1)), end_time=Timestamp_from_datetime(now())
        )
        listed = api.ListUserIds(wide_window)
        assert len(listed.user_ids) == 2
        assert sorted(listed.user_ids) == sorted([admin.id, other.id])

    with real_admin_session(admin_token) as api:
        # both bounds are taken "now", i.e. after the two users were generated
        window_start = Timestamp_from_datetime(now())
        window_end = Timestamp_from_datetime(now())
        listed = api.ListUserIds(admin_pb2.ListUserIdsReq(start_time=window_start, end_time=window_end))
        assert listed.user_ids == []

520 

521 

def test_EditReferenceText(db):
    """An admin can replace the public text of an existing friend reference."""
    super_user, super_token = generate_user(is_superuser=True)
    test_new_text = "New Text"

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()
    make_friends(user1, user2)

    with references_session(user1_token) as api:
        reference = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
            )
        )

    with real_admin_session(super_token) as admin_api:
        admin_api.EditReferenceText(
            admin_pb2.EditReferenceTextReq(reference_id=reference.reference_id, new_text=test_new_text)
        )

    # Open the verification session only after both API calls, so it cannot hold stale
    # state. scalar_one() fails with a clear "no row" error if the reference is missing,
    # instead of an AttributeError on None.
    with session_scope() as session:
        modified_reference = session.execute(
            select(Reference).where(Reference.id == reference.reference_id)
        ).scalar_one()
        assert modified_reference.text == test_new_text

549 

550 

def test_DeleteReference(db):
    """An admin-deleted reference disappears from listings and is flagged deleted in the database."""
    super_user, super_token = generate_user(is_superuser=True)

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()
    make_friends(user1, user2)

    with references_session(user1_token) as api:
        reference = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
            )
        )

    # the reference is listed before deletion...
    with references_session(user1_token) as api:
        assert api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references

    with real_admin_session(super_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=reference.reference_id))

    # ...and no longer listed afterwards
    with references_session(user1_token) as api:
        assert not api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references

    # The row itself must still exist, only flagged as deleted. scalar_one() fails with a
    # clear "no row" error if it is missing, instead of an AttributeError on None.
    with session_scope() as session:
        modified_reference = session.execute(
            select(Reference).where(Reference.id == reference.reference_id)
        ).scalar_one()
        assert modified_reference.is_deleted

579 

580 

def test_AddUsersToModerationUserList(db):
    """Covers creating a moderation user list, extending an existing one, and the error cases.

    Also checks that ListModerationUserLists reports the expected lists and members.
    """
    super_user, super_token = generate_user(is_superuser=True)
    user1, _ = generate_user()
    user2, _ = generate_user()
    user3, _ = generate_user()
    user4, _ = generate_user()
    user5, _ = generate_user()
    moderation_list_id = add_users_to_new_moderation_list([user1])

    # Database sessions are opened only where a row is actually inspected.
    with real_admin_session(super_token) as api:
        # Test adding users to a non-existent moderation list (should raise an error)
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user2.username], moderation_list_id=999),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Moderation user list not found."

        # Test with non-existent user (should raise an error)
        with pytest.raises(grpc.RpcError) as e:
            api.AddUsersToModerationUserList(
                admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, "nonexistent"]),
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Couldn't find that user."

        # Test successful creation of new moderation list (no moderation_list_id provided)
        res = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, user2.username, user3.username]),
        )
        assert res.moderation_list_id > 0
        new_list_members = {user1.id, user2.id, user3.id}
        with session_scope() as session:
            moderation_user_list = session.get(ModerationUserList, res.moderation_list_id)
            assert moderation_user_list is not None
            # the length check catches duplicates, the set equality catches wrong members
            assert len(moderation_user_list.users) == 3
            assert {user.id for user in moderation_user_list.users} == new_list_members

        # Test list endpoint returns same moderation list with same members not repeated
        list_res = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username))
        assert len(list_res.moderation_lists) == 1
        assert list_res.moderation_lists[0].moderation_list_id == res.moderation_list_id
        assert len(list_res.moderation_lists[0].member_ids) == 3
        assert set(list_res.moderation_lists[0].member_ids) == new_list_members

        # Test user can be in multiple moderation lists
        list_res3 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username))
        assert len(list_res3.moderation_lists) == 2

        # Test adding users to an existing moderation list
        res2 = api.AddUsersToModerationUserList(
            admin_pb2.AddUsersToModerationUserListReq(
                users=[user4.username, user5.username], moderation_list_id=moderation_list_id
            ),
        )
        assert res2.moderation_list_id == moderation_list_id
        existing_list_members = {user1.id, user4.id, user5.id}
        with session_scope() as session:
            moderation_user_list = session.get(ModerationUserList, moderation_list_id)
            assert len(moderation_user_list.users) == 3
            assert {user.id for user in moderation_user_list.users} == existing_list_members

        # Test list user moderation lists endpoint returns the right moderation list
        list_res2 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user5.username))
        assert len(list_res2.moderation_lists) == 1
        assert list_res2.moderation_lists[0].moderation_list_id == moderation_list_id
        assert len(list_res2.moderation_lists[0].member_ids) == 3
        assert set(list_res2.moderation_lists[0].member_ids) == existing_list_members

648 

649 

def test_RemoveUserFromModerationUserList(db):
    """Covers the error cases of removing a user from a moderation list, then successful removals.

    Removing the last remaining member is expected to delete the list itself.
    """
    _, admin_token = generate_user(is_superuser=True)
    member_a, _ = generate_user()
    member_b, _ = generate_user()
    outsider, _ = generate_user()
    list_id = add_users_to_new_moderation_list([member_a, member_b])

    def removal_request(**fields):
        return admin_pb2.RemoveUserFromModerationUserListReq(**fields)

    with real_admin_session(admin_token) as api:

        def list_count_for(user):
            listing = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user.username))
            return len(listing.moderation_lists)

        # Test with non-existent user (should raise error)
        with pytest.raises(grpc.RpcError) as err:
            api.RemoveUserFromModerationUserList(removal_request(user="nonexistent"))
        assert err.value.code() == grpc.StatusCode.NOT_FOUND
        assert "Couldn't find that user." == err.value.details()

        # Test without providing moderation list id (should raise error)
        with pytest.raises(grpc.RpcError) as err:
            api.RemoveUserFromModerationUserList(removal_request(user=member_b.username))
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert "Missing moderation user list id." == err.value.details()

        # Test removing user that's not in the provided moderation list (should raise error)
        with pytest.raises(grpc.RpcError) as err:
            api.RemoveUserFromModerationUserList(
                removal_request(user=outsider.username, moderation_list_id=list_id)
            )
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert "User is not in the moderation user list." == err.value.details()

        # Test successful removal
        api.RemoveUserFromModerationUserList(removal_request(user=member_a.username, moderation_list_id=list_id))
        with session_scope() as session:
            remaining_ids = {user.id for user in session.get(ModerationUserList, list_id).users}
            assert member_a.id not in remaining_ids
            assert member_b.id in remaining_ids

        # Test list user moderation lists endpoint returns right number of moderation lists
        assert list_count_for(member_a) == 0
        assert list_count_for(member_b) == 1

        # Test removing all users from moderation list should also delete the moderation list
        api.RemoveUserFromModerationUserList(removal_request(user=member_b.username, moderation_list_id=list_id))
        with session_scope() as session:
            assert session.get(ModerationUserList, list_id) is None

701 

702 

def test_admin_delete_account_url(db, push_collector):
    """An admin-created account deletion link carries a token that can confirm the deletion."""
    _, admin_token = generate_user(is_superuser=True)

    account, _ = generate_user()
    account_id = account.id

    with real_admin_session(admin_token) as admin_api:
        link_res = admin_api.CreateAccountDeletionLink(admin_pb2.CreateAccountDeletionLinkReq(user=account.username))
        deletion_url = link_res.account_deletion_confirm_url

    # creating the link on its own must not push anything to the user
    push_collector.assert_user_has_count(account_id, 0)

    # scalar_one() also asserts that exactly one deletion token exists
    with session_scope() as session:
        token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token = token_row.token
        assert token_row.user.id == account_id
        assert deletion_url == f"http://localhost:3000/delete-account?token={deletion_token}"

    with mock_notification_email() as email_mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    push_collector.assert_user_push_matches_fields(
        account_id,
        ix=0,
        title="Your Couchers.org account has been deleted",
        body="You can still undo this by following the link we emailed to you within 7 days.",
    )

    email_mock.assert_called_once()
    # the extracted fields are not asserted on; the call is kept as in the original test
    email_fields(email_mock)

739 

740 

def test_SetLastDonated(db):
    """SetLastDonated sets the timestamp when given one, clears it when omitted, and rejects unknown users."""
    _, admin_token = generate_user(is_superuser=True)
    donor, _ = generate_user(last_donated=None)

    def stored_last_donated():
        # read the value back from the database in a fresh session each time
        with session_scope() as session:
            return session.execute(select(User).where(User.id == donor.id)).scalar_one().last_donated

    with real_admin_session(admin_token) as api:
        # user starts with no last_donated
        assert stored_last_donated() is None

        # can set last_donated
        donated_at = now() - timedelta(days=30)
        api.SetLastDonated(
            admin_pb2.SetLastDonatedReq(
                user=donor.username,
                last_donated=Timestamp_from_datetime(donated_at),
            )
        )

        saved = stored_last_donated()
        assert saved is not None
        # check timestamp is close (within a second)
        assert abs((saved - donated_at).total_seconds()) < 1

        # can clear last_donated by not setting the field
        api.SetLastDonated(admin_pb2.SetLastDonatedReq(user=donor.username))
        assert stored_last_donated() is None

        # user not found
        with pytest.raises(grpc.RpcError) as err:
            api.SetLastDonated(admin_pb2.SetLastDonatedReq(user="nonexistent"))
        assert err.value.code() == grpc.StatusCode.NOT_FOUND
        assert err.value.details() == "Couldn't find that user."

778 

779 

780# community invite feature tested in test_events.py 

781# SendBlogPostNotification tested in test_notifications.py 

782# MarkUserNeedsLocationUpdate tested in test_jail.py