Coverage for app / backend / src / couchers / utils.py: 94%

184 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 09:44 +0000

1import http.cookies 

2import re 

3import typing 

4from collections.abc import Mapping, Sequence 

5from datetime import UTC, date, datetime, timedelta 

6from email.utils import formatdate 

7from typing import TYPE_CHECKING, Any, overload 

8from zoneinfo import ZoneInfo 

9 

10from geoalchemy2 import WKBElement, WKTElement 

11from geoalchemy2.shape import from_shape, to_shape 

12from google.protobuf.duration_pb2 import Duration 

13from google.protobuf.timestamp_pb2 import Timestamp 

14from shapely.geometry import Point, Polygon, shape 

15from sqlalchemy import Function, cast 

16from sqlalchemy.orm import Mapped 

17from sqlalchemy.sql import func 

18from sqlalchemy.types import DateTime 

19 

20from couchers.config import config 

21from couchers.constants import EMAIL_REGEX, PREFERRED_LANGUAGE_COOKIE_EXPIRY 

22from couchers.crypto import ( 

23 create_sofa_id, 

24 decode_sofa, 

25 decrypt_page_token, 

26 encode_sofa, 

27 encrypt_page_token, 

28) 

29from couchers.proto.internal import internal_pb2 

30 

31if TYPE_CHECKING: 

32 from couchers.models import Geom 

33 

34 

35# When a user logs in, they can basically input one of three things: user id, username, or email 

36# These are three non-intersecting sets 

37# * user_ids are numeric representations in base 10 

38# * usernames are alphanumeric + underscores, at least 2 chars long, and don't start with a number, 

39# and don't start or end with underscore 

40# * emails are just whatever stack overflow says emails are ;) 

41 

42 

def is_valid_user_id(field: str) -> bool:
    """
    Checks if it's a string representing a base 10 integer not starting with 0

    The whole string has to match: fullmatch is used because match() with a trailing `$` also accepts a value
    that ends in a newline, as `$` matches just before a final line feed.
    """
    return re.fullmatch(r"[1-9][0-9]*", field) is not None

48 

49 

def is_valid_username(field: str) -> bool:
    """
    Checks if it's an alphanumeric + underscore, lowercase string, at least
    two characters long, and starts with a letter, ends with alphanumeric

    The whole string has to match: fullmatch is used because match() with a trailing `$` also accepts a value
    that ends in a newline, as `$` matches just before a final line feed.
    """
    return re.fullmatch(r"[a-z][0-9a-z_]*[a-z0-9]", field) is not None

56 

57 

def is_valid_name(field: str) -> bool:
    """
    Checks if it has at least one non-whitespace character

    The string is searched as a whole, so a name with leading whitespace still counts as long as something other
    than whitespace follows; matching only at the start of the string would reject it.
    """
    return re.search(r"\S", field) is not None

63 

64 

def is_valid_email(field: str) -> bool:
    """
    Checks if EMAIL_REGEX matches at the start of the string
    """
    matched = re.match(EMAIL_REGEX, field)
    return matched is not None

67 

68 

def Timestamp_from_datetime(dt: datetime) -> Timestamp:
    """
    Builds a protobuf Timestamp filled in from the given datetime
    """
    timestamp = Timestamp()
    timestamp.FromDatetime(dt)
    return timestamp

73 

74 

def Duration_from_timedelta(dt: timedelta) -> Duration:
    """
    Builds a protobuf Duration filled in from the given timedelta
    """
    duration = Duration()
    duration.FromTimedelta(dt)
    return duration

79 

80 

81def parse_date(date_str: str) -> date | None: 

82 """ 

83 Parses a date-only string in the format "YYYY-MM-DD" returning None if it fails 

84 """ 

85 try: 

86 return date.fromisoformat(date_str) 

87 except ValueError: 

88 return None 

89 

90 

def date_to_api(date_obj: date) -> str:
    """
    Formats a date for the API as its ISO 8601 string
    """
    iso_string = date_obj.isoformat()
    return iso_string

93 

94 

def to_aware_datetime(ts: Timestamp) -> datetime:
    """
    Converts a protobuf Timestamp object into a datetime that carries the UTC timezone
    """
    aware = ts.ToDatetime(tzinfo=UTC)
    return aware

100 

101 

102def now() -> datetime: 

103 return datetime.now(tz=UTC) 

104 

105 

