Coverage for src/tests/test_admin.py: 100%

479 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-08-28 14:55 +0000

1from datetime import date, datetime 

2from re import match 

3 

4import grpc 

5import pytest 

6from sqlalchemy.sql import func 

7 

8from couchers import errors 

9from couchers.db import session_scope 

10from couchers.models import ( 

11 AccountDeletionToken, 

12 Cluster, 

13 ContentReport, 

14 EventOccurrence, 

15 ModerationUserList, 

16 Node, 

17 Reference, 

18 UserSession, 

19) 

20from couchers.sql import couchers_select as select 

21from couchers.utils import Timestamp_from_datetime, now, parse_date, timedelta 

22from proto import admin_pb2, auth_pb2, events_pb2, references_pb2, reporting_pb2 

23from tests.test_communities import create_community 

24from tests.test_fixtures import ( # noqa 

25 add_users_to_new_moderation_list, 

26 auth_api_session, 

27 db, 

28 email_fields, 

29 events_session, 

30 generate_user, 

31 get_user_id_and_token, 

32 mock_notification_email, 

33 push_collector, 

34 real_admin_session, 

35 references_session, 

36 reporting_session, 

37 testconfig, 

38) 

39 

40 

41@pytest.fixture(autouse=True) 

42def _(testconfig): 

43 pass 

44 

45 

46def test_access_by_normal_user(db): 

47 normal_user, normal_token = generate_user() 

48 

49 with real_admin_session(normal_token) as api: 

50 # all requests to the admin servicer should break when done by a non-super_user 

51 with pytest.raises(grpc.RpcError) as e: 

52 api.GetUserDetails( 

53 admin_pb2.GetUserDetailsReq( 

54 user=str(normal_user.id), 

55 ) 

56 ) 

57 assert e.value.code() == grpc.StatusCode.PERMISSION_DENIED 

58 

59 

60def test_GetUser(db): 

61 super_user, super_token = generate_user(is_superuser=True) 

62 normal_user, normal_token = generate_user() 

63 

64 with real_admin_session(super_token) as api: 

65 res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id))) 

66 assert res.user_id == normal_user.id 

67 assert res.username == normal_user.username 

68 

69 with real_admin_session(super_token) as api: 

70 res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note="Testing banning")) 

71 

72 with real_admin_session(super_token) as api: 

73 res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id))) 

74 assert res.user_id == normal_user.id 

75 assert res.username == normal_user.username 

76 

77 

78def test_GetUserDetails(db): 

79 super_user, super_token = generate_user(is_superuser=True) 

80 normal_user, normal_token = generate_user() 

81 

82 with real_admin_session(super_token) as api: 

83 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=str(normal_user.id))) 

84 assert res.user_id == normal_user.id 

85 assert res.username == normal_user.username 

86 assert res.email == normal_user.email 

87 assert res.gender == normal_user.gender 

88 assert parse_date(res.birthdate) == normal_user.birthdate 

89 assert not res.banned 

90 assert not res.deleted 

91 

92 with real_admin_session(super_token) as api: 

93 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)) 

94 assert res.user_id == normal_user.id 

95 assert res.username == normal_user.username 

96 assert res.email == normal_user.email 

97 assert res.gender == normal_user.gender 

98 assert parse_date(res.birthdate) == normal_user.birthdate 

99 assert not res.banned 

100 assert not res.deleted 

101 

102 with real_admin_session(super_token) as api: 

103 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.email)) 

104 assert res.user_id == normal_user.id 

105 assert res.username == normal_user.username 

106 assert res.email == normal_user.email 

107 assert res.gender == normal_user.gender 

108 assert parse_date(res.birthdate) == normal_user.birthdate 

109 assert not res.banned 

110 assert not res.deleted 

111 

112 

113def test_ChangeUserGender(db, push_collector): 

114 super_user, super_token = generate_user(is_superuser=True) 

115 normal_user, normal_token = generate_user() 

116 

117 with real_admin_session(super_token) as api: 

118 with mock_notification_email() as mock: 

119 res = api.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=normal_user.username, gender="Machine")) 

120 assert res.user_id == normal_user.id 

121 assert res.username == normal_user.username 

122 assert res.email == normal_user.email 

123 assert res.gender == "Machine" 

124 assert parse_date(res.birthdate) == normal_user.birthdate 

