Coverage for src/tests/test_admin.py: 100%
378 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-04-16 15:13 +0000
1from datetime import date, datetime
2from re import match
4import grpc
5import pytest
6from sqlalchemy.sql import func
8from couchers import errors
9from couchers.db import session_scope
10from couchers.models import Cluster, ContentReport, EventOccurrence, Node, Reference, UserSession
11from couchers.sql import couchers_select as select
12from couchers.utils import Timestamp_from_datetime, now, parse_date, timedelta
13from proto import admin_pb2, events_pb2, references_pb2, reporting_pb2
14from tests.test_communities import create_community
15from tests.test_fixtures import ( # noqa
16 db,
17 email_fields,
18 events_session,
19 generate_user,
20 get_user_id_and_token,
21 mock_notification_email,
22 push_collector,
23 real_admin_session,
24 references_session,
25 reporting_session,
26 testconfig,
27)
@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture that requests ``testconfig`` for every test in this module."""
def test_access_by_normal_user(db):
    """A non-superuser calling the admin servicer is rejected with PERMISSION_DENIED."""
    regular_user, regular_token = generate_user()
    request = admin_pb2.GetUserDetailsReq(user=str(regular_user.id))

    with real_admin_session(regular_token) as api:
        # all requests to the admin servicer should break when done by a non-super_user
        with pytest.raises(grpc.RpcError) as exc_info:
            api.GetUserDetails(request)
        assert exc_info.value.code() == grpc.StatusCode.PERMISSION_DENIED
def test_GetUser(db):
    """GetUser returns a user's id and username, both before and after that user is banned."""
    _, super_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    def lookup_and_check():
        # every lookup goes through its own freshly opened admin session
        with real_admin_session(super_token) as api:
            found = api.GetUser(admin_pb2.GetUserReq(user=str(target.id)))
            assert found.user_id == target.id
            assert found.username == target.username

    lookup_and_check()

    with real_admin_session(super_token) as api:
        api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note="Testing banning"))

    # the same lookup is expected to keep working once the user is banned
    lookup_and_check()
def test_GetUserDetails(db):
    """GetUserDetails accepts a user id, a username or an email as the lookup key."""
    _, super_token = generate_user(is_superuser=True)
    target, _ = generate_user()

    # the same user is looked up three ways, each time in a fresh admin session
    for identifier in (str(target.id), target.username, target.email):
        with real_admin_session(super_token) as api:
            details = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=identifier))
            assert details.user_id == target.id
            assert details.username == target.username
            assert details.email == target.email
            assert details.gender == target.gender
            assert parse_date(details.birthdate) == target.birthdate
            assert not details.banned
            assert not details.deleted
def test_ChangeUserGender(db, push_collector):
    """ChangeUserGender updates the gender and notifies the user by email and by push."""
    _, super_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.ChangeUserGenderReq(user=target.username, gender="Machine")

    with real_admin_session(super_token) as api, mock_notification_email() as email_mock:
        details = api.ChangeUserGender(request)

    # the response carries the updated gender and leaves everything else untouched
    assert details.user_id == target.id
    assert details.username == target.username
    assert details.email == target.email
    assert details.gender == "Machine"
    assert parse_date(details.birthdate) == target.birthdate
    assert not details.banned
    assert not details.deleted

    # exactly one notification email, addressed to the affected user
    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your gender was changed"
    assert sent.recipient == target.email
    assert "Machine" in sent.plain
    assert "Machine" in sent.html

    push_collector.assert_user_has_single_matching(
        target.id,
        title="Your gender was changed",
        body="Your gender on Couchers.org was changed to Machine by an admin.",
    )
