Coverage for src / couchers / utils.py: 92%
185 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-13 12:05 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-13 12:05 +0000
1import http.cookies
2import re
3import typing
4from collections.abc import Mapping, Sequence
5from datetime import date, datetime, timedelta
6from email.utils import formatdate
7from typing import TYPE_CHECKING, Any, overload
8from zoneinfo import ZoneInfo
10import pytz
11from geoalchemy2 import WKBElement, WKTElement
12from geoalchemy2.shape import from_shape, to_shape
13from google.protobuf.duration_pb2 import Duration
14from google.protobuf.timestamp_pb2 import Timestamp
15from shapely.geometry import Point, Polygon, shape
16from sqlalchemy import Function, cast
17from sqlalchemy.orm import Mapped
18from sqlalchemy.sql import func
19from sqlalchemy.types import DateTime
21from couchers.config import config
22from couchers.constants import EMAIL_REGEX, PREFERRED_LANGUAGE_COOKIE_EXPIRY
23from couchers.crypto import decrypt_page_token, encrypt_page_token
25if TYPE_CHECKING:
26 from couchers.models import Geom
28utc = pytz.UTC
31# When a user logs in, they can basically input one of three things: user id, username, or email
32# These are three non-intersecting sets
33# * user_ids are numeric representations in base 10
34# * usernames are alphanumeric + underscores, at least 2 chars long, and don't start with a number,
35# and don't start or end with underscore
36# * emails are just whatever stack overflow says emails are ;)
39def is_valid_user_id(field: str) -> bool:
40 """
41 Checks if it's a string representing a base 10 integer not starting with 0
42 """
43 return re.match(r"[1-9][0-9]*$", field) is not None
46def is_valid_username(field: str) -> bool:
47 """
48 Checks if it's an alphanumeric + underscore, lowercase string, at least
49 two characters long, and starts with a letter, ends with alphanumeric
50 """
51 return re.match(r"[a-z][0-9a-z_]*[a-z0-9]$", field) is not None
54def is_valid_name(field: str) -> bool:
55 """
56 Checks if it has at least one non-whitespace character
57 """
58 return re.match(r"\S+", field) is not None
61def is_valid_email(field: str) -> bool:
62 return re.match(EMAIL_REGEX, field) is not None
65def Timestamp_from_datetime(dt: datetime) -> Timestamp:
66 pb_ts = Timestamp()
67 pb_ts.FromDatetime(dt)
68 return pb_ts
71def Duration_from_timedelta(dt: timedelta) -> Duration:
72 pb_d = Duration()
73 pb_d.FromTimedelta(dt)
74 return pb_d
77def parse_date(date_str: str) -> date | None:
78 """
79 Parses a date-only string in the format "YYYY-MM-DD" returning None if it fails
80 """
81 try:
82 return date.fromisoformat(date_str)
83 except ValueError:
84 return None
87def date_to_api(date_obj: date) -> str:
88 return date_obj.isoformat()
91def to_aware_datetime(ts: Timestamp) -> datetime:
92 """
93 Turns a protobuf Timestamp object into a timezone-aware datetime
94 """
95 return ts.ToDatetime(tzinfo=utc)
98def now() -> datetime:
99 return datetime.now(utc)
102def minimum_allowed_birthdate() -> date:
103 """
104 Most recent birthdate allowed to register (must be 18 years minimum)
106 This approximation works on leap days!
107 """
108 return today() - timedelta(days=365.25 * 18)
111def today() -> date:
112 """
113 Date only in UTC
114 """
115 return now().date()
118def now_in_timezone(tz: str) -> datetime:
119 """
120 tz should be tzdata identifier, e.g. America/New_York
121 """
122 return datetime.now(pytz.timezone(tz))
125def today_in_timezone(tz: str) -> date:
126 """
127 tz should be tzdata identifier, e.g. America/New_York
128 """
129 return now_in_timezone(tz).date()
132# Note: be very careful with ordering of lat/lng!
133# In a lot of cases they come as (lng, lat), but us humans tend to use them from GPS as (lat, lng)...
134# When entering as EPSG4326, we also need it in (lng, lat)
137def wrap_coordinate(lat: float, lng: float) -> tuple[float, float]:
138 """
139 Wraps (lat, lng) point in the EPSG4326 format
140 """
142 def __wrap_gen(deg: float, ct: float, adj: float) -> float:
143 if deg > ct:
144 deg -= adj
145 if deg < -ct:
146 deg += adj
147 return deg
149 def __wrap_flip(deg: float, ct: float, adj: float) -> float:
150 if deg > ct:
151 deg = -deg + adj
152 if deg < -ct:
153 deg = -deg - adj
154 return deg
156 def __wrap_rem(deg: float, ct: float = 360) -> float:
157 if deg > ct:
158 deg = deg % ct
159 if deg < -ct:
160 deg = deg % -ct
161 return deg
163 if lng < -180 or lng > 180 or lat < -90 or lat > 90:
164 lng = __wrap_rem(lng)
165 lat = __wrap_rem(lat)
166 lng = __wrap_gen(lng, 180, 360)
167 lat = __wrap_flip(lat, 180, 180)
168 lat = __wrap_flip(lat, 90, 180)
169 if lng == -180:
170 lng = 180
171 if lng == -360: 171 ↛ 172line 171 didn't jump to line 172 because the condition on line 171 was never true
172 lng = 0
174 return lat, lng
177def create_coordinate(lat: float, lng: float) -> WKBElement:
178 """
179 Creates a WKT point from a (lat, lng) tuple in EPSG4326 coordinate system (normal GPS-coordinates)
180 """
181 lat, lng = wrap_coordinate(lat, lng)
182 return from_shape(Point(lng, lat), srid=4326)
185def create_polygon_lat_lng(points: list[list[float]]) -> WKBElement:
186 """
187 Creates a EPSG4326 WKT polygon from a list of (lat, lng) tuples
188 """
189 return from_shape(Polygon([(lng, lat) for (lat, lng) in points]), srid=4326)
192def create_polygon_lng_lat(points: list[list[float]]) -> WKBElement:
193 """
194 Creates a EPSG4326 WKT polygon from a list of (lng, lat) tuples
195 """
196 return from_shape(Polygon(points), srid=4326)
199def geojson_to_geom(geojson: dict[str, Any]) -> WKBElement:
200 """
201 Turns GeoJSON to PostGIS geom data in EPSG4326
202 """
203 return from_shape(shape(geojson), srid=4326)
206def to_multi(polygon: WKBElement) -> Function[Any]:
207 return func.ST_Multi(polygon)
210@overload
211def get_coordinates(geom: WKBElement | WKTElement) -> tuple[float, float]: ...
212@overload
213def get_coordinates(geom: None) -> None: ...
216def get_coordinates(geom: WKBElement | WKTElement | None) -> tuple[float, float] | None:
217 """
218 Returns EPSG4326 (lat, lng) pair for a given WKT geom point or None if the input is not truthy
219 """
220 if geom:
221 shp = to_shape(geom)
222 # note the funniness with 4326 normally being (x, y) = (lng, lat)
223 return shp.y, shp.x
224 else:
225 return None
228def http_date(dt: datetime | None = None) -> str:
229 """
230 Format the datetime for HTTP cookies
231 """
232 if not dt:
233 dt = now()
234 return formatdate(dt.timestamp(), usegmt=True)
237def _create_tasty_cookie(name: str, value: Any, expiry: datetime, httponly: bool) -> str:
238 cookie: http.cookies.Morsel[str] = http.cookies.Morsel()
239 cookie.set(name, str(value), str(value))
240 # tell the browser when to stop sending the cookie
241 cookie["expires"] = http_date(expiry)
242 # restrict to our domain, note if there's no domain, it won't include subdomains
243 cookie["domain"] = config["COOKIE_DOMAIN"]
244 # path so that it's accessible for all API requests, otherwise defaults to something like /org.couchers.auth/
245 cookie["path"] = "/"
246 if config["DEV"]: 246 ↛ 251line 246 didn't jump to line 251 because the condition on line 246 was always true
247 # send only on requests from first-party domains
248 cookie["samesite"] = "Strict"
249 else:
250 # send on all requests, requires Secure
251 cookie["samesite"] = "None"
252 # only set cookie on HTTPS sites in production
253 cookie["secure"] = True
254 # not accessible from javascript
255 cookie["httponly"] = httponly
257 return cookie.OutputString()
260def create_session_cookies(token: str, user_id: str | int, expiry: datetime) -> list[str]:
261 """
262 Creates our session cookies.
264 We have two: the secure session token (in couchers-sesh) that's inaccessible to javascript, and the user id (in couchers-user-id) which the javascript frontend can access, so that it knows when it's logged in/out
265 """
266 return [
267 _create_tasty_cookie("couchers-sesh", token, expiry, httponly=True),
268 _create_tasty_cookie("couchers-user-id", user_id, expiry, httponly=False),
269 ]
272def create_lang_cookie(lang: str) -> list[str]:
273 return [
274 _create_tasty_cookie("NEXT_LOCALE", lang, expiry=(now() + PREFERRED_LANGUAGE_COOKIE_EXPIRY), httponly=False)
275 ]
278def parse_session_cookie(headers: Mapping[str, str | bytes]) -> str | None:
279 """
280 Returns our session cookie value (aka token) or None
281 """
282 if "cookie" not in headers: 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true
283 return None
285 cookie_str = typing.cast(str, headers["cookie"])
287 # parse the cookie
288 cookie = http.cookies.SimpleCookie(cookie_str).get("couchers-sesh")
290 if not cookie: 290 ↛ 291line 290 didn't jump to line 291 because the condition on line 290 was never true
291 return None
293 return cookie.value
296def parse_user_id_cookie(headers: Mapping[str, str | bytes]) -> str | None:
297 """
298 Returns our session cookie value (aka token) or None
299 """
300 if "cookie" not in headers:
301 return None
303 cookie_str = typing.cast(str, headers["cookie"])
305 # parse the cookie
306 cookie = http.cookies.SimpleCookie(cookie_str).get("couchers-user-id")
308 if not cookie:
309 return None
311 return cookie.value
314def parse_ui_lang_cookie(headers: Mapping[str, str | bytes]) -> str | None:
315 """
316 Returns language cookie or None
317 """
318 if "cookie" not in headers:
319 return None
321 cookie_str = typing.cast(str, headers["cookie"])
323 # else parse the cookie & return its value
324 cookie = http.cookies.SimpleCookie(cookie_str).get("NEXT_LOCALE")
326 if not cookie:
327 return None
329 return cookie.value
332def parse_api_key(headers: Mapping[str, str | bytes]) -> str | None:
333 """
334 Returns a bearer token (API key) from the `authorization` header, or None if invalid/not present
335 """
336 if "authorization" not in headers: 336 ↛ 337line 336 didn't jump to line 337 because the condition on line 336 was never true
337 return None
339 authorization = headers["authorization"]
340 if isinstance(authorization, bytes): 340 ↛ 341line 340 didn't jump to line 341 because the condition on line 340 was never true
341 authorization = authorization.decode("utf-8")
343 if not authorization.startswith("Bearer "):
344 return None
346 return authorization[7:]
349def remove_duplicates_retain_order[T](list_: Sequence[T]) -> list[T]:
350 out = []
351 for item in list_:
352 if item not in out:
353 out.append(item)
354 return out
357def date_in_timezone(date_: Mapped[date | None], timezone: str) -> Function[Any]:
358 """
359 Given a naive postgres date object (postgres doesn't have tzd dates), returns a timezone-aware timestamp for the
360 start of that date in that timezone. E.g., if postgres is in 'America/New_York',
362 SET SESSION TIME ZONE 'America/New_York';
364 CREATE TABLE tz_trouble (to_date date, timezone text);
366 INSERT INTO tz_trouble(to_date, timezone) VALUES
367 ('2021-03-10'::date, 'Australia/Sydney'),
368 ('2021-03-20'::date, 'Europe/Berlin'),
369 ('2021-04-15'::date, 'America/New_York');
371 SELECT timezone(timezone, to_date::timestamp) FROM tz_trouble;
373 The result is:
375 timezone
376 ------------------------
377 2021-03-09 08:00:00-05
378 2021-03-19 19:00:00-04
379 2021-04-15 00:00:00-04
380 """
381 return func.timezone(timezone, cast(date_, DateTime(timezone=False)))
384def millis_from_dt(dt: datetime) -> int:
385 return round(1000 * dt.timestamp())
388def dt_from_millis(millis: int) -> datetime:
389 return datetime.fromtimestamp(millis / 1000, tz=utc)
392def dt_to_page_token(dt: datetime) -> str:
393 """
394 Python has datetime resolution equal to 1 micro, as does postgres
396 We pray to deities that this never changes
397 """
398 assert datetime.resolution == timedelta(microseconds=1)
399 return encrypt_page_token(str(round(1_000_000 * dt.timestamp())))
402def dt_from_page_token(page_token: str) -> datetime:
403 # see above comment
404 return datetime.fromtimestamp(int(decrypt_page_token(page_token)) / 1_000_000, tz=utc)
407def last_active_coarsen(dt: datetime) -> datetime:
408 """
409 Coarsens a "last active" time to the accuracy we use for last active times, currently to the last hour, e.g. if the current time is 27th June 2021, 16:53 UTC, this returns 27th June 2021, 16:00 UTC
410 """
411 return dt.replace(minute=0, second=0, microsecond=0)
414def get_tz_as_text(tz_name: str) -> str:
415 return datetime.now(tz=ZoneInfo(tz_name)).strftime("%Z/UTC%z")
418def not_none[T](x: T | None) -> T:
419 if x is None: 419 ↛ 420line 419 didn't jump to line 420 because the condition on line 419 was never true
420 raise ValueError("Expected a value but got None")
421 return x
424def is_geom(x: Geom | None) -> Geom:
425 """not_none does not work with unions."""
426 if x is None: 426 ↛ 427line 426 didn't jump to line 427 because the condition on line 426 was never true
427 raise ValueError("Expected a Geom but got None")
428 return x