Coverage for src/tests/test_admin.py: 100%

396 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-06-01 15:07 +0000

1from datetime import date, datetime 

2from re import match 

3 

4import grpc 

5import pytest 

6from sqlalchemy.sql import func 

7 

8from couchers import errors 

9from couchers.db import session_scope 

10from couchers.models import AccountDeletionToken, Cluster, ContentReport, EventOccurrence, Node, Reference, UserSession 

11from couchers.sql import couchers_select as select 

12from couchers.utils import Timestamp_from_datetime, now, parse_date, timedelta 

13from proto import admin_pb2, auth_pb2, events_pb2, references_pb2, reporting_pb2 

14from tests.test_communities import create_community 

15from tests.test_fixtures import ( # noqa 

16 auth_api_session, 

17 db, 

18 email_fields, 

19 events_session, 

20 generate_user, 

21 get_user_id_and_token, 

22 mock_notification_email, 

23 push_collector, 

24 real_admin_session, 

25 references_session, 

26 reporting_session, 

27 testconfig, 

28) 

29 

30 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""

34 

35 

def test_access_by_normal_user(db):
    """GetUserDetails on the admin API is rejected with PERMISSION_DENIED for a regular user."""
    regular_user, regular_token = generate_user()
    request = admin_pb2.GetUserDetailsReq(user=str(regular_user.id))

    # all requests to the admin servicer should break when done by a non-super_user
    with real_admin_session(regular_token) as api, pytest.raises(grpc.RpcError) as excinfo:
        api.GetUserDetails(request)

    assert excinfo.value.code() == grpc.StatusCode.PERMISSION_DENIED

48 

49 

def test_GetUser(db):
    """GetUser resolves a user by id both before and after that user has been banned."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    with real_admin_session(super_token) as api:
        res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id)))
        assert res.user_id == normal_user.id
        assert res.username == normal_user.username

    with real_admin_session(super_token) as api:
        ban_res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note="Testing banning"))
        # Without this check the lookup below would silently stop covering the
        # banned-user case if the ban ever failed to take effect.
        assert ban_res.banned

    with real_admin_session(super_token) as api:
        res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id)))
        assert res.user_id == normal_user.id
        assert res.username == normal_user.username

66 

67 

def test_GetUserDetails(db):
    """GetUserDetails returns the same record when looked up by id, by username and by email."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    # Same order as the lookups were originally written: id, then username, then email.
    identifiers = (str(normal_user.id), normal_user.username, normal_user.email)

    for identifier in identifiers:
        # A fresh admin session per lookup, one for each kind of identifier.
        with real_admin_session(super_token) as api:
            details = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=identifier))
            assert details.user_id == normal_user.id
            assert details.username == normal_user.username
            assert details.email == normal_user.email
            assert details.gender == normal_user.gender
            assert parse_date(details.birthdate) == normal_user.birthdate
            assert not details.banned
            assert not details.deleted

101 

102 