125 assert not res.banned 

126 assert not res.deleted 

127 

128 mock.assert_called_once() 

129 e = email_fields(mock) 

130 assert e.subject == "[TEST] Your gender was changed" 

131 assert e.recipient == normal_user.email 

132 assert "Machine" in e.plain 

133 assert "Machine" in e.html 

134 

135 push_collector.assert_user_has_single_matching( 

136 normal_user.id, 

137 title="Your gender was changed", 

138 body="Your gender on Couchers.org was changed to Machine by an admin.", 

139 ) 

140 

141 

142def test_ChangeUserBirthdate(db, push_collector): 

143 super_user, super_token = generate_user(is_superuser=True) 

144 normal_user, normal_token = generate_user(birthdate=date(year=2000, month=1, day=1)) 

145 

146 with real_admin_session(super_token) as api: 

147 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)) 

148 assert parse_date(res.birthdate) == date(year=2000, month=1, day=1) 

149 

150 with mock_notification_email() as mock: 

151 res = api.ChangeUserBirthdate( 

152 admin_pb2.ChangeUserBirthdateReq(user=normal_user.username, birthdate="1990-05-25") 

153 ) 

154 

155 assert res.user_id == normal_user.id 

156 assert res.username == normal_user.username 

157 assert res.email == normal_user.email 

158 assert res.birthdate == "1990-05-25" 

159 assert res.gender == normal_user.gender 

160 assert not res.banned 

161 assert not res.deleted 

162 

163 mock.assert_called_once() 

164 e = email_fields(mock) 

165 assert e.subject == "[TEST] Your date of birth was changed" 

166 assert e.recipient == normal_user.email 

167 assert "1990" in e.plain 

168 assert "1990" in e.html 

169 

170 push_collector.assert_user_has_single_matching( 

171 normal_user.id, 

172 title="Your date of birth was changed", 

173 body="Your date of birth on Couchers.org was changed to Friday 25 May 1990 by an admin.", 

174 ) 

175 

176 

177def test_BanUser(db): 

178 super_user, super_token = generate_user(is_superuser=True) 

179 normal_user, _ = generate_user() 

180 admin_note = "A good reason" 

181 utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00" 

182 prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)" 

183 

184 with real_admin_session(super_token) as api: 

185 res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note=admin_note)) 

186 assert res.user_id == normal_user.id 

187 assert res.username == normal_user.username 

188 assert res.email == normal_user.email 

189 assert res.gender == normal_user.gender 

190 assert parse_date(res.birthdate) == normal_user.birthdate 

191 assert res.banned 

192 assert not res.deleted 

193 assert match(rf"^{prefix_regex} {admin_note}\n$", res.admin_note) 

194 

195 

196def test_UnbanUser(db): 

197 super_user, super_token = generate_user(is_superuser=True) 

198 normal_user, _ = generate_user() 

199 admin_note = "A good reason" 

200 utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00" 

201 prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)" 

202 

203 with real_admin_session(super_token) as api: 

204 res = api.UnbanUser(admin_pb2.UnbanUserReq(user=normal_user.username, admin_note=admin_note)) 

205 assert res.user_id == normal_user.id 

206 assert res.username == normal_user.username 

207 assert res.email == normal_user.email 

208 assert res.gender == normal_user.gender 

209 assert parse_date(res.birthdate) == normal_user.birthdate 

210 assert not res.banned 

211 assert not res.deleted 

212 assert match(rf"^{prefix_regex} {admin_note}\n$", res.admin_note) 

213 

214 

215def test_AddAdminNote(db): 

216 super_user, super_token = generate_user(is_superuser=True) 

217 normal_user, _ = generate_user() 

218 admin_note1 = "User reported strange behavior" 

219 admin_note2 = "Insert private information here" 

220 utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00" 

221 prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)" 

222 

223 with real_admin_session(super_token) as api: 

224 res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note1)) 

225 assert res.user_id == normal_user.id 

226 assert res.username == normal_user.username 

227 assert res.email == normal_user.email 

228 assert res.gender == normal_user.gender 

229 assert parse_date(res.birthdate) == normal_user.birthdate 

230 assert not res.banned 

231 assert not res.deleted 

232 assert match(rf"^{prefix_regex} {admin_note1}\n$", res.admin_note) 

233 

