Coverage for app / backend / src / couchers / servicers / threads.py: 89%

126 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import logging 

2 

3import grpc 

4import sqlalchemy.exc 

5from sqlalchemy import select 

6from sqlalchemy.orm import Session 

7from sqlalchemy.sql import func 

8 

9from couchers.context import CouchersContext, make_background_user_context 

10from couchers.db import session_scope 

11from couchers.jobs.enqueue import queue_job 

12from couchers.models import Comment, Discussion, Event, EventOccurrence, Reply, Thread, User 

13from couchers.models.notifications import NotificationTopicAction 

14from couchers.notifications.notify import notify 

15from couchers.proto import notification_data_pb2, threads_pb2, threads_pb2_grpc 

16from couchers.proto.internal import jobs_pb2 

17from couchers.servicers.api import user_model_to_pb 

18from couchers.servicers.blocking import is_not_visible 

19from couchers.sql import where_users_column_visible 

20from couchers.utils import Timestamp_from_datetime 

21 

22logger = logging.getLogger(__name__) 

23 

24# Since the API exposes a single ID space regardless of nesting level, 

25# we construct the API id by appending the nesting level to the 

26# database ID. 

27 

28 

29def pack_thread_id(database_id: int, depth: int) -> int: 

30 return database_id * 10 + depth 

31 

32 

33def unpack_thread_id(thread_id: int) -> tuple[int, int]: 

34 """Returns (database_id, depth) tuple.""" 

35 return divmod(thread_id, 10) 

36 

37 

38def total_num_responses(session: Session, database_id: int) -> int: 

39 """Return the total number of comments and replies to the thread with 

40 database id database_id. 

41 """ 

42 comments = select(func.count()).select_from(Comment).where(Comment.thread_id == database_id) 

43 replies = ( 

44 select(func.count()) 

45 .select_from(Reply) 

46 .join(Comment, Comment.id == Reply.comment_id) 

47 .where(Comment.thread_id == database_id) 

48 ) 

49 return session.execute(comments).scalar_one() + session.execute(replies).scalar_one() 

50 

51 

52def thread_to_pb(session: Session, database_id: int) -> threads_pb2.Thread: 

53 return threads_pb2.Thread( 

54 thread_id=pack_thread_id(database_id, 0), 

55 num_responses=total_num_responses(session, database_id), 

56 ) 

57 

58 

59def generate_reply_notifications(payload: jobs_pb2.GenerateReplyNotificationsPayload) -> None: 

60 from couchers.servicers.discussions import discussion_to_pb 

61 from couchers.servicers.events import event_to_pb 

62 

63 with session_scope() as session: 

64 database_id, depth = unpack_thread_id(payload.thread_id) 

65 if depth == 1: 

66 # this is a top-level Comment on a Thread attached to event, discussion, etc 

67 comment = session.execute(select(Comment).where(Comment.id == database_id)).scalar_one() 

68 thread = session.execute(select(Thread).where(Thread.id == comment.thread_id)).scalar_one() 

69 author_user = session.execute(select(User).where(User.id == comment.author_user_id)).scalar_one() 

70 # reply object for notif 

71 reply = threads_pb2.Reply( 

72 thread_id=payload.thread_id, 

73 content=comment.content, 

74 author_user_id=comment.author_user_id, 

75 created_time=Timestamp_from_datetime(comment.created), 

76 num_replies=0, 

77 ) 

78 # figure out if the thread is related to an event or discussion 

79 event = session.execute(select(Event).where(Event.thread_id == thread.id)).scalar_one_or_none() 

80 discussion = session.execute( 

81 select(Discussion).where(Discussion.thread_id == thread.id) 

82 ).scalar_one_or_none() 

83 if event: 

84 # thread is an event thread 

85 occurrence = event.occurrences.order_by(EventOccurrence.id.desc()).limit(1).one() 

86 subscribed_user_ids = [user.id for user in event.subscribers] 

87 attending_user_ids = [user.user_id for user in occurrence.attendances] 

88 

89 for user_id in set(subscribed_user_ids + attending_user_ids): 

90 if is_not_visible(session, user_id, comment.author_user_id): 90 ↛ 91line 90 didn't jump to line 91 because the condition on line 90 was never true

91 continue 

92 if user_id == comment.author_user_id: 92 ↛ 93line 92 didn't jump to line 93 because the condition on line 92 was never true

93 continue 

94 context = make_background_user_context(user_id=user_id) 

