Coverage for src / couchers / utils.py: 92%

185 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-13 12:05 +0000

1import http.cookies 

2import re 

3import typing 

4from collections.abc import Mapping, Sequence 

5from datetime import date, datetime, timedelta 

6from email.utils import formatdate 

7from typing import TYPE_CHECKING, Any, overload 

8from zoneinfo import ZoneInfo 

9 

10import pytz 

11from geoalchemy2 import WKBElement, WKTElement 

12from geoalchemy2.shape import from_shape, to_shape 

13from google.protobuf.duration_pb2 import Duration 

14from google.protobuf.timestamp_pb2 import Timestamp 

15from shapely.geometry import Point, Polygon, shape 

16from sqlalchemy import Function, cast 

17from sqlalchemy.orm import Mapped 

18from sqlalchemy.sql import func 

19from sqlalchemy.types import DateTime 

20 

21from couchers.config import config 

22from couchers.constants import EMAIL_REGEX, PREFERRED_LANGUAGE_COOKIE_EXPIRY 

23from couchers.crypto import decrypt_page_token, encrypt_page_token 

24 

25if TYPE_CHECKING: 

26 from couchers.models import Geom 

27 

# Module-wide UTC tzinfo (from pytz), shared by the datetime helpers in this module
utc = pytz.UTC

29 

30 

31# When a user logs in, they can basically input one of three things: user id, username, or email 

32# These are three non-intersecting sets 

33# * user_ids are numeric representations in base 10 

34# * usernames are alphanumeric + underscores, at least 2 chars long, and don't start with a number, 

35# and don't start or end with underscore 

36# * emails are just whatever stack overflow says emails are ;) 

37 

38 

def is_valid_user_id(field: str) -> bool:
    """
    Checks if it's a string representing a base 10 integer not starting with 0

    The whole string has to match: `re.fullmatch` is used instead of `re.match` with a trailing `$`, because `$` also
    matches just before a final newline and would accept "123" followed by a newline character.
    """
    return re.fullmatch(r"[1-9][0-9]*", field) is not None

44 

45 

def is_valid_username(field: str) -> bool:
    """
    Checks if it's an alphanumeric + underscore, lowercase string, at least
    two characters long, and starts with a letter, ends with alphanumeric

    The whole string has to match: `re.fullmatch` is used instead of `re.match` with a trailing `$`, because `$` also
    matches just before a final newline and would accept a username followed by a newline character.
    """
    return re.fullmatch(r"[a-z][0-9a-z_]*[a-z0-9]", field) is not None

52 

53 

def is_valid_name(field: str) -> bool:
    """
    Checks if it has at least one non-whitespace character

    The character may be anywhere in the string: `re.search` scans the whole string, whereas `re.match` only looks at
    the start and would reject a name that merely begins with whitespace.
    """
    return re.search(r"\S", field) is not None

59 

60 

def is_valid_email(field: str) -> bool:
    """
    Checks if the string matches EMAIL_REGEX (matched from the start of the string, as `re.match` does)
    """
    matched = re.match(EMAIL_REGEX, field)
    return matched is not None

63 

64 

def Timestamp_from_datetime(dt: datetime) -> Timestamp:
    """
    Builds a new protobuf Timestamp message filled in from the given datetime
    """
    message = Timestamp()
    message.FromDatetime(dt)
    return message

69 

70 

def Duration_from_timedelta(dt: timedelta) -> Duration:
    """
    Builds a new protobuf Duration message filled in from the given timedelta
    """
    message = Duration()
    message.FromTimedelta(dt)
    return message

75 

76 

77def parse_date(date_str: str) -> date | None: 

78 """ 

79 Parses a date-only string in the format "YYYY-MM-DD" returning None if it fails 

80 """ 

81 try: 

82 return date.fromisoformat(date_str) 

83 except ValueError: 

84 return None 

85 

86 

def date_to_api(date_obj: date) -> str:
    """
    Serializes a date for the API as its ISO format string
    """
    iso_string = date_obj.isoformat()
    return iso_string

89 

90 

def to_aware_datetime(ts: Timestamp) -> datetime:
    """
    Converts a protobuf Timestamp message into a timezone-aware datetime carrying the module's UTC tzinfo
    """
    aware = ts.ToDatetime(tzinfo=utc)
    return aware

96 

97 

def now() -> datetime:
    """
    Current time as a timezone-aware datetime in UTC
    """
    return datetime.now(tz=utc)

100 

101 