234 with real_admin_session(super_token) as api: 

235 res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note2)) 

236 assert match(rf"^{prefix_regex} {admin_note1}\n{prefix_regex} {admin_note2}\n$", res.admin_note) 

237 

238 

239def test_AddAdminNote_blank(db): 

240 super_user, super_token = generate_user(is_superuser=True) 

241 normal_user, _ = generate_user() 

242 empty_admin_note = " \t \n " 

243 

244 with real_admin_session(super_token) as api: 

245 with pytest.raises(grpc.RpcError) as e: 

246 api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=empty_admin_note)) 

247 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

248 assert e.value.details() == errors.ADMIN_NOTE_CANT_BE_EMPTY 

249 

250 

251def test_admin_content_reports(db): 

252 super_user, super_token = generate_user(is_superuser=True) 

253 normal_user, token = generate_user() 

254 bad_user1, _ = generate_user() 

255 bad_user2, _ = generate_user() 

256 

257 with reporting_session(token) as api: 

258 api.Report( 

259 reporting_pb2.ReportReq( 

260 reason="spam", 

261 description="r1", 

262 content_ref="comment/123", 

263 author_user=bad_user1.username, 

264 user_agent="n/a", 

265 page="https://couchers.org/comment/123", 

266 ) 

267 ) 

268 api.Report( 

269 reporting_pb2.ReportReq( 

270 reason="spam", 

271 description="r2", 

272 content_ref="comment/124", 

273 author_user=bad_user2.username, 

274 user_agent="n/a", 

275 page="https://couchers.org/comment/124", 

276 ) 

277 ) 

278 api.Report( 

279 reporting_pb2.ReportReq( 

280 reason="something else", 

281 description="r3", 

282 content_ref="page/321", 

283 author_user=bad_user1.username, 

284 user_agent="n/a", 

285 page="https://couchers.org/page/321", 

286 ) 

287 ) 

288 

289 with session_scope() as session: 

290 id_by_description = dict(session.execute(select(ContentReport.description, ContentReport.id)).all()) 

291 

292 with real_admin_session(super_token) as api: 

293 with pytest.raises(grpc.RpcError) as e: 

294 api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1)) 

295 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

296 assert e.value.details() == errors.CONTENT_REPORT_NOT_FOUND 

297 

298 res = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=id_by_description["r2"])) 

299 rep = res.content_report 

300 assert rep.content_report_id == id_by_description["r2"] 

301 assert rep.reporting_user_id == normal_user.id 

302 assert rep.author_user_id == bad_user2.id 

303 assert rep.reason == "spam" 

304 assert rep.description == "r2" 

305 assert rep.content_ref == "comment/124" 

306 assert rep.user_agent == "n/a" 

307 assert rep.page == "https://couchers.org/comment/124" 

308 

309 res = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username)) 

310 assert res.content_reports[0].content_report_id == id_by_description["r3"] 

311 assert res.content_reports[1].content_report_id == id_by_description["r1"] 

312 

313 

314def test_DeleteUser(db): 

315 super_user, super_token = generate_user(is_superuser=True) 

316 normal_user, normal_token = generate_user() 

317 

318 with real_admin_session(super_token) as api: 

319 res = api.DeleteUser(admin_pb2.DeleteUserReq(user=normal_user.username)) 

320 assert res.user_id == normal_user.id 

321 assert res.username == normal_user.username 

322 assert res.email == normal_user.email 

323 assert res.gender == normal_user.gender 

324 assert parse_date(res.birthdate) == normal_user.birthdate 

325 assert not res.banned 

326 assert res.deleted 

327 

328 with real_admin_session(super_token) as api: 

329 res = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=normal_user.username)) 

330 assert res.user_id == normal_user.id 

331 assert res.username == normal_user.username 

332 assert res.email == normal_user.email 

333 assert res.gender == normal_user.gender 

334 assert parse_date(res.birthdate) == normal_user.birthdate 

335 assert not res.banned 

336 assert not res.deleted 

337 

338 

339def test_CreateApiKey(db, push_collector): 

340 with session_scope() as session: 

341 super_user, super_token = generate_user(is_superuser=True) 

342 normal_user, normal_token = generate_user() 

343 

