Coverage for src/couchers/utils.py: 94%
171 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-01 03:27 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-01 03:27 +0000
1import http.cookies
2import re
3from datetime import date, datetime, timedelta
4from email.utils import formatdate
5from types import SimpleNamespace
6from zoneinfo import ZoneInfo
8import pytz
9from geoalchemy2.shape import from_shape, to_shape
10from google.protobuf.duration_pb2 import Duration
11from google.protobuf.timestamp_pb2 import Timestamp
12from shapely.geometry import Point, Polygon, shape
13from sqlalchemy.sql import cast, func
14from sqlalchemy.types import DateTime
16from couchers.config import config
17from couchers.constants import EMAIL_REGEX, PREFERRED_LANGUAGE_COOKIE_EXPIRY
18from couchers.crypto import decrypt_page_token, encrypt_page_token
20utc = pytz.UTC
23# When a user logs in, they can basically input one of three things: user id, username, or email
24# These are three non-intersecting sets
25# * user_ids are numeric representations in base 10
26# * usernames are alphanumeric + underscores, at least 2 chars long, and don't start with a number, and don't start or end with underscore
27# * emails are just whatever stack overflow says emails are ;)
def is_valid_user_id(field):
    """
    Checks if it's a string representing a base 10 integer not starting with 0

    The whole string has to match. `fullmatch` is used instead of `match` with a trailing `$`, because `$` also
    matches just before a final newline and would accept a value with a trailing line break.
    """
    return re.fullmatch(r"[1-9][0-9]*", field) is not None
def is_valid_username(field):
    """
    Checks if it's an alphanumeric + underscore, lowercase string, at least
    two characters long, and starts with a letter, ends with alphanumeric

    The whole string has to match. `fullmatch` is used instead of `match` with a trailing `$`, because `$` also
    matches just before a final newline and would accept a username with a trailing line break.
    """
    return re.fullmatch(r"[a-z][0-9a-z_]*[a-z0-9]", field) is not None
def is_valid_name(field):
    """
    Checks if it has at least one non-whitespace character

    The character may appear anywhere in the string, so `search` is used; `match` only looks at the start of the
    string and would reject a name that merely begins with whitespace.
    """
    return re.search(r"\S", field) is not None
def is_valid_email(field):
    """
    Checks if the string matches EMAIL_REGEX (anchored at the start of the string)
    """
    matched = re.match(EMAIL_REGEX, field)
    return matched is not None
def Timestamp_from_datetime(dt: datetime):
    """
    Builds a protobuf Timestamp and fills it from the given datetime
    """
    timestamp = Timestamp()
    timestamp.FromDatetime(dt)
    return timestamp
def Duration_from_timedelta(dt: timedelta):
    """
    Builds a protobuf Duration and fills it from the given timedelta

    Despite its name, `dt` is a timedelta (it is handed straight to `Duration.FromTimedelta`), not a datetime.
    """
    pb_d = Duration()
    pb_d.FromTimedelta(dt)
    return pb_d
def parse_date(date_str: str):
    """
    Parses a date-only string in the format "YYYY-MM-DD" returning None if it fails

    Since Python 3.11 `date.fromisoformat` also accepts other ISO 8601 spellings (e.g. "20210310" or
    "2021-W10-3"), so the shape of the string is checked first to keep the accepted format at exactly
    "YYYY-MM-DD" on every Python version.
    """
    if re.fullmatch(r"[0-9]{4}-[0-9]{2}-[0-9]{2}", date_str) is None:
        return None
    try:
        return date.fromisoformat(date_str)
    except ValueError:
        # right shape, but not a real calendar date, e.g. "2021-02-30"
        return None
def date_to_api(date: date):
    """
    Serialises a date for the API using its ISO format (YYYY-MM-DD for a plain date)
    """
    iso_string = date.isoformat()
    return iso_string
def to_aware_datetime(ts: Timestamp):
    """
    Converts a protobuf Timestamp into a timezone-aware datetime in UTC
    """
    naive_dt = ts.ToDatetime()
    return utc.localize(naive_dt)
def now():
    """
    Current time as a timezone-aware datetime in UTC
    """
    return datetime.now(tz=utc)
