Coverage for src/couchers/notifications/push_api.py: 54%

35 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2025-01-08 04:37 +0000

1import logging 

2from time import time 

3from urllib.parse import urlparse 

4 

5import http_ece 

6import requests 

7from cryptography.hazmat.primitives import serialization 

8from cryptography.hazmat.primitives.asymmetric import ec 

9from py_vapid import Vapid 

10 

11from couchers.crypto import b64decode_unpadded, b64encode_unpadded 

12 

13logger = logging.getLogger(__name__) 

14 

15 

16def gen_vapid_keys(): 

17 prv_key = ec.generate_private_key(ec.SECP256R1()) 

18 pub_key = prv_key.public_key() 

19 prv = prv_key.private_numbers().private_value.to_bytes(length=32) 

20 pub = pub_key.public_bytes(serialization.Encoding.X962, serialization.PublicFormat.UncompressedPoint) 

21 return b64encode_unpadded(prv), b64encode_unpadded(pub) 

22 

23 

24def get_vapid_public_key_from_private_key(private): 

25 pub = Vapid.from_string(private).public_key 

26 return b64encode_unpadded( 

27 pub.public_bytes(serialization.Encoding.X962, serialization.PublicFormat.UncompressedPoint) 

28 ) 

29 

30 

31def generate_vapid_authorization(endpoint, vapid_sub, vapid_private_key): 

32 url = urlparse(endpoint) 

33 vapid_claim = { 

34 "sub": vapid_sub, 

35 "aud": f"{url.scheme}://{url.netloc}", 

36 "exp": int(time()) + (12 * 60 * 60), 

37 } 

38 return Vapid.from_string(private_key=vapid_private_key).sign(vapid_claim)["Authorization"] 

39 

40 

41def send_push(data, endpoint, auth_key, receiver_key, vapid_sub, vapid_private_key, ttl=0): 

42 logger.debug(f"Sending {len(data)} bytes to {endpoint[:20]}...") 

43 headers = { 

44 "authorization": generate_vapid_authorization(endpoint, vapid_sub, vapid_private_key), 

45 "content-encoding": "aes128gcm", 

46 "ttl": str(ttl), 

47 } 

48 

49 encrypted = http_ece.encrypt( 

50 data, 

51 private_key=ec.generate_private_key(ec.SECP256R1()), 

52 auth_secret=auth_key, 

53 dh=receiver_key, 

54 ) 

55 

56 return requests.post( 

57 endpoint, 

58 timeout=20, 

59 data=encrypted, 

60 headers=headers, 

61 ) 

62 

63 

64def decode_key(value): 

65 return b64decode_unpadded(value.encode()) 

66 

67 

68def parse_subscription_info(subscription_info): 

69 endpoint = subscription_info["endpoint"] 

70 auth_key = decode_key(subscription_info["keys"]["auth"]) 

71 receiver_key = decode_key(subscription_info["keys"]["p256dh"]) 

72 return endpoint, auth_key, receiver_key