344 assert ( 

345 session.execute( 

346 select(func.count()) 

347 .select_from(UserSession) 

348 .where(UserSession.is_api_key == True) 

349 .where(UserSession.user_id == normal_user.id) 

350 ).scalar_one() 

351 == 0 

352 ) 

353 

354 with mock_notification_email() as mock: 

355 with real_admin_session(super_token) as api: 

356 res = api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=normal_user.username)) 

357 

358 mock.assert_called_once() 

359 e = email_fields(mock) 

360 assert e.subject == "[TEST] Your API key for Couchers.org" 

361 

362 with session_scope() as session: 

363 api_key = session.execute( 

364 select(UserSession) 

365 .where(UserSession.is_valid) 

366 .where(UserSession.is_api_key == True) 

367 .where(UserSession.user_id == normal_user.id) 

368 ).scalar_one() 

369 

370 assert api_key.token in e.plain 

371 assert api_key.token in e.html 

372 

373 assert e.recipient == normal_user.email 

374 assert "api key" in e.subject.lower() 

375 unique_string = "We've issued you with the following API key:" 

376 assert unique_string in e.plain 

377 assert unique_string in e.html 

378 assert "support@couchers.org" in e.plain 

379 assert "support@couchers.org" in e.html 

380 

381 push_collector.assert_user_has_single_matching( 

382 normal_user.id, title="An API key was created for your account", body="Details were sent to you via email." 

383 ) 

384 

385 

386VALID_GEOJSON_MULTIPOLYGON = """ 

387 { 

388 "type": "MultiPolygon", 

389 "coordinates": 

390 [ 

391 [ 

392 [ 

393 [ 

394 -73.98114904754641, 

395 40.7470284264813 

396 ], 

397 [ 

398 -73.98314135177611, 

399 40.73416844413217 

400 ], 

401 [ 

402 -74.00538969848634, 

403 40.734314779027144 

404 ], 

405 [ 

406 -74.00479214294432, 

407 40.75027851544338 

408 ], 

409 [ 

410 -73.98114904754641, 

411 40.7470284264813 

412 ] 

413 ] 

414 ] 

415 ] 

416 } 

417""" 

418 

419POINT_GEOJSON = """ 

420{ "type": "Point", "coordinates": [100.0, 0.0] } 

421""" 

422 

423 

424def test_CreateCommunity_invalid_geojson(db): 

425 super_user, super_token = generate_user(is_superuser=True) 

426 normal_user, normal_token = generate_user() 

427 with real_admin_session(super_token) as api: 

428 with pytest.raises(grpc.RpcError) as e: 

429 api.CreateCommunity( 

430 admin_pb2.CreateCommunityReq( 

431 name="test community", 

432 description="community for testing", 

433 admin_ids=[], 

434 geojson=POINT_GEOJSON, 

435 ) 

436 ) 

437 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

438 assert e.value.details() == errors.NO_MULTIPOLYGON 

439 

440 

441def test_CreateCommunity(db): 

442 with session_scope() as session: 

443 super_user, super_token = generate_user(is_superuser=True) 

444 normal_user, normal_token = generate_user() 

445 with real_admin_session(super_token) as api: 

446 api.CreateCommunity( 

447 admin_pb2.CreateCommunityReq( 

448 name="test community", 

449 description="community for testing", 

450 admin_ids=[], 

451 geojson=VALID_GEOJSON_MULTIPOLYGON, 

452 ) 

453 ) 

454 community = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one() 

455 assert community.description == "community for testing" 

456 assert community.slug == "test-community" 

457 

458 

459def test_UpdateCommunity_invalid_geojson(db): 

460 super_user, super_token = generate_user(is_superuser=True) 

461 

462 with session_scope() as session: 

463 with real_admin_session(super_token) as api: 

464 api.CreateCommunity( 

465 admin_pb2.CreateCommunityReq( 

466 name="test community", 

467 description="community for testing", 

468 admin_ids=[], 

469 geojson=VALID_GEOJSON_MULTIPOLYGON, 

470 ) 

471 ) 

472 community = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one() 

473 

474 with pytest.raises(grpc.RpcError) as e: 

475 api.UpdateCommunity( 

476 admin_pb2.UpdateCommunityReq( 

477 community_id=community.parent_node_id, 

478 name="test community 2", 

479 description="community for testing 2", 

480 geojson=POINT_GEOJSON, 

481 ) 

482 ) 

483 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

