Coverage for src/tests/test_admin.py: 100%

478 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-18 14:01 +0000

1from datetime import date, datetime 

2from re import match 

3 

4import grpc 

5import pytest 

6from sqlalchemy.sql import func 

7 

8from couchers.db import session_scope 

9from couchers.models import ( 

10 AccountDeletionToken, 

11 Cluster, 

12 ContentReport, 

13 EventOccurrence, 

14 ModerationUserList, 

15 Node, 

16 Reference, 

17 UserSession, 

18) 

19from couchers.proto import admin_pb2, auth_pb2, events_pb2, references_pb2, reporting_pb2 

20from couchers.sql import couchers_select as select 

21from couchers.utils import Timestamp_from_datetime, now, parse_date, timedelta 

22from tests.test_communities import create_community 

23from tests.test_fixtures import ( # noqa 

24 add_users_to_new_moderation_list, 

25 auth_api_session, 

26 db, 

27 email_fields, 

28 events_session, 

29 generate_user, 

30 get_user_id_and_token, 

31 mock_notification_email, 

32 push_collector, 

33 real_admin_session, 

34 references_session, 

35 reporting_session, 

36 testconfig, 

37) 

38 

39 

40@pytest.fixture(autouse=True) 

41def _(testconfig): 

42 pass 

43 

44 

45def test_access_by_normal_user(db): 

46 normal_user, normal_token = generate_user() 

47 

48 with real_admin_session(normal_token) as api: 

49 # all requests to the admin servicer should break when done by a non-super_user 

50 with pytest.raises(grpc.RpcError) as e: 

51 api.GetUserDetails( 

52 admin_pb2.GetUserDetailsReq( 

53 user=str(normal_user.id), 

54 ) 

55 ) 

56 assert e.value.code() == grpc.StatusCode.PERMISSION_DENIED 

57 

58 

59def test_GetUser(db): 

60 super_user, super_token = generate_user(is_superuser=True) 

61 normal_user, normal_token = generate_user() 

62 

63 with real_admin_session(super_token) as api: 

64 res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id))) 

65 assert res.user_id == normal_user.id 

66 assert res.username == normal_user.username 

67 

68 with real_admin_session(super_token) as api: 

69 res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note="Testing banning")) 

70 

71 with real_admin_session(super_token) as api: 

72 res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id))) 

73 assert res.user_id == normal_user.id 

74 assert res.username == normal_user.username 

75 

76 

77def test_GetUserDetails(db): 

78 super_user, super_token = generate_user(is_superuser=True) 

79 normal_user, normal_token = generate_user() 

80 

81 with real_admin_session(super_token) as api: 

82 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=str(normal_user.id))) 

83 assert res.user_id == normal_user.id 

84 assert res.username == normal_user.username 

85 assert res.email == normal_user.email 

86 assert res.gender == normal_user.gender 

87 assert parse_date(res.birthdate) == normal_user.birthdate 

88 assert not res.banned 

89 assert not res.deleted 

90 

91 with real_admin_session(super_token) as api: 

92 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)) 

93 assert res.user_id == normal_user.id 

94 assert res.username == normal_user.username 

95 assert res.email == normal_user.email 

96 assert res.gender == normal_user.gender 

97 assert parse_date(res.birthdate) == normal_user.birthdate 

98 assert not res.banned 

99 assert not res.deleted 

100 

101 with real_admin_session(super_token) as api: 

102 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.email)) 

103 assert res.user_id == normal_user.id 

104 assert res.username == normal_user.username 

105 assert res.email == normal_user.email 

106 assert res.gender == normal_user.gender 

107 assert parse_date(res.birthdate) == normal_user.birthdate 

108 assert not res.banned 

109 assert not res.deleted 

110 

111 

112def test_ChangeUserGender(db, push_collector): 

113 super_user, super_token = generate_user(is_superuser=True) 

114 normal_user, normal_token = generate_user() 

115 

116 with real_admin_session(super_token) as api: 

117 with mock_notification_email() as mock: 

118 res = api.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=normal_user.username, gender="Machine")) 

119 assert res.user_id == normal_user.id 

120 assert res.username == normal_user.username 

121 assert res.email == normal_user.email 

122 assert res.gender == "Machine" 

