Coverage for src/tests/test_jail.py: 100%

222 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-22 06:42 +0000

1import grpc 

2import pytest 

3from google.protobuf import empty_pb2 

4 

5from couchers import errors, models 

6from couchers.constants import TOS_VERSION 

7from couchers.servicers import jail as servicers_jail 

8from couchers.utils import to_aware_datetime 

9from proto import admin_pb2, api_pb2, jail_pb2 

10from tests.test_fixtures import ( # noqa # noqa 

11 db, 

12 email_fields, 

13 fast_passwords, 

14 generate_user, 

15 mock_notification_email, 

16 push_collector, 

17 real_account_session, 

18 real_admin_session, 

19 real_api_session, 

20 real_jail_session, 

21 testconfig, 

22) 

23 

24 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""

28 

29 

def test_jail_basic(db):
    """Smoke-test the jail: a default user has no jail flags, a user with ``accepted_tos=0`` has some.

    For the default user every field of the ``JailInfo`` response must be falsy
    and the main API must be reachable. For the user created with
    ``accepted_tos=0`` the main API must reject the call with
    ``UNAUTHENTICATED`` and ``JailInfo`` must report ``jailed`` plus at least
    one other truthy field explaining why.
    """
    _, token1 = generate_user()

    # a non-jailed user can talk to the main API
    with real_api_session(token1) as api:
        api.Ping(api_pb2.PingReq())

    with real_jail_session(token1) as jail:
        res = jail.JailInfo(empty_pb2.Empty())
        # check every field is false
        for field in res.DESCRIPTOR.fields:
            assert not getattr(res, field.name)

        assert not res.jailed

    # make the user jailed
    _, token2 = generate_user(accepted_tos=0)

    with real_api_session(token2) as api, pytest.raises(grpc.RpcError) as e:
        api.Ping(api_pb2.PingReq())
    assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED

    with real_jail_session(token2) as jail:
        res = jail.JailInfo(empty_pb2.Empty())

        assert res.jailed

        # check at least one *reason* field is set; `jailed` itself is excluded,
        # otherwise this check would be trivially satisfied by the assert above.
        # Truthiness (rather than `== True`) matches the loop used for the
        # non-jailed user and also works for repeated fields.
        reasons = [
            field.name
            for field in res.DESCRIPTOR.fields
            if field.name != "jailed" and getattr(res, field.name)
        ]
        assert reasons

63 

64 

def test_JailInfo(db):
    """Check ``JailInfo`` and main-API access for a user with ``accepted_tos=0`` and for a default user."""
    # user created without an accepted TOS version
    _, jailed_token = generate_user(accepted_tos=0)

    with real_jail_session(jailed_token) as jail_stub:
        info = jail_stub.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_tos

    # the main API must refuse this user
    with real_api_session(jailed_token) as api_stub:
        with pytest.raises(grpc.RpcError) as excinfo:
            api_stub.Ping(api_pb2.PingReq())
    assert excinfo.value.code() == grpc.StatusCode.UNAUTHENTICATED

    # make the user not jailed
    _, free_token = generate_user()

    with real_jail_session(free_token) as jail_stub:
        info = jail_stub.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

    # and the main API is reachable
    with real_api_session(free_token) as api_stub:
        api_stub.Ping(api_pb2.PingReq())

87 

88 

def test_AcceptTOS(db):
    """Exercise ``AcceptTOS``: accepting clears the TOS jail flag, un-accepting is always rejected."""

    def expect_unaccept_rejected(stub):
        # AcceptTOS(accept=False) must fail with FAILED_PRECONDITION / CANT_UNACCEPT_TOS
        with pytest.raises(grpc.RpcError) as excinfo:
            stub.AcceptTOS(jail_pb2.AcceptTOSReq(accept=False))
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.CANT_UNACCEPT_TOS

    # make them have not accepted TOS
    _, pending_token = generate_user(accepted_tos=0)

    with real_jail_session(pending_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_tos

        # make sure we can't unaccept
        expect_unaccept_rejected(jail)

        # the rejected call must not have changed anything
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_tos

        # now accept
        jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))

        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

        # make sure we can't unaccept
        expect_unaccept_rejected(jail)

    # make them have accepted TOS
    _, accepted_token = generate_user()

    with real_jail_session(accepted_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

        # make sure we can't unaccept
        expect_unaccept_rejected(jail)

        # accepting again doesn't do anything
        jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))

        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