484 assert e.value.details() == errors.NO_MULTIPOLYGON 

485 

486 

487def test_UpdateCommunity_invalid_id(db): 

488 super_user, super_token = generate_user(is_superuser=True) 

489 

490 with session_scope() as session: 

491 with real_admin_session(super_token) as api: 

492 api.CreateCommunity( 

493 admin_pb2.CreateCommunityReq( 

494 name="test community", 

495 description="community for testing", 

496 admin_ids=[], 

497 geojson=VALID_GEOJSON_MULTIPOLYGON, 

498 ) 

499 ) 

500 

501 with pytest.raises(grpc.RpcError) as e: 

502 api.UpdateCommunity( 

503 admin_pb2.UpdateCommunityReq( 

504 community_id=1000, 

505 name="test community 1000", 

506 description="community for testing 1000", 

507 geojson=VALID_GEOJSON_MULTIPOLYGON, 

508 ) 

509 ) 

510 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

511 assert e.value.details() == errors.COMMUNITY_NOT_FOUND 

512 

513 

514def test_UpdateCommunity(db): 

515 super_user, super_token = generate_user(is_superuser=True) 

516 

517 with session_scope() as session: 

518 with real_admin_session(super_token) as api: 

519 api.CreateCommunity( 

520 admin_pb2.CreateCommunityReq( 

521 name="test community", 

522 description="community for testing", 

523 admin_ids=[], 

524 geojson=VALID_GEOJSON_MULTIPOLYGON, 

525 ) 

526 ) 

527 community = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one() 

528 assert community.description == "community for testing" 

529 

530 api.CreateCommunity( 

531 admin_pb2.CreateCommunityReq( 

532 name="test community 2", 

533 description="community for testing 2", 

534 admin_ids=[], 

535 geojson=VALID_GEOJSON_MULTIPOLYGON, 

536 ) 

537 ) 

538 community_2 = session.execute(select(Cluster).where(Cluster.name == "test community 2")).scalar_one() 

539 

540 api.UpdateCommunity( 

541 admin_pb2.UpdateCommunityReq( 

542 community_id=community.parent_node_id, 

543 name="test community 2", 

544 description="community for testing 2", 

545 geojson=VALID_GEOJSON_MULTIPOLYGON, 

546 parent_node_id=community_2.parent_node_id, 

547 ) 

548 ) 

549 session.commit() 

550 

551 community_updated = session.execute(select(Cluster).where(Cluster.id == community.id)).scalar_one() 

552 assert community_updated.description == "community for testing 2" 

553 assert community_updated.slug == "test-community-2" 

554 

555 node_updated = session.execute(select(Node).where(Node.id == community_updated.parent_node_id)).scalar_one() 

556 assert node_updated.parent_node_id == community_2.parent_node_id 

557 

558 

559def test_GetChats(db): 

560 super_user, super_token = generate_user(is_superuser=True) 

561 normal_user, normal_token = generate_user() 

562 

563 with real_admin_session(super_token) as api: 

564 res = api.GetChats(admin_pb2.GetChatsReq(user=normal_user.username)) 

565 assert res.response 

566 

567 

568def test_badges(db, push_collector): 

569 super_user, super_token = generate_user(is_superuser=True) 

570 normal_user, normal_token = generate_user() 

571 

572 with real_admin_session(super_token) as api: 

573 # can add a badge 

574 assert "volunteer" not in api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)).badges 

575 with mock_notification_email() as mock: 

576 res = api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="volunteer")) 

577 assert "volunteer" in res.badges 

578 

579 # badge emails are disabled by default 

580 mock.assert_not_called() 

581 

582 push_collector.assert_user_has_single_matching( 

583 normal_user.id, 

584 title="The Active Volunteer badge was added to your profile", 

585 body="Check out your profile to see the new badge!", 

586 ) 

587 

588 # can't add/edit special tags 

589 with pytest.raises(grpc.RpcError) as e: 

590 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="founder")) 

591 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

592 assert e.value.details() == errors.ADMIN_CANNOT_EDIT_BADGE 

593 

594 # double add badge 

595 with pytest.raises(grpc.RpcError) as e: 

596 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="volunteer")) 

597 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

598 assert e.value.details() == errors.USER_ALREADY_HAS_BADGE 

599 

600 # can remove badge 

