Coverage for src/couchers/crypto.py: 96%

101 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-17 01:15 +0000

1import functools 

2import secrets 

3import string 

4from base64 import urlsafe_b64decode, urlsafe_b64encode 

5 

6import nacl.pwhash 

7import nacl.utils 

8from nacl.bindings import crypto_aead 

9from nacl.bindings.crypto_generichash import generichash_blake2b_salt_personal 

10from nacl.bindings.utils import sodium_memcmp 

11from nacl.exceptions import InvalidkeyError 

12from nacl.public import PrivateKey, PublicKey, SealedBox 

13from nacl.utils import random as random_bytes 

14 

15from couchers.config import config 

16 

17 

def b64encode(data: bytes) -> str:
    """Encode ``data`` with the URL-safe base64 alphabet and return it as text."""
    encoded = urlsafe_b64encode(data)
    # Base64 output is pure ASCII, so this conversion cannot fail.
    return str(encoded, "ascii")

20 

21 

22def b64decode(data: str | bytes) -> bytes: 

23 return urlsafe_b64decode(data) 

24 

25 

def b64encode_unpadded(data: bytes) -> str:
    """Encode ``data`` as URL-safe base64 text with the "=" padding removed."""
    padded = b64encode(data)
    # "=" only ever appears as trailing padding in base64 output.
    return padded.rstrip("=")

28 

29 

30def b64decode_unpadded(data: bytes) -> bytes: 

31 return b64decode(data + b"===="[len(data) % 4 :]) 

32 

33 

def _urlsafe_random_b64(length: int = 32) -> str:
    """Return ``length`` random bytes encoded as URL-safe base64 text."""
    raw = random_bytes(length)
    return b64encode(raw)

36 

37 

def urlsafe_secure_token() -> str:
    """
    Return a random token that can be put in a URL.

    The token is 32 random bytes encoded as URL-safe base64.
    """
    return _urlsafe_random_b64(length=32)

43 

44 

def cookiesafe_secure_token() -> str:
    """Return a random token made of 32 random bytes written as hexadecimal text."""
    return random_hex(length=32)

47 

48 

def hash_password(password: str) -> bytes:
    """
    Hash ``password`` with ``nacl.pwhash.str``.

    The password is UTF-8 encoded first; the returned bytes are what
    ``verify_password`` expects as its ``hashed`` argument.
    """
    password_bytes = password.encode("utf-8")
    return nacl.pwhash.str(password_bytes)

51 

52 

def verify_password(hashed: bytes, password: str) -> bool:
    """
    Check ``password`` against a hash produced by ``hash_password``.

    Returns the result of ``nacl.pwhash.verify``, or False when it raises
    ``InvalidkeyError``.
    """
    try:
        return nacl.pwhash.verify(hashed, password.encode("utf-8"))
    except InvalidkeyError:
        return False

59 

60 

def random_hex(length: int = 32) -> str:
    """
    Return random bytes rendered as a hexadecimal string.

    ``length`` is the number of random bytes, so the returned string has
    ``2 * length`` characters.
    """
    raw = random_bytes(length)
    return raw.hex()

66 

67 

def secure_compare(val1: bytes, val2: bytes) -> bool:
    """Compare two byte strings using the ``sodium_memcmp`` binding."""
    are_equal = sodium_memcmp(val1, val2)
    return are_equal

70 

71 

def generate_hash_signature(message: bytes, key: bytes) -> bytes:
    """
    Compute a 32-byte blake2b keyed hash of ``message`` under ``key``.

    The result works as a fast symmetric signature: only a party that knows
    the key can produce a matching hash, so comparing hashes shows that the
    signature came from a key holder.
    """
    digest = generichash_blake2b_salt_personal(message, digest_size=32, key=key)
    return digest

81 

82 

def simple_hash_signature(message: bytes | str, key_name: str) -> str:
    """
    Sign ``message`` with the secret derived for ``key_name``.

    Text messages are UTF-8 encoded before hashing. The signature is returned
    as URL-safe base64 text.
    """
    msg_bytes = message.encode("utf8") if isinstance(message, str) else message
    signature = generate_hash_signature(message=msg_bytes, key=get_secret(key_name))
    return b64encode(signature)

