Coverage for app/backend/src/tests/test_threads.py: 100%

334 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import string 

2import textwrap 

3 

4import grpc 

5import pytest 

6from sqlalchemy import select 

7 

8from couchers.db import session_scope 

9from couchers.models import ( 

10 Comment, 

11 ModerationObjectType, 

12 ModerationQueueItem, 

13 ModerationState, 

14 ModerationVisibility, 

15 Reply, 

16 Thread, 

17 User, 

18) 

19from couchers.models.discussions import CommentVersion, ContentChangeType, ReplyVersion 

20from couchers.proto import moderation_pb2, threads_pb2 

21from couchers.servicers.threads import pack_thread_id 

22from couchers.utils import now 

23from tests.fixtures.db import generate_user 

24from tests.fixtures.sessions import real_moderation_session, threads_session 

25 

26 

27@pytest.fixture(autouse=True) 

28def _(testconfig): 

29 pass 

30 

31 

32def test_threads_basic(db): 

33 user1, token1 = generate_user() 

34 

35 # Create a dummy Thread (should be replaced by pages later on) 

36 with session_scope() as session: 

37 dummy_thread = Thread() 

38 session.add(dummy_thread) 

39 session.flush() 

40 PARENT_THREAD_ID = pack_thread_id(database_id=dummy_thread.id, depth=0) 

41 

42 with threads_session(token1) as api: 

43 bat_id = api.PostReply(threads_pb2.PostReplyReq(thread_id=PARENT_THREAD_ID, content="bat")).thread_id 

44 

45 cat_id = api.PostReply(threads_pb2.PostReplyReq(thread_id=PARENT_THREAD_ID, content="cat")).thread_id 

46 

47 dog_id = api.PostReply(threads_pb2.PostReplyReq(thread_id=PARENT_THREAD_ID, content="dog")).thread_id 

48 

49 dogs = [ 

50 api.PostReply(threads_pb2.PostReplyReq(thread_id=dog_id, content=animal)).thread_id 

51 for animal in ["hyena", "wolf", "prariewolf"] 

52 ] 

53 cats = [ 

54 api.PostReply(threads_pb2.PostReplyReq(thread_id=cat_id, content=animal)).thread_id 

55 for animal in ["cheetah", "lynx", "panther"] 

56 ] 

57 

58 # Make some queries 

59 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=PARENT_THREAD_ID)) 

60 assert len(ret.replies) == 3 

61 assert ret.next_page_token == "" 

62 assert ret.replies[0].thread_id == dog_id 

63 assert ret.replies[0].content == "dog" 

64 assert ret.replies[0].author_user_id == user1.id 

65 assert ret.replies[0].num_replies == 3 

66 

67 assert ret.replies[1].thread_id == cat_id 

68 assert ret.replies[1].content == "cat" 

69 assert ret.replies[1].author_user_id == user1.id 

70 assert ret.replies[1].num_replies == 3 

71 

72 assert ret.replies[2].thread_id == bat_id 

73 assert ret.replies[2].content == "bat" 

74 assert ret.replies[2].author_user_id == user1.id 

75 assert ret.replies[2].num_replies == 0 

76 

77 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=cat_id)) 

78 assert len(ret.replies) == 3 

79 assert ret.next_page_token == "" 

80 assert [reply.thread_id for reply in ret.replies] == cats[::-1] 

81 

82 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=dog_id)) 

83 assert len(ret.replies) == 3 

84 assert ret.next_page_token == "" 

85 assert [reply.thread_id for reply in ret.replies] == dogs[::-1] 

86 

87 

88def test_threads_errors(db): 

89 user1, token1 = generate_user() 

90 with threads_session(token1) as api: 

91 # request non-existing comment 

92 with pytest.raises(grpc.RpcError) as e: 

93 api.GetThread(threads_pb2.GetThreadReq(thread_id=11)) 

94 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

95 assert e.value.details() == "Discussion thread not found." 

96 

97 # request non-existing depth digit 

98 with pytest.raises(grpc.RpcError) as e: 

99 api.GetThread(threads_pb2.GetThreadReq(thread_id=19)) 

100 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

