Coverage for src/couchers/notifications/push_api.py: 54%
35 statements
« prev ^ index » next coverage.py v7.5.0, created at 2025-01-08 04:37 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2025-01-08 04:37 +0000
1import logging
2from time import time
3from urllib.parse import urlparse
5import http_ece
6import requests
7from cryptography.hazmat.primitives import serialization
8from cryptography.hazmat.primitives.asymmetric import ec
9from py_vapid import Vapid
11from couchers.crypto import b64decode_unpadded, b64encode_unpadded
13logger = logging.getLogger(__name__)
16def gen_vapid_keys():
17 prv_key = ec.generate_private_key(ec.SECP256R1())
18 pub_key = prv_key.public_key()
19 prv = prv_key.private_numbers().private_value.to_bytes(length=32)
20 pub = pub_key.public_bytes(serialization.Encoding.X962, serialization.PublicFormat.UncompressedPoint)
21 return b64encode_unpadded(prv), b64encode_unpadded(pub)
24def get_vapid_public_key_from_private_key(private):
25 pub = Vapid.from_string(private).public_key
26 return b64encode_unpadded(
27 pub.public_bytes(serialization.Encoding.X962, serialization.PublicFormat.UncompressedPoint)
28 )
31def generate_vapid_authorization(endpoint, vapid_sub, vapid_private_key):
32 url = urlparse(endpoint)
33 vapid_claim = {
34 "sub": vapid_sub,
35 "aud": f"{url.scheme}://{url.netloc}",
36 "exp": int(time()) + (12 * 60 * 60),
37 }
38 return Vapid.from_string(private_key=vapid_private_key).sign(vapid_claim)["Authorization"]
41def send_push(data, endpoint, auth_key, receiver_key, vapid_sub, vapid_private_key, ttl=0):
42 logger.debug(f"Sending {len(data)} bytes to {endpoint[:20]}...")
43 headers = {
44 "authorization": generate_vapid_authorization(endpoint, vapid_sub, vapid_private_key),
45 "content-encoding": "aes128gcm",
46 "ttl": str(ttl),
47 }
49 encrypted = http_ece.encrypt(
50 data,
51 private_key=ec.generate_private_key(ec.SECP256R1()),
52 auth_secret=auth_key,
53 dh=receiver_key,
54 )
56 return requests.post(
57 endpoint,
58 timeout=20,
59 data=encrypted,
60 headers=headers,
61 )
64def decode_key(value):
65 return b64decode_unpadded(value.encode())
68def parse_subscription_info(subscription_info):
69 endpoint = subscription_info["endpoint"]
70 auth_key = decode_key(subscription_info["keys"]["auth"])
71 receiver_key = decode_key(subscription_info["keys"]["p256dh"])
72 return endpoint, auth_key, receiver_key