141 

142 

def test_TOS_increase(db, monkeypatch):
    """Check that bumping ``TOS_VERSION`` jails a previously free user until they accept again."""
    # not jailed yet
    _, token = generate_user()
    empty = empty_pb2.Empty()

    with real_jail_session(token) as jail:
        info = jail.JailInfo(empty)
        assert not info.jailed
        assert not info.has_not_accepted_tos

    with real_api_session(token) as api:
        api.Ping(api_pb2.PingReq())

    # now we pretend to update the TOS version, patching both modules that
    # hold their own reference to the constant
    bumped_version = TOS_VERSION + 1
    for module in (models, servicers_jail):
        monkeypatch.setattr(module, "TOS_VERSION", bumped_version)

    # make sure we're jailed
    with real_api_session(token) as api:
        with pytest.raises(grpc.RpcError) as excinfo:
            api.Ping(api_pb2.PingReq())
    assert excinfo.value.code() == grpc.StatusCode.UNAUTHENTICATED

    with real_jail_session(token) as jail:
        info = jail.JailInfo(empty)
        assert info.jailed
        assert info.has_not_accepted_tos

        # now accept
        jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))

        info = jail.JailInfo(empty)
        assert not info.jailed
        assert not info.has_not_accepted_tos

179 

180 

def test_SetLocation(db):
    """Check that ``SetLocation`` clears the missing-location jail flag."""
    # make them have not added a location
    _, token = generate_user(geom=None, geom_radius=None)

    with real_jail_session(token) as jail:
        before = jail.JailInfo(empty_pb2.Empty())
        assert before.jailed
        assert before.has_not_added_location

        location_req = jail_pb2.SetLocationReq(
            city="New York City",
            lat=40.7812,
            lng=-73.9647,
            radius=250,
        )
        # the SetLocation response itself carries the updated jail flags
        after_set = jail.SetLocation(location_req)
        assert not after_set.jailed
        assert not after_set.has_not_added_location

        # and a fresh JailInfo call agrees
        refreshed = jail.JailInfo(empty_pb2.Empty())
        assert not refreshed.jailed
        assert not refreshed.has_not_added_location

205 

206 

def test_AcceptCommunityGuidelines(db):
    """Exercise ``AcceptCommunityGuidelines``: accepting clears the flag, un-accepting is always rejected."""

    def expect_unaccept_rejected(stub):
        # accept=False must fail with FAILED_PRECONDITION / CANT_UNACCEPT_COMMUNITY_GUIDELINES
        with pytest.raises(grpc.RpcError) as excinfo:
            stub.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=False))
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.CANT_UNACCEPT_COMMUNITY_GUIDELINES

    # make them have not accepted GC
    _, pending_token = generate_user(accepted_community_guidelines=0)

    with real_jail_session(pending_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_community_guidelines

        # make sure we can't unaccept
        expect_unaccept_rejected(jail)

        # the rejected call must not have changed anything
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_community_guidelines

        # now accept
        jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=True))

        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_community_guidelines

        # make sure we can't unaccept
        expect_unaccept_rejected(jail)

    # make them have accepted GC
    _, accepted_token = generate_user()

    with real_jail_session(accepted_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_community_guidelines

        # make sure we can't unaccept
        expect_unaccept_rejected(jail)

        # accepting again doesn't do anything
        jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=True))

        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_community_guidelines

259 

260 