101 assert e.value.details() == "Discussion thread not found." 

102 

103 # post on non-existing comment 

104 with pytest.raises(grpc.RpcError) as e: 

105 api.PostReply(threads_pb2.PostReplyReq(thread_id=11, content="foo")) 

106 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

107 assert e.value.details() == "Discussion thread not found." 

108 

109 # post on non-existing depth 

110 with pytest.raises(grpc.RpcError) as e: 

111 api.PostReply(threads_pb2.PostReplyReq(thread_id=19, content="foo")) 

112 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

113 assert e.value.details() == "Discussion thread not found." 

114 

115 # post empty content 

116 with pytest.raises(grpc.RpcError) as e: 

117 api.PostReply(threads_pb2.PostReplyReq(thread_id=19, content="")) 

118 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

119 assert e.value.details() == "You cannot post an empty comment." 

120 

121 # post whitespace only content 

122 with pytest.raises(grpc.RpcError) as e: 

123 api.PostReply(threads_pb2.PostReplyReq(thread_id=19, content=" ")) 

124 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

125 assert e.value.details() == "You cannot post an empty comment." 

126 

127 

128def pagination_test(api, parent_id): 

129 # Post some data 

130 for c in reversed(string.ascii_lowercase): 

131 api.PostReply(threads_pb2.PostReplyReq(thread_id=parent_id, content=c)) 

132 

133 # Get it with pagination 

134 token = "" 

135 

136 for expected_page in textwrap.wrap(string.ascii_lowercase, 5): 

137 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=parent_id, page_size=5, page_token=token)) 

138 assert "".join(x.content for x in ret.replies) == expected_page 

139 token = ret.next_page_token 

140 

141 assert token == "" 

142 

143 return ret.replies[0].thread_id # to be used as a test one level deeper 

144 

145 

146def test_threads_pagination(db): 

147 user1, token1 = generate_user() 

148 

149 PARENT_THREAD_ID = 10 

150 

151 # Create a dummy Thread (should be replaced by pages later on) 

152 with session_scope() as session: 

153 session.add(Thread()) 

154 

155 with threads_session(token1) as api: 

156 comment_id = pagination_test(api, PARENT_THREAD_ID) 

157 pagination_test(api, comment_id) 

158 

159 

160def _make_thread_and_comment(token, content="hello"): 

161 """Helper: create a Thread, post a top-level Comment via the API, return (parent_thread_id, comment_thread_id).""" 

162 with session_scope() as session: 

163 thread = Thread() 

164 session.add(thread) 

165 session.flush() 

166 parent_thread_id = pack_thread_id(database_id=thread.id, depth=0) 

167 

168 with threads_session(token) as api: 

169 comment_thread_id = api.PostReply( 

170 threads_pb2.PostReplyReq(thread_id=parent_thread_id, content=content) 

171 ).thread_id 

172 

173 return parent_thread_id, comment_thread_id 

174 

175 

176def test_comment_creates_moderation_state(db): 

177 """Posting a comment creates a ModerationState (shadowed) and an initial-review queue item.""" 

178 user, token = generate_user() 

179 _, comment_thread_id = _make_thread_and_comment(token) 

180 comment_db_id = comment_thread_id // 10 

181 

182 with session_scope() as session: 

183 comment = session.execute(select(Comment).where(Comment.id == comment_db_id)).scalar_one() 

184 

185 state = session.execute( 

186 select(ModerationState).where(ModerationState.id == comment.moderation_state_id) 

187 ).scalar_one() 

188 assert state.object_type == ModerationObjectType.comment 

189 assert state.object_id == comment.id 

190 assert state.visibility == ModerationVisibility.shadowed 

191 

192 queue_item = session.execute( 

193 select(ModerationQueueItem).where(ModerationQueueItem.moderation_state_id == state.id) 

194 ).scalar_one() 

195 assert queue_item.resolved_by_log_id is None 

196 

197 

198def test_reply_creates_moderation_state(db): 

199 """Posting a reply to a comment creates its own ModerationState.""" 

200 user, token = generate_user() 

201 _, comment_thread_id = _make_thread_and_comment(token) 

202 

203 with threads_session(token) as api: 

