Coverage for src/tests/test_admin.py: 100%
396 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
1from datetime import date, datetime
2from re import match
4import grpc
5import pytest
6from sqlalchemy.sql import func
8from couchers import errors
9from couchers.db import session_scope
10from couchers.models import AccountDeletionToken, Cluster, ContentReport, EventOccurrence, Node, Reference, UserSession
11from couchers.sql import couchers_select as select
12from couchers.utils import Timestamp_from_datetime, now, parse_date, timedelta
13from proto import admin_pb2, auth_pb2, events_pb2, references_pb2, reporting_pb2
14from tests.test_communities import create_community
15from tests.test_fixtures import ( # noqa
16 auth_api_session,
17 db,
18 email_fields,
19 events_session,
20 generate_user,
21 get_user_id_and_token,
22 mock_notification_email,
23 push_collector,
24 real_admin_session,
25 references_session,
26 reporting_session,
27 testconfig,
28)
31@pytest.fixture(autouse=True)
32def _(testconfig):
33 pass
36def test_access_by_normal_user(db):
37 normal_user, normal_token = generate_user()
39 with real_admin_session(normal_token) as api:
40 # all requests to the admin servicer should break when done by a non-super_user
41 with pytest.raises(grpc.RpcError) as e:
42 api.GetUserDetails(
43 admin_pb2.GetUserDetailsReq(
44 user=str(normal_user.id),
45 )
46 )
47 assert e.value.code() == grpc.StatusCode.PERMISSION_DENIED
50def test_GetUser(db):
51 super_user, super_token = generate_user(is_superuser=True)
52 normal_user, normal_token = generate_user()
54 with real_admin_session(super_token) as api:
55 res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id)))
56 assert res.user_id == normal_user.id
57 assert res.username == normal_user.username
59 with real_admin_session(super_token) as api:
60 res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note="Testing banning"))
62 with real_admin_session(super_token) as api:
63 res = api.GetUser(admin_pb2.GetUserReq(user=str(normal_user.id)))
64 assert res.user_id == normal_user.id
65 assert res.username == normal_user.username
68def test_GetUserDetails(db):
69 super_user, super_token = generate_user(is_superuser=True)
70 normal_user, normal_token = generate_user()
72 with real_admin_session(super_token) as api:
73 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=str(normal_user.id)))
74 assert res.user_id == normal_user.id
75 assert res.username == normal_user.username
76 assert res.email == normal_user.email
77 assert res.gender == normal_user.gender
78 assert parse_date(res.birthdate) == normal_user.birthdate
79 assert not res.banned
80 assert not res.deleted
82 with real_admin_session(super_token) as api:
83 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username))
84 assert res.user_id == normal_user.id
85 assert res.username == normal_user.username
86 assert res.email == normal_user.email
87 assert res.gender == normal_user.gender
88 assert parse_date(res.birthdate) == normal_user.birthdate
89 assert not res.banned
90 assert not res.deleted
92 with real_admin_session(super_token) as api:
93 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.email))
94 assert res.user_id == normal_user.id
95 assert res.username == normal_user.username
96 assert res.email == normal_user.email
97 assert res.gender == normal_user.gender
98 assert parse_date(res.birthdate) == normal_user.birthdate
99 assert not res.banned
100 assert not res.deleted
103def test_ChangeUserGender(db, push_collector):
104 super_user, super_token = generate_user(is_superuser=True)
105 normal_user, normal_token = generate_user()
107 with real_admin_session(super_token) as api:
108 with mock_notification_email() as mock:
109 res = api.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=normal_user.username, gender="Machine"))
110 assert res.user_id == normal_user.id
111 assert res.username == normal_user.username
112 assert res.email == normal_user.email
113 assert res.gender == "Machine"
114 assert parse_date(res.birthdate) == normal_user.birthdate
115 assert not res.banned
116 assert not res.deleted
118 mock.assert_called_once()
119 e = email_fields(mock)
120 assert e.subject == "[TEST] Your gender was changed"
121 assert e.recipient == normal_user.email
122 assert "Machine" in e.plain
123 assert "Machine" in e.html
125 push_collector.assert_user_has_single_matching(
126 normal_user.id,
127 title="Your gender was changed",
128 body="Your gender on Couchers.org was changed to Machine by an admin.",
129 )
132def test_ChangeUserBirthdate(db, push_collector):
133 super_user, super_token = generate_user(is_superuser=True)
134 normal_user, normal_token = generate_user(birthdate=date(year=2000, month=1, day=1))
136 with real_admin_session(super_token) as api:
137 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username))
138 assert parse_date(res.birthdate) == date(year=2000, month=1, day=1)
140 with mock_notification_email() as mock:
141 res = api.ChangeUserBirthdate(
142 admin_pb2.ChangeUserBirthdateReq(user=normal_user.username, birthdate="1990-05-25")
143 )
145 assert res.user_id == normal_user.id
146 assert res.username == normal_user.username
147 assert res.email == normal_user.email
148 assert res.birthdate == "1990-05-25"
149 assert res.gender == normal_user.gender
150 assert not res.banned
151 assert not res.deleted
153 mock.assert_called_once()
154 e = email_fields(mock)
155 assert e.subject == "[TEST] Your date of birth was changed"
156 assert e.recipient == normal_user.email
157 assert "1990" in e.plain
158 assert "1990" in e.html
160 push_collector.assert_user_has_single_matching(
161 normal_user.id,
162 title="Your date of birth was changed",
163 body="Your date of birth on Couchers.org was changed to Friday 25 May 1990 by an admin.",
164 )
167def test_BanUser(db):
168 super_user, super_token = generate_user(is_superuser=True)
169 normal_user, _ = generate_user()
170 admin_note = "A good reason"
171 utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
172 prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"
174 with real_admin_session(super_token) as api:
175 res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username, admin_note=admin_note))
176 assert res.user_id == normal_user.id
177 assert res.username == normal_user.username
178 assert res.email == normal_user.email
179 assert res.gender == normal_user.gender
180 assert parse_date(res.birthdate) == normal_user.birthdate
181 assert res.banned
182 assert not res.deleted
183 assert match(rf"^{prefix_regex} {admin_note}\n$", res.admin_note)
186def test_UnbanUser(db):
187 super_user, super_token = generate_user(is_superuser=True)
188 normal_user, _ = generate_user()
189 admin_note = "A good reason"
190 utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
191 prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"
193 with real_admin_session(super_token) as api:
194 res = api.UnbanUser(admin_pb2.UnbanUserReq(user=normal_user.username, admin_note=admin_note))
195 assert res.user_id == normal_user.id
196 assert res.username == normal_user.username
197 assert res.email == normal_user.email
198 assert res.gender == normal_user.gender
199 assert parse_date(res.birthdate) == normal_user.birthdate
200 assert not res.banned
201 assert not res.deleted
202 assert match(rf"^{prefix_regex} {admin_note}\n$", res.admin_note)
205def test_AddAdminNote(db):
206 super_user, super_token = generate_user(is_superuser=True)
207 normal_user, _ = generate_user()
208 admin_note1 = "User reported strange behavior"
209 admin_note2 = "Insert private information here"
210 utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
211 prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"
213 with real_admin_session(super_token) as api:
214 res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note1))
215 assert res.user_id == normal_user.id
216 assert res.username == normal_user.username
217 assert res.email == normal_user.email
218 assert res.gender == normal_user.gender
219 assert parse_date(res.birthdate) == normal_user.birthdate
220 assert not res.banned
221 assert not res.deleted
222 assert match(rf"^{prefix_regex} {admin_note1}\n$", res.admin_note)
224 with real_admin_session(super_token) as api:
225 res = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=admin_note2))
226 assert match(rf"^{prefix_regex} {admin_note1}\n{prefix_regex} {admin_note2}\n$", res.admin_note)
229def test_AddAdminNote_blank(db):
230 super_user, super_token = generate_user(is_superuser=True)
231 normal_user, _ = generate_user()
232 empty_admin_note = " \t \n "
234 with real_admin_session(super_token) as api:
235 with pytest.raises(grpc.RpcError) as e:
236 api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=normal_user.username, admin_note=empty_admin_note))
237 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
238 assert e.value.details() == errors.ADMIN_NOTE_CANT_BE_EMPTY
241def test_admin_content_reports(db):
242 super_user, super_token = generate_user(is_superuser=True)
243 normal_user, token = generate_user()
244 bad_user1, _ = generate_user()
245 bad_user2, _ = generate_user()
247 with reporting_session(token) as api:
248 api.Report(
249 reporting_pb2.ReportReq(
250 reason="spam",
251 description="r1",
252 content_ref="comment/123",
253 author_user=bad_user1.username,
254 user_agent="n/a",
255 page="https://couchers.org/comment/123",
256 )
257 )
258 api.Report(
259 reporting_pb2.ReportReq(
260 reason="spam",
261 description="r2",
262 content_ref="comment/124",
263 author_user=bad_user2.username,
264 user_agent="n/a",
265 page="https://couchers.org/comment/124",
266 )
267 )
268 api.Report(
269 reporting_pb2.ReportReq(
270 reason="something else",
271 description="r3",
272 content_ref="page/321",
273 author_user=bad_user1.username,
274 user_agent="n/a",
275 page="https://couchers.org/page/321",
276 )
277 )
279 with session_scope() as session:
280 id_by_description = dict(session.execute(select(ContentReport.description, ContentReport.id)).all())
282 with real_admin_session(super_token) as api:
283 with pytest.raises(grpc.RpcError) as e:
284 api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
285 assert e.value.code() == grpc.StatusCode.NOT_FOUND
286 assert e.value.details() == errors.CONTENT_REPORT_NOT_FOUND
288 res = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=id_by_description["r2"]))
289 rep = res.content_report
290 assert rep.content_report_id == id_by_description["r2"]
291 assert rep.reporting_user_id == normal_user.id
292 assert rep.author_user_id == bad_user2.id
293 assert rep.reason == "spam"
294 assert rep.description == "r2"
295 assert rep.content_ref == "comment/124"
296 assert rep.user_agent == "n/a"
297 assert rep.page == "https://couchers.org/comment/124"
299 res = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username))
300 assert res.content_reports[0].content_report_id == id_by_description["r3"]
301 assert res.content_reports[1].content_report_id == id_by_description["r1"]
304def test_DeleteUser(db):
305 super_user, super_token = generate_user(is_superuser=True)
306 normal_user, normal_token = generate_user()
308 with real_admin_session(super_token) as api:
309 res = api.DeleteUser(admin_pb2.DeleteUserReq(user=normal_user.username))
310 assert res.user_id == normal_user.id
311 assert res.username == normal_user.username
312 assert res.email == normal_user.email
313 assert res.gender == normal_user.gender
314 assert parse_date(res.birthdate) == normal_user.birthdate
315 assert not res.banned
316 assert res.deleted
319def test_CreateApiKey(db, push_collector):
320 with session_scope() as session:
321 super_user, super_token = generate_user(is_superuser=True)
322 normal_user, normal_token = generate_user()
324 assert (
325 session.execute(
326 select(func.count())
327 .select_from(UserSession)
328 .where(UserSession.is_api_key == True)
329 .where(UserSession.user_id == normal_user.id)
330 ).scalar_one()
331 == 0
332 )
334 with mock_notification_email() as mock:
335 with real_admin_session(super_token) as api:
336 res = api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=normal_user.username))
338 mock.assert_called_once()
339 e = email_fields(mock)
340 assert e.subject == "[TEST] Your API key for Couchers.org"
342 with session_scope() as session:
343 api_key = session.execute(
344 select(UserSession)
345 .where(UserSession.is_valid)
346 .where(UserSession.is_api_key == True)
347 .where(UserSession.user_id == normal_user.id)
348 ).scalar_one()
350 assert api_key.token in e.plain
351 assert api_key.token in e.html
353 assert e.recipient == normal_user.email
354 assert "api key" in e.subject.lower()
355 unique_string = "We've issued you with the following API key:"
356 assert unique_string in e.plain
357 assert unique_string in e.html
358 assert "support@couchers.org" in e.plain
359 assert "support@couchers.org" in e.html
361 push_collector.assert_user_has_single_matching(
362 normal_user.id, title="An API key was created for your account", body="Details were sent to you via email."
363 )
366VALID_GEOJSON_MULTIPOLYGON = """
367 {
368 "type": "MultiPolygon",
369 "coordinates":
370 [
371 [
372 [
373 [
374 -73.98114904754641,
375 40.7470284264813
376 ],
377 [
378 -73.98314135177611,
379 40.73416844413217
380 ],
381 [
382 -74.00538969848634,
383 40.734314779027144
384 ],
385 [
386 -74.00479214294432,
387 40.75027851544338
388 ],
389 [
390 -73.98114904754641,
391 40.7470284264813
392 ]
393 ]
394 ]
395 ]
396 }
397"""
399POINT_GEOJSON = """
400{ "type": "Point", "coordinates": [100.0, 0.0] }
401"""
404def test_CreateCommunity_invalid_geojson(db):
405 super_user, super_token = generate_user(is_superuser=True)
406 normal_user, normal_token = generate_user()
407 with real_admin_session(super_token) as api:
408 with pytest.raises(grpc.RpcError) as e:
409 api.CreateCommunity(
410 admin_pb2.CreateCommunityReq(
411 name="test community",
412 description="community for testing",
413 admin_ids=[],
414 geojson=POINT_GEOJSON,
415 )
416 )
417 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
418 assert e.value.details() == errors.NO_MULTIPOLYGON
421def test_CreateCommunity(db):
422 with session_scope() as session:
423 super_user, super_token = generate_user(is_superuser=True)
424 normal_user, normal_token = generate_user()
425 with real_admin_session(super_token) as api:
426 api.CreateCommunity(
427 admin_pb2.CreateCommunityReq(
428 name="test community",
429 description="community for testing",
430 admin_ids=[],
431 geojson=VALID_GEOJSON_MULTIPOLYGON,
432 )
433 )
434 community = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one()
435 assert community.description == "community for testing"
436 assert community.slug == "test-community"
439def test_UpdateCommunity_invalid_geojson(db):
440 super_user, super_token = generate_user(is_superuser=True)
442 with session_scope() as session:
443 with real_admin_session(super_token) as api:
444 api.CreateCommunity(
445 admin_pb2.CreateCommunityReq(
446 name="test community",
447 description="community for testing",
448 admin_ids=[],
449 geojson=VALID_GEOJSON_MULTIPOLYGON,
450 )
451 )
452 community = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one()
454 with pytest.raises(grpc.RpcError) as e:
455 api.UpdateCommunity(
456 admin_pb2.UpdateCommunityReq(
457 community_id=community.parent_node_id,
458 name="test community 2",
459 description="community for testing 2",
460 geojson=POINT_GEOJSON,
461 )
462 )
463 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
464 assert e.value.details() == errors.NO_MULTIPOLYGON
467def test_UpdateCommunity_invalid_id(db):
468 super_user, super_token = generate_user(is_superuser=True)
470 with session_scope() as session:
471 with real_admin_session(super_token) as api:
472 api.CreateCommunity(
473 admin_pb2.CreateCommunityReq(
474 name="test community",
475 description="community for testing",
476 admin_ids=[],
477 geojson=VALID_GEOJSON_MULTIPOLYGON,
478 )
479 )
481 with pytest.raises(grpc.RpcError) as e:
482 api.UpdateCommunity(
483 admin_pb2.UpdateCommunityReq(
484 community_id=1000,
485 name="test community 1000",
486 description="community for testing 1000",
487 geojson=VALID_GEOJSON_MULTIPOLYGON,
488 )
489 )
490 assert e.value.code() == grpc.StatusCode.NOT_FOUND
491 assert e.value.details() == errors.COMMUNITY_NOT_FOUND
494def test_UpdateCommunity(db):
495 super_user, super_token = generate_user(is_superuser=True)
497 with session_scope() as session:
498 with real_admin_session(super_token) as api:
499 api.CreateCommunity(
500 admin_pb2.CreateCommunityReq(
501 name="test community",
502 description="community for testing",
503 admin_ids=[],
504 geojson=VALID_GEOJSON_MULTIPOLYGON,
505 )
506 )
507 community = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one()
508 assert community.description == "community for testing"
510 api.CreateCommunity(
511 admin_pb2.CreateCommunityReq(
512 name="test community 2",
513 description="community for testing 2",
514 admin_ids=[],
515 geojson=VALID_GEOJSON_MULTIPOLYGON,
516 )
517 )
518 community_2 = session.execute(select(Cluster).where(Cluster.name == "test community 2")).scalar_one()
520 api.UpdateCommunity(
521 admin_pb2.UpdateCommunityReq(
522 community_id=community.parent_node_id,
523 name="test community 2",
524 description="community for testing 2",
525 geojson=VALID_GEOJSON_MULTIPOLYGON,
526 parent_node_id=community_2.parent_node_id,
527 )
528 )
529 session.commit()
531 community_updated = session.execute(select(Cluster).where(Cluster.id == community.id)).scalar_one()
532 assert community_updated.description == "community for testing 2"
533 assert community_updated.slug == "test-community-2"
535 node_updated = session.execute(select(Node).where(Node.id == community_updated.parent_node_id)).scalar_one()
536 assert node_updated.parent_node_id == community_2.parent_node_id
539def test_GetChats(db):
540 super_user, super_token = generate_user(is_superuser=True)
541 normal_user, normal_token = generate_user()
543 with real_admin_session(super_token) as api:
544 res = api.GetChats(admin_pb2.GetChatsReq(user=normal_user.username))
545 assert res.response
548def test_badges(db, push_collector):
549 super_user, super_token = generate_user(is_superuser=True)
550 normal_user, normal_token = generate_user()
552 with real_admin_session(super_token) as api:
553 # can add a badge
554 assert "volunteer" not in api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)).badges
555 with mock_notification_email() as mock:
556 res = api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="volunteer"))
557 assert "volunteer" in res.badges
559 # badge emails are disabled by default
560 mock.assert_not_called()
562 push_collector.assert_user_has_single_matching(
563 normal_user.id,
564 title="The Active Volunteer badge was added to your profile",
565 body="Check out your profile to see the new badge!",
566 )
568 # can't add/edit special tags
569 with pytest.raises(grpc.RpcError) as e:
570 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="founder"))
571 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
572 assert e.value.details() == errors.ADMIN_CANNOT_EDIT_BADGE
574 # double add badge
575 with pytest.raises(grpc.RpcError) as e:
576 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="volunteer"))
577 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
578 assert e.value.details() == errors.USER_ALREADY_HAS_BADGE
580 # can remove badge
581 assert "volunteer" in api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username)).badges
582 with mock_notification_email() as mock:
583 res = api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=normal_user.username, badge_id="volunteer"))
584 assert "volunteer" not in res.badges
586 # badge emails are disabled by default
587 mock.assert_not_called()
589 push_collector.assert_user_push_matches_fields(
590 normal_user.id,
591 ix=1,
592 title="The Active Volunteer badge was removed from your profile",
593 body="You can see all your badges on your profile.",
594 )
596 # not found on user
597 with pytest.raises(grpc.RpcError) as e:
598 api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=normal_user.username, badge_id="volunteer"))
599 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
600 assert e.value.details() == errors.USER_DOES_NOT_HAVE_BADGE
602 # not found in general
603 with pytest.raises(grpc.RpcError) as e:
604 api.AddBadge(admin_pb2.AddBadgeReq(user=normal_user.username, badge_id="nonexistentbadge"))
605 assert e.value.code() == grpc.StatusCode.NOT_FOUND
606 assert e.value.details() == errors.BADGE_NOT_FOUND
609def test_DeleteEvent(db):
610 super_user, super_token = generate_user(is_superuser=True)
611 normal_user, normal_token = generate_user()
613 with session_scope() as session:
614 create_community(session, 0, 2, "Community", [normal_user], [], None)
616 start_time = now() + timedelta(hours=2)
617 end_time = start_time + timedelta(hours=3)
618 with events_session(normal_token) as api:
619 res = api.CreateEvent(
620 events_pb2.CreateEventReq(
621 title="Dummy Title",
622 content="Dummy content.",
623 photo_key=None,
624 offline_information=events_pb2.OfflineEventInformation(
625 address="Near Null Island",
626 lat=0.1,
627 lng=0.2,
628 ),
629 start_time=Timestamp_from_datetime(start_time),
630 end_time=Timestamp_from_datetime(end_time),
631 timezone="UTC",
632 )
633 )
634 event_id = res.event_id
635 assert not res.is_deleted
637 with session_scope() as session:
638 with real_admin_session(super_token) as api:
639 api.DeleteEvent(
640 admin_pb2.DeleteEventReq(
641 event_id=event_id,
642 )
643 )
644 occurrence = session.get(EventOccurrence, ident=event_id)
645 assert occurrence.is_deleted
648def test_ListUserIds(db):
649 super_user, super_token = generate_user(is_superuser=True)
650 normal_user, normal_token = generate_user()
652 with real_admin_session(super_token) as api:
653 res = api.ListUserIds(
654 admin_pb2.ListUserIdsReq(
655 start_time=Timestamp_from_datetime(datetime(2000, 1, 1)), end_time=Timestamp_from_datetime(now())
656 )
657 )
658 assert len(res.user_ids) == 2
659 assert sorted(res.user_ids) == sorted([super_user.id, normal_user.id])
661 with real_admin_session(super_token) as api:
662 res = api.ListUserIds(
663 admin_pb2.ListUserIdsReq(start_time=Timestamp_from_datetime(now()), end_time=Timestamp_from_datetime(now()))
664 )
665 assert res.user_ids == []
668def test_EditReferenceText(db):
669 super_user, super_token = generate_user(is_superuser=True)
670 test_new_text = "New Text"
672 user1, user1_token = generate_user()
673 user2, user2_token = generate_user()
675 with session_scope() as session:
676 with references_session(user1_token) as api:
677 reference = api.WriteFriendReference(
678 references_pb2.WriteFriendReferenceReq(
679 to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
680 )
681 )
683 with real_admin_session(super_token) as admin_api:
684 admin_api.EditReferenceText(
685 admin_pb2.EditReferenceTextReq(reference_id=reference.reference_id, new_text=test_new_text)
686 )
688 session.expire_all()
690 modified_reference = session.execute(
691 select(Reference).where(Reference.id == reference.reference_id)
692 ).scalar_one_or_none()
693 assert modified_reference.text == test_new_text
696def test_DeleteReference(db):
697 super_user, super_token = generate_user(is_superuser=True)
699 user1, user1_token = generate_user()
700 user2, user2_token = generate_user()
702 with references_session(user1_token) as api:
703 reference = api.WriteFriendReference(
704 references_pb2.WriteFriendReferenceReq(
705 to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
706 )
707 )
709 with references_session(user1_token) as api:
710 assert api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references
712 with real_admin_session(super_token) as admin_api:
713 admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=reference.reference_id))
715 with references_session(user1_token) as api:
716 assert not api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references
718 with session_scope() as session:
719 modified_reference = session.execute(
720 select(Reference).where(Reference.id == reference.reference_id)
721 ).scalar_one_or_none()
722 assert modified_reference.is_deleted
725def test_admin_delete_account_url(db, push_collector):
726 super_user, super_token = generate_user(is_superuser=True)
728 user, token = generate_user()
729 user_id = user.id
731 with real_admin_session(super_token) as admin_api:
732 url = admin_api.CreateAccountDeletionLink(
733 admin_pb2.CreateAccountDeletionLinkReq(user=user.username)
734 ).account_deletion_confirm_url
736 push_collector.assert_user_has_count(user_id, 0)
738 with session_scope() as session:
739 token_o = session.execute(select(AccountDeletionToken)).scalar_one()
740 token = token_o.token
741 assert token_o.user.id == user_id
742 assert url == f"http://localhost:3000/delete-account?token={token}"
744 with mock_notification_email() as mock:
745 with auth_api_session() as (auth_api, metadata_interceptor):
746 auth_api.ConfirmDeleteAccount(
747 auth_pb2.ConfirmDeleteAccountReq(
748 token=token,
749 )
750 )
752 push_collector.assert_user_push_matches_fields(
753 user_id,
754 ix=0,
755 title="Your Couchers.org account has been deleted",
756 body="You can still undo this by following the link we emailed to you within 7 days.",
757 )
759 mock.assert_called_once()
760 e = email_fields(mock)
763# community invite feature tested in test_events.py
764# SendBlogPostNotification tested in test_notifications.py
765# MarkUserNeedsLocationUpdate tested in test_jail.py