def minimum_allowed_birthdate(reference_date=None):
    """
    Most recent birthdate allowed to register (must be 18 years minimum)

    reference_date is the date to measure from; it defaults to today (in UTC).

    This is computed on the calendar rather than as a fixed number of days: an 18 year span contains either
    four or five leap days, so subtracting 365.25 * 18 days can land one day too late and admit someone on the
    day before their 18th birthday.
    """
    if reference_date is None:
        reference_date = today()
    try:
        return reference_date.replace(year=reference_date.year - 18)
    except ValueError:
        # reference_date is 29 February and the year 18 years earlier has no leap day: people born on
        # 28 February of that year are already 18, those born on 1 March are not yet
        return reference_date.replace(year=reference_date.year - 18, day=28)
def today():
    """
    Current date in UTC, without any time component
    """
    current_moment = now()
    return current_moment.date()
def now_in_timezone(tz):
    """
    Current time as a timezone-aware datetime in the given timezone

    tz should be tzdata identifier, e.g. America/New_York
    """
    zone = pytz.timezone(tz)
    return datetime.now(tz=zone)
def today_in_timezone(tz):
    """
    Current date in the given timezone

    tz should be tzdata identifier, e.g. America/New_York
    """
    local_now = now_in_timezone(tz)
    return local_now.date()
123# Note: be very careful with ordering of lat/lng!
124# In a lot of cases they come as (lng, lat), but us humans tend to use them from GPS as (lat, lng)...
125# When entering as EPSG4326, we also need it in (lng, lat)
def wrap_coordinate(lat, lng):
    """
    Wraps (lat, lng) point in the EPSG4326 format

    Points that already satisfy -90 <= lat <= 90 and -180 <= lng <= 180 are returned untouched. Otherwise the
    longitude is wrapped around into the range -180..180 (with -180 reported as 180) and the latitude is
    reflected back into -90..90. Note that the longitude is not shifted when the latitude gets reflected.

    Returns a (lat, lng) tuple.
    """

    def _reduce(deg, ct=360):
        # brings values beyond +-ct back within one full turn, keeping the sign
        if deg > ct:
            deg = deg % ct
        if deg < -ct:
            deg = deg % -ct
        return deg

    def _shift(deg, ct, adj):
        # moves a value lying outside [-ct, ct] by adj towards zero
        if deg > ct:
            deg -= adj
        if deg < -ct:
            deg += adj
        return deg

    def _reflect(deg, ct, adj):
        # mirrors a value lying outside [-ct, ct] back across the boundary
        if deg > ct:
            deg = -deg + adj
        if deg < -ct:
            deg = -deg - adj
        return deg

    if lng < -180 or lng > 180 or lat < -90 or lat > 90:
        lng = _reduce(lng)
        lat = _reduce(lat)
        lng = _shift(lng, 180, 360)
        lat = _reflect(lat, 180, 180)
        lat = _reflect(lat, 90, 180)
        # after the shift lng lies within [-180, 180], so only the -180 edge needs normalising
        if lng == -180:
            lng = 180

    return lat, lng
def create_coordinate(lat, lng):
    """
    Creates a point geometry from a (lat, lng) pair in the EPSG4326 coordinate system (normal GPS-coordinates),
    wrapping out-of-range values first
    """
    wrapped_lat, wrapped_lng = wrap_coordinate(lat, lng)
    # the point is built as (x, y) = (lng, lat)
    point = Point(wrapped_lng, wrapped_lat)
    return from_shape(point, srid=4326)
def create_polygon_lat_lng(points):
    """
    Creates an EPSG4326 polygon geometry from a list of (lat, lng) tuples
    """
    # the polygon wants (lng, lat) order, so swap every pair
    lng_lat_pairs = []
    for lat, lng in points:
        lng_lat_pairs.append((lng, lat))
    return from_shape(Polygon(lng_lat_pairs), srid=4326)
def create_polygon_lng_lat(points):
    """
    Creates an EPSG4326 polygon geometry from a list of (lng, lat) tuples
    """
    polygon = Polygon(points)
    return from_shape(polygon, srid=4326)
def geojson_to_geom(geojson):
    """
    Converts GeoJSON into geometry data tagged as EPSG4326
    """
    geometry = shape(geojson)
    return from_shape(geometry, srid=4326)
def to_multi(polygon):
    """
    Wraps the given geometry expression in an ST_Multi SQL function call
    """
    multi_expression = func.ST_Multi(polygon)
    return multi_expression