def test_ChangeUserGender(db, push_collector):
    """ChangeUserGender updates the gender and notifies the user by email and push."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    request = admin_pb2.ChangeUserGenderReq(user=normal_user.username, gender="Machine")

    # The email mock is the inner context, so it is left before the admin session is.
    with real_admin_session(super_token) as api, mock_notification_email() as email_mock:
        details = api.ChangeUserGender(request)

    assert details.user_id == normal_user.id
    assert details.username == normal_user.username
    assert details.email == normal_user.email
    assert details.gender == "Machine"
    assert parse_date(details.birthdate) == normal_user.birthdate
    assert not details.banned
    assert not details.deleted

    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your gender was changed"
    assert sent.recipient == normal_user.email
    assert "Machine" in sent.plain
    assert "Machine" in sent.html

    push_collector.assert_user_has_single_matching(
        normal_user.id,
        title="Your gender was changed",
        body="Your gender on Couchers.org was changed to Machine by an admin.",
    )

130 

131 

def test_ChangeUserBirthdate(db, push_collector):
    """ChangeUserBirthdate stores the new date and notifies the user by email and push."""
    super_user, super_token = generate_user(is_superuser=True)
    initial_birthdate = date(year=2000, month=1, day=1)
    normal_user, normal_token = generate_user(birthdate=initial_birthdate)

    with real_admin_session(super_token) as api:
        before = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username))
        assert parse_date(before.birthdate) == initial_birthdate

        change_request = admin_pb2.ChangeUserBirthdateReq(user=normal_user.username, birthdate="1990-05-25")
        with mock_notification_email() as email_mock:
            after = api.ChangeUserBirthdate(change_request)

    assert after.user_id == normal_user.id
    assert after.username == normal_user.username
    assert after.email == normal_user.email
    assert after.birthdate == "1990-05-25"
    assert after.gender == normal_user.gender
    assert not after.banned
    assert not after.deleted

    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your date of birth was changed"
    assert sent.recipient == normal_user.email
    assert "1990" in sent.plain
    assert "1990" in sent.html

    push_collector.assert_user_has_single_matching(
        normal_user.id,
        title="Your date of birth was changed",
        body="Your date of birth on Couchers.org was changed to Friday 25 May 1990 by an admin.",
    )

165 

166 

def test_BanUser(db):
    """BanUser sets the banned flag and records the admin note with a timestamped admin prefix."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    admin_note = "A good reason"

    # Expected note layout: newline, "[<UTC timestamp>] (id: .., username: ..) <note>", newline.
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"
    expected_note = rf"^{prefix_regex} {admin_note}\n$"

    ban_request = admin_pb2.BanUserReq(user=normal_user.username, admin_note=admin_note)
    with real_admin_session(super_token) as api:
        details = api.BanUser(ban_request)

    assert details.user_id == normal_user.id
    assert details.username == normal_user.username
    assert details.email == normal_user.email
    assert details.gender == normal_user.gender
    assert parse_date(details.birthdate) == normal_user.birthdate
    assert details.banned
    assert not details.deleted
    assert match(expected_note, details.admin_note)

184 

185 

def test_UnbanUser(db):
    """UnbanUser clears the banned flag of a previously banned user and appends its admin note."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    ban_note = "A bad deed"
    admin_note = "A good reason"
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"

    with real_admin_session(super_token) as api:
        # Ban first: unbanning a user who was never banned would pass
        # even if UnbanUser left the banned flag untouched.
        res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note=ban_note))
        assert res.banned

        res = api.UnbanUser(admin_pb2.UnbanUserReq(user=normal_user.username, admin_note=admin_note))
        assert res.user_id == normal_user.id
        assert res.username == normal_user.username
        assert res.email == normal_user.email
        assert res.gender == normal_user.gender
        assert parse_date(res.birthdate) == normal_user.birthdate
        assert not res.banned
        assert not res.deleted
        # The ban entry comes first, then the unban entry, each with its own prefix.
        assert match(rf"^{prefix_regex} {ban_note}\n{prefix_regex} {admin_note}\n$", res.admin_note)

203 

204 

def test_AddAdminNote(db):
    """Admin notes accumulate in order, each prefixed with a UTC timestamp and the acting admin."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    admin_note1 = "User reported strange behavior"
    admin_note2 = "Insert private information here"
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"

    # One regex fragment per expected log entry; concatenated they describe the whole note field.
    first_entry = rf"{prefix_regex} {admin_note1}\n"
    second_entry = rf"{prefix_regex} {admin_note2}\n"

    with real_admin_session(super_token) as api:
        details = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note1))

    assert details.user_id == normal_user.id
    assert details.username == normal_user.username
    assert details.email == normal_user.email
    assert details.gender == normal_user.gender
    assert parse_date(details.birthdate) == normal_user.birthdate
    assert not details.banned
    assert not details.deleted
    assert match(rf"^{first_entry}$", details.admin_note)

    with real_admin_session(super_token) as api:
        details = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note2))

    assert match(rf"^{first_entry}{second_entry}$", details.admin_note)

227 

228 

