Coverage for src/couchers/utils.py: 94%

137 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-03-24 14:08 +0000

1import http.cookies 

2import re 

3from datetime import date, datetime, timedelta 

4from email.utils import formatdate 

5from zoneinfo import ZoneInfo 

6 

7import pytz 

8from geoalchemy2.shape import from_shape, to_shape 

9from geoalchemy2.types import Geography, Geometry 

10from google.protobuf.duration_pb2 import Duration 

11from google.protobuf.timestamp_pb2 import Timestamp 

12from shapely.geometry import Point, Polygon, shape 

13from sqlalchemy.sql import cast, func 

14from sqlalchemy.types import DateTime 

15 

16from couchers.config import config 

17from couchers.constants import EMAIL_REGEX, PREFERRED_LANGUAGE_COOKIE_EXPIRY 

18from couchers.crypto import decrypt_page_token, encrypt_page_token 

19 

20utc = pytz.UTC 

21 

22 

23# When a user logs in, they can basically input one of three things: user id, username, or email 

24# These are three non-intersecting sets 

25# * user_ids are numeric representations in base 10 

26# * usernames are alphanumeric + underscores, at least 2 chars long, and don't start with a number, and don't start or end with underscore 

27# * emails are just whatever stack overflow says emails are ;) 

28 

29 

30def is_valid_user_id(field): 

31 """ 

32 Checks if it's a string representing a base 10 integer not starting with 0 

33 """ 

34 return re.match(r"[1-9][0-9]*$", field) is not None 

35 

36 

37def is_valid_username(field): 

38 """ 

39 Checks if it's an alphanumeric + underscore, lowercase string, at least 

40 two characters long, and starts with a letter, ends with alphanumeric 

41 """ 

42 return re.match(r"[a-z][0-9a-z_]*[a-z0-9]$", field) is not None 

43 

44 

45def is_valid_name(field): 

46 """ 

47 Checks if it has at least one non-whitespace character 

48 """ 

49 return re.match(r"\S+", field) is not None 

50 

51 

52def is_valid_email(field): 

53 return re.match(EMAIL_REGEX, field) is not None 

54 

55 

56def Timestamp_from_datetime(dt: datetime): 

57 pb_ts = Timestamp() 

58 pb_ts.FromDatetime(dt) 

59 return pb_ts 

60 

61 

62def Duration_from_timedelta(dt: datetime): 

63 pb_d = Duration() 

64 pb_d.FromTimedelta(dt) 

65 return pb_d 

66 

67 

68def parse_date(date_str: str): 

69 """ 

70 Parses a date-only string in the format "YYYY-MM-DD" returning None if it fails 

71 """ 

72 try: 

73 return date.fromisoformat(date_str) 

74 except ValueError: 

75 return None 

76 

77 

78def date_to_api(date: date): 

79 return date.isoformat() 

80 

81 

82def to_aware_datetime(ts: Timestamp): 

83 """ 

84 Turns a protobuf Timestamp object into a timezone-aware datetime 

85 """ 

86 return utc.localize(ts.ToDatetime()) 

87 

88 

89def now(): 

90 return datetime.now(utc) 

91 

92 

93def minimum_allowed_birthdate(): 

94 """ 

95 Most recent birthdate allowed to register (must be 18 years minimum) 

96 

97 This approximation works on leap days! 

98 """ 

99 return today() - timedelta(days=365.25 * 18) 

100 

101 

102def today(): 

103 """ 

104 Date only in UTC 

105 """ 

106 return now().date() 

107 

108 

109def now_in_timezone(tz): 

110 """ 

111 tz should be tzdata identifier, e.g. America/New_York 

112 """ 

113 return datetime.now(pytz.timezone(tz)) 

114 

115 

116def today_in_timezone(tz): 

117 """ 

118 tz should be tzdata identifier, e.g. America/New_York 

119 """ 

120 return now_in_timezone(tz).date() 

121 

122 

123# Note: be very careful with ordering of lat/lng! 

124# In a lot of cases they come as (lng, lat), but us humans tend to use them from GPS as (lat, lng)... 