def get_coordinates(geom):
    """
    Returns the EPSG4326 (lat, lng) pair of a point geom, or None if the input is not truthy
    """
    if not geom:
        return None

    point = to_shape(geom)
    # note the funniness with 4326 normally being (x, y) = (lng, lat), hence the swap
    return (point.y, point.x)
def http_date(dt=None):
    """
    Formats a datetime as an HTTP date string (as used for cookie expiry); falls back to the current time when no
    datetime is given
    """
    moment = dt or now()
    return formatdate(moment.timestamp(), usegmt=True)
def _create_tasty_cookie(name: str, value, expiry: datetime, httponly: bool):
    """
    Builds the output string (name=value plus attributes) for one of our cookies

    The value is stringified and stored as-is; expiry, domain, path, samesite, secure and httponly are set from
    the arguments and the app config.
    """
    text_value = str(value)
    morsel = http.cookies.Morsel()
    morsel.set(name, text_value, text_value)
    # when the browser should stop sending the cookie
    morsel["expires"] = http_date(expiry)
    # restrict to our domain, note if there's no domain, it won't include subdomains
    morsel["domain"] = config["COOKIE_DOMAIN"]
    # accessible for all API requests, otherwise defaults to something like /org.couchers.auth/
    morsel["path"] = "/"
    in_dev = config["DEV"]
    # dev: first-party requests only; production: sent on all requests, which requires Secure
    morsel["samesite"] = "Strict" if in_dev else "None"
    if not in_dev:
        # only set cookie on HTTPS sites in production
        morsel["secure"] = True
    # whether the cookie is hidden from javascript
    morsel["httponly"] = httponly

    return morsel.OutputString()
def create_session_cookies(token, user_id, expiry) -> list[str]:
    """
    Creates our two session cookies, both with the same expiry.

    couchers-sesh holds the secure session token and is inaccessible to javascript; couchers-user-id holds the
    user id and is readable by the javascript frontend, so that it knows when it's logged in/out
    """
    session_cookie = _create_tasty_cookie("couchers-sesh", token, expiry, httponly=True)
    user_id_cookie = _create_tasty_cookie("couchers-user-id", user_id, expiry, httponly=False)
    return [session_cookie, user_id_cookie]
def create_lang_cookie(lang):
    """
    Creates the NEXT_LOCALE language cookie (readable from javascript), returned as a one-element list

    It expires PREFERRED_LANGUAGE_COOKIE_EXPIRY after the current time
    """
    expires_at = now() + PREFERRED_LANGUAGE_COOKIE_EXPIRY
    return [_create_tasty_cookie("NEXT_LOCALE", lang, expiry=expires_at, httponly=False)]
def parse_session_cookie(headers):
    """
    Returns our session cookie value (aka token) or None

    None is returned when there is no cookie header, when the header has no couchers-sesh cookie, or when the
    header cannot be parsed: the header is client-controlled and SimpleCookie raises CookieError for cookie
    names containing characters it considers illegal.
    """
    if "cookie" not in headers:
        return None

    # parse the cookie
    try:
        cookie = http.cookies.SimpleCookie(headers["cookie"]).get("couchers-sesh")
    except http.cookies.CookieError:
        return None

    if not cookie:
        return None

    return cookie.value
def parse_user_id_cookie(headers):
    """
    Returns the value of our user id cookie (couchers-user-id) as a string, or None

    None is returned when there is no cookie header, when the header has no couchers-user-id cookie, or when the
    header cannot be parsed: the header is client-controlled and SimpleCookie raises CookieError for cookie
    names containing characters it considers illegal.
    """
    if "cookie" not in headers:
        return None

    # parse the cookie
    try:
        cookie = http.cookies.SimpleCookie(headers["cookie"]).get("couchers-user-id")
    except http.cookies.CookieError:
        return None

    if not cookie:
        return None

    return cookie.value
def parse_ui_lang_cookie(headers):
    """
    Returns language cookie or None

    None is returned when there is no cookie header, when the header has no NEXT_LOCALE cookie, or when the
    header cannot be parsed: the header is client-controlled and SimpleCookie raises CookieError for cookie
    names containing characters it considers illegal.
    """
    if "cookie" not in headers:
        return None

    # else parse the cookie & return its value
    try:
        cookie = http.cookies.SimpleCookie(headers["cookie"]).get("NEXT_LOCALE")
    except http.cookies.CookieError:
        return None

    if not cookie:
        return None

    return cookie.value