def test_ChangeUserBirthdate(db, push_collector):
    """ChangeUserBirthdate updates the birthdate and notifies the user by email and by push."""
    _, super_token = generate_user(is_superuser=True)
    initial_birthdate = date(year=2000, month=1, day=1)
    target, _ = generate_user(birthdate=initial_birthdate)

    with real_admin_session(super_token) as api:
        # sanity check: the user starts out with the birthdate they were generated with
        before = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=target.username))
        assert parse_date(before.birthdate) == initial_birthdate

        with mock_notification_email() as email_mock:
            after = api.ChangeUserBirthdate(
                admin_pb2.ChangeUserBirthdateReq(user=target.username, birthdate="1990-05-25")
            )

        assert after.user_id == target.id
        assert after.username == target.username
        assert after.email == target.email
        assert after.birthdate == "1990-05-25"
        assert after.gender == target.gender
        assert not after.banned
        assert not after.deleted

    # exactly one notification email, addressed to the affected user
    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your date of birth was changed"
    assert sent.recipient == target.email
    assert "1990" in sent.plain
    assert "1990" in sent.html

    push_collector.assert_user_has_single_matching(
        target.id,
        title="Your date of birth was changed",
        body="Your date of birth on Couchers.org was changed to Friday 25 May 1990 by an admin.",
    )
def test_BanUser(db):
    """BanUser marks the user as banned and records a timestamped, attributed admin note."""
    super_user, super_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    admin_note = "A good reason"

    # expected note entry: "\n[<UTC ISO timestamp>] (id: <admin id>, username: <admin username>) <note>\n"
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"
    expected_note = rf"^{prefix_regex} {admin_note}\n$"

    with real_admin_session(super_token) as api:
        details = api.BanUser(admin_pb2.BanUserReq(user=target.username, admin_note=admin_note))
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert details.banned
        assert not details.deleted
        assert match(expected_note, details.admin_note)
def test_UnbanUser(db):
    """UnbanUser returns the user as not banned and records a timestamped, attributed admin note."""
    super_user, super_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    admin_note = "A good reason"

    # expected note entry: "\n[<UTC ISO timestamp>] (id: <admin id>, username: <admin username>) <note>\n"
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"
    expected_note = rf"^{prefix_regex} {admin_note}\n$"

    # NOTE(review): the user is never banned before this call, so `not details.banned`
    # would hold even if UnbanUser did nothing — consider banning first.
    with real_admin_session(super_token) as api:
        details = api.UnbanUser(admin_pb2.UnbanUserReq(user=target.username, admin_note=admin_note))
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert not details.deleted
        assert match(expected_note, details.admin_note)
def test_AddAdminNote(db):
    """AddAdminNote appends each note as its own timestamped, attributed entry."""
    super_user, super_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    first_note = "User reported strange behavior"
    second_note = "Insert private information here"

    # one note entry: "\n[<UTC ISO timestamp>] (id: <admin id>, username: <admin username>) <note>\n"
    utc_regex = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+00:00"
    prefix_regex = rf"\n\[{utc_regex}\] \(id: {super_user.id}, username: {super_user.username}\)"
    first_entry = rf"{prefix_regex} {first_note}\n"
    second_entry = rf"{prefix_regex} {second_note}\n"

    with real_admin_session(super_token) as api:
        details = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note=first_note))
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert not details.deleted
        assert match(rf"^{first_entry}$", details.admin_note)

    with real_admin_session(super_token) as api:
        details = api.AddAdminNote(admin_pb2.AddAdminNoteReq(user=target.username, admin_note=second_note))
        # the second note follows the first one instead of replacing it
        assert match(rf"^{first_entry}{second_entry}$", details.admin_note)
