Coverage for src/tests/test_admin.py: 100%

337 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-22 06:42 +0000

1from datetime import date, datetime 

2from re import match 

3 

4import grpc 

5import pytest 

6from sqlalchemy.sql import func 

7 

8from couchers import errors 

9from couchers.db import session_scope 

10from couchers.models import Cluster, ContentReport, EventOccurrence, Node, UserSession 

11from couchers.sql import couchers_select as select 

12from couchers.utils import Timestamp_from_datetime, now, parse_date, timedelta 

13from proto import admin_pb2, events_pb2, reporting_pb2 

14from tests.test_communities import create_community 

15from tests.test_fixtures import ( # noqa 

16 db, 

17 email_fields, 

18 events_session, 

19 generate_user, 

20 get_user_id_and_token, 

21 mock_notification_email, 

22 push_collector, 

23 real_admin_session, 

24 reporting_session, 

25 testconfig, 

26) 

27 

28 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""
    return None

32 

33 

def test_access_by_normal_user(db):
    """A non-superuser calling the admin servicer is rejected with PERMISSION_DENIED."""
    regular_user, regular_token = generate_user()

    with real_admin_session(regular_token) as admin_api:
        # all requests to the admin servicer should break when done by a non-super_user
        with pytest.raises(grpc.RpcError) as exc_info:
            request = admin_pb2.GetUserDetailsReq(user=str(regular_user.id))
            admin_api.GetUserDetails(request)
        # Checked after the raises block so the assertion is actually reached.
        assert exc_info.value.code() == grpc.StatusCode.PERMISSION_DENIED

46 

47 

def test_GetUserDetails(db):
    """GetUserDetails returns the same details whether the user is looked up by id, username or email."""
    _super_user, super_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    # The `user` request field is exercised with each identifier in turn,
    # using a fresh admin session per lookup.
    for identifier_of in (lambda u: str(u.id), lambda u: u.username, lambda u: u.email):
        with real_admin_session(super_token) as admin_api:
            details = admin_api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=identifier_of(target)))
            assert details.user_id == target.id
            assert details.username == target.username
            assert details.email == target.email
            assert details.gender == target.gender
            assert parse_date(details.birthdate) == target.birthdate
            assert not details.banned
            assert not details.deleted

81 

82 

def test_ChangeUserGender(db, push_collector):
    """ChangeUserGender updates the gender and notifies the user by email and push."""
    _super_user, super_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()
    new_gender = "Machine"

    with real_admin_session(super_token) as admin_api, mock_notification_email() as email_mock:
        details = admin_api.ChangeUserGender(
            admin_pb2.ChangeUserGenderReq(user=target.username, gender=new_gender)
        )

    # The returned details reflect the new gender and leave everything else alone.
    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == new_gender
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted

    # Exactly one notification email, addressed to the affected user.
    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your gender was changed"
    assert sent.recipient == target.email
    assert new_gender in sent.plain
    assert new_gender in sent.html

    push_collector.assert_user_has_single_matching(
        target.id,
        title="Your gender was changed",
        body="Your gender on Couchers.org was changed to Machine by an admin.",
    )

110 

111 

def test_ChangeUserBirthdate(db, push_collector):
    """ChangeUserBirthdate updates the birthdate and notifies the user by email and push."""
    original_birthdate = date(year=2000, month=1, day=1)
    new_birthdate = "1990-05-25"

    _super_user, super_token = generate_user(is_superuser=True)
    target, _target_token = generate_user(birthdate=original_birthdate)

    with real_admin_session(super_token) as admin_api:
        # Sanity check: the user starts out with the birthdate they were created with.
        before = admin_api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=target.username))
        assert parse_date(before.birthdate) == date(year=2000, month=1, day=1)

        with mock_notification_email() as email_mock:
            request = admin_pb2.ChangeUserBirthdateReq(user=target.username, birthdate=new_birthdate)
            after = admin_api.ChangeUserBirthdate(request)

        assert after.user_id == target.id
        assert after.username == target.username
        assert after.email == target.email
        assert after.birthdate == new_birthdate
        assert after.gender == target.gender
        assert not after.banned
        assert not after.deleted

    # Exactly one notification email, addressed to the affected user.
    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your date of birth was changed"
    assert sent.recipient == target.email
    assert "1990" in sent.plain
    assert "1990" in sent.html

    push_collector.assert_user_has_single_matching(
        target.id,
        title="Your date of birth was changed",
        body="Your date of birth on Couchers.org was changed to Friday 25 May 1990 by an admin.",
    )