601 assert "volunteer" in api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)).badges 

602 with mock_notification_email() as mock: 

603 res = api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=normal_user.username, badge_id="volunteer")) 

604 assert "volunteer" not in res.badges 

605 

606 # badge emails are disabled by default 

607 mock.assert_not_called() 

608 

609 push_collector.assert_user_push_matches_fields( 

610 normal_user.id, 

611 ix=1, 

612 title="The Active Volunteer badge was removed from your profile", 

613 body="You can see all your badges on your profile.", 

614 ) 

615 

616 # not found on user 

617 with pytest.raises(grpc.RpcError) as e: 

618 api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=normal_user.username, badge_id="volunteer")) 

619 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

620 assert e.value.details() == errors.USER_DOES_NOT_HAVE_BADGE 

621 

622 # not found in general 

623 with pytest.raises(grpc.RpcError) as e: 

624 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="nonexistentbadge")) 

625 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

626 assert e.value.details() == errors.BADGE_NOT_FOUND 

627 

628 

629def test_DeleteEvent(db): 

630 super_user, super_token = generate_user(is_superuser=True) 

631 normal_user, normal_token = generate_user() 

632 

633 with session_scope() as session: 

634 create_community(session, 0, 2, "Community", [normal_user], [], None) 

635 

636 start_time = now() + timedelta(hours=2) 

637 end_time = start_time + timedelta(hours=3) 

638 with events_session(normal_token) as api: 

639 res = api.CreateEvent( 

640 events_pb2.CreateEventReq( 

641 title="Dummy Title", 

642 content="Dummy content.", 

643 photo_key=None, 

644 offline_information=events_pb2.OfflineEventInformation( 

645 address="Near Null Island", 

646 lat=0.1, 

647 lng=0.2, 

648 ), 

649 start_time=Timestamp_from_datetime(start_time), 

650 end_time=Timestamp_from_datetime(end_time), 

651 timezone="UTC", 

652 ) 

653 ) 

654 event_id = res.event_id 

655 assert not res.is_deleted 

656 

657 with session_scope() as session: 

658 with real_admin_session(super_token) as api: 

659 api.DeleteEvent( 

660 admin_pb2.DeleteEventReq( 

661 event_id=event_id, 

662 ) 

663 ) 

664 occurrence = session.get(EventOccurrence, ident=event_id) 

665 assert occurrence.is_deleted 

666 

667 

668def test_ListUserIds(db): 

669 super_user, super_token = generate_user(is_superuser=True) 

670 normal_user, normal_token = generate_user() 

671 

672 with real_admin_session(super_token) as api: 

673 res = api.ListUserIds( 

674 admin_pb2.ListUserIdsReq( 

675 start_time=Timestamp_from_datetime(datetime(2000, 1, 1)), end_time=Timestamp_from_datetime(now()) 

676 ) 

677 ) 

678 assert len(res.user_ids) == 2 

679 assert sorted(res.user_ids) == sorted([super_user.id, normal_user.id]) 

680 

681 with real_admin_session(super_token) as api: 

682 res = api.ListUserIds( 

683 admin_pb2.ListUserIdsReq(start_time=Timestamp_from_datetime(now()), end_time=Timestamp_from_datetime(now())) 

684 ) 

685 assert res.user_ids == [] 

686 

687 

688def test_EditReferenceText(db): 

689 super_user, super_token = generate_user(is_superuser=True) 

690 test_new_text = "New Text" 

691 

692 user1, user1_token = generate_user() 

693 user2, user2_token = generate_user() 

694 

695 with session_scope() as session: 

696 with references_session(user1_token) as api: 

697 reference = api.WriteFriendReference( 

698 references_pb2.WriteFriendReferenceReq( 

699 to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1 

700 ) 

701 ) 

702 

703 with real_admin_session(super_token) as admin_api: 

704 admin_api.EditReferenceText( 

705 admin_pb2.EditReferenceTextReq(reference_id=reference.reference_id, new_text=test_new_text) 

706 ) 

707 

708 session.expire_all() 

709 

710 modified_reference = session.execute( 

711 select(Reference).where(Reference.id == reference.reference_id) 

712 ).scalar_one_or_none() 

713 assert modified_reference.text == test_new_text 

714 

715 

716def test_DeleteReference(db): 

717 super_user, super_token = generate_user(is_superuser=True) 

