Coverage for app/backend/src/tests/test_jail.py: 100%

237 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import grpc 

2import pytest 

3from google.protobuf import empty_pb2 

4 

5from couchers.constants import TOS_VERSION 

6from couchers.models import users 

7from couchers.proto import admin_pb2, api_pb2, jail_pb2 

8from couchers.servicers import jail as servicers_jail 

9from couchers.utils import create_coordinate, to_aware_datetime 

10from tests.fixtures.db import generate_user 

11from tests.fixtures.misc import PushCollector, email_fields, mock_notification_email 

12from tests.fixtures.sessions import real_account_session, real_admin_session, real_api_session, real_jail_session 

13 

14 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""

18 

19 

def test_jail_basic(db):
    """Check the jailed flag for a default user and for one created with ``accepted_tos=0``.

    The default user can ping the main API and gets an all-falsy ``JailInfo``
    response; the other user is rejected by the main API with UNAUTHENTICATED
    and has at least one ``JailInfo`` field equal to ``True``.
    """
    _, free_token = generate_user()

    with real_api_session(free_token) as api:
        api.Ping(api_pb2.PingReq())

    with real_jail_session(free_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        # no field of the response may be truthy
        field_names = [field.name for field in info.DESCRIPTOR.fields]
        for name in field_names:
            assert not getattr(info, name)

        assert not info.jailed

    # this user starts out jailed
    _, jailed_token = generate_user(accepted_tos=0)

    with real_api_session(jailed_token) as api, pytest.raises(grpc.RpcError) as e:
        api.Ping(api_pb2.PingReq())
    assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED

    with real_jail_session(jailed_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())

        assert info.jailed

        # count the fields that compare equal to True; there must be at least one
        reason_count = sum(
            getattr(info, field.name) == True  # noqa: E712
            for field in info.DESCRIPTOR.fields
        )

        assert reason_count > 0

53 

54 

def test_JailInfo(db):
    """JailInfo exposes ``has_not_accepted_tos`` and the main API rejects a jailed user."""
    _, jailed_token = generate_user(accepted_tos=0)

    with real_jail_session(jailed_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_tos

    # the main API refuses the jailed user's token
    with real_api_session(jailed_token) as api, pytest.raises(grpc.RpcError) as e:
        api.Ping(api_pb2.PingReq())
    assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED

    # a user generated with the defaults is not jailed
    _, free_token = generate_user()

    with real_jail_session(free_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

    with real_api_session(free_token) as api:
        api.Ping(api_pb2.PingReq())

77 

78 

def test_AcceptTOS(db):
    """Accepting the TOS unjails the user; ``accept=False`` is always refused."""

    def expect_revoke_refused(jail):
        # accept=False must fail with FAILED_PRECONDITION and the fixed message
        with pytest.raises(grpc.RpcError) as e:
            jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=False))
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "You cannot revoke acceptance of the Terms of Service."

    # first user: has not accepted the TOS
    _, pending_token = generate_user(accepted_tos=0)

    with real_jail_session(pending_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_tos

        expect_revoke_refused(jail)

        # the refused call changed nothing
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_tos

        # accept for real
        jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))

        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

        # still cannot revoke after accepting
        expect_revoke_refused(jail)

    # second user: generated with the defaults, i.e. TOS already accepted
    _, accepted_token = generate_user()

    with real_jail_session(accepted_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

        expect_revoke_refused(jail)

        # accepting a second time leaves the user unjailed
        jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))

        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

131 

132 

def test_TOS_increase(db, monkeypatch):
    """Raising ``TOS_VERSION`` jails a previously free user until they accept again."""
    # starts out free
    _, token = generate_user()

    with real_jail_session(token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

    with real_api_session(token) as api:
        api.Ping(api_pb2.PingReq())

    # simulate a TOS version bump in both modules that hold the constant
    bumped_version = TOS_VERSION + 1
    for module in (users, servicers_jail):
        monkeypatch.setattr(module, "TOS_VERSION", bumped_version)

    # the main API now rejects the user
    with real_api_session(token) as api, pytest.raises(grpc.RpcError) as e:
        api.Ping(api_pb2.PingReq())
    assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED

    with real_jail_session(token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_tos

        # accepting the new version releases the user
        jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True))

        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_tos