145 

146 

def test_BanUser(db):
    """BanUser marks the user as banned and records a timestamped admin note."""
    admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    reason = "A good reason"

    # A note entry looks like: "\n[<UTC ISO timestamp>] (id: <admin id>, username: <admin username>) <note>\n"
    timestamp_pattern = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    entry_prefix = rf"\n\[{timestamp_pattern}\] \(id: {admin.id}, username: {admin.username}\)"
    expected_note = rf"^{entry_prefix} {reason}\n$"

    with real_admin_session(admin_token) as admin_api:
        request = admin_pb2.BanUserReq(user=target.username, admin_note=reason)
        details = admin_api.BanUser(request)

        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert details.banned
        assert not details.deleted
        assert match(expected_note, details.admin_note)

164 

165 

def test_UnbanUser(db):
    """UnbanUser clears the ban flag on a banned user and records the admin note.

    The user is banned first so that ``assert not res.banned`` really exercises
    the unban: against a user who was never banned, that assertion would hold
    even if UnbanUser left the flag untouched.
    """
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    ban_note = "Banned so the unban can be observed"
    admin_note = "A good reason"
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"

    with real_admin_session(super_token) as api:
        # Precondition: put the user into the banned state.
        res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note=ban_note))
        assert res.banned

        res = api.UnbanUser(admin_pb2.UnbanUserReq(user=normal_user.username, admin_note=admin_note))
        assert res.user_id == normal_user.id
        assert res.username == normal_user.username
        assert res.email == normal_user.email
        assert res.gender == normal_user.gender
        assert parse_date(res.birthdate) == normal_user.birthdate
        assert not res.banned
        assert not res.deleted
        # Anchor on the final note entry only: whatever the earlier ban left in
        # the admin note, the last entry must be the unban note.
        assert match(rf"^[\s\S]*{prefix_regex} {admin_note}\n$", res.admin_note)

183 

184 

def test_AddAdminNote(db):
    """AddAdminNote records a timestamped note, and a second note is appended after the first."""
    admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    first_note = "User reported strange behavior"
    second_note = "Insert private information here"

    # A note entry looks like: "\n[<UTC ISO timestamp>] (id: <admin id>, username: <admin username>) <note>\n"
    timestamp_pattern = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    entry_prefix = rf"\n\[{timestamp_pattern}\] \(id: {admin.id}, username: {admin.username}\)"
    one_entry = rf"^{entry_prefix} {first_note}\n$"
    two_entries = rf"^{entry_prefix} {first_note}\n{entry_prefix} {second_note}\n$"

    with real_admin_session(admin_token) as admin_api:
        request = admin_pb2.AddAdminNoteReq(user=target.username, admin_note=first_note)
        details = admin_api.AddAdminNote(request)

        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert not details.deleted
        assert match(one_entry, details.admin_note)

    with real_admin_session(admin_token) as admin_api:
        request = admin_pb2.AddAdminNoteReq(user=target.username, admin_note=second_note)
        details = admin_api.AddAdminNote(request)
        assert match(two_entries, details.admin_note)

207 

208 

