Coverage for app/backend/src/tests/test_jail.py: 100%

233 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import grpc 

2import pytest 

3from google.protobuf import empty_pb2 

4 

5from couchers.constants import TOS_VERSION 

6from couchers.models import users 

7from couchers.proto import admin_pb2, api_pb2, jail_pb2 

8from couchers.servicers import jail as servicers_jail 

9from couchers.utils import create_coordinate, to_aware_datetime 

10from tests.fixtures.db import generate_user 

11from tests.fixtures.misc import EmailCollector, PushCollector 

12from tests.fixtures.sessions import real_account_session, real_admin_session, real_api_session, real_jail_session 

13 

14 

15@pytest.fixture(autouse=True) 

16def _(testconfig): 

17 pass 

18 

19 

20def test_jail_basic(db): 

21 user1, token1 = generate_user() 

22 

23 with real_api_session(token1) as api: 

24 api.Ping(api_pb2.PingReq()) 

25 

26 with real_jail_session(token1) as jail: 

27 res = jail.JailInfo(empty_pb2.Empty()) 

28 # check every field is false 

29 for field in res.DESCRIPTOR.fields: 

30 assert not getattr(res, field.name) 

31 

32 assert not res.jailed 

33 

34 # make the user jailed 

35 user2, token2 = generate_user(accepted_tos=0) 

36 

37 with real_api_session(token2) as api, pytest.raises(grpc.RpcError) as e: 

38 api.Ping(api_pb2.PingReq()) 

39 assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED 

40 

41 with real_jail_session(token2) as jail: 

42 res = jail.JailInfo(empty_pb2.Empty()) 

43 

44 assert res.jailed 

45 

46 reason_count = 0 

47 

48 # check at least one field is true 

49 for field in res.DESCRIPTOR.fields: 

50 reason_count += getattr(res, field.name) == True 

51 

52 assert reason_count > 0 

53 

54 

55def test_JailInfo(db): 

56 user1, token1 = generate_user(accepted_tos=0) 

57 

58 with real_jail_session(token1) as jail: 

59 res = jail.JailInfo(empty_pb2.Empty()) 

60 assert res.jailed 

61 assert res.has_not_accepted_tos 

62 

63 with real_api_session(token1) as api, pytest.raises(grpc.RpcError) as e: 

64 res = api.Ping(api_pb2.PingReq()) 

65 assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED 

66 

67 # make the user not jailed 

68 user2, token2 = generate_user() 

69 

70 with real_jail_session(token2) as jail: 

71 res = jail.JailInfo(empty_pb2.Empty()) 

72 assert not res.jailed 

73 assert not res.has_not_accepted_tos 

74 

75 with real_api_session(token2) as api: 

76 res = api.Ping(api_pb2.PingReq()) 

77 

78 

79def test_AcceptTOS(db): 

80 # make them have not accepted TOS 

81 user1, token1 = generate_user(accepted_tos=0) 

82 

83 with real_jail_session(token1) as jail: 

84 res = jail.JailInfo(empty_pb2.Empty()) 

85 assert res.jailed 

86 assert res.has_not_accepted_tos 

87 

88 # make sure we can't unaccept 

89 with pytest.raises(grpc.RpcError) as e: 

90 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=False)) 

91 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

92 assert e.value.details() == "You cannot revoke acceptance of the Terms of Service." 

93 

94 res = jail.JailInfo(empty_pb2.Empty()) 

95 assert res.jailed 

96 assert res.has_not_accepted_tos 

97 

98 # now accept 

99 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True)) 

100 

101 res = jail.JailInfo(empty_pb2.Empty()) 

102 assert not res.jailed 

103 assert not res.has_not_accepted_tos 

104 

105 # make sure we can't unaccept 

106 with pytest.raises(grpc.RpcError) as e: 

107 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=False)) 

108 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

109 assert e.value.details() == "You cannot revoke acceptance of the Terms of Service." 

110 

111 # make them have accepted TOS 

112 user2, token2 = generate_user() 

113 

114 with real_jail_session(token2) as jail: 

115 res = jail.JailInfo(empty_pb2.Empty()) 

116 assert not res.jailed 

117 assert not res.has_not_accepted_tos 

118 

119 # make sure we can't unaccept 

120 with pytest.raises(grpc.RpcError) as e: 

121 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=False)) 

122 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

123 assert e.value.details() == "You cannot revoke acceptance of the Terms of Service." 

124 

125 # accepting again doesn't do anything 

126 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True)) 

127 

128 res = jail.JailInfo(empty_pb2.Empty()) 

129 assert not res.jailed 

130 assert not res.has_not_accepted_tos 

131 

132 

133def test_TOS_increase(db, monkeypatch): 

134 # test if the TOS version is updated 

135 

136 # not jailed yet 

137 user, token = generate_user() 

138 

