Coverage for src/tests/test_admin.py: 100%

378 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-04-16 15:13 +0000

1from datetime import date, datetime 

2from re import match 

3 

4import grpc 

5import pytest 

6from sqlalchemy.sql import func 

7 

8from couchers import errors 

9from couchers.db import session_scope 

10from couchers.models import Cluster, ContentReport, EventOccurrence, Node, Reference, UserSession 

11from couchers.sql import couchers_select as select 

12from couchers.utils import Timestamp_from_datetime, now, parse_date, timedelta 

13from proto import admin_pb2, events_pb2, references_pb2, reporting_pb2 

14from tests.test_communities import create_community 

15from tests.test_fixtures import ( # noqa 

16 db, 

17 email_fields, 

18 events_session, 

19 generate_user, 

20 get_user_id_and_token, 

21 mock_notification_email, 

22 push_collector, 

23 real_admin_session, 

24 references_session, 

25 reporting_session, 

26 testconfig, 

27) 

28 

29 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""

33 

34 

def test_access_by_normal_user(db):
    """A user without superuser rights is refused by the admin servicer."""
    regular_user, regular_token = generate_user()
    request = admin_pb2.GetUserDetailsReq(user=str(regular_user.id))

    # all requests to the admin servicer should break when done by a non-super_user
    with real_admin_session(regular_token) as api, pytest.raises(grpc.RpcError) as exc_info:
        api.GetUserDetails(request)

    assert exc_info.value.code() == grpc.StatusCode.PERMISSION_DENIED

47 

48 

def test_GetUser(db):
    """GetUser reports the same id and username before and after the user is banned."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    def assert_lookup_matches():
        # each lookup runs in its own admin session
        with real_admin_session(admin_token) as api:
            reply = api.GetUser(admin_pb2.GetUserReq(user=str(target.id)))
        assert reply.user_id == target.id
        assert reply.username == target.username

    assert_lookup_matches()

    with real_admin_session(admin_token) as api:
        api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note="Testing banning"))

    assert_lookup_matches()

65 

66 

def test_GetUserDetails(db):
    """GetUserDetails finds the same user by id, by username and by email."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    # one lookup per supported identifier, each in a fresh admin session
    for identifier in (str(target.id), target.username, target.email):
        with real_admin_session(admin_token) as api:
            details = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=identifier))

        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert not details.deleted

100 

101 