def test_AddAdminNote_blank(db):
    """A whitespace-only admin note is rejected with INVALID_ARGUMENT."""
    _super_user, super_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    whitespace_only_note = " \t \n "

    with real_admin_session(super_token) as admin_api:
        request = admin_pb2.AddAdminNoteReq(user=target.username, admin_note=whitespace_only_note)
        with pytest.raises(grpc.RpcError) as exc_info:
            admin_api.AddAdminNote(request)

        failure = exc_info.value
        assert failure.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert failure.details() == errors.ADMIN_NOTE_CANT_BE_EMPTY

219 

220 

def test_admin_content_reports(db):
    """Admins can fetch a single content report by id and list the reports against an author."""
    _super_user, super_token = generate_user(is_superuser=True)
    reporter, reporter_token = generate_user()
    bad_user1, _ = generate_user()
    bad_user2, _ = generate_user()

    # (reason, description, content_ref, author, page), in submission order.
    submissions = (
        ("spam", "r1", "comment/123", bad_user1, "https://couchers.org/comment/123"),
        ("spam", "r2", "comment/124", bad_user2, "https://couchers.org/comment/124"),
        ("something else", "r3", "page/321", bad_user1, "https://couchers.org/page/321"),
    )
    with reporting_session(reporter_token) as reporting_api:
        for reason, description, content_ref, author, page in submissions:
            reporting_api.Report(
                reporting_pb2.ReportReq(
                    reason=reason,
                    description=description,
                    content_ref=content_ref,
                    author_user=author.username,
                    user_agent="n/a",
                    page=page,
                )
            )

    # Map each report's description to its database id so the reports can be addressed below.
    with session_scope() as session:
        rows = session.execute(select(ContentReport.description, ContentReport.id)).all()
        report_ids = {description: report_id for description, report_id in rows}

    with real_admin_session(super_token) as admin_api:
        # Unknown report id.
        with pytest.raises(grpc.RpcError) as exc_info:
            admin_api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == errors.CONTENT_REPORT_NOT_FOUND

        # Single report lookup.
        single = admin_api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=report_ids["r2"]))
        report = single.content_report
        assert report.content_report_id == report_ids["r2"]
        assert report.reporting_user_id == reporter.id
        assert report.author_user_id == bad_user2.id
        assert report.reason == "spam"
        assert report.description == "r2"
        assert report.content_ref == "comment/124"
        assert report.user_agent == "n/a"
        assert report.page == "https://couchers.org/comment/124"

        # Reports against bad_user1: the later report (r3) is listed before the earlier one (r1).
        by_author = admin_api.GetContentReportsForAuthor(
            admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username)
        )
        assert by_author.content_reports[0].content_report_id == report_ids["r3"]
        assert by_author.content_reports[1].content_report_id == report_ids["r1"]

282 

283 

def test_DeleteUser(db):
    """DeleteUser marks the user as deleted without banning them."""
    _super_user, super_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    with real_admin_session(super_token) as admin_api:
        request = admin_pb2.DeleteUserReq(user=target.username)
        details = admin_api.DeleteUser(request)

        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert details.deleted

297 

298 

def test_CreateApiKey(db, push_collector):
    """CreateApiKey creates an API-key session for the user and sends them the token by email."""
    with session_scope() as session:
        _super_user, super_token = generate_user(is_superuser=True)
        target, _target_token = generate_user()

        # The user starts out without any API keys.
        count_api_keys = (
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        )
        assert session.execute(count_api_keys).scalar_one() == 0

    with mock_notification_email() as email_mock, real_admin_session(super_token) as admin_api:
        admin_api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=target.username))

    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your API key for Couchers.org"

    with session_scope() as session:
        # scalar_one() also asserts that exactly one valid API key now exists for the user.
        find_api_key = (
            select(UserSession)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        )
        api_key = session.execute(find_api_key).scalar_one()

        assert api_key.token in sent.plain
        assert api_key.token in sent.html

    assert sent.recipient == target.email
    assert "api key" in sent.subject.lower()
    intro_line = "We've issued you with the following API key:"
    assert intro_line in sent.plain
    assert intro_line in sent.html
    assert "support@couchers.org" in sent.plain
    assert "support@couchers.org" in sent.html

    push_collector.assert_user_has_single_matching(
        target.id,
        title="An API key was created for your account",
        body="Details were sent to you via email.",
    )

