Coverage for src/couchers/crypto.py: 96%

102 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-08-28 14:55 +0000

1import functools 

2import secrets 

3import string 

4from base64 import urlsafe_b64decode, urlsafe_b64encode 

5from typing import Optional, Union 

6 

7import nacl.pwhash 

8import nacl.utils 

9from nacl.bindings import crypto_aead 

10from nacl.bindings.crypto_generichash import generichash_blake2b_salt_personal 

11from nacl.bindings.utils import sodium_memcmp 

12from nacl.exceptions import InvalidkeyError 

13from nacl.public import PrivateKey, PublicKey, SealedBox 

14from nacl.utils import random as random_bytes 

15 

16from couchers.config import config 

17 

18 

def b64encode(data: bytes) -> str:
    """Encode raw bytes as URL-safe base64 text (padding is kept)."""
    encoded = urlsafe_b64encode(data)
    # base64 output is pure ASCII, so this decode cannot fail
    return encoded.decode("ascii")

21 

22 

def b64decode(data: str) -> bytes:
    """Decode URL-safe base64 text back into the raw bytes it represents."""
    decoded = urlsafe_b64decode(data)
    return decoded

25 

26 

def b64encode_unpadded(data: bytes) -> str:
    """
    Encode raw bytes as URL-safe base64 text with the "=" padding removed.

    Takes bytes and returns str (the previous annotations had these swapped).
    """
    encoded = urlsafe_b64encode(data).decode("ascii")
    # "=" is not part of the base64 alphabet, so it only ever occurs as padding
    return encoded.replace("=", "")

29 

30 

def b64decode_unpadded(data: Union[bytes, str]) -> bytes:
    """
    Decode URL-safe base64 whose trailing "=" padding has been stripped.

    Accepts either bytes or ASCII text, so the str produced by
    b64encode_unpadded can be passed straight back in. Returns the decoded
    raw bytes.

    Raises ValueError (binascii.Error is a subclass) if the input is not valid
    base64, and UnicodeEncodeError if a str input contains non-ASCII
    characters.
    """
    if isinstance(data, str):
        data = data.encode("ascii")
    # Restore exactly the padding needed to reach a multiple of four characters
    missing_padding = -len(data) % 4
    return urlsafe_b64decode(data + b"=" * missing_padding)

33 

34 

def _urlsafe_random_b64(length=32) -> str:
    """Return `length` random bytes rendered as URL-safe base64 text."""
    raw = random_bytes(length)
    return b64encode(raw)

37 

38 

def urlsafe_secure_token():
    """
    Produce a random token, built from 32 random bytes, that is safe to embed
    in a URL
    """
    token = _urlsafe_random_b64(length=32)
    return token

44 

45 

def cookiesafe_secure_token():
    """
    Produce a random token as the hex encoding of 32 random bytes
    """
    token = random_hex(length=32)
    return token

48 

49 

def hash_password(password: str):
    """
    Hash a password with nacl.pwhash.str for storage.

    The password is UTF-8 encoded before hashing; the result is whatever
    nacl.pwhash.str returns (verify_password expects it as bytes).
    """
    password_bytes = password.encode("utf-8")
    return nacl.pwhash.str(password_bytes)

52 

53 

def verify_password(hashed: bytes, password: str):
    """
    Check a candidate password against a stored hash.

    Returns the result of nacl.pwhash.verify when it succeeds, and False when
    it raises InvalidkeyError.
    """
    candidate = password.encode("utf-8")
    try:
        return nacl.pwhash.verify(hashed, candidate)
    except InvalidkeyError:
        # raised by the library instead of returning a falsy value
        return False

60 

61 

def random_hex(length=32):
    """
    Return random bytes as a hex string.

    `length` counts the random bytes, so the returned string has twice as
    many characters.
    """
    raw = random_bytes(length)
    return raw.hex()

67 

68 

def secure_compare(val1, val2):
    """
    Compare two values using the sodium_memcmp binding (intended as a
    timing-safe equality check) and return its result
    """
    is_equal = sodium_memcmp(val1, val2)
    return is_equal

71 

72 