def test_AddAdminNote_blank(db):
    """A whitespace-only admin note is rejected with INVALID_ARGUMENT."""
    _, super_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    whitespace_only_note = " \t \n "
    request = admin_pb2.AddAdminNoteReq(user=target.username, admin_note=whitespace_only_note)

    with real_admin_session(super_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddAdminNote(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.ADMIN_NOTE_CANT_BE_EMPTY
def test_admin_content_reports(db):
    """Admins can fetch a single content report by id and list the reports filed against an author."""
    _, super_token = generate_user(is_superuser=True)
    reporter, reporter_token = generate_user()
    bad_user1, _ = generate_user()
    bad_user2, _ = generate_user()

    # (reason, description, content_ref, author), filed in this order
    filed_reports = (
        ("spam", "r1", "comment/123", bad_user1),
        ("spam", "r2", "comment/124", bad_user2),
        ("something else", "r3", "page/321", bad_user1),
    )

    with reporting_session(reporter_token) as api:
        for reason, description, content_ref, author in filed_reports:
            api.Report(
                reporting_pb2.ReportReq(
                    reason=reason,
                    description=description,
                    content_ref=content_ref,
                    author_user=author.username,
                    user_agent="n/a",
                    page=f"https://couchers.org/{content_ref}",
                )
            )

    # map each report's description to its database id so reports can be addressed below
    with session_scope() as session:
        rows = session.execute(select(ContentReport.description, ContentReport.id)).all()
        report_ids = dict(rows)

    with real_admin_session(super_token) as api:
        # unknown id
        with pytest.raises(grpc.RpcError) as exc_info:
            api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=-1))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == errors.CONTENT_REPORT_NOT_FOUND

        # single report, all fields round-trip
        response = api.GetContentReport(admin_pb2.GetContentReportReq(content_report_id=report_ids["r2"]))
        report = response.content_report
        assert report.content_report_id == report_ids["r2"]
        assert report.reporting_user_id == reporter.id
        assert report.author_user_id == bad_user2.id
        assert report.reason == "spam"
        assert report.description == "r2"
        assert report.content_ref == "comment/124"
        assert report.user_agent == "n/a"
        assert report.page == "https://couchers.org/comment/124"

        # reports against bad_user1: the later-filed r3 is listed before r1
        response = api.GetContentReportsForAuthor(admin_pb2.GetContentReportsForAuthorReq(user=bad_user1.username))
        assert response.content_reports[0].content_report_id == report_ids["r3"]
        assert response.content_reports[1].content_report_id == report_ids["r1"]
def test_DeleteUser(db):
    """DeleteUser returns the user's details with ``deleted`` set and ``banned`` unset."""
    _, super_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.DeleteUserReq(user=target.username)

    with real_admin_session(super_token) as api:
        details = api.DeleteUser(request)
        assert details.user_id == target.id
        assert details.username == target.username
        assert details.email == target.email
        assert details.gender == target.gender
        assert parse_date(details.birthdate) == target.birthdate
        assert not details.banned
        assert details.deleted
def test_CreateApiKey(db, push_collector):
    """CreateApiKey stores a valid API-key session for the user and sends it by email plus a push."""
    with session_scope() as session:
        _, super_token = generate_user(is_superuser=True)
        target, _ = generate_user()

        # precondition: the user has no API key yet
        # (`== True` is intentional: it builds a SQL comparison on the column)
        existing_keys = session.execute(
            select(func.count())
            .select_from(UserSession)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        ).scalar_one()
        assert existing_keys == 0

    with mock_notification_email() as email_mock:
        with real_admin_session(super_token) as api:
            api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=target.username))

    email_mock.assert_called_once()
    sent = email_fields(email_mock)
    assert sent.subject == "[TEST] Your API key for Couchers.org"

    with session_scope() as session:
        # scalar_one() also asserts that exactly one valid API key now exists
        api_key = session.execute(
            select(UserSession)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == target.id)
        ).scalar_one()

        # the email must contain the actual token, in both renderings
        assert api_key.token in sent.plain
        assert api_key.token in sent.html

    assert sent.recipient == target.email
    assert "api key" in sent.subject.lower()
    unique_string = "We've issued you with the following API key:"
    assert unique_string in sent.plain
    assert unique_string in sent.html
    assert "support@couchers.org" in sent.plain
    assert "support@couchers.org" in sent.html

    push_collector.assert_user_has_single_matching(
        target.id, title="An API key was created for your account", body="Details were sent to you via email."
    )
# GeoJSON MultiPolygon holding a single closed ring (first and last coordinate pairs are equal).
# Passed as the `geojson` of the community create/update requests that are expected to succeed.
VALID_GEOJSON_MULTIPOLYGON = """
    {
      "type": "MultiPolygon",
      "coordinates":
       [
        [
          [
            [
              -73.98114904754641,
              40.7470284264813
            ],
            [
              -73.98314135177611,
              40.73416844413217
            ],
            [
              -74.00538969848634,
              40.734314779027144
            ],
            [
              -74.00479214294432,
              40.75027851544338
            ],
            [
              -73.98114904754641,
              40.7470284264813
            ]
          ]
        ]
      ]
    }
"""

