Coverage for app/backend/src/couchers/utils.py: 93%

196 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import http.cookies 

2import re 

3import typing 

4from collections.abc import Mapping, Sequence 

5from datetime import UTC, date, datetime, timedelta, tzinfo 

6from email.utils import formatdate 

7from typing import TYPE_CHECKING, Any, overload 

8from zoneinfo import ZoneInfo 

9 

10import regex 

11from geoalchemy2 import WKBElement, WKTElement 

12from geoalchemy2.shape import from_shape, to_shape 

13from google.protobuf.duration_pb2 import Duration 

14from google.protobuf.timestamp_pb2 import Timestamp 

15from shapely.geometry import Point, Polygon, shape 

16from sqlalchemy import Function, cast 

17from sqlalchemy.orm import Mapped 

18from sqlalchemy.sql import func 

19from sqlalchemy.types import DateTime 

20 

21from couchers.config import config 

22from couchers.constants import ( 

23 EMAIL_REGEX, 

24 PREFERRED_LANGUAGE_COOKIE_EXPIRY, 

25 VALID_NAME_MAX_LENGTH, 

26 VALID_NAME_MIN_LENGTH, 

27 VALID_NAME_REGEX, 

28) 

29from couchers.crypto import ( 

30 create_sofa_id, 

31 decode_sofa, 

32 decrypt_page_token, 

33 encode_sofa, 

34 encrypt_page_token, 

35) 

36from couchers.proto.internal import internal_pb2 

37 

# Compiled once at import time; is_valid_name() reuses the compiled pattern on every call
_VALID_NAME_PATTERN = regex.compile(VALID_NAME_REGEX)

if TYPE_CHECKING:
    # only referenced in annotations, so it is not imported at runtime
    from couchers.models import Geom

42 

43 

44# When a user logs in, they can basically input one of three things: user id, username, or email 

45# These are three non-intersecting sets 

46# * user_ids are numeric representations in base 10 

47# * usernames are alphanumeric + underscores, at least 2 chars long, and don't start with a number, 

48# and don't start or end with underscore 

49# * emails are just whatever stack overflow says emails are ;) 

50 

51 

def is_valid_user_id(field: str) -> bool:
    """
    Checks if it's a string representing a base 10 integer not starting with 0

    The whole string has to match: only the ASCII digits 0-9 are accepted, and a trailing newline is rejected
    (a `$`-anchored `re.match` would have let it through).
    """
    return re.fullmatch(r"[1-9][0-9]*", field) is not None

57 

58 

def is_valid_username(field: str) -> bool:
    """
    Checks if it's an alphanumeric + underscore, lowercase string, at least
    two characters long, and starts with a letter, ends with alphanumeric

    The whole string has to match, so a trailing newline is rejected (a `$`-anchored `re.match` would have let
    it through).
    """
    return re.fullmatch(r"[a-z][0-9a-z_]*[a-z0-9]", field) is not None

65 

66 

def is_valid_name(field: str) -> bool:
    """
    Checks a display name against the same rules as the web frontend:

    * the length must lie between VALID_NAME_MIN_LENGTH and VALID_NAME_MAX_LENGTH (inclusive)
    * the whole string must match VALID_NAME_REGEX (letters of any script, whitespace, apostrophes and hyphens,
      with no leading or trailing whitespace)
    """
    length_ok = VALID_NAME_MIN_LENGTH <= len(field) <= VALID_NAME_MAX_LENGTH
    return length_ok and _VALID_NAME_PATTERN.fullmatch(field) is not None

79 

80 

def is_valid_email(field: str) -> bool:
    """
    Checks whether the string matches EMAIL_REGEX (matched from the start of the string)
    """
    matched = re.match(EMAIL_REGEX, field)
    return matched is not None

83 

84 

def Timestamp_from_datetime(dt: datetime) -> Timestamp:
    """
    Turns a timezone-aware datetime into a protobuf Timestamp

    Raises ValueError for a naive datetime.
    """
    if dt.tzinfo is not None:
        timestamp = Timestamp()
        timestamp.FromDatetime(dt)
        return timestamp

    raise ValueError("Cannot convert a naive datetime to a timestamp.")