95 notify( 

96 session, 

97 user_id=user_id, 

98 topic_action=NotificationTopicAction.event__comment, 

99 key=str(occurrence.id), 

100 data=notification_data_pb2.EventComment( 

101 reply=reply, 

102 event=event_to_pb(session, occurrence, context), 

103 author=user_model_to_pb(author_user, session, context), 

104 ), 

105 ) 

106 elif discussion: 106 ↛ 132line 106 didn't jump to line 132 because the condition on line 106 was always true

107 # community discussion thread 

108 cluster = discussion.owner_cluster 

109 

110 if not cluster.is_official_cluster: 110 ↛ 111line 110 didn't jump to line 111 because the condition on line 110 was never true

111 raise NotImplementedError("Shouldn't have discussions under groups, only communities") 

112 

113 for user_id in [discussion.creator_user_id]: 

114 if is_not_visible(session, user_id, comment.author_user_id): 114 ↛ 115line 114 didn't jump to line 115 because the condition on line 114 was never true

115 continue 

116 if user_id == comment.author_user_id: 116 ↛ 117line 116 didn't jump to line 117 because the condition on line 116 was never true

117 continue 

118 

119 context = make_background_user_context(user_id=user_id) 

120 notify( 

121 session, 

122 user_id=user_id, 

123 topic_action=NotificationTopicAction.discussion__comment, 

124 key=str(discussion.id), 

125 data=notification_data_pb2.DiscussionComment( 

126 reply=reply, 

127 discussion=discussion_to_pb(session, discussion, context), 

128 author=user_model_to_pb(author_user, session, context), 

129 ), 

130 ) 

131 else: 

132 raise NotImplementedError("I can only do event and discussion threads for now") 

133 elif depth == 2: 133 ↛ 206line 133 didn't jump to line 206 because the condition on line 133 was always true

134 # this is a second-level reply to a comment 

135 db_reply = session.execute(select(Reply).where(Reply.id == database_id)).scalar_one() 

136 # the comment we're replying to 

137 parent_comment = session.execute(select(Comment).where(Comment.id == db_reply.comment_id)).scalar_one() 

138 context = make_background_user_context(user_id=db_reply.author_user_id) 

139 thread_replies_author_user_ids = ( 

140 session.execute( 

141 where_users_column_visible( 

142 select(Reply.author_user_id).where(Reply.comment_id == parent_comment.id), 

143 context, 

144 Reply.author_user_id, 

145 ) 

146 ) 

147 .scalars() 

148 .all() 

149 ) 

150 thread_user_ids = set(thread_replies_author_user_ids) 

151 if not is_not_visible(session, parent_comment.author_user_id, db_reply.author_user_id): 151 ↛ 154line 151 didn't jump to line 154 because the condition on line 151 was always true

152 thread_user_ids.add(parent_comment.author_user_id) 

153 

154 author_user = session.execute(select(User).where(User.id == db_reply.author_user_id)).scalar_one() 

155 

156 user_ids_to_notify = set(thread_user_ids) - {db_reply.author_user_id} 

157 

158 reply = threads_pb2.Reply( 

159 thread_id=payload.thread_id, 

160 content=db_reply.content, 

161 author_user_id=db_reply.author_user_id, 

162 created_time=Timestamp_from_datetime(db_reply.created), 

163 num_replies=0, 

164 ) 

165 

166 event = session.execute( 

167 select(Event).where(Event.thread_id == parent_comment.thread_id) 

168 ).scalar_one_or_none() 

169 discussion = session.execute( 

170 select(Discussion).where(Discussion.thread_id == parent_comment.thread_id) 

171 ).scalar_one_or_none() 

172 if event: 

173 # thread is an event thread 

174 occurrence = event.occurrences.order_by(EventOccurrence.id.desc()).limit(1).one() 

175 for user_id in user_ids_to_notify: 

176 context = make_background_user_context(user_id=user_id) 

177 notify( 

178 session, 

179 user_id=user_id, 

180 topic_action=NotificationTopicAction.thread__reply, 

181 key=str(occurrence.id), 

182 data=notification_data_pb2.ThreadReply( 

183 reply=reply, 

184 event=event_to_pb(session, occurrence, context), 

185 author=user_model_to_pb(author_user, session, context), 

186 ), 

187 ) 

188 elif discussion: 188 ↛ 204line 188 didn't jump to line 204 because the condition on line 188 was always true

189 # community discussion thread 

190 for user_id in user_ids_to_notify: 