def generate_hash_signature(message: bytes, key: bytes) -> bytes:
    """
    Returns the 32-byte blake2b keyed hash of the message.

    Such a hash works as a fast symmetric signature: only a party that knows
    the key can produce a matching value, so agreement between two hashes
    shows the signature came from a key holder.
    """
    signature = generichash_blake2b_salt_personal(message, digest_size=32, key=key)
    return signature

82 

83 

def simple_hash_signature(message: Union[bytes, str], key_name: str) -> str:
    """
    Sign a message with the secret derived for `key_name`.

    Text messages are UTF-8 encoded first; the signature is returned as
    URL-safe base64 text.
    """
    msg_bytes = message.encode("utf8") if isinstance(message, str) else message
    signing_key = get_secret(key_name)
    signature = generate_hash_signature(message=msg_bytes, key=signing_key)
    return b64encode(signature)

90 

91 

def verify_hash_signature(message: bytes, key: bytes, sig: bytes) -> bool:
    """
    Checks a signature produced by generate_hash_signature.

    The signature is recomputed from the message and key and compared with
    `sig` via secure_compare; the comparison result is returned.
    """
    expected = generate_hash_signature(message, key)
    return secure_compare(sig, expected)

99 

100 

def generate_random_5digit_string():
    """Return a random string of exactly five decimal digits (zero-padded)"""
    number = secrets.randbelow(10**5)
    # left-pad so that small numbers such as 42 become "00042"
    return str(number).zfill(5)

104 

105 

def verify_token(a: str, b: str):
    """
    Compare strings a and b for equality with secrets.compare_digest, which
    is designed to reduce the risk of timing attacks.
    """
    tokens_match = secrets.compare_digest(a, b)
    return tokens_match

111 

112 

def stable_secure_uniform(key: bytes, seed: bytes):
    """
    Deterministically map (key, seed) to a float in [0, 1).

    The value comes from the keyed hash of the seed, so the same key and seed
    always give the same result.
    """
    digest = generate_hash_signature(message=seed, key=key)
    assert len(digest) > 7
    # Same construction as cpython's random: keep 53 bits, the number of bits
    # in a float, and scale them down into [0, 1)
    bits_per_float = 53
    # 7 bytes are 56 bits; big-endian is what int.from_bytes defaults to
    leading = int.from_bytes(digest[:7], byteorder="big")
    return (leading >> 3) * 2**-bits_per_float

122 

123 

@functools.lru_cache
def get_secret(name: str):
    """
    Derive a named secret key from the root secret in config["SECRET"].

    The key is the keyed hash of the UTF-8 encoded name under the root
    secret; results are memoized per name by lru_cache.
    """
    label = name.encode("utf8")
    root_secret = config["SECRET"]
    return generate_hash_signature(message=label, key=root_secret)

130 

131 

# Names for the per-purpose secrets; PAGE_TOKEN_KEY_NAME is the one passed to
# simple_encrypt/simple_decrypt (and from there to get_secret) in this module
UNSUBSCRIBE_KEY_NAME = "unsubscribe"
EMAIL_SOURCE_DATA_KEY_NAME = "email-source-data"
PAGE_TOKEN_KEY_NAME = "pagination"
USER_LOCATION_RANDOMIZATION_NAME = "user-location-randomization-v1"

136 

137 

# AEAD: Authenticated Encryption with Associated Data

# Key and nonce sizes (in bytes) for the XChaCha20-Poly1305 IETF construction
_aead_key_len = crypto_aead.crypto_aead_xchacha20poly1305_ietf_KEYBYTES
_aead_nonce_len = crypto_aead.crypto_aead_xchacha20poly1305_ietf_NPUBBYTES

142 

143 

def aead_generate_nonce():
    """Return a fresh random nonce of `_aead_nonce_len` bytes."""
    nonce = random_bytes(_aead_nonce_len)
    return nonce

146 

147 

def aead_generate_key():
    """Return a fresh random key of `_aead_key_len` bytes."""
    key = random_bytes(_aead_key_len)
    return key

150 

151 