123 assert parse_date(res.birthdate) == normal_user.birthdate 

124 assert not res.banned 

125 assert not res.deleted 

126 

127 mock.assert_called_once() 

128 e = email_fields(mock) 

129 assert e.subject == "[TEST] Your gender was changed" 

130 assert e.recipient == normal_user.email 

131 assert "Machine" in e.plain 

132 assert "Machine" in e.html 

133 

134 push_collector.assert_user_has_single_matching( 

135 normal_user.id, 

136 title="Your gender was changed", 

137 body="Your gender on Couchers.org was changed to Machine by an admin.", 

138 ) 

139 

140 

141def test_ChangeUserBirthdate(db, push_collector): 

142 super_user, super_token = generate_user(is_superuser=True) 

143 normal_user, normal_token = generate_user(birthdate=date(year=2000, month=1, day=1)) 

144 

145 with real_admin_session(super_token) as api: 

146 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)) 

147 assert parse_date(res.birthdate) == date(year=2000, month=1, day=1) 

148 

149 with mock_notification_email() as mock: 

150 res = api.ChangeUserBirthdate( 

151 admin_pb2.ChangeUserBirthdateReq(user=normal_user.username, birthdate="1990-05-25") 

152 ) 

153 

154 assert res.user_id == normal_user.id 

155 assert res.username == normal_user.username 

156 assert res.email == normal_user.email 

157 assert res.birthdate == "1990-05-25" 

158 assert res.gender == normal_user.gender 

159 assert not res.banned 

160 assert not res.deleted 

161 

162 mock.assert_called_once() 

163 e = email_fields(mock) 

164 assert e.subject == "[TEST] Your date of birth was changed" 

165 assert e.recipient == normal_user.email 

166 assert "1990" in e.plain 

167 assert "1990" in e.html 

168 

169 push_collector.assert_user_has_single_matching( 

170 normal_user.id, 

171 title="Your date of birth was changed", 

172 body="Your date of birth on Couchers.org was changed to Friday 25 May 1990 by an admin.", 

173 ) 

174 

175 

176def test_BanUser(db): 

177 super_user, super_token = generate_user(is_superuser=True) 

178 normal_user, _ = generate_user() 

179 admin_note = "A good reason" 

180 utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00" 

181 prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)" 

182 

183 with real_admin_session(super_token) as api: 

184 res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note=admin_note)) 

185 assert res.user_id == normal_user.id 

186 assert res.username == normal_user.username 

187 assert res.email == normal_user.email 

188 assert res.gender == normal_user.gender 

189 assert parse_date(res.birthdate) == normal_user.birthdate 

190 assert res.banned 

191 assert not res.deleted 

192 assert match(rf"^{prefix_regex} {admin_note}\n$", res.admin_note) 

193 

194 

195def test_UnbanUser(db): 

196 super_user, super_token = generate_user(is_superuser=True) 

197 normal_user, _ = generate_user() 

198 admin_note = "A good reason" 

199 utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00" 

200 prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)" 

201 

202 with real_admin_session(super_token) as api: 

203 res = api.UnbanUser(admin_pb2.UnbanUserReq(user=normal_user.username, admin_note=admin_note)) 

204 assert res.user_id == normal_user.id 

205 assert res.username == normal_user.username 

206 assert res.email == normal_user.email 

207 assert res.gender == normal_user.gender 

208 assert parse_date(res.birthdate) == normal_user.birthdate 

209 assert not res.banned 

210 assert not res.deleted 

211 assert match(rf"^{prefix_regex} {admin_note}\n$", res.admin_note) 

212 

213 

214def test_AddAdminNote(db): 

215 super_user, super_token = generate_user(is_superuser=True) 

216 normal_user, _ = generate_user() 

217 admin_note1 = "User reported strange behavior" 

218 admin_note2 = "Insert private information here" 

219 utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00" 

220 prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)" 

221 

222 with real_admin_session(super_token) as api: 

223 res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note1)) 

224 assert res.user_id == normal_user.id 

225 assert res.username == normal_user.username 

226 assert res.email == normal_user.email 

227 assert res.gender == normal_user.gender 

228 assert parse_date(res.birthdate) == normal_user.birthdate 

229 assert not res.banned 

230 assert not res.deleted 