def parse_api_key(headers):
    """
    Extracts the bearer token (API key) from the `authorization` header

    Returns None if the header is missing or does not start with "Bearer "
    """
    bearer_prefix = "Bearer "

    if "authorization" in headers:
        header_value = headers["authorization"]
        if header_value.startswith(bearer_prefix):
            # everything after the prefix is the key
            return header_value[len(bearer_prefix) :]

    return None
def remove_duplicates_retain_order(list_):
    """
    Returns a new list with the items of list_ in order of first appearance, with later duplicates dropped

    Hashable items are deduplicated in linear time through a dict (which preserves insertion order). If any item
    is unhashable this falls back to pairwise comparison, which is quadratic but works for anything supporting ==.
    """
    # materialise once so the fallback below can iterate again, even if list_ is a one-shot iterator
    items = list(list_)
    try:
        return list(dict.fromkeys(items))
    except TypeError:
        # at least one item is unhashable (e.g. a list or a dict)
        unique = []
        for item in items:
            if item not in unique:
                unique.append(item)
        return unique
def date_in_timezone(date_, timezone):
    """
    Builds the SQL expression turning a naive postgres date (postgres doesn't have tzd dates) into a
    timezone-aware timestamp for the start of that date in the given timezone.

    The date is first cast to a timestamp without time zone, then passed to postgres' timezone() function.
    E.g. if postgres is in 'America/New_York',

        SET SESSION TIME ZONE 'America/New_York';

        CREATE TABLE tz_trouble (to_date date, timezone text);

        INSERT INTO tz_trouble(to_date, timezone) VALUES
        ('2021-03-10'::date, 'Australia/Sydney'),
        ('2021-03-20'::date, 'Europe/Berlin'),
        ('2021-04-15'::date, 'America/New_York');

        SELECT timezone(timezone, to_date::timestamp) FROM tz_trouble;

    The result is:

                timezone
        ------------------------
         2021-03-09 08:00:00-05
         2021-03-19 19:00:00-04
         2021-04-15 00:00:00-04
    """
    naive_timestamp = cast(date_, DateTime(timezone=False))
    return func.timezone(timezone, naive_timestamp)
def millis_from_dt(dt):
    """
    Converts a datetime into milliseconds since the Unix epoch, rounded to a whole number
    """
    seconds = dt.timestamp()
    return round(1000 * seconds)
def dt_from_millis(millis):
    """
    Converts milliseconds since the Unix epoch into a timezone-aware datetime in UTC
    """
    seconds = millis / 1000
    return datetime.fromtimestamp(seconds, tz=utc)
def dt_to_page_token(dt):
    """
    Encodes a datetime as an encrypted page token holding its microseconds since the Unix epoch

    Python has datetime resolution equal to 1 micro, as does postgres

    We pray to deities that this never changes
    """
    assert datetime.resolution == timedelta(microseconds=1)
    micros = round(1_000_000 * dt.timestamp())
    return encrypt_page_token(str(micros))
def dt_from_page_token(page_token):
    """
    Decrypts a page token and turns the microseconds since the Unix epoch it holds into a UTC datetime
    """
    micros = int(decrypt_page_token(page_token))
    return datetime.fromtimestamp(micros / 1_000_000, tz=utc)
def last_active_coarsen(dt):
    """
    Rounds a "last active" time down to the accuracy we use for last active times, currently the start of the
    hour, e.g. 27th June 2021, 16:53 UTC becomes 27th June 2021, 16:00 UTC
    """
    # everything finer than the hour is zeroed out
    truncated_fields = {"minute": 0, "second": 0, "microsecond": 0}
    return dt.replace(**truncated_fields)
def get_tz_as_text(tz_name):
    """
    Describes a timezone as "<abbreviation>/UTC<offset>", using the abbreviation and offset in effect right now
    """
    zone = ZoneInfo(tz_name)
    current = datetime.now(tz=zone)
    return current.strftime("%Z/UTC%z")
def make_user_context(user_id):
    """
    Creates a minimal context object carrying only the given user_id
    """
    context = SimpleNamespace()
    context.user_id = user_id
    return context
def make_logged_out_context():
    """
    Creates a minimal context object for a logged out caller, with user_id set to 0
    """
    context = SimpleNamespace()
    context.user_id = 0
    return context