Coverage for app / backend / src / couchers / utils.py: 94%

185 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import http.cookies 

2import re 

3import typing 

4from collections.abc import Mapping, Sequence 

5from datetime import date, datetime, timedelta 

6from email.utils import formatdate 

7from typing import TYPE_CHECKING, Any, overload 

8 

9import pytz 

10from geoalchemy2 import WKBElement, WKTElement 

11from geoalchemy2.shape import from_shape, to_shape 

12from google.protobuf.duration_pb2 import Duration 

13from google.protobuf.timestamp_pb2 import Timestamp 

14from shapely.geometry import Point, Polygon, shape 

15from sqlalchemy import Function, cast 

16from sqlalchemy.orm import Mapped 

17from sqlalchemy.sql import func 

18from sqlalchemy.types import DateTime 

19 

20from couchers.config import config 

21from couchers.constants import EMAIL_REGEX, PREFERRED_LANGUAGE_COOKIE_EXPIRY 

22from couchers.crypto import ( 

23 create_sofa_id, 

24 decode_sofa, 

25 decrypt_page_token, 

26 encode_sofa, 

27 encrypt_page_token, 

28) 

29from couchers.proto.internal import internal_pb2 

30 

31if TYPE_CHECKING: 

32 from couchers.models import Geom 

33 

34utc = pytz.UTC 

35 

36 

37# When a user logs in, they can basically input one of three things: user id, username, or email 

38# These are three non-intersecting sets 

39# * user_ids are numeric representations in base 10 

40# * usernames are alphanumeric + underscores, at least 2 chars long, and don't start with a number, 

41# and don't start or end with underscore 

42# * emails are just whatever stack overflow says emails are ;) 

43 

44 

45def is_valid_user_id(field: str) -> bool: 

46 """ 

47 Checks if it's a string representing a base 10 integer not starting with 0 

48 """ 

49 return re.match(r"[1-9][0-9]*$", field) is not None 

50 

51 

52def is_valid_username(field: str) -> bool: 

53 """ 

54 Checks if it's an alphanumeric + underscore, lowercase string, at least 

55 two characters long, and starts with a letter, ends with alphanumeric 

56 """ 

57 return re.match(r"[a-z][0-9a-z_]*[a-z0-9]$", field) is not None 

58 

59 

60def is_valid_name(field: str) -> bool: 

61 """ 

62 Checks if it has at least one non-whitespace character 

63 """ 

64 return re.match(r"\S+", field) is not None 

65 

66 

67def is_valid_email(field: str) -> bool: 

68 return re.match(EMAIL_REGEX, field) is not None 

69 

70 

71def Timestamp_from_datetime(dt: datetime) -> Timestamp: 

72 pb_ts = Timestamp() 

73 pb_ts.FromDatetime(dt) 

74 return pb_ts 

75 

76 

77def Duration_from_timedelta(dt: timedelta) -> Duration: 

78 pb_d = Duration() 

79 pb_d.FromTimedelta(dt) 

80 return pb_d 

81 

82 

83def parse_date(date_str: str) -> date | None: 

84 """ 

85 Parses a date-only string in the format "YYYY-MM-DD" returning None if it fails 

86 """ 

87 try: 

88 return date.fromisoformat(date_str) 

89 except ValueError: 

90 return None 

91 

92 

93def date_to_api(date_obj: date) -> str: 

94 return date_obj.isoformat() 

95 

96 

97def to_aware_datetime(ts: Timestamp) -> datetime: 

98 """ 

99 Turns a protobuf Timestamp object into a timezone-aware datetime 

100 """ 

101 return ts.ToDatetime(tzinfo=utc) 

102 

103 

104def now() -> datetime: 

105 return datetime.now(utc) 

106 

107 

108def minimum_allowed_birthdate() -> date: 

109 """ 

110 Most recent birthdate allowed to register (must be 18 years minimum) 

111 

112 This approximation works on leap days! 

113 """ 

114 return today() - timedelta(days=365.25 * 18) 

115 

116 

117def today() -> date: 

118 """ 

119 Date only in UTC 

120 """ 