def test_ChangeUserGender(db, push_collector):
    """Changing a user's gender updates the record and notifies them by email and push."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.ChangeUserGenderReq(user=target.username, gender="Machine")

    with real_admin_session(admin_token) as api, mock_notification_email() as email_mock:
        details = api.ChangeUserGender(request)

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == "Machine"
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted

    # exactly one email, addressed to the affected user and naming the new gender
    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your gender was changed"
    assert sent.recipient == target.email
    assert "Machine" in sent.plain
    assert "Machine" in sent.html

    push_collector.assert_user_has_single_matching(
        target.id,
        title="Your gender was changed",
        body="Your gender on Couchers.org was changed to Machine by an admin.",
    )

129 

130 

def test_ChangeUserBirthdate(db, push_collector):
    """Changing a birthdate updates the record and notifies the user by email and push."""
    _, admin_token = generate_user(is_superuser=True)
    original_birthdate = date(year=2000, month=1, day=1)
    target, _ = generate_user(birthdate=original_birthdate)

    with real_admin_session(admin_token) as api:
        before = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=target.username))
        assert parse_date(before.birthdate) == original_birthdate

        with mock_notification_email() as email_mock:
            after = api.ChangeUserBirthdate(
                admin_pb2.ChangeUserBirthdateReq(user=target.username, birthdate="1990-05-25")
            )

    assert after.user_id == target.id
    assert after.username == target.username
    assert after.email == target.email
    assert after.birthdate == "1990-05-25"
    assert after.gender == target.gender
    assert not after.banned
    assert not after.deleted

    # exactly one email, addressed to the affected user and mentioning the new year
    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your date of birth was changed"
    assert sent.recipient == target.email
    assert "1990" in sent.plain
    assert "1990" in sent.html

    push_collector.assert_user_has_single_matching(
        target.id,
        title="Your date of birth was changed",
        body="Your date of birth on Couchers.org was changed to Friday 25 May 1990 by an admin.",
    )

164 

165 

def test_BanUser(db):
    """Banning flags the user as banned and stores a timestamped, attributed admin note."""
    admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    note = "A good reason"
    timestamp_pattern = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    # expected entry header: newline, "[<timestamp>+00:00]", then the acting admin's id and username
    entry_prefix = rf"\n\[{timestamp_pattern}\] \(id: {admin.id}, username: {admin.username}\)"
    expected_note = rf"^{entry_prefix} {note}\n$"

    with real_admin_session(admin_token) as api:
        details = api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note=note))

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == target.gender
    assert parse_date(details.birthdate) == target.birthdate
    assert details.banned
    assert not details.deleted
    assert match(expected_note, details.admin_note)

183 

184 

def test_UnbanUser(db):
    """Unbanning leaves the user not banned and stores a timestamped, attributed admin note."""
    admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    note = "A good reason"
    timestamp_pattern = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    # expected entry header: newline, "[<timestamp>+00:00]", then the acting admin's id and username
    entry_prefix = rf"\n\[{timestamp_pattern}\] \(id: {admin.id}, username: {admin.username}\)"
    expected_note = rf"^{entry_prefix} {note}\n$"

    with real_admin_session(admin_token) as api:
        details = api.UnbanUser(admin_pb2.UnbanUserReq(user=target.username, admin_note=note))

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == target.gender
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted
    assert match(expected_note, details.admin_note)

202 

203 

def test_AddAdminNote(db):
    """Admin notes are appended one after another, each with its own timestamped header."""
    admin, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    first_note = "User reported strange behavior"
    second_note = "Insert private information here"
    timestamp_pattern = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    # expected entry header: newline, "[<timestamp>+00:00]", then the acting admin's id and username
    entry_prefix = rf"\n\[{timestamp_pattern}\] \(id: {admin.id}, username: {admin.username}\)"

    with real_admin_session(admin_token) as api:
        details = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note=first_note))

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == target.gender
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted
    assert match(rf"^{entry_prefix} {first_note}\n$", details.admin_note)

    with real_admin_session(admin_token) as api:
        details = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note=second_note))

    # the second entry follows the first one; the first is kept unchanged
    both_entries = rf"^{entry_prefix} {first_note}\n{entry_prefix} {second_note}\n$"
    assert match(both_entries, details.admin_note)

226 

227 

def test_AddAdminNote_blank(db):
    """A whitespace-only admin note is rejected as an invalid argument."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.AddAdminNoteReq(user=target.username, admin_note=" \t \n ")

    with real_admin_session(admin_token) as api, pytest.raises(grpc.RpcError) as exc_info:
        api.AddAdminNote(request)

    failure = exc_info.value
    assert failure.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert failure.details() == errors.ADMIN_NOTE_CANT_BE_EMPTY

238 

239 

def test_admin_content_reports(db):
    """Admins can fetch a single content report and list the reports filed against an author."""
    _, admin_token = generate_user(is_superuser=True)
    reporter, reporter_token = generate_user()
    bad_user1, _ = generate_user()
    bad_user2, _ = generate_user()

    # (reason, description, content_ref, reported author), filed in exactly this order
    filed_reports = [
        ("spam", "r1", "comment/123", bad_user1),
        ("spam", "r2", "comment/124", bad_user2),
        ("something else", "r3", "page/321", bad_user1),
    ]
    with reporting_session(reporter_token) as api:
        for reason, description, content_ref, author in filed_reports:
            api.Report(
                reporting_pb2.ReportReq(
                    reason=reason,
                    description=description,
                    content_ref=content_ref,
                    author_user=author.username,
                    user_agent="n/a",
                    page=f"https://couchers.org/{content_ref}",
                )
            )

    # map each report's description to its database id so reports can be addressed below
    with session_scope() as session:
        rows = session.execute(select(ContentReport.description, ContentReport.id)).all()
        report_ids = {description: report_id for description, report_id in rows}

    with real_admin_session(admin_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == errors.CONTENT_REPORT_NOT_FOUND

        single = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=report_ids["r2"]))
        report = single.content_report
        assert report.content_report_id == report_ids["r2"]
        assert report.reporting_user_id == reporter.id
        assert report.author_user_id == bad_user2.id
        assert report.reason == "spam"
        assert report.description == "r2"
        assert report.content_ref == "comment/124"
        assert report.user_agent == "n/a"
        assert report.page == "https://couchers.org/comment/124"

        # for bad_user1 the later report (r3) is expected ahead of the earlier one (r1)
        by_author = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username))
        assert by_author.content_reports[0].content_report_id == report_ids["r3"]
        assert by_author.content_reports[1].content_report_id == report_ids["r1"]

301 

302 