def aead_encrypt(
    key: bytes, secret_data: bytes, plaintext_data: bytes = b"", nonce: Optional[bytes] = None
) -> tuple[bytes, bytes]:
    """
    Encrypt `secret_data` with XChaCha20-Poly1305.

    `plaintext_data` is passed as the associated data: it is not encrypted,
    and the same value must be given to aead_decrypt. If no nonce is supplied
    (None or empty), a random one is generated.

    Returns a `(nonce, encrypted)` tuple; the nonce is needed again for
    decryption.
    """
    if not nonce:
        nonce = aead_generate_nonce()
    encrypted = crypto_aead.crypto_aead_xchacha20poly1305_ietf_encrypt(secret_data, plaintext_data, nonce, key)
    return nonce, encrypted

157 

158 

def aead_decrypt(key: bytes, nonce: bytes, encrypted_secret_data: bytes, plaintext_data: bytes = b"") -> bytes:
    """
    Decrypt data produced by aead_encrypt.

    `nonce` and `plaintext_data` (the associated data) are handed to the
    XChaCha20-Poly1305 decrypt binding along with the ciphertext and key.
    """
    decrypt = crypto_aead.crypto_aead_xchacha20poly1305_ietf_decrypt
    return decrypt(encrypted_secret_data, plaintext_data, nonce, key)

161 

162 

def simple_encrypt(key_name: str, data: bytes) -> bytes:
    """
    Encrypt `data` under the secret derived for `key_name`.

    The result is a single blob: the generated nonce followed by the
    ciphertext.
    """
    nonce, ciphertext = aead_encrypt(get_secret(key_name), data)
    return nonce + ciphertext

167 

168 

def simple_decrypt(key_name: str, data: bytes) -> bytes:
    """
    Decrypt a blob produced by simple_encrypt with the same `key_name`.

    The first `_aead_nonce_len` bytes are taken as the nonce and the rest as
    the ciphertext.
    """
    secret = get_secret(key_name)
    nonce = data[:_aead_nonce_len]
    ciphertext = data[_aead_nonce_len:]
    return aead_decrypt(secret, nonce, ciphertext)

173 

174 

def encrypt_page_token(plaintext_page_token: str):
    """
    Encrypt a pagination token with the page-token secret and return it as
    URL-safe base64 text
    """
    token_bytes = plaintext_page_token.encode("utf8")
    encrypted = simple_encrypt(PAGE_TOKEN_KEY_NAME, token_bytes)
    return b64encode(encrypted)

177 

178 

def decrypt_page_token(encrypted_page_token: str):
    """
    Reverse encrypt_page_token: base64-decode, decrypt with the page-token
    secret, and return the token as text
    """
    encrypted = b64decode(encrypted_page_token)
    token_bytes = simple_decrypt(PAGE_TOKEN_KEY_NAME, encrypted)
    return token_bytes.decode("utf8")

181 

182 

183# Public key cryptography 

184 

185 

def asym_encrypt(public_key: bytes, data: bytes) -> bytes:
    """Encrypt `data` to the given raw public key using a SealedBox."""
    box = SealedBox(PublicKey(public_key))
    return box.encrypt(data)

188 

189 

def asym_decrypt(private_key: bytes, encrypted_data: bytes) -> bytes:
    """Decrypt SealedBox-encrypted data with the given raw private key."""
    box = SealedBox(PrivateKey(private_key))
    return box.decrypt(encrypted_data)

192 

193 

def generate_asym_keypair():
    """
    Generate a new keypair and return it as `(private_key, public_key)`,
    each in the encoded form produced by the key's encode()
    """
    private_key = PrivateKey.generate()
    private_encoded = private_key.encode()
    public_encoded = private_key.public_key.encode()
    return private_encoded, public_encoded

197 

198 

def generate_invite_code(length=8):
    """
    Builds a random invite code of `length` characters drawn from lowercase
    ASCII letters and digits, which keeps it URL-safe.
    """
    allowed_chars = string.ascii_lowercase + string.digits
    picked = []
    for _ in range(length):
        picked.append(secrets.choice(allowed_chars))
    return "".join(picked)

204 return "".join(secrets.choice(alphabet) for _ in range(length))