344 

345 

# GeoJSON text for a MultiPolygon containing one polygon with a single ring of
# five positions; the last position repeats the first, closing the ring.
# Passed as the `geojson` field of the community create/update requests in the
# tests below.
VALID_GEOJSON_MULTIPOLYGON = """
    {
      "type": "MultiPolygon",
      "coordinates":
       [
        [
          [
            [
              -73.98114904754641,
              40.7470284264813
            ],
            [
              -73.98314135177611,
              40.73416844413217
            ],
            [
              -74.00538969848634,
              40.734314779027144
            ],
            [
              -74.00479214294432,
              40.75027851544338
            ],
            [
              -73.98114904754641,
              40.7470284264813
            ]
          ]
        ]
      ]
    }
"""

378 

# GeoJSON text for a Point rather than a MultiPolygon. The *_invalid_geojson
# tests pass it as `geojson` and expect errors.NO_MULTIPOLYGON in response.
POINT_GEOJSON = """
{ "type": "Point", "coordinates": [100.0, 0.0] }
"""

382 

383 

def test_CreateCommunity_invalid_geojson(db):
    """CreateCommunity rejects geojson that is not a MultiPolygon with INVALID_ARGUMENT."""
    _super_user, super_token = generate_user(is_superuser=True)
    _normal_user, _normal_token = generate_user()

    with real_admin_session(super_token) as admin_api:
        request = admin_pb2.CreateCommunityReq(
            name="test community",
            description="community for testing",
            admin_ids=[],
            geojson=POINT_GEOJSON,
        )
        with pytest.raises(grpc.RpcError) as exc_info:
            admin_api.CreateCommunity(request)

        failure = exc_info.value
        assert failure.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert failure.details() == errors.NO_MULTIPOLYGON

399 

400 