def test_DeleteUser(db):
    """Deleting a user returns their details with the deleted flag set and banned unset."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.DeleteUserReq(user=target.username)

    with real_admin_session(admin_token) as api:
        details = api.DeleteUser(request)

    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == target.gender
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert details.deleted

316 

317 

def test_CreateApiKey(db, push_collector):
    """Creating an API key stores a valid key session and sends the token by email plus a push."""
    with session_scope() as session:
        _, admin_token = generate_user(is_superuser=True)
        target, _ = generate_user()

        # the user starts out without any API key session
        # (`== True` on a column builds a SQL expression; it is not a Python truth test)
        existing_keys = session.execute(
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        ).scalar_one()
        assert existing_keys == 0

    with mock_notification_email() as email_mock, real_admin_session(admin_token) as api:
        api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=target.username))

    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your API key for Couchers.org"

    with session_scope() as session:
        # exactly one valid API key session must now exist for the user
        key_session = session.execute(
            select(UserSession)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        ).scalar_one()

        # the token is read while the session is still open
        assert key_session.token in sent.plain
        assert key_session.token in sent.html

    assert sent.recipient == target.email
    assert "api key" in sent.subject.lower()
    for expected_text in ("We've issued you with the following API key:", "support@couchers.org"):
        assert expected_text in sent.plain
        assert expected_text in sent.html

    push_collector.assert_user_has_single_matching(
        target.id, title="An API key was created for your account", body="Details were sent to you via email."
    )

363 

364 

# GeoJSON text of type "MultiPolygon" holding one polygon with a single ring of
# five positions; the first and last positions are identical. The tests in this
# module pass it wherever a community geometry is expected to be accepted.
VALID_GEOJSON_MULTIPOLYGON = """
    {
      "type": "MultiPolygon",
      "coordinates":
      [
        [
          [
            [
              -73.98114904754641,
              40.7470284264813
            ],
            [
              -73.98314135177611,
              40.73416844413217
            ],
            [
              -74.00538969848634,
              40.734314779027144
            ],
            [
              -74.00479214294432,
              40.75027851544338
            ],
            [
              -73.98114904754641,
              40.7470284264813
            ]
          ]
        ]
      ]
    }
"""

# GeoJSON text of type "Point"; the tests in this module use it as a geometry
# that is not a MultiPolygon and expect it to be rejected.
POINT_GEOJSON = """
{ "type": "Point", "coordinates": [100.0, 0.0] }
"""

401 

402 

def test_CreateCommunity_invalid_geojson(db):
    """Creating a community from a non-MultiPolygon geometry is rejected."""
    _, admin_token = generate_user(is_superuser=True)
    # a second, regular user is created as well; it is not referenced afterwards
    generate_user()

    request = admin_pb2.CreateCommunityReq(
        name="test community",
        description="community for testing",
        admin_ids=[],
        geojson=POINT_GEOJSON,
    )
    with real_admin_session(admin_token) as api, pytest.raises(grpc.RpcError) as exc_info:
        api.CreateCommunity(request)

    failure = exc_info.value
    assert failure.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert failure.details() == errors.NO_MULTIPOLYGON

418 

419 

def test_CreateCommunity(db):
    """A community created by an admin is stored with its description and a derived slug."""
    with session_scope() as session:
        _, admin_token = generate_user(is_superuser=True)
        # a second, regular user is created as well; it is not referenced afterwards
        generate_user()

        request = admin_pb2.CreateCommunityReq(
            name="test community",
            description="community for testing",
            admin_ids=[],
            geojson=VALID_GEOJSON_MULTIPOLYGON,
        )
        with real_admin_session(admin_token) as api:
            api.CreateCommunity(request)

        by_name = select(Cluster).where(Cluster.name == "test community")
        cluster = session.execute(by_name).scalar_one()
        assert cluster.description == "community for testing"
        assert cluster.slug == "test-community"

436 

437 

def test_UpdateCommunity_invalid_geojson(db):
    """Updating a community with a non-MultiPolygon geometry is rejected."""
    _, admin_token = generate_user(is_superuser=True)

    with session_scope() as session, real_admin_session(admin_token) as api:
        api.CreateCommunity(
            admin_pb2.CreateCommunityReq(
                name="test community",
                description="community for testing",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
        )
        cluster = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one()

        # the update request addresses the community through the cluster's parent node id
        bad_update = admin_pb2.UpdateCommunityReq(
            community_id=cluster.parent_node_id,
            name="test community 2",
            description="community for testing 2",
            geojson=POINT_GEOJSON,
        )
        with pytest.raises(grpc.RpcError) as exc_info:
            api.UpdateCommunity(bad_update)

        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.NO_MULTIPOLYGON

464 

465 

def test_UpdateCommunity_invalid_id(db):
    """Updating a community id that does not exist yields NOT_FOUND."""
    _, admin_token = generate_user(is_superuser=True)

    with session_scope(), real_admin_session(admin_token) as api:
        api.CreateCommunity(
            admin_pb2.CreateCommunityReq(
                name="test community",
                description="community for testing",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
        )

        # 1000 is used as an id that no community created in this test has
        missing_update = admin_pb2.UpdateCommunityReq(
            community_id=1000,
            name="test community 1000",
            description="community for testing 1000",
            geojson=VALID_GEOJSON_MULTIPOLYGON,
        )
        with pytest.raises(grpc.RpcError) as exc_info:
            api.UpdateCommunity(missing_update)

        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == errors.COMMUNITY_NOT_FOUND

491 

492 

def test_UpdateCommunity(db):
    """UpdateCommunity changes the description, slug and parent node of an existing community."""
    _, admin_token = generate_user(is_superuser=True)

    with session_scope() as session, real_admin_session(admin_token) as api:

        def create_and_load(name, description):
            # create the community through the admin API, then read its cluster row back by name
            api.CreateCommunity(
                admin_pb2.CreateCommunityReq(
                    name=name,
                    description=description,
                    admin_ids=[],
                    geojson=VALID_GEOJSON_MULTIPOLYGON,
                )
            )
            return session.execute(select(Cluster).where(Cluster.name == name)).scalar_one()

        first = create_and_load("test community", "community for testing")
        assert first.description == "community for testing"

        second = create_and_load("test community 2", "community for testing 2")

        # rename the first community and re-parent it under the second community's node
        api.UpdateCommunity(
            admin_pb2.UpdateCommunityReq(
                community_id=first.parent_node_id,
                name="test community 2",
                description="community for testing 2",
                geojson=VALID_GEOJSON_MULTIPOLYGON,
                parent_node_id=second.parent_node_id,
            )
        )
        session.commit()

        refreshed = session.execute(select(Cluster).where(Cluster.id == first.id)).scalar_one()
        assert refreshed.description == "community for testing 2"
        assert refreshed.slug == "test-community-2"

        own_node = session.execute(select(Node).where(Node.id == refreshed.parent_node_id)).scalar_one()
        assert own_node.parent_node_id == second.parent_node_id

536 

537 

def test_GetChats(db):
    """GetChats returns a non-empty response for an existing user."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.GetChatsReq(user=target.username)

    with real_admin_session(admin_token) as api:
        reply = api.GetChats(request)

    assert reply.response