139 with real_jail_session(token) as jail: 

140 res = jail.JailInfo(empty_pb2.Empty()) 

141 assert not res.jailed 

142 assert not res.has_not_accepted_tos 

143 

144 with real_api_session(token) as api: 

145 res = api.Ping(api_pb2.PingReq()) 

146 

147 # now we pretend to update the TOS version 

148 new_TOS_VERSION = TOS_VERSION + 1 

149 

150 monkeypatch.setattr(users, "TOS_VERSION", new_TOS_VERSION) 

151 monkeypatch.setattr(servicers_jail, "TOS_VERSION", new_TOS_VERSION) 

152 

153 # make sure we're jailed 

154 with real_api_session(token) as api, pytest.raises(grpc.RpcError) as e: 

155 api.Ping(api_pb2.PingReq()) 

156 assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED 

157 

158 with real_jail_session(token) as jail: 

159 res = jail.JailInfo(empty_pb2.Empty()) 

160 assert res.jailed 

161 assert res.has_not_accepted_tos 

162 

163 # now accept 

164 res = jail.AcceptTOS(jail_pb2.AcceptTOSReq(accept=True)) 

165 

166 res = jail.JailInfo(empty_pb2.Empty()) 

167 assert not res.jailed 

168 assert not res.has_not_accepted_tos 

169 

170 

171def test_SetLocation(db): 

172 # make them need to update location 

173 user1, token1 = generate_user(geom=create_coordinate(0, 0), geom_radius=0, needs_to_update_location=True) 

174 

175 

176def test_MarkUserNeedsLocationUpdate(db): 

177 user, token = generate_user() 

178 super_user, super_token = generate_user(is_superuser=True) 

179 

180 with real_jail_session(token) as jail: 

181 res = jail.JailInfo(empty_pb2.Empty()) 

182 assert not res.jailed 

183 assert not res.needs_to_update_location 

184 assert len(res.pending_mod_notes) == 0 

185 

186 with real_admin_session(super_token) as admin: 

187 admin.MarkUserNeedsLocationUpdate(admin_pb2.MarkUserNeedsLocationUpdateReq(user=user.username)) 

188 

189 with real_jail_session(token) as jail: 

190 res = jail.JailInfo(empty_pb2.Empty()) 

191 assert res.jailed 

192 assert res.needs_to_update_location 

193 

194 res = jail.SetLocation( 

195 jail_pb2.SetLocationReq( 

196 city="New York City", 

197 lat=40.7812, 

198 lng=-73.9647, 

199 radius=250, 

200 ) 

201 ) 

202 

203 assert not res.jailed 

204 assert not res.needs_to_update_location 

205 

206 res = jail.JailInfo(empty_pb2.Empty()) 

207 assert not res.jailed 

208 assert not res.needs_to_update_location 

209 

210 

211def test_AcceptCommunityGuidelines(db): 

212 # make them have not accepted GC 

213 user1, token1 = generate_user(accepted_community_guidelines=0) 

214 

215 with real_jail_session(token1) as jail: 

216 res = jail.JailInfo(empty_pb2.Empty()) 

217 assert res.jailed 

218 assert res.has_not_accepted_community_guidelines 

219 

220 # make sure we can't unaccept 

221 with pytest.raises(grpc.RpcError) as e: 

222 res = jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=False)) 

223 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

224 assert e.value.details() == "You cannot revoke acceptance of the Community Guidelines." 

225 

226 res = jail.JailInfo(empty_pb2.Empty()) 

227 assert res.jailed 

228 assert res.has_not_accepted_community_guidelines 

229 

230 # now accept 

231 res = jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=True)) 

232 

233 res = jail.JailInfo(empty_pb2.Empty()) 

234 assert not res.jailed 

235 assert not res.has_not_accepted_community_guidelines 

236 

237 # make sure we can't unaccept 

238 with pytest.raises(grpc.RpcError) as e: 

239 res = jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=False)) 

240 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

241 assert e.value.details() == "You cannot revoke acceptance of the Community Guidelines." 

242 

243 # make them have accepted GC 

244 user2, token2 = generate_user() 

245 

246 with real_jail_session(token2) as jail: 

247 res = jail.JailInfo(empty_pb2.Empty()) 

248 assert not res.jailed 

249 assert not res.has_not_accepted_community_guidelines 

250 

251 # make sure we can't unaccept 

252 with pytest.raises(grpc.RpcError) as e: 

253 res = jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=False)) 

254 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

255 assert e.value.details() == "You cannot revoke acceptance of the Community Guidelines." 

256 

257 # accepting again doesn't do anything 

258 res = jail.AcceptCommunityGuidelines(jail_pb2.AcceptCommunityGuidelinesReq(accept=True)) 

259 

260 res = jail.JailInfo(empty_pb2.Empty()) 

261 assert not res.jailed 

