Coverage for src/tests/test_admin.py: 100%

292 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-07-22 17:19 +0000

1from datetime import date 

2from re import match 

3 

4import grpc 

5import pytest 

6from sqlalchemy.sql import func 

7 

8from couchers import errors 

9from couchers.db import session_scope 

10from couchers.models import Cluster, ContentReport, EventOccurrence, UserSession 

11from couchers.sql import couchers_select as select 

12from couchers.utils import Timestamp_from_datetime, now, parse_date, timedelta 

13from proto import admin_pb2, events_pb2, reporting_pb2 

14from tests.test_communities import create_community 

15from tests.test_fixtures import ( # noqa 

16 db, 

17 email_fields, 

18 events_session, 

19 generate_user, 

20 get_user_id_and_token, 

21 mock_notification_email, 

22 push_collector, 

23 real_admin_session, 

24 reporting_session, 

25 testconfig, 

26) 

27 

28 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""

32 

33 

def test_access_by_normal_user(db):
    """A user without superuser rights gets PERMISSION_DENIED from the admin API."""
    regular_user, regular_token = generate_user()
    request = admin_pb2.GetUserDetailsReq(user=str(regular_user.id))

    with real_admin_session(regular_token) as api:
        # all requests to the admin servicer should break when done by a non-super_user
        with pytest.raises(grpc.RpcError) as exc_info:
            api.GetUserDetails(request)
        assert exc_info.value.code() == grpc.StatusCode.PERMISSION_DENIED

46 

47 

def test_GetUserDetails(db):
    """A superuser can fetch a user's details by ID, by username, and by email."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    # the same user addressed three different ways; every lookup must yield the same details
    identifiers = (str(normal_user.id), normal_user.username, normal_user.email)

    for identifier in identifiers:
        with real_admin_session(super_token) as api:
            details = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=identifier))
            assert details.user_id == normal_user.id
            assert details.username == normal_user.username
            assert details.email == normal_user.email
            assert details.gender == normal_user.gender
            assert parse_date(details.birthdate) == normal_user.birthdate
            assert not details.banned
            assert not details.deleted

81 

82 

def test_ChangeUserGender(db, push_collector):
    """Changing a user's gender updates the record and notifies them by email and push."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()
    request = admin_pb2.ChangeUserGenderReq(user=normal_user.username, gender="Machine")

    with real_admin_session(super_token) as api, mock_notification_email() as mock:
        details = api.ChangeUserGender(request)

    # the returned details reflect the new gender and leave everything else untouched
    assert details.user_id == normal_user.id
    assert details.username == normal_user.username
    assert details.email == normal_user.email
    assert details.gender == "Machine"
    assert parse_date(details.birthdate) == normal_user.birthdate
    assert not details.banned
    assert not details.deleted

    # exactly one notification email, addressed to the affected user
    mock.assert_called_once()
    email = email_fields(mock)
    assert email.subject == "[TEST] Your gender was changed"
    assert email.recipient == normal_user.email
    assert "Machine" in email.plain
    assert "Machine" in email.html

    # and exactly one matching push notification
    push_collector.assert_user_has_single_matching(
        normal_user.id,
        title="Your gender was changed",
        body="Your gender on Couchers.org was changed to Machine by an admin.",
    )

110 

111 

def test_ChangeUserBirthdate(db, push_collector):
    """Changing a user's birthdate updates the record and notifies them by email and push."""
    original_birthdate = date(2000, 1, 1)
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user(birthdate=original_birthdate)

    with real_admin_session(super_token) as api:
        # sanity check: the user starts out with the birthdate they were created with
        before = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username))
        assert parse_date(before.birthdate) == original_birthdate

        change_request = admin_pb2.ChangeUserBirthdateReq(user=normal_user.username, birthdate="1990-05-25")
        with mock_notification_email() as mock:
            after = api.ChangeUserBirthdate(change_request)

        assert after.user_id == normal_user.id
        assert after.username == normal_user.username
        assert after.email == normal_user.email
        assert after.birthdate == "1990-05-25"
        assert after.gender == normal_user.gender
        assert not after.banned
        assert not after.deleted

    # exactly one notification email, addressed to the affected user
    mock.assert_called_once()
    email = email_fields(mock)
    assert email.subject == "[TEST] Your date of birth was changed"
    assert email.recipient == normal_user.email
    assert "1990" in email.plain
    assert "1990" in email.html

    # and exactly one matching push notification
    push_collector.assert_user_has_single_matching(
        normal_user.id,
        title="Your date of birth was changed",
        body="Your date of birth on Couchers.org was changed to Friday 25 May 1990 by an admin.",
    )