121 return now().date() 

122 

123 

124def now_in_timezone(tz: str) -> datetime: 

125 """ 

126 tz should be tzdata identifier, e.g. America/New_York 

127 """ 

128 return datetime.now(pytz.timezone(tz)) 

129 

130 

131def today_in_timezone(tz: str) -> date: 

132 """ 

133 tz should be tzdata identifier, e.g. America/New_York 

134 """ 

135 return now_in_timezone(tz).date() 

136 

137 

138# Note: be very careful with ordering of lat/lng! 

139# In a lot of cases they come as (lng, lat), but us humans tend to use them from GPS as (lat, lng)... 

140# When entering as EPSG4326, we also need it in (lng, lat) 

141 

142 

143def wrap_coordinate(lat: float, lng: float) -> tuple[float, float]: 

144 """ 

145 Wraps (lat, lng) point in the EPSG4326 format 

146 """ 

147 

148 def __wrap_gen(deg: float, ct: float, adj: float) -> float: 

149 if deg > ct: 

150 deg -= adj 

151 if deg < -ct: 

152 deg += adj 

153 return deg 

154 

155 def __wrap_flip(deg: float, ct: float, adj: float) -> float: 

156 if deg > ct: 

157 deg = -deg + adj 

158 if deg < -ct: 

159 deg = -deg - adj 

160 return deg 

161 

162 def __wrap_rem(deg: float, ct: float = 360) -> float: 

163 if deg > ct: 

164 deg = deg % ct 

165 if deg < -ct: 

166 deg = deg % -ct 

167 return deg 

168 

169 if lng < -180 or lng > 180 or lat < -90 or lat > 90: 

170 lng = __wrap_rem(lng) 

171 lat = __wrap_rem(lat) 

172 lng = __wrap_gen(lng, 180, 360) 

173 lat = __wrap_flip(lat, 180, 180) 

174 lat = __wrap_flip(lat, 90, 180) 

175 if lng == -180: 

176 lng = 180 

177 if lng == -360: 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true

178 lng = 0 

179 

180 return lat, lng 

181 

182 

183def create_coordinate(lat: float, lng: float) -> WKBElement: 

184 """ 

185 Creates a WKT point from a (lat, lng) tuple in EPSG4326 coordinate system (normal GPS-coordinates) 

186 """ 

187 lat, lng = wrap_coordinate(lat, lng) 

188 return from_shape(Point(lng, lat), srid=4326) 

189 

190 

191def create_polygon_lat_lng(points: list[list[float]]) -> WKBElement: 

192 """ 

193 Creates a EPSG4326 WKT polygon from a list of (lat, lng) tuples 

194 """ 

195 return from_shape(Polygon([(lng, lat) for (lat, lng) in points]), srid=4326) 

196 

197 

198def create_polygon_lng_lat(points: list[list[float]]) -> WKBElement: 

199 """ 

200 Creates a EPSG4326 WKT polygon from a list of (lng, lat) tuples 

201 """ 

202 return from_shape(Polygon(points), srid=4326) 

203 

204 

205def geojson_to_geom(geojson: dict[str, Any]) -> WKBElement: 

206 """ 

207 Turns GeoJSON to PostGIS geom data in EPSG4326 

208 """ 

209 return from_shape(shape(geojson), srid=4326) 

210 

211 

212def to_multi(polygon: WKBElement) -> Function[Any]: 

213 return func.ST_Multi(polygon) 

214 

215 

216@overload 

217def get_coordinates(geom: WKBElement | WKTElement) -> tuple[float, float]: ... 

218@overload 

219def get_coordinates(geom: None) -> None: ... 

220 

221 

222def get_coordinates(geom: WKBElement | WKTElement | None) -> tuple[float, float] | None: 

223 """ 

224 Returns EPSG4326 (lat, lng) pair for a given WKT geom point or None if the input is not truthy 

225 """ 

226 if geom: 

227 shp = to_shape(geom) 

228 # note the funniness with 4326 normally being (x, y) = (lng, lat) 

229 return shp.y, shp.x 

230 else: 

231 return None 

232 

233 