231 assert match(rf"^{prefix_regex} {admin_note1}\n$", res.admin_note) 

232 

233 with real_admin_session(super_token) as api: 

234 res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note2)) 

235 assert match(rf"^{prefix_regex} {admin_note1}\n{prefix_regex} {admin_note2}\n$", res.admin_note) 

236 

237 

238def test_AddAdminNote_blank(db): 

239 super_user, super_token = generate_user(is_superuser=True) 

240 normal_user, _ = generate_user() 

241 empty_admin_note = " \t \n " 

242 

243 with real_admin_session(super_token) as api: 

244 with pytest.raises(grpc.RpcError) as e: 

245 api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=empty_admin_note)) 

246 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

247 assert e.value.details() == "The admin note cannot be empty." 

248 

249 

250def test_admin_content_reports(db): 

251 super_user, super_token = generate_user(is_superuser=True) 

252 normal_user, token = generate_user() 

253 bad_user1, _ = generate_user() 

254 bad_user2, _ = generate_user() 

255 

256 with reporting_session(token) as api: 

257 api.Report( 

258 reporting_pb2.ReportReq( 

259 reason="spam", 

260 description="r1", 

261 content_ref="comment/123", 

262 author_user=bad_user1.username, 

263 user_agent="n/a", 

264 page="https://couchers.org/comment/123", 

265 ) 

266 ) 

267 api.Report( 

268 reporting_pb2.ReportReq( 

269 reason="spam", 

270 description="r2", 

271 content_ref="comment/124", 

272 author_user=bad_user2.username, 

273 user_agent="n/a", 

274 page="https://couchers.org/comment/124", 

275 ) 

276 ) 

277 api.Report( 

278 reporting_pb2.ReportReq( 

279 reason="something else", 

280 description="r3", 

281 content_ref="page/321", 

282 author_user=bad_user1.username, 

283 user_agent="n/a", 

284 page="https://couchers.org/page/321", 

285 ) 

286 ) 

287 

288 with session_scope() as session: 

289 id_by_description = dict(session.execute(select(ContentReport.description, ContentReport.id)).all()) 

290 

291 with real_admin_session(super_token) as api: 

292 with pytest.raises(grpc.RpcError) as e: 

293 api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1)) 

294 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

295 assert e.value.details() == "Content report not found." 

296 

297 res = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=id_by_description["r2"])) 

298 rep = res.content_report 

299 assert rep.content_report_id == id_by_description["r2"] 

300 assert rep.reporting_user_id == normal_user.id 

301 assert rep.author_user_id == bad_user2.id 

302 assert rep.reason == "spam" 

303 assert rep.description == "r2" 

304 assert rep.content_ref == "comment/124" 

305 assert rep.user_agent == "n/a" 

306 assert rep.page == "https://couchers.org/comment/124" 

307 

308 res = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username)) 

309 assert res.content_reports[0].content_report_id == id_by_description["r3"] 

310 assert res.content_reports[1].content_report_id == id_by_description["r1"] 

311 

312 

313def test_DeleteUser(db): 

314 super_user, super_token = generate_user(is_superuser=True) 

315 normal_user, normal_token = generate_user() 

316 

317 with real_admin_session(super_token) as api: 

318 res = api.DeleteUser(admin_pb2.DeleteUserReq(user=normal_user.username)) 

319 assert res.user_id == normal_user.id 

320 assert res.username == normal_user.username 

321 assert res.email == normal_user.email 

322 assert res.gender == normal_user.gender 

323 assert parse_date(res.birthdate) == normal_user.birthdate 

324 assert not res.banned 

325 assert res.deleted 

326 

327 with real_admin_session(super_token) as api: 

328 res = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=normal_user.username)) 

329 assert res.user_id == normal_user.id 

330 assert res.username == normal_user.username 

331 assert res.email == normal_user.email 

332 assert res.gender == normal_user.gender 

333 assert parse_date(res.birthdate) == normal_user.birthdate 

334 assert not res.banned 

335 assert not res.deleted 

336 

337 

338def test_CreateApiKey(db, push_collector): 

339 with session_scope() as session: 

340 super_user, super_token = generate_user(is_superuser=True) 

341 normal_user, normal_token = generate_user() 

342 