191 context = make_background_user_context(user_id=user_id) 

192 notify( 

193 session, 

194 user_id=user_id, 

195 topic_action=NotificationTopicAction.thread__reply, 

196 key=str(discussion.id), 

197 data=notification_data_pb2.ThreadReply( 

198 reply=reply, 

199 discussion=discussion_to_pb(session, discussion, context), 

200 author=user_model_to_pb(author_user, session, context), 

201 ), 

202 ) 

203 else: 

204 raise NotImplementedError("I can only do event and discussion threads for now") 

205 else: 

206 raise Exception("Unknown depth") 

207 

208 

209class Threads(threads_pb2_grpc.ThreadsServicer): 

210 def GetThread( 

211 self, request: threads_pb2.GetThreadReq, context: CouchersContext, session: Session 

212 ) -> threads_pb2.GetThreadRes: 

213 database_id, depth = unpack_thread_id(request.thread_id) 

214 page_size = request.page_size if 0 < request.page_size < 100000 else 1000 

215 page_start = unpack_thread_id(int(request.page_token))[0] if request.page_token else 2**50 

216 

217 if depth == 0: 

218 if not session.execute(select(Thread).where(Thread.id == database_id)).scalar_one_or_none(): 218 ↛ 219line 218 didn't jump to line 219 because the condition on line 218 was never true

219 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found") 

220 

221 res = session.execute( 

222 select(Comment, func.count(Reply.id)) 

223 .outerjoin(Reply, Reply.comment_id == Comment.id) 

224 .where(Comment.thread_id == database_id) 

225 .where(Comment.id < page_start) 

226 .group_by(Comment.id) 

227 .order_by(Comment.created.desc()) 

228 .limit(page_size + 1) 

229 ).all() 

230 replies = [ 

231 threads_pb2.Reply( 

232 thread_id=pack_thread_id(r.id, 1), 

233 content=r.content, 

234 author_user_id=r.author_user_id, 

235 created_time=Timestamp_from_datetime(r.created), 

236 num_replies=n, 

237 ) 

238 for r, n in res[:page_size] 

239 ] 

240 

241 elif depth == 1: 

242 if not session.execute(select(Comment).where(Comment.id == database_id)).scalar_one_or_none(): 

243 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found") 

244 

245 res = ( 

246 session.execute( # type: ignore[assignment] 

247 select(Reply) 

248 .where(Reply.comment_id == database_id) 

249 .where(Reply.id < page_start) 

250 .order_by(Reply.created.desc()) 

251 .limit(page_size + 1) 

252 ) 

253 .scalars() 

254 .all() 

255 ) 

256 replies = [ 

257 threads_pb2.Reply( 

258 thread_id=pack_thread_id(r.id, 2), 

259 content=r.content, 

260 author_user_id=r.author_user_id, 

261 created_time=Timestamp_from_datetime(r.created), 

262 num_replies=0, 

263 ) 

264 for r in res[:page_size] 

265 ] 

266 

267 else: 

268 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found") 

269 

270 if len(res) > page_size: 

271 # There's more! 

272 next_page_token = str(replies[-1].thread_id) 

273 else: 

274 next_page_token = "" 

275 

276 return threads_pb2.GetThreadRes(replies=replies, next_page_token=next_page_token) 

277 

278 def PostReply( 

279 self, request: threads_pb2.PostReplyReq, context: CouchersContext, session: Session 

280 ) -> threads_pb2.PostReplyRes: 

281 content = request.content.strip() 

282 

283 if content == "": 

284 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_comment") 

285 

286 database_id, depth = unpack_thread_id(request.thread_id) 

287 if depth == 0: 

288 object_to_add: Comment | Reply = Comment( 

289 thread_id=database_id, author_user_id=context.user_id, content=content 

290 ) 

291 elif depth == 1: 

292 object_to_add = Reply(comment_id=database_id, author_user_id=context.user_id, content=content) 

293 else: 

294 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found") 

295 session.add(object_to_add) 

296 try: 

297 session.flush() 

298 except sqlalchemy.exc.IntegrityError: 

299 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "thread_not_found") 

300 

301 thread_id = pack_thread_id(object_to_add.id, depth + 1) 

302 

303 queue_job( 

304 session, 

305 job=generate_reply_notifications, 

306 payload=jobs_pb2.GenerateReplyNotificationsPayload( 

307 thread_id=thread_id, 

308 ), 

309 ) 

310 

311 return threads_pb2.PostReplyRes(thread_id=thread_id)