Coverage for src/couchers/notifications/web_push_api.py: 57%

37 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-06 23:17 +0000

1import logging 

2from time import time 

3from typing import Any 

4from urllib.parse import urlparse 

5 

6import http_ece 

7import requests 

8from cryptography.hazmat.primitives import serialization 

9from cryptography.hazmat.primitives.asymmetric import ec 

10from py_vapid import Vapid 

11 

12from couchers.crypto import b64decode_unpadded, b64encode_unpadded 

13 

14logger = logging.getLogger(__name__) 

15 

16 

17def gen_vapid_keys() -> tuple[str, str]: 

18 prv_key = ec.generate_private_key(ec.SECP256R1()) 

19 pub_key = prv_key.public_key() 

20 prv = prv_key.private_numbers().private_value.to_bytes(length=32) 

21 pub = pub_key.public_bytes(serialization.Encoding.X962, serialization.PublicFormat.UncompressedPoint) 

22 return b64encode_unpadded(prv), b64encode_unpadded(pub) 

23 

24 

25def get_vapid_public_key_from_private_key(private: str) -> str: 

26 pub = Vapid.from_string(private).public_key 

27 result: str = b64encode_unpadded( 

28 pub.public_bytes(serialization.Encoding.X962, serialization.PublicFormat.UncompressedPoint) 

29 ) 

30 return result 

31 

32 

33def generate_vapid_authorization(endpoint: str, vapid_sub: str, vapid_private_key: str) -> str: 

34 url = urlparse(endpoint) 

35 vapid_claim = { 

36 "sub": vapid_sub, 

37 "aud": f"{url.scheme}://{url.netloc}", 

38 "exp": int(time()) + (12 * 60 * 60), 

39 } 

40 return Vapid.from_string(private_key=vapid_private_key).sign(vapid_claim)["Authorization"] # type: ignore[no-any-return] 

41 

42 

43def send_web_push( 

44 data: bytes, 

45 endpoint: str, 

46 auth_key: bytes, 

47 receiver_key: bytes, 

48 vapid_sub: str, 

49 vapid_private_key: str, 

50 ttl: int = 0, 

51) -> requests.Response: 

52 logger.debug(f"Sending {len(data)} bytes to {endpoint[:20]}...") 

53 headers = { 

54 "authorization": generate_vapid_authorization(endpoint, vapid_sub, vapid_private_key), 

55 "content-encoding": "aes128gcm", 

56 "ttl": str(ttl), 

57 } 

58 

59 encrypted = http_ece.encrypt( 

60 data, 

61 private_key=ec.generate_private_key(ec.SECP256R1()), 

62 auth_secret=auth_key, 

63 dh=receiver_key, 

64 ) 

65 

66 return requests.post( 

67 endpoint, 

68 timeout=20, 

69 data=encrypted, 

70 headers=headers, 

71 ) 

72 

73 

74def decode_key(value: str) -> bytes: 

75 return b64decode_unpadded(value.encode()) 

76 

77 

78def parse_subscription_info(subscription_info: dict[str, Any]) -> tuple[str, bytes, bytes]: 

79 endpoint = subscription_info["endpoint"] 

80 auth_key = decode_key(subscription_info["keys"]["auth"]) 

81 receiver_key = decode_key(subscription_info["keys"]["p256dh"]) 

82 return endpoint, auth_key, receiver_key