def minimum_allowed_birthdate() -> date:
    """
    Most recent birthdate allowed to register (must be 18 years minimum)

    Computed with exact calendar arithmetic: same month and day, 18 years before today (UTC). An approximation with
    `timedelta(days=365.25 * 18)` always subtracts 6574 days (the half day is ignored when subtracting from a date),
    while 18 calendar years span 6575 days whenever they contain five leap days, so it could land one day too late
    and admit someone one day short of 18.
    """
    current = today()
    target_year = current.year - 18
    try:
        return current.replace(year=target_year)
    except ValueError:
        # today is 29 February and the target year has no such day (a year 18 before a leap year is never a leap
        # year); someone born on 1 March of that year only turns 18 tomorrow, so 28 February is the latest allowed
        return current.replace(year=target_year, day=28)

109 

110 

def today() -> date:
    """
    Current calendar date, taken in UTC
    """
    current_moment = now()
    return current_moment.date()

116 

117 

def now_in_timezone(tz: str) -> datetime:
    """
    Current time as a timezone-aware datetime in the given timezone

    tz should be tzdata identifier, e.g. America/New_York
    """
    zone = pytz.timezone(tz)
    return datetime.now(zone)

123 

124 

def today_in_timezone(tz: str) -> date:
    """
    Current calendar date in the given timezone

    tz should be tzdata identifier, e.g. America/New_York
    """
    local_moment = now_in_timezone(tz)
    return local_moment.date()

130 

131 

132# Note: be very careful with ordering of lat/lng! 

133# In a lot of cases they come as (lng, lat), but us humans tend to use them from GPS as (lat, lng)... 

134# When entering as EPSG4326, we also need it in (lng, lat) 

135 

136 

def wrap_coordinate(lat: float, lng: float) -> tuple[float, float]:
    """
    Wraps a (lat, lng) point into the EPSG4326 ranges

    A point already within lat [-90, 90] and lng [-180, 180] is returned untouched. Otherwise both angles are first
    reduced to within one full turn, the longitude is shifted by 360 degrees back into range, and the latitude is
    reflected back at the poles (the longitude is left as is when that happens). A wrapped longitude of -180 is
    reported as 180.
    """

    def _shift(angle: float, limit: float, period: float) -> float:
        # move an angle lying beyond +/-limit back by one period
        if angle > limit:
            angle -= period
        if angle < -limit:
            angle += period
        return angle

    def _reflect(angle: float, limit: float, offset: float) -> float:
        # mirror an angle lying beyond +/-limit
        if angle > limit:
            angle = -angle + offset
        if angle < -limit:
            angle = -angle - offset
        return angle

    def _reduce(angle: float, period: float = 360) -> float:
        # reduce an angle of more than one full period, keeping its sign
        if angle > period:
            angle %= period
        if angle < -period:
            angle %= -period
        return angle

    out_of_range = lng < -180 or lng > 180 or lat < -90 or lat > 90
    if not out_of_range:
        return lat, lng

    lng = _shift(_reduce(lng), 180, 360)

    lat = _reduce(lat)
    lat = _reflect(lat, 180, 180)
    lat = _reflect(lat, 90, 180)

    if lng == -180:
        lng = 180
    # NOTE(review): this looks unreachable after the shift above; kept so behaviour stays identical
    if lng == -360:
        lng = 0

    return lat, lng

175 

176 

def create_coordinate(lat: float, lng: float) -> WKBElement:
    """
    Creates a point geometry (SRID 4326, i.e. normal GPS coordinates) from a (lat, lng) pair

    The pair is passed through wrap_coordinate first; the point itself is built in (lng, lat) order.
    """
    wrapped_lat, wrapped_lng = wrap_coordinate(lat, lng)
    point = Point(wrapped_lng, wrapped_lat)
    return from_shape(point, srid=4326)

183 

184 

def create_polygon_lat_lng(points: list[list[float]]) -> WKBElement:
    """
    Creates a polygon geometry (SRID 4326) from a list of (lat, lng) pairs
    """
    ring = []
    for lat, lng in points:
        # the polygon wants its vertices in (lng, lat) order
        ring.append((lng, lat))
    return from_shape(Polygon(ring), srid=4326)

190 

191 

def create_polygon_lng_lat(points: list[list[float]]) -> WKBElement:
    """
    Creates a polygon geometry (SRID 4326) from a list of (lng, lat) pairs, used in the order given
    """
    polygon = Polygon(points)
    return from_shape(polygon, srid=4326)

197 

198 

def geojson_to_geom(geojson: dict[str, Any]) -> WKBElement:
    """
    Converts a GeoJSON geometry dict into PostGIS geom data with SRID 4326
    """
    geometry = shape(geojson)
    return from_shape(geometry, srid=4326)

204 

205 

def to_multi(polygon: WKBElement) -> Function[Any]:
    """
    Wraps the geometry in a SQL `ST_Multi(...)` call
    """
    multi_expression = func.ST_Multi(polygon)
    return multi_expression

208 

209 

@overload
def get_coordinates(geom: WKBElement | WKTElement) -> tuple[float, float]: ...
@overload
def get_coordinates(geom: None) -> None: ...