545 

546 

def test_badges(db, push_collector):
    """Admins can add and remove an ordinary badge; invalid badge operations are rejected."""
    _, admin_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    username = target.username

    def current_badges(api):
        return api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=username)).badges

    def assert_rejected(rpc, request, status, details):
        # the call must fail with exactly this gRPC status code and error detail
        with pytest.raises(grpc.RpcError) as exc_info:
            rpc(request)
        assert exc_info.value.code() == status
        assert exc_info.value.details() == details

    with real_admin_session(admin_token) as api:
        # can add a badge
        assert "volunteer" not in current_badges(api)
        with mock_notification_email() as email_mock:
            details = api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="volunteer"))
        assert "volunteer" in details.badges

        # badge emails are disabled by default
        email_mock.assert_not_called()

        push_collector.assert_user_has_single_matching(
            target.id,
            title="The Active Volunteer badge was added to your profile",
            body="Check out your profile to see the new badge!",
        )

        # can't add/edit special tags
        assert_rejected(
            api.AddBadge,
            admin_pb2.AddBadgeReq(user=username, badge_id="founder"),
            grpc.StatusCode.FAILED_PRECONDITION,
            errors.ADMIN_CANNOT_EDIT_BADGE,
        )

        # double add badge
        assert_rejected(
            api.AddBadge,
            admin_pb2.AddBadgeReq(user=username, badge_id="volunteer"),
            grpc.StatusCode.FAILED_PRECONDITION,
            errors.USER_ALREADY_HAS_BADGE,
        )

        # can remove badge
        assert "volunteer" in current_badges(api)
        with mock_notification_email() as email_mock:
            details = api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="volunteer"))
        assert "volunteer" not in details.badges

        # badge emails are disabled by default
        email_mock.assert_not_called()

        # the removal push is checked at index 1, after the "added" push
        push_collector.assert_user_push_matches_fields(
            target.id,
            ix=1,
            title="The Active Volunteer badge was removed from your profile",
            body="You can see all your badges on your profile.",
        )

        # not found on user
        assert_rejected(
            api.RemoveBadge,
            admin_pb2.RemoveBadgeReq(user=username, badge_id="volunteer"),
            grpc.StatusCode.FAILED_PRECONDITION,
            errors.USER_DOES_NOT_HAVE_BADGE,
        )

        # not found in general
        assert_rejected(
            api.AddBadge,
            admin_pb2.AddBadgeReq(user=username, badge_id="nonexistentbadge"),
            grpc.StatusCode.NOT_FOUND,
            errors.BADGE_NOT_FOUND,
        )