def test_AddAdminNote_blank(db):
    """A whitespace-only admin note is rejected with INVALID_ARGUMENT."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    empty_admin_note = "  \t  \n "
    request = admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=empty_admin_note)

    with real_admin_session(super_token) as api, pytest.raises(grpc.RpcError) as excinfo:
        api.AddAdminNote(request)

    error = excinfo.value
    assert error.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert error.details() == errors.ADMIN_NOTE_CANT_BE_EMPTY

239 

240 

def test_admin_content_reports(db):
    """Admins can fetch a single content report by id and list the reports filed against an author."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, token = generate_user()
    bad_user1, _ = generate_user()
    bad_user2, _ = generate_user()

    # (reason, description, content_ref, reported author, page), filed in this order.
    filed_reports = [
        ("spam", "r1", "comment/123", bad_user1, "https://couchers.org/comment/123"),
        ("spam", "r2", "comment/124", bad_user2, "https://couchers.org/comment/124"),
        ("something else", "r3", "page/321", bad_user1, "https://couchers.org/page/321"),
    ]

    with reporting_session(token) as api:
        for reason, description, content_ref, author, page in filed_reports:
            api.Report(
                reporting_pb2.ReportReq(
                    reason=reason,
                    description=description,
                    content_ref=content_ref,
                    author_user=author.username,
                    user_agent="n/a",
                    page=page,
                )
            )

    # Map each report's description to its database id so the assertions can refer to "r1".."r3".
    with session_scope() as session:
        rows = session.execute(select(ContentReport.description, ContentReport.id)).all()
        id_by_description = dict(rows)

    with real_admin_session(super_token) as api:
        with pytest.raises(grpc.RpcError) as excinfo:
            api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert excinfo.value.code() == grpc.StatusCode.NOT_FOUND
        assert excinfo.value.details() == errors.CONTENT_REPORT_NOT_FOUND

        r2_id = id_by_description["r2"]
        report = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=r2_id)).content_report
        assert report.content_report_id == r2_id
        assert report.reporting_user_id == normal_user.id
        assert report.author_user_id == bad_user2.id
        assert report.reason == "spam"
        assert report.description == "r2"
        assert report.content_ref == "comment/124"
        assert report.user_agent == "n/a"
        assert report.page == "https://couchers.org/comment/124"

        by_author = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username))
        # The later report (r3) is expected ahead of the earlier one (r1).
        assert by_author.content_reports[0].content_report_id == id_by_description["r3"]
        assert by_author.content_reports[1].content_report_id == id_by_description["r1"]

302 

303 

def test_DeleteUser(db):
    """DeleteUser marks the account as deleted and leaves the other returned details unchanged."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()
    request = admin_pb2.DeleteUserReq(user=normal_user.username)

    with real_admin_session(super_token) as api:
        details = api.DeleteUser(request)

    assert details.user_id == normal_user.id
    assert details.username == normal_user.username
    assert details.email == normal_user.email
    assert details.gender == normal_user.gender
    assert parse_date(details.birthdate) == normal_user.birthdate
    assert not details.banned
    assert details.deleted

317 

318 

def test_CreateApiKey(db, push_collector):
    """CreateApiKey issues a valid API-key session and sends the token to the user by email."""
    with session_scope() as session:
        super_user, super_token = generate_user(is_superuser=True)
        normal_user, normal_token = generate_user()

        # `== True` builds a SQL comparison here; it must not be rewritten as `is True`.
        api_key_count = (
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == normal_user.id)
        )
        assert session.execute(api_key_count).scalar_one() == 0

    # The email mock is the outer context here and the admin session the inner one.
    with mock_notification_email() as email_mock, real_admin_session(super_token) as api:
        api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=normal_user.username))

    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your API key for Couchers.org"

    with session_scope() as session:
        valid_api_key = (
            select(UserSession)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == normal_user.id)
        )
        api_key = session.execute(valid_api_key).scalar_one()

        # Read the token while the session that loaded the row is still open.
        assert api_key.token in sent.plain
        assert api_key.token in sent.html

    assert sent.recipient == normal_user.email
    assert "api key" in sent.subject.lower()
    unique_string = "We've issued you with the following API key:"
    for body in (sent.plain, sent.html):
        assert unique_string in body
    for body in (sent.plain, sent.html):
        assert "support@couchers.org" in body

    push_collector.assert_user_has_single_matching(
        normal_user.id, title="An API key was created for your account", body="Details were sent to you via email."
    )

364 

365 

# GeoJSON MultiPolygon with one polygon made of a single ring of five points
# (first and last point are identical). The community tests pass it wherever
# they need a geometry that CreateCommunity accepts.
VALID_GEOJSON_MULTIPOLYGON = """
    {
      "type": "MultiPolygon",
      "coordinates":
       [
        [
          [
            [
              -73.98114904754641,
              40.7470284264813
            ],
            [
              -73.98314135177611,
              40.73416844413217
            ],
            [
              -74.00538969848634,
              40.734314779027144
            ],
            [
              -74.00479214294432,
              40.75027851544338
            ],
            [
              -73.98114904754641,
              40.7470284264813
            ]
          ]
        ]
      ]
    }
