Coverage for app/backend/src/couchers/notifications/send_raw_push_notification.py: 24%
99 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1import json
2import logging
3from dataclasses import dataclass
5from sqlalchemy import select
6from sqlalchemy.sql import func
8from couchers.config import config
9from couchers.db import session_scope
10from couchers.metrics import push_notification_counter
11from couchers.models import (
12 PushNotificationDeliveryAttempt,
13 PushNotificationDeliveryOutcome,
14 PushNotificationPlatform,
15 PushNotificationSubscription,
16)
17from couchers.models.notifications import DeviceType
18from couchers.notifications.expo_api import send_expo_push_notification
19from couchers.notifications.web_push_api import send_web_push
20from couchers.proto.internal import jobs_pb2
21from couchers.utils import not_none, now
23logger = logging.getLogger(__name__)
26class PushNotificationError(Exception):
27 """Base exception for push notification errors.
29 Transient errors should raise this base class - they will be retried.
30 """
32 def __init__(self, message: str, *, status_code: int | None = None, response: str | None = None):
33 super().__init__(message)
34 self.status_code = status_code
35 self.response = response
class PermanentSubscriptionFailure(PushNotificationError):
    """The subscription itself is dead and should be disabled.

    Raised for conditions such as an unregistered device, invalid
    credentials, or an HTTP 404/410 from the push endpoint.
    """
class PermanentMessageFailure(PushNotificationError):
    """This particular message can never be delivered; the subscription is fine.

    The subscription must stay enabled, and the message must not be retried.
    """
class MessageTooLong(PermanentMessageFailure):
    """The message is larger than the target platform's size limit."""
62@dataclass
63class PushDeliveryResult:
64 """Result of a successful push notification delivery."""
66 status_code: int
67 response: str | None = None
68 expo_ticket_id: str | None = None
def _send_web_push(
    sub: PushNotificationSubscription, payload: jobs_pb2.SendRawPushNotificationPayloadV2
) -> PushDeliveryResult:
    """Deliver ``payload`` to a browser subscription through the Web Push API.

    The notification fields are serialised to UTF-8 JSON, checked against a
    3072-byte cap, and passed to ``send_web_push`` together with the
    subscription's endpoint/keys and the VAPID settings from ``config``.

    Returns:
        A ``PushDeliveryResult`` when the endpoint answers 200, 201 or 202.

    Raises:
        MessageTooLong: The encoded message is larger than 3072 bytes.
        PermanentSubscriptionFailure: The endpoint answered 404 or 410.
        PushNotificationError: Any other status; treated as transient and retried.
    """
    message = {
        "title": payload.title,
        "body": payload.body,
        "icon": payload.icon,
        "url": payload.url,
        "user_id": payload.user_id,
        "topic_action": payload.topic_action,
        "key": payload.key,
    }
    encoded = json.dumps(message).encode("utf8")

    # Reject oversized messages up front, before any network call is made.
    if len(encoded) > 3072:
        raise MessageTooLong(f"Data too long for web push ({len(encoded)} bytes, max 3072)")

    endpoint = not_none(sub.endpoint)
    auth_key = not_none(sub.auth_key)
    p256dh_key = not_none(sub.p256dh_key)
    reply = send_web_push(
        encoded,
        endpoint,
        auth_key,
        p256dh_key,
        config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"],
        config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"],
        ttl=payload.ttl,
    )

    code = reply.status_code
    if code in (200, 201, 202):
        return PushDeliveryResult(status_code=code, response=reply.text)

    if code in (404, 410):
        # The endpoint no longer exists: the subscription is permanently gone.
        failure_type = PermanentSubscriptionFailure
        summary = f"Subscription gone (HTTP {code})"
    else:
        # Anything else is considered transient and will be retried.
        failure_type = PushNotificationError
        summary = f"Web push failed (HTTP {code})"
    raise failure_type(summary, status_code=code, response=reply.text)
