Coverage for app/backend/src/tests/fixtures/misc.py: 98%
125 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1from dataclasses import dataclass
2from typing import Any
3from unittest.mock import patch
5from sqlalchemy.orm import Session
7from couchers.config import config
8from couchers.jobs.worker import process_job
9from couchers.models import User
10from couchers.notifications.push import PushNotificationContent
11from couchers.proto import moderation_pb2
12from couchers.proto.internal import jobs_pb2
13from couchers.servicers.threads import unpack_thread_id
14from tests.fixtures.sessions import real_moderation_session
17def process_jobs() -> None:
18 while process_job():
19 pass
22class EmailCollector:
23 """Intercepts emails so they can be verified by tests."""
25 def __init__(self) -> None:
26 # Collected emails by recipient address, chronologically.
27 self.by_recipient: dict[str, list[jobs_pb2.SendEmailPayload]] = {}
28 self._patch = patch("couchers.email.queuing._queue_email", self._mock_queue_email)
30 def _mock_queue_email(self, session: Session, payload: jobs_pb2.SendEmailPayload) -> None:
31 if payload.recipient not in self.by_recipient:
32 self.by_recipient[payload.recipient] = []
33 self.by_recipient[payload.recipient].append(payload)
35 def __enter__(self):
36 process_jobs() # Flush any emails prior to this point
37 self.by_recipient.clear()
38 self._patch.start()
39 return self
41 def __exit__(self, exc_type, exc_val, exc_tb):
42 self._patch.stop()
43 return False # Let any exception propagate
45 def count(self) -> int:
46 process_jobs()
47 return sum(len(v) for v in self.by_recipient.values())
49 def count_for_recipient(self, recipient: str) -> int:
50 process_jobs()
51 return len(self.by_recipient.get(recipient, []))
53 def count_for_mods(self) -> int:
54 return self.count_for_recipient(config.MODS_EMAIL_RECIPIENT)
56 def count_for_reports(self) -> int:
57 return self.count_for_recipient(config.REPORTS_EMAIL_RECIPIENT)
59 def pop_for_recipient(self, recipient: str, *, last: bool = False) -> jobs_pb2.SendEmailPayload:
60 """
61 Removes and returns the oldest email queued to a given recipient,
62 optionally asserting that it is the last one.
63 """
64 process_jobs()
65 emails = self.by_recipient.get(recipient)
66 assert emails, f"No emails to pop for recipient {recipient}."
67 if last:
68 assert len(emails) == 1, f"Expected a single email for recipient {recipient}."
69 return emails.pop(0)
71 def pop_for_mods(self, *, last: bool = False) -> jobs_pb2.SendEmailPayload:
72 return self.pop_for_recipient(config.MODS_EMAIL_RECIPIENT, last=last)
74 def pop_for_reports(self, *, last: bool = False) -> jobs_pb2.SendEmailPayload:
75 return self.pop_for_recipient(config.REPORTS_EMAIL_RECIPIENT, last=last)
78@dataclass(frozen=True, slots=True, kw_only=True)
79class Push:
80 topic_action: str
81 content: PushNotificationContent
82 key: str | None = None
83 ttl: int | None = None
86class PushCollector:
87 """Captures push notifications and allows inspecting them."""
89 def __init__(self) -> None:
90 # Collected notifications by user id, chronologically.
91 self.by_user: dict[int, list[Push]] = {}
92 self._patch = patch("couchers.notifications.push._push_to_user", self._mock_push_to_user)
94 def _mock_push_to_user(self, session: Session, user_id: int, **kwargs: Any) -> None:
95 if user_id not in self.by_user:
96 self.by_user[user_id] = []
97 self.by_user[user_id].append(Push(**kwargs))
99 def __enter__(self):
100 process_jobs() # Flush any push notifications prior to this point
101 self.by_user.clear()
102 self._patch.start()
103 return self
105 def __exit__(self, exc_type, exc_val, exc_tb):
106 self._patch.stop()
107 return False # Let any exception propagate
109 def count_for_user(self, user_id: int) -> int:
110 process_jobs()
111 return len(self.by_user.get(user_id, []))
113 def pop_for_user(self, user_id: int, *, last: bool = False) -> Push:
114 """
115 Removes and returns the oldest push notification received by the given user,
116 optionally asserting that it is the last one.
117 """
118 process_jobs()
119 pushes = self.by_user.get(user_id)
120 assert pushes, f"No notifications to pop for user {user_id}."
121 if last:
122 assert len(pushes) == 1, f"Expected a single notification for user {user_id}."
123 return pushes.pop(0)
126class Moderator:
127 """
128 A test fixture that provides a moderator user and methods to exercise the moderation API.
130 Usage:
131 def test_example(db, moderator):
132 user, token = generate_user()
133 # ... create a host request ...
134 moderator.approve_host_request(host_request_id)
135 """
137 def __init__(self, user: User, token: str):
138 self.user = user
139 self.token = token
141 def approve_host_request(self, host_request_id: int, reason: str = "Test approval") -> None:
142 """
143 Approve a host request using the moderation API.
145 Args:
146 host_request_id: The conversation_id of the host request
147 reason: Optional reason for approval
148 """
149 with real_moderation_session(self.token) as api:
150 state_res = api.GetModerationState(
151 moderation_pb2.GetModerationStateReq(
152 object_type=moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST,
153 object_id=host_request_id,
154 )
155 )
156 api.ModerateContent(
157 moderation_pb2.ModerateContentReq(
158 moderation_state_id=state_res.moderation_state.moderation_state_id,
159 action=moderation_pb2.MODERATION_ACTION_APPROVE,
160 visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
161 reason=reason,
162 clear_flags=True,
163 )
164 )
166 def approve_group_chat(self, group_chat_id: int, reason: str = "Test approval") -> None:
167 """
168 Approve a group chat using the moderation API.
170 Args:
171 group_chat_id: The conversation_id of the group chat
172 reason: Optional reason for approval
173 """
174 with real_moderation_session(self.token) as api:
175 state_res = api.GetModerationState(
176 moderation_pb2.GetModerationStateReq(
177 object_type=moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT,
178 object_id=group_chat_id,
179 )
180 )
181 api.ModerateContent(
182 moderation_pb2.ModerateContentReq(
183 moderation_state_id=state_res.moderation_state.moderation_state_id,
184 action=moderation_pb2.MODERATION_ACTION_APPROVE,
185 visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
186 reason=reason,
187 clear_flags=True,
188 )
189 )
191 def approve_friend_request(self, friend_request_id: int, reason: str = "Test approval") -> None:
192 """
193 Approve a friend request using the moderation API.
195 Args:
196 friend_request_id: The ID of the friend request (FriendRelationship.id)
197 reason: Optional reason for approval
198 """
199 with real_moderation_session(self.token) as api:
200 state_res = api.GetModerationState(
201 moderation_pb2.GetModerationStateReq(
202 object_type=moderation_pb2.MODERATION_OBJECT_TYPE_FRIEND_REQUEST,
203 object_id=friend_request_id,
204 )
205 )
206 api.ModerateContent(
207 moderation_pb2.ModerateContentReq(
208 moderation_state_id=state_res.moderation_state.moderation_state_id,
209 action=moderation_pb2.MODERATION_ACTION_APPROVE,
210 visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
211 reason=reason,
212 clear_flags=True,
213 )
214 )
216 def approve_event_occurrence(self, occurrence_id: int, reason: str = "Test approval") -> None:
217 """
218 Approve an event occurrence using the moderation API.
220 Args:
221 occurrence_id: The ID of the EventOccurrence (what the proto calls event_id)
222 reason: Optional reason for approval
223 """
224 with real_moderation_session(self.token) as api:
225 state_res = api.GetModerationState(
226 moderation_pb2.GetModerationStateReq(
227 object_type=moderation_pb2.MODERATION_OBJECT_TYPE_EVENT_OCCURRENCE,
228 object_id=occurrence_id,
229 )
230 )
231 api.ModerateContent(
232 moderation_pb2.ModerateContentReq(
233 moderation_state_id=state_res.moderation_state.moderation_state_id,
234 action=moderation_pb2.MODERATION_ACTION_APPROVE,
235 visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
236 reason=reason,
237 clear_flags=True,
238 )
239 )
241 def approve_comment(self, comment_id: int, reason: str = "Test approval") -> None:
242 """Approve a Comment using the moderation API. comment_id is the database id of the Comment."""
243 with real_moderation_session(self.token) as api:
244 state_res = api.GetModerationState(
245 moderation_pb2.GetModerationStateReq(
246 object_type=moderation_pb2.MODERATION_OBJECT_TYPE_COMMENT,
247 object_id=comment_id,
248 )
249 )
250 api.ModerateContent(
251 moderation_pb2.ModerateContentReq(
252 moderation_state_id=state_res.moderation_state.moderation_state_id,
253 action=moderation_pb2.MODERATION_ACTION_APPROVE,
254 visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
255 reason=reason,
256 clear_flags=True,
257 )
258 )
260 def approve_reply(self, reply_id: int, reason: str = "Test approval") -> None:
261 """Approve a Reply using the moderation API. reply_id is the database id of the Reply."""
262 with real_moderation_session(self.token) as api:
263 state_res = api.GetModerationState(
264 moderation_pb2.GetModerationStateReq(
265 object_type=moderation_pb2.MODERATION_OBJECT_TYPE_REPLY,
266 object_id=reply_id,
267 )
268 )
269 api.ModerateContent(
270 moderation_pb2.ModerateContentReq(
271 moderation_state_id=state_res.moderation_state.moderation_state_id,
272 action=moderation_pb2.MODERATION_ACTION_APPROVE,
273 visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
274 reason=reason,
275 clear_flags=True,
276 )
277 )
279 def approve_discussion(self, discussion_id: int, reason: str = "Test approval") -> None:
280 """Approve a Discussion using the moderation API. discussion_id is the database id of the Discussion."""
281 with real_moderation_session(self.token) as api:
282 state_res = api.GetModerationState(
283 moderation_pb2.GetModerationStateReq(
284 object_type=moderation_pb2.MODERATION_OBJECT_TYPE_DISCUSSION,
285 object_id=discussion_id,
286 )
287 )
288 api.ModerateContent(
289 moderation_pb2.ModerateContentReq(
290 moderation_state_id=state_res.moderation_state.moderation_state_id,
291 action=moderation_pb2.MODERATION_ACTION_APPROVE,
292 visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
293 reason=reason,
294 clear_flags=True,
295 )
296 )
298 def approve_reference(self, reference_id: int, reason: str = "Test approval") -> None:
299 """Approve a Reference using the moderation API."""
300 with real_moderation_session(self.token) as api:
301 state_res = api.GetModerationState(
302 moderation_pb2.GetModerationStateReq(
303 object_type=moderation_pb2.MODERATION_OBJECT_TYPE_REFERENCE,
304 object_id=reference_id,
305 )
306 )
307 api.ModerateContent(
308 moderation_pb2.ModerateContentReq(
309 moderation_state_id=state_res.moderation_state.moderation_state_id,
310 action=moderation_pb2.MODERATION_ACTION_APPROVE,
311 visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
312 reason=reason,
313 clear_flags=True,
314 )
315 )
317 def approve_thread_post(self, packed_thread_id: int, reason: str = "Test approval") -> None:
318 """Approve whichever of Comment/Reply the packed thread_id refers to."""
319 database_id, depth = unpack_thread_id(packed_thread_id)
320 if depth == 1:
321 self.approve_comment(database_id, reason=reason)
322 elif depth == 2: 322 ↛ 325line 322 didn't jump to line 325 because the condition on line 322 was always true
323 self.approve_reply(database_id, reason=reason)
324 else:
325 raise ValueError(f"approve_thread_post: thread_id {packed_thread_id} has unsupported depth {depth}")