343 assert ( 

344 session.execute( 

345 select(func.count()) 

346 .select_from(UserSession) 

347 .where(UserSession.is_api_key == True) 

348 .where(UserSession.user_id == normal_user.id) 

349 ).scalar_one() 

350 == 0 

351 ) 

352 

353 with mock_notification_email() as mock: 

354 with real_admin_session(super_token) as api: 

355 res = api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=normal_user.username)) 

356 

357 mock.assert_called_once() 

358 e = email_fields(mock) 

359 assert e.subject == "[TEST] Your API key for Couchers.org" 

360 

361 with session_scope() as session: 

362 api_key = session.execute( 

363 select(UserSession) 

364 .where(UserSession.is_valid) 

365 .where(UserSession.is_api_key == True) 

366 .where(UserSession.user_id == normal_user.id) 

367 ).scalar_one() 

368 

369 assert api_key.token in e.plain 

370 assert api_key.token in e.html 

371 

372 assert e.recipient == normal_user.email 

373 assert "api key" in e.subject.lower() 

374 unique_string = "We've issued you with the following API key:" 

375 assert unique_string in e.plain 

376 assert unique_string in e.html 

377 assert "support@couchers.org" in e.plain 

378 assert "support@couchers.org" in e.html 

379 

380 push_collector.assert_user_has_single_matching( 

381 normal_user.id, title="An API key was created for your account", body="Details were sent to you via email." 

382 ) 

383 

384 

385VALID_GEOJSON_MULTIPOLYGON = """ 

386 { 

387 "type": "MultiPolygon", 

388 "coordinates": 

389 [ 

390 [ 

391 [ 

392 [ 

393 -73.98114904754641, 

394 40.7470284264813 

395 ], 

396 [ 

397 -73.98314135177611, 

398 40.73416844413217 

399 ], 

400 [ 

401 -74.00538969848634, 

402 40.734314779027144 

403 ], 

404 [ 

405 -74.00479214294432, 

406 40.75027851544338 

407 ], 

408 [ 

409 -73.98114904754641, 

410 40.7470284264813 

411 ] 

412 ] 

413 ] 

414 ] 

415 } 

416""" 

417 

418POINT_GEOJSON = """ 

419{ "type": "Point", "coordinates": [100.0, 0.0] } 

420""" 

421 

422 

423def test_CreateCommunity_invalid_geojson(db): 

424 super_user, super_token = generate_user(is_superuser=True) 

425 normal_user, normal_token = generate_user() 

426 with real_admin_session(super_token) as api: 

427 with pytest.raises(grpc.RpcError) as e: 

428 api.CreateCommunity( 

429 admin_pb2.CreateCommunityReq( 

430 name="test community", 

431 description="community for testing", 

432 admin_ids=[], 

433 geojson=POINT_GEOJSON, 

434 ) 

435 ) 

436 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

437 assert e.value.details() == "GeoJson was not of type MultiPolygon." 

438 

439 

440def test_CreateCommunity(db): 

441 with session_scope() as session: 

442 super_user, super_token = generate_user(is_superuser=True) 

443 normal_user, normal_token = generate_user() 

444 with real_admin_session(super_token) as api: 

445 api.CreateCommunity( 

446 admin_pb2.CreateCommunityReq( 

447 name="test community", 

448 description="community for testing", 

449 admin_ids=[], 

450 geojson=VALID_GEOJSON_MULTIPOLYGON, 

451 ) 

452 ) 

453 community = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one() 

454 assert community.description == "community for testing" 

455 assert community.slug == "test-community" 

456 

457 

458def test_UpdateCommunity_invalid_geojson(db): 

459 super_user, super_token = generate_user(is_superuser=True) 

460 

461 with session_scope() as session: 

462 with real_admin_session(super_token) as api: 

463 api.CreateCommunity( 

464 admin_pb2.CreateCommunityReq( 

465 name="test community", 

466 description="community for testing", 

467 admin_ids=[], 

468 geojson=VALID_GEOJSON_MULTIPOLYGON, 

469 ) 

470 ) 

471 community = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one() 

472 

473 with pytest.raises(grpc.RpcError) as e: 