def minimum_allowed_birthdate() -> date:
    """
    Most recent birthdate allowed to register (must be 18 years minimum)

    This is computed on the calendar rather than with a fixed number of days: subtracting 365.25 * 18 days is
    truncated to 6574 whole days by date arithmetic, which lands one day too late whenever the 18 year span
    contains five leap days, letting someone register the day before they turn 18.

    This works on leap days! 18 years before a 29th of February is never a leap year, so the 28th is used.
    """
    current = today()
    cutoff_year = current.year - 18
    try:
        return current.replace(year=cutoff_year)
    except ValueError:
        # only reachable on 29 February: someone born on 1 March of the cutoff year is not 18 yet, so use the 28th
        return current.replace(year=cutoff_year, day=28)

113 

114 

def today() -> date:
    """
    The current calendar date according to UTC
    """
    current_utc_time = now()
    return current_utc_time.date()

120 

121 

def now_in_timezone(tz: str) -> datetime:
    """
    Current time in the given timezone

    tz should be tzdata identifier, e.g. America/New_York
    """
    zone = ZoneInfo(tz)
    return datetime.now(tz=zone)

127 

128 

def today_in_timezone(tz: str) -> date:
    """
    Current calendar date in the given timezone

    tz should be tzdata identifier, e.g. America/New_York
    """
    local_time = now_in_timezone(tz)
    return local_time.date()

134 

135 

136# Note: be very careful with ordering of lat/lng! 

137# In a lot of cases they come as (lng, lat), but us humans tend to use them from GPS as (lat, lng)... 

138# When entering as EPSG4326, we also need it in (lng, lat) 

139 

140 

def wrap_coordinate(lat: float, lng: float) -> tuple[float, float]:
    """
    Wraps (lat, lng) point in the EPSG4326 format

    A point already inside lat [-90, 90] and lng [-180, 180] is returned untouched. Otherwise the longitude is
    wrapped around into [-180, 180] (with -180 reported as 180) and the latitude is reflected back into [-90, 90].
    Note that reflecting the latitude leaves the longitude as it is.
    """

    def wrap_around(deg: float, ct: float, adj: float) -> float:
        # shift a value lying beyond +-ct by adj towards zero
        if deg > ct:
            deg -= adj
        if deg < -ct:
            deg += adj
        return deg

    def reflect(deg: float, ct: float, adj: float) -> float:
        # mirror a value lying beyond +-ct
        if deg > ct:
            deg = -deg + adj
        if deg < -ct:
            deg = -deg - adj
        return deg

    def remainder(deg: float, ct: float = 360) -> float:
        # drop whole turns from a value lying beyond +-ct, keeping its sign
        if deg > ct:
            deg = deg % ct
        if deg < -ct:
            deg = deg % -ct
        return deg

    if lng < -180 or lng > 180 or lat < -90 or lat > 90:
        lng = remainder(lng)
        lat = remainder(lat)
        lng = wrap_around(lng, 180, 360)
        lat = reflect(lat, 180, 180)
        lat = reflect(lat, 90, 180)
        if lng == -180:
            lng = 180
        # no check for lng == -360 is needed here: wrap_around has already turned -360 into 0

    return lat, lng

179 

180 

def create_coordinate(lat: float, lng: float) -> WKBElement:
    """
    Builds a point element in the EPSG4326 coordinate system (normal GPS-coordinates) from a (lat, lng) pair,
    wrapping the pair with wrap_coordinate first
    """
    wrapped_lat, wrapped_lng = wrap_coordinate(lat, lng)
    # the point is built as (x, y) = (lng, lat)
    point = Point(wrapped_lng, wrapped_lat)
    return from_shape(point, srid=4326)

187 

188 

def create_polygon_lat_lng(points: list[list[float]]) -> WKBElement:
    """
    Builds an EPSG4326 polygon element from a list of (lat, lng) pairs
    """
    # swap every pair into (lng, lat) order before building the polygon
    ring = [(lng, lat) for lat, lng in points]
    return from_shape(Polygon(ring), srid=4326)

194 

195 

def create_polygon_lng_lat(points: list[list[float]]) -> WKBElement:
    """
    Builds an EPSG4326 polygon element from a list of (lng, lat) pairs
    """
    polygon = Polygon(points)
    return from_shape(polygon, srid=4326)

201 

202 

def geojson_to_geom(geojson: dict[str, Any]) -> WKBElement:
    """
    Converts GeoJSON into PostGIS geom data in EPSG4326
    """
    geometry = shape(geojson)
    return from_shape(geometry, srid=4326)

208 

209 

def to_multi(polygon: WKBElement) -> Function[Any]:
    """
    Builds an ST_Multi SQL function call around the given polygon
    """
    multi = func.ST_Multi(polygon)
    return multi

212 

213 

@overload
def get_coordinates(geom: WKBElement | WKTElement) -> tuple[float, float]: ...
@overload
def get_coordinates(geom: None) -> None: ...


