Coverage for app/backend/src/couchers/servicers/threads.py: 86%
181 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1import logging
3import grpc
4import sqlalchemy.exc
5from google.protobuf import empty_pb2
6from sqlalchemy import exists, select
7from sqlalchemy.orm import Session
8from sqlalchemy.sql import func
10from couchers.context import CouchersContext, make_background_user_context, make_notification_user_context
11from couchers.db import session_scope
12from couchers.jobs.enqueue import queue_job
13from couchers.models import (
14 Comment,
15 Discussion,
16 Event,
17 EventOccurrence,
18 ModerationObjectType,
19 Reply,
20 Thread,
21 User,
22)
23from couchers.models.discussions import CommentVersion, ContentChangeType, ReplyVersion
24from couchers.models.notifications import NotificationTopicAction
25from couchers.moderation.utils import create_moderation
26from couchers.notifications.notify import notify
27from couchers.proto import notification_data_pb2, threads_pb2, threads_pb2_grpc
28from couchers.proto.internal import jobs_pb2
29from couchers.servicers.api import user_model_to_pb
30from couchers.servicers.blocking import is_not_visible
31from couchers.sql import where_moderated_content_visible, where_users_column_visible
32from couchers.utils import Timestamp_from_datetime, now
34logger = logging.getLogger(__name__)
37# Since the API exposes a single ID space regardless of nesting level,
38# we construct the API id by appending the nesting level to the
39# database ID.
def pack_thread_id(database_id: int, depth: int) -> int:
    """Build the API thread id for a database row at the given nesting depth.

    The API id is the database id with the nesting depth appended as its last
    decimal digit, so ``unpack_thread_id`` can recover both parts.

    Args:
        database_id: Primary key of the thread, comment or reply row.
        depth: Nesting level; must fit in a single decimal digit (0-9).

    Returns:
        The packed API id, ``database_id * 10 + depth``.

    Raises:
        ValueError: If ``depth`` is not in the range 0-9. Such a value would spill
            into the database-id digits and decode to a different object.
    """
    if not 0 <= depth <= 9:
        raise ValueError(f"depth must be a single decimal digit (0-9), got {depth}")
    return database_id * 10 + depth
def unpack_thread_id(thread_id: int) -> tuple[int, int]:
    """Split an API thread id into its ``(database_id, depth)`` parts.

    The last decimal digit of the API id is the nesting depth; everything in
    front of it is the database id.
    """
    database_id = thread_id // 10
    depth = thread_id % 10
    return database_id, depth
def total_num_responses(session: Session, context: CouchersContext, database_id: int) -> int:
    """Count the visible, non-deleted responses (comments plus replies) on a thread.

    ``database_id`` is the database id of the thread (not a packed API id). Both counts
    are passed through the user-visibility and moderation-visibility filters for ``context``.
    """
    # Top-level comments attached directly to the thread.
    # `== None` is intentional: SQLAlchemy renders it as IS NULL.
    comment_count = (
        select(func.count())
        .select_from(Comment)
        .where(Comment.thread_id == database_id)
        .where(Comment.deleted == None)
    )
    comment_count = where_users_column_visible(comment_count, context, Comment.author_user_id)
    comment_count = where_moderated_content_visible(comment_count, context, Comment, is_list_operation=True)

    # Replies, reached through the comment they belong to.
    reply_count = (
        select(func.count())
        .select_from(Reply)
        .join(Comment, Comment.id == Reply.comment_id)
        .where(Comment.thread_id == database_id)
        .where(Reply.deleted == None)
    )
    reply_count = where_users_column_visible(reply_count, context, Reply.author_user_id)
    reply_count = where_moderated_content_visible(reply_count, context, Reply, is_list_operation=True)

    num_comments = session.execute(comment_count).scalar_one()
    num_replies = session.execute(reply_count).scalar_one()
    return num_comments + num_replies
def thread_to_pb(session: Session, context: CouchersContext, database_id: int) -> threads_pb2.Thread:
    """Build the ``threads_pb2.Thread`` message for the thread with database id ``database_id``.

    The message carries the packed API id of the thread itself (depth 0) and the
    total number of responses as computed by ``total_num_responses``.
    """
    api_thread_id = pack_thread_id(database_id, 0)
    response_count = total_num_responses(session, context, database_id)
    return threads_pb2.Thread(thread_id=api_thread_id, num_responses=response_count)