89 

90 

def verify_hash_signature(message: bytes, key: bytes, sig: bytes) -> bool:
    """
    Check a signature produced by ``generate_hash_signature``.

    Recomputes the keyed hash of ``message`` and compares it with ``sig``
    through ``secure_compare``. Returns True on a match, otherwise False.
    """
    expected = generate_hash_signature(message, key)
    return secure_compare(sig, expected)

98 

99 

def generate_random_5digit_string() -> str:
    """Return a random string of exactly five decimal digits, zero-padded on the left."""
    number = secrets.randbelow(100000)
    return str(number).zfill(5)

103 

104 

def verify_token(a: str, b: str) -> bool:
    """
    Return True if strings a and b are equal, compared in such a way as to
    reduce the risk of timing attacks.

    Both strings are UTF-8 encoded before the comparison.
    ``secrets.compare_digest`` only accepts ``str`` arguments that are pure
    ASCII and raises ``TypeError`` otherwise, so comparing the encoded bytes
    lets a token containing non-ASCII characters simply fail verification
    instead of raising.
    """
    return secrets.compare_digest(a.encode("utf-8"), b.encode("utf-8"))

110 

111 

def stable_secure_uniform(key: bytes, seed: bytes) -> float:
    """
    Map ``seed`` to a float in [0, 1) that is stable for a given ``key``.

    The value comes from the keyed hash of ``seed``, so the same key and seed
    always give the same result.
    """
    digest = generate_hash_signature(message=seed, key=key)
    assert len(digest) > 7
    # Float construction taken from CPython: 7 bytes give 56 bits, and
    # dropping the low 3 leaves the 53 bits a float mantissa can hold.
    mantissa_bits = 53
    top_bits = int.from_bytes(digest[:7], "big") >> 3
    return top_bits * 2**-mantissa_bits

121 

122 

@functools.lru_cache
def get_secret(name: str) -> bytes:
    """
    Derive a named secret key from the root secret.

    The key is the keyed hash of the UTF-8 encoded ``name`` under
    ``config["SECRET"]``. Results are cached per name by ``lru_cache``.
    """
    label = name.encode("utf8")
    root_secret = config["SECRET"]
    return generate_hash_signature(label, root_secret)

129 

130 

# Key names for deriving purpose-specific secrets. PAGE_TOKEN_KEY_NAME is the
# key_name used for page-token encryption in this module; the others are
# presumably passed to get_secret() by callers elsewhere (verify).
UNSUBSCRIBE_KEY_NAME = "unsubscribe"
EMAIL_SOURCE_DATA_KEY_NAME = "email-source-data"
PAGE_TOKEN_KEY_NAME = "pagination"
USER_LOCATION_RANDOMIZATION_NAME = "user-location-randomization-v1"

135 

136 

# AEAD: Authenticated Encryption with Associated Data

# Key and nonce sizes, taken from the XChaCha20-Poly1305 (IETF) bindings.
_aead_key_len = crypto_aead.crypto_aead_xchacha20poly1305_ietf_KEYBYTES
_aead_nonce_len = crypto_aead.crypto_aead_xchacha20poly1305_ietf_NPUBBYTES

141 

142 

def aead_generate_nonce() -> bytes:
    """Return a fresh random nonce of ``_aead_nonce_len`` bytes."""
    nonce = random_bytes(_aead_nonce_len)
    return nonce

145 

146 

def aead_generate_key() -> bytes:
    """Return a fresh random key of ``_aead_key_len`` bytes."""
    key = random_bytes(_aead_key_len)
    return key

149 

150 

