Coverage for src/couchers/notifications/send_raw_push_notification.py: 32%

94 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-08 16:02 +0000

1import json 

2import logging 

3from dataclasses import dataclass 

4 

5from sqlalchemy.sql import func 

6 

7from couchers.config import config 

8from couchers.db import session_scope 

9from couchers.metrics import push_notification_counter 

10from couchers.models import ( 

11 PushNotificationDeliveryAttempt, 

12 PushNotificationDeliveryOutcome, 

13 PushNotificationPlatform, 

14 PushNotificationSubscription, 

15) 

16from couchers.notifications.expo_api import send_expo_push_notification 

17from couchers.notifications.web_push_api import send_web_push 

18from couchers.proto.internal import jobs_pb2 

19from couchers.sql import couchers_select as select 

20from couchers.utils import now 

21 

22logger = logging.getLogger(__name__) 

23 

24 

25class PushNotificationError(Exception): 

26 """Base exception for push notification errors. 

27 

28 Transient errors should raise this base class - they will be retried. 

29 """ 

30 

31 def __init__(self, message: str, *, status_code: int | None = None, response: str | None = None): 

32 super().__init__(message) 

33 self.status_code = status_code 

34 self.response = response 

35 

36 

class PermanentSubscriptionFailure(PushNotificationError):
    """The subscription itself is permanently broken and should be disabled.

    Typical causes: the device was unregistered, the credentials are invalid,
    or the push service answered 404/410 Gone.
    """

44 

45 

class PermanentMessageFailure(PushNotificationError):
    """This particular message can never be delivered; the subscription is fine.

    The subscription must stay enabled, but the message must not be retried.
    """

53 

54 

class MessageTooLong(PermanentMessageFailure):
    """The message is larger than the target platform's size limit."""

59 

60 

61@dataclass 

62class PushDeliveryResult: 

63 """Result of a successful push notification delivery.""" 

64 

65 status_code: int 

66 response: str | None = None 

67 expo_ticket_id: str | None = None 

68 

69 

def _send_web_push(
    sub: PushNotificationSubscription, payload: jobs_pb2.SendRawPushNotificationPayloadV2
) -> PushDeliveryResult:
    """Send a notification via the Web Push API.

    The payload fields are serialized to UTF-8 JSON and handed to
    ``send_web_push`` together with the subscription's endpoint/keys and the
    configured VAPID subject and private key.

    Args:
        sub: The web push subscription to deliver to.
        payload: The notification content; ``payload.ttl`` is passed through as the TTL.

    Returns:
        A PushDeliveryResult when the push service answers 200, 201 or 202.

    Raises:
        MessageTooLong: The encoded payload exceeds 3072 bytes, or the push
            service rejected it with HTTP 413 (Payload Too Large).
        PermanentSubscriptionFailure: The push service answered 404 or 410.
        PushNotificationError: Any other status code; treated as transient.
    """
    max_payload_bytes = 3072

    data = json.dumps(
        {
            "title": payload.title,
            "body": payload.body,
            "icon": payload.icon,
            "url": payload.url,
            "user_id": payload.user_id,
            "topic_action": payload.topic_action,
            "key": payload.key,
        }
    ).encode("utf8")

    # Reject oversized payloads locally before contacting the push service.
    if len(data) > max_payload_bytes:
        raise MessageTooLong(f"Data too long for web push ({len(data)} bytes, max {max_payload_bytes})")

    resp = send_web_push(
        data,
        sub.endpoint,
        sub.auth_key,
        sub.p256dh_key,
        config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"],
        config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"],
        ttl=payload.ttl,
    )

    if resp.status_code in (200, 201, 202):
        return PushDeliveryResult(status_code=resp.status_code, response=resp.text)

    if resp.status_code in (404, 410):
        raise PermanentSubscriptionFailure(
            f"Subscription gone (HTTP {resp.status_code})",
            status_code=resp.status_code,
            response=resp.text,
        )

    if resp.status_code == 413:
        # The push service rejected the body as too large; resending the same
        # payload cannot succeed, so this must not be classified as transient.
        raise MessageTooLong(
            f"Web push payload rejected as too large (HTTP {resp.status_code})",
            status_code=resp.status_code,
            response=resp.text,
        )

    # Other errors are transient - will retry
    raise PushNotificationError(
        f"Web push failed (HTTP {resp.status_code})",
        status_code=resp.status_code,
        response=resp.text,
    )

115 

116 

