Coverage for app/backend/src/tests/fixtures/misc.py: 98%

125 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1from dataclasses import dataclass 

2from typing import Any 

3from unittest.mock import patch 

4 

5from sqlalchemy.orm import Session 

6 

7from couchers.config import config 

8from couchers.jobs.worker import process_job 

9from couchers.models import User 

10from couchers.notifications.push import PushNotificationContent 

11from couchers.proto import moderation_pb2 

12from couchers.proto.internal import jobs_pb2 

13from couchers.servicers.threads import unpack_thread_id 

14from tests.fixtures.sessions import real_moderation_session 

15 

16 

def process_jobs() -> None:
    """Call `process_job` repeatedly until it returns a falsy value."""
    keep_going = True
    while keep_going:
        # The truthiness of each call's result decides whether to call again.
        keep_going = process_job()

20 

21 

class EmailCollector:
    """
    Context manager that captures queued emails so tests can inspect them.

    While the context is active, `couchers.email.queuing._queue_email` is patched
    with a recorder that files every payload under its recipient address in
    `by_recipient`.
    """

    def __init__(self) -> None:
        # Recipient address -> payloads captured for that address, oldest first.
        self.by_recipient: dict[str, list[jobs_pb2.SendEmailPayload]] = {}
        self._patch = patch("couchers.email.queuing._queue_email", self._mock_queue_email)

    def _mock_queue_email(self, session: Session, payload: jobs_pb2.SendEmailPayload) -> None:
        """Replacement for `_queue_email`: record the payload instead of queuing it."""
        self.by_recipient.setdefault(payload.recipient, []).append(payload)

    def __enter__(self):
        # Run pending jobs before the patch is installed so that emails
        # originating before this point are not captured.
        process_jobs()
        self.by_recipient.clear()
        self._patch.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._patch.stop()
        # A falsy return value lets any in-flight exception propagate.
        return False

    def count(self) -> int:
        """Run pending jobs, then return the number of captured emails across all recipients."""
        process_jobs()
        return sum(map(len, self.by_recipient.values()))

    def count_for_recipient(self, recipient: str) -> int:
        """Run pending jobs, then return how many captured emails are held for `recipient`."""
        process_jobs()
        queued = self.by_recipient.get(recipient)
        return 0 if queued is None else len(queued)

    def count_for_mods(self) -> int:
        """Count captured emails addressed to `config.MODS_EMAIL_RECIPIENT`."""
        mods_address = config.MODS_EMAIL_RECIPIENT
        return self.count_for_recipient(mods_address)

    def count_for_reports(self) -> int:
        """Count captured emails addressed to `config.REPORTS_EMAIL_RECIPIENT`."""
        reports_address = config.REPORTS_EMAIL_RECIPIENT
        return self.count_for_recipient(reports_address)

    def pop_for_recipient(self, recipient: str, *, last: bool = False) -> jobs_pb2.SendEmailPayload:
        """
        Remove and return the oldest captured email for `recipient`.

        Fails an assertion if nothing is held for that recipient, or, when
        `last` is true, if the email being popped is not the only one held.
        """
        process_jobs()
        queued = self.by_recipient.get(recipient)
        assert queued, f"No emails to pop for recipient {recipient}."
        assert not last or len(queued) == 1, f"Expected a single email for recipient {recipient}."
        oldest = queued.pop(0)
        return oldest

    def pop_for_mods(self, *, last: bool = False) -> jobs_pb2.SendEmailPayload:
        """Pop the oldest captured email addressed to `config.MODS_EMAIL_RECIPIENT`."""
        mods_address = config.MODS_EMAIL_RECIPIENT
        return self.pop_for_recipient(mods_address, last=last)

    def pop_for_reports(self, *, last: bool = False) -> jobs_pb2.SendEmailPayload:
        """Pop the oldest captured email addressed to `config.REPORTS_EMAIL_RECIPIENT`."""
        reports_address = config.REPORTS_EMAIL_RECIPIENT
        return self.pop_for_recipient(reports_address, last=last)

76 

77 

@dataclass(frozen=True, slots=True, kw_only=True)
class Push:
    """
    Immutable, keyword-only record of one captured push notification.

    Instances are built by `PushCollector` from the keyword arguments passed to
    the patched `_push_to_user`, so the field names must match those keywords.
    """

    # Required fields.
    topic_action: str
    content: PushNotificationContent

    # Optional fields; both default to None when the caller does not supply them.
    # NOTE(review): exact meaning of `key` and `ttl` is defined by the real
    # `_push_to_user`, which is not visible here.
    key: str | None = None
    ttl: int | None = None

84 

85 

class PushCollector:
    """
    Context manager that captures push notifications so tests can inspect them.

    While the context is active, `couchers.notifications.push._push_to_user` is
    patched with a recorder that stores each call as a `Push` under the target
    user id in `by_user`.
    """

    def __init__(self) -> None:
        # User id -> notifications captured for that user, oldest first.
        self.by_user: dict[int, list[Push]] = {}
        self._patch = patch("couchers.notifications.push._push_to_user", self._mock_push_to_user)

    def _mock_push_to_user(self, session: Session, user_id: int, **kwargs: Any) -> None:
        """Replacement for `_push_to_user`: record the keyword arguments as a `Push`."""
        captured = Push(**kwargs)
        self.by_user.setdefault(user_id, []).append(captured)

    def __enter__(self):
        # Run pending jobs before the patch is installed so that notifications
        # originating before this point are not captured.
        process_jobs()
        self.by_user.clear()
        self._patch.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._patch.stop()
        # A falsy return value lets any in-flight exception propagate.
        return False

    def count_for_user(self, user_id: int) -> int:
        """Run pending jobs, then return how many captured notifications are held for `user_id`."""
        process_jobs()
        received = self.by_user.get(user_id)
        return 0 if received is None else len(received)

    def pop_for_user(self, user_id: int, *, last: bool = False) -> Push:
        """
        Remove and return the oldest captured notification for `user_id`.

        Fails an assertion if nothing is held for that user, or, when `last`
        is true, if the notification being popped is not the only one held.
        """
        process_jobs()
        received = self.by_user.get(user_id)
        assert received, f"No notifications to pop for user {user_id}."
        assert not last or len(received) == 1, f"Expected a single notification for user {user_id}."
        oldest = received.pop(0)
        return oldest