234def http_date(dt: datetime | None = None) -> str: 

235 """ 

236 Format the datetime for HTTP cookies 

237 """ 

238 if not dt: 

239 dt = now() 

240 return formatdate(dt.timestamp(), usegmt=True) 

241 

242 

243def _create_tasty_cookie(name: str, value: Any, expiry: datetime, httponly: bool) -> str: 

244 cookie: http.cookies.Morsel[str] = http.cookies.Morsel() 

245 cookie.set(name, str(value), str(value)) 

246 # tell the browser when to stop sending the cookie 

247 cookie["expires"] = http_date(expiry) 

248 # restrict to our domain, note if there's no domain, it won't include subdomains 

249 cookie["domain"] = config["COOKIE_DOMAIN"] 

250 # path so that it's accessible for all API requests, otherwise defaults to something like /org.couchers.auth/ 

251 cookie["path"] = "/" 

252 if config["DEV"]: 252 ↛ 257line 252 didn't jump to line 257 because the condition on line 252 was always true

253 # send only on requests from first-party domains 

254 cookie["samesite"] = "Strict" 

255 else: 

256 # send on all requests, requires Secure 

257 cookie["samesite"] = "None" 

258 # only set cookie on HTTPS sites in production 

259 cookie["secure"] = True 

260 # not accessible from javascript 

261 cookie["httponly"] = httponly 

262 

263 return cookie.OutputString() 

264 

265 

266def create_session_cookies(token: str, user_id: str | int, expiry: datetime) -> list[str]: 

267 """ 

268 Creates our session cookies. 

269 

270 We have two: the secure session token (in couchers-sesh) that's inaccessible to javascript, and the user id (in couchers-user-id) which the javascript frontend can access, so that it knows when it's logged in/out 

271 """ 

272 return [ 

273 _create_tasty_cookie("couchers-sesh", token, expiry, httponly=True), 

274 _create_tasty_cookie("couchers-user-id", user_id, expiry, httponly=False), 

275 ] 

276 

277 

278def create_lang_cookie(lang: str) -> list[str]: 

279 return [ 

280 _create_tasty_cookie("NEXT_LOCALE", lang, expiry=(now() + PREFERRED_LANGUAGE_COOKIE_EXPIRY), httponly=False) 

281 ] 

282 

283 

284def _parse_cookie(headers: Mapping[str, str | bytes], cookie_name: str) -> str | None: 

285 """ 

286 Helper to parse a cookie value from headers by name, returning None if not found. 

287 """ 

288 if "cookie" not in headers: 

289 return None 

290 

291 cookie_str = typing.cast(str, headers["cookie"]) 

292 cookie = http.cookies.SimpleCookie(cookie_str).get(cookie_name) 

293 

294 if not cookie: 

295 return None 

296 

297 return cookie.value 

298 

299 

300def parse_session_cookie(headers: Mapping[str, str | bytes]) -> str | None: 

301 """ 

302 Returns our session cookie value (aka token) or None 

303 """ 

304 return _parse_cookie(headers, "couchers-sesh") 

305 

306 

307def parse_user_id_cookie(headers: Mapping[str, str | bytes]) -> str | None: 

308 """ 

309 Returns our user id cookie value or None 

310 """ 

311 return _parse_cookie(headers, "couchers-user-id") 

312 

313 

314def parse_ui_lang_cookie(headers: Mapping[str, str | bytes]) -> str | None: 

315 """ 

316 Returns language cookie or None 

317 """ 

318 return _parse_cookie(headers, "NEXT_LOCALE") 

319 

320 

321def parse_api_key(headers: Mapping[str, str | bytes]) -> str | None: 

322 """ 

323 Returns a bearer token (API key) from the `authorization` header, or None if invalid/not present 

324 """ 

325 if "authorization" not in headers: 325 ↛ 326line 325 didn't jump to line 326 because the condition on line 325 was never true

326 return None 

327 

328 authorization = headers["authorization"] 

329 if isinstance(authorization, bytes): 329 ↛ 330line 329 didn't jump to line 330 because the condition on line 329 was never true

