Coverage for src/tests/test_admin.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

145 statements  

1from datetime import date 

2from unittest.mock import patch 

3 

4import grpc 

5import pytest 

6from sqlalchemy.sql import func 

7 

8from couchers import errors 

9from couchers.db import session_scope 

10from couchers.models import Cluster, UserSession 

11from couchers.sql import couchers_select as select 

12from couchers.utils import parse_date 

13from proto import admin_pb2 

14from tests.test_fixtures import db, generate_user, get_user_id_and_token, real_admin_session, testconfig # noqa 

15 

16 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture that requests ``testconfig`` for every test in this module."""

20 

21 

def test_access_by_normal_user(db):
    """A non-superuser calling the admin API is refused with PERMISSION_DENIED."""
    with session_scope():
        normal_user, normal_token = generate_user()

        request = admin_pb2.GetUserDetailsReq(user=str(normal_user.id))

        with real_admin_session(normal_token) as api:
            # all requests to the admin servicer should break when done by a non-super_user
            with pytest.raises(grpc.RpcError) as excinfo:
                api.GetUserDetails(request)
            # checked after the raises block so that the assertion actually runs
            assert excinfo.value.code() == grpc.StatusCode.PERMISSION_DENIED

35 

36 

def test_GetUserDetails(db):
    """A superuser can fetch a user's details by id, by username, or by email."""
    with session_scope():
        _super_user, super_token = generate_user(is_superuser=True)
        target, _target_token = generate_user()

        # The same user must come back whichever identifier is supplied;
        # each lookup runs in its own admin session.
        for identifier in (str(target.id), target.username, target.email):
            with real_admin_session(super_token) as api:
                details = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=identifier))
                assert details.user_id == target.id
                assert details.username == target.username
                assert details.email == target.email
                assert details.gender == target.gender
                assert parse_date(details.birthdate) == target.birthdate
                assert not details.banned
                assert not details.deleted

71 

72 

def test_ChangeUserGender(db):
    """ChangeUserGender returns the user's details carrying the new gender."""
    with session_scope():
        _super_user, super_token = generate_user(is_superuser=True)
        target, _target_token = generate_user()

        request = admin_pb2.ChangeUserGenderReq(user=target.username, gender="Machine")

        with real_admin_session(super_token) as api:
            details = api.ChangeUserGender(request)
            # only the gender differs from the generated user
            assert details.user_id == target.id
            assert details.username == target.username
            assert details.email == target.email
            assert details.gender == "Machine"
            assert parse_date(details.birthdate) == target.birthdate
            assert not details.banned
            assert not details.deleted

87 

88 

def test_ChangeUserBirthdate(db):
    """ChangeUserBirthdate returns the user's details carrying the new birthdate."""
    with session_scope():
        _super_user, super_token = generate_user(is_superuser=True)
        initial_birthdate = date(year=2000, month=1, day=1)
        target, _target_token = generate_user(birthdate=initial_birthdate)

        with real_admin_session(super_token) as api:
            # sanity check: the user starts out with the birthdate they were generated with
            before = api.GetUserDetails(admin_pb2.GetUserDetailsReq(user=target.username))
            assert parse_date(before.birthdate) == initial_birthdate

            change_request = admin_pb2.ChangeUserBirthdateReq(user=target.username, birthdate="1990-05-25")
            after = api.ChangeUserBirthdate(change_request)

            assert after.user_id == target.id
            assert after.username == target.username
            assert after.email == target.email
            assert after.birthdate == "1990-05-25"
            assert after.gender == target.gender
            assert not after.banned
            assert not after.deleted

109 

110 

def test_BanUser(db):
    """BanUser returns the user's details flagged as banned but not deleted."""
    with session_scope():
        _super_user, super_token = generate_user(is_superuser=True)
        target, _target_token = generate_user()

        request = admin_pb2.BanUserReq(user=target.username)

        with real_admin_session(super_token) as api:
            details = api.BanUser(request)
            assert details.user_id == target.id
            assert details.username == target.username
            assert details.email == target.email
            assert details.gender == target.gender
            assert parse_date(details.birthdate) == target.birthdate
            assert details.banned
            assert not details.deleted

125 

126 

def test_DeleteUser(db):
    """DeleteUser returns the user's details flagged as deleted but not banned."""
    with session_scope():
        _super_user, super_token = generate_user(is_superuser=True)
        target, _target_token = generate_user()

        request = admin_pb2.DeleteUserReq(user=target.username)

        with real_admin_session(super_token) as api:
            details = api.DeleteUser(request)
            assert details.user_id == target.id
            assert details.username == target.username
            assert details.email == target.email
            assert details.gender == target.gender
            assert parse_date(details.birthdate) == target.birthdate
            assert not details.banned
            assert details.deleted

141 

142 