124 

125 

class Moderator:
    """
    A test fixture that provides a moderator user and methods to exercise the moderation API.

    Usage:
        def test_example(db, moderator):
            user, token = generate_user()
            # ... create a host request ...
            moderator.approve_host_request(host_request_id)
    """

    def __init__(self, user: User, token: str):
        self.user = user
        self.token = token

    def _approve(self, object_type: Any, object_id: int, reason: str) -> None:
        """
        Approve one moderated object through the moderation API.

        Opens a moderation session with this moderator's token, looks up the
        moderation state for (object_type, object_id), then submits an APPROVE
        action with VISIBLE visibility and flags cleared.

        Args:
            object_type: One of the moderation_pb2.MODERATION_OBJECT_TYPE_* values.
                Typed as Any because the generated enum's value type is not visible here.
            object_id: The ID of the object within that object type
            reason: Reason recorded with the approval
        """
        with real_moderation_session(self.token) as api:
            # ModerateContent is addressed by moderation state id, not by the
            # object's own id, so the state has to be fetched first.
            state_res = api.GetModerationState(
                moderation_pb2.GetModerationStateReq(
                    object_type=object_type,
                    object_id=object_id,
                )
            )
            api.ModerateContent(
                moderation_pb2.ModerateContentReq(
                    moderation_state_id=state_res.moderation_state.moderation_state_id,
                    action=moderation_pb2.MODERATION_ACTION_APPROVE,
                    visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
                    reason=reason,
                    clear_flags=True,
                )
            )

    def approve_host_request(self, host_request_id: int, reason: str = "Test approval") -> None:
        """
        Approve a host request using the moderation API.

        Args:
            host_request_id: The conversation_id of the host request
            reason: Optional reason for approval
        """
        self._approve(moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST, host_request_id, reason)

    def approve_group_chat(self, group_chat_id: int, reason: str = "Test approval") -> None:
        """
        Approve a group chat using the moderation API.

        Args:
            group_chat_id: The conversation_id of the group chat
            reason: Optional reason for approval
        """
        self._approve(moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT, group_chat_id, reason)

    def approve_friend_request(self, friend_request_id: int, reason: str = "Test approval") -> None:
        """
        Approve a friend request using the moderation API.

        Args:
            friend_request_id: The ID of the friend request (FriendRelationship.id)
            reason: Optional reason for approval
        """
        self._approve(moderation_pb2.MODERATION_OBJECT_TYPE_FRIEND_REQUEST, friend_request_id, reason)

    def approve_event_occurrence(self, occurrence_id: int, reason: str = "Test approval") -> None:
        """
        Approve an event occurrence using the moderation API.

        Args:
            occurrence_id: The ID of the EventOccurrence (what the proto calls event_id)
            reason: Optional reason for approval
        """
        self._approve(moderation_pb2.MODERATION_OBJECT_TYPE_EVENT_OCCURRENCE, occurrence_id, reason)

    def approve_comment(self, comment_id: int, reason: str = "Test approval") -> None:
        """Approve a Comment using the moderation API. comment_id is the database id of the Comment."""
        self._approve(moderation_pb2.MODERATION_OBJECT_TYPE_COMMENT, comment_id, reason)

    def approve_reply(self, reply_id: int, reason: str = "Test approval") -> None:
        """Approve a Reply using the moderation API. reply_id is the database id of the Reply."""
        self._approve(moderation_pb2.MODERATION_OBJECT_TYPE_REPLY, reply_id, reason)

    def approve_discussion(self, discussion_id: int, reason: str = "Test approval") -> None:
        """Approve a Discussion using the moderation API. discussion_id is the database id of the Discussion."""
        self._approve(moderation_pb2.MODERATION_OBJECT_TYPE_DISCUSSION, discussion_id, reason)

    def approve_reference(self, reference_id: int, reason: str = "Test approval") -> None:
        """Approve a Reference using the moderation API."""
        self._approve(moderation_pb2.MODERATION_OBJECT_TYPE_REFERENCE, reference_id, reason)

    def approve_thread_post(self, packed_thread_id: int, reason: str = "Test approval") -> None:
        """
        Approve whichever of Comment/Reply the packed thread_id refers to.

        Raises:
            ValueError: if the unpacked depth is neither 1 (Comment) nor 2 (Reply).
        """
        database_id, depth = unpack_thread_id(packed_thread_id)
        if depth == 1:
            self.approve_comment(database_id, reason=reason)
        elif depth == 2:
            self.approve_reply(database_id, reason=reason)
        else:
            raise ValueError(f"approve_thread_post: thread_id {packed_thread_id} has unsupported depth {depth}")