262 assert not res.has_not_accepted_community_guidelines 

263 

264 

265def test_modnotes(db, email_collector: EmailCollector, push_collector: PushCollector): 

266 user, token = generate_user() 

267 super_user, super_token = generate_user(is_superuser=True) 

268 

269 with real_jail_session(token) as jail: 

270 res = jail.JailInfo(empty_pb2.Empty()) 

271 assert not res.jailed 

272 assert not res.has_pending_mod_notes 

273 assert len(res.pending_mod_notes) == 0 

274 

275 with real_account_session(token) as account: 

276 res = account.ListModNotes(empty_pb2.Empty()) 

277 assert len(res.mod_notes) == 0 

278 

279 with real_admin_session(super_token) as admin: 

280 admin.SendModNote( 

281 admin_pb2.SendModNoteReq( 

282 user=user.username, 

283 content="# Important note\nThis is a sample mod note.", 

284 internal_id="sample_note", 

285 ) 

286 ) 

287 

288 email = email_collector.pop_for_recipient(user.email, last=True) 

289 assert email.subject == "[TEST] You have received a mod note" 

290 

291 push = push_collector.pop_for_user(user.id, last=True) 

292 assert push.content.title == "New moderator note" 

293 assert push.content.body == "You received a moderator note. Read and acknowledge it to continue using the platform." 

294 

295 with real_jail_session(token) as jail: 

296 res = jail.JailInfo(empty_pb2.Empty()) 

297 assert res.jailed 

298 assert res.has_pending_mod_notes 

299 assert len(res.pending_mod_notes) == 1 

300 note = res.pending_mod_notes[0] 

301 assert note.note_content == "# Important note\nThis is a sample mod note." 

302 

303 note_id = note.note_id 

304 

305 with pytest.raises(grpc.RpcError) as e: 

306 jail.AcknowledgePendingModNote( 

307 jail_pb2.AcknowledgePendingModNoteReq( 

308 note_id=note_id, 

309 acknowledge=False, 

310 ) 

311 ) 

312 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

313 assert e.value.details() == "You need to read and acknowledge the moderator note." 

314 

315 assert res.jailed 

316 assert res.has_pending_mod_notes 

317 assert len(res.pending_mod_notes) == 1 

318 note = res.pending_mod_notes[0] 

319 assert note.note_content == "# Important note\nThis is a sample mod note." 

320 

321 res = jail.AcknowledgePendingModNote( 

322 jail_pb2.AcknowledgePendingModNoteReq( 

323 note_id=note_id, 

324 acknowledge=True, 

325 ) 

326 ) 

327 assert not res.jailed 

328 assert not res.has_pending_mod_notes 

329 assert len(res.pending_mod_notes) == 0 

330 

331 with pytest.raises(grpc.RpcError) as e: 

332 jail.AcknowledgePendingModNote( 

333 jail_pb2.AcknowledgePendingModNoteReq( 

334 note_id=note_id, 

335 acknowledge=False, 

336 ) 

337 ) 

338 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

339 assert e.value.details() == "Moderator note not found." 

340 

341 with real_account_session(token) as account: 

342 res = account.ListModNotes(empty_pb2.Empty()) 

343 assert len(res.mod_notes) == 1 

344 note = res.mod_notes[0] 

345 assert note.note_id == note_id 

346 assert note.note_content == "# Important note\nThis is a sample mod note." 

347 

348 assert to_aware_datetime(note.acknowledged) > to_aware_datetime(note.created) 

349 

350 

351def test_modnotes_no_notify(db, email_collector: EmailCollector, push_collector: PushCollector): 

352 user, token = generate_user() 

353 super_user, super_token = generate_user(is_superuser=True) 

354 

355 with real_jail_session(token) as jail: 

356 res = jail.JailInfo(empty_pb2.Empty()) 

357 assert not res.jailed 

358 assert not res.has_pending_mod_notes 

359 assert len(res.pending_mod_notes) == 0 

360 

361 with real_account_session(token) as account: 

362 res = account.ListModNotes(empty_pb2.Empty()) 

363 assert len(res.mod_notes) == 0 

364 

365 with real_admin_session(super_token) as admin: 

366 admin.SendModNote( 

367 admin_pb2.SendModNoteReq( 

368 user=user.username, 

369 content="# Important note\nThis is a sample mod note.", 

370 internal_id="sample_note", 

371 do_not_notify=True, 

372 ) 

373 ) 

374 

375 assert email_collector.count_for_recipient(user.email) == 0 

376 

377 with real_jail_session(token) as jail: 

378 res = jail.JailInfo(empty_pb2.Empty()) 

379 assert res.jailed 

380 assert res.has_pending_mod_notes 

381 assert len(res.pending_mod_notes) == 1 

382 note = res.pending_mod_notes[0] 

383 assert note.note_content == "# Important note\nThis is a sample mod note."