"""

398 

# GeoJSON Point. The community tests send it as a community geometry and expect
# the request to be rejected with errors.NO_MULTIPOLYGON.
POINT_GEOJSON = """
{ "type": "Point", "coordinates": [100.0, 0.0] }
"""

402 

403 

def test_CreateCommunity_invalid_geojson(db):
    """CreateCommunity rejects a geometry that is not a MultiPolygon with INVALID_ARGUMENT."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    request = admin_pb2.CreateCommunityReq(
        name="test community",
        description="community for testing",
        admin_ids=[],
        geojson=POINT_GEOJSON,
    )

    with real_admin_session(super_token) as api, pytest.raises(grpc.RpcError) as excinfo:
        api.CreateCommunity(request)

    error = excinfo.value
    assert error.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert error.details() == errors.NO_MULTIPOLYGON

419 

420 

def test_CreateCommunity(db):
    """CreateCommunity stores a cluster with the given description and a slug derived from the name."""
    with session_scope() as session:
        super_user, super_token = generate_user(is_superuser=True)
        normal_user, normal_token = generate_user()

        request = admin_pb2.CreateCommunityReq(
            name="test community",
            description="community for testing",
            admin_ids=[],
            geojson=VALID_GEOJSON_MULTIPOLYGON,
        )
        with real_admin_session(super_token) as api:
            api.CreateCommunity(request)

        by_name = select(Cluster).where(Cluster.name == "test community")
        created = session.execute(by_name).scalar_one()
        assert created.description == "community for testing"
        assert created.slug == "test-community"

437 

438 

def test_UpdateCommunity_invalid_geojson(db):
    """UpdateCommunity rejects a geometry that is not a MultiPolygon with INVALID_ARGUMENT."""
    super_user, super_token = generate_user(is_superuser=True)

    create_request = admin_pb2.CreateCommunityReq(
        name="test community",
        description="community for testing",
        admin_ids=[],
        geojson=VALID_GEOJSON_MULTIPOLYGON,
    )

    with session_scope() as session, real_admin_session(super_token) as api:
        api.CreateCommunity(create_request)
        existing = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one()

        update_request = admin_pb2.UpdateCommunityReq(
            community_id=existing.parent_node_id,
            name="test community 2",
            description="community for testing 2",
            geojson=POINT_GEOJSON,
        )
        with pytest.raises(grpc.RpcError) as excinfo:
            api.UpdateCommunity(update_request)

        assert excinfo.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert excinfo.value.details() == errors.NO_MULTIPOLYGON

465 

466 

def test_UpdateCommunity_invalid_id(db):
    """UpdateCommunity answers NOT_FOUND when the community id does not exist."""
    super_user, super_token = generate_user(is_superuser=True)

    create_request = admin_pb2.CreateCommunityReq(
        name="test community",
        description="community for testing",
        admin_ids=[],
        geojson=VALID_GEOJSON_MULTIPOLYGON,
    )
    # 1000 is an id that the single community created below is not expected to have.
    update_request = admin_pb2.UpdateCommunityReq(
        community_id=1000,
        name="test community 1000",
        description="community for testing 1000",
        geojson=VALID_GEOJSON_MULTIPOLYGON,
    )

    with session_scope() as session, real_admin_session(super_token) as api:
        api.CreateCommunity(create_request)

        with pytest.raises(grpc.RpcError) as excinfo:
            api.UpdateCommunity(update_request)

        assert excinfo.value.code() == grpc.StatusCode.NOT_FOUND
        assert excinfo.value.details() == errors.COMMUNITY_NOT_FOUND

492 

493 