169 

170 

def test_SetLocation(db):
    """A user created with ``needs_to_update_location=True`` is released by ``SetLocation``.

    Previously this test only created the user and asserted nothing, so it could
    never fail. It now runs the same JailInfo -> SetLocation -> JailInfo sequence
    that test_MarkUserNeedsLocationUpdate uses for an admin-flagged user.
    """
    # make them need to update location
    _, token = generate_user(geom=create_coordinate(0, 0), geom_radius=0, needs_to_update_location=True)

    with real_jail_session(token) as jail:
        res = jail.JailInfo(empty_pb2.Empty())
        assert res.jailed
        assert res.needs_to_update_location

        # SetLocation returns the refreshed jail info
        res = jail.SetLocation(
            jail_pb2.SetLocationReq(
                city="New York City",
                lat=40.7812,
                lng=-73.9647,
                radius=250,
            )
        )

        assert not res.jailed
        assert not res.needs_to_update_location

        # a fresh query agrees with the SetLocation response
        res = jail.JailInfo(empty_pb2.Empty())
        assert not res.jailed
        assert not res.needs_to_update_location

174 

175 

def test_MarkUserNeedsLocationUpdate(db):
    """An admin flag jails the user until they submit a location via ``SetLocation``."""
    user, token = generate_user()
    _, super_token = generate_user(is_superuser=True)

    with real_jail_session(token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.needs_to_update_location
        assert len(info.pending_mod_notes) == 0

    # the superuser flags the user; notification emails are mocked out
    flag_request = admin_pb2.MarkUserNeedsLocationUpdateReq(user=user.username)
    with real_admin_session(super_token) as admin:
        with mock_notification_email():
            admin.MarkUserNeedsLocationUpdate(flag_request)

    with real_jail_session(token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.needs_to_update_location

        new_location = jail_pb2.SetLocationReq(
            city="New York City",
            lat=40.7812,
            lng=-73.9647,
            radius=250,
        )
        info = jail.SetLocation(new_location)

        assert not info.jailed
        assert not info.needs_to_update_location

        # a fresh query agrees with the SetLocation response
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.needs_to_update_location

210 

211 

def test_AcceptCommunityGuidelines(db):
    """Accepting the Community Guidelines unjails the user; ``accept=False`` is always refused."""

    def expect_revoke_refused(jail):
        # accept=False must fail with FAILED_PRECONDITION and the fixed message
        with pytest.raises(grpc.RpcError) as e:
            jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=False))
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "You cannot revoke acceptance of the Community Guidelines."

    # first user: has not accepted the guidelines
    _, pending_token = generate_user(accepted_community_guidelines=0)

    with real_jail_session(pending_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_community_guidelines

        expect_revoke_refused(jail)

        # the refused call changed nothing
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_not_accepted_community_guidelines

        # accept for real
        jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=True))

        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_community_guidelines

        # still cannot revoke after accepting
        expect_revoke_refused(jail)

    # second user: generated with the defaults, i.e. guidelines already accepted
    _, accepted_token = generate_user()

    with real_jail_session(accepted_token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_community_guidelines

        expect_revoke_refused(jail)

        # accepting a second time leaves the user unjailed
        jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=True))

        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_not_accepted_community_guidelines

264 

265 