def aead_encrypt(
    key: bytes,
    secret_data: bytes,
    plaintext_data: bytes = b"",
    nonce: bytes | None = None,
) -> tuple[bytes, bytes]:
    """
    Encrypt ``secret_data`` with XChaCha20-Poly1305.

    ``plaintext_data`` is passed as the associated data: it is not encrypted,
    but the same value must be supplied to ``aead_decrypt``. When no nonce (or
    an empty one) is given, a random nonce is generated.

    Returns ``(nonce, encrypted)``; the nonce is needed again to decrypt.
    """
    nonce_used = nonce or aead_generate_nonce()
    ciphertext = crypto_aead.crypto_aead_xchacha20poly1305_ietf_encrypt(
        secret_data, plaintext_data, nonce_used, key
    )
    return nonce_used, ciphertext

161 

162 

def aead_decrypt(key: bytes, nonce: bytes, encrypted_secret_data: bytes, plaintext_data: bytes = b"") -> bytes:
    """
    Decrypt data produced by ``aead_encrypt``.

    ``nonce`` and ``plaintext_data`` (the associated data) are passed to the
    XChaCha20-Poly1305 decrypt binding along with the ciphertext and key.
    """
    decrypted = crypto_aead.crypto_aead_xchacha20poly1305_ietf_decrypt(
        encrypted_secret_data, plaintext_data, nonce, key
    )
    return decrypted

165 

166 

def simple_encrypt(key_name: str, data: bytes) -> bytes:
    """
    Encrypt ``data`` with the secret derived for ``key_name``.

    Returns the random nonce followed directly by the ciphertext, the layout
    that ``simple_decrypt`` splits apart again.
    """
    nonce, ciphertext = aead_encrypt(get_secret(key_name), data)
    return b"".join((nonce, ciphertext))

171 

172 

def simple_decrypt(key_name: str, data: bytes) -> bytes:
    """
    Decrypt a blob produced by ``simple_encrypt`` under the same ``key_name``.

    The first ``_aead_nonce_len`` bytes of ``data`` are the nonce; the rest is
    the ciphertext.
    """
    secret = get_secret(key_name)
    nonce = data[:_aead_nonce_len]
    ciphertext = data[_aead_nonce_len:]
    return aead_decrypt(secret, nonce, ciphertext)

177 

178 

def encrypt_page_token(plaintext_page_token: str) -> str:
    """Encrypt a page token under the pagination key and return it as URL-safe base64 text."""
    token_bytes = plaintext_page_token.encode("utf8")
    encrypted = simple_encrypt(PAGE_TOKEN_KEY_NAME, token_bytes)
    return b64encode(encrypted)

181 

182 

def decrypt_page_token(encrypted_page_token: str) -> str:
    """Reverse ``encrypt_page_token``: base64-decode, decrypt, and decode the result as UTF-8."""
    encrypted = b64decode(encrypted_page_token)
    token_bytes = simple_decrypt(PAGE_TOKEN_KEY_NAME, encrypted)
    return token_bytes.decode("utf8")

185 

186 

187# Public key cryptography 

188 

189 

def asym_encrypt(public_key: bytes, data: bytes) -> bytes:
    """Encrypt ``data`` to the holder of ``public_key`` using a ``SealedBox``."""
    box = SealedBox(PublicKey(public_key))
    return box.encrypt(data)

192 

193 

def asym_decrypt(private_key: bytes, encrypted_data: bytes) -> bytes:
    """Decrypt ``SealedBox`` data, such as the output of ``asym_encrypt``, with ``private_key``."""
    box = SealedBox(PrivateKey(private_key))
    return box.decrypt(encrypted_data)

196 

197 

def generate_asym_keypair() -> tuple[bytes, bytes]:
    """
    Generate a new keypair for ``asym_encrypt`` / ``asym_decrypt``.

    Returns ``(private_key, public_key)``, both as raw bytes.
    """
    private_key = PrivateKey.generate()
    public_key = private_key.public_key
    return private_key.encode(), public_key.encode()

201 

202 

def generate_invite_code(length: int = 8) -> str:
    """
    Generate a random, URL-safe invite code of the given length.

    Each character is drawn with ``secrets.choice`` from the lowercase ASCII
    letters and the digits.
    """
    alphabet = string.ascii_lowercase + string.digits
    chars = [secrets.choice(alphabet) for _ in range(length)]
    return "".join(chars)