330 authorization = authorization.decode("utf-8") 

331 

332 if not authorization.startswith("Bearer "): 

333 return None 

334 

335 return authorization[7:] 

336 

337 

338def parse_sofa_cookie(headers: Mapping[str, str | bytes]) -> str | None: 

339 cookie_value = _parse_cookie(headers, "sofa") 

340 if not cookie_value: 

341 return None 

342 

343 try: 

344 decode_sofa(cookie_value) 

345 return cookie_value 

346 except Exception: 

347 return None 

348 

349 

350def generate_sofa_cookie() -> tuple[str, str]: 

351 sofa_value = encode_sofa( 

352 create_sofa_id(), 

353 internal_pb2.SofaPayload( 

354 version=1, 

355 created=Timestamp_from_datetime(now()), 

356 ), 

357 ) 

358 return sofa_value, _create_tasty_cookie("sofa", sofa_value, now() + timedelta(days=10000), httponly=True) 

359 

360 

361def remove_duplicates_retain_order[T](list_: Sequence[T]) -> list[T]: 

362 out = [] 

363 for item in list_: 

364 if item not in out: 

365 out.append(item) 

366 return out 

367 

368 

369def date_in_timezone(date_: Mapped[date | None], timezone: str) -> Function[Any]: 

370 """ 

371 Given a naive postgres date object (postgres doesn't have tzd dates), returns a timezone-aware timestamp for the 

372 start of that date in that timezone. E.g., if postgres is in 'America/New_York', 

373 

374 SET SESSION TIME ZONE 'America/New_York'; 

375 

376 CREATE TABLE tz_trouble (to_date date, timezone text); 

377 

378 INSERT INTO tz_trouble(to_date, timezone) VALUES 

379 ('2021-03-10'::date, 'Australia/Sydney'), 

380 ('2021-03-20'::date, 'Europe/Berlin'), 

381 ('2021-04-15'::date, 'America/New_York'); 

382 

383 SELECT timezone(timezone, to_date::timestamp) FROM tz_trouble; 

384 

385 The result is: 

386 

387 timezone 

388 ------------------------ 

389 2021-03-09 08:00:00-05 

390 2021-03-19 19:00:00-04 

391 2021-04-15 00:00:00-04 

392 """ 

393 return func.timezone(timezone, cast(date_, DateTime(timezone=False))) 

394 

395 

396def millis_from_dt(dt: datetime) -> int: 

397 return round(1000 * dt.timestamp()) 

398 

399 

400def dt_from_millis(millis: int) -> datetime: 

401 return datetime.fromtimestamp(millis / 1000, tz=utc) 

402 

403 

404def dt_to_page_token(dt: datetime) -> str: 

405 """ 

406 Python has datetime resolution equal to 1 micro, as does postgres 

407 

408 We pray to deities that this never changes 

409 """ 

410 assert datetime.resolution == timedelta(microseconds=1) 

411 return encrypt_page_token(str(round(1_000_000 * dt.timestamp()))) 

412 

413 

414def dt_from_page_token(page_token: str) -> datetime: 

415 # see above comment 

416 return datetime.fromtimestamp(int(decrypt_page_token(page_token)) / 1_000_000, tz=utc) 

417 

418 

419def last_active_coarsen(dt: datetime) -> datetime: 

420 """ 

421 Coarsens a "last active" time to the accuracy we use for last active times, currently to the last hour, e.g. if the current time is 27th June 2021, 16:53 UTC, this returns 27th June 2021, 16:00 UTC 

422 """ 

423 return dt.replace(minute=0, second=0, microsecond=0) 

424 

425 

426def not_none[T](x: T | None) -> T: 

427 if x is None: 427 ↛ 428line 427 didn't jump to line 428 because the condition on line 427 was never true

428 raise ValueError("Expected a value but got None") 

429 return x 

430 

431 

432def is_geom(x: Geom | None) -> Geom: 

433 """not_none does not work with unions.""" 

434 if x is None: 434 ↛ 435line 434 didn't jump to line 435 because the condition on line 434 was never true

435 raise ValueError("Expected a Geom but got None") 

436 return x