def get_coordinates(geom: WKBElement | WKTElement | None) -> tuple[float, float] | None:
    """
    Extracts the EPSG4326 (lat, lng) pair from a geom point; an input that is not truthy (e.g. None) gives None
    """
    if not geom:
        return None

    point = to_shape(geom)
    # the shape is stored as (x, y) = (lng, lat), so swap to hand back (lat, lng)
    return point.y, point.x

226 

227 

228def http_date(dt: datetime | None = None) -> str: 

229 """ 

230 Format the datetime for HTTP cookies 

231 """ 

232 if not dt: 

233 dt = now() 

234 return formatdate(dt.timestamp(), usegmt=True) 

235 

236 

def _create_tasty_cookie(name: str, value: Any, expiry: datetime, httponly: bool) -> str:
    """
    Renders one cookie as the attribute string produced by `Morsel.OutputString()`

    The value is stringified with str() and stored as is. The domain comes from config["COOKIE_DOMAIN"]; the
    SameSite and Secure attributes depend on config["DEV"].
    """
    raw_value = str(value)
    morsel: http.cookies.Morsel[str] = http.cookies.Morsel()
    morsel.set(name, raw_value, raw_value)

    # path "/" makes it accessible for all API requests (otherwise it defaults to something like
    # /org.couchers.auth/)
    morsel["path"] = "/"
    # restrict to our domain; without a domain, subdomains are not included
    morsel["domain"] = config["COOKIE_DOMAIN"]
    # when the browser should stop sending the cookie
    morsel["expires"] = http_date(expiry)

    if not config["DEV"]:
        # send on all requests, which requires Secure
        morsel["samesite"] = "None"
        # only set cookie on HTTPS sites in production
        morsel["secure"] = True
    else:
        # send only on requests from first-party domains
        morsel["samesite"] = "Strict"

    # when set, the cookie is not accessible from javascript
    morsel["httponly"] = httponly

    return morsel.OutputString()

258 

259 

def create_session_cookies(token: str, user_id: str | int, expiry: datetime) -> list[str]:
    """
    Creates our two session cookies, both with the given expiry.

    The first is the secure session token (couchers-sesh), which javascript cannot read. The second is the user id
    (couchers-user-id), which the javascript frontend can read, so that it knows when it's logged in/out.
    """
    session_cookie = _create_tasty_cookie("couchers-sesh", token, expiry, httponly=True)
    user_id_cookie = _create_tasty_cookie("couchers-user-id", user_id, expiry, httponly=False)
    return [session_cookie, user_id_cookie]

270 

271 

def create_lang_cookie(lang: str) -> list[str]:
    """
    Creates the language cookie (NEXT_LOCALE), readable from javascript, as a one-element list

    It expires PREFERRED_LANGUAGE_COOKIE_EXPIRY after the current time.
    """
    expires_at = now() + PREFERRED_LANGUAGE_COOKIE_EXPIRY
    lang_cookie = _create_tasty_cookie("NEXT_LOCALE", lang, expiry=expires_at, httponly=False)
    return [lang_cookie]

276 

277 

278def parse_session_cookie(headers: Mapping[str, str | bytes]) -> str | None: 

279 """ 

280 Returns our session cookie value (aka token) or None 

281 """ 

282 if "cookie" not in headers: 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true

283 return None 

284 

285 cookie_str = typing.cast(str, headers["cookie"]) 

286 

287 # parse the cookie 

288 cookie = http.cookies.SimpleCookie(cookie_str).get("couchers-sesh") 

289 

290 if not cookie: 290 ↛ 291line 290 didn't jump to line 291 because the condition on line 290 was never true

291 return None 

292 

293 return cookie.value 

294 

295 

296def parse_user_id_cookie(headers: Mapping[str, str | bytes]) -> str | None: 

297 """ 

298 Returns our session cookie value (aka token) or None 

299 """ 

300 if "cookie" not in headers: 

301 return None 

302 

303 cookie_str = typing.cast(str, headers["cookie"]) 

304 

305 # parse the cookie 

306 cookie = http.cookies.SimpleCookie(cookie_str).get("couchers-user-id") 

307 

308 if not cookie: 

309 return None 

310 

311 return cookie.value 

312 

313 

314def parse_ui_lang_cookie(headers: Mapping[str, str | bytes]) -> str | None: 

315 """ 

316 Returns language cookie or None 

317 """ 

318 if "cookie" not in headers: 

319 return None 

320 

321 cookie_str = typing.cast(str, headers["cookie"]) 

322 

323 # else parse the cookie & return its value 

324 cookie = http.cookies.SimpleCookie(cookie_str).get("NEXT_LOCALE") 

325 

326 if not cookie: 

327 return None 

328 

329 return cookie.value 

330 

331 

332def parse_api_key(headers: Mapping[str, str | bytes]) -> str | None: 