474 api.UpdateCommunity( 

475 admin_pb2.UpdateCommunityReq( 

476 community_id=community.parent_node_id, 

477 name="test community 2", 

478 description="community for testing 2", 

479 geojson=POINT_GEOJSON, 

480 ) 

481 ) 

482 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

483 assert e.value.details() == "GeoJson was not of type MultiPolygon." 

484 

485 

486def test_UpdateCommunity_invalid_id(db): 

487 super_user, super_token = generate_user(is_superuser=True) 

488 

489 with session_scope() as session: 

490 with real_admin_session(super_token) as api: 

491 api.CreateCommunity( 

492 admin_pb2.CreateCommunityReq( 

493 name="test community", 

494 description="community for testing", 

495 admin_ids=[], 

496 geojson=VALID_GEOJSON_MULTIPOLYGON, 

497 ) 

498 ) 

499 

500 with pytest.raises(grpc.RpcError) as e: 

501 api.UpdateCommunity( 

502 admin_pb2.UpdateCommunityReq( 

503 community_id=1000, 

504 name="test community 1000", 

505 description="community for testing 1000", 

506 geojson=VALID_GEOJSON_MULTIPOLYGON, 

507 ) 

508 ) 

509 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

510 assert e.value.details() == "Community not found." 

511 

512 

513def test_UpdateCommunity(db): 

514 super_user, super_token = generate_user(is_superuser=True) 

515 

516 with session_scope() as session: 

517 with real_admin_session(super_token) as api: 

518 api.CreateCommunity( 

519 admin_pb2.CreateCommunityReq( 

520 name="test community", 

521 description="community for testing", 

522 admin_ids=[], 

523 geojson=VALID_GEOJSON_MULTIPOLYGON, 

524 ) 

525 ) 

526 community = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one() 

527 assert community.description == "community for testing" 

528 

529 api.CreateCommunity( 

530 admin_pb2.CreateCommunityReq( 

531 name="test community 2", 

532 description="community for testing 2", 

533 admin_ids=[], 

534 geojson=VALID_GEOJSON_MULTIPOLYGON, 

535 ) 

536 ) 

537 community_2 = session.execute(select(Cluster).where(Cluster.name == "test community 2")).scalar_one() 

538 

539 api.UpdateCommunity( 

540 admin_pb2.UpdateCommunityReq( 

541 community_id=community.parent_node_id, 

542 name="test community 2", 

543 description="community for testing 2", 

544 geojson=VALID_GEOJSON_MULTIPOLYGON, 

545 parent_node_id=community_2.parent_node_id, 

546 ) 

547 ) 

548 session.commit() 

549 

550 community_updated = session.execute(select(Cluster).where(Cluster.id == community.id)).scalar_one() 

551 assert community_updated.description == "community for testing 2" 

552 assert community_updated.slug == "test-community-2" 

553 

554 node_updated = session.execute(select(Node).where(Node.id == community_updated.parent_node_id)).scalar_one() 

555 assert node_updated.parent_node_id == community_2.parent_node_id 

556 

557 

558def test_GetChats(db): 

559 super_user, super_token = generate_user(is_superuser=True) 

560 normal_user, normal_token = generate_user() 

561 

562 with real_admin_session(super_token) as api: 

563 res = api.GetChats(admin_pb2.GetChatsReq(user=normal_user.username)) 

564 assert res.response 

565 

566 

567def test_badges(db, push_collector): 

568 super_user, super_token = generate_user(is_superuser=True) 

569 normal_user, normal_token = generate_user() 

570 

571 with real_admin_session(super_token) as api: 

572 # can add a badge 

573 assert "volunteer" not in api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)).badges 

574 with mock_notification_email() as mock: 

575 res = api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="volunteer")) 

576 assert "volunteer" in res.badges 

577 

578 # badge emails are disabled by default 

579 mock.assert_not_called() 

580 

581 push_collector.assert_user_has_single_matching( 

582 normal_user.id, 

583 title="The Active Volunteer badge was added to your profile", 

584 body="Check out your profile to see the new badge!", 

585 ) 

586 

587 # can't add/edit special tags 

588 with pytest.raises(grpc.RpcError) as e: 

589 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="founder")) 

590 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

591 assert e.value.details() == "Admins cannot edit that badge." 

592 

593 # double add badge 

594 with pytest.raises(grpc.RpcError) as e: 

