Coverage for app/backend/src/couchers/crypto.py: 95%
123 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1import functools
2import secrets
3import string
4from base64 import urlsafe_b64decode, urlsafe_b64encode
6import nacl.pwhash
7import nacl.utils
8from google.protobuf.message import Message
9from nacl.bindings import crypto_aead
10from nacl.bindings.crypto_generichash import generichash_blake2b_salt_personal
11from nacl.bindings.utils import sodium_memcmp
12from nacl.exceptions import InvalidkeyError
13from nacl.public import PrivateKey, PublicKey, SealedBox
14from nacl.utils import random as random_bytes
16from couchers.config import config
17from couchers.proto.internal import internal_pb2
def b64encode(data: bytes) -> str:
    """
    Encodes bytes with the URL-safe base64 alphabet (padding kept) and returns text.
    """
    encoded = urlsafe_b64encode(data)
    return str(encoded, "ascii")
24def b64decode(data: str | bytes) -> bytes:
25 return urlsafe_b64decode(data)
def b64encode_unpadded(data: bytes) -> str:
    """
    URL-safe base64 encoding with the "=" padding removed.
    """
    padded = b64encode(data)
    # "=" only ever occurs as trailing padding in base64 output, so stripping
    # it from the right removes every occurrence.
    return padded.rstrip("=")
32def b64decode_unpadded(data: bytes) -> bytes:
33 return b64decode(data + b"===="[len(data) % 4 :])
def _urlsafe_random_b64(length: int = 32) -> str:
    """
    Returns `length` random bytes encoded as URL-safe base64 text.
    """
    raw = random_bytes(length)
    return b64encode(raw)
def urlsafe_secure_token() -> str:
    """
    Returns a random token that is safe to embed in a URL.

    The token is the URL-safe base64 encoding of 32 random bytes.
    """
    token_bytes = 32
    return _urlsafe_random_b64(token_bytes)
def cookiesafe_secure_token() -> str:
    """
    Returns a random token made only of hex digits: 32 random bytes, hex-encoded.
    """
    token_bytes = 32
    return random_hex(token_bytes)
def hash_password(password: str) -> bytes:
    """
    Hashes a password with nacl.pwhash.str and returns the resulting hash
    string as bytes, suitable for later use with verify_password.
    """
    password_bytes = password.encode("utf-8")
    return nacl.pwhash.str(password_bytes)
def verify_password(hashed: bytes, password: str) -> bool:
    """
    Checks a password against a hash produced by hash_password.

    Returns the result of nacl.pwhash.verify, or False when it raises
    InvalidkeyError (i.e. the password does not match).
    """
    try:
        return nacl.pwhash.verify(hashed, password.encode("utf-8"))
    except InvalidkeyError:
        return False
def random_hex(length: int = 32) -> str:
    """
    Returns random bytes as a hex string.

    `length` is the number of random bytes, so the returned string has
    twice as many characters.
    """
    raw = random_bytes(length)
    return raw.hex()
def secure_compare(val1: bytes, val2: bytes) -> bool:
    """
    Compares two byte strings using libsodium's sodium_memcmp.
    """
    is_equal = sodium_memcmp(val1, val2)
    return is_equal
def generate_hash_signature(message: bytes, key: bytes, digest_size: int = 32) -> bytes:
    """
    Returns the blake2b keyed hash of `message` under `key`.

    The result works as a fast symmetric signature: only a party that knows
    the key can produce a matching hash, so agreement of the hashes shows the
    signature came from a key holder.

    `digest_size` is the length of the returned hash in bytes.
    """
    signature = generichash_blake2b_salt_personal(
        message,
        digest_size=digest_size,
        key=key,
    )
    return signature
def simple_hash_signature(message: bytes | str, key_name: str) -> str:
    """
    Signs `message` with the secret derived for `key_name` and returns the
    signature as URL-safe base64 text.

    A str message is encoded as UTF-8 before signing; bytes are used as-is.
    """
    msg_bytes = message.encode("utf8") if isinstance(message, str) else message
    signature = generate_hash_signature(message=msg_bytes, key=get_secret(key_name))
    return b64encode(signature)
def verify_hash_signature(message: bytes, key: bytes, sig: bytes, digest_size: int = 32) -> bool:
    """
    Checks a signature created by generate_hash_signature.

    Recomputes the keyed hash of `message` and compares it to `sig` with
    secure_compare. Returns True on a match, False otherwise.
    """
    expected = generate_hash_signature(message, key, digest_size)
    return secure_compare(sig, expected)
def generate_random_5digit_string() -> str:
    """Return a random string of exactly five decimal digits (zero-padded)."""
    number = secrets.randbelow(100_000)
    return str(number).zfill(5)