def test_CreateApiKey(db):
    """CreateApiKey gives the user exactly one valid API-key session and emails it.

    The email helper is patched so no mail is sent; the test then checks that
    the helper was called once with the key that was stored in the database.
    """
    with session_scope() as session:
        super_user, super_token = generate_user(is_superuser=True)
        normal_user, normal_token = generate_user()

        # the user starts out without any API keys
        assert (
            session.execute(
                select(func.count())
                .select_from(UserSession)
                .where(UserSession.is_api_key == True)
                .where(UserSession.user_id == normal_user.id)
            ).scalar_one()
            == 0
        )

    with patch("couchers.email.enqueue_email_from_template") as mock:
        with real_admin_session(super_token) as api:
            api.CreateApiKey(admin_pb2.CreateApiKeyReq(user=normal_user.username))

    with session_scope() as session:
        # scalar_one() also enforces that exactly one valid API key now exists
        api_key = session.execute(
            select(UserSession)
            .where(UserSession.is_valid)
            .where(UserSession.is_api_key == True)
            .where(UserSession.user_id == normal_user.id)
        ).scalar_one()

        # `mock.called_once_with(...)` is NOT an assertion: it is an auto-created
        # child Mock (always truthy), so the old check could never fail. Use the
        # real assertion helper and inspect the recorded call explicitly.
        mock.assert_called_once()
        call_args, call_kwargs = mock.call_args
        # expected call shape is the one the original assertion spelled out
        assert call_args == (normal_user.email, "api_key")
        template_args = call_kwargs["template_args"]
        # compare the user by id: the servicer presumably loads its own ORM
        # instance, which would not compare equal to the object held here
        assert template_args["user"].id == normal_user.id
        assert template_args["token"] == api_key.token
        assert template_args["expiry"] == api_key.expiry

175 

176 

# GeoJSON MultiPolygon made of one polygon with a single closed ring of five
# points (the first and last coordinates are identical). Passed as the
# community geometry in test_CreateCommunity.
VALID_GEOJSON_MULTIPOLYGON = """
    {
      "type": "MultiPolygon",
      "coordinates":
       [
        [
          [
            [
              -73.98114904754641,
              40.7470284264813
            ],
            [
              -73.98314135177611,
              40.73416844413217
            ],
            [
              -74.00538969848634,
              40.734314779027144
            ],
            [
              -74.00479214294432,
              40.75027851544338
            ],
            [
              -73.98114904754641,
              40.7470284264813
            ]
          ]
        ]
      ]
    }
"""

209 

# GeoJSON Point, i.e. deliberately not a MultiPolygon. Used by
# test_CreateCommunity_invalid_geojson, which expects errors.NO_MULTIPOLYGON.
POINT_GEOJSON = """
{ "type": "Point", "coordinates": [100.0, 0.0] }
"""

213 

214 

def test_CreateCommunity_invalid_geojson(db):
    """CreateCommunity refuses a geometry that is not a MultiPolygon."""
    with session_scope():
        _super_user, super_token = generate_user(is_superuser=True)
        # a second, ordinary user is still created even though the test never reads it
        generate_user()

        request = admin_pb2.CreateCommunityReq(
            name="test community",
            slug="test-community",
            description="community for testing",
            admin_ids=[],
            geojson=POINT_GEOJSON,
        )

        with real_admin_session(super_token) as api:
            with pytest.raises(grpc.RpcError) as excinfo:
                api.CreateCommunity(request)
            # inspected after the raises block so that both assertions actually run
            assert excinfo.value.code() == grpc.StatusCode.INVALID_ARGUMENT
            assert excinfo.value.details() == errors.NO_MULTIPOLYGON

232 

233 

def test_CreateCommunity(db):
    """CreateCommunity with a MultiPolygon geometry leaves a matching Cluster row."""
    with session_scope() as session:
        _super_user, super_token = generate_user(is_superuser=True)
        # a second, ordinary user is still created even though the test never reads it
        generate_user()

        request = admin_pb2.CreateCommunityReq(
            name="test community",
            slug="test-community",
            description="community for testing",
            admin_ids=[],
            geojson=VALID_GEOJSON_MULTIPOLYGON,
        )

        with real_admin_session(super_token) as api:
            api.CreateCommunity(request)

            # scalar_one() fails unless exactly one cluster carries this name
            by_name = select(Cluster).where(Cluster.name == "test community")
            community = session.execute(by_name).scalar_one()
            assert community.description == "community for testing"
            assert community.slug == "test-community"

251 

252 

def test_GetChats(db):
    """GetChats returns a non-empty ``response`` field when a superuser asks for a user's chats."""
    with session_scope():
        _super_user, super_token = generate_user(is_superuser=True)
        target, _target_token = generate_user()

        request = admin_pb2.GetChatsReq(user=target.username)

        with real_admin_session(super_token) as api:
            chats = api.GetChats(request)
            assert chats.response