145 

146 

# UTC timestamp with microsecond precision, as the tests below expect it inside each admin note entry
_UTC_TIMESTAMP_REGEX = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"


def _admin_note_entry_regex(admin, note):
    """Return a regex fragment matching one admin note entry written by ``admin``.

    The expected shape of an entry is::

        \\n[<utc timestamp>] (id: <admin id>, username: <admin username>) <note>\\n

    The username and the note text are passed through ``re.escape`` so that any
    regex metacharacters they contain are matched literally instead of being
    interpreted as pattern syntax.

    Args:
        admin: object exposing ``id`` and ``username`` of the admin who wrote the note.
        note: the literal note text.

    Returns:
        An unanchored pattern string; callers add ``^``/``$`` and may concatenate
        several entries to match a whole note history.
    """
    from re import escape  # local import: the module header only imports ``match``

    return (
        rf"\n\[{_UTC_TIMESTAMP_REGEX}\] "
        rf"\(id: {admin.id}, username: {escape(admin.username)}\) {escape(note)}\n"
    )


def test_BanUser(db):
    """Banning a user flags them as banned and records the admin note."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    admin_note = "A good reason"
    expected_entry = _admin_note_entry_regex(super_user, admin_note)

    with real_admin_session(super_token) as api:
        res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note=admin_note))
        assert res.user_id == normal_user.id
        assert res.username == normal_user.username
        assert res.email == normal_user.email
        assert res.gender == normal_user.gender
        assert parse_date(res.birthdate) == normal_user.birthdate
        assert res.banned
        assert not res.deleted
        assert match(rf"^{expected_entry}$", res.admin_note)


def test_UnbanUser(db):
    """Unbanning a user leaves them not banned and records the admin note."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    admin_note = "A good reason"
    expected_entry = _admin_note_entry_regex(super_user, admin_note)

    # NOTE(review): the user is never banned before this call, so ``not res.banned``
    # would also hold if UnbanUser did nothing to the flag — consider banning first.
    with real_admin_session(super_token) as api:
        res = api.UnbanUser(admin_pb2.UnbanUserReq(user=normal_user.username, admin_note=admin_note))
        assert res.user_id == normal_user.id
        assert res.username == normal_user.username
        assert res.email == normal_user.email
        assert res.gender == normal_user.gender
        assert parse_date(res.birthdate) == normal_user.birthdate
        assert not res.banned
        assert not res.deleted
        assert match(rf"^{expected_entry}$", res.admin_note)


