Coverage for app / backend / src / couchers / crypto.py: 95%

123 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import functools 

2import secrets 

3import string 

4from base64 import urlsafe_b64decode, urlsafe_b64encode 

5 

6import nacl.pwhash 

7import nacl.utils 

8from google.protobuf.message import Message 

9from nacl.bindings import crypto_aead 

10from nacl.bindings.crypto_generichash import generichash_blake2b_salt_personal 

11from nacl.bindings.utils import sodium_memcmp 

12from nacl.exceptions import InvalidkeyError 

13from nacl.public import PrivateKey, PublicKey, SealedBox 

14from nacl.utils import random as random_bytes 

15 

16from couchers.config import config 

17from couchers.proto.internal import internal_pb2 

18 

19 

20def b64encode(data: bytes) -> str: 

21 return urlsafe_b64encode(data).decode("ascii") 

22 

23 

24def b64decode(data: str | bytes) -> bytes: 

25 return urlsafe_b64decode(data) 

26 

27 

28def b64encode_unpadded(data: bytes) -> str: 

29 return b64encode(data).replace("=", "") 

30 

31 

32def b64decode_unpadded(data: bytes) -> bytes: 

33 return b64decode(data + b"===="[len(data) % 4 :]) 

34 

35 

36def _urlsafe_random_b64(length: int = 32) -> str: 

37 return b64encode(random_bytes(length)) 

38 

39 

40def urlsafe_secure_token() -> str: 

41 """ 

42 A cryptographically secure random token that can be put in a URL 

43 """ 

44 return _urlsafe_random_b64(32) 

45 

46 

47def cookiesafe_secure_token() -> str: 

48 return random_hex(32) 

49 

50 

51def hash_password(password: str) -> bytes: 

52 return nacl.pwhash.str(password.encode("utf-8")) 

53 

54 

55def verify_password(hashed: bytes, password: str) -> bool: 

56 try: 

57 correct = nacl.pwhash.verify(hashed, password.encode("utf-8")) 

58 return correct 

59 except InvalidkeyError: 

60 return False 

61 

62 

63def random_hex(length: int = 32) -> str: 

64 """ 

65 Length in binary 

66 """ 

67 return random_bytes(length).hex() 

68 

69 

70def secure_compare(val1: bytes, val2: bytes) -> bool: 

71 return sodium_memcmp(val1, val2) 

72 

73 

74def generate_hash_signature(message: bytes, key: bytes, digest_size: int = 32) -> bytes: 

75 """ 

76 Computes a blake2b keyed hash for the message. 

77 

78 This can be used as a fast yet secure symmetric signature: by checking that 

79 the hashes agree, we can make sure the signature was generated by a party 

80 with knowledge of the key. 

81 """ 

82 return generichash_blake2b_salt_personal(message, key=key, digest_size=digest_size) 

83 

84 

85def simple_hash_signature(message: bytes | str, key_name: str) -> str: 

86 if isinstance(message, str): 86 ↛ 89line 86 didn't jump to line 89 because the condition on line 86 was always true

87 msg_bytes = message.encode("utf8") 

88 else: 

89 msg_bytes = message 

90 return b64encode(generate_hash_signature(message=msg_bytes, key=get_secret(key_name))) 

91 

92 

93def verify_hash_signature(message: bytes, key: bytes, sig: bytes, digest_size: int = 32) -> bool: 

94 """ 

95 Verifies a hash signature generated with generate_hash_signature. 

96 

97 Returns true if the signature matches, otherwise false. 

98 """ 

99 return secure_compare(sig, generate_hash_signature(message, key, digest_size)) 

100 

101 

102def generate_random_5digit_string() -> str: 

103 """Return a random 5-digit string""" 

104 return f"{secrets.randbelow(100000):05d}" 

105 

106 

107def verify_token(a: str, b: str) -> bool: 

108 """Return True if strings a and b are equal, in such a way as to 

109 reduce the risk of timing attacks. 

110 """ 

111 return secrets.compare_digest(a, b) 

112 

113 

114def stable_secure_uniform(key: bytes, seed: bytes) -> float: 

115 random_bytes_val = generate_hash_signature(message=seed, key=key) 

116 assert len(random_bytes_val) > 7 

117 # taken from cpython 

118 rr = random_bytes_val[:7] 

119 # Number of bits in a float 

120 BPF = 53 

121 RECIP_BPF: float = 2**-BPF 

122 return (int.from_bytes(rr) >> 3) * RECIP_BPF 

123 

124 

125@functools.lru_cache 

126def get_secret(name: str) -> bytes: 

127 """ 

128 Derives a secret key from the root secret using a key derivation function 

129 """ 

130 return generate_hash_signature(name.encode("utf8"), config["SECRET"]) 

131 

132 

133UNSUBSCRIBE_KEY_NAME = "unsubscribe" 

134EMAIL_SOURCE_DATA_KEY_NAME = "email-source-data" 

135PAGE_TOKEN_KEY_NAME = "pagination" 

136SOFA_KEY_NAME = "sofa_cookie" 

137USER_LOCATION_RANDOMIZATION_NAME = "user-location-randomization-v1" 

138 

139 