595 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="volunteer")) 

596 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

597 assert e.value.details() == "The user already has that badge." 

598 

599 # can remove badge 

600 assert "volunteer" in api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)).badges 

601 with mock_notification_email() as mock: 

602 res = api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=normal_user.username, badge_id="volunteer")) 

603 assert "volunteer" not in res.badges 

604 

605 # badge emails are disabled by default 

606 mock.assert_not_called() 

607 

608 push_collector.assert_user_push_matches_fields( 

609 normal_user.id, 

610 ix=1, 

611 title="The Active Volunteer badge was removed from your profile", 

612 body="You can see all your badges on your profile.", 

613 ) 

614 

615 # not found on user 

616 with pytest.raises(grpc.RpcError) as e: 

617 api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=normal_user.username, badge_id="volunteer")) 

618 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

619 assert e.value.details() == "The user does not have that badge." 

620 

621 # not found in general 

622 with pytest.raises(grpc.RpcError) as e: 

623 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="nonexistentbadge")) 

624 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

625 assert e.value.details() == "Badge not found." 

626 

627 

628def test_DeleteEvent(db): 

629 super_user, super_token = generate_user(is_superuser=True) 

630 normal_user, normal_token = generate_user() 

631 

632 with session_scope() as session: 

633 create_community(session, 0, 2, "Community", [normal_user], [], None) 

634 

635 start_time = now() + timedelta(hours=2) 

636 end_time = start_time + timedelta(hours=3) 

637 with events_session(normal_token) as api: 

638 res = api.CreateEvent( 

639 events_pb2.CreateEventReq( 

640 title="Dummy Title", 

641 content="Dummy content.", 

642 photo_key=None, 

643 offline_information=events_pb2.OfflineEventInformation( 

644 address="Near Null Island", 

645 lat=0.1, 

646 lng=0.2, 

647 ), 

648 start_time=Timestamp_from_datetime(start_time), 

649 end_time=Timestamp_from_datetime(end_time), 

650 timezone="UTC", 

651 ) 

652 ) 

653 event_id = res.event_id 

654 assert not res.is_deleted 

655 

656 with session_scope() as session: 

657 with real_admin_session(super_token) as api: 

658 api.DeleteEvent( 

659 admin_pb2.DeleteEventReq( 

660 event_id=event_id, 

661 ) 

662 ) 

663 occurrence = session.get(EventOccurrence, ident=event_id) 

664 assert occurrence.is_deleted 

665 

666 

667def test_ListUserIds(db): 

668 super_user, super_token = generate_user(is_superuser=True) 

669 normal_user, normal_token = generate_user() 

670 

671 with real_admin_session(super_token) as api: 

672 res = api.ListUserIds( 

673 admin_pb2.ListUserIdsReq( 

674 start_time=Timestamp_from_datetime(datetime(2000, 1, 1)), end_time=Timestamp_from_datetime(now()) 

675 ) 

676 ) 

677 assert len(res.user_ids) == 2 

678 assert sorted(res.user_ids) == sorted([super_user.id, normal_user.id]) 

679 

680 with real_admin_session(super_token) as api: 

681 res = api.ListUserIds( 

682 admin_pb2.ListUserIdsReq(start_time=Timestamp_from_datetime(now()), end_time=Timestamp_from_datetime(now())) 

683 ) 

684 assert res.user_ids == [] 

685 

686 

687def test_EditReferenceText(db): 

688 super_user, super_token = generate_user(is_superuser=True) 

689 test_new_text = "New Text" 

690 

691 user1, user1_token = generate_user() 

692 user2, user2_token = generate_user() 

693 

694 with session_scope() as session: 

695 with references_session(user1_token) as api: 

696 reference = api.WriteFriendReference( 

697 references_pb2.WriteFriendReferenceReq( 

698 to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1 

699 ) 

700 ) 

701 

702 with real_admin_session(super_token) as admin_api: 

703 admin_api.EditReferenceText( 

704 admin_pb2.EditReferenceTextReq(reference_id=reference.reference_id, new_text=test_new_text) 

705 ) 

706 

707 session.expire_all() 

708 

709 modified_reference = session.execute( 

710 select(Reference).where(Reference.id == reference.reference_id) 

711 ).scalar_one_or_none() 

