Coverage for app/backend/src/couchers/servicers/threads.py: 86%

181 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import logging 

2 

3import grpc 

4import sqlalchemy.exc 

5from google.protobuf import empty_pb2 

6from sqlalchemy import exists, select 

7from sqlalchemy.orm import Session 

8from sqlalchemy.sql import func 

9 

10from couchers.context import CouchersContext, make_background_user_context, make_notification_user_context 

11from couchers.db import session_scope 

12from couchers.jobs.enqueue import queue_job 

13from couchers.models import ( 

14 Comment, 

15 Discussion, 

16 Event, 

17 EventOccurrence, 

18 ModerationObjectType, 

19 Reply, 

20 Thread, 

21 User, 

22) 

23from couchers.models.discussions import CommentVersion, ContentChangeType, ReplyVersion 

24from couchers.models.notifications import NotificationTopicAction 

25from couchers.moderation.utils import create_moderation 

26from couchers.notifications.notify import notify 

27from couchers.proto import notification_data_pb2, threads_pb2, threads_pb2_grpc 

28from couchers.proto.internal import jobs_pb2 

29from couchers.servicers.api import user_model_to_pb 

30from couchers.servicers.blocking import is_not_visible 

31from couchers.sql import where_moderated_content_visible, where_users_column_visible 

32from couchers.utils import Timestamp_from_datetime, now 

33 

34logger = logging.getLogger(__name__) 

35 

36 

37# Since the API exposes a single ID space regardless of nesting level, 

38# we construct the API id by appending the nesting level to the 

39# database ID. 

40 

41 

def pack_thread_id(database_id: int, depth: int) -> int:
    """Combine a database ID and a nesting depth into a single API thread ID.

    The depth is stored in the last decimal digit and the database ID in the
    remaining digits, so ``unpack_thread_id`` can split the value again with a
    division by 10.

    Raises:
        ValueError: if ``depth`` is not a single decimal digit (0-9). Such a
            value would spill into the database-ID digits and yield an ID that
            collides with a different object and cannot be unpacked correctly.
    """
    if not 0 <= depth <= 9:
        raise ValueError(f"depth must be a single decimal digit (0-9), got {depth}")
    return database_id * 10 + depth

44 

45 

def unpack_thread_id(thread_id: int) -> tuple[int, int]:
    """Split an API thread ID into its ``(database_id, depth)`` parts.

    The last decimal digit is the nesting depth; everything above it is the
    database ID.
    """
    database_id = thread_id // 10
    depth = thread_id % 10
    return database_id, depth

49 

50 

def total_num_responses(session: Session, context: CouchersContext, database_id: int) -> int:
    """Count the comments and replies in the thread with database id ``database_id``.

    Only rows whose ``deleted`` column is NULL are counted, and both counts are
    passed through the user-visibility and moderation-visibility filters for
    ``context``. The result is the sum of the two counts.
    """
    # Top-level comments attached directly to the thread.
    comment_count_query = (
        select(func.count())
        .select_from(Comment)
        .where(Comment.thread_id == database_id)
        .where(Comment.deleted == None)
    )
    comment_count_query = where_users_column_visible(comment_count_query, context, Comment.author_user_id)
    comment_count_query = where_moderated_content_visible(
        comment_count_query,
        context,
        Comment,
        is_list_operation=True,
    )

    # Replies, reached through their parent comment's thread.
    reply_count_query = (
        select(func.count())
        .select_from(Reply)
        .join(Comment, Comment.id == Reply.comment_id)
        .where(Comment.thread_id == database_id)
        .where(Reply.deleted == None)
    )
    reply_count_query = where_users_column_visible(reply_count_query, context, Reply.author_user_id)
    reply_count_query = where_moderated_content_visible(
        reply_count_query,
        context,
        Reply,
        is_list_operation=True,
    )

    num_comments = session.execute(comment_count_query).scalar_one()
    num_replies = session.execute(reply_count_query).scalar_one()
    return num_comments + num_replies

83 

84 

def thread_to_pb(session: Session, context: CouchersContext, database_id: int) -> threads_pb2.Thread:
    """Build the ``threads_pb2.Thread`` message for the thread with the given database id.

    The message carries the packed depth-0 thread ID and the number of responses
    reported by ``total_num_responses`` for ``context``.
    """
    api_thread_id = pack_thread_id(database_id, 0)
    response_count = total_num_responses(session, context, database_id)
    return threads_pb2.Thread(thread_id=api_thread_id, num_responses=response_count)

90 

91 