def verify_token(a: str, b: str) -> bool:
    """Return True if strings a and b are equal, in such a way as to
    reduce the risk of timing attacks.

    Both strings are encoded to UTF-8 before the comparison:
    secrets.compare_digest only accepts str arguments that are pure ASCII and
    raises TypeError otherwise, which would turn a malformed (e.g.
    user-supplied) token into a crash instead of a failed verification.
    """
    return secrets.compare_digest(a.encode("utf-8"), b.encode("utf-8"))
def stable_secure_uniform(key: bytes, seed: bytes) -> float:
    """
    Maps (key, seed) deterministically to a float in [0, 1).

    The keyed hash of `seed` supplies the randomness, so the same inputs
    always give the same value.
    """
    digest = generate_hash_signature(message=seed, key=key)
    assert len(digest) > 7
    # Same construction as cpython's random: take 7 bytes (56 bits), drop the
    # low 3 bits to keep 53 bits (the precision of a float), then scale by 2**-53.
    bits_per_float = 53
    mantissa = int.from_bytes(digest[:7], "big") >> 3
    return mantissa * 2**-bits_per_float
@functools.lru_cache
def get_secret(name: str) -> bytes:
    """
    Returns the secret key for `name`, derived from the root secret.

    The key is the keyed hash of the UTF-8 encoded name under
    config["SECRET"]. Results are memoized per name by lru_cache.
    """
    root_secret = config["SECRET"]
    return generate_hash_signature(message=name.encode("utf8"), key=root_secret)
# Names of derived secrets. Each name is fed to get_secret, so changing a
# value changes the derived key.
UNSUBSCRIBE_KEY_NAME: str = "unsubscribe"
EMAIL_SOURCE_DATA_KEY_NAME: str = "email-source-data"
PAGE_TOKEN_KEY_NAME: str = "pagination"
SOFA_KEY_NAME: str = "sofa_cookie"
USER_LOCATION_RANDOMIZATION_NAME: str = "user-location-randomization-v1"
# AEAD: Authenticated Encryption with Associated Data
# Key and nonce sizes (in bytes) for XChaCha20-Poly1305, as exported by the nacl bindings.
_aead_key_len: int = crypto_aead.crypto_aead_xchacha20poly1305_ietf_KEYBYTES
_aead_nonce_len: int = crypto_aead.crypto_aead_xchacha20poly1305_ietf_NPUBBYTES
def aead_generate_nonce() -> bytes:
    """
    Returns a fresh random nonce of _aead_nonce_len bytes.
    """
    nonce = random_bytes(_aead_nonce_len)
    return nonce
def aead_generate_key() -> bytes:
    """
    Returns a fresh random key of _aead_key_len bytes.
    """
    key = random_bytes(_aead_key_len)
    return key
def aead_encrypt(
    key: bytes,
    secret_data: bytes,
    plaintext_data: bytes = b"",
    nonce: bytes | None = None,
) -> tuple[bytes, bytes]:
    """
    Encrypts `secret_data` with XChaCha20-Poly1305.

    `plaintext_data` is passed as the associated data: it is not encrypted,
    but the same value must be supplied to aead_decrypt. If no nonce is given
    (None or empty), a random one is generated.

    Returns a (nonce, ciphertext) tuple; the nonce is needed for decryption.
    """
    used_nonce = nonce if nonce else aead_generate_nonce()
    ciphertext = crypto_aead.crypto_aead_xchacha20poly1305_ietf_encrypt(
        secret_data, plaintext_data, used_nonce, key
    )
    return used_nonce, ciphertext
def aead_decrypt(key: bytes, nonce: bytes, encrypted_secret_data: bytes, plaintext_data: bytes = b"") -> bytes:
    """
    Decrypts data produced by aead_encrypt and returns the original secret data.

    `nonce` and `plaintext_data` must be the values used for encryption.
    """
    decrypted = crypto_aead.crypto_aead_xchacha20poly1305_ietf_decrypt(
        encrypted_secret_data, plaintext_data, nonce, key
    )
    return decrypted
def simple_encrypt(key_name: str, data: bytes) -> bytes:
    """
    Encrypts `data` with the secret derived for `key_name`.

    Returns the nonce followed directly by the ciphertext, which is the
    layout simple_decrypt expects.
    """
    nonce, ciphertext = aead_encrypt(get_secret(key_name), data)
    return b"".join((nonce, ciphertext))
def simple_decrypt(key_name: str, data: bytes) -> bytes:
    """
    Decrypts a value produced by simple_encrypt with the same `key_name`.

    The first _aead_nonce_len bytes of `data` are the nonce; the remainder is
    the ciphertext.
    """
    nonce = data[:_aead_nonce_len]
    ciphertext = data[_aead_nonce_len:]
    return aead_decrypt(get_secret(key_name), nonce, ciphertext)