def _send_expo(
    sub: PushNotificationSubscription, payload: jobs_pb2.SendRawPushNotificationPayloadV2
) -> PushDeliveryResult:
    """Send a notification via the Expo Push API.

    Args:
        sub: The Expo subscription; its ``token`` identifies the target.
        payload: The notification content. When both ``topic_action`` and
            ``key`` are set they are combined into the collapse key.

    Returns:
        A PushDeliveryResult (status code 200) carrying the Expo ticket ID
        when the ticket status is "ok".

    Raises:
        MessageTooLong: Expo reported the "MessageTooBig" error.
        PermanentSubscriptionFailure: Expo reported "DeviceNotRegistered" or
            "InvalidCredentials".
        PushNotificationError: Any other non-"ok" outcome, including a
            response without a usable ticket; treated as transient.
    """
    collapse_key = None
    if payload.topic_action and payload.key:
        collapse_key = f"{payload.topic_action}_{payload.key}"

    result = send_expo_push_notification(
        token=sub.token,
        title=payload.title,
        body=payload.body,
        data={
            "url": payload.url,
            "topic_action": payload.topic_action,
            "key": payload.key,
        },
        collapse_key=collapse_key,
    )

    # Parse the Expo response: "data" is handled both as a list of tickets
    # (the first one is used) and as a single ticket object.
    tickets = result.get("data")
    if isinstance(tickets, list) and tickets:
        ticket = tickets[0]
    elif isinstance(tickets, dict):
        ticket = tickets
    else:
        ticket = {}
    if not isinstance(ticket, dict):
        # A malformed ticket is handled like a missing one, so it surfaces as a
        # PushNotificationError rather than an AttributeError.
        ticket = {}

    status = ticket.get("status", "unknown")
    response_str = str(result)

    if status == "ok":
        # Extract ticket ID for receipt checking
        return PushDeliveryResult(status_code=200, response=response_str, expo_ticket_id=ticket.get("id"))

    # Handle error status. "details" may be absent, null, or not an object;
    # in all of those cases there is simply no error code to classify.
    details = ticket.get("details")
    error_code = details.get("error") if isinstance(details, dict) else None

    if error_code == "MessageTooBig":
        raise MessageTooLong(
            f"Expo message too big: {error_code}",
            status_code=400,
            response=response_str,
        )

    if error_code in {"DeviceNotRegistered", "InvalidCredentials"}:
        raise PermanentSubscriptionFailure(
            f"Expo subscription invalid: {error_code}",
            status_code=400,
            response=response_str,
        )

    # Other errors are transient - will retry
    raise PushNotificationError(
        f"Expo push failed: {error_code or status}",
        status_code=400,
        response=response_str,
    )

175 

176 

def send_raw_push_notification_v2(payload: jobs_pb2.SendRawPushNotificationPayloadV2) -> None:
    """Deliver one raw push notification to a single subscription and record the outcome.

    Does nothing when push notifications are disabled in config or when the
    subscription's ``disabled_at`` is already in the past. Otherwise the
    notification is sent through the sender matching the subscription's
    platform, a PushNotificationDeliveryAttempt row is added for the outcome,
    and the push notification counter is incremented.

    Args:
        payload: Job payload; ``push_notification_subscription_id`` selects the
            subscription to deliver to.

    Raises:
        ValueError: The subscription's platform is neither web push nor Expo.
        PushNotificationError: A transient delivery failure; the attempt is
            committed first and the error is re-raised to trigger a retry.
    """
    if not config["PUSH_NOTIFICATIONS_ENABLED"]:
        logger.info("Not sending push notification: push notifications disabled")
        return

    with session_scope() as session:
        lookup = select(PushNotificationSubscription).where(
            PushNotificationSubscription.id == payload.push_notification_subscription_id
        )
        subscription = session.execute(lookup).scalar_one()

        # NOTE(review): this comparison assumes disabled_at is never None - confirm against the model.
        if subscription.disabled_at < now():
            logger.info(f"Skipping push to already-disabled subscription {subscription.id}")
            return

        def record_attempt(outcome, status_code, response, **extra):
            # Add a delivery attempt row for this subscription to the session.
            session.add(
                PushNotificationDeliveryAttempt(
                    push_notification_subscription_id=subscription.id,
                    outcome=outcome,
                    status_code=status_code,
                    response=response,
                    **extra,
                )
            )

        def count(outcome_label):
            # Bump the counter for this subscription's platform and the given outcome.
            push_notification_counter.labels(platform=subscription.platform.name, outcome=outcome_label).inc()

        try:
            if subscription.platform == PushNotificationPlatform.web_push:
                delivery = _send_web_push(subscription, payload)
            elif subscription.platform == PushNotificationPlatform.expo:
                delivery = _send_expo(subscription, payload)
            else:
                raise ValueError(f"Unknown platform: {subscription.platform}")

            # Success - receipt will be checked by the batch job check_expo_push_receipts
            record_attempt(
                PushNotificationDeliveryOutcome.success,
                delivery.status_code,
                delivery.response,
                expo_ticket_id=delivery.expo_ticket_id,
            )
            count("success")
            logger.debug(f"Successfully sent push to sub {subscription.id} for user {subscription.user_id}")

        except PermanentSubscriptionFailure as err:
            # The subscription is dead: record the attempt and disable it.
            logger.info(f"Disabling push sub {subscription.id} for user {subscription.user_id}: {err}")
            record_attempt(
                PushNotificationDeliveryOutcome.permanent_subscription_failure,
                err.status_code,
                err.response,
            )
            subscription.disabled_at = func.now()
            count("permanent_subscription_failure")

        except PermanentMessageFailure as err:
            # Only this message is undeliverable: record it, keep the subscription, no retry.
            logger.warning(f"Permanent message failure for sub {subscription.id}: {err}")
            record_attempt(
                PushNotificationDeliveryOutcome.permanent_message_failure,
                err.status_code,
                err.response,
            )
            count("permanent_message_failure")

        except PushNotificationError as err:
            # Transient error - log attempt and re-raise to trigger retry
            logger.warning(f"Transient push failure for sub {subscription.id}: {err}")
            record_attempt(
                PushNotificationDeliveryOutcome.transient_failure,
                err.status_code,
                err.response,
            )
            count("transient_failure")
            # Commit explicitly before re-raising.
            session.commit()
            raise