def generate_reply_notifications(payload: jobs_pb2.GenerateReplyNotificationsPayload) -> None:
    """Send notifications for the comment or reply identified by ``payload.thread_id``.

    The packed thread ID decides what was posted:

    * depth 1 - a top-level comment. On an event thread, the event's subscribers and
      the attendees of its latest occurrence are notified; on a discussion thread,
      the discussion creator is notified.
    * depth 2 - a reply to a comment. The parent comment's author and the authors of
      the other replies under that comment are notified, subject to visibility checks.

    The author of the new content is never notified.

    Raises:
        NotImplementedError: if the thread belongs to neither an event nor a
            discussion, or a discussion's owner cluster is not an official cluster.
        Exception: if the depth encoded in the thread ID is neither 1 nor 2.
    """
    # Import here to avoid circular dependency
    from couchers.servicers.discussions import discussion_to_pb  # noqa: PLC0415
    from couchers.servicers.events import event_to_pb  # noqa: PLC0415

    with session_scope() as session:
        database_id, depth = unpack_thread_id(payload.thread_id)
        if depth not in (1, 2):
            raise Exception("Unknown depth")

        if depth == 1:
            # A top-level Comment on a Thread attached to an event, discussion, etc.
            top_comment = session.execute(select(Comment).where(Comment.id == database_id)).scalar_one()
            parent_thread = session.execute(select(Thread).where(Thread.id == top_comment.thread_id)).scalar_one()
            commenter = session.execute(select(User).where(User.id == top_comment.author_user_id)).scalar_one()

            # Reply message embedded in every notification for this comment.
            reply_pb = threads_pb2.Reply(
                thread_id=payload.thread_id,
                content=top_comment.content,
                author_user_id=top_comment.author_user_id,
                created_time=Timestamp_from_datetime(top_comment.created),
                num_replies=0,
            )

            # Work out whether the thread hangs off an event or a discussion.
            linked_event = session.execute(
                select(Event).where(Event.thread_id == parent_thread.id)
            ).scalar_one_or_none()
            linked_discussion = session.execute(
                select(Discussion).where(Discussion.thread_id == parent_thread.id)
            ).scalar_one_or_none()

            if linked_event:
                # Event thread: notify subscribers plus attendees of the latest occurrence.
                latest_occurrence = linked_event.occurrences.order_by(EventOccurrence.id.desc()).limit(1).one()
                subscriber_ids = [subscriber.id for subscriber in linked_event.subscribers]
                attendee_ids = [attendance.user_id for attendance in latest_occurrence.attendances]
                occurrence_key = str(latest_occurrence.id)

                for recipient_id in set(subscriber_ids + attendee_ids):
                    # Skip recipients who cannot see the commenter, and the commenter themself.
                    if (
                        is_not_visible(session, recipient_id, top_comment.author_user_id)
                        or recipient_id == top_comment.author_user_id
                    ):
                        continue
                    recipient_context = make_notification_user_context(user_id=recipient_id)
                    notify(
                        session,
                        user_id=recipient_id,
                        topic_action=NotificationTopicAction.event__comment,
                        key=occurrence_key,
                        data=notification_data_pb2.EventComment(
                            reply=reply_pb,
                            event=event_to_pb(session, latest_occurrence, recipient_context),
                            author=user_model_to_pb(commenter, session, recipient_context),
                        ),
                        moderation_state_id=top_comment.moderation_state_id,
                    )
            elif linked_discussion:
                # Community discussion thread: only the discussion creator is notified.
                if not linked_discussion.owner_cluster.is_official_cluster:
                    raise NotImplementedError("Shouldn't have discussions under groups, only communities")

                recipient_id = linked_discussion.creator_user_id
                if not (
                    is_not_visible(session, recipient_id, top_comment.author_user_id)
                    or recipient_id == top_comment.author_user_id
                ):
                    recipient_context = make_notification_user_context(user_id=recipient_id)
                    notify(
                        session,
                        user_id=recipient_id,
                        topic_action=NotificationTopicAction.discussion__comment,
                        key=str(linked_discussion.id),
                        data=notification_data_pb2.DiscussionComment(
                            reply=reply_pb,
                            discussion=discussion_to_pb(session, linked_discussion, recipient_context),
                            author=user_model_to_pb(commenter, session, recipient_context),
                        ),
                        moderation_state_id=top_comment.moderation_state_id,
                    )
            else:
                raise NotImplementedError("I can only do event and discussion threads for now")
        else:
            # depth == 2: a second-level reply to a comment
            db_reply = session.execute(select(Reply).where(Reply.id == database_id)).scalar_one()
            # the comment being replied to
            parent_comment = session.execute(select(Comment).where(Comment.id == db_reply.comment_id)).scalar_one()

            # Authors of replies under the same comment, filtered by visibility
            # from the point of view of the user who posted the new reply.
            replier_context = make_background_user_context(user_id=db_reply.author_user_id)
            reply_authors_query = where_users_column_visible(
                select(Reply.author_user_id).where(Reply.comment_id == parent_comment.id),
                replier_context,
                Reply.author_user_id,
            )
            participant_ids = set(session.execute(reply_authors_query).scalars().all())
            if not is_not_visible(session, parent_comment.author_user_id, db_reply.author_user_id):
                participant_ids.add(parent_comment.author_user_id)

            replier = session.execute(select(User).where(User.id == db_reply.author_user_id)).scalar_one()

            # Never notify the person who wrote the reply.
            recipient_ids = set(participant_ids) - {db_reply.author_user_id}

            reply_pb = threads_pb2.Reply(
                thread_id=payload.thread_id,
                content=db_reply.content,
                author_user_id=db_reply.author_user_id,
                created_time=Timestamp_from_datetime(db_reply.created),
                num_replies=0,
            )

            linked_event = session.execute(
                select(Event).where(Event.thread_id == parent_comment.thread_id)
            ).scalar_one_or_none()
            linked_discussion = session.execute(
                select(Discussion).where(Discussion.thread_id == parent_comment.thread_id)
            ).scalar_one_or_none()

            if linked_event:
                # Event thread: notifications are keyed by the latest occurrence.
                latest_occurrence = linked_event.occurrences.order_by(EventOccurrence.id.desc()).limit(1).one()
                occurrence_key = str(latest_occurrence.id)
                for recipient_id in recipient_ids:
                    recipient_context = make_notification_user_context(user_id=recipient_id)
                    notify(
                        session,
                        user_id=recipient_id,
                        topic_action=NotificationTopicAction.thread__reply,
                        key=occurrence_key,
                        data=notification_data_pb2.ThreadReply(
                            reply=reply_pb,
                            event=event_to_pb(session, latest_occurrence, recipient_context),
                            author=user_model_to_pb(replier, session, recipient_context),
                        ),
                        moderation_state_id=db_reply.moderation_state_id,
                    )
            elif linked_discussion:
                # Community discussion thread: notifications are keyed by the discussion.
                discussion_key = str(linked_discussion.id)
                for recipient_id in recipient_ids:
                    recipient_context = make_notification_user_context(user_id=recipient_id)
                    notify(
                        session,
                        user_id=recipient_id,
                        topic_action=NotificationTopicAction.thread__reply,
                        key=discussion_key,
                        data=notification_data_pb2.ThreadReply(
                            reply=reply_pb,
                            discussion=discussion_to_pb(session, linked_discussion, recipient_context),
                            author=user_model_to_pb(replier, session, recipient_context),
                        ),
                        moderation_state_id=db_reply.moderation_state_id,
                    )
            else:
                raise NotImplementedError("I can only do event and discussion threads for now")