def generate_reply_notifications(payload: jobs_pb2.GenerateReplyNotificationsPayload) -> None:
    """Send notifications about a newly posted comment or reply.

    ``payload.thread_id`` is the packed API id of the new object:

    * depth 1 - a top-level comment. On an event thread, the event's subscribers and the
      attendees of its latest occurrence (highest id) are notified; on a discussion thread,
      the discussion creator is notified. The comment author and users for whom
      ``is_not_visible`` is true are skipped.
    * depth 2 - a reply to a comment. The authors of replies to the same comment (filtered
      through ``where_users_column_visible`` for the reply author) and, if visible, the
      parent comment's author are notified, except the reply author themselves.

    Raises:
        NotImplementedError: If the thread belongs to neither an event nor a discussion, or
            the discussion's owner cluster is not an official cluster.
        Exception: If the depth is neither 1 nor 2.
    """
    # Imported lazily to avoid a circular dependency
    from couchers.servicers.discussions import discussion_to_pb  # noqa: PLC0415
    from couchers.servicers.events import event_to_pb  # noqa: PLC0415

    with session_scope() as session:
        database_id, depth = unpack_thread_id(payload.thread_id)
        if depth not in (1, 2):
            raise Exception("Unknown depth")

        if depth == 1:
            # a top-level Comment on a Thread attached to an event, discussion, etc
            new_comment = session.execute(select(Comment).where(Comment.id == database_id)).scalar_one()
            parent_thread = session.execute(select(Thread).where(Thread.id == new_comment.thread_id)).scalar_one()
            commenter = session.execute(select(User).where(User.id == new_comment.author_user_id)).scalar_one()

            # protobuf representation of the comment, embedded in every notification
            reply_pb = threads_pb2.Reply(
                thread_id=payload.thread_id,
                content=new_comment.content,
                author_user_id=new_comment.author_user_id,
                created_time=Timestamp_from_datetime(new_comment.created),
                num_replies=0,
            )

            # work out what the thread hangs off: an event or a discussion
            event = session.execute(select(Event).where(Event.thread_id == parent_thread.id)).scalar_one_or_none()
            discussion = session.execute(
                select(Discussion).where(Discussion.thread_id == parent_thread.id)
            ).scalar_one_or_none()

            if not event and not discussion:
                raise NotImplementedError("I can only do event and discussion threads for now")

            if event:
                # event thread: use the occurrence with the highest id
                occurrence = event.occurrences.order_by(EventOccurrence.id.desc()).limit(1).one()
                recipient_ids = {subscriber.id for subscriber in event.subscribers}
                recipient_ids.update(attendance.user_id for attendance in occurrence.attendances)

                for user_id in recipient_ids:
                    # skip users who shouldn't see the author, and the author themselves
                    if is_not_visible(session, user_id, new_comment.author_user_id) or (
                        user_id == new_comment.author_user_id
                    ):
                        continue
                    recipient_context = make_notification_user_context(user_id=user_id)
                    notify(
                        session,
                        user_id=user_id,
                        topic_action=NotificationTopicAction.event__comment,
                        key=str(occurrence.id),
                        data=notification_data_pb2.EventComment(
                            reply=reply_pb,
                            event=event_to_pb(session, occurrence, recipient_context),
                            author=user_model_to_pb(commenter, session, recipient_context),
                        ),
                        moderation_state_id=new_comment.moderation_state_id,
                    )
            else:
                # community discussion thread
                owner_cluster = discussion.owner_cluster
                if not owner_cluster.is_official_cluster:
                    raise NotImplementedError("Shouldn't have discussions under groups, only communities")

                # the only recipient is the discussion creator
                user_id = discussion.creator_user_id
                if not (
                    is_not_visible(session, user_id, new_comment.author_user_id)
                    or user_id == new_comment.author_user_id
                ):
                    recipient_context = make_notification_user_context(user_id=user_id)
                    notify(
                        session,
                        user_id=user_id,
                        topic_action=NotificationTopicAction.discussion__comment,
                        key=str(discussion.id),
                        data=notification_data_pb2.DiscussionComment(
                            reply=reply_pb,
                            discussion=discussion_to_pb(session, discussion, recipient_context),
                            author=user_model_to_pb(commenter, session, recipient_context),
                        ),
                        moderation_state_id=new_comment.moderation_state_id,
                    )
        else:
            # depth == 2: a second-level reply to a comment
            new_reply = session.execute(select(Reply).where(Reply.id == database_id)).scalar_one()
            # the comment being replied to
            parent_comment = session.execute(select(Comment).where(Comment.id == new_reply.comment_id)).scalar_one()

            # authors of replies under the same comment, filtered for the reply author's context
            author_context = make_background_user_context(user_id=new_reply.author_user_id)
            participants_query = where_users_column_visible(
                select(Reply.author_user_id).where(Reply.comment_id == parent_comment.id),
                author_context,
                Reply.author_user_id,
            )
            recipient_ids = set(session.execute(participants_query).scalars().all())
            if not is_not_visible(session, parent_comment.author_user_id, new_reply.author_user_id):
                recipient_ids.add(parent_comment.author_user_id)

            replier = session.execute(select(User).where(User.id == new_reply.author_user_id)).scalar_one()

            # the reply author is never notified about their own reply
            recipient_ids = recipient_ids.difference({new_reply.author_user_id})

            reply_pb = threads_pb2.Reply(
                thread_id=payload.thread_id,
                content=new_reply.content,
                author_user_id=new_reply.author_user_id,
                created_time=Timestamp_from_datetime(new_reply.created),
                num_replies=0,
            )

            event = session.execute(
                select(Event).where(Event.thread_id == parent_comment.thread_id)
            ).scalar_one_or_none()
            discussion = session.execute(
                select(Discussion).where(Discussion.thread_id == parent_comment.thread_id)
            ).scalar_one_or_none()

            if not event and not discussion:
                raise NotImplementedError("I can only do event and discussion threads for now")

            if event:
                # event thread: use the occurrence with the highest id
                occurrence = event.occurrences.order_by(EventOccurrence.id.desc()).limit(1).one()
                for user_id in recipient_ids:
                    recipient_context = make_notification_user_context(user_id=user_id)
                    notify(
                        session,
                        user_id=user_id,
                        topic_action=NotificationTopicAction.thread__reply,
                        key=str(occurrence.id),
                        data=notification_data_pb2.ThreadReply(
                            reply=reply_pb,
                            event=event_to_pb(session, occurrence, recipient_context),
                            author=user_model_to_pb(replier, session, recipient_context),
                        ),
                        moderation_state_id=new_reply.moderation_state_id,
                    )
            else:
                # community discussion thread
                for user_id in recipient_ids:
                    recipient_context = make_notification_user_context(user_id=user_id)
                    notify(
                        session,
                        user_id=user_id,
                        topic_action=NotificationTopicAction.thread__reply,
                        key=str(discussion.id),
                        data=notification_data_pb2.ThreadReply(
                            reply=reply_pb,
                            discussion=discussion_to_pb(session, discussion, recipient_context),
                            author=user_model_to_pb(replier, session, recipient_context),
                        ),
                        moderation_state_id=new_reply.moderation_state_id,
                    )
