Coverage for src/couchers/notifications/send_raw_push_notification.py: 32%
94 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-08 16:02 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-08 16:02 +0000
1import json
2import logging
3from dataclasses import dataclass
5from sqlalchemy.sql import func
7from couchers.config import config
8from couchers.db import session_scope
9from couchers.metrics import push_notification_counter
10from couchers.models import (
11 PushNotificationDeliveryAttempt,
12 PushNotificationDeliveryOutcome,
13 PushNotificationPlatform,
14 PushNotificationSubscription,
15)
16from couchers.notifications.expo_api import send_expo_push_notification
17from couchers.notifications.web_push_api import send_web_push
18from couchers.proto.internal import jobs_pb2
19from couchers.sql import couchers_select as select
20from couchers.utils import now
22logger = logging.getLogger(__name__)
25class PushNotificationError(Exception):
26 """Base exception for push notification errors.
28 Transient errors should raise this base class - they will be retried.
29 """
31 def __init__(self, message: str, *, status_code: int | None = None, response: str | None = None):
32 super().__init__(message)
33 self.status_code = status_code
34 self.response = response
37class PermanentSubscriptionFailure(PushNotificationError):
38 """Subscription is permanently broken and should be disabled.
40 Examples: device unregistered, invalid credentials, 404/410 Gone.
41 """
43 pass
46class PermanentMessageFailure(PushNotificationError):
47 """Message cannot be delivered, but the subscription is still valid.
49 Don't disable the subscription, but don't retry this specific message.
50 """
52 pass
55class MessageTooLong(PermanentMessageFailure):
56 """Message exceeds the platform's size limits."""
58 pass
61@dataclass
62class PushDeliveryResult:
63 """Result of a successful push notification delivery."""
65 status_code: int
66 response: str | None = None
67 expo_ticket_id: str | None = None
70def _send_web_push(
71 sub: PushNotificationSubscription, payload: jobs_pb2.SendRawPushNotificationPayloadV2
72) -> PushDeliveryResult:
73 """Send via Web Push API. Raises appropriate exceptions on failure."""
74 data = json.dumps(
75 {
76 "title": payload.title,
77 "body": payload.body,
78 "icon": payload.icon,
79 "url": payload.url,
80 "user_id": payload.user_id,
81 "topic_action": payload.topic_action,
82 "key": payload.key,
83 }
84 ).encode("utf8")
86 if len(data) > 3072:
87 raise MessageTooLong(f"Data too long for web push ({len(data)} bytes, max 3072)")
89 resp = send_web_push(
90 data,
91 sub.endpoint,
92 sub.auth_key,
93 sub.p256dh_key,
94 config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"],
95 config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"],
96 ttl=payload.ttl,
97 )
99 if resp.status_code in [200, 201, 202]:
100 return PushDeliveryResult(status_code=resp.status_code, response=resp.text)
102 if resp.status_code in [404, 410]:
103 raise PermanentSubscriptionFailure(
104 f"Subscription gone (HTTP {resp.status_code})",
105 status_code=resp.status_code,
106 response=resp.text,
107 )
109 # Other errors are transient - will retry
110 raise PushNotificationError(
111 f"Web push failed (HTTP {resp.status_code})",
112 status_code=resp.status_code,
113 response=resp.text,
114 )
117def _send_expo(
118 sub: PushNotificationSubscription, payload: jobs_pb2.SendRawPushNotificationPayloadV2
119) -> PushDeliveryResult:
120 """Send via Expo Push API. Raises appropriate exceptions on failure."""
121 collapse_key = None
122 if payload.topic_action and payload.key:
123 collapse_key = f"{payload.topic_action}_{payload.key}"
125 result = send_expo_push_notification(
126 token=sub.token,
127 title=payload.title,
128 body=payload.body,
129 data={
130 "url": payload.url,
131 "topic_action": payload.topic_action,
132 "key": payload.key,
133 },
134 collapse_key=collapse_key,
135 )
137 # Parse the Expo response
138 response_data = {}
139 if isinstance(result.get("data"), list) and len(result.get("data", [])) > 0:
140 response_data = result["data"][0]
141 elif isinstance(result.get("data"), dict):
142 response_data = result["data"]
144 status = response_data.get("status", "unknown")
145 response_str = str(result)
147 if status == "ok":
148 # Extract ticket ID for receipt checking
149 ticket_id = response_data.get("id")
150 return PushDeliveryResult(status_code=200, response=response_str, expo_ticket_id=ticket_id)
152 # Handle error status
153 error_code = response_data.get("details", {}).get("error")
155 if error_code == "MessageTooBig":
156 raise MessageTooLong(
157 f"Expo message too big: {error_code}",
158 status_code=400,
159 response=response_str,
160 )
162 if error_code in {"DeviceNotRegistered", "InvalidCredentials"}:
163 raise PermanentSubscriptionFailure(
164 f"Expo subscription invalid: {error_code}",
165 status_code=400,
166 response=response_str,
167 )
169 # Other errors are transient - will retry
170 raise PushNotificationError(
171 f"Expo push failed: {error_code or status}",
172 status_code=400,
173 response=response_str,
174 )
177def send_raw_push_notification_v2(payload: jobs_pb2.SendRawPushNotificationPayloadV2) -> None:
178 if not config["PUSH_NOTIFICATIONS_ENABLED"]:
179 logger.info("Not sending push notification: push notifications disabled")
180 return
182 with session_scope() as session:
183 sub = session.execute(
184 select(PushNotificationSubscription).where(
185 PushNotificationSubscription.id == payload.push_notification_subscription_id
186 )
187 ).scalar_one()
189 if sub.disabled_at < now():
190 logger.info(f"Skipping push to already-disabled subscription {sub.id}")
191 return
193 try:
194 if sub.platform == PushNotificationPlatform.web_push:
195 result = _send_web_push(sub, payload)
196 elif sub.platform == PushNotificationPlatform.expo:
197 result = _send_expo(sub, payload)
198 else:
199 raise ValueError(f"Unknown platform: {sub.platform}")
201 # Success - receipt will be checked by the batch job check_expo_push_receipts
202 session.add(
203 PushNotificationDeliveryAttempt(
204 push_notification_subscription_id=sub.id,
205 outcome=PushNotificationDeliveryOutcome.success,
206 status_code=result.status_code,
207 response=result.response,
208 expo_ticket_id=result.expo_ticket_id,
209 )
210 )
212 push_notification_counter.labels(platform=sub.platform.name, outcome="success").inc()
213 logger.debug(f"Successfully sent push to sub {sub.id} for user {sub.user_id}")
215 except PermanentSubscriptionFailure as e:
216 logger.info(f"Disabling push sub {sub.id} for user {sub.user_id}: {e}")
217 session.add(
218 PushNotificationDeliveryAttempt(
219 push_notification_subscription_id=sub.id,
220 outcome=PushNotificationDeliveryOutcome.permanent_subscription_failure,
221 status_code=e.status_code,
222 response=e.response,
223 )
224 )
225 sub.disabled_at = func.now()
226 push_notification_counter.labels(platform=sub.platform.name, outcome="permanent_subscription_failure").inc()
228 except PermanentMessageFailure as e:
229 logger.warning(f"Permanent message failure for sub {sub.id}: {e}")
230 session.add(
231 PushNotificationDeliveryAttempt(
232 push_notification_subscription_id=sub.id,
233 outcome=PushNotificationDeliveryOutcome.permanent_message_failure,
234 status_code=e.status_code,
235 response=e.response,
236 )
237 )
238 push_notification_counter.labels(platform=sub.platform.name, outcome="permanent_message_failure").inc()
240 except PushNotificationError as e:
241 # Transient error - log attempt and re-raise to trigger retry
242 logger.warning(f"Transient push failure for sub {sub.id}: {e}")
243 session.add(
244 PushNotificationDeliveryAttempt(
245 push_notification_subscription_id=sub.id,
246 outcome=PushNotificationDeliveryOutcome.transient_failure,
247 status_code=e.status_code,
248 response=e.response,
249 )
250 )
251 push_notification_counter.labels(platform=sub.platform.name, outcome="transient_failure").inc()
252 session.commit()
253 raise