def test_AddAdminNote(db):
    """Admin notes are appended one after another, each with its own timestamped prefix."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    admin_note1 = "User reported strange behavior"
    admin_note2 = "Insert private information here"
    first_entry = _admin_note_entry_regex(super_user, admin_note1)
    second_entry = _admin_note_entry_regex(super_user, admin_note2)

    with real_admin_session(super_token) as api:
        res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note1))
        assert res.user_id == normal_user.id
        assert res.username == normal_user.username
        assert res.email == normal_user.email
        assert res.gender == normal_user.gender
        assert parse_date(res.birthdate) == normal_user.birthdate
        assert not res.banned
        assert not res.deleted
        assert match(rf"^{first_entry}$", res.admin_note)

    with real_admin_session(super_token) as api:
        res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note2))
        # the second note follows the first; the earlier entry is still present
        assert match(rf"^{first_entry}{second_entry}$", res.admin_note)

207 

208 

def test_AddAdminNote_blank(db):
    """A whitespace-only admin note is rejected with INVALID_ARGUMENT."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()
    # spaces, a tab and a newline only
    whitespace_only_note = " \t \n "
    request = admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=whitespace_only_note)

    with real_admin_session(super_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddAdminNote(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.ADMIN_NOTE_CANT_BE_EMPTY

219 

220 

def test_admin_content_reports(db):
    """Admins can fetch a single content report and list the reports filed against an author."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, token = generate_user()
    bad_user1, _ = generate_user()
    bad_user2, _ = generate_user()

    # (reason, description, content_ref, author, page), filed in this order by normal_user
    reports_to_file = [
        ("spam", "r1", "comment/123", bad_user1, "https://couchers.org/comment/123"),
        ("spam", "r2", "comment/124", bad_user2, "https://couchers.org/comment/124"),
        ("something else", "r3", "page/321", bad_user1, "https://couchers.org/page/321"),
    ]

    with reporting_session(token) as api:
        for reason, description, content_ref, author, page in reports_to_file:
            api.Report(
                reporting_pb2.ReportReq(
                    reason=reason,
                    description=description,
                    content_ref=content_ref,
                    author_user=author.username,
                    user_agent="n/a",
                    page=page,
                )
            )

    # descriptions are unique here, so they serve as keys to look up the generated report IDs
    with session_scope() as session:
        rows = session.execute(select(ContentReport.description, ContentReport.id)).all()
        id_by_description = dict(rows)

    with real_admin_session(super_token) as api:
        # unknown report ID
        with pytest.raises(grpc.RpcError) as exc_info:
            api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == errors.CONTENT_REPORT_NOT_FOUND

        # a single report comes back with every field it was filed with
        r2_id = id_by_description["r2"]
        report = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=r2_id)).content_report
        assert report.content_report_id == r2_id
        assert report.reporting_user_id == normal_user.id
        assert report.author_user_id == bad_user2.id
        assert report.reason == "spam"
        assert report.description == "r2"
        assert report.content_ref == "comment/124"
        assert report.user_agent == "n/a"
        assert report.page == "https://couchers.org/comment/124"

        # reports against bad_user1: the later-filed r3 is expected before r1
        by_author = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username))
        assert by_author.content_reports[0].content_report_id == id_by_description["r3"]
        assert by_author.content_reports[1].content_report_id == id_by_description["r1"]

282 

283 

def test_DeleteUser(db):
    """Deleting a user flags them as deleted and leaves their other details intact."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()
    request = admin_pb2.DeleteUserReq(user=normal_user.username)

    with real_admin_session(super_token) as api:
        details = api.DeleteUser(request)
        assert details.user_id == normal_user.id
        assert details.username == normal_user.username
        assert details.email == normal_user.email
        assert details.gender == normal_user.gender
        assert parse_date(details.birthdate) == normal_user.birthdate
        assert not details.banned
        assert details.deleted

297 

298 

def test_CreateApiKey(db, push_collector):
    """Creating an API key stores a valid key session and notifies the user by email and push."""
    with session_scope() as session:
        super_user, super_token = generate_user(is_superuser=True)
        normal_user, normal_token = generate_user()

        # precondition: the user has no API keys yet
        existing_key_count = session.execute(
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == normal_user.id)
        ).scalar_one()
        assert existing_key_count == 0

    with mock_notification_email() as mock, real_admin_session(super_token) as api:
        api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=normal_user.username))

    mock.assert_called_once()
    email = email_fields(mock)
    assert email.subject == "[TEST] Your API key for Couchers.org"

    with session_scope() as session:
        # scalar_one() also asserts that exactly one valid API key exists for the user
        api_key = session.execute(
            select(UserSession)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == normal_user.id)
        ).scalar_one()

        # the email must contain the actual token in both renderings
        assert api_key.token in email.plain
        assert api_key.token in email.html

    assert email.recipient == normal_user.email
    assert "api key" in email.subject.lower()
    unique_string = "We've issued you with the following API key:"
    for rendering in (email.plain, email.html):
        assert unique_string in rendering
    for rendering in (email.plain, email.html):
        assert "support@couchers.org" in rendering

    push_collector.assert_user_has_single_matching(
        normal_user.id,
        title="An API key was created for your account",
        body="Details were sent to you via email.",
    )

344 

345 

# GeoJSON MultiPolygon made of a single polygon with one ring of five positions;
# the first and last positions are identical, closing the ring.
# Passed as the geometry in test_CreateCommunity.
VALID_GEOJSON_MULTIPOLYGON = """
    {
      "type": "MultiPolygon",
      "coordinates":
       [
        [
          [
            [
              -73.98114904754641,
              40.7470284264813
            ],
            [
              -73.98314135177611,
              40.73416844413217
            ],
            [
              -74.00538969848634,
              40.734314779027144
            ],
            [
              -74.00479214294432,
              40.75027851544338
            ],
            [
              -73.98114904754641,
              40.7470284264813
            ]
          ]
        ]
      ]
    }
"""

# GeoJSON Point, i.e. deliberately not a MultiPolygon;
# test_CreateCommunity_invalid_geojson expects it to be rejected with errors.NO_MULTIPOLYGON.
POINT_GEOJSON = """
{ "type": "Point", "coordinates": [100.0, 0.0] }
"""

382 

383 