204 reply_thread_id = api.PostReply( 

205 threads_pb2.PostReplyReq(thread_id=comment_thread_id, content="reply text") 

206 ).thread_id 

207 reply_db_id = reply_thread_id // 10 

208 

209 with session_scope() as session: 

210 reply = session.execute(select(Reply).where(Reply.id == reply_db_id)).scalar_one() 

211 

212 state = session.execute( 

213 select(ModerationState).where(ModerationState.id == reply.moderation_state_id) 

214 ).scalar_one() 

215 assert state.object_type == ModerationObjectType.reply 

216 assert state.object_id == reply.id 

217 assert state.visibility == ModerationVisibility.shadowed 

218 

219 

220def test_shadowed_comment_visible_to_author_only(db): 

221 """A shadowed comment is visible to its author but not to other users.""" 

222 author, author_token = generate_user() 

223 other, other_token = generate_user() 

224 

225 parent_thread_id, _ = _make_thread_and_comment(author_token, content="secret") 

226 

227 # Author sees their own shadowed comment 

228 with threads_session(author_token) as api: 

229 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=parent_thread_id)) 

230 assert len(ret.replies) == 1 

231 assert ret.replies[0].content == "secret" 

232 

233 # Other user does not see the shadowed comment 

234 with threads_session(other_token) as api: 

235 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=parent_thread_id)) 

236 assert len(ret.replies) == 0 

237 

238 

239def test_shadowed_reply_visible_to_author_only(db): 

240 """A shadowed reply is visible to its author but not to other users.""" 

241 author, author_token = generate_user() 

242 other, other_token = generate_user() 

243 

244 _, comment_thread_id = _make_thread_and_comment(author_token, content="hi") 

245 # Approve the comment so the parent comment is visible to others (otherwise they can't see the comment context anyway) 

246 comment_db_id = comment_thread_id // 10 

247 with session_scope() as session: 

248 comment = session.execute(select(Comment).where(Comment.id == comment_db_id)).scalar_one() 

249 state = session.execute( 

250 select(ModerationState).where(ModerationState.id == comment.moderation_state_id) 

251 ).scalar_one() 

252 state.visibility = ModerationVisibility.visible 

253 

254 with threads_session(author_token) as api: 

255 api.PostReply(threads_pb2.PostReplyReq(thread_id=comment_thread_id, content="my reply")) 

256 

257 # Author sees their own shadowed reply 

258 with threads_session(author_token) as api: 

259 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=comment_thread_id)) 

260 assert len(ret.replies) == 1 

261 assert ret.replies[0].content == "my reply" 

262 

263 # Other user does not see the shadowed reply 

264 with threads_session(other_token) as api: 

265 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=comment_thread_id)) 

266 assert len(ret.replies) == 0 

267 

268 

269def _approve(session, moderation_state_id): 

270 session.execute( 

271 select(ModerationState).where(ModerationState.id == moderation_state_id) 

272 ).scalar_one().visibility = ModerationVisibility.visible 

273 

274 

275def test_comment_by_invisible_user_hidden(db): 

276 """A comment by a deleted/banned user is hidden from others even when its moderation state is visible.""" 

277 author, author_token = generate_user() 

278 other, other_token = generate_user() 

279 

280 parent_thread_id, comment_thread_id = _make_thread_and_comment(author_token, content="from invisible user") 

281 comment_db_id = comment_thread_id // 10 

282 

283 with session_scope() as session: 

284 comment = session.execute(select(Comment).where(Comment.id == comment_db_id)).scalar_one() 

285 _approve(session, comment.moderation_state_id) 

286 

287 # while the author is visible, the comment shows 

288 with threads_session(other_token) as api: 

289 assert len(api.GetThread(threads_pb2.GetThreadReq(thread_id=parent_thread_id)).replies) == 1 

290 

291 # delete the author 

292 with session_scope() as session: 

293 session.execute(select(User).where(User.id == author.id)).scalar_one().deleted_at = now() 

294 

295 # the comment is now hidden from other users 

296 with threads_session(other_token) as api: 

297 assert len(api.GetThread(threads_pb2.GetThreadReq(thread_id=parent_thread_id)).replies) == 0 