def test_modnotes(db, push_collector):
    """Walk a mod note through its lifecycle: sent by an admin, jails the user, acknowledged, listed.

    Covers the email and push notification triggered by ``SendModNote``, the
    rejection of ``acknowledge=False``, the ``NOT_FOUND`` error once the note
    is no longer pending, and the note showing up in ``ListModNotes`` with an
    acknowledgement time later than its creation time.
    """
    note_content = "# Important note\nThis is a sample mod note."

    user, token = generate_user()
    _, super_token = generate_user(is_superuser=True)

    # nothing pending to begin with
    with real_jail_session(token) as jail:
        res = jail.JailInfo(empty_pb2.Empty())
        assert not res.jailed
        assert not res.has_pending_mod_notes
        assert len(res.pending_mod_notes) == 0

    with real_account_session(token) as account:
        res = account.ListModNotes(empty_pb2.Empty())
        assert len(res.mod_notes) == 0

    with real_admin_session(super_token) as admin:
        with mock_notification_email() as mock:
            admin.SendModNote(
                admin_pb2.SendModNoteReq(
                    user=user.username,
                    content=note_content,
                    internal_id="sample_note",
                )
            )
        mock.assert_called_once()
        email = email_fields(mock)

        assert email.subject == "[TEST] You have received a mod note"
        push_collector.assert_user_has_single_matching(
            user.id,
            title="You received a mod note",
            body="You need to read and acknowledge the note before continuing to use the platform.",
        )

    with real_jail_session(token) as jail:
        res = jail.JailInfo(empty_pb2.Empty())
        assert res.jailed
        assert res.has_pending_mod_notes
        assert len(res.pending_mod_notes) == 1
        note = res.pending_mod_notes[0]
        assert note.note_content == note_content

        note_id = note.note_id

        # refusing to acknowledge is an error
        with pytest.raises(grpc.RpcError) as e:
            jail.AcknowledgePendingModNote(
                jail_pb2.AcknowledgePendingModNoteReq(
                    note_id=note_id,
                    acknowledge=False,
                )
            )
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == errors.MOD_NOTE_NEED_TO_ACKNOWELDGE

        # re-fetch the jail state so the checks below verify that the rejected
        # call changed nothing, instead of re-reading the earlier response
        res = jail.JailInfo(empty_pb2.Empty())
        assert res.jailed
        assert res.has_pending_mod_notes
        assert len(res.pending_mod_notes) == 1
        note = res.pending_mod_notes[0]
        assert note.note_content == note_content

        # acknowledging returns the updated jail state
        res = jail.AcknowledgePendingModNote(
            jail_pb2.AcknowledgePendingModNoteReq(
                note_id=note_id,
                acknowledge=True,
            )
        )
        assert not res.jailed
        assert not res.has_pending_mod_notes
        assert len(res.pending_mod_notes) == 0

        # the note is no longer pending, so it can't be found any more
        with pytest.raises(grpc.RpcError) as e:
            jail.AcknowledgePendingModNote(
                jail_pb2.AcknowledgePendingModNoteReq(
                    note_id=note_id,
                    acknowledge=False,
                )
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == errors.MOD_NOTE_NOT_FOUND

    with real_account_session(token) as account:
        res = account.ListModNotes(empty_pb2.Empty())
        assert len(res.mod_notes) == 1
        note = res.mod_notes[0]
        assert note.note_id == note_id
        assert note.note_content == note_content

        assert to_aware_datetime(note.acknowledged) > to_aware_datetime(note.created)

348 

349 

def test_modnotes_no_notify(db, push_collector):
    """Check that a mod note sent with ``do_not_notify=True`` sends no email but still jails the user."""
    target, target_token = generate_user()
    _, admin_token = generate_user(is_superuser=True)
    empty = empty_pb2.Empty()

    # nothing pending to begin with
    with real_jail_session(target_token) as jail:
        info = jail.JailInfo(empty)
        assert not info.jailed
        assert not info.has_pending_mod_notes
        assert len(info.pending_mod_notes) == 0

    with real_account_session(target_token) as account:
        listing = account.ListModNotes(empty)
        assert len(listing.mod_notes) == 0

    silent_note = admin_pb2.SendModNoteReq(
        user=target.username,
        content="# Important note\nThis is a sample mod note.",
        internal_id="sample_note",
        do_not_notify=True,
    )
    with real_admin_session(admin_token) as admin:
        with mock_notification_email() as email_mock:
            admin.SendModNote(silent_note)
        # no notification email may have been sent
        email_mock.assert_not_called()

    # the note is pending regardless of the suppressed notification
    with real_jail_session(target_token) as jail:
        info = jail.JailInfo(empty)
        assert info.jailed
        assert info.has_pending_mod_notes
        assert len(info.pending_mod_notes) == 1
        pending = info.pending_mod_notes[0]
        assert pending.note_content == "# Important note\nThis is a sample mod note."