712 assert modified_reference.text == test_new_text 

713 

714 

715def test_DeleteReference(db): 

716 super_user, super_token = generate_user(is_superuser=True) 

717 

718 user1, user1_token = generate_user() 

719 user2, user2_token = generate_user() 

720 

721 with references_session(user1_token) as api: 

722 reference = api.WriteFriendReference( 

723 references_pb2.WriteFriendReferenceReq( 

724 to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1 

725 ) 

726 ) 

727 

728 with references_session(user1_token) as api: 

729 assert api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references 

730 

731 with real_admin_session(super_token) as admin_api: 

732 admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=reference.reference_id)) 

733 

734 with references_session(user1_token) as api: 

735 assert not api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references 

736 

737 with session_scope() as session: 

738 modified_reference = session.execute( 

739 select(Reference).where(Reference.id == reference.reference_id) 

740 ).scalar_one_or_none() 

741 assert modified_reference.is_deleted 

742 

743 

744def test_AddUsersToModerationUserList(db): 

745 super_user, super_token = generate_user(is_superuser=True) 

746 user1, _ = generate_user() 

747 user2, _ = generate_user() 

748 user3, _ = generate_user() 

749 user4, _ = generate_user() 

750 user5, _ = generate_user() 

751 moderation_list_id = add_users_to_new_moderation_list([user1]) 

752 

753 with session_scope() as session: 

754 with real_admin_session(super_token) as api: 

755 # Test adding users to a non-existent moderation list (should raise an error) 

756 with pytest.raises(grpc.RpcError) as e: 

757 api.AddUsersToModerationUserList( 

758 admin_pb2.AddUsersToModerationUserListReq(users=[user2.username], moderation_list_id=999), 

759 ) 

760 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

761 assert "Moderation user list not found." == e.value.details() 

762 

763 # Test with non-existent user (should raise an error) 

764 with pytest.raises(grpc.RpcError) as e: 

765 api.AddUsersToModerationUserList( 

766 admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, "nonexistent"]), 

767 ) 

768 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

769 assert "Couldn't find that user." == e.value.details() 

770 

771 # Test successful creation of new moderation list (no moderation_list_id provided) 

772 res = api.AddUsersToModerationUserList( 

773 admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, user2.username, user3.username]), 

774 ) 

775 assert res.moderation_list_id > 0 

776 with session_scope() as session: 

777 moderation_user_list = session.get(ModerationUserList, res.moderation_list_id) 

778 assert moderation_user_list is not None 

779 assert len(moderation_user_list.users) == 3 

780 assert {user1.id, user2.id, user3.id}.issubset({user.id for user in moderation_user_list.users}) 

781 

782 # Test list endpoint returns same moderation list with same members not repeated 

783 listRes = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username)) 

784 assert len(listRes.moderation_lists) == 1 

785 assert listRes.moderation_lists[0].moderation_list_id == res.moderation_list_id 

786 assert len(listRes.moderation_lists[0].member_ids) == 3 

787 assert {user1.id, user2.id, user3.id}.issubset(listRes.moderation_lists[0].member_ids) 

788 

789 # Test user can be in multiple moderation lists 

790 listRes3 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username)) 

791 assert len(listRes3.moderation_lists) == 2 

792 

793 # Test adding users to an existing moderation list 

794 res2 = api.AddUsersToModerationUserList( 

795 admin_pb2.AddUsersToModerationUserListReq( 

796 users=[user4.username, user5.username], moderation_list_id=moderation_list_id 

797 ), 

798 ) 

799 assert res2.moderation_list_id == moderation_list_id 

800 with session_scope() as session: 

801 moderation_user_list = session.get(ModerationUserList, moderation_list_id) 

802 assert len(moderation_user_list.users) == 3 

803 assert {user1.id, user4.id, user5.id}.issubset({user.id for user in moderation_user_list.users}) 

804 

805 # Test list user moderation lists endpoint returns the right moderation list 

806 listRes2 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user5.username)) 

807 assert len(listRes2.moderation_lists) == 1 

808 assert listRes2.moderation_lists[0].moderation_list_id == moderation_list_id 

809 assert len(listRes2.moderation_lists[0].member_ids) == 3 