def _send_expo(
    sub: PushNotificationSubscription, payload: jobs_pb2.SendRawPushNotificationPayloadV2
) -> PushDeliveryResult:
    """Send via Expo Push API and classify the returned push ticket.

    On iOS devices the iOS-specific title/subtitle pair is used when the
    payload provides an ``ios_title``; otherwise the generic title is sent.
    A collapse key is built from ``topic_action`` and ``key`` when both are set.

    Returns:
        A ``PushDeliveryResult`` (status 200) carrying the Expo ticket id when
        the ticket status is ``"ok"``.

    Raises:
        MessageTooLong: Expo reported ``MessageTooBig``.
        PermanentSubscriptionFailure: Expo reported ``DeviceNotRegistered`` or
            ``InvalidCredentials``.
        PushNotificationError: Any other non-ok ticket, including a missing or
            malformed one; treated as transient and retried.
    """
    collapse_key = None
    if payload.topic_action and payload.key:
        collapse_key = f"{payload.topic_action}_{payload.key}"

    title: str
    ios_subtitle: str | None = None
    if sub.device_type == DeviceType.ios and payload.ios_title:
        # Prefer the iOS-specific title/subtitle pair if available.
        title = payload.ios_title
        ios_subtitle = payload.ios_subtitle
    else:
        title = payload.title

    # NOTE(review): result is presumably the parsed JSON body of the Expo
    # response (a dict) - confirm against send_expo_push_notification.
    result = send_expo_push_notification(
        token=not_none(sub.token),
        title=title,
        ios_subtitle=ios_subtitle,
        body=payload.body,
        data={
            "url": payload.url,
            "topic_action": payload.topic_action,
            "key": payload.key,
        },
        collapse_key=collapse_key,
    )

    # "data" may be a list of tickets (take the first) or a single ticket dict.
    data = result.get("data")
    ticket = data[0] if isinstance(data, list) and data else data
    if not isinstance(ticket, dict):
        # Missing, empty or malformed ticket: fall through to the transient
        # error below instead of crashing with AttributeError.
        ticket = {}

    status = ticket.get("status", "unknown")
    response_str = str(result)

    if status == "ok":
        # Keep the ticket id so the delivery receipt can be checked later.
        return PushDeliveryResult(status_code=200, response=response_str, expo_ticket_id=ticket.get("id"))

    # "details" can be absent, null or malformed; only read it when it is a
    # dict so a bad ticket is still reported as a PushNotificationError.
    details = ticket.get("details")
    error_code = details.get("error") if isinstance(details, dict) else None

    if error_code == "MessageTooBig":
        raise MessageTooLong(
            f"Expo message too big: {error_code}",
            status_code=400,
            response=response_str,
        )

    if error_code in {"DeviceNotRegistered", "InvalidCredentials"}:
        raise PermanentSubscriptionFailure(
            f"Expo subscription invalid: {error_code}",
            status_code=400,
            response=response_str,
        )

    # Other errors are transient - will retry
    raise PushNotificationError(
        f"Expo push failed: {error_code or status}",
        status_code=400,
        response=response_str,
    )
def send_raw_push_notification_v2(payload: jobs_pb2.SendRawPushNotificationPayloadV2) -> None:
    """Deliver one push notification to one subscription and record the outcome.

    Does nothing when push notifications are disabled in config or when the
    subscription's ``disabled_at`` is already in the past. Otherwise the payload
    is sent through the sender matching the subscription's platform, and a
    ``PushNotificationDeliveryAttempt`` row plus a metrics sample are written
    for the result:

    * success: attempt stored with the status, response and Expo ticket id;
    * permanent subscription failure: attempt stored and the subscription disabled;
    * permanent message failure: attempt stored, subscription left enabled;
    * transient failure: attempt stored and committed, then the error is re-raised
      to trigger a retry.

    Raises:
        ValueError: The subscription's platform is neither web push nor Expo.
        PushNotificationError: A transient delivery failure (re-raised).
    """
    if not config["PUSH_NOTIFICATIONS_ENABLED"]:
        logger.info("Not sending push notification: push notifications disabled")
        return

    with session_scope() as session:
        lookup = select(PushNotificationSubscription).where(
            PushNotificationSubscription.id == payload.push_notification_subscription_id
        )
        sub = session.execute(lookup).scalar_one()

        # NOTE(review): this comparison assumes disabled_at is always a datetime
        # (e.g. a far-future sentinel while the subscription is active) - confirm
        # against the model definition.
        if sub.disabled_at < now():
            logger.info(f"Skipping push to already-disabled subscription {sub.id}")
            return

        def record_attempt(outcome, status_code, response, **extra) -> None:
            """Queue a delivery-attempt row for this subscription."""
            session.add(
                PushNotificationDeliveryAttempt(
                    push_notification_subscription_id=sub.id,
                    outcome=outcome,
                    status_code=status_code,
                    response=response,
                    **extra,
                )
            )

        def bump_counter(outcome_label: str) -> None:
            """Increment the push metric for this subscription's platform."""
            push_notification_counter.labels(platform=sub.platform.name, outcome=outcome_label).inc()

        try:
            senders = {
                PushNotificationPlatform.web_push: _send_web_push,
                PushNotificationPlatform.expo: _send_expo,
            }
            sender = senders.get(sub.platform)
            if sender is None:
                raise ValueError(f"Unknown platform: {sub.platform}")
            delivery = sender(sub, payload)

            # Success - receipt will be checked by the batch job check_expo_push_receipts
            record_attempt(
                PushNotificationDeliveryOutcome.success,
                delivery.status_code,
                delivery.response,
                expo_ticket_id=delivery.expo_ticket_id,
            )
            bump_counter("success")
            logger.debug(f"Successfully sent push to sub {sub.id} for user {sub.user_id}")

        except PermanentSubscriptionFailure as e:
            logger.info(f"Disabling push sub {sub.id} for user {sub.user_id}: {e}")
            record_attempt(
                PushNotificationDeliveryOutcome.permanent_subscription_failure,
                e.status_code,
                e.response,
            )
            sub.disabled_at = func.now()
            bump_counter("permanent_subscription_failure")

        except PermanentMessageFailure as e:
            logger.warning(f"Permanent message failure for sub {sub.id}: {e}")
            record_attempt(
                PushNotificationDeliveryOutcome.permanent_message_failure,
                e.status_code,
                e.response,
            )
            bump_counter("permanent_message_failure")

        except PushNotificationError as e:
            # Transient error - log attempt and re-raise to trigger retry
            logger.warning(f"Transient push failure for sub {sub.id}: {e}")
            record_attempt(
                PushNotificationDeliveryOutcome.transient_failure,
                e.status_code,
                e.response,
            )
            bump_counter("transient_failure")
            # Commit explicitly before re-raising; presumably session_scope would
            # otherwise discard the attempt row when the exception propagates.
            session.commit()
            raise