125# When entering as EPSG4326, we also need it in (lng, lat) 

126 

127 

128def create_coordinate(lat, lng): 

129 """ 

130 Creates a WKT point from a (lat, lng) tuple in EPSG4326 coordinate system (normal GPS-coordinates) 

131 """ 

132 wkb_point = from_shape(Point(lng, lat), srid=4326) 

133 

134 # Casting to Geography and back here to ensure coordinate wrapping 

135 return cast( 

136 cast(wkb_point, Geography(geometry_type="POINT", srid=4326)), Geometry(geometry_type="POINT", srid=4326) 

137 ) 

138 

139 

140def create_polygon_lat_lng(points): 

141 """ 

142 Creates a EPSG4326 WKT polygon from a list of (lat, lng) tuples 

143 """ 

144 return from_shape(Polygon([(lng, lat) for (lat, lng) in points]), srid=4326) 

145 

146 

147def create_polygon_lng_lat(points): 

148 """ 

149 Creates a EPSG4326 WKT polygon from a list of (lng, lat) tuples 

150 """ 

151 return from_shape(Polygon(points), srid=4326) 

152 

153 

154def geojson_to_geom(geojson): 

155 """ 

156 Turns GeoJSON to PostGIS geom data in EPSG4326 

157 """ 

158 return from_shape(shape(geojson), srid=4326) 

159 

160 

161def to_multi(polygon): 

162 return func.ST_Multi(polygon) 

163 

164 

165def get_coordinates(geom): 

166 """ 

167 Returns EPSG4326 (lat, lng) pair for a given WKT geom point or None if the input is not truthy 

168 """ 

169 if geom: 

170 shp = to_shape(geom) 

171 # note the funiness with 4326 normally being (x, y) = (lng, lat) 

172 return (shp.y, shp.x) 

173 else: 

174 return None 

175 

176 

177def http_date(dt=None): 

178 """ 

179 Format the datetime for HTTP cookies 

180 """ 

181 if not dt: 

182 dt = now() 

183 return formatdate(dt.timestamp(), usegmt=True) 

184 

185 

186def _create_tasty_cookie(name: str, value, expiry: datetime, httponly: bool): 

187 cookie = http.cookies.Morsel() 

188 cookie.set(name, str(value), str(value)) 

189 # tell the browser when to stop sending the cookie 

190 cookie["expires"] = http_date(expiry) 

191 # restrict to our domain, note if there's no domain, it won't include subdomains 

192 cookie["domain"] = config["COOKIE_DOMAIN"] 

193 # path so that it's accessible for all API requests, otherwise defaults to something like /org.couchers.auth/ 

194 cookie["path"] = "/" 

195 if config["DEV"]: 

196 # send only on requests from first-party domains 

197 cookie["samesite"] = "Strict" 

198 else: 

199 # send on all requests, requires Secure 

200 cookie["samesite"] = "None" 

201 # only set cookie on HTTPS sites in production 

202 cookie["secure"] = True 

203 # not accessible from javascript 

204 cookie["httponly"] = httponly 

205 

206 return cookie.OutputString() 

207 

208 

209def create_session_cookies(token, user_id, expiry) -> list[str]: 

210 """ 

211 Creates our session cookies. 

212 

213 We have two: the secure session token (in couchers-sesh) that's inaccessible to javascript, and the user id (in couchers-user-id) which the javascript frontend can access, so that it knows when it's logged in/out 

214 """ 

215 return [ 

216 _create_tasty_cookie("couchers-sesh", token, expiry, httponly=True), 

217 _create_tasty_cookie("couchers-user-id", user_id, expiry, httponly=False), 

218 ] 

219 

220 

221def create_lang_cookie(lang): 

222 return [ 

223 _create_tasty_cookie( 

224 "couchers-preferred-language", lang, expiry=(now() + PREFERRED_LANGUAGE_COOKIE_EXPIRY), httponly=False 

225 ) 

226 ] 

227 

228 

229def parse_session_cookie(headers): 

230 """ 

231 Returns our session cookie value (aka token) or None 

232 """ 