def get_coordinates(geom: WKBElement | WKTElement | None) -> tuple[float, float] | None:
    """
    Gives the EPSG4326 (lat, lng) pair of a geom point, or None when the input is not truthy
    """
    if not geom:
        return None

    point = to_shape(geom)
    # 4326 is normally (x, y) = (lng, lat), so the pair is swapped on the way out
    lat, lng = point.y, point.x
    return lat, lng

230 

231 

232def http_date(dt: datetime | None = None) -> str: 

233 """ 

234 Format the datetime for HTTP cookies 

235 """ 

236 if not dt: 

237 dt = now() 

238 return formatdate(dt.timestamp(), usegmt=True) 

239 

240 

def _create_tasty_cookie(name: str, value: Any, expiry: datetime, httponly: bool) -> str:
    """
    Builds the cookie string for a cookie with the given name and value

    The value is stringified and used as is; expiry says when the browser should stop sending the cookie and
    httponly decides whether the cookie is hidden from javascript.
    """
    text_value = str(value)
    morsel: http.cookies.Morsel[str] = http.cookies.Morsel()
    morsel.set(name, text_value, text_value)

    # path so that it's accessible for all API requests, otherwise defaults to something like /org.couchers.auth/
    morsel["path"] = "/"
    # restrict to our domain, note if there's no domain, it won't include subdomains
    morsel["domain"] = config["COOKIE_DOMAIN"]
    # tell the browser when to stop sending the cookie
    morsel["expires"] = http_date(expiry)
    # not accessible from javascript
    morsel["httponly"] = httponly

    if config["DEV"]:
        # send only on requests from first-party domains
        morsel["samesite"] = "Strict"
    else:
        # send on all requests, requires Secure
        morsel["samesite"] = "None"
        # only set cookie on HTTPS sites in production
        morsel["secure"] = True

    return morsel.OutputString()

262 

263 

def create_session_cookies(token: str, user_id: str | int, expiry: datetime) -> list[str]:
    """
    Builds our pair of session cookies, both with the same expiry.

    The first is the secure session token (in couchers-sesh) that's inaccessible to javascript, the second is the user id (in couchers-user-id) which the javascript frontend can access, so that it knows when it's logged in/out
    """
    session_cookie = _create_tasty_cookie("couchers-sesh", token, expiry, httponly=True)
    user_id_cookie = _create_tasty_cookie("couchers-user-id", user_id, expiry, httponly=False)
    return [session_cookie, user_id_cookie]

274 

275 

def create_lang_cookie(lang: str) -> list[str]:
    """
    Builds the NEXT_LOCALE language cookie, readable from javascript, expiring PREFERRED_LANGUAGE_COOKIE_EXPIRY
    from now
    """
    expires_at = now() + PREFERRED_LANGUAGE_COOKIE_EXPIRY
    return [_create_tasty_cookie("NEXT_LOCALE", lang, expiry=expires_at, httponly=False)]

280 

281 

282def _parse_cookie(headers: Mapping[str, str | bytes], cookie_name: str) -> str | None: 

283 """ 

284 Helper to parse a cookie value from headers by name, returning None if not found. 

285 """ 

286 if "cookie" not in headers: 

287 return None 

288 

289 cookie_str = typing.cast(str, headers["cookie"]) 

290 cookie = http.cookies.SimpleCookie(cookie_str).get(cookie_name) 

291 

292 if not cookie: 

293 return None 

294 

295 return cookie.value 

296 

297 

298def parse_session_cookie(headers: Mapping[str, str | bytes]) -> str | None: 

299 """ 

300 Returns our session cookie value (aka token) or None 

301 """ 

302 return _parse_cookie(headers, "couchers-sesh") 

303 

304 

305def parse_user_id_cookie(headers: Mapping[str, str | bytes]) -> str | None: 

306 """ 

307 Returns our user id cookie value or None 

308 """ 

309 return _parse_cookie(headers, "couchers-user-id") 

310 

311 

312def parse_ui_lang_cookie(headers: Mapping[str, str | bytes]) -> str | None: 

313 """ 

314 Returns language cookie or None 

315 """ 

316 return _parse_cookie(headers, "NEXT_LOCALE") 

317 

318 

319def parse_api_key(headers: Mapping[str, str | bytes]) -> str | None: 

320 """ 

321 Returns a bearer token (API key) from the `authorization` header, or None if invalid/not present 

322 """ 

323 if "authorization" not in headers: 323 ↛ 324line 323 didn't jump to line 324 because the condition on line 323 was never true

324 return None 

325 

326 authorization = headers["authorization"] 

327 if isinstance(authorization, bytes): 327 ↛ 328line 327 didn't jump to line 328 because the condition on line 327 was never true