class Threads(threads_pb2_grpc.ThreadsServicer):
    """gRPC servicer for reading, posting, editing and deleting thread responses.

    Thread ids in requests are packed API ids (see ``pack_thread_id``): depth 0 addresses a
    Thread, depth 1 a Comment and depth 2 a Reply.
    """

    def GetThread(
        self, request: threads_pb2.GetThreadReq, context: CouchersContext, session: Session
    ) -> threads_pb2.GetThreadRes:
        """Return one page of the responses directly beneath ``request.thread_id``.

        * depth 0: the comments of a thread. Deleted comments that still have non-deleted
          replies are returned as stubs.
        * depth 1: the non-deleted replies to a comment.
        * any other depth, or a missing/invisible parent: aborts with NOT_FOUND.

        ``request.page_size`` is used when it lies strictly between 0 and 100000, otherwise
        1000. ``request.page_token`` is the packed id of the last item of the previous page;
        only items with a smaller database id are returned.
        """
        database_id, depth = unpack_thread_id(request.thread_id)
        page_size = request.page_size if 0 < request.page_size < 100000 else 1000
        # without a page token, start from an id larger than any real row
        page_start = unpack_thread_id(int(request.page_token))[0] if request.page_token else 2**50

        if depth == 0:
            if not session.execute(select(Thread).where(Thread.id == database_id)).scalar_one_or_none():
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")

            # true for comments that have at least one non-deleted reply
            has_replies = exists().where((Reply.comment_id == Comment.id) & (Reply.deleted == None)).correlate(Comment)
            # per-comment count of non-deleted replies that pass the visibility filters
            visible_reply_count = (
                where_moderated_content_visible(
                    where_users_column_visible(
                        select(func.count(Reply.id)).where(Reply.comment_id == Comment.id).where(Reply.deleted == None),
                        context,
                        Reply.author_user_id,
                    ),
                    context,
                    Reply,
                    is_list_operation=True,
                )
                .correlate(Comment)
                .scalar_subquery()
            )

            # fetch one row more than the page size to learn whether another page exists
            res = session.execute(
                where_moderated_content_visible(
                    where_users_column_visible(
                        select(Comment, visible_reply_count)
                        .where(Comment.thread_id == database_id)
                        .where((Comment.deleted == None) | has_replies)
                        .where(Comment.id < page_start)
                        # Tie-break on id so that rows sharing a created timestamp come back in a
                        # deterministic order that agrees with the id-based page cursor.
                        .order_by(Comment.created.desc(), Comment.id.desc())
                        .limit(page_size + 1),
                        context,
                        Comment.author_user_id,
                    ),
                    context,
                    Comment,
                    is_list_operation=True,
                )
            ).all()
            # Deleted comments are shown as stubs (thread_id, deleted, num_replies only)
            # to preserve thread structure, but content and author are stripped.
            replies = []
            for r, n in res[:page_size]:
                if r.deleted is not None:
                    replies.append(
                        threads_pb2.Reply(
                            thread_id=pack_thread_id(r.id, 1),
                            deleted=True,
                            num_replies=n,
                        )
                    )
                else:
                    replies.append(
                        threads_pb2.Reply(
                            thread_id=pack_thread_id(r.id, 1),
                            content=r.content,
                            author_user_id=r.author_user_id,
                            created_time=Timestamp_from_datetime(r.created),
                            num_replies=n,
                            can_edit=(context.user_id == r.author_user_id),
                            last_edited=Timestamp_from_datetime(r.last_edited) if r.last_edited else None,
                        )
                    )

        elif depth == 1:
            # the parent comment must exist and be visible to the caller
            if not session.execute(
                where_moderated_content_visible(
                    where_users_column_visible(
                        select(Comment).where(Comment.id == database_id),
                        context,
                        Comment.author_user_id,
                    ),
                    context,
                    Comment,
                )
            ).scalar_one_or_none():
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")

            # fetch one row more than the page size to learn whether another page exists
            res = (
                session.execute(  # type: ignore[assignment]
                    where_moderated_content_visible(
                        where_users_column_visible(
                            select(Reply)
                            .where(Reply.comment_id == database_id)
                            .where(Reply.deleted == None)
                            .where(Reply.id < page_start)
                            # Same id tie-break as for comments: keeps the order deterministic and
                            # consistent with the id-based page cursor.
                            .order_by(Reply.created.desc(), Reply.id.desc())
                            .limit(page_size + 1),
                            context,
                            Reply.author_user_id,
                        ),
                        context,
                        Reply,
                        is_list_operation=True,
                    )
                )
                .scalars()
                .all()
            )
            replies = [
                threads_pb2.Reply(
                    thread_id=pack_thread_id(r.id, 2),
                    content=r.content,
                    author_user_id=r.author_user_id,
                    created_time=Timestamp_from_datetime(r.created),
                    num_replies=0,
                    can_edit=(context.user_id == r.author_user_id),
                    last_edited=Timestamp_from_datetime(r.last_edited) if r.last_edited else None,
                )
                for r in res[:page_size]
            ]

        else:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")

        if len(res) > page_size:
            # There's more!
            next_page_token = str(replies[-1].thread_id)
        else:
            next_page_token = ""

        return threads_pb2.GetThreadRes(replies=replies, next_page_token=next_page_token)

    def PostReply(
        self, request: threads_pb2.PostReplyReq, context: CouchersContext, session: Session
    ) -> threads_pb2.PostReplyRes:
        """Post a new comment on a thread (depth 0) or a new reply to a comment (depth 1).

        Aborts with INVALID_ARGUMENT if the content is empty after stripping whitespace, and
        with NOT_FOUND if the target depth is not 0 or 1 or if inserting the row fails with an
        integrity error. On success a ``generate_reply_notifications`` job is queued and the
        packed id of the new object is returned.
        """
        content = request.content.strip()

        if content == "":
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_comment")

        database_id, depth = unpack_thread_id(request.thread_id)
        if depth not in (0, 1):
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")

        object_to_add: Comment | Reply | None = None

        def create_object(moderation_state_id: int) -> int:
            """Insert the Comment/Reply with the given moderation state and return its id.

            Passed to ``create_moderation`` as ``object_id``; the created row is also stored
            in ``object_to_add`` for use after ``create_moderation`` returns.
            """
            nonlocal object_to_add
            if depth == 0:
                object_to_add = Comment(
                    thread_id=database_id,
                    author_user_id=context.user_id,
                    content=content,
                    moderation_state_id=moderation_state_id,
                )
            else:
                object_to_add = Reply(
                    comment_id=database_id,
                    author_user_id=context.user_id,
                    content=content,
                    moderation_state_id=moderation_state_id,
                )
            session.add(object_to_add)
            try:
                session.flush()
            except sqlalchemy.exc.IntegrityError:
                # presumably the parent thread/comment does not exist (foreign key violation)
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")
            return object_to_add.id

        create_moderation(
            session=session,
            object_type=ModerationObjectType.comment if depth == 0 else ModerationObjectType.reply,
            object_id=create_object,
            creator_user_id=context.user_id,
        )

        assert object_to_add is not None
        # the new object lives one level below the thing that was replied to
        thread_id = pack_thread_id(object_to_add.id, depth + 1)

        queue_job(
            session,
            job=generate_reply_notifications,
            payload=jobs_pb2.GenerateReplyNotificationsPayload(
                thread_id=thread_id,
            ),
        )

        return threads_pb2.PostReplyRes(thread_id=thread_id)

    def UpdateReply(
        self, request: threads_pb2.UpdateReplyReq, context: CouchersContext, session: Session
    ) -> threads_pb2.Reply:
        """Replace the content of the caller's own comment (depth 1) or reply (depth 2).

        Aborts with INVALID_ARGUMENT for empty content, NOT_FOUND for an unknown id or depth,
        FAILED_PRECONDITION if the object is already deleted and PERMISSION_DENIED if the
        caller is not the author. The previous and new content are recorded in a
        CommentVersion/ReplyVersion row with change type ``edit``.
        """
        content = request.content.strip()
        if not content:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_comment")

        database_id, depth = unpack_thread_id(request.thread_id)
        if depth == 1:
            obj: Comment | Reply | None = session.execute(
                select(Comment).where(Comment.id == database_id)
            ).scalar_one_or_none()
        elif depth == 2:
            obj = session.execute(select(Reply).where(Reply.id == database_id)).scalar_one_or_none()
        else:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")

        if not obj:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")
        if obj.deleted is not None:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "reply_deleted")
        if obj.author_user_id != context.user_id:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "reply_edit_permission_denied")

        old_content = obj.content

        # record the edit before overwriting the content
        if depth == 1:
            session.add(
                CommentVersion(
                    comment_id=database_id,
                    editor_user_id=context.user_id,
                    change_type=ContentChangeType.edit,
                    old_content=old_content,
                    new_content=content,
                )
            )
        else:
            session.add(
                ReplyVersion(
                    reply_id=database_id,
                    editor_user_id=context.user_id,
                    change_type=ContentChangeType.edit,
                    old_content=old_content,
                    new_content=content,
                )
            )

        obj.content = content
        obj.last_edited = now()

        return threads_pb2.Reply(
            thread_id=request.thread_id,
            content=obj.content,
            author_user_id=obj.author_user_id,
            created_time=Timestamp_from_datetime(obj.created),
            num_replies=0,
            can_edit=True,
            last_edited=Timestamp_from_datetime(obj.last_edited),
        )

    def DeleteReply(
        self, request: threads_pb2.DeleteReplyReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Soft-delete the caller's own comment (depth 1) or reply (depth 2).

        Aborts with NOT_FOUND for an unknown id or depth, FAILED_PRECONDITION if the object
        is already deleted and PERMISSION_DENIED if the caller is not the author. The row is
        kept: its ``deleted`` timestamp is set and a CommentVersion/ReplyVersion row with
        change type ``delete`` records the old content.
        """
        database_id, depth = unpack_thread_id(request.thread_id)
        if depth == 1:
            obj: Comment | Reply | None = session.execute(
                select(Comment).where(Comment.id == database_id)
            ).scalar_one_or_none()
        elif depth == 2:
            obj = session.execute(select(Reply).where(Reply.id == database_id)).scalar_one_or_none()
        else:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")

        if not obj:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found")
        if obj.deleted is not None:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "reply_deleted")
        if obj.author_user_id != context.user_id:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "reply_delete_permission_denied")

        if depth == 1:
            session.add(
                CommentVersion(
                    comment_id=database_id,
                    editor_user_id=context.user_id,
                    change_type=ContentChangeType.delete,
                    old_content=obj.content,
                    new_content=None,
                )
            )
        else:
            session.add(
                ReplyVersion(
                    reply_id=database_id,
                    editor_user_id=context.user_id,
                    change_type=ContentChangeType.delete,
                    old_content=obj.content,
                    new_content=None,
                )
            )

        obj.deleted = now()

        return empty_pb2.Empty()