233 if "cookie" not in headers: 

234 return None 

235 

236 # parse the cookie 

237 cookie = http.cookies.SimpleCookie(headers["cookie"]).get("couchers-sesh") 

238 

239 if not cookie: 

240 return None 

241 

242 return cookie.value 

243 

244 

245def parse_user_id_cookie(headers): 

246 """ 

247 Returns our session cookie value (aka token) or None 

248 """ 

249 if "cookie" not in headers: 

250 return None 

251 

252 # parse the cookie 

253 cookie = http.cookies.SimpleCookie(headers["cookie"]).get("couchers-user-id") 

254 

255 if not cookie: 

256 return None 

257 

258 return cookie.value 

259 

260 

261def parse_ui_lang_cookie(headers): 

262 """ 

263 Returns language cookie or None 

264 """ 

265 if "cookie" not in headers: 

266 return None 

267 

268 # else parse the cookie & return its value 

269 cookie = http.cookies.SimpleCookie(headers["cookie"]).get("couchers-preferred-language") 

270 

271 if not cookie: 

272 return None 

273 

274 return cookie.value 

275 

276 

277def parse_api_key(headers): 

278 """ 

279 Returns a bearer token (API key) from the `authorization` header, or None if invalid/not present 

280 """ 

281 if "authorization" not in headers: 

282 return None 

283 

284 authorization = headers["authorization"] 

285 if not authorization.startswith("Bearer "): 

286 return None 

287 

288 return authorization[7:] 

289 

290 

291def remove_duplicates_retain_order(list_): 

292 out = [] 

293 for item in list_: 

294 if item not in out: 

295 out.append(item) 

296 return out 

297 

298 

299def date_in_timezone(date_, timezone): 

300 """ 

301 Given a naive postgres date object (postgres doesn't have tzd dates), returns a timezone-aware timestamp for the 

302 start of that date in that timezone. E.g. if postgres is in 'America/New_York', 

303 

304 SET SESSION TIME ZONE 'America/New_York'; 

305 

306 CREATE TABLE tz_trouble (to_date date, timezone text); 

307 

308 INSERT INTO tz_trouble(to_date, timezone) VALUES 

309 ('2021-03-10'::date, 'Australia/Sydney'), 

310 ('2021-03-20'::date, 'Europe/Berlin'), 

311 ('2021-04-15'::date, 'America/New_York'); 

312 

313 SELECT timezone(timezone, to_date::timestamp) FROM tz_trouble; 

314 

315 The result is: 

316 

317 timezone 

318 ------------------------ 

319 2021-03-09 08:00:00-05 

320 2021-03-19 19:00:00-04 

321 2021-04-15 00:00:00-04 

322 """ 

323 return func.timezone(timezone, cast(date_, DateTime(timezone=False))) 

324 

325 

326def millis_from_dt(dt): 

327 return round(1000 * dt.timestamp()) 

328 

329 

330def dt_from_millis(millis): 

331 return datetime.fromtimestamp(millis / 1000, tz=utc) 

332 

333 

334def dt_to_page_token(dt): 

335 """ 

336 Python has datetime resolution equal to 1 micro, as does postgres 

337 

338 We pray to deities that this never changes 

339 """ 

340 assert datetime.resolution == timedelta(microseconds=1) 

341 return encrypt_page_token(str(round(1_000_000 * dt.timestamp()))) 

342 

343 

344def dt_from_page_token(page_token): 

345 # see above comment 

346 return datetime.fromtimestamp(int(decrypt_page_token(page_token)) / 1_000_000, tz=utc) 

347 

348 

349def last_active_coarsen(dt): 

350 """ 

351 Coarsens a "last active" time to the accuracy we use for last active times, currently to the last hour, e.g. if the current time is 27th June 2021, 16:53 UTC, this returns 27th June 2021, 16:00 UTC 

352 """ 

353 return dt.replace(minute=0, second=0, microsecond=0) 

354 

355 

356def get_tz_as_text(tz_name): 

357 return datetime.now(tz=ZoneInfo(tz_name)).strftime("%Z/UTC%z")