# Well-formed GeoJSON of the wrong geometry type (Point); the tests that pass it expect
# the request to be rejected with errors.NO_MULTIPOLYGON.
POINT_GEOJSON = """
{ "type": "Point", "coordinates": [100.0, 0.0] }
"""
def test_CreateCommunity_invalid_geojson(db):
    """CreateCommunity rejects non-MultiPolygon GeoJSON with INVALID_ARGUMENT."""
    _, super_token = generate_user(is_superuser=True)
    generate_user()  # a second, regular user exists but is not referenced by the request
    request = admin_pb2.CreateCommunityReq(
        name="test community",
        description="community for testing",
        admin_ids=[],
        geojson=POINT_GEOJSON,
    )

    with real_admin_session(super_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.CreateCommunity(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.NO_MULTIPOLYGON
def test_CreateCommunity(db):
    """CreateCommunity stores a cluster with the given description and a slug derived from the name."""
    with session_scope() as session:
        _, super_token = generate_user(is_superuser=True)
        generate_user()  # a second, regular user exists but is not referenced by the request
        request = admin_pb2.CreateCommunityReq(
            name="test community",
            description="community for testing",
            admin_ids=[],
            geojson=VALID_GEOJSON_MULTIPOLYGON,
        )

        with real_admin_session(super_token) as api:
            api.CreateCommunity(request)

        by_name = select(Cluster).where(Cluster.name == "test community")
        created = session.execute(by_name).scalar_one()
        assert created.description == "community for testing"
        assert created.slug == "test-community"
def test_UpdateCommunity_invalid_geojson(db):
    """UpdateCommunity rejects non-MultiPolygon GeoJSON with INVALID_ARGUMENT."""
    _, super_token = generate_user(is_superuser=True)

    with session_scope() as session, real_admin_session(super_token) as api:
        # set up a valid community to update
        api.CreateCommunity(
            admin_pb2.CreateCommunityReq(
                name="test community",
                description="community for testing",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
        )
        existing = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one()

        bad_update = admin_pb2.UpdateCommunityReq(
            community_id=existing.parent_node_id,
            name="test community 2",
            description="community for testing 2",
            geojson=POINT_GEOJSON,
        )
        with pytest.raises(grpc.RpcError) as exc_info:
            api.UpdateCommunity(bad_update)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.NO_MULTIPOLYGON
def test_UpdateCommunity_invalid_id(db):
    """UpdateCommunity with an unknown community id fails with NOT_FOUND."""
    _, super_token = generate_user(is_superuser=True)

    with session_scope() as session, real_admin_session(super_token) as api:
        # one real community exists, but the update below targets a different id
        api.CreateCommunity(
            admin_pb2.CreateCommunityReq(
                name="test community",
                description="community for testing",
                admin_ids=[],
                geojson=VALID_GEOJSON_MULTIPOLYGON,
            )
        )

        unknown_id_update = admin_pb2.UpdateCommunityReq(
            community_id=1000,
            name="test community 1000",
            description="community for testing 1000",
            geojson=VALID_GEOJSON_MULTIPOLYGON,
        )
        with pytest.raises(grpc.RpcError) as exc_info:
            api.UpdateCommunity(unknown_id_update)
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == errors.COMMUNITY_NOT_FOUND
def test_UpdateCommunity(db):
    """UpdateCommunity changes a community's name, description and parent node."""
    _, super_token = generate_user(is_superuser=True)

    with session_scope() as session, real_admin_session(super_token) as api:

        def create_and_load(name, description):
            # create a community through the admin API, then load its cluster row by name
            api.CreateCommunity(
                admin_pb2.CreateCommunityReq(
                    name=name,
                    description=description,
                    admin_ids=[],
                    geojson=VALID_GEOJSON_MULTIPOLYGON,
                )
            )
            return session.execute(select(Cluster).where(Cluster.name == name)).scalar_one()

        community = create_and_load("test community", "community for testing")
        assert community.description == "community for testing"

        community_2 = create_and_load("test community 2", "community for testing 2")

        # rename the first community and re-parent it under the second one's node
        api.UpdateCommunity(
            admin_pb2.UpdateCommunityReq(
                community_id=community.parent_node_id,
                name="test community 2",
                description="community for testing 2",
                geojson=VALID_GEOJSON_MULTIPOLYGON,
                parent_node_id=community_2.parent_node_id,
            )
        )
        # presumably ends this session's transaction so the re-query below reads fresh state
        session.commit()

        updated_cluster = session.execute(select(Cluster).where(Cluster.id == community.id)).scalar_one()
        assert updated_cluster.description == "community for testing 2"
        assert updated_cluster.slug == "test-community-2"

        updated_node = session.execute(select(Node).where(Node.id == updated_cluster.parent_node_id)).scalar_one()
        assert updated_node.parent_node_id == community_2.parent_node_id
def test_GetChats(db):
    """GetChats returns a non-empty ``response`` field for an existing user."""
    _, super_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    request = admin_pb2.GetChatsReq(user=target.username)

    with real_admin_session(super_token) as api:
        chats = api.GetChats(request)
        assert chats.response
def test_badges(db, push_collector):
    """Adding and removing the volunteer badge works; invalid badge edits fail with specific errors."""
    _, super_token = generate_user(is_superuser=True)
    target, _ = generate_user()
    username = target.username

    with real_admin_session(super_token) as api:

        def current_badges():
            return api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=username)).badges

        # can add a badge
        assert "volunteer" not in current_badges()
        with mock_notification_email() as email_mock:
            added = api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="volunteer"))
            assert "volunteer" in added.badges

        # badge emails are disabled by default
        email_mock.assert_not_called()

        push_collector.assert_user_has_single_matching(
            target.id,
            title="The Active Volunteer badge was added to your profile",
            body="Check out your profile to see the new badge!",
        )

        # can't add/edit special tags
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="founder"))
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.ADMIN_CANNOT_EDIT_BADGE

        # double add badge
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="volunteer"))
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.USER_ALREADY_HAS_BADGE

        # can remove badge
        assert "volunteer" in current_badges()
        with mock_notification_email() as email_mock:
            removed = api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="volunteer"))
            assert "volunteer" not in removed.badges

        # badge emails are disabled by default
        email_mock.assert_not_called()

        # the removal push is checked at index 1, after the earlier "added" push
        push_collector.assert_user_push_matches_fields(
            target.id,
            ix=1,
            title="The Active Volunteer badge was removed from your profile",
            body="You can see all your badges on your profile.",
        )

        # not found on user
        with pytest.raises(grpc.RpcError) as exc_info:
            api.RemoveBadge(admin_pb2.RemoveBadgeReq(user=username, badge_id="volunteer"))
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.USER_DOES_NOT_HAVE_BADGE

        # not found in general
        with pytest.raises(grpc.RpcError) as exc_info:
            api.AddBadge(admin_pb2.AddBadgeReq(user=username, badge_id="nonexistentbadge"))
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == errors.BADGE_NOT_FOUND
def test_DeleteEvent(db):
    """DeleteEvent marks the event's occurrence row as deleted."""
    _, super_token = generate_user(is_superuser=True)
    organizer, organizer_token = generate_user()

    with session_scope() as session:
        create_community(session, 0, 2, "Community", [organizer], [], None)

    # a three-hour event starting two hours from now
    starts_at = now() + timedelta(hours=2)
    ends_at = starts_at + timedelta(hours=3)
    venue = events_pb2.OfflineEventInformation(
        address="Near Null Island",
        lat=0.1,
        lng=0.2,
    )

    with events_session(organizer_token) as api:
        created = api.CreateEvent(
            events_pb2.CreateEventReq(
                title="Dummy Title",
                content="Dummy content.",
                photo_key=None,
                offline_information=venue,
                start_time=Timestamp_from_datetime(starts_at),
                end_time=Timestamp_from_datetime(ends_at),
                timezone="UTC",
            )
        )
        event_id = created.event_id
        assert not created.is_deleted

    with session_scope() as session:
        with real_admin_session(super_token) as api:
            api.DeleteEvent(admin_pb2.DeleteEventReq(event_id=event_id))

        # the occurrence is loaded only after the admin call has completed
        occurrence = session.get(EventOccurrence, ident=event_id)
        assert occurrence.is_deleted