606 

607 

def test_DeleteEvent(db):
    """An admin can delete an event created by a regular user."""
    _, admin_token = generate_user(is_superuser=True)
    organizer, organizer_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [organizer], [], None)

    # the event starts two hours from now and lasts three hours
    starts_at = now() + timedelta(hours=2)
    ends_at = starts_at + timedelta(hours=3)
    create_request = events_pb2.CreateEventReq(
        title="Dummy Title",
        content="Dummy content.",
        photo_key=None,
        offline_information=events_pb2.OfflineEventInformation(
            address="Near Null Island",
            lat=0.1,
            lng=0.2,
        ),
        start_time=Timestamp_from_datetime(starts_at),
        end_time=Timestamp_from_datetime(ends_at),
        timezone="UTC",
    )
    with events_session(organizer_token) as api:
        created = api.CreateEvent(create_request)
        event_id = created.event_id
        assert not created.is_deleted

    with session_scope() as session:
        with real_admin_session(admin_token) as api:
            api.DeleteEvent(admin_pb2.DeleteEventReq(event_id=event_id))

        # the occurrence row is looked up by the id the events API returned
        occurrence = session.get(EventOccurrence, ident=event_id)
        assert occurrence.is_deleted

645 

646 

def test_ListUserIds(db):
    """ListUserIds returns the users inside the requested time window and nothing outside it."""
    admin, admin_token = generate_user(is_superuser=True)
    regular, _ = generate_user()

    # a window from the year 2000 until now covers both users created above
    with real_admin_session(admin_token) as api:
        wide_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(datetime(2000, 1, 1)),
            end_time=Timestamp_from_datetime(now()),
        )
        listing = api.ListUserIds(wide_window)
    assert len(listing.user_ids) == 2
    assert sorted(listing.user_ids) == sorted([admin.id, regular.id])

    # a window that starts and ends "now" contains no users
    with real_admin_session(admin_token) as api:
        empty_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(now()),
            end_time=Timestamp_from_datetime(now()),
        )
        listing = api.ListUserIds(empty_window)
    assert listing.user_ids == []

665 

666 

def test_EditReferenceText(db):
    """An admin can overwrite the public text of an existing reference.

    The stored row is loaded with ``scalar_one()``: if the reference were
    missing, the test fails with SQLAlchemy's "no row found" error instead of
    an ``AttributeError`` raised by reading ``.text`` from ``None``.
    """
    _, super_token = generate_user(is_superuser=True)
    test_new_text = "New Text"

    _, user1_token = generate_user()
    user2, _ = generate_user()

    with session_scope() as session:
        # user1 writes a friend reference about user2
        with references_session(user1_token) as api:
            reference = api.WriteFriendReference(
                references_pb2.WriteFriendReferenceReq(
                    to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
                )
            )

        with real_admin_session(super_token) as admin_api:
            admin_api.EditReferenceText(
                admin_pb2.EditReferenceTextReq(reference_id=reference.reference_id, new_text=test_new_text)
            )

        # expire any ORM state held by this session before reading the row back
        session.expire_all()

        modified_reference = session.execute(
            select(Reference).where(Reference.id == reference.reference_id)
        ).scalar_one()
        assert modified_reference.text == test_new_text

693 

694 

def test_DeleteReference(db):
    """An admin can delete a reference: it disappears from listings and is flagged as deleted.

    The stored row is loaded with ``scalar_one()``: if the reference were
    missing, the test fails with SQLAlchemy's "no row found" error instead of
    an ``AttributeError`` raised by reading ``.is_deleted`` from ``None``.
    """
    _, super_token = generate_user(is_superuser=True)

    user1, user1_token = generate_user()
    user2, _ = generate_user()

    # user1 writes a friend reference about user2
    with references_session(user1_token) as api:
        reference = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
            )
        )

    # before deletion the reference shows up in the author's list
    with references_session(user1_token) as api:
        assert api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references

    with real_admin_session(super_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=reference.reference_id))

    # after deletion the list is empty
    with references_session(user1_token) as api:
        assert not api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references

    # the row itself still exists and carries the deleted flag
    with session_scope() as session:
        modified_reference = session.execute(
            select(Reference).where(Reference.id == reference.reference_id)
        ).scalar_one()
        assert modified_reference.is_deleted

722 

723 

724# community invite feature tested in test_events.py