Coverage for src/couchers/notifications/web_push_api.py: 57%
37 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-06 23:17 +0000
1import logging
2from time import time
3from typing import Any
4from urllib.parse import urlparse
6import http_ece
7import requests
8from cryptography.hazmat.primitives import serialization
9from cryptography.hazmat.primitives.asymmetric import ec
10from py_vapid import Vapid
12from couchers.crypto import b64decode_unpadded, b64encode_unpadded
14logger = logging.getLogger(__name__)
def gen_vapid_keys() -> tuple[str, str]:
    """Generate a fresh VAPID key pair on the SECP256R1 (P-256) curve.

    Returns:
        A ``(private, public)`` tuple, each element encoded with
        ``b64encode_unpadded``. The private element is the private scalar
        serialised to 32 bytes; the public element is the X9.62 uncompressed
        curve point.
    """
    private_key = ec.generate_private_key(ec.SECP256R1())
    # Fixed 32-byte width; relies on the default byte order of int.to_bytes.
    private_raw = private_key.private_numbers().private_value.to_bytes(length=32)
    public_raw = private_key.public_key().public_bytes(
        serialization.Encoding.X962,
        serialization.PublicFormat.UncompressedPoint,
    )
    return b64encode_unpadded(private_raw), b64encode_unpadded(public_raw)
def get_vapid_public_key_from_private_key(private: str) -> str:
    """Derive the encoded public key that belongs to an encoded private key.

    Args:
        private: Private key string in a form accepted by ``Vapid.from_string``.

    Returns:
        The public key as an X9.62 uncompressed point, encoded with
        ``b64encode_unpadded``.
    """
    public_key = Vapid.from_string(private).public_key
    point = public_key.public_bytes(
        serialization.Encoding.X962,
        serialization.PublicFormat.UncompressedPoint,
    )
    encoded: str = b64encode_unpadded(point)
    return encoded
def generate_vapid_authorization(endpoint: str, vapid_sub: str, vapid_private_key: str) -> str:
    """Build the VAPID ``Authorization`` header value for a push endpoint.

    Args:
        endpoint: Push service URL. Only its scheme and network location are
            used, to form the ``aud`` claim.
        vapid_sub: Value placed in the ``sub`` claim.
        vapid_private_key: Private key string handed to ``Vapid.from_string``.

    Returns:
        The ``"Authorization"`` entry of the mapping returned by ``Vapid.sign``.
    """
    parsed = urlparse(endpoint)
    audience = f"{parsed.scheme}://{parsed.netloc}"
    # The claim expires twelve hours after it is generated.
    expires_at = int(time()) + (12 * 60 * 60)
    claims = {"sub": vapid_sub, "aud": audience, "exp": expires_at}
    signer = Vapid.from_string(private_key=vapid_private_key)
    return signer.sign(claims)["Authorization"]  # type: ignore[no-any-return]
def send_web_push(
    data: bytes,
    endpoint: str,
    auth_key: bytes,
    receiver_key: bytes,
    vapid_sub: str,
    vapid_private_key: str,
    ttl: int = 0,
) -> requests.Response:
    """Encrypt a payload and POST it to a web push endpoint.

    Args:
        data: Plaintext payload to encrypt and deliver.
        endpoint: Push service URL to POST to; also used to build the VAPID
            authorization header.
        auth_key: Passed to ``http_ece.encrypt`` as ``auth_secret``.
        receiver_key: Passed to ``http_ece.encrypt`` as ``dh``.
        vapid_sub: ``sub`` claim for the VAPID authorization header.
        vapid_private_key: Private key used to sign the VAPID claim.
        ttl: Value sent in the ``ttl`` request header (default ``0``).

    Returns:
        The ``requests.Response`` of the POST. The status code is not checked
        here, so callers must inspect it themselves.
    """
    # Lazy %-style arguments: the message is only formatted when DEBUG is
    # enabled. Only the first 20 characters of the endpoint are logged.
    logger.debug("Sending %d bytes to %s...", len(data), endpoint[:20])
    headers = {
        "authorization": generate_vapid_authorization(endpoint, vapid_sub, vapid_private_key),
        "content-encoding": "aes128gcm",
        "ttl": str(ttl),
    }

    encrypted = http_ece.encrypt(
        data,
        # A new SECP256R1 private key is generated for every message.
        private_key=ec.generate_private_key(ec.SECP256R1()),
        auth_secret=auth_key,
        dh=receiver_key,
    )

    return requests.post(
        endpoint,
        timeout=20,
        data=encrypted,
        headers=headers,
    )
def decode_key(value: str) -> bytes:
    """Decode an encoded key string into raw bytes.

    Args:
        value: Encoded key text; it is converted to bytes with ``str.encode``
            and then decoded with ``b64decode_unpadded``.

    Returns:
        The decoded key bytes.
    """
    encoded = value.encode()
    return b64decode_unpadded(encoded)
def parse_subscription_info(subscription_info: dict[str, Any]) -> tuple[str, bytes, bytes]:
    """Extract the endpoint and decoded keys from a subscription mapping.

    Args:
        subscription_info: Mapping with an ``"endpoint"`` entry and a
            ``"keys"`` mapping containing ``"auth"`` and ``"p256dh"``.

    Returns:
        ``(endpoint, auth_key, receiver_key)``, where both keys have been run
        through ``decode_key``.

    Raises:
        KeyError: If any of the expected entries is missing.
    """
    endpoint = subscription_info["endpoint"]
    keys = subscription_info["keys"]
    auth_key = decode_key(keys["auth"])
    receiver_key = decode_key(keys["p256dh"])
    return endpoint, auth_key, receiver_key