92 

93 

def Duration_from_timedelta(dt: timedelta) -> Duration:
    """
    Turns a timedelta into a protobuf Duration
    """
    duration = Duration()
    duration.FromTimedelta(dt)
    return duration

98 

99 

100def parse_date(date_str: str) -> date | None: 

101 """ 

102 Parses a date-only string in the format "YYYY-MM-DD" returning None if it fails 

103 """ 

104 try: 

105 return date.fromisoformat(date_str) 

106 except ValueError: 

107 return None 

108 

109 

def date_to_api(date_obj: date) -> str:
    """
    Formats a date for the API as its ISO string
    """
    iso_string = date_obj.isoformat()
    return iso_string

112 

113 

def to_aware_datetime(ts: Timestamp) -> datetime:
    """
    Converts a protobuf Timestamp into a timezone-aware datetime in UTC
    """
    aware = ts.ToDatetime(tzinfo=UTC)
    return aware

119 

120 

def to_timezone(value: Timestamp | datetime, timezone: tzinfo) -> datetime:
    """
    Expresses an instant in time (protobuf Timestamp or aware datetime) as a datetime in the given timezone

    Raises ValueError for a naive datetime, because it does not represent a point in time.
    """
    if not isinstance(value, Timestamp):
        if value.tzinfo is None:
            raise ValueError("Cannot convert a naive datetime to a timezone.")
        return value.astimezone(timezone)

    return value.ToDatetime(timezone)

131 

132 

133def now() -> datetime: 

134 return datetime.now(tz=UTC) 

135 

136 

def minimum_allowed_birthdate() -> date:
    """
    Most recent birthdate allowed to register (must be 18 years minimum)

    Uses calendar arithmetic rather than a fixed number of days: subtracting 365.25 * 18 days removes only 6574
    whole days, which is one day short whenever the last 18 years contain five leap days, and would let someone
    register the day before their 18th birthday.
    """
    current = today()
    target_year = current.year - 18
    try:
        return current.replace(year=target_year)
    except ValueError:
        # today is 29 February and the target year has no such day; the latest birthdate that is already
        # 18 years old is then 28 February (a 1 March birthday only turns 18 tomorrow)
        return current.replace(year=target_year, month=2, day=28)

144 

145 

def today() -> date:
    """
    Current date in UTC
    """
    current = now()
    return current.date()

151 

152 

def now_in_timezone(tz: str) -> datetime:
    """
    Current time in the given timezone

    tz should be tzdata identifier, e.g. America/New_York
    """
    zone = ZoneInfo(tz)
    return datetime.now(tz=zone)

158 

159 

def today_in_timezone(tz: str) -> date:
    """
    Current date in the given timezone

    tz should be tzdata identifier, e.g. America/New_York
    """
    local_now = now_in_timezone(tz)
    return local_now.date()

165 

166 

167# Note: be very careful with ordering of lat/lng! 

168# In a lot of cases they come as (lng, lat), but us humans tend to use them from GPS as (lat, lng)... 

169# When entering as EPSG4326, we also need it in (lng, lat) 

170 

171 