333 """ 

334 Returns a bearer token (API key) from the `authorization` header, or None if invalid/not present 

335 """ 

336 if "authorization" not in headers: 336 ↛ 337line 336 didn't jump to line 337 because the condition on line 336 was never true

337 return None 

338 

339 authorization = headers["authorization"] 

340 if isinstance(authorization, bytes): 340 ↛ 341line 340 didn't jump to line 341 because the condition on line 340 was never true

341 authorization = authorization.decode("utf-8") 

342 

343 if not authorization.startswith("Bearer "): 

344 return None 

345 

346 return authorization[7:] 

347 

348 

349def remove_duplicates_retain_order[T](list_: Sequence[T]) -> list[T]: 

350 out = [] 

351 for item in list_: 

352 if item not in out: 

353 out.append(item) 

354 return out 

355 

356 

def date_in_timezone(date_: Mapped[date | None], timezone: str) -> Function[Any]:
    """
    Builds the SQL expression `timezone(<timezone>, <date_ cast to a timestamp without time zone>)`.

    Given a naive postgres date object (postgres doesn't have tzd dates), this gives a timezone-aware timestamp for
    the start of that date in that timezone. E.g., if postgres is in 'America/New_York',

        SET SESSION TIME ZONE 'America/New_York';

        CREATE TABLE tz_trouble (to_date date, timezone text);

        INSERT INTO tz_trouble(to_date, timezone) VALUES
        ('2021-03-10'::date, 'Australia/Sydney'),
        ('2021-03-20'::date, 'Europe/Berlin'),
        ('2021-04-15'::date, 'America/New_York');

        SELECT timezone(timezone, to_date::timestamp) FROM tz_trouble;

    The result is:

                timezone
        ------------------------
         2021-03-09 08:00:00-05
         2021-03-19 19:00:00-04
         2021-04-15 00:00:00-04
    """
    naive_midnight = cast(date_, DateTime(timezone=False))
    return func.timezone(timezone, naive_midnight)

382 

383 

def millis_from_dt(dt: datetime) -> int:
    """
    Converts a datetime to milliseconds since the Unix epoch, rounded to the nearest integer
    """
    epoch_seconds = dt.timestamp()
    return round(1000 * epoch_seconds)

386 

387 

def dt_from_millis(millis: int) -> datetime:
    """
    Converts milliseconds since the Unix epoch to a timezone-aware datetime in UTC
    """
    epoch_seconds = millis / 1000
    return datetime.fromtimestamp(epoch_seconds, tz=utc)

390 

391 

def dt_to_page_token(dt: datetime) -> str:
    """
    Encodes a datetime as an encrypted page token holding its whole number of microseconds since the Unix epoch

    Python has datetime resolution equal to 1 micro, as does postgres

    We pray to deities that this never changes
    """
    assert datetime.resolution == timedelta(microseconds=1)
    epoch_micros = round(1_000_000 * dt.timestamp())
    return encrypt_page_token(str(epoch_micros))

400 

401 

def dt_from_page_token(page_token: str) -> datetime:
    """
    Decrypts a page token holding microseconds since the Unix epoch back into a timezone-aware UTC datetime
    """
    # the token payload is an integer number of microseconds
    epoch_micros = int(decrypt_page_token(page_token))
    return datetime.fromtimestamp(epoch_micros / 1_000_000, tz=utc)

405 

406 

def last_active_coarsen(dt: datetime) -> datetime:
    """
    Coarsens a "last active" time to the accuracy we use for last active times, currently the start of its hour

    E.g. 27th June 2021, 16:53 UTC becomes 27th June 2021, 16:00 UTC.
    """
    start_of_hour = {"minute": 0, "second": 0, "microsecond": 0}
    return dt.replace(**start_of_hour)

412 

413 

def get_tz_as_text(tz_name: str) -> str:
    """
    Describes a timezone as "<name>/UTC<offset>", formatting the current time in that zone with "%Z/UTC%z"
    """
    zone = ZoneInfo(tz_name)
    current = datetime.now(tz=zone)
    return current.strftime("%Z/UTC%z")

416 

417 

418def not_none[T](x: T | None) -> T: 

419 if x is None: 419 ↛ 420line 419 didn't jump to line 420 because the condition on line 419 was never true

420 raise ValueError("Expected a value but got None") 

421 return x 

422 

423 

def is_geom(x: "Geom | None") -> "Geom":
    """
    Returns x unchanged, raising ValueError if it is None

    not_none does not work with unions.

    The annotations are written as strings because Geom is only imported under TYPE_CHECKING: on Python versions
    that evaluate annotations when the function is defined (before 3.14), an unquoted `Geom` raises NameError as soon
    as this module is imported.
    """
    if x is None:
        raise ValueError("Expected a Geom but got None")
    return x