def test_ListUserIds(db):
    """ListUserIds returns the ids of both generated users for a wide window and nothing for a window at "now"."""
    super_user, super_token = generate_user(is_superuser=True)
    normal_user, _ = generate_user()

    # wide window: from 2000-01-01 up to now
    with real_admin_session(super_token) as api:
        wide_window = admin_pb2.ListUserIdsReq(
            start_time=Timestamp_from_datetime(datetime(2000, 1, 1)),
            end_time=Timestamp_from_datetime(now()),
        )
        listed = api.ListUserIds(wide_window)
        assert len(listed.user_ids) == 2
        assert sorted(listed.user_ids) == sorted([super_user.id, normal_user.id])

    # window that starts and ends at "now"
    with real_admin_session(super_token) as api:
        window_start = Timestamp_from_datetime(now())
        window_end = Timestamp_from_datetime(now())
        listed = api.ListUserIds(admin_pb2.ListUserIdsReq(start_time=window_start, end_time=window_end))
        assert listed.user_ids == []
def test_EditReferenceText(db):
    """EditReferenceText replaces the text of an existing reference.

    One user writes a friend reference for another, an admin edits its text,
    and the stored ``Reference`` row is read back to confirm the new text.
    """
    super_user, super_token = generate_user(is_superuser=True)
    test_new_text = "New Text"

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()

    with session_scope() as session:
        with references_session(user1_token) as api:
            reference = api.WriteFriendReference(
                references_pb2.WriteFriendReferenceReq(
                    to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
                )
            )

        with real_admin_session(super_token) as admin_api:
            admin_api.EditReferenceText(
                admin_pb2.EditReferenceTextReq(reference_id=reference.reference_id, new_text=test_new_text)
            )

        # drop anything this session may have cached so the query below hits the database
        session.expire_all()

        # scalar_one() (not scalar_one_or_none()) so a missing row fails with a clear
        # "no row found" error instead of an AttributeError on None
        modified_reference = session.execute(
            select(Reference).where(Reference.id == reference.reference_id)
        ).scalar_one()
        assert modified_reference.text == test_new_text
