Coverage for src/tests/test_admin.py: 100%
405 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-05 18:47 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-05 18:47 +0000
1from datetime import date, datetime
2from re import match
4import grpc
5import pytest
6from sqlalchemy.sql import func
8from couchers import errors
9from couchers.db import session_scope
10from couchers.models import AccountDeletionToken, Cluster, ContentReport, EventOccurrence, Node, Reference, UserSession
11from couchers.sql import couchers_select as select
12from couchers.utils import Timestamp_from_datetime, now, parse_date, timedelta
13from proto import admin_pb2, auth_pb2, events_pb2, references_pb2, reporting_pb2
14from tests.test_communities import create_community
15from tests.test_fixtures import ( # noqa
16 auth_api_session,
17 db,
18 email_fields,
19 events_session,
20 generate_user,
21 get_user_id_and_token,
22 mock_notification_email,
23 push_collector,
24 real_admin_session,
25 references_session,
26 reporting_session,
27 testconfig,
28)
31@pytest.fixture(autouse=True)
32def _(testconfig):
33 pass
36def test_access_by_normal_user(db):
37 normal_user, normal_token = generate_user()
39 with real_admin_session(normal_token) as api:
40 # all requests to the admin servicer should break when done by a non-super_user
41 with pytest.raises(grpc.RpcError) as e:
42 api.GetUserDetails(
43 admin_pb2.GetUserDetailsReq(
44 user=str(normal_user.id),
45 )
46 )
47 assert e.value.code() == grpc.StatusCode.PERMISSION_DENIED
50def test_GetUser(db):
51 super_user, super_token = generate_user(is_superuser=True)
52 normal_user, normal_token = generate_user()
54 with real_admin_session(super_token) as api:
55 res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id)))
56 assert res.user_id == normal_user.id
57 assert res.username == normal_user.username
59 with real_admin_session(super_token) as api:
60 res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note="Testing banning"))
62 with real_admin_session(super_token) as api:
63 res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id)))
64 assert res.user_id == normal_user.id
65 assert res.username == normal_user.username
68def test_GetUserDetails(db):
69 super_user, super_token = generate_user(is_superuser=True)
70 normal_user, normal_token = generate_user()
72 with real_admin_session(super_token) as api:
73 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=str(normal_user.id)))
74 assert res.user_id == normal_user.id
75 assert res.username == normal_user.username
76 assert res.email == normal_user.email
77 assert res.gender == normal_user.gender
78 assert parse_date(res.birthdate) == normal_user.birthdate
79 assert not res.banned
80 assert not res.deleted
82 with real_admin_session(super_token) as api:
83 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username))
84 assert res.user_id == normal_user.id
85 assert res.username == normal_user.username
86 assert res.email == normal_user.email
87 assert res.gender == normal_user.gender
88 assert parse_date(res.birthdate) == normal_user.birthdate
89 assert not res.banned
90 assert not res.deleted
92 with real_admin_session(super_token) as api:
93 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.email))
94 assert res.user_id == normal_user.id
95 assert res.username == normal_user.username
96 assert res.email == normal_user.email
97 assert res.gender == normal_user.gender
98 assert parse_date(res.birthdate) == normal_user.birthdate
99 assert not res.banned
100 assert not res.deleted
103def test_ChangeUserGender(db, push_collector):
104 super_user, super_token = generate_user(is_superuser=True)
105 normal_user, normal_token = generate_user()
107 with real_admin_session(super_token) as api:
108 with mock_notification_email() as mock:
109 res = api.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=normal_user.username, gender="Machine"))
110 assert res.user_id == normal_user.id
111 assert res.username == normal_user.username
112 assert res.email == normal_user.email
113 assert res.gender == "Machine"
114 assert parse_date(res.birthdate) == normal_user.birthdate
115 assert not res.banned
116 assert not res.deleted
118 mock.assert_called_once()
119 e = email_fields(mock)
120 assert e.subject == "[TEST] Your gender was changed"
121 assert e.recipient == normal_user.email
122 assert "Machine" in e.plain
123 assert "Machine" in e.html
125 push_collector.assert_user_has_single_matching(
126 normal_user.id,
127 title="Your gender was changed",
128 body="Your gender on Couchers.org was changed to Machine by an admin.",
129 )
132def test_ChangeUserBirthdate(db, push_collector):
133 super_user, super_token = generate_user(is_superuser=True)
134 normal_user, normal_token = generate_user(birthdate=date(year=2000, month=1, day=1))
136 with real_admin_session(super_token) as api:
137 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username))
138 assert parse_date(res.birthdate) == date(year=2000, month=1, day=1)
140 with mock_notification_email() as mock:
141 res = api.ChangeUserBirthdate(
142 admin_pb2.ChangeUserBirthdateReq(user=normal_user.username, birthdate="1990-05-25")
143 )
145 assert res.user_id == normal_user.id
146 assert res.username == normal_user.username
147 assert res.email == normal_user.email
148 assert res.birthdate == "1990-05-25"
149 assert res.gender == normal_user.gender
150 assert not res.banned
151 assert not res.deleted
153 mock.assert_called_once()
154 e = email_fields(mock)
155 assert e.subject == "[TEST] Your date of birth was changed"
156 assert e.recipient == normal_user.email
157 assert "1990" in e.plain
158 assert "1990" in e.html
160 push_collector.assert_user_has_single_matching(
161 normal_user.id,
162 title="Your date of birth was changed",
163 body="Your date of birth on Couchers.org was changed to Friday 25 May 1990 by an admin.",
164 )
167def test_BanUser(db):
168 super_user, super_token = generate_user(is_superuser=True)
169 normal_user, _ = generate_user()
170 admin_note = "A good reason"
171 utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
172 prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"
174 with real_admin_session(super_token) as api:
175 res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note=admin_note))
176 assert res.user_id == normal_user.id
177 assert res.username == normal_user.username
178 assert res.email == normal_user.email
179 assert res.gender == normal_user.gender
180 assert parse_date(res.birthdate) == normal_user.birthdate
181 assert res.banned
182 assert not res.deleted
183 assert match(rf"^{prefix_regex} {admin_note}\n$", res.admin_note)
186def test_UnbanUser(db):
187 super_user, super_token = generate_user(is_superuser=True)
188 normal_user, _ = generate_user()
189 admin_note = "A good reason"
190 utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
191 prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"
193 with real_admin_session(super_token) as api:
194 res = api.UnbanUser(admin_pb2.UnbanUserReq(user=normal_user.username, admin_note=admin_note))
195 assert res.user_id == normal_user.id
196 assert res.username == normal_user.username
197 assert res.email == normal_user.email
198 assert res.gender == normal_user.gender
199 assert parse_date(res.birthdate) == normal_user.birthdate
200 assert not res.banned
201 assert not res.deleted
202 assert match(rf"^{prefix_regex} {admin_note}\n$", res.admin_note)
205def test_AddAdminNote(db):
206 super_user, super_token = generate_user(is_superuser=True)
207 normal_user, _ = generate_user()
208 admin_note1 = "User reported strange behavior"
209 admin_note2 = "Insert private information here"
210 utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
211 prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"
213 with real_admin_session(super_token) as api:
214 res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note1))
215 assert res.user_id == normal_user.id
216 assert res.username == normal_user.username
217 assert res.email == normal_user.email
218 assert res.gender == normal_user.gender
219 assert parse_date(res.birthdate) == normal_user.birthdate
220 assert not res.banned
221 assert not res.deleted
222 assert match(rf"^{prefix_regex} {admin_note1}\n$", res.admin_note)
224 with real_admin_session(super_token) as api:
225 res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note2))
226 assert match(rf"^{prefix_regex} {admin_note1}\n{prefix_regex} {admin_note2}\n$", res.admin_note)
229def test_AddAdminNote_blank(db):
230 super_user, super_token = generate_user(is_superuser=True)
231 normal_user, _ = generate_user()
232 empty_admin_note = " \t \n "
234 with real_admin_session(super_token) as api:
235 with pytest.raises(grpc.RpcError) as e:
236 api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=empty_admin_note))
237 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
238 assert e.value.details() == errors.ADMIN_NOTE_CANT_BE_EMPTY
241def test_admin_content_reports(db):
242 super_user, super_token = generate_user(is_superuser=True)
243 normal_user, token = generate_user()
244 bad_user1, _ = generate_user()
245 bad_user2, _ = generate_user()
247 with reporting_session(token) as api:
248 api.Report(
249 reporting_pb2.ReportReq(
250 reason="spam",
251 description="r1",
252 content_ref="comment/123",
253 author_user=bad_user1.username,
254 user_agent="n/a",
255 page="https://couchers.org/comment/123",
256 )
257 )
258 api.Report(
259 reporting_pb2.ReportReq(
260 reason="spam",
261 description="r2",
262 content_ref="comment/124",
263 author_user=bad_user2.username,
264 user_agent="n/a",
265 page="https://couchers.org/comment/124",
266 )
267 )
268 api.Report(
269 reporting_pb2.ReportReq(
270 reason="something else",
271 description="r3",
272 content_ref="page/321",
273 author_user=bad_user1.username,
274 user_agent="n/a",
275 page="https://couchers.org/page/321",
276 )
277 )
279 with session_scope() as session:
280 id_by_description = dict(session.execute(select(ContentReport.description, ContentReport.id)).all())
282 with real_admin_session(super_token) as api:
283 with pytest.raises(grpc.RpcError) as e:
284 api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
285 assert e.value.code() == grpc.StatusCode.NOT_FOUND
286 assert e.value.details() == errors.CONTENT_REPORT_NOT_FOUND
288 res = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=id_by_description["r2"]))
289 rep = res.content_report
290 assert rep.content_report_id == id_by_description["r2"]
291 assert rep.reporting_user_id == normal_user.id
292 assert rep.author_user_id == bad_user2.id
293 assert rep.reason == "spam"
294 assert rep.description == "r2"
295 assert rep.content_ref == "comment/124"
296 assert rep.user_agent == "n/a"
297 assert rep.page == "https://couchers.org/comment/124"
299 res = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username))
300 assert res.content_reports[0].content_report_id == id_by_description["r3"]
301 assert res.content_reports[1].content_report_id == id_by_description["r1"]
304def test_DeleteUser(db):
305 super_user, super_token = generate_user(is_superuser=True)
306 normal_user, normal_token = generate_user()
308 with real_admin_session(super_token) as api:
309 res = api.DeleteUser(admin_pb2.DeleteUserReq(user=normal_user.username))
310 assert res.user_id == normal_user.id
311 assert res.username == normal_user.username
312 assert res.email == normal_user.email
313 assert res.gender == normal_user.gender
314 assert parse_date(res.birthdate) == normal_user.birthdate
315 assert not res.banned
316 assert res.deleted
318 with real_admin_session(super_token) as api:
319 res = api.RecoverDeletedUser(admin_pb2.RecoverDeletedUserReq(user=normal_user.username))
320 assert res.user_id == normal_user.id
321 assert res.username == normal_user.username
322 assert res.email == normal_user.email
323 assert res.gender == normal_user.gender
324 assert parse_date(res.birthdate) == normal_user.birthdate
325 assert not res.banned
326 assert not res.deleted
329def test_CreateApiKey(db, push_collector):
330 with session_scope() as session:
331 super_user, super_token = generate_user(is_superuser=True)
332 normal_user, normal_token = generate_user()
334 assert (
335 session.execute(
336 select(func.count())
337 .select_from(UserSession)
338 .where(UserSession.is_api_key == True)
339 .where(UserSession.user_id == normal_user.id)
340 ).scalar_one()
341 == 0
342 )
344 with mock_notification_email() as mock:
345 with real_admin_session(super_token) as api:
346 res = api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=normal_user.username))
348 mock.assert_called_once()
349 e = email_fields(mock)
350 assert e.subject == "[TEST] Your API key for Couchers.org"
352 with session_scope() as session:
353 api_key = session.execute(
354 select(UserSession)
355 .where(UserSession.is_valid)
356 .where(UserSession.is_api_key == True)
357 .where(UserSession.user_id == normal_user.id)
358 ).scalar_one()
360 assert api_key.token in e.plain
361 assert api_key.token in e.html
363 assert e.recipient == normal_user.email
364 assert "api key" in e.subject.lower()
365 unique_string = "We've issued you with the following API key:"
366 assert unique_string in e.plain
367 assert unique_string in e.html
368 assert "support@couchers.org" in e.plain
369 assert "support@couchers.org" in e.html
371 push_collector.assert_user_has_single_matching(
372 normal_user.id, title="An API key was created for your account", body="Details were sent to you via email."
373 )
376VALID_GEOJSON_MULTIPOLYGON = """
377 {
378 "type": "MultiPolygon",
379 "coordinates":
380 [
381 [
382 [
383 [
384 -73.98114904754641,
385 40.7470284264813
386 ],
387 [
388 -73.98314135177611,
389 40.73416844413217
390 ],
391 [
392 -74.00538969848634,
393 40.734314779027144
394 ],
395 [
396 -74.00479214294432,
397 40.75027851544338
398 ],
399 [
400 -73.98114904754641,
401 40.7470284264813
402 ]
403 ]
404 ]
405 ]
406 }
407"""
409POINT_GEOJSON = """
410{ "type": "Point", "coordinates": [100.0, 0.0] }
411"""
414def test_CreateCommunity_invalid_geojson(db):
415 super_user, super_token = generate_user(is_superuser=True)
416 normal_user, normal_token = generate_user()
417 with real_admin_session(super_token) as api:
418 with pytest.raises(grpc.RpcError) as e:
419 api.CreateCommunity(
420 admin_pb2.CreateCommunityReq(
421 name="test community",
422 description="community for testing",
423 admin_ids=[],
424 geojson=POINT_GEOJSON,
425 )
426 )
427 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
428 assert e.value.details() == errors.NO_MULTIPOLYGON
431def test_CreateCommunity(db):
432 with session_scope() as session:
433 super_user, super_token = generate_user(is_superuser=True)
434 normal_user, normal_token = generate_user()
435 with real_admin_session(super_token) as api:
436 api.CreateCommunity(
437 admin_pb2.CreateCommunityReq(
438 name="test community",
439 description="community for testing",
440 admin_ids=[],
441 geojson=VALID_GEOJSON_MULTIPOLYGON,
442 )
443 )
444 community = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one()
445 assert community.description == "community for testing"
446 assert community.slug == "test-community"
449def test_UpdateCommunity_invalid_geojson(db):
450 super_user, super_token = generate_user(is_superuser=True)
452 with session_scope() as session:
453 with real_admin_session(super_token) as api:
454 api.CreateCommunity(
455 admin_pb2.CreateCommunityReq(
456 name="test community",
457 description="community for testing",
458 admin_ids=[],
459 geojson=VALID_GEOJSON_MULTIPOLYGON,
460 )
461 )
462 community = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one()
464 with pytest.raises(grpc.RpcError) as e:
465 api.UpdateCommunity(
466 admin_pb2.UpdateCommunityReq(
467 community_id=community.parent_node_id,
468 name="test community 2",
469 description="community for testing 2",
470 geojson=POINT_GEOJSON,
471 )
472 )
473 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
474 assert e.value.details() == errors.NO_MULTIPOLYGON
477def test_UpdateCommunity_invalid_id(db):
478 super_user, super_token = generate_user(is_superuser=True)
480 with session_scope() as session:
481 with real_admin_session(super_token) as api:
482 api.CreateCommunity(
483 admin_pb2.CreateCommunityReq(
484 name="test community",
485 description="community for testing",
486 admin_ids=[],
487 geojson=VALID_GEOJSON_MULTIPOLYGON,
488 )
489 )
491 with pytest.raises(grpc.RpcError) as e:
492 api.UpdateCommunity(
493 admin_pb2.UpdateCommunityReq(
494 community_id=1000,
495 name="test community 1000",
496 description="community for testing 1000",
497 geojson=VALID_GEOJSON_MULTIPOLYGON,
498 )
499 )
500 assert e.value.code() == grpc.StatusCode.NOT_FOUND
501 assert e.value.details() == errors.COMMUNITY_NOT_FOUND
504def test_UpdateCommunity(db):
505 super_user, super_token = generate_user(is_superuser=True)
507 with session_scope() as session:
508 with real_admin_session(super_token) as api:
509 api.CreateCommunity(
510 admin_pb2.CreateCommunityReq(
511 name="test community",
512 description="community for testing",
513 admin_ids=[],
514 geojson=VALID_GEOJSON_MULTIPOLYGON,
515 )
516 )
517 community = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one()
518 assert community.description == "community for testing"
520 api.CreateCommunity(
521 admin_pb2.CreateCommunityReq(
522 name="test community 2",
523 description="community for testing 2",
524 admin_ids=[],
525 geojson=VALID_GEOJSON_MULTIPOLYGON,
526 )
527 )
528 community_2 = session.execute(select(Cluster).where(Cluster.name == "test community 2")).scalar_one()
530 api.UpdateCommunity(
531 admin_pb2.UpdateCommunityReq(
532 community_id=community.parent_node_id,
533 name="test community 2",
534 description="community for testing 2",
535 geojson=VALID_GEOJSON_MULTIPOLYGON,
536 parent_node_id=community_2.parent_node_id,
537 )
538 )
539 session.commit()
541 community_updated = session.execute(select(Cluster).where(Cluster.id == community.id)).scalar_one()
542 assert community_updated.description == "community for testing 2"
543 assert community_updated.slug == "test-community-2"
545 node_updated = session.execute(select(Node).where(Node.id == community_updated.parent_node_id)).scalar_one()
546 assert node_updated.parent_node_id == community_2.parent_node_id
549def test_GetChats(db):
550 super_user, super_token = generate_user(is_superuser=True)
551 normal_user, normal_token = generate_user()
553 with real_admin_session(super_token) as api:
554 res = api.GetChats(admin_pb2.GetChatsReq(user=normal_user.username))
555 assert res.response
558def test_badges(db, push_collector):
559 super_user, super_token = generate_user(is_superuser=True)
560 normal_user, normal_token = generate_user()
562 with real_admin_session(super_token) as api:
563 # can add a badge
564 assert "volunteer" not in api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)).badges
565 with mock_notification_email() as mock:
566 res = api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="volunteer"))
567 assert "volunteer" in res.badges
569 # badge emails are disabled by default
570 mock.assert_not_called()
572 push_collector.assert_user_has_single_matching(
573 normal_user.id,
574 title="The Active Volunteer badge was added to your profile",
575 body="Check out your profile to see the new badge!",
576 )
578 # can't add/edit special tags
579 with pytest.raises(grpc.RpcError) as e:
580 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="founder"))
581 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
582 assert e.value.details() == errors.ADMIN_CANNOT_EDIT_BADGE
584 # double add badge
585 with pytest.raises(grpc.RpcError) as e:
586 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="volunteer"))
587 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
588 assert e.value.details() == errors.USER_ALREADY_HAS_BADGE
590 # can remove badge
591 assert "volunteer" in api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)).badges
592 with mock_notification_email() as mock:
593 res = api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=normal_user.username, badge_id="volunteer"))
594 assert "volunteer" not in res.badges
596 # badge emails are disabled by default
597 mock.assert_not_called()
599 push_collector.assert_user_push_matches_fields(
600 normal_user.id,
601 ix=1,
602 title="The Active Volunteer badge was removed from your profile",
603 body="You can see all your badges on your profile.",
604 )
606 # not found on user
607 with pytest.raises(grpc.RpcError) as e:
608 api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=normal_user.username, badge_id="volunteer"))
609 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
610 assert e.value.details() == errors.USER_DOES_NOT_HAVE_BADGE
612 # not found in general
613 with pytest.raises(grpc.RpcError) as e:
614 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="nonexistentbadge"))
615 assert e.value.code() == grpc.StatusCode.NOT_FOUND
616 assert e.value.details() == errors.BADGE_NOT_FOUND
619def test_DeleteEvent(db):
620 super_user, super_token = generate_user(is_superuser=True)
621 normal_user, normal_token = generate_user()
623 with session_scope() as session:
624 create_community(session, 0, 2, "Community", [normal_user], [], None)
626 start_time = now() + timedelta(hours=2)
627 end_time = start_time + timedelta(hours=3)
628 with events_session(normal_token) as api:
629 res = api.CreateEvent(
630 events_pb2.CreateEventReq(
631 title="Dummy Title",
632 content="Dummy content.",
633 photo_key=None,
634 offline_information=events_pb2.OfflineEventInformation(
635 address="Near Null Island",
636 lat=0.1,
637 lng=0.2,
638 ),
639 start_time=Timestamp_from_datetime(start_time),
640 end_time=Timestamp_from_datetime(end_time),
641 timezone="UTC",
642 )
643 )
644 event_id = res.event_id
645 assert not res.is_deleted
647 with session_scope() as session:
648 with real_admin_session(super_token) as api:
649 api.DeleteEvent(
650 admin_pb2.DeleteEventReq(
651 event_id=event_id,
652 )
653 )
654 occurrence = session.get(EventOccurrence, ident=event_id)
655 assert occurrence.is_deleted
658def test_ListUserIds(db):
659 super_user, super_token = generate_user(is_superuser=True)
660 normal_user, normal_token = generate_user()
662 with real_admin_session(super_token) as api:
663 res = api.ListUserIds(
664 admin_pb2.ListUserIdsReq(
665 start_time=Timestamp_from_datetime(datetime(2000, 1, 1)), end_time=Timestamp_from_datetime(now())
666 )
667 )
668 assert len(res.user_ids) == 2
669 assert sorted(res.user_ids) == sorted([super_user.id, normal_user.id])
671 with real_admin_session(super_token) as api:
672 res = api.ListUserIds(
673 admin_pb2.ListUserIdsReq(start_time=Timestamp_from_datetime(now()), end_time=Timestamp_from_datetime(now()))
674 )
675 assert res.user_ids == []
678def test_EditReferenceText(db):
679 super_user, super_token = generate_user(is_superuser=True)
680 test_new_text = "New Text"
682 user1, user1_token = generate_user()
683 user2, user2_token = generate_user()
685 with session_scope() as session:
686 with references_session(user1_token) as api:
687 reference = api.WriteFriendReference(
688 references_pb2.WriteFriendReferenceReq(
689 to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
690 )
691 )
693 with real_admin_session(super_token) as admin_api:
694 admin_api.EditReferenceText(
695 admin_pb2.EditReferenceTextReq(reference_id=reference.reference_id, new_text=test_new_text)
696 )
698 session.expire_all()
700 modified_reference = session.execute(
701 select(Reference).where(Reference.id == reference.reference_id)
702 ).scalar_one_or_none()
703 assert modified_reference.text == test_new_text
706def test_DeleteReference(db):
707 super_user, super_token = generate_user(is_superuser=True)
709 user1, user1_token = generate_user()
710 user2, user2_token = generate_user()
712 with references_session(user1_token) as api:
713 reference = api.WriteFriendReference(
714 references_pb2.WriteFriendReferenceReq(
715 to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
716 )
717 )
719 with references_session(user1_token) as api:
720 assert api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references
722 with real_admin_session(super_token) as admin_api:
723 admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=reference.reference_id))
725 with references_session(user1_token) as api:
726 assert not api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references
728 with session_scope() as session:
729 modified_reference = session.execute(
730 select(Reference).where(Reference.id == reference.reference_id)
731 ).scalar_one_or_none()
732 assert modified_reference.is_deleted
735def test_admin_delete_account_url(db, push_collector):
736 super_user, super_token = generate_user(is_superuser=True)
738 user, token = generate_user()
739 user_id = user.id
741 with real_admin_session(super_token) as admin_api:
742 url = admin_api.CreateAccountDeletionLink(
743 admin_pb2.CreateAccountDeletionLinkReq(user=user.username)
744 ).account_deletion_confirm_url
746 push_collector.assert_user_has_count(user_id, 0)
748 with session_scope() as session:
749 token_o = session.execute(select(AccountDeletionToken)).scalar_one()
750 token = token_o.token
751 assert token_o.user.id == user_id
752 assert url == f"http://localhost:3000/delete-account?token={token}"
754 with mock_notification_email() as mock:
755 with auth_api_session() as (auth_api, metadata_interceptor):
756 auth_api.ConfirmDeleteAccount(
757 auth_pb2.ConfirmDeleteAccountReq(
758 token=token,
759 )
760 )
762 push_collector.assert_user_push_matches_fields(
763 user_id,
764 ix=0,
765 title="Your Couchers.org account has been deleted",
766 body="You can still undo this by following the link we emailed to you within 7 days.",
767 )
769 mock.assert_called_once()
770 e = email_fields(mock)
773# community invite feature tested in test_events.py
774# SendBlogPostNotification tested in test_notifications.py
775# MarkUserNeedsLocationUpdate tested in test_jail.py