718 

719 user1, user1_token = generate_user() 

720 user2, user2_token = generate_user() 

721 

722 with references_session(user1_token) as api: 

723 reference = api.WriteFriendReference( 

724 references_pb2.WriteFriendReferenceReq( 

725 to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1 

726 ) 

727 ) 

728 

729 with references_session(user1_token) as api: 

730 assert api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references 

731 

732 with real_admin_session(super_token) as admin_api: 

733 admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=reference.reference_id)) 

734 

735 with references_session(user1_token) as api: 

736 assert not api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references 

737 

738 with session_scope() as session: 

739 modified_reference = session.execute( 

740 select(Reference).where(Reference.id == reference.reference_id) 

741 ).scalar_one_or_none() 

742 assert modified_reference.is_deleted 

743 

744 

745def test_AddUsersToModerationUserList(db): 

746 super_user, super_token = generate_user(is_superuser=True) 

747 user1, _ = generate_user() 

748 user2, _ = generate_user() 

749 user3, _ = generate_user() 

750 user4, _ = generate_user() 

751 user5, _ = generate_user() 

752 moderation_list_id = add_users_to_new_moderation_list([user1]) 

753 

754 with session_scope() as session: 

755 with real_admin_session(super_token) as api: 

756 # Test adding users to a non-existent moderation list (should raise an error) 

757 with pytest.raises(grpc.RpcError) as e: 

758 api.AddUsersToModerationUserList( 

759 admin_pb2.AddUsersToModerationUserListReq(users=[user2.username], moderation_list_id=999), 

760 ) 

761 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

762 assert errors.MODERATION_USER_LIST_NOT_FOUND == e.value.details() 

763 

764 # Test with non-existent user (should raise an error) 

765 with pytest.raises(grpc.RpcError) as e: 

766 api.AddUsersToModerationUserList( 

767 admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, "nonexistent"]), 

768 ) 

769 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

770 assert errors.USER_NOT_FOUND == e.value.details() 

771 

772 # Test successful creation of new moderation list (no moderation_list_id provided) 

773 res = api.AddUsersToModerationUserList( 

774 admin_pb2.AddUsersToModerationUserListReq(users=[user1.username, user2.username, user3.username]), 

775 ) 

776 assert res.moderation_list_id > 0 

777 with session_scope() as session: 

778 moderation_user_list = session.get(ModerationUserList, res.moderation_list_id) 

779 assert moderation_user_list is not None 

780 assert len(moderation_user_list.users) == 3 

781 assert {user1.id, user2.id, user3.id}.issubset({user.id for user in moderation_user_list.users}) 

782 

783 # Test list endpoint returns same moderation list with same members not repeated 

784 listRes = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username)) 

785 assert len(listRes.moderation_lists) == 1 

786 assert listRes.moderation_lists[0].moderation_list_id == res.moderation_list_id 

787 assert len(listRes.moderation_lists[0].member_ids) == 3 

788 assert {user1.id, user2.id, user3.id}.issubset(listRes.moderation_lists[0].member_ids) 

789 

790 # Test user can be in multiple moderation lists 

791 listRes3 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username)) 

792 assert len(listRes3.moderation_lists) == 2 

793 

794 # Test adding users to an existing moderation list 

795 res2 = api.AddUsersToModerationUserList( 

796 admin_pb2.AddUsersToModerationUserListReq( 

797 users=[user4.username, user5.username], moderation_list_id=moderation_list_id 

798 ), 

799 ) 

800 assert res2.moderation_list_id == moderation_list_id 

801 with session_scope() as session: 

802 moderation_user_list = session.get(ModerationUserList, moderation_list_id) 

803 assert len(moderation_user_list.users) == 3 

804 assert {user1.id, user4.id, user5.id}.issubset({user.id for user in moderation_user_list.users}) 

805 

806 # Test list user moderation lists endpoint returns the right moderation list 

807 listRes2 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user5.username)) 

808 assert len(listRes2.moderation_lists) == 1 

809 assert listRes2.moderation_lists[0].moderation_list_id == moderation_list_id 

810 assert len(listRes2.moderation_lists[0].member_ids) == 3 

811 assert {user1.id, user4.id, user5.id}.issubset(listRes2.moderation_lists[0].member_ids) 

812 

813 

