Coverage for src/couchers/utils.py: 95%
128 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
1import http.cookies
2import re
3from datetime import date, datetime, timedelta
4from email.utils import formatdate
5from zoneinfo import ZoneInfo
7import pytz
8from geoalchemy2.shape import from_shape, to_shape
9from geoalchemy2.types import Geography, Geometry
10from google.protobuf.duration_pb2 import Duration
11from google.protobuf.timestamp_pb2 import Timestamp
12from shapely.geometry import Point, Polygon, shape
13from sqlalchemy.sql import cast, func
14from sqlalchemy.types import DateTime
16from couchers.config import config
17from couchers.constants import EMAIL_REGEX
18from couchers.crypto import decrypt_page_token, encrypt_page_token
20utc = pytz.UTC
23# When a user logs in, they can basically input one of three things: user id, username, or email
24# These are three non-intersecting sets
25# * user_ids are numeric representations in base 10
26# * usernames are alphanumeric + underscores, at least 2 chars long, and don't start with a number, and don't start or end with underscore
27# * emails are just whatever stack overflow says emails are ;)
30def is_valid_user_id(field):
31 """
32 Checks if it's a string representing a base 10 integer not starting with 0
33 """
34 return re.match(r"[1-9][0-9]*$", field) is not None
37def is_valid_username(field):
38 """
39 Checks if it's an alphanumeric + underscore, lowercase string, at least
40 two characters long, and starts with a letter, ends with alphanumeric
41 """
42 return re.match(r"[a-z][0-9a-z_]*[a-z0-9]$", field) is not None
45def is_valid_name(field):
46 """
47 Checks if it has at least one non-whitespace character
48 """
49 return re.match(r"\S+", field) is not None
52def is_valid_email(field):
53 return re.match(EMAIL_REGEX, field) is not None
56def Timestamp_from_datetime(dt: datetime):
57 pb_ts = Timestamp()
58 pb_ts.FromDatetime(dt)
59 return pb_ts
62def Duration_from_timedelta(dt: datetime):
63 pb_d = Duration()
64 pb_d.FromTimedelta(dt)
65 return pb_d
68def parse_date(date_str: str):
69 """
70 Parses a date-only string in the format "YYYY-MM-DD" returning None if it fails
71 """
72 try:
73 return date.fromisoformat(date_str)
74 except ValueError:
75 return None
78def date_to_api(date: date):
79 return date.isoformat()
82def to_aware_datetime(ts: Timestamp):
83 """
84 Turns a protobuf Timestamp object into a timezone-aware datetime
85 """
86 return utc.localize(ts.ToDatetime())
89def now():
90 return datetime.now(utc)
93def minimum_allowed_birthdate():
94 """
95 Most recent birthdate allowed to register (must be 18 years minimum)
97 This approximation works on leap days!
98 """
99 return today() - timedelta(days=365.25 * 18)
102def today():
103 """
104 Date only in UTC
105 """
106 return now().date()
109def now_in_timezone(tz):
110 """
111 tz should be tzdata identifier, e.g. America/New_York
112 """
113 return datetime.now(pytz.timezone(tz))
116def today_in_timezone(tz):
117 """
118 tz should be tzdata identifier, e.g. America/New_York
119 """
120 return now_in_timezone(tz).date()
123# Note: be very careful with ordering of lat/lng!
124# In a lot of cases they come as (lng, lat), but us humans tend to use them from GPS as (lat, lng)...
125# When entering as EPSG4326, we also need it in (lng, lat)
128def create_coordinate(lat, lng):
129 """
130 Creates a WKT point from a (lat, lng) tuple in EPSG4326 coordinate system (normal GPS-coordinates)
131 """
132 wkb_point = from_shape(Point(lng, lat), srid=4326)
134 # Casting to Geography and back here to ensure coordinate wrapping
135 return cast(
136 cast(wkb_point, Geography(geometry_type="POINT", srid=4326)), Geometry(geometry_type="POINT", srid=4326)
137 )
140def create_polygon_lat_lng(points):
141 """
142 Creates a EPSG4326 WKT polygon from a list of (lat, lng) tuples
143 """
144 return from_shape(Polygon([(lng, lat) for (lat, lng) in points]), srid=4326)
147def create_polygon_lng_lat(points):
148 """
149 Creates a EPSG4326 WKT polygon from a list of (lng, lat) tuples
150 """
151 return from_shape(Polygon(points), srid=4326)
154def geojson_to_geom(geojson):
155 """
156 Turns GeoJSON to PostGIS geom data in EPSG4326
157 """
158 return from_shape(shape(geojson), srid=4326)
161def to_multi(polygon):
162 return func.ST_Multi(polygon)
165def get_coordinates(geom):
166 """
167 Returns EPSG4326 (lat, lng) pair for a given WKT geom point or None if the input is not truthy
168 """
169 if geom:
170 shp = to_shape(geom)
171 # note the funiness with 4326 normally being (x, y) = (lng, lat)
172 return (shp.y, shp.x)
173 else:
174 return None
177def http_date(dt=None):
178 """
179 Format the datetime for HTTP cookies
180 """
181 if not dt:
182 dt = now()
183 return formatdate(dt.timestamp(), usegmt=True)
186def _create_tasty_cookie(name: str, value, expiry: datetime, httponly: bool):
187 cookie = http.cookies.Morsel()
188 cookie.set(name, str(value), str(value))
189 # tell the browser when to stop sending the cookie
190 cookie["expires"] = http_date(expiry)
191 # restrict to our domain, note if there's no domain, it won't include subdomains
192 cookie["domain"] = config["COOKIE_DOMAIN"]
193 # path so that it's accessible for all API requests, otherwise defaults to something like /org.couchers.auth/
194 cookie["path"] = "/"
195 if config["DEV"]:
196 # send only on requests from first-party domains
197 cookie["samesite"] = "Strict"
198 else:
199 # send on all requests, requires Secure
200 cookie["samesite"] = "None"
201 # only set cookie on HTTPS sites in production
202 cookie["secure"] = True
203 # not accessible from javascript
204 cookie["httponly"] = httponly
206 return cookie.OutputString()
209def create_session_cookies(token, user_id, expiry) -> list[str]:
210 """
211 Creates our session cookies.
213 We have two: the secure session token (in couchers-sesh) that's inaccessible to javascript, and the user id (in couchers-user-id) which the javascript frontend can access, so that it knows when it's logged in/out
214 """
215 return [
216 _create_tasty_cookie("couchers-sesh", token, expiry, httponly=True),
217 _create_tasty_cookie("couchers-user-id", user_id, expiry, httponly=False),
218 ]
221def parse_session_cookie(headers):
222 """
223 Returns our session cookie value (aka token) or None
224 """
225 if "cookie" not in headers:
226 return None
228 # parse the cookie
229 cookie = http.cookies.SimpleCookie(headers["cookie"]).get("couchers-sesh")
231 if not cookie:
232 return None
234 return cookie.value
237def parse_user_id_cookie(headers):
238 """
239 Returns our session cookie value (aka token) or None
240 """
241 if "cookie" not in headers:
242 return None
244 # parse the cookie
245 cookie = http.cookies.SimpleCookie(headers["cookie"]).get("couchers-user-id")
247 if not cookie:
248 return None
250 return cookie.value
253def parse_api_key(headers):
254 """
255 Returns a bearer token (API key) from the `authorization` header, or None if invalid/not present
256 """
257 if "authorization" not in headers:
258 return None
260 authorization = headers["authorization"]
261 if not authorization.startswith("Bearer "):
262 return None
264 return authorization[7:]
267def remove_duplicates_retain_order(list_):
268 out = []
269 for item in list_:
270 if item not in out:
271 out.append(item)
272 return out
275def date_in_timezone(date_, timezone):
276 """
277 Given a naive postgres date object (postgres doesn't have tzd dates), returns a timezone-aware timestamp for the
278 start of that date in that timezone. E.g. if postgres is in 'America/New_York',
280 SET SESSION TIME ZONE 'America/New_York';
282 CREATE TABLE tz_trouble (to_date date, timezone text);
284 INSERT INTO tz_trouble(to_date, timezone) VALUES
285 ('2021-03-10'::date, 'Australia/Sydney'),
286 ('2021-03-20'::date, 'Europe/Berlin'),
287 ('2021-04-15'::date, 'America/New_York');
289 SELECT timezone(timezone, to_date::timestamp) FROM tz_trouble;
291 The result is:
293 timezone
294 ------------------------
295 2021-03-09 08:00:00-05
296 2021-03-19 19:00:00-04
297 2021-04-15 00:00:00-04
298 """
299 return func.timezone(timezone, cast(date_, DateTime(timezone=False)))
302def millis_from_dt(dt):
303 return round(1000 * dt.timestamp())
306def dt_from_millis(millis):
307 return datetime.fromtimestamp(millis / 1000, tz=utc)
310def dt_to_page_token(dt):
311 """
312 Python has datetime resolution equal to 1 micro, as does postgres
314 We pray to deities that this never changes
315 """
316 assert datetime.resolution == timedelta(microseconds=1)
317 return encrypt_page_token(str(round(1_000_000 * dt.timestamp())))
320def dt_from_page_token(page_token):
321 # see above comment
322 return datetime.fromtimestamp(int(decrypt_page_token(page_token)) / 1_000_000, tz=utc)
325def last_active_coarsen(dt):
326 """
327 Coarsens a "last active" time to the accuracy we use for last active times, currently to the last hour, e.g. if the current time is 27th June 2021, 16:53 UTC, this returns 27th June 2021, 16:00 UTC
328 """
329 return dt.replace(minute=0, second=0, microsecond=0)
332def get_tz_as_text(tz_name):
333 return datetime.now(tz=ZoneInfo(tz_name)).strftime("%Z/UTC%z")