def test_DeleteReference(db):
    """DeleteReference hides a reference from ListReferences and flags its row as deleted.

    One user writes a friend reference, an admin deletes it, and the test then
    checks both the API listing and the stored ``Reference`` row.
    """
    super_user, super_token = generate_user(is_superuser=True)

    user1, user1_token = generate_user()
    user2, user2_token = generate_user()

    with references_session(user1_token) as api:
        reference = api.WriteFriendReference(
            references_pb2.WriteFriendReferenceReq(
                to_user_id=user2.id, text="Old Text", private_text="", was_appropriate=True, rating=1
            )
        )

    # before deletion the author's reference list is non-empty
    with references_session(user1_token) as api:
        assert api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references

    with real_admin_session(super_token) as admin_api:
        admin_api.DeleteReference(admin_pb2.DeleteReferenceReq(reference_id=reference.reference_id))

    # after deletion the listing is empty
    with references_session(user1_token) as api:
        assert not api.ListReferences(references_pb2.ListReferencesReq(from_user_id=user1.id)).references

    with session_scope() as session:
        # the row must still exist and be flagged deleted; scalar_one() fails with a clear
        # "no row found" error if it is missing, instead of an AttributeError on None
        modified_reference = session.execute(
            select(Reference).where(Reference.id == reference.reference_id)
        ).scalar_one()
        assert modified_reference.is_deleted
724# community invite feature tested in test_events.py