def wrap_coordinate(lat: float, lng: float) -> tuple[float, float]:
    """
    Wraps (lat, lng) point in the EPSG4326 format

    A point already within lat [-90, 90] and lng [-180, 180] is returned unchanged. Otherwise the longitude is
    wrapped into (-180, 180] and the latitude is reflected at the poles into [-90, 90].

    Note: reflecting the latitude over a pole does not shift the longitude by 180 degrees.
    """

    def _shift(deg: float, limit: float, period: float) -> float:
        # move a value that lies outside [-limit, limit] by one full period
        if deg > limit:
            deg -= period
        if deg < -limit:
            deg += period
        return deg

    def _reflect(deg: float, limit: float, offset: float) -> float:
        # mirror a value that lies outside [-limit, limit]
        if deg > limit:
            deg = -deg + offset
        if deg < -limit:
            deg = -deg - offset
        return deg

    def _remainder(deg: float, period: float = 360) -> float:
        # reduce to [-period, period], keeping the sign of the input
        if deg > period:
            deg = deg % period
        if deg < -period:
            deg = deg % -period
        return deg

    if lng < -180 or lng > 180 or lat < -90 or lat > 90:
        lng = _shift(_remainder(lng), 180, 360)
        lat = _remainder(lat)
        lat = _reflect(lat, 180, 180)
        lat = _reflect(lat, 90, 180)
        # normalise the antimeridian to +180; -360 cannot occur here because _shift already maps it to 0
        if lng == -180:
            lng = 180

    return lat, lng

210 

211 

def create_coordinate(lat: float, lng: float) -> WKBElement:
    """
    Builds an EPSG4326 (normal GPS-coordinates) point geometry from a latitude and a longitude

    The coordinates are wrapped with wrap_coordinate first; the point itself is stored as (lng, lat).
    """
    wrapped_lat, wrapped_lng = wrap_coordinate(lat, lng)
    point = Point(wrapped_lng, wrapped_lat)
    return from_shape(point, srid=4326)

218 

219 

def create_polygon_lat_lng(points: list[list[float]]) -> WKBElement:
    """
    Builds an EPSG4326 polygon geometry from a list of (lat, lng) pairs

    Each pair is swapped to (lng, lat) before the polygon is constructed.
    """
    ring = [(lng, lat) for lat, lng in points]
    return from_shape(Polygon(ring), srid=4326)

225 

226 

def create_polygon_lng_lat(points: list[list[float]]) -> WKBElement:
    """
    Builds an EPSG4326 polygon geometry from a list of (lng, lat) pairs, used as given
    """
    polygon = Polygon(points)
    return from_shape(polygon, srid=4326)

232 

233 

def geojson_to_geom(geojson: dict[str, Any]) -> WKBElement:
    """
    Converts a GeoJSON geometry into PostGIS geom data in EPSG4326
    """
    geometry = shape(geojson)
    return from_shape(geometry, srid=4326)

239 

240 

def to_multi(polygon: WKBElement) -> Function[Any]:
    """
    Wraps the geometry in a SQL ST_Multi(...) call
    """
    multi = func.ST_Multi(polygon)
    return multi

243 

244 

@overload
def get_coordinates(geom: WKBElement | WKTElement) -> tuple[float, float]: ...
@overload
def get_coordinates(geom: None) -> None: ...


def get_coordinates(geom: WKBElement | WKTElement | None) -> tuple[float, float] | None:
    """
    Extracts the EPSG4326 (lat, lng) pair from a point geometry; a falsy input yields None
    """
    if not geom:
        return None

    point = to_shape(geom)
    # 4326 stores points as (x, y) = (lng, lat), so swap them for the (lat, lng) result
    return point.y, point.x

261 

262 

263def http_date(dt: datetime | None = None) -> str: 

264 """ 

265 Format the datetime for HTTP cookies 

266 """ 

267 if not dt: 

268 dt = now() 

269 return formatdate(dt.timestamp(), usegmt=True) 

270 

271 

def _create_tasty_cookie(name: str, value: Any, expiry: datetime, httponly: bool) -> str:
    """
    Builds the Set-Cookie header value for one cookie

    The value is stringified and stored as-is. The cookie is scoped to config.COOKIE_DOMAIN and path "/", and
    expires at `expiry`. In dev it is SameSite=Strict; otherwise it is SameSite=None and Secure.
    """
    text = str(value)
    morsel: http.cookies.Morsel[str] = http.cookies.Morsel()
    morsel.set(name, text, text)

    # when the browser should stop sending the cookie
    morsel["expires"] = http_date(expiry)
    # restrict to our domain; without a domain the cookie would not cover subdomains
    morsel["domain"] = config.COOKIE_DOMAIN
    # root path makes it available to all API requests, the default would be something like /org.couchers.auth/
    morsel["path"] = "/"
    # whether the cookie is hidden from javascript
    morsel["httponly"] = httponly

    if not config.DEV:
        # sent on all requests, which requires Secure (HTTPS only) in production
        morsel["samesite"] = "None"
        morsel["secure"] = True
    else:
        # sent only on requests from first-party domains
        morsel["samesite"] = "Strict"

    return morsel.OutputString()