def test_UpdateCommunity(db):
    """UpdateCommunity changes description and slug and re-parents the community's node."""
    super_user, super_token = generate_user(is_superuser=True)

    def create_request(name, description):
        # Both communities are created with the same geometry and no admins.
        return admin_pb2.CreateCommunityReq(
            name=name,
            description=description,
            admin_ids=[],
            geojson=VALID_GEOJSON_MULTIPOLYGON,
        )

    with session_scope() as session, real_admin_session(super_token) as api:
        api.CreateCommunity(create_request("test community", "community for testing"))
        community = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one()
        assert community.description == "community for testing"

        api.CreateCommunity(create_request("test community 2", "community for testing 2"))
        community_2 = session.execute(select(Cluster).where(Cluster.name == "test community 2")).scalar_one()

        api.UpdateCommunity(
            admin_pb2.UpdateCommunityReq(
                community_id=community.parent_node_id,
                name="test community 2",
                description="community for testing 2",
                geojson=VALID_GEOJSON_MULTIPOLYGON,
                parent_node_id=community_2.parent_node_id,
            )
        )
        # NOTE(review): presumably commits so the queries below start a fresh transaction
        # and observe the servicer's write; confirm against session_scope.
        session.commit()

        community_updated = session.execute(select(Cluster).where(Cluster.id == community.id)).scalar_one()
        assert community_updated.description == "community for testing 2"
        assert community_updated.slug == "test-community-2"

        node_query = select(Node).where(Node.id == community_updated.parent_node_id)
        node_updated = session.execute(node_query).scalar_one()
        assert node_updated.parent_node_id == community_2.parent_node_id

537 

538 

def test_GetChats(db):
    """GetChats returns a non-empty ``response`` field for an existing user."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()
    request = admin_pb2.GetChatsReq(user=normal_user.username)

    with real_admin_session(super_token) as api:
        chats = api.GetChats(request)

    assert chats.response

546 

547 

def test_badges(db, push_collector):
    """Admins can add and remove editable badges; invalid badge operations fail with clear errors."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()
    username = normal_user.username

    def add_badge_req(badge_id):
        return admin_pb2.AddBadgeReq(user=username, badge_id=badge_id)

    def remove_badge_req(badge_id):
        return admin_pb2.RemoveBadgeReq(user=username, badge_id=badge_id)

    with real_admin_session(super_token) as api:

        def current_badges():
            return api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=username)).badges

        # can add a badge
        assert "volunteer" not in current_badges()
        with mock_notification_email() as email_mock:
            added = api.AddBadge(add_badge_req("volunteer"))
        assert "volunteer" in added.badges

        # badge emails are disabled by default
        email_mock.assert_not_called()

        push_collector.assert_user_has_single_matching(
            normal_user.id,
            title="The Active Volunteer badge was added to your profile",
            body="Check out your profile to see the new badge!",
        )

        # can't add/edit special tags
        with pytest.raises(grpc.RpcError) as excinfo:
            api.AddBadge(add_badge_req("founder"))
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.ADMIN_CANNOT_EDIT_BADGE

        # double add badge
        with pytest.raises(grpc.RpcError) as excinfo:
            api.AddBadge(add_badge_req("volunteer"))
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.USER_ALREADY_HAS_BADGE

        # can remove badge
        assert "volunteer" in current_badges()
        with mock_notification_email() as email_mock:
            removed = api.RemoveBadge(remove_badge_req("volunteer"))
        assert "volunteer" not in removed.badges

        # badge emails are disabled by default
        email_mock.assert_not_called()

        # the removal push is the user's second one (index 1), after the "added" push
        push_collector.assert_user_push_matches_fields(
            normal_user.id,
            ix=1,
            title="The Active Volunteer badge was removed from your profile",
            body="You can see all your badges on your profile.",
        )

        # not found on user
        with pytest.raises(grpc.RpcError) as excinfo:
            api.RemoveBadge(remove_badge_req("volunteer"))
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.USER_DOES_NOT_HAVE_BADGE

        # not found in general
        with pytest.raises(grpc.RpcError) as excinfo:
            api.AddBadge(add_badge_req("nonexistentbadge"))
        assert excinfo.value.code() == grpc.StatusCode.NOT_FOUND
        assert excinfo.value.details() == errors.BADGE_NOT_FOUND

607 

608 

