Coverage for src/tests/test_jail.py: 100%

233 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-06-01 15:07 +0000

1import grpc 

2import pytest 

3from google.protobuf import empty_pb2 

4 

5from couchers import errors, models 

6from couchers.constants import TOS_VERSION 

7from couchers.servicers import jail as servicers_jail 

8from couchers.utils import create_coordinate, to_aware_datetime 

9from proto import admin_pb2, api_pb2, jail_pb2 

10from tests.test_fixtures import ( # noqa # noqa 

11 db, 

12 email_fields, 

13 fast_passwords, 

14 generate_user, 

15 mock_notification_email, 

16 push_collector, 

17 real_account_session, 

18 real_admin_session, 

19 real_api_session, 

20 real_jail_session, 

21 testconfig, 

22) 

23 

24 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture: requests ``testconfig`` on behalf of every test in this module."""

28 

29 

def test_jail_basic(db):
    """Smoke-test the jail for one default user and one user created with ``accepted_tos=0``.

    The default user must be able to ``Ping`` the main API and every field of
    their ``JailInfo`` response must be falsy. The second user must be rejected
    by the main API with ``UNAUTHENTICATED`` and their ``JailInfo`` response must
    report ``jailed`` together with at least one other field set to true.
    """
    _, free_token = generate_user()

    # a default user can talk to the main API
    with real_api_session(free_token) as api:
        api.Ping(api_pb2.PingReq())

    with real_jail_session(free_token) as jail:
        res = jail.JailInfo(empty_pb2.Empty())
        # check every field is false
        for field in res.DESCRIPTOR.fields:
            assert not getattr(res, field.name)

        assert not res.jailed

    # make the user jailed
    _, jailed_token = generate_user(accepted_tos=0)

    with real_api_session(jailed_token) as api, pytest.raises(grpc.RpcError) as e:
        api.Ping(api_pb2.PingReq())
    assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED

    with real_jail_session(jailed_token) as jail:
        res = jail.JailInfo(empty_pb2.Empty())

        assert res.jailed

        # check at least one *reason* field is true; `jailed` itself is skipped,
        # otherwise the assertion above would already make this check pass
        reasons = [
            field.name
            for field in res.DESCRIPTOR.fields
            if field.name != "jailed" and getattr(res, field.name) is True
        ]
        assert reasons

63 

64 

def test_JailInfo(db):
    """Check ``JailInfo`` and main-API access for a TOS-less user and for a default user.

    A user created with ``accepted_tos=0`` must be reported as jailed with
    ``has_not_accepted_tos`` set, and their ``Ping`` must fail with
    ``UNAUTHENTICATED``. A default user must have neither flag set and must be
    able to ``Ping``.
    """
    _, jailed_token = generate_user(accepted_tos=0)

    with real_jail_session(jailed_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_tos

    with real_api_session(jailed_token) as api, pytest.raises(grpc.RpcError) as err:
        api.Ping(api_pb2.PingReq())
    assert err.value.code() == grpc.StatusCode.UNAUTHENTICATED

    # a default user is not jailed
    _, free_token = generate_user()

    with real_jail_session(free_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

    with real_api_session(free_token) as api:
        api.Ping(api_pb2.PingReq())

87 

88 

def test_AcceptTOS(db):
    """Check ``AcceptTOS`` for a user who has not accepted the TOS and for one who has.

    In both cases ``accept=False`` must be rejected with ``FAILED_PRECONDITION``
    and ``errors.CANT_UNACCEPT_TOS``. For the user created with
    ``accepted_tos=0``, accepting must clear ``jailed`` and
    ``has_not_accepted_tos``; for a default user, accepting again must leave
    both flags cleared.
    """

    def expect_unaccept_rejected(jail):
        # make sure we can't unaccept
        with pytest.raises(grpc.RpcError) as err:
            jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=False))
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == errors.CANT_UNACCEPT_TOS

    # user who has not accepted the TOS
    _, pending_token = generate_user(accepted_tos=0)

    with real_jail_session(pending_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_tos

        expect_unaccept_rejected(jail)

        # the rejected call must not have changed anything
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_tos

        # now accept
        jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))

        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

        expect_unaccept_rejected(jail)

    # user who has already accepted the TOS
    _, accepted_token = generate_user()

    with real_jail_session(accepted_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

        expect_unaccept_rejected(jail)

        # accepting again doesn't do anything
        jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))

        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

141 

142 

def test_TOS_increase(db, monkeypatch):
    """Check that raising ``TOS_VERSION`` jails a previously free user until they accept again.

    ``TOS_VERSION`` is monkeypatched to ``TOS_VERSION + 1`` on both ``models``
    and ``servicers_jail``. After that, ``Ping`` must fail with
    ``UNAUTHENTICATED`` and ``JailInfo`` must report ``jailed`` and
    ``has_not_accepted_tos`` until ``AcceptTOS(accept=True)`` is called.
    """
    # not jailed yet
    _, token = generate_user()

    with real_jail_session(token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

    with real_api_session(token) as api:
        api.Ping(api_pb2.PingReq())

    # now we pretend to update the TOS version
    bumped_version = TOS_VERSION + 1
    for module in (models, servicers_jail):
        monkeypatch.setattr(module, "TOS_VERSION", bumped_version)

    # make sure we're jailed
    with real_api_session(token) as api, pytest.raises(grpc.RpcError) as err:
        api.Ping(api_pb2.PingReq())
    assert err.value.code() == grpc.StatusCode.UNAUTHENTICATED

    with real_jail_session(token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_tos

        # now accept
        jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))

        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

179 

180 

def test_SetLocation(db):
    """Check that ``SetLocation`` releases a user created with ``needs_to_update_location=True``.

    Previously this test only created the user and asserted nothing. It now
    checks the jail state before and after calling ``SetLocation``, using the
    same request and assertions as ``test_MarkUserNeedsLocationUpdate``.

    NOTE(review): assumes a user generated with ``needs_to_update_location=True``
    is reported by ``JailInfo`` the same way as one flagged through the admin
    ``MarkUserNeedsLocationUpdate`` call -- confirm against the jail servicer.
    """
    # make them need to update location
    _, token = generate_user(geom=create_coordinate(0, 0), geom_radius=0, needs_to_update_location=True)

    with real_jail_session(token) as jail:
        res = jail.JailInfo(empty_pb2.Empty())
        assert res.jailed
        assert res.needs_to_update_location

        # setting a location returns the updated jail info
        res = jail.SetLocation(
            jail_pb2.SetLocationReq(
                city="New York City",
                lat=40.7812,
                lng=-73.9647,
                radius=250,
            )
        )
        assert not res.jailed
        assert not res.needs_to_update_location

        # a fresh JailInfo call must agree with the SetLocation response
        res = jail.JailInfo(empty_pb2.Empty())
        assert not res.jailed
        assert not res.needs_to_update_location

184 

185 

def test_MarkUserNeedsLocationUpdate(db):
    """Check the admin ``MarkUserNeedsLocationUpdate`` call and its resolution via ``SetLocation``.

    A default user starts out not jailed. After a superuser marks them, their
    ``JailInfo`` must report ``jailed`` and ``needs_to_update_location``. Calling
    ``SetLocation`` must clear both flags, in its own response and in a
    following ``JailInfo`` call.
    """
    target, target_token = generate_user()
    _, admin_token = generate_user(is_superuser=True)

    with real_jail_session(target_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.needs_to_update_location
        assert len(info.pending_mod_notes) == 0

    mark_req = admin_pb2.MarkUserNeedsLocationUpdateReq(user=target.username)
    with real_admin_session(admin_token) as admin, mock_notification_email():
        admin.MarkUserNeedsLocationUpdate(mark_req)

    with real_jail_session(target_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.needs_to_update_location

        new_location = jail_pb2.SetLocationReq(
            city="New York City",
            lat=40.7812,
            lng=-73.9647,
            radius=250,
        )
        info = jail.SetLocation(new_location)

        assert not info.jailed
        assert not info.needs_to_update_location

        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.needs_to_update_location

220 

221 

def test_AcceptCommunityGuidelines(db):
    """Check ``AcceptCommunityGuidelines`` for a user who has not accepted them and for one who has.

    In both cases ``accept=False`` must be rejected with ``FAILED_PRECONDITION``
    and ``errors.CANT_UNACCEPT_COMMUNITY_GUIDELINES``. For the user created with
    ``accepted_community_guidelines=0``, accepting must clear ``jailed`` and
    ``has_not_accepted_community_guidelines``; for a default user, accepting
    again must leave both flags cleared.
    """

    def expect_unaccept_rejected(jail):
        # make sure we can't unaccept
        with pytest.raises(grpc.RpcError) as err:
            jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=False))
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == errors.CANT_UNACCEPT_COMMUNITY_GUIDELINES

    # user who has not accepted the community guidelines
    _, pending_token = generate_user(accepted_community_guidelines=0)

    with real_jail_session(pending_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_community_guidelines

        expect_unaccept_rejected(jail)

        # the rejected call must not have changed anything
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_community_guidelines

        # now accept
        jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=True))

        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_community_guidelines

        expect_unaccept_rejected(jail)

    # user who has already accepted the community guidelines
    _, accepted_token = generate_user()

    with real_jail_session(accepted_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_community_guidelines

        expect_unaccept_rejected(jail)

        # accepting again doesn't do anything
        jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=True))

        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_community_guidelines

274 

275 

def test_modnotes(db, push_collector):
    """Walk a mod note through its lifecycle: sent, pending, refused, acknowledged, listed.

    Steps checked:
    - a default user starts with no pending notes and an empty ``ListModNotes``;
    - a superuser's ``SendModNote`` triggers one notification email with the
      expected subject and one matching push notification;
    - the user is then jailed with exactly one pending note;
    - ``acknowledge=False`` fails with ``FAILED_PRECONDITION`` and leaves the
      note pending (verified with a fresh ``JailInfo`` call);
    - ``acknowledge=True`` releases the user;
    - acknowledging the same note again fails with ``NOT_FOUND``;
    - ``ListModNotes`` then returns the note with ``acknowledged`` later than
      ``created``.
    """
    note_text = "# Important note\nThis is a sample mod note."

    user, token = generate_user()
    _, super_token = generate_user(is_superuser=True)

    with real_jail_session(token) as jail:
        res = jail.JailInfo(empty_pb2.Empty())
        assert not res.jailed
        assert not res.has_pending_mod_notes
        assert len(res.pending_mod_notes) == 0

    with real_account_session(token) as account:
        res = account.ListModNotes(empty_pb2.Empty())
        assert len(res.mod_notes) == 0

    with real_admin_session(super_token) as admin:
        with mock_notification_email() as mock:
            admin.SendModNote(
                admin_pb2.SendModNoteReq(
                    user=user.username,
                    content=note_text,
                    internal_id="sample_note",
                )
            )
        # inspect the mock only once its context has been left
        mock.assert_called_once()
        email = email_fields(mock)

        assert email.subject == "[TEST] You have received a mod note"
        push_collector.assert_user_has_single_matching(
            user.id,
            title="You received a mod note",
            body="You need to read and acknowledge the note before continuing to use the platform.",
        )

    with real_jail_session(token) as jail:
        res = jail.JailInfo(empty_pb2.Empty())
        assert res.jailed
        assert res.has_pending_mod_notes
        assert len(res.pending_mod_notes) == 1
        note = res.pending_mod_notes[0]
        assert note.note_content == note_text

        note_id = note.note_id

        # refusing to acknowledge is an error
        with pytest.raises(grpc.RpcError) as e:
            jail.AcknowledgePendingModNote(
                jail_pb2.AcknowledgePendingModNoteReq(
                    note_id=note_id,
                    acknowledge=False,
                )
            )
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == errors.MOD_NOTE_NEED_TO_ACKNOWELDGE

        # re-fetch the jail info: asserting on the response obtained before the
        # failed call would not show whether the note is still pending
        res = jail.JailInfo(empty_pb2.Empty())
        assert res.jailed
        assert res.has_pending_mod_notes
        assert len(res.pending_mod_notes) == 1
        note = res.pending_mod_notes[0]
        assert note.note_content == note_text

        # acknowledging returns the updated jail info
        res = jail.AcknowledgePendingModNote(
            jail_pb2.AcknowledgePendingModNoteReq(
                note_id=note_id,
                acknowledge=True,
            )
        )
        assert not res.jailed
        assert not res.has_pending_mod_notes
        assert len(res.pending_mod_notes) == 0

        # the note is no longer pending, so it can't be found a second time
        with pytest.raises(grpc.RpcError) as e:
            jail.AcknowledgePendingModNote(
                jail_pb2.AcknowledgePendingModNoteReq(
                    note_id=note_id,
                    acknowledge=False,
                )
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == errors.MOD_NOTE_NOT_FOUND

    with real_account_session(token) as account:
        res = account.ListModNotes(empty_pb2.Empty())
        assert len(res.mod_notes) == 1
        note = res.mod_notes[0]
        assert note.note_id == note_id
        assert note.note_content == note_text

        assert to_aware_datetime(note.acknowledged) > to_aware_datetime(note.created)

363 

364 

def test_modnotes_no_notify(db, push_collector):
    """Check that ``SendModNote`` with ``do_not_notify=True`` sends no notification email.

    The user must still end up jailed with exactly one pending mod note
    carrying the sent content.
    """
    note_text = "# Important note\nThis is a sample mod note."

    recipient, recipient_token = generate_user()
    _, admin_token = generate_user(is_superuser=True)

    with real_jail_session(recipient_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_pending_mod_notes
        assert len(info.pending_mod_notes) == 0

    with real_account_session(recipient_token) as account:
        listing = account.ListModNotes(empty_pb2.Empty())
        assert len(listing.mod_notes) == 0

    silent_note = admin_pb2.SendModNoteReq(
        user=recipient.username,
        content=note_text,
        internal_id="sample_note",
        do_not_notify=True,
    )
    with real_admin_session(admin_token) as admin:
        with mock_notification_email() as email_mock:
            admin.SendModNote(silent_note)
        # inspect the mock only once its context has been left
        email_mock.assert_not_called()

    with real_jail_session(recipient_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_pending_mod_notes
        assert len(info.pending_mod_notes) == 1
        pending = info.pending_mod_notes[0]
        assert pending.note_content == note_text