140# AEAD: Authenticated Encryption with Associated Data 

141 

142_aead_key_len = crypto_aead.crypto_aead_xchacha20poly1305_ietf_KEYBYTES 

143_aead_nonce_len = crypto_aead.crypto_aead_xchacha20poly1305_ietf_NPUBBYTES 

144 

145 

146def aead_generate_nonce() -> bytes: 

147 return random_bytes(_aead_nonce_len) 

148 

149 

150def aead_generate_key() -> bytes: 

151 return random_bytes(_aead_key_len) 

152 

153 

154def aead_encrypt( 

155 key: bytes, 

156 secret_data: bytes, 

157 plaintext_data: bytes = b"", 

158 nonce: bytes | None = None, 

159) -> tuple[bytes, bytes]: 

160 if not nonce: 160 ↛ 162line 160 didn't jump to line 162 because the condition on line 160 was always true

161 nonce = aead_generate_nonce() 

162 encrypted = crypto_aead.crypto_aead_xchacha20poly1305_ietf_encrypt(secret_data, plaintext_data, nonce, key) 

163 return nonce, encrypted 

164 

165 

166def aead_decrypt(key: bytes, nonce: bytes, encrypted_secret_data: bytes, plaintext_data: bytes = b"") -> bytes: 

167 return crypto_aead.crypto_aead_xchacha20poly1305_ietf_decrypt(encrypted_secret_data, plaintext_data, nonce, key) 

168 

169 

170def simple_encrypt(key_name: str, data: bytes) -> bytes: 

171 key = get_secret(key_name) 

172 nonce, encrypted = aead_encrypt(key, data) 

173 return nonce + encrypted 

174 

175 

176def simple_decrypt(key_name: str, data: bytes) -> bytes: 

177 key = get_secret(key_name) 

178 nonce, encrypted = data[:_aead_nonce_len], data[_aead_nonce_len:] 

179 return aead_decrypt(key, nonce, encrypted) 

180 

181 

182def encrypt_proto(key_name: str, message: Message) -> str: 

183 """ 

184 Encrypts a protobuf message and returns a base64-encoded string. 

185 """ 

186 return b64encode(simple_encrypt(key_name, message.SerializeToString())) 

187 

188 

189def decrypt_proto[T: Message](key_name: str, encrypted_value: str, proto_class: type[T]) -> T: 

190 """ 

191 Decrypts a base64-encoded string and parses it as the given proto class. 

192 Raises an exception if decryption or parsing fails. 

193 """ 

194 return proto_class.FromString(simple_decrypt(key_name, b64decode(encrypted_value))) 

195 

196 

197_sofa_sig_len = 16 

198_sofa_id_len = 18 

199 

200 

201def create_sofa_id() -> bytes: 

202 return random_bytes(_sofa_id_len) 

203 

204 

205def encode_sofa(sofa_id: bytes, payload: internal_pb2.SofaPayload) -> str: 

206 serialized = payload.SerializeToString() 

207 sig = generate_hash_signature(sofa_id + serialized, get_secret(SOFA_KEY_NAME), digest_size=_sofa_sig_len) 

208 return b64encode(sofa_id + serialized + sig) 

209 

210 

211def decode_sofa(signed_value: str) -> tuple[bytes, internal_pb2.SofaPayload]: 

212 data = b64decode(signed_value) 

213 if len(data) < _sofa_id_len + _sofa_sig_len: 

214 raise ValueError("Invalid signed data: too short") 

215 if not verify_hash_signature( 

216 data[:-_sofa_sig_len], get_secret(SOFA_KEY_NAME), data[-_sofa_sig_len:], digest_size=_sofa_sig_len 

217 ): 

218 raise ValueError("Invalid signature") 

219 return data[:_sofa_id_len], internal_pb2.SofaPayload.FromString(data[_sofa_id_len:-_sofa_sig_len]) 

220 

221 

222def encrypt_page_token(plaintext_page_token: str) -> str: 

223 return b64encode(simple_encrypt(PAGE_TOKEN_KEY_NAME, plaintext_page_token.encode("utf8"))) 

224 

225 

226def decrypt_page_token(encrypted_page_token: str) -> str: 

227 return simple_decrypt(PAGE_TOKEN_KEY_NAME, b64decode(encrypted_page_token)).decode("utf8") 

228 

229 

230# Public key cryptography 

231 

232 

233def asym_encrypt(public_key: bytes, data: bytes) -> bytes: 

234 return SealedBox(PublicKey(public_key)).encrypt(data) 

235 

236 

237def asym_decrypt(private_key: bytes, encrypted_data: bytes) -> bytes: 

238 return SealedBox(PrivateKey(private_key)).decrypt(encrypted_data) 

239 

240 

241def generate_asym_keypair() -> tuple[bytes, bytes]: 

242 skey = PrivateKey.generate() 

243 return skey.encode(), skey.public_key.encode() 

244 

245 

246def generate_invite_code(length: int = 8) -> str: 

247 """ 

248 Generates a secure, URL-safe invite code of the given length. 

249 """ 

250 alphabet = string.ascii_lowercase + string.digits 

251 return "".join(secrets.choice(alphabet) for _ in range(length))