def encrypt_proto(key_name: str, message: Message) -> str:
    """
    Serializes a protobuf message, encrypts it with the secret derived for
    `key_name`, and returns the result as a base64-encoded string.
    """
    serialized = message.SerializeToString()
    encrypted = simple_encrypt(key_name, serialized)
    return b64encode(encrypted)
189def decrypt_proto[T: Message](key_name: str, encrypted_value: str, proto_class: type[T]) -> T:
190 """
191 Decrypts a base64-encoded string and parses it as the given proto class.
192 Raises an exception if decryption or parsing fails.
193 """
194 return proto_class.FromString(simple_decrypt(key_name, b64decode(encrypted_value)))
# Layout of an encoded sofa value: id bytes, then serialized payload, then signature.
_sofa_sig_len: int = 16  # signature length in bytes
_sofa_id_len: int = 18  # sofa id length in bytes
def create_sofa_id() -> bytes:
    """
    Returns a new random sofa id of _sofa_id_len bytes.
    """
    sofa_id = random_bytes(_sofa_id_len)
    return sofa_id
def encode_sofa(sofa_id: bytes, payload: internal_pb2.SofaPayload) -> str:
    """
    Packs a sofa id and payload into a signed, base64-encoded string.

    The encoded bytes are: id, serialized payload, then a signature over both
    made with the sofa secret.
    """
    signed_part = sofa_id + payload.SerializeToString()
    sig = generate_hash_signature(signed_part, get_secret(SOFA_KEY_NAME), digest_size=_sofa_sig_len)
    return b64encode(signed_part + sig)
def decode_sofa(signed_value: str) -> tuple[bytes, internal_pb2.SofaPayload]:
    """
    Verifies and unpacks a value produced by encode_sofa.

    Returns the (sofa id, payload) pair. Raises ValueError if the value is too
    short to hold an id and a signature, or if the signature does not match.
    """
    raw = b64decode(signed_value)
    if len(raw) < _sofa_id_len + _sofa_sig_len:
        raise ValueError("Invalid signed data: too short")

    # The signature is the trailing _sofa_sig_len bytes and covers everything before it
    signed_part, sig = raw[:-_sofa_sig_len], raw[-_sofa_sig_len:]
    signature_ok = verify_hash_signature(
        signed_part, get_secret(SOFA_KEY_NAME), sig, digest_size=_sofa_sig_len
    )
    if not signature_ok:
        raise ValueError("Invalid signature")

    sofa_id = signed_part[:_sofa_id_len]
    payload = internal_pb2.SofaPayload.FromString(signed_part[_sofa_id_len:])
    return sofa_id, payload
def encrypt_page_token(plaintext_page_token: str) -> str:
    """
    Encrypts a page token with the pagination secret and returns it as
    base64 text.
    """
    token_bytes = plaintext_page_token.encode("utf8")
    encrypted = simple_encrypt(PAGE_TOKEN_KEY_NAME, token_bytes)
    return b64encode(encrypted)
def decrypt_page_token(encrypted_page_token: str) -> str:
    """
    Reverses encrypt_page_token and returns the plaintext page token.
    """
    encrypted = b64decode(encrypted_page_token)
    token_bytes = simple_decrypt(PAGE_TOKEN_KEY_NAME, encrypted)
    return token_bytes.decode("utf8")
230# Public key cryptography
def asym_encrypt(public_key: bytes, data: bytes) -> bytes:
    """
    Encrypts `data` to the given raw public key using a nacl SealedBox.
    """
    box = SealedBox(PublicKey(public_key))
    return box.encrypt(data)
def asym_decrypt(private_key: bytes, encrypted_data: bytes) -> bytes:
    """
    Decrypts SealedBox data (see asym_encrypt) with the given raw private key.
    """
    box = SealedBox(PrivateKey(private_key))
    return box.decrypt(encrypted_data)
def generate_asym_keypair() -> tuple[bytes, bytes]:
    """
    Generates a new keypair and returns it as (private key, public key),
    both as raw bytes.
    """
    private_key = PrivateKey.generate()
    public_key = private_key.public_key
    return private_key.encode(), public_key.encode()
def generate_invite_code(length: int = 8) -> str:
    """
    Returns a random invite code of `length` characters.

    Characters are drawn with `secrets.choice` from lowercase ASCII letters
    and digits, so the code is URL-safe.
    """
    alphabet = string.ascii_lowercase + string.digits
    chars = [secrets.choice(alphabet) for _ in range(length)]
    return "".join(chars)