298 

299 

300def test_reply_by_invisible_user_hidden(db): 

301 """A reply by a deleted/banned user is hidden from others even when its moderation state is visible.""" 

302 commenter, commenter_token = generate_user() 

303 replier, replier_token = generate_user() 

304 viewer, viewer_token = generate_user() 

305 

306 # comment by a user who stays visible, so the parent comment can still be navigated to 

307 parent_thread_id, comment_thread_id = _make_thread_and_comment(commenter_token, content="hi") 

308 comment_db_id = comment_thread_id // 10 

309 with session_scope() as session: 

310 comment = session.execute(select(Comment).where(Comment.id == comment_db_id)).scalar_one() 

311 _approve(session, comment.moderation_state_id) 

312 

313 with threads_session(replier_token) as api: 

314 reply_thread_id = api.PostReply( 

315 threads_pb2.PostReplyReq(thread_id=comment_thread_id, content="my reply") 

316 ).thread_id 

317 reply_db_id = reply_thread_id // 10 

318 with session_scope() as session: 

319 reply = session.execute(select(Reply).where(Reply.id == reply_db_id)).scalar_one() 

320 _approve(session, reply.moderation_state_id) 

321 

322 # while the replier is visible, the reply shows and is counted on the parent comment 

323 with threads_session(viewer_token) as api: 

324 assert len(api.GetThread(threads_pb2.GetThreadReq(thread_id=comment_thread_id)).replies) == 1 

325 parent = api.GetThread(threads_pb2.GetThreadReq(thread_id=parent_thread_id)) 

326 assert parent.replies[0].num_replies == 1 

327 

328 # delete the replier 

329 with session_scope() as session: 

330 session.execute(select(User).where(User.id == replier.id)).scalar_one().deleted_at = now() 

331 

332 # the reply is now hidden from other users and no longer counted on the parent comment 

333 with threads_session(viewer_token) as api: 

334 assert len(api.GetThread(threads_pb2.GetThreadReq(thread_id=comment_thread_id)).replies) == 0 

335 parent = api.GetThread(threads_pb2.GetThreadReq(thread_id=parent_thread_id)) 

336 assert parent.replies[0].num_replies == 0 

337 

338 

339def test_admin_can_approve_comment(db): 

340 """A moderator can approve a comment via ModerateContent and make it visible to other users.""" 

341 author, author_token = generate_user() 

342 other, other_token = generate_user() 

343 _moderator, moderator_token = generate_user(is_superuser=True) 

344 

345 parent_thread_id, comment_thread_id = _make_thread_and_comment(author_token, content="approved comment") 

346 comment_db_id = comment_thread_id // 10 

347 

348 with session_scope() as session: 

349 comment = session.execute(select(Comment).where(Comment.id == comment_db_id)).scalar_one() 

350 state_id = comment.moderation_state_id 

351 

352 # Other user can't see it yet 

353 with threads_session(other_token) as api: 

354 assert len(api.GetThread(threads_pb2.GetThreadReq(thread_id=parent_thread_id)).replies) == 0 

355 

356 # Moderator approves 

357 with real_moderation_session(moderator_token) as api: 

358 api.ModerateContent( 

359 moderation_pb2.ModerateContentReq( 

360 moderation_state_id=state_id, 

361 action=moderation_pb2.MODERATION_ACTION_APPROVE, 

362 visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE, 

363 reason="Looks good", 

364 ) 

365 ) 

366 

367 # Now other user sees it 

368 with threads_session(other_token) as api: 

369 ret = api.GetThread(threads_pb2.GetThreadReq(thread_id=parent_thread_id)) 

370 assert len(ret.replies) == 1 

371 assert ret.replies[0].content == "approved comment" 

372 

373 

374def test_admin_can_hide_comment(db): 

375 """A moderator can hide an approved comment, removing it from non-author views.""" 

376 author, author_token = generate_user() 

377 other, other_token = generate_user() 

378 _moderator, moderator_token = generate_user(is_superuser=True) 

379 

380 parent_thread_id, comment_thread_id = _make_thread_and_comment(author_token, content="bad comment") 