def test_CreateCommunity(db):
    """CreateCommunity stores a cluster with the given description and a slug derived from the name."""
    community_name = "test community"

    with session_scope() as session:
        _super_user, super_token = generate_user(is_superuser=True)
        _normal_user, _normal_token = generate_user()

        with real_admin_session(super_token) as admin_api:
            request = admin_pb2.CreateCommunityReq(
                name=community_name,
                description="community for testing",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
            admin_api.CreateCommunity(request)

            # scalar_one() also asserts that exactly one cluster carries this name.
            by_name = select(Cluster).where(Cluster.name == community_name)
            cluster = session.execute(by_name).scalar_one()
            assert cluster.description == "community for testing"
            assert cluster.slug == "test-community"

417 

418 

def test_UpdateCommunity_invalid_geojson(db):
    """UpdateCommunity rejects geojson that is not a MultiPolygon with INVALID_ARGUMENT."""
    _super_user, super_token = generate_user(is_superuser=True)

    with session_scope() as session, real_admin_session(super_token) as admin_api:
        # Create a valid community to update.
        admin_api.CreateCommunity(
            admin_pb2.CreateCommunityReq(
                name="test community",
                description="community for testing",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
        )
        cluster = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one()

        with pytest.raises(grpc.RpcError) as exc_info:
            bad_update = admin_pb2.UpdateCommunityReq(
                community_id=cluster.parent_node_id,
                name="test community 2",
                description="community for testing 2",
                geojson=POINT_GEOJSON,
            )
            admin_api.UpdateCommunity(bad_update)

        failure = exc_info.value
        assert failure.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert failure.details() == errors.NO_MULTIPOLYGON

445 

446 

def test_UpdateCommunity_invalid_id(db):
    """UpdateCommunity on a community id that does not exist fails with NOT_FOUND."""
    _super_user, super_token = generate_user(is_superuser=True)
    missing_community_id = 1000

    with session_scope() as session, real_admin_session(super_token) as admin_api:
        # One real community exists; the update below targets a different id.
        admin_api.CreateCommunity(
            admin_pb2.CreateCommunityReq(
                name="test community",
                description="community for testing",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
        )

        with pytest.raises(grpc.RpcError) as exc_info:
            admin_api.UpdateCommunity(
                admin_pb2.UpdateCommunityReq(
                    community_id=missing_community_id,
                    name="test community 1000",
                    description="community for testing 1000",
                    geojson=VALID_GEOJSON_MULTIPOLYGON,
                )
            )

        failure = exc_info.value
        assert failure.code() == grpc.StatusCode.NOT_FOUND
        assert failure.details() == errors.COMMUNITY_NOT_FOUND

472 

473 

def test_UpdateCommunity(db):
    """UpdateCommunity changes name, description and slug, and re-parents the community's node."""
    _super_user, super_token = generate_user(is_superuser=True)

    def cluster_named(session, name):
        # Fetch the single cluster with this name (scalar_one() fails on zero or several).
        return session.execute(select(Cluster).where(Cluster.name == name)).scalar_one()

    with session_scope() as session, real_admin_session(super_token) as admin_api:
        admin_api.CreateCommunity(
            admin_pb2.CreateCommunityReq(
                name="test community",
                description="community for testing",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
        )
        first = cluster_named(session, "test community")
        assert first.description == "community for testing"

        admin_api.CreateCommunity(
            admin_pb2.CreateCommunityReq(
                name="test community 2",
                description="community for testing 2",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
        )
        second = cluster_named(session, "test community 2")

        # Rename the first community and move its node underneath the second one's.
        admin_api.UpdateCommunity(
            admin_pb2.UpdateCommunityReq(
                community_id=first.parent_node_id,
                name="test community 2",
                description="community for testing 2",
                geojson=VALID_GEOJSON_MULTIPOLYGON,
                parent_node_id=second.parent_node_id,
            )
        )
        session.commit()

        refreshed = session.execute(select(Cluster).where(Cluster.id == first.id)).scalar_one()
        assert refreshed.description == "community for testing 2"
        assert refreshed.slug == "test-community-2"

        refreshed_node = session.execute(select(Node).where(Node.id == refreshed.parent_node_id)).scalar_one()
        assert refreshed_node.parent_node_id == second.parent_node_id

517 

518 

def test_GetChats(db):
    """GetChats returns a non-empty ``response`` field for an existing user."""
    _super_user, super_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()

    with real_admin_session(super_token) as admin_api:
        request = admin_pb2.GetChatsReq(user=target.username)
        chats = admin_api.GetChats(request)
        assert chats.response

526 

527 

def test_badges(db, push_collector):
    """Admins can add and remove editable badges; invalid badge operations fail with specific errors."""
    _super_user, super_token = generate_user(is_superuser=True)
    target, _target_token = generate_user()
    username = target.username

    def assert_rpc_fails(rpc, request, expected_code, expected_details):
        # Call `rpc(request)` and check that it aborts with the given status code and details.
        with pytest.raises(grpc.RpcError) as exc_info:
            rpc(request)
        assert exc_info.value.code() == expected_code
        assert exc_info.value.details() == expected_details

    with real_admin_session(super_token) as admin_api:

        def current_badges():
            return admin_api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=username)).badges

        # can add a badge
        assert "volunteer" not in current_badges()
        with mock_notification_email() as email_mock:
            added = admin_api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="volunteer"))
        assert "volunteer" in added.badges

        # badge emails are disabled by default
        email_mock.assert_not_called()

        push_collector.assert_user_has_single_matching(
            target.id,
            title="The Active Volunteer badge was added to your profile",
            body="Check out your profile to see the new badge!",
        )

        # can't add/edit special tags
        assert_rpc_fails(
            admin_api.AddBadge,
            admin_pb2.AddBadgeReq(user=username, badge_id="founder"),
            grpc.StatusCode.FAILED_PRECONDITION,
            errors.ADMIN_CANNOT_EDIT_BADGE,
        )

        # double add badge
        assert_rpc_fails(
            admin_api.AddBadge,
            admin_pb2.AddBadgeReq(user=username, badge_id="volunteer"),
            grpc.StatusCode.FAILED_PRECONDITION,
            errors.USER_ALREADY_HAS_BADGE,
        )

        # can remove badge
        assert "volunteer" in current_badges()
        with mock_notification_email() as email_mock:
            removed = admin_api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="volunteer"))
        assert "volunteer" not in removed.badges

        # badge emails are disabled by default
        email_mock.assert_not_called()

        # The removal push is checked at index 1, after the push for the earlier add.
        push_collector.assert_user_push_matches_fields(
            target.id,
            ix=1,
            title="The Active Volunteer badge was removed from your profile",
            body="You can see all your badges on your profile.",
        )

        # not found on user
        assert_rpc_fails(
            admin_api.RemoveBadge,
            admin_pb2.RemoveBadgeReq(user=username, badge_id="volunteer"),
            grpc.StatusCode.FAILED_PRECONDITION,
            errors.USER_DOES_NOT_HAVE_BADGE,
        )

        # not found in general
        assert_rpc_fails(
            admin_api.AddBadge,
            admin_pb2.AddBadgeReq(user=username, badge_id="nonexistentbadge"),
            grpc.StatusCode.NOT_FOUND,
            errors.BADGE_NOT_FOUND,
        )