293 

294 

def create_session_cookies(token: str, user_id: str | int, expiry: datetime) -> list[str]:
    """
    Builds the two session cookies, both expiring at `expiry`.

    * couchers-sesh holds the secure session token and is inaccessible to javascript
    * couchers-user-id holds the user id and is readable by the javascript frontend, so that it knows when it's
      logged in/out
    """
    session_cookie = _create_tasty_cookie("couchers-sesh", token, expiry, httponly=True)
    user_id_cookie = _create_tasty_cookie("couchers-user-id", user_id, expiry, httponly=False)
    return [session_cookie, user_id_cookie]

305 

306 

def create_lang_cookie(lang: str) -> list[str]:
    """
    Builds the NEXT_LOCALE language cookie (readable by javascript), returned as a one-element list

    It expires PREFERRED_LANGUAGE_COOKIE_EXPIRY from now.
    """
    expires_at = now() + PREFERRED_LANGUAGE_COOKIE_EXPIRY
    return [_create_tasty_cookie("NEXT_LOCALE", lang, expiry=expires_at, httponly=False)]

311 

312 

313def _parse_cookie(headers: Mapping[str, str | bytes], cookie_name: str) -> str | None: 

314 """ 

315 Helper to parse a cookie value from headers by name, returning None if not found. 

316 """ 

317 if "cookie" not in headers: 

318 return None 

319 

320 cookie_str = typing.cast(str, headers["cookie"]) 

321 cookie = http.cookies.SimpleCookie(cookie_str).get(cookie_name) 

322 

323 if not cookie: 

324 return None 

325 

326 return cookie.value 

327 

328 

def parse_session_cookie(headers: Mapping[str, str | bytes]) -> str | None:
    """
    Reads the session token from the couchers-sesh cookie, or None if it's not there
    """
    token = _parse_cookie(headers, "couchers-sesh")
    return token

334 

335 

def parse_user_id_cookie(headers: Mapping[str, str | bytes]) -> str | None:
    """
    Reads the user id from the couchers-user-id cookie, or None if it's not there
    """
    user_id = _parse_cookie(headers, "couchers-user-id")
    return user_id

341 

342 

def parse_ui_lang_cookie(headers: Mapping[str, str | bytes]) -> str | None:
    """
    Reads the language from the NEXT_LOCALE cookie, or None if it's not there
    """
    lang = _parse_cookie(headers, "NEXT_LOCALE")
    return lang

348 

349 

350def parse_api_key(headers: Mapping[str, str | bytes]) -> str | None: 

351 """ 

352 Returns a bearer token (API key) from the `authorization` header, or None if invalid/not present 

353 """ 

354 if "authorization" not in headers: 354 ↛ 355line 354 didn't jump to line 355 because the condition on line 354 was never true

355 return None 

356 

357 authorization = headers["authorization"] 

358 if isinstance(authorization, bytes): 358 ↛ 359line 358 didn't jump to line 359 because the condition on line 358 was never true

359 authorization = authorization.decode("utf-8") 

360 

361 if not authorization.startswith("Bearer "): 

362 return None 

363 

364 return authorization[7:] 

365 

366 

def parse_sofa_cookie(headers: Mapping[str, str | bytes]) -> str | None:
    """
    Returns the raw value of the "sofa" cookie if decode_sofa accepts it, otherwise None

    The decoded result is not used here; decoding only serves as validation.
    """
    raw = _parse_cookie(headers, "sofa")
    if raw:
        try:
            decode_sofa(raw)
        except Exception:
            return None
        return raw

    return None

