Coverage for src/tests/test_admin.py: 100%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from datetime import date
2from unittest.mock import patch
4import grpc
5import pytest
6from sqlalchemy.sql import func
8from couchers import errors
9from couchers.db import session_scope
10from couchers.models import Cluster, UserSession
11from couchers.sql import couchers_select as select
12from couchers.utils import parse_date
13from proto import admin_pb2
14from tests.test_fixtures import db, generate_user, get_user_id_and_token, real_admin_session, testconfig # noqa
17@pytest.fixture(autouse=True)
18def _(testconfig):
19 pass
22def test_access_by_normal_user(db):
23 with session_scope() as session:
24 normal_user, normal_token = generate_user()
26 with real_admin_session(normal_token) as api:
27 # all requests to the admin servicer should break when done by a non-super_user
28 with pytest.raises(grpc.RpcError) as e:
29 api.GetUserDetails(
30 admin_pb2.GetUserDetailsReq(
31 user=str(normal_user.id),
32 )
33 )
34 assert e.value.code() == grpc.StatusCode.PERMISSION_DENIED
37def test_GetUserDetails(db):
38 with session_scope() as session:
39 super_user, super_token = generate_user(is_superuser=True)
40 normal_user, normal_token = generate_user()
42 with real_admin_session(super_token) as api:
43 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=str(normal_user.id)))
44 assert res.user_id == normal_user.id
45 assert res.username == normal_user.username
46 assert res.email == normal_user.email
47 assert res.gender == normal_user.gender
48 assert parse_date(res.birthdate) == normal_user.birthdate
49 assert not res.banned
50 assert not res.deleted
52 with real_admin_session(super_token) as api:
53 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username))
54 assert res.user_id == normal_user.id
55 assert res.username == normal_user.username
56 assert res.email == normal_user.email
57 assert res.gender == normal_user.gender
58 assert parse_date(res.birthdate) == normal_user.birthdate
59 assert not res.banned
60 assert not res.deleted
62 with real_admin_session(super_token) as api:
63 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.email))
64 assert res.user_id == normal_user.id
65 assert res.username == normal_user.username
66 assert res.email == normal_user.email
67 assert res.gender == normal_user.gender
68 assert parse_date(res.birthdate) == normal_user.birthdate
69 assert not res.banned
70 assert not res.deleted
73def test_ChangeUserGender(db):
74 with session_scope() as session:
75 super_user, super_token = generate_user(is_superuser=True)
76 normal_user, normal_token = generate_user()
78 with real_admin_session(super_token) as api:
79 res = api.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=normal_user.username, gender="Machine"))
80 assert res.user_id == normal_user.id
81 assert res.username == normal_user.username
82 assert res.email == normal_user.email
83 assert res.gender == "Machine"
84 assert parse_date(res.birthdate) == normal_user.birthdate
85 assert not res.banned
86 assert not res.deleted
89def test_ChangeUserBirthdate(db):
90 with session_scope() as session:
91 super_user, super_token = generate_user(is_superuser=True)
92 normal_user, normal_token = generate_user(birthdate=date(year=2000, month=1, day=1))
94 with real_admin_session(super_token) as api:
95 res = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=normal_user.username))
96 assert parse_date(res.birthdate) == date(year=2000, month=1, day=1)
98 res = api.ChangeUserBirthdate(
99 admin_pb2.ChangeUserBirthdateReq(user=normal_user.username, birthdate="1990-05-25")
100 )
102 assert res.user_id == normal_user.id
103 assert res.username == normal_user.username
104 assert res.email == normal_user.email
105 assert res.birthdate == "1990-05-25"
106 assert res.gender == normal_user.gender
107 assert not res.banned
108 assert not res.deleted
111def test_BanUser(db):
112 with session_scope() as session:
113 super_user, super_token = generate_user(is_superuser=True)
114 normal_user, normal_token = generate_user()
116 with real_admin_session(super_token) as api:
117 res = api.BanUser(admin_pb2.BanUserReq(user=normal_user.username))
118 assert res.user_id == normal_user.id
119 assert res.username == normal_user.username
120 assert res.email == normal_user.email
121 assert res.gender == normal_user.gender
122 assert parse_date(res.birthdate) == normal_user.birthdate
123 assert res.banned
124 assert not res.deleted
127def test_DeleteUser(db):
128 with session_scope() as session:
129 super_user, super_token = generate_user(is_superuser=True)
130 normal_user, normal_token = generate_user()
132 with real_admin_session(super_token) as api:
133 res = api.DeleteUser(admin_pb2.DeleteUserReq(user=normal_user.username))
134 assert res.user_id == normal_user.id
135 assert res.username == normal_user.username
136 assert res.email == normal_user.email
137 assert res.gender == normal_user.gender
138 assert parse_date(res.birthdate) == normal_user.birthdate
139 assert not res.banned
140 assert res.deleted
143def test_CreateApiKey(db):
144 with session_scope() as session:
145 super_user, super_token = generate_user(is_superuser=True)
146 normal_user, normal_token = generate_user()
148 assert (
149 session.execute(
150 select(func.count())
151 .select_from(UserSession)
152 .where(UserSession.is_api_key == True)
153 .where(UserSession.user_id == normal_user.id)
154 ).scalar_one()
155 == 0
156 )
158 with patch("couchers.email.enqueue_email_from_template") as mock:
159 with real_admin_session(super_token) as api:
160 res = api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=normal_user.username))
162 with session_scope() as session:
163 api_key = session.execute(
164 select(UserSession)
165 .where(UserSession.is_valid)
166 .where(UserSession.is_api_key == True)
167 .where(UserSession.user_id == normal_user.id)
168 ).scalar_one()
170 assert mock.called_once_with(
171 normal_user.email,
172 "api_key",
173 template_args={"user": normal_user, "token": api_key.token, "expiry": api_key.expiry},
174 )
177VALID_GEOJSON_MULTIPOLYGON = """
178 {
179 "type": "MultiPolygon",
180 "coordinates":
181 [
182 [
183 [
184 [
185 -73.98114904754641,
186 40.7470284264813
187 ],
188 [
189 -73.98314135177611,
190 40.73416844413217
191 ],
192 [
193 -74.00538969848634,
194 40.734314779027144
195 ],
196 [
197 -74.00479214294432,
198 40.75027851544338
199 ],
200 [
201 -73.98114904754641,
202 40.7470284264813
203 ]
204 ]
205 ]
206 ]
207 }
208"""
210POINT_GEOJSON = """
211{ "type": "Point", "coordinates": [100.0, 0.0] }
212"""
215def test_CreateCommunity_invalid_geojson(db):
216 with session_scope() as session:
217 super_user, super_token = generate_user(is_superuser=True)
218 normal_user, normal_token = generate_user()
219 with real_admin_session(super_token) as api:
220 with pytest.raises(grpc.RpcError) as e:
221 api.CreateCommunity(
222 admin_pb2.CreateCommunityReq(
223 name="test community",
224 slug="test-community",
225 description="community for testing",
226 admin_ids=[],
227 geojson=POINT_GEOJSON,
228 )
229 )
230 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
231 assert e.value.details() == errors.NO_MULTIPOLYGON
234def test_CreateCommunity(db):
235 with session_scope() as session:
236 super_user, super_token = generate_user(is_superuser=True)
237 normal_user, normal_token = generate_user()
238 with real_admin_session(super_token) as api:
239 api.CreateCommunity(
240 admin_pb2.CreateCommunityReq(
241 name="test community",
242 slug="test-community",
243 description="community for testing",
244 admin_ids=[],
245 geojson=VALID_GEOJSON_MULTIPOLYGON,
246 )
247 )
248 community = session.execute(select(Cluster).where(Cluster.name == "test community")).scalar_one()
249 assert community.description == "community for testing"
250 assert community.slug == "test-community"
253def test_GetChats(db):
254 with session_scope() as session:
255 super_user, super_token = generate_user(is_superuser=True)
256 normal_user, normal_token = generate_user()
258 with real_admin_session(super_token) as api:
259 res = api.GetChats(admin_pb2.GetChatsReq(user=normal_user.username))
260 assert res.response