814def test_RemoveUserFromModerationUserList(db): 

815 super_user, super_token = generate_user(is_superuser=True) 

816 user1, _ = generate_user() 

817 user2, _ = generate_user() 

818 user3, _ = generate_user() 

819 moderation_list_id = add_users_to_new_moderation_list([user1, user2]) 

820 

821 with real_admin_session(super_token) as api: 

822 # Test with non-existent user (should raise error) 

823 with pytest.raises(grpc.RpcError) as e: 

824 api.RemoveUserFromModerationUserList(admin_pb2.RemoveUserFromModerationUserListReq(user="nonexistent")) 

825 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

826 assert errors.USER_NOT_FOUND == e.value.details() 

827 

828 # Test without providing moderation list id (should raise error) 

829 with pytest.raises(grpc.RpcError) as e: 

830 api.RemoveUserFromModerationUserList(admin_pb2.RemoveUserFromModerationUserListReq(user=user2.username)) 

831 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

832 assert errors.MISSING_MODERATION_USER_LIST_ID == e.value.details() 

833 

834 # Test removing user that's not in the provided moderation list (should raise error) 

835 with pytest.raises(grpc.RpcError) as e: 

836 api.RemoveUserFromModerationUserList( 

837 admin_pb2.RemoveUserFromModerationUserListReq( 

838 user=user3.username, moderation_list_id=moderation_list_id 

839 ) 

840 ) 

841 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

842 assert errors.USER_NOT_IN_THE_MODERATION_USER_LIST == e.value.details() 

843 

844 # Test successful removal 

845 api.RemoveUserFromModerationUserList( 

846 admin_pb2.RemoveUserFromModerationUserListReq(user=user1.username, moderation_list_id=moderation_list_id) 

847 ) 

848 with session_scope() as session: 

849 moderation_user_list = session.get(ModerationUserList, moderation_list_id) 

850 assert user1.id not in {user.id for user in moderation_user_list.users} 

851 assert user2.id in {user.id for user in moderation_user_list.users} 

852 

853 # Test list user moderation lists endpoint returns right number of moderation lists 

854 listRes = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user1.username)) 

855 assert len(listRes.moderation_lists) == 0 

856 listRes2 = api.ListModerationUserLists(admin_pb2.ListModerationUserListsReq(user=user2.username)) 

857 assert len(listRes2.moderation_lists) == 1 

858 

859 # Test removing all users from moderation list should also delete the moderation list 

860 api.RemoveUserFromModerationUserList( 

861 admin_pb2.RemoveUserFromModerationUserListReq(user=user2.username, moderation_list_id=moderation_list_id) 

862 ) 

863 with session_scope() as session: 

864 assert session.get(ModerationUserList, moderation_list_id) is None 

865 

866 

867def test_admin_delete_account_url(db, push_collector): 

868 super_user, super_token = generate_user(is_superuser=True) 

869 

870 user, token = generate_user() 

871 user_id = user.id 

872 

873 with real_admin_session(super_token) as admin_api: 

874 url = admin_api.CreateAccountDeletionLink( 

875 admin_pb2.CreateAccountDeletionLinkReq(user=user.username) 

876 ).account_deletion_confirm_url 

877 

878 push_collector.assert_user_has_count(user_id, 0) 

879 

880 with session_scope() as session: 

881 token_o = session.execute(select(AccountDeletionToken)).scalar_one() 

882 token = token_o.token 

883 assert token_o.user.id == user_id 

884 assert url == f"http://localhost:3000/delete-account?token={token}" 

885 

886 with mock_notification_email() as mock: 

887 with auth_api_session() as (auth_api, metadata_interceptor): 

888 auth_api.ConfirmDeleteAccount( 

889 auth_pb2.ConfirmDeleteAccountReq( 

890 token=token, 

891 ) 

892 ) 

893 

894 push_collector.assert_user_push_matches_fields( 

895 user_id, 

896 ix=0, 

897 title="Your Couchers.org account has been deleted", 

898 body="You can still undo this by following the link we emailed to you within 7 days.", 

899 ) 

900 

901 mock.assert_called_once() 

902 e = email_fields(mock) 

903 

904 

905# community invite feature tested in test_events.py 

906# SendBlogPostNotification tested in test_notifications.py 

907# MarkUserNeedsLocationUpdate tested in test_jail.py