def test_DeleteEvent(db):
    """An event created by a regular user is flagged as deleted after the admin DeleteEvent call."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [normal_user], [], None)

    # The event starts two hours from now and lasts three hours.
    start_time = now() + timedelta(hours=2)
    end_time = start_time + timedelta(hours=3)
    location = events_pb2.OfflineEventInformation(
        address="Near Null Island",
        lat=0.1,
        lng=0.2,
    )
    create_request = events_pb2.CreateEventReq(
        title="Dummy Title",
        content="Dummy content.",
        photo_key=None,
        offline_information=location,
        start_time=Timestamp_from_datetime(start_time),
        end_time=Timestamp_from_datetime(end_time),
        timezone="UTC",
    )

    with events_session(normal_token) as api:
        created = api.CreateEvent(create_request)
        event_id = created.event_id
        assert not created.is_deleted

    with session_scope() as session:
        with real_admin_session(super_token) as api:
            api.DeleteEvent(admin_pb2.DeleteEventReq(event_id=event_id))

        # The id returned by CreateEvent is used directly as the EventOccurrence primary key.
        occurrence = session.get(EventOccurrence, ident=event_id)
        assert occurrence.is_deleted

646 

647 

def test_ListUserIds(db):
    """ListUserIds returns the users inside the given time window and nothing for a window at 'now'."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    # Window from 2000-01-01 up to now: both users generated above are expected.
    with real_admin_session(super_token) as api:
        wide_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(datetime(2000, 1, 1)),
            end_time=Timestamp_from_datetime(now()),
        )
        listed = api.ListUserIds(wide_window)

    assert len(listed.user_ids) == 2
    assert sorted(listed.user_ids) == sorted([super_user.id, normal_user.id])

    # Window that both starts and ends "now" (two separate now() calls): no users expected.
    with real_admin_session(super_token) as api:
        narrow_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(now()),
            end_time=Timestamp_from_datetime(now()),
        )
        listed = api.ListUserIds(narrow_window)

    assert listed.user_ids == []

666 

667 

def test_EditReferenceText(db):
    """EditReferenceText replaces the stored text of an existing friend reference."""
    super_user, super_token = generate_user(is_superuser=True)
    test_new_text = "New Text"

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()

    with session_scope() as session:
        with references_session(user1_token) as api:
            reference = api.WriteFriendReference(
                references_pb2.WriteFriendReferenceReq(
                    to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
                )
            )

        with real_admin_session(super_token) as admin_api:
            admin_api.EditReferenceText(
                admin_pb2.EditReferenceTextReq(reference_id=reference.reference_id, new_text=test_new_text)
            )

        # Drop anything this session has cached so the query below reloads from the database.
        session.expire_all()

        # scalar_one() fails loudly if the row is missing, instead of handing back
        # None and surfacing as an AttributeError on `.text`.
        modified_reference = session.execute(
            select(Reference).where(Reference.id == reference.reference_id)
        ).scalar_one()
        assert modified_reference.text == test_new_text

694 

695 

def test_DeleteReference(db):
    """DeleteReference hides a reference from ListReferences and flags its row as deleted."""
    super_user, super_token = generate_user(is_superuser=True)

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()

    with references_session(user1_token) as api:
        reference = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
            )
        )

    # Before deletion the author's reference list is non-empty.
    with references_session(user1_token) as api:
        assert api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references

    with real_admin_session(super_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=reference.reference_id))

    # After deletion the same listing comes back empty.
    with references_session(user1_token) as api:
        assert not api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references

    with session_scope() as session:
        # The row must still exist (it is flagged, not removed); scalar_one() makes a
        # missing row fail loudly rather than as an AttributeError on None.
        modified_reference = session.execute(
            select(Reference).where(Reference.id == reference.reference_id)
        ).scalar_one()
        assert modified_reference.is_deleted

723 

724 

def test_admin_delete_account_url(db, push_collector):
    """An admin-created deletion link carries a token that ConfirmDeleteAccount accepts."""
    super_user, super_token = generate_user(is_superuser=True)

    user, _ = generate_user()
    user_id = user.id

    link_request = admin_pb2.CreateAccountDeletionLinkReq(user=user.username)
    with real_admin_session(super_token) as admin_api:
        url = admin_api.CreateAccountDeletionLink(link_request).account_deletion_confirm_url

    # Creating the link alone must not push anything to the user.
    push_collector.assert_user_has_count(user_id, 0)

    with session_scope() as session:
        token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token = token_row.token
        assert token_row.user.id == user_id
        assert url == f"http://localhost:3000/delete-account?token={deletion_token}"

    # The email mock is the outer context here and the auth session the inner one.
    with mock_notification_email() as email_mock, auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=0,
        title="Your Couchers.org account has been deleted",
        body="You can still undo this by following the link we emailed to you within 7 days.",
    )

    email_mock.assert_called_once()
    # Only checks that the captured call can be unpacked; no field is asserted on.
    email_fields(email_mock)

761 

762 

763# community invite feature tested in test_events.py 

764# SendBlogPostNotification tested in test_notifications.py 

765# MarkUserNeedsLocationUpdate tested in test_jail.py