def test_CreateCommunity_invalid_geojson(db):
    """Creating a community from a non-MultiPolygon geometry fails with INVALID_ARGUMENT."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    # a Point geometry where a MultiPolygon is required
    request = admin_pb2.CreateCommunityReq(
        name="test community",
        description="community for testing",
        admin_ids=[],
        geojson=POINT_GEOJSON,
    )

    with real_admin_session(super_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.CreateCommunity(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.NO_MULTIPOLYGON

399 

400 

def test_CreateCommunity(db):
    """Creating a community stores a Cluster with the given description and a slug from its name."""
    community_name = "test community"
    community_description = "community for testing"

    with session_scope() as session:
        super_user, super_token = generate_user(is_superuser=True)
        normal_user, normal_token = generate_user()

        request = admin_pb2.CreateCommunityReq(
            name=community_name,
            description=community_description,
            admin_ids=[],
            geojson=VALID_GEOJSON_MULTIPOLYGON,
        )
        with real_admin_session(super_token) as api:
            api.CreateCommunity(request)

        # scalar_one() also asserts that exactly one cluster with that name exists
        created = session.execute(select(Cluster).where(Cluster.name == community_name)).scalar_one()
        assert created.description == community_description
        assert created.slug == "test-community"

417 

418 

def test_GetChats(db):
    """GetChats returns a non-empty ``response`` field for an existing user."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()
    request = admin_pb2.GetChatsReq(user=normal_user.username)

    with real_admin_session(super_token) as api:
        chats = api.GetChats(request)
        assert chats.response

426 

427 

def test_badges(db, push_collector):
    """Exercise AddBadge and RemoveBadge: the happy paths, notifications, and every error case."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()
    username = normal_user.username

    add_volunteer = admin_pb2.AddBadgeReq(user=username, badge_id="volunteer")
    remove_volunteer = admin_pb2.RemoveBadgeReq(user=username, badge_id="volunteer")
    details_request = admin_pb2.GetUserDetailsReq(user=username)

    with real_admin_session(super_token) as api:
        # can add a badge
        assert "volunteer" not in api.GetUserDetails(details_request).badges
        with mock_notification_email() as mock:
            after_add = api.AddBadge(add_volunteer)
            assert "volunteer" in after_add.badges

        # badge emails are disabled by default
        mock.assert_not_called()

        push_collector.assert_user_has_single_matching(
            normal_user.id,
            title="The Active Volunteer badge was added to your profile",
            body="Check out your profile to see the new badge!",
        )

        # can't add/edit special tags
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="founder"))
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.ADMIN_CANNOT_EDIT_BADGE

        # double add badge
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddBadge(add_volunteer)
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.USER_ALREADY_HAS_BADGE

        # can remove badge
        assert "volunteer" in api.GetUserDetails(details_request).badges
        with mock_notification_email() as mock:
            after_remove = api.RemoveBadge(remove_volunteer)
            assert "volunteer" not in after_remove.badges

        # badge emails are disabled by default
        mock.assert_not_called()

        # the removal is the user's second push (index 1), after the "added" one
        push_collector.assert_user_push_matches_fields(
            normal_user.id,
            ix=1,
            title="The Active Volunteer badge was removed from your profile",
            body="You can see all your badges on your profile.",
        )

        # not found on user
        with pytest.raises(grpc.RpcError) as exc_info:
            api.RemoveBadge(remove_volunteer)
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.USER_DOES_NOT_HAVE_BADGE

        # not found in general
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="nonexistentbadge"))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == errors.BADGE_NOT_FOUND

487 

488 

def test_DeleteEvent(db):
    """An admin can delete an event created by a normal user; its occurrence is then flagged deleted."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, normal_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [normal_user], [], None)

    # the event starts two hours from now and lasts three hours
    starts_at = now() + timedelta(hours=2)
    ends_at = starts_at + timedelta(hours=3)
    venue = events_pb2.OfflineEventInformation(
        address="Near Null Island",
        lat=0.1,
        lng=0.2,
    )
    create_request = events_pb2.CreateEventReq(
        title="Dummy Title",
        content="Dummy content.",
        photo_key=None,
        offline_information=venue,
        start_time=Timestamp_from_datetime(starts_at),
        end_time=Timestamp_from_datetime(ends_at),
        timezone="UTC",
    )

    with events_session(normal_token) as api:
        created = api.CreateEvent(create_request)
        event_id = created.event_id
        assert not created.is_deleted

    with session_scope() as session:
        with real_admin_session(super_token) as api:
            api.DeleteEvent(admin_pb2.DeleteEventReq(event_id=event_id))
        # the ID returned by CreateEvent is used as the EventOccurrence primary key
        occurrence = session.get(EventOccurrence, ident=event_id)
        assert occurrence.is_deleted

526 

527 

# The community invite feature is tested in test_events.py