587 

588 

def test_DeleteEvent(db):
    """An admin's DeleteEvent marks the event occurrence created by a normal user as deleted."""
    _super_user, super_token = generate_user(is_superuser=True)
    organizer, organizer_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [organizer], [], None)

    # The event runs from two hours from now until three hours after that.
    starts_at = now() + timedelta(hours=2)
    ends_at = starts_at + timedelta(hours=3)

    with events_session(organizer_token) as events_api:
        location = events_pb2.OfflineEventInformation(
            address="Near Null Island",
            lat=0.1,
            lng=0.2,
        )
        created = events_api.CreateEvent(
            events_pb2.CreateEventReq(
                title="Dummy Title",
                content="Dummy content.",
                photo_key=None,
                offline_information=location,
                start_time=Timestamp_from_datetime(starts_at),
                end_time=Timestamp_from_datetime(ends_at),
                timezone="UTC",
            )
        )
        event_id = created.event_id
        assert not created.is_deleted

    with session_scope() as session, real_admin_session(super_token) as admin_api:
        admin_api.DeleteEvent(admin_pb2.DeleteEventReq(event_id=event_id))
        # Read the row back directly to confirm the deletion flag was persisted.
        occurrence = session.get(EventOccurrence, ident=event_id)
        assert occurrence.is_deleted

626 

627 

def test_ListUserIds(db):
    """ListUserIds returns both users for a window from 2000 until now, and none for a window starting now."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _normal_token = generate_user()

    with real_admin_session(super_token) as admin_api:
        wide_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(datetime(2000, 1, 1)),
            end_time=Timestamp_from_datetime(now()),
        )
        listed = admin_api.ListUserIds(wide_window)
        assert len(listed.user_ids) == 2
        assert sorted(listed.user_ids) == sorted([super_user.id, normal_user.id])

    with real_admin_session(super_token) as admin_api:
        # Both bounds are taken "now", after the users were created.
        window_start = Timestamp_from_datetime(now())
        window_end = Timestamp_from_datetime(now())
        listed = admin_api.ListUserIds(admin_pb2.ListUserIdsReq(start_time=window_start, end_time=window_end))
        assert listed.user_ids == []

646 

647 

648# community invite feature tested in test_events.py