328 authorization = authorization.decode("utf-8") 

329 

330 if not authorization.startswith("Bearer "): 

331 return None 

332 

333 return authorization[7:] 

334 

335 

def parse_sofa_cookie(headers: Mapping[str, str | bytes]) -> str | None:
    """
    Returns the raw `sofa` cookie value if it's present and decode_sofa accepts it, otherwise None
    """
    sofa_value = _parse_cookie(headers, "sofa")
    if sofa_value:
        try:
            decode_sofa(sofa_value)
        except Exception:
            # any failure to decode is treated the same as having no cookie
            return None
        return sofa_value
    return None

346 

347 

def generate_sofa_cookie() -> tuple[str, str]:
    """
    Creates a fresh sofa value and returns it together with the http-only `sofa` cookie string carrying it

    The payload is version 1 and stamped with the current time; the cookie expires 10000 days from now.
    """
    sofa_id = create_sofa_id()
    payload = internal_pb2.SofaPayload(version=1, created=Timestamp_from_datetime(now()))
    sofa_value = encode_sofa(sofa_id, payload)

    expiry = now() + timedelta(days=10000)
    cookie = _create_tasty_cookie("sofa", sofa_value, expiry, httponly=True)
    return sofa_value, cookie

357 

358 

359def remove_duplicates_retain_order[T](list_: Sequence[T]) -> list[T]: 

360 out = [] 

361 for item in list_: 

362 if item not in out: 

363 out.append(item) 

364 return out 

365 

366 

def date_in_timezone(date_: Mapped[date | None], timezone: str) -> Function[Any]:
    """
    Builds the SQL expression for the timezone-aware timestamp at which a naive postgres date starts in the given
    timezone (postgres doesn't have tzd dates). E.g., if postgres is in 'America/New_York',

        SET SESSION TIME ZONE 'America/New_York';

        CREATE TABLE tz_trouble (to_date date, timezone text);

        INSERT INTO tz_trouble(to_date, timezone) VALUES
            ('2021-03-10'::date, 'Australia/Sydney'),
            ('2021-03-20'::date, 'Europe/Berlin'),
            ('2021-04-15'::date, 'America/New_York');

        SELECT timezone(timezone, to_date::timestamp) FROM tz_trouble;

    The result is:

                timezone
        ------------------------
         2021-03-09 08:00:00-05
         2021-03-19 19:00:00-04
         2021-04-15 00:00:00-04
    """
    # the date is first cast to a timestamp without timezone, then placed in the timezone
    naive_timestamp = cast(date_, DateTime(timezone=False))
    return func.timezone(timezone, naive_timestamp)

392 

393 

394def millis_from_dt(dt: datetime) -> int: 

395 return round(1000 * dt.timestamp()) 

396 

397 

398def dt_from_millis(millis: int) -> datetime: 

399 return datetime.fromtimestamp(millis / 1000, tz=UTC) 

400 

401 

def dt_to_page_token(dt: datetime) -> str:
    """
    Encodes a datetime as an encrypted page token holding the rounded number of microseconds since the unix epoch

    Python has datetime resolution equal to 1 micro, as does postgres

    We pray to deities that this never changes
    """
    assert datetime.resolution == timedelta(microseconds=1)
    micros = round(1_000_000 * dt.timestamp())
    return encrypt_page_token(str(micros))

410 

411 

def dt_from_page_token(page_token: str) -> datetime:
    """
    Decrypts a page token holding microseconds since the unix epoch back into a UTC datetime
    """
    # the microsecond resolution is explained in dt_to_page_token
    micros = int(decrypt_page_token(page_token))
    return datetime.fromtimestamp(micros / 1_000_000, tz=UTC)

415 

416 

417def last_active_coarsen(dt: datetime) -> datetime: 

418 """ 

419 Coarsens a "last active" time to the accuracy we use for last active times, currently to the last hour, e.g. if the current time is 27th June 2021, 16:53 UTC, this returns 27th June 2021, 16:00 UTC 

420 """ 

421 return dt.replace(minute=0, second=0, microsecond=0) 

422 

423 

424def not_none[T](x: T | None) -> T: 

425 if x is None: 425 ↛ 426line 425 didn't jump to line 426 because the condition on line 425 was never true

426 raise ValueError("Expected a value but got None") 

427 return x 

428 

429 

def is_geom(x: "Geom | None") -> "Geom":
    """
    Returns x unchanged, raising ValueError if it is None; not_none does not work with unions.

    The annotations are written as strings because Geom is only imported under TYPE_CHECKING, so the name doesn't
    exist at runtime and must not be evaluated when the function is defined.
    """
    if x is None:
        raise ValueError("Expected a Geom but got None")
    return x