245 

246 

class Threads(threads_pb2_grpc.ThreadsServicer):
    """Servicer for listing, posting, editing and deleting comments and replies.

    API thread IDs are packed values (see ``pack_thread_id``): depth 0 is a thread,
    depth 1 a top-level comment and depth 2 a reply to a comment.
    """

    def GetThread(
        self, request: threads_pb2.GetThreadReq, context: CouchersContext, session: Session
    ) -> threads_pb2.GetThreadRes:
        """Return one page of responses under a thread (depth 0) or a comment (depth 1).

        Results are ordered newest-first by creation time and restricted to rows whose
        id is below the id encoded in ``page_token``. One row more than ``page_size``
        is fetched to detect whether another page exists. Aborts with NOT_FOUND when
        the thread or comment cannot be found or the depth is anything else.
        """
        database_id, depth = unpack_thread_id(request.thread_id)

        # Fall back to 1000 when the requested page size is out of range.
        page_size = request.page_size
        if not 0 < page_size < 100000:
            page_size = 1000

        # Without a token, start above any id we expect to exist.
        if request.page_token:
            page_start, _ = unpack_thread_id(int(request.page_token))
        else:
            page_start = 2**50

        if depth == 0:
            thread = session.execute(select(Thread).where(Thread.id == database_id)).scalar_one_or_none()
            if not thread:
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")

            # True for comments that still have at least one non-deleted reply.
            has_replies = exists().where((Reply.comment_id == Comment.id) & (Reply.deleted == None)).correlate(Comment)

            # Per-comment count of non-deleted replies that pass the visibility filters.
            reply_count_query = (
                select(func.count(Reply.id)).where(Reply.comment_id == Comment.id).where(Reply.deleted == None)
            )
            reply_count_query = where_users_column_visible(reply_count_query, context, Reply.author_user_id)
            reply_count_query = where_moderated_content_visible(
                reply_count_query,
                context,
                Reply,
                is_list_operation=True,
            )
            visible_reply_count = reply_count_query.correlate(Comment).scalar_subquery()

            comments_query = (
                select(Comment, visible_reply_count)
                .where(Comment.thread_id == database_id)
                .where((Comment.deleted == None) | has_replies)
                .where(Comment.id < page_start)
                .order_by(Comment.created.desc())
                .limit(page_size + 1)
            )
            comments_query = where_users_column_visible(comments_query, context, Comment.author_user_id)
            comments_query = where_moderated_content_visible(
                comments_query,
                context,
                Comment,
                is_list_operation=True,
            )
            rows = session.execute(comments_query).all()

            # Deleted comments are shown as stubs (thread_id, deleted, num_replies only)
            # to preserve thread structure, but content and author are stripped.
            replies = []
            for comment, reply_count in rows[:page_size]:
                if comment.deleted is None:
                    reply_pb = threads_pb2.Reply(
                        thread_id=pack_thread_id(comment.id, 1),
                        content=comment.content,
                        author_user_id=comment.author_user_id,
                        created_time=Timestamp_from_datetime(comment.created),
                        num_replies=reply_count,
                        can_edit=(context.user_id == comment.author_user_id),
                        last_edited=Timestamp_from_datetime(comment.last_edited) if comment.last_edited else None,
                    )
                else:
                    reply_pb = threads_pb2.Reply(
                        thread_id=pack_thread_id(comment.id, 1),
                        deleted=True,
                        num_replies=reply_count,
                    )
                replies.append(reply_pb)

        elif depth == 1:
            # The parent comment itself must pass the visibility filters for the caller.
            parent_query = where_users_column_visible(
                select(Comment).where(Comment.id == database_id),
                context,
                Comment.author_user_id,
            )
            parent_query = where_moderated_content_visible(parent_query, context, Comment)
            if not session.execute(parent_query).scalar_one_or_none():
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")

            replies_query = (
                select(Reply)
                .where(Reply.comment_id == database_id)
                .where(Reply.deleted == None)
                .where(Reply.id < page_start)
                .order_by(Reply.created.desc())
                .limit(page_size + 1)
            )
            replies_query = where_users_column_visible(replies_query, context, Reply.author_user_id)
            replies_query = where_moderated_content_visible(
                replies_query,
                context,
                Reply,
                is_list_operation=True,
            )
            rows = session.execute(replies_query).scalars().all()  # type: ignore[assignment]

            replies = [
                threads_pb2.Reply(
                    thread_id=pack_thread_id(reply.id, 2),
                    content=reply.content,
                    author_user_id=reply.author_user_id,
                    created_time=Timestamp_from_datetime(reply.created),
                    num_replies=0,
                    can_edit=(context.user_id == reply.author_user_id),
                    last_edited=Timestamp_from_datetime(reply.last_edited) if reply.last_edited else None,
                )
                for reply in rows[:page_size]
            ]

        else:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")

        # The extra fetched row signals that there's more; the token is the last returned id.
        next_page_token = str(replies[-1].thread_id) if len(rows) > page_size else ""

        return threads_pb2.GetThreadRes(replies=replies, next_page_token=next_page_token)

    def PostReply(
        self, request: threads_pb2.PostReplyReq, context: CouchersContext, session: Session
    ) -> threads_pb2.PostReplyRes:
        """Post a comment on a thread (depth 0) or a reply to a comment (depth 1).

        Aborts with INVALID_ARGUMENT when the stripped content is empty, and with
        NOT_FOUND when the depth is anything else or inserting the row raises an
        integrity error. On success a ``generate_reply_notifications`` job is queued
        and the packed ID of the new object is returned.
        """
        content = request.content.strip()
        if not content:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_comment")

        database_id, depth = unpack_thread_id(request.thread_id)
        if depth not in (0, 1):
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")

        posting_comment = depth == 0
        object_to_add: Comment | Reply | None = None

        def create_object(moderation_state_id: int) -> int:
            """Insert the new Comment/Reply with the given moderation state and return its id."""
            nonlocal object_to_add
            if posting_comment:
                object_to_add = Comment(
                    thread_id=database_id,
                    author_user_id=context.user_id,
                    content=content,
                    moderation_state_id=moderation_state_id,
                )
            else:
                object_to_add = Reply(
                    comment_id=database_id,
                    author_user_id=context.user_id,
                    content=content,
                    moderation_state_id=moderation_state_id,
                )
            session.add(object_to_add)
            try:
                session.flush()
            except sqlalchemy.exc.IntegrityError:
                # Reported to the client as a missing parent thread/comment.
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")
            return object_to_add.id

        if posting_comment:
            moderated_type = ModerationObjectType.comment
        else:
            moderated_type = ModerationObjectType.reply

        # create_object is handed over as the object_id argument; the assert below
        # relies on create_moderation having invoked it.
        create_moderation(
            session=session,
            object_type=moderated_type,
            object_id=create_object,
            creator_user_id=context.user_id,
        )

        assert object_to_add is not None
        # The new object sits one level below the thing it responds to.
        thread_id = pack_thread_id(object_to_add.id, depth + 1)

        notification_payload = jobs_pb2.GenerateReplyNotificationsPayload(thread_id=thread_id)
        queue_job(
            session,
            job=generate_reply_notifications,
            payload=notification_payload,
        )

        return threads_pb2.PostReplyRes(thread_id=thread_id)

    def UpdateReply(
        self, request: threads_pb2.UpdateReplyReq, context: CouchersContext, session: Session
    ) -> threads_pb2.Reply:
        """Replace the content of the caller's own comment (depth 1) or reply (depth 2).

        A ``CommentVersion``/``ReplyVersion`` row with the old and new content is added
        before the object is updated. Aborts with INVALID_ARGUMENT for empty content,
        NOT_FOUND for an unknown depth or missing object, FAILED_PRECONDITION when the
        object is already deleted, and PERMISSION_DENIED when the caller is not the author.
        """
        content = request.content.strip()
        if not content:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_comment")

        database_id, depth = unpack_thread_id(request.thread_id)
        if depth == 1:
            lookup = select(Comment).where(Comment.id == database_id)
        elif depth == 2:
            lookup = select(Reply).where(Reply.id == database_id)
        else:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")
        obj: Comment | Reply | None = session.execute(lookup).scalar_one_or_none()

        if not obj:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")
        if obj.deleted is not None:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "reply_deleted")
        if obj.author_user_id != context.user_id:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "reply_edit_permission_denied")

        # Record the edit before overwriting the content.
        version: CommentVersion | ReplyVersion
        if depth == 1:
            version = CommentVersion(
                comment_id=database_id,
                editor_user_id=context.user_id,
                change_type=ContentChangeType.edit,
                old_content=obj.content,
                new_content=content,
            )
        else:
            version = ReplyVersion(
                reply_id=database_id,
                editor_user_id=context.user_id,
                change_type=ContentChangeType.edit,
                old_content=obj.content,
                new_content=content,
            )
        session.add(version)

        obj.content = content
        obj.last_edited = now()

        return threads_pb2.Reply(
            thread_id=request.thread_id,
            content=obj.content,
            author_user_id=obj.author_user_id,
            created_time=Timestamp_from_datetime(obj.created),
            num_replies=0,
            can_edit=True,
            last_edited=Timestamp_from_datetime(obj.last_edited),
        )

    def DeleteReply(
        self, request: threads_pb2.DeleteReplyReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Mark the caller's own comment (depth 1) or reply (depth 2) as deleted.

        A ``CommentVersion``/``ReplyVersion`` row of type ``delete`` holding the old
        content is added, then ``deleted`` is set to the current time; the row itself
        is kept. Aborts with NOT_FOUND for an unknown depth or missing object,
        FAILED_PRECONDITION when already deleted, and PERMISSION_DENIED when the
        caller is not the author.
        """
        database_id, depth = unpack_thread_id(request.thread_id)
        if depth == 1:
            lookup = select(Comment).where(Comment.id == database_id)
        elif depth == 2:
            lookup = select(Reply).where(Reply.id == database_id)
        else:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")
        obj: Comment | Reply | None = session.execute(lookup).scalar_one_or_none()

        if not obj:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")
        if obj.deleted is not None:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "reply_deleted")
        if obj.author_user_id != context.user_id:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "reply_delete_permission_denied")

        # Keep the removed content in the version history.
        version: CommentVersion | ReplyVersion
        if depth == 1:
            version = CommentVersion(
                comment_id=database_id,
                editor_user_id=context.user_id,
                change_type=ContentChangeType.delete,
                old_content=obj.content,
                new_content=None,
            )
        else:
            version = ReplyVersion(
                reply_id=database_id,
                editor_user_id=context.user_id,
                change_type=ContentChangeType.delete,
                old_content=obj.content,
                new_content=None,
            )
        session.add(version)

        obj.deleted = now()

        return empty_pb2.Empty()

534 obj.deleted = now() 

535 

536 return empty_pb2.Empty()