377 

378 

def generate_sofa_cookie() -> tuple[str, str]:
    """
    Creates a fresh sofa value and the matching cookie

    Returns the encoded sofa value together with the "sofa" cookie string carrying it. The cookie is
    inaccessible to javascript and expires 10000 days from now.
    """
    sofa_id = create_sofa_id()
    payload = internal_pb2.SofaPayload(version=1, created=Timestamp_from_datetime(now()))
    sofa_value = encode_sofa(sofa_id, payload)

    expiry = now() + timedelta(days=10000)
    return sofa_value, _create_tasty_cookie("sofa", sofa_value, expiry, httponly=True)

388 

389 

390def remove_duplicates_retain_order[T](list_: Sequence[T]) -> list[T]: 

391 out = [] 

392 for item in list_: 

393 if item not in out: 

394 out.append(item) 

395 return out 

396 

397 

def date_in_timezone(date_: Mapped[date | None], timezone: str) -> Function[Any]:
    """
    Builds the SQL expression for the start of a naive postgres date in a given timezone, as a timezone-aware
    timestamp (postgres has no timezone-aware dates).

    The date is cast to a timestamp without time zone and passed to postgres' timezone(). For example, with the
    session in 'America/New_York':

        SET SESSION TIME ZONE 'America/New_York';

        CREATE TABLE tz_trouble (to_date date, timezone text);

        INSERT INTO tz_trouble(to_date, timezone) VALUES
        ('2021-03-10'::date, 'Australia/Sydney'),
        ('2021-03-20'::date, 'Europe/Berlin'),
        ('2021-04-15'::date, 'America/New_York');

        SELECT timezone(timezone, to_date::timestamp) FROM tz_trouble;

    gives:

                timezone
        ------------------------
         2021-03-09 08:00:00-05
         2021-03-19 19:00:00-04
         2021-04-15 00:00:00-04
    """
    naive_timestamp = cast(date_, DateTime(timezone=False))
    return func.timezone(timezone, naive_timestamp)

423 

424 

425def millis_from_dt(dt: datetime) -> int: 

426 return round(1000 * dt.timestamp()) 

427 

428 

429def dt_from_millis(millis: int) -> datetime: 

430 return datetime.fromtimestamp(millis / 1000, tz=UTC) 

431 

432 

def dt_to_page_token(dt: datetime) -> str:
    """
    Encodes a datetime as an encrypted page token holding its unix timestamp in whole microseconds

    Python has datetime resolution equal to 1 micro, as does postgres; the assert guards that assumption.
    """
    assert datetime.resolution == timedelta(microseconds=1)
    micros = round(dt.timestamp() * 1_000_000)
    return encrypt_page_token(str(micros))

441 

442 

def dt_from_page_token(page_token: str) -> datetime:
    """
    Decrypts a page token holding a unix timestamp in microseconds and returns it as a UTC datetime
    """
    micros = int(decrypt_page_token(page_token))
    return datetime.fromtimestamp(micros / 1_000_000, tz=UTC)

446 

447 

448def last_active_coarsen(dt: datetime) -> datetime: 

449 """ 

450 Coarsens a "last active" time to the accuracy we use for last active times, currently to the last hour, e.g. if the current time is 27th June 2021, 16:53 UTC, this returns 27th June 2021, 16:00 UTC 

451 """ 

452 return dt.replace(minute=0, second=0, microsecond=0) 

453 

454 

455def not_none[T](x: T | None) -> T: 

456 if x is None: 456 ↛ 457line 456 didn't jump to line 457 because the condition on line 456 was never true

457 raise ValueError("Expected a value but got None") 

458 return x 

459 

460 

def is_geom(x: Geom | None) -> Geom:
    """
    Returns the Geom unchanged, raising ValueError if it is None (not_none does not work with unions)
    """
    if x is not None:
        return x
    raise ValueError("Expected a Geom but got None")