810 assert {user1.id, user4.id, user5.id}.issubset(listRes2.moderation_lists[0].member_ids) 

811 

812 

813def test_RemoveUserFromModerationUserList(db): 

814 super_user, super_token = generate_user(is_superuser=True) 

815 user1, _ = generate_user() 

816 user2, _ = generate_user() 

817 user3, _ = generate_user() 

818 moderation_list_id = add_users_to_new_moderation_list([user1, user2]) 

819 

820 with real_admin_session(super_token) as api: 

821 # Test with non-existent user (should raise error) 

822 with pytest.raises(grpc.RpcError) as e: 

823 api.RemoveUserFromModerationUserList(admin_pb2.RemoveUserFromModerationUserListReq(user="nonexistent")) 

824 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

825 assert "Couldn't find that user." == e.value.details() 

826 

827 # Test without providing moderation list id (should raise error) 

828 with pytest.raises(grpc.RpcError) as e: 

829 api.RemoveUserFromModerationUserList(admin_pb2.RemoveUserFromModerationUserListReq(user=user2.username)) 

830 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

831 assert "Missing moderation user list id." == e.value.details() 

832 

833 # Test removing user that's not in the provided moderation list (should raise error) 

834 with pytest.raises(grpc.RpcError) as e: 

835 api.RemoveUserFromModerationUserList( 

836 admin_pb2.RemoveUserFromModerationUserListReq( 

837 user=user3.username, moderation_list_id=moderation_list_id 

838 ) 

839 ) 

840 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

841 assert "User is not in the moderation user list." == e.value.details() 

842 

843 # Test successful removal 

844 api.RemoveUserFromModerationUserList( 

845 admin_pb2.RemoveUserFromModerationUserListReq(user=user1.username, moderation_list_id=moderation_list_id) 

846 ) 

847 with session_scope() as session: 

848 moderation_user_list = session.get(ModerationUserList, moderation_list_id) 

849 assert user1.id not in {user.id for user in moderation_user_list.users} 

850 assert user2.id in {user.id for user in moderation_user_list.users} 

851 

852 # Test list user moderation lists endpoint returns right number of moderation lists 

853 listRes = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username)) 

854 assert len(listRes.moderation_lists) == 0 

855 listRes2 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username)) 

856 assert len(listRes2.moderation_lists) == 1 

857 

858 # Test removing all users from moderation list should also delete the moderation list 

859 api.RemoveUserFromModerationUserList( 

860 admin_pb2.RemoveUserFromModerationUserListReq(user=user2.username, moderation_list_id=moderation_list_id) 

861 ) 

862 with session_scope() as session: 

863 assert session.get(ModerationUserList, moderation_list_id) is None 

864 

865 

866def test_admin_delete_account_url(db, push_collector): 

867 super_user, super_token = generate_user(is_superuser=True) 

868 

869 user, token = generate_user() 

870 user_id = user.id 

871 

872 with real_admin_session(super_token) as admin_api: 

873 url = admin_api.CreateAccountDeletionLink( 

874 admin_pb2.CreateAccountDeletionLinkReq(user=user.username) 

875 ).account_deletion_confirm_url 

876 

877 push_collector.assert_user_has_count(user_id, 0) 

878 

879 with session_scope() as session: 

880 token_o = session.execute(select(AccountDeletionToken)).scalar_one() 

881 token = token_o.token 

882 assert token_o.user.id == user_id 

883 assert url == f"http://localhost:3000/delete-account?token={token}" 

884 

885 with mock_notification_email() as mock: 

886 with auth_api_session() as (auth_api, metadata_interceptor): 

887 auth_api.ConfirmDeleteAccount( 

888 auth_pb2.ConfirmDeleteAccountReq( 

889 token=token, 

890 ) 

891 ) 

892 

893 push_collector.assert_user_push_matches_fields( 

894 user_id, 

895 ix=0, 

896 title="Your Couchers.org account has been deleted", 

897 body="You can still undo this by following the link we emailed to you within 7 days.", 

898 ) 

899 

900 mock.assert_called_once() 

901 e = email_fields(mock) 

902 

903 

904# community invite feature tested in test_events.py 

905# SendBlogPostNotification tested in test_notifications.py 

906# MarkUserNeedsLocationUpdate tested in test_jail.py