def test_modnotes(db, push_collector: PushCollector):
    """Follow a moderator note from delivery to acknowledgement.

    Steps checked:
      1. the user starts with no pending notes and an empty note list;
      2. an admin sends a note, which triggers one email and a push notification;
      3. the user is jailed with exactly that note pending;
      4. ``acknowledge=False`` is refused and the note stays pending;
      5. ``acknowledge=True`` releases the user;
      6. acknowledging the same note again yields NOT_FOUND;
      7. the account service lists the note with an acknowledgement time
         later than its creation time.
    """
    note_content = "# Important note\nThis is a sample mod note."

    user, token = generate_user()
    _, super_token = generate_user(is_superuser=True)

    with real_jail_session(token) as jail:
        res = jail.JailInfo(empty_pb2.Empty())
        assert not res.jailed
        assert not res.has_pending_mod_notes
        assert len(res.pending_mod_notes) == 0

    with real_account_session(token) as account:
        res = account.ListModNotes(empty_pb2.Empty())
        assert len(res.mod_notes) == 0

    with real_admin_session(super_token) as admin:
        with mock_notification_email() as mock:
            admin.SendModNote(
                admin_pb2.SendModNoteReq(
                    user=user.username,
                    content=note_content,
                    internal_id="sample_note",
                )
            )
        mock.assert_called_once()
        fields = email_fields(mock)

        assert fields.subject == "[TEST] You have received a mod note"
        push = push_collector.pop_for_user(user.id, last=True)
        assert push.content.title == "New moderator note"
        assert (
            push.content.body
            == "You received a moderator note. Read and acknowledge it to continue using the platform."
        )

    with real_jail_session(token) as jail:
        res = jail.JailInfo(empty_pb2.Empty())
        assert res.jailed
        assert res.has_pending_mod_notes
        assert len(res.pending_mod_notes) == 1
        note = res.pending_mod_notes[0]
        assert note.note_content == note_content

        note_id = note.note_id

        # declining to acknowledge is refused
        with pytest.raises(grpc.RpcError) as e:
            jail.AcknowledgePendingModNote(
                jail_pb2.AcknowledgePendingModNoteReq(
                    note_id=note_id,
                    acknowledge=False,
                )
            )
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "You need to read and acknowledge the moderator note."

        # Re-fetch before asserting: the earlier response object predates the
        # refused call, so checking it again would not observe the current state.
        res = jail.JailInfo(empty_pb2.Empty())
        assert res.jailed
        assert res.has_pending_mod_notes
        assert len(res.pending_mod_notes) == 1
        note = res.pending_mod_notes[0]
        assert note.note_id == note_id
        assert note.note_content == note_content

        # acknowledging returns the refreshed jail info
        res = jail.AcknowledgePendingModNote(
            jail_pb2.AcknowledgePendingModNoteReq(
                note_id=note_id,
                acknowledge=True,
            )
        )
        assert not res.jailed
        assert not res.has_pending_mod_notes
        assert len(res.pending_mod_notes) == 0

        # the note is no longer pending, so it cannot be addressed again
        with pytest.raises(grpc.RpcError) as e:
            jail.AcknowledgePendingModNote(
                jail_pb2.AcknowledgePendingModNoteReq(
                    note_id=note_id,
                    acknowledge=False,
                )
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Moderator note not found."

    with real_account_session(token) as account:
        res = account.ListModNotes(empty_pb2.Empty())
        assert len(res.mod_notes) == 1
        note = res.mod_notes[0]
        assert note.note_id == note_id
        assert note.note_content == note_content

        assert to_aware_datetime(note.acknowledged) > to_aware_datetime(note.created)

354 

355 

def test_modnotes_no_notify(db, push_collector: PushCollector):
    """A mod note sent with ``do_not_notify=True`` jails the user without sending an email."""
    recipient, token = generate_user()
    _, super_token = generate_user(is_superuser=True)

    # baseline: nothing pending, nothing listed
    with real_jail_session(token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert not info.jailed
        assert not info.has_pending_mod_notes
        assert len(info.pending_mod_notes) == 0

    with real_account_session(token) as account:
        listing = account.ListModNotes(empty_pb2.Empty())
        assert len(listing.mod_notes) == 0

    silent_note = admin_pb2.SendModNoteReq(
        user=recipient.username,
        content="# Important note\nThis is a sample mod note.",
        internal_id="sample_note",
        do_not_notify=True,
    )
    with real_admin_session(super_token) as admin:
        with mock_notification_email() as mock:
            admin.SendModNote(silent_note)
        # the email hook must not have been invoked
        mock.assert_not_called()

    # the note is pending all the same
    with real_jail_session(token) as jail:
        info = jail.JailInfo(empty_pb2.Empty())
        assert info.jailed
        assert info.has_pending_mod_notes
        assert len(info.pending_mod_notes) == 1
        pending = info.pending_mod_notes[0]
        assert pending.note_content == "# Important note\nThis is a sample mod note."