381 comment_db_id = comment_thread_id // 10 

382 

383 with session_scope() as session: 

384 comment = session.execute(select(Comment).where(Comment.id == comment_db_id)).scalar_one() 

385 state_id = comment.moderation_state_id 

386 # Pretend the comment was previously approved 

387 state = session.execute(select(ModerationState).where(ModerationState.id == state_id)).scalar_one() 

388 state.visibility = ModerationVisibility.visible 

389 

390 # Other user sees it 

391 with threads_session(other_token) as api: 

392 assert len(api.GetThread(threads_pb2.GetThreadReq(thread_id=parent_thread_id)).replies) == 1 

393 

394 # Moderator hides it 

395 with real_moderation_session(moderator_token) as api: 

396 api.ModerateContent( 

397 moderation_pb2.ModerateContentReq( 

398 moderation_state_id=state_id, 

399 action=moderation_pb2.MODERATION_ACTION_HIDE, 

400 visibility=moderation_pb2.MODERATION_VISIBILITY_HIDDEN, 

401 reason="Inappropriate", 

402 ) 

403 ) 

404 

405 # Other user no longer sees it 

406 with threads_session(other_token) as api: 

407 assert len(api.GetThread(threads_pb2.GetThreadReq(thread_id=parent_thread_id)).replies) == 0 

408 # Author also no longer sees it (hidden, not shadowed) 

409 with threads_session(author_token) as api: 

410 assert len(api.GetThread(threads_pb2.GetThreadReq(thread_id=parent_thread_id)).replies) == 0 

411 

412 

413def test_total_num_responses_excludes_shadowed(db): 

414 from couchers.context import make_background_user_context # noqa: PLC0415 

415 from couchers.servicers.threads import total_num_responses # noqa: PLC0415 

416 

417 author, author_token = generate_user() 

418 viewer, _ = generate_user() 

419 parent_thread_id, _ = _make_thread_and_comment(author_token, content="one") 

420 viewer_context = make_background_user_context(user_id=viewer.id) 

421 

422 parent_db_id, _ = divmod(parent_thread_id, 10) 

423 

424 with session_scope() as session: 

425 assert total_num_responses(session, viewer_context, parent_db_id) == 0 

426 

427 with session_scope() as session: 

428 state = session.execute( 

429 select(ModerationState).where( 

430 ModerationState.object_type == ModerationObjectType.comment, 

431 ModerationState.object_id == session.execute(select(Comment.id)).scalar_one(), 

432 ) 

433 ).scalar_one() 

434 state.visibility = ModerationVisibility.visible 

435 

436 with session_scope() as session: 

437 assert total_num_responses(session, viewer_context, parent_db_id) == 1 

438 

439 

440def test_total_num_responses_includes_own_shadowed(db): 

441 """The count uses the viewer's context so authors see their own shadowed content in the total, 

442 matching what GetThread shows them in the list.""" 

443 from couchers.context import make_background_user_context # noqa: PLC0415 

444 from couchers.servicers.threads import total_num_responses # noqa: PLC0415 

445 

446 author, author_token = generate_user() 

447 parent_thread_id, _ = _make_thread_and_comment(author_token, content="one") 

448 author_context = make_background_user_context(user_id=author.id) 

449 

450 parent_db_id, _ = divmod(parent_thread_id, 10) 

451 

452 with session_scope() as session: 

453 assert total_num_responses(session, author_context, parent_db_id) == 1 

454 

455 

456def test_edit_comment_creates_version_record(db): 

457 user, token = generate_user() 

458 parent_thread_id, comment_thread_id = _make_thread_and_comment(token, content="original comment") 

459 comment_db_id = comment_thread_id // 10 

460 

461 with threads_session(token) as api: 

462 api.UpdateReply(threads_pb2.UpdateReplyReq(thread_id=comment_thread_id, content="edited comment")) 

463 

464 with session_scope() as session: 

465 versions = ( 

466 session.execute(select(CommentVersion).where(CommentVersion.comment_id == comment_db_id)).scalars().all() 

467 ) 

468 assert len(versions) == 1 

469 v = versions[0] 

470 assert v.change_type == ContentChangeType.edit 

471 assert v.old_content == "original comment" 

472 assert v.new_content == "edited comment" 

473 assert v.editor_user_id == user.id 

474 

475 

476def test_delete_comment_creates_version_record(db): 

477 user, token = generate_user() 

478 parent_thread_id, comment_thread_id = _make_thread_and_comment(token, content="comment to delete") 

479 comment_db_id = comment_thread_id // 10 

480 

481 with threads_session(token) as api: 

482 api.DeleteReply(threads_pb2.DeleteReplyReq(thread_id=comment_thread_id)) 

483 

484 with session_scope() as session: 

485 versions = ( 

486 session.execute(select(CommentVersion).where(CommentVersion.comment_id == comment_db_id)).scalars().all() 

487 ) 

488 assert len(versions) == 1 

489 v = versions[0] 

490 assert v.change_type == ContentChangeType.delete 

491 assert v.old_content == "comment to delete" 

492 assert v.new_content is None 

493 assert v.editor_user_id == user.id 

494 

495 

496def test_edit_reply_creates_version_record(db): 

497 user, token = generate_user() 

498 _, comment_thread_id = _make_thread_and_comment(token, content="a comment") 

499 

500 with threads_session(token) as api: 

501 reply_thread_id = api.PostReply( 

502 threads_pb2.PostReplyReq(thread_id=comment_thread_id, content="original reply") 

503 ).thread_id 

504 reply_db_id = reply_thread_id // 10 

505 

506 with threads_session(token) as api: 

507 api.UpdateReply(threads_pb2.UpdateReplyReq(thread_id=reply_thread_id, content="edited reply")) 

508 

509 with session_scope() as session: 

510 versions = session.execute(select(ReplyVersion).where(ReplyVersion.reply_id == reply_db_id)).scalars().all() 

511 assert len(versions) == 1 

512 v = versions[0] 

513 assert v.change_type == ContentChangeType.edit 

514 assert v.old_content == "original reply" 

515 assert v.new_content == "edited reply" 

516 assert v.editor_user_id == user.id 

517 

518 

519def test_delete_reply_creates_version_record(db): 

520 user, token = generate_user() 

521 _, comment_thread_id = _make_thread_and_comment(token, content="a comment") 

522 

523 with threads_session(token) as api: 

524 reply_thread_id = api.PostReply( 

525 threads_pb2.PostReplyReq(thread_id=comment_thread_id, content="reply to delete") 

526 ).thread_id 

527 reply_db_id = reply_thread_id // 10 

528 

529 with threads_session(token) as api: 

530 api.DeleteReply(threads_pb2.DeleteReplyReq(thread_id=reply_thread_id)) 

531 

532 with session_scope() as session: 

533 versions = session.execute(select(ReplyVersion).where(ReplyVersion.reply_id == reply_db_id)).scalars().all() 

534 assert len(versions) == 1 

535 v = versions[0] 

536 assert v.change_type == ContentChangeType.delete 

537 assert v.old_content == "reply to delete" 

538 assert v.new_content is None 

539 assert v.editor_user_id == user.id 

540 

541 

542def test_edit_comment_multiple_edits_creates_multiple_version_records(db): 

543 user, token = generate_user() 

544 _, comment_thread_id = _make_thread_and_comment(token, content="v1") 

545 comment_db_id = comment_thread_id // 10 

546 

547 with threads_session(token) as api: 

548 api.UpdateReply(threads_pb2.UpdateReplyReq(thread_id=comment_thread_id, content="v2")) 

549 api.UpdateReply(threads_pb2.UpdateReplyReq(thread_id=comment_thread_id, content="v3")) 

550 

551 with session_scope() as session: 

552 versions = ( 

553 session.execute( 

554 select(CommentVersion).where(CommentVersion.comment_id == comment_db_id).order_by(CommentVersion.id) 

555 ) 

556 .scalars() 

557 .all() 

558 ) 

559 assert len(versions) == 2 

560 assert versions[0].old_content == "v1" 

561 assert versions[0].new_content == "v2" 

562 assert versions[1].old_content == "v2" 

563 assert versions[1].new_content == "v3"