Coverage for src/couchers/utils.py: 95%

128 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-22 06:42 +0000

1import http.cookies 

2import re 

3from datetime import date, datetime, timedelta 

4from email.utils import formatdate 

5from zoneinfo import ZoneInfo 

6 

7import pytz 

8from geoalchemy2.shape import from_shape, to_shape 

9from geoalchemy2.types import Geography, Geometry 

10from google.protobuf.duration_pb2 import Duration 

11from google.protobuf.timestamp_pb2 import Timestamp 

12from shapely.geometry import Point, Polygon, shape 

13from sqlalchemy.sql import cast, func 

14from sqlalchemy.types import DateTime 

15 

16from couchers.config import config 

17from couchers.constants import EMAIL_REGEX 

18from couchers.crypto import decrypt_page_token, encrypt_page_token 

19 

# Module-wide UTC tzinfo (from pytz), used by the helpers in this module that build or localize aware datetimes
utc = pytz.UTC

21 

22 

23# When a user logs in, they can basically input one of three things: user id, username, or email 

24# These are three non-intersecting sets 

25# * user_ids are numeric representations in base 10 

26# * usernames are alphanumeric + underscores, at least 2 chars long, and don't start with a number, and don't start or end with underscore 

27# * emails are just whatever stack overflow says emails are ;) 

28 

29 

def is_valid_user_id(field):
    """
    Checks if it's a string representing a base 10 integer not starting with 0

    The whole string has to match: `re.fullmatch` is used instead of `re.match` with a trailing `$`, because `$` also
    matches just before a final newline and would therefore accept e.g. "123\\n".
    """
    return re.fullmatch(r"[1-9][0-9]*", field) is not None

35 

36 

def is_valid_username(field):
    """
    Checks if it's an alphanumeric + underscore, lowercase string, at least
    two characters long, and starts with a letter, ends with alphanumeric

    The whole string has to match: `re.fullmatch` is used instead of `re.match` with a trailing `$`, because `$` also
    matches just before a final newline and would therefore accept e.g. "ab\\n".
    """
    return re.fullmatch(r"[a-z][0-9a-z_]*[a-z0-9]", field) is not None

43 

44 

def is_valid_name(field):
    """
    Checks that the name begins with at least one non-whitespace character

    The pattern is matched from the start of the string, so the empty string, whitespace-only strings and strings
    with leading whitespace are all rejected.
    """
    leading_run = re.match(r"\S+", field)
    return leading_run is not None

50 

51 

def is_valid_email(field):
    """
    Checks whether the field matches EMAIL_REGEX (matched from the start of the string)
    """
    matched = re.match(EMAIL_REGEX, field)
    return matched is not None

54 

55 

def Timestamp_from_datetime(dt: datetime):
    """
    Builds a protobuf Timestamp message from a datetime
    """
    timestamp = Timestamp()
    # FromDatetime fills the message in place, so the message is created first and returned afterwards
    timestamp.FromDatetime(dt)
    return timestamp

60 

61 

def Duration_from_timedelta(dt: timedelta):
    """
    Builds a protobuf Duration message from a timedelta

    The parameter keeps its historical name `dt`, but it is a timedelta (it is handed to `Duration.FromTimedelta`),
    not a datetime.
    """
    pb_d = Duration()
    # FromTimedelta fills the message in place
    pb_d.FromTimedelta(dt)
    return pb_d

66 

67 

def parse_date(date_str: str):
    """
    Parses a date-only string in the format "YYYY-MM-DD" returning None if it fails

    The shape of the string is checked explicitly before parsing: from Python 3.11 onwards `date.fromisoformat` also
    accepts other ISO 8601 spellings (e.g. "20210310" or "2021-W10-3"), which are not part of this API's format.
    """
    # exactly four, two and two ASCII digits separated by dashes
    if re.fullmatch(r"[0-9]{4}-[0-9]{2}-[0-9]{2}", date_str) is None:
        return None
    try:
        return date.fromisoformat(date_str)
    except ValueError:
        # right shape but not a real calendar date, e.g. "2021-02-30"
        return None

76 

77 

def date_to_api(date: date):
    """
    Formats a date as its ISO 8601 string, i.e. "YYYY-MM-DD"
    """
    iso_string = date.isoformat()
    return iso_string

80 

81 

def to_aware_datetime(ts: Timestamp):
    """
    Converts a protobuf Timestamp message into a timezone-aware datetime in UTC
    """
    naive_dt = ts.ToDatetime()
    # attach the UTC timezone to the naive datetime
    return utc.localize(naive_dt)

87 

88 

def now():
    """
    Current time as a timezone-aware datetime in UTC
    """
    return datetime.now(tz=utc)

91 

92 

def minimum_allowed_birthdate():
    """
    Most recent birthdate allowed to register (must be 18 years minimum)

    Eighteen years are approximated as 18 * 365.25 days instead of being computed on the calendar, so no special
    handling of 29 February is needed.
    """
    # NOTE(review): date arithmetic only uses the whole-day part of a timedelta, so this subtracts 6574 days; the
    # exact calendar span of 18 years is 6574 or 6575 days, so the result may be off by one day - confirm acceptable
    eighteen_years = timedelta(days=365.25 * 18)
    return today() - eighteen_years

100 

101 

def today():
    """
    Current date in UTC (date only, no time component)
    """
    current_moment = now()
    return current_moment.date()

107 

108 

def now_in_timezone(tz):
    """
    Current time as an aware datetime in the given timezone

    tz should be tzdata identifier, e.g. America/New_York
    """
    zone = pytz.timezone(tz)
    return datetime.now(zone)

114 

115 

def today_in_timezone(tz):
    """
    Current date in the given timezone

    tz should be tzdata identifier, e.g. America/New_York
    """
    local_now = now_in_timezone(tz)
    return local_now.date()

121 

122 

123# Note: be very careful with ordering of lat/lng! 

124# In a lot of cases they come as (lng, lat), but us humans tend to use them from GPS as (lat, lng)... 

125# When entering as EPSG4326, we also need it in (lng, lat) 

126 

127 

def create_coordinate(lat, lng):
    """
    Creates a point geometry expression from a (lat, lng) pair in the EPSG4326 coordinate system (normal GPS
    coordinates)
    """
    # shapely points are (x, y) = (lng, lat), hence the swapped argument order
    point = from_shape(Point(lng, lat), srid=4326)

    # Round-trip through Geography and back to Geometry to ensure coordinate wrapping
    as_geography = cast(point, Geography(geometry_type="POINT", srid=4326))
    return cast(as_geography, Geometry(geometry_type="POINT", srid=4326))

138 

139 

def create_polygon_lat_lng(points):
    """
    Creates an EPSG4326 polygon from a list of (lat, lng) tuples
    """
    # the polygon is built from (lng, lat) pairs, so each input pair is flipped first
    lng_lat_points = [(lng, lat) for lat, lng in points]
    return from_shape(Polygon(lng_lat_points), srid=4326)

145 

146 

def create_polygon_lng_lat(points):
    """
    Creates an EPSG4326 polygon from a list of (lng, lat) tuples, used in the order given
    """
    polygon = Polygon(points)
    return from_shape(polygon, srid=4326)

152 

153 

def geojson_to_geom(geojson):
    """
    Converts GeoJSON into PostGIS geom data in EPSG4326
    """
    geometry = shape(geojson)
    return from_shape(geometry, srid=4326)

159 

160 

def to_multi(polygon):
    """
    Wraps the given polygon in a SQL ST_Multi(...) function call expression
    """
    multi_expression = func.ST_Multi(polygon)
    return multi_expression

163 

164 

def get_coordinates(geom):
    """
    Returns the EPSG4326 (lat, lng) pair for a given geom point, or None if the input is not truthy
    """
    if not geom:
        return None

    point = to_shape(geom)
    # note the funniness with 4326 normally being (x, y) = (lng, lat): y is the latitude, x the longitude
    return (point.y, point.x)

175 

176 

def http_date(dt=None):
    """
    Formats a datetime as an HTTP date string (GMT), as used for cookie expiry

    Falls back to the current time when no datetime is given
    """
    moment = dt if dt else now()
    return formatdate(moment.timestamp(), usegmt=True)

184 

185 

def _create_tasty_cookie(name: str, value, expiry: datetime, httponly: bool) -> str:
    """
    Builds one cookie and returns it rendered as a string via Morsel.OutputString()

    name: cookie name
    value: cookie value; it is converted with str() and used both as the value and as the coded (on-the-wire) value
    expiry: datetime after which the browser should stop sending the cookie
    httponly: whether the cookie should be hidden from javascript
    """
    cookie = http.cookies.Morsel()
    # the same string is passed as value and coded value, i.e. no extra quoting/encoding is applied here
    cookie.set(name, str(value), str(value))
    # tell the browser when to stop sending the cookie
    cookie["expires"] = http_date(expiry)
    # restrict to our domain, note if there's no domain, it won't include subdomains
    cookie["domain"] = config["COOKIE_DOMAIN"]
    # path so that it's accessible for all API requests, otherwise defaults to something like /org.couchers.auth/
    cookie["path"] = "/"
    if config["DEV"]:
        # send only on requests from first-party domains
        cookie["samesite"] = "Strict"
    else:
        # send on all requests, requires Secure
        cookie["samesite"] = "None"
        # only set cookie on HTTPS sites in production
        cookie["secure"] = True
    # not accessible from javascript
    cookie["httponly"] = httponly

    return cookie.OutputString()

207 

208 

def create_session_cookies(token, user_id, expiry) -> list[str]:
    """
    Creates our session cookies.

    There are two of them, sharing the same expiry:
    * couchers-sesh holds the secure session token and is inaccessible to javascript
    * couchers-user-id holds the user id and is readable by the javascript frontend, so that it knows when it's
      logged in/out
    """
    session_cookie = _create_tasty_cookie("couchers-sesh", token, expiry, httponly=True)
    user_id_cookie = _create_tasty_cookie("couchers-user-id", user_id, expiry, httponly=False)
    return [session_cookie, user_id_cookie]

219 

220 

def _parse_cookie_value(headers, cookie_name):
    """
    Returns the value of the cookie called cookie_name from the `cookie` header, or None if either the header or
    the cookie is missing
    """
    if "cookie" not in headers:
        return None

    # parse the cookie header and pick out the one cookie we are interested in
    cookie = http.cookies.SimpleCookie(headers["cookie"]).get(cookie_name)

    if not cookie:
        return None

    return cookie.value


def parse_session_cookie(headers):
    """
    Returns our session cookie value (aka token, stored in couchers-sesh) or None
    """
    return _parse_cookie_value(headers, "couchers-sesh")


def parse_user_id_cookie(headers):
    """
    Returns the value of our user id cookie (stored in couchers-user-id) or None

    The value is returned as the string found in the cookie; it is not converted to an integer here.
    """
    return _parse_cookie_value(headers, "couchers-user-id")

251 

252 

def parse_api_key(headers):
    """
    Extracts the bearer token (API key) from the `authorization` header

    Returns None when the header is missing or does not start with "Bearer "; otherwise returns everything after
    that prefix
    """
    bearer_prefix = "Bearer "
    if "authorization" in headers:
        header_value = headers["authorization"]
        if header_value.startswith(bearer_prefix):
            # strip the scheme prefix, leaving just the key
            return header_value[len(bearer_prefix) :]
    return None

265 

266 

def remove_duplicates_retain_order(list_):
    """
    Returns a new list with the items of list_ in their original order, keeping only the first occurrence of each

    Hashable items are deduplicated in linear time; if any item is unhashable, this falls back to a quadratic scan
    based on equality, so such inputs keep working.
    """
    # materialize first so that a one-shot iterable is not half-consumed if we need the fallback
    items = list(list_)
    try:
        # dicts preserve insertion order, so the keys are the first occurrences in order
        return list(dict.fromkeys(items))
    except TypeError:
        # at least one item is unhashable (e.g. a list or dict)
        out = []
        for item in items:
            if item not in out:
                out.append(item)
        return out

273 

274 

def date_in_timezone(date_, timezone):
    """
    Builds a SQL expression for the start of the given date in the given timezone, as a timezone-aware timestamp.

    Postgres date objects are naive (postgres doesn't have tzd dates), so the date is first cast to a timestamp
    without time zone and then interpreted in the given timezone. E.g. if postgres is in 'America/New_York',

        SET SESSION TIME ZONE 'America/New_York';

        CREATE TABLE tz_trouble (to_date date, timezone text);

        INSERT INTO tz_trouble(to_date, timezone) VALUES
        ('2021-03-10'::date, 'Australia/Sydney'),
        ('2021-03-20'::date, 'Europe/Berlin'),
        ('2021-04-15'::date, 'America/New_York');

        SELECT timezone(timezone, to_date::timestamp) FROM tz_trouble;

    The result is:

                timezone
        ------------------------
         2021-03-09 08:00:00-05
         2021-03-19 19:00:00-04
         2021-04-15 00:00:00-04
    """
    naive_timestamp = cast(date_, DateTime(timezone=False))
    return func.timezone(timezone, naive_timestamp)

300 

301 

def millis_from_dt(dt):
    """
    Converts a datetime into whole milliseconds since the Unix epoch (rounded)
    """
    epoch_seconds = dt.timestamp()
    return round(1000 * epoch_seconds)

304 

305 

def dt_from_millis(millis):
    """
    Converts milliseconds since the Unix epoch into a timezone-aware datetime in UTC
    """
    epoch_seconds = millis / 1000
    return datetime.fromtimestamp(epoch_seconds, tz=utc)

308 

309 

def dt_to_page_token(dt):
    """
    Encodes a datetime as an encrypted page token holding its microseconds since the Unix epoch

    Python has datetime resolution equal to 1 micro, as does postgres

    We pray to deities that this never changes
    """
    # guard the assumption that microseconds capture a datetime exactly
    assert datetime.resolution == timedelta(microseconds=1)
    epoch_micros = round(1_000_000 * dt.timestamp())
    return encrypt_page_token(str(epoch_micros))

318 

319 

def dt_from_page_token(page_token):
    """
    Decodes an encrypted page token holding microseconds since the Unix epoch into a timezone-aware UTC datetime
    """
    epoch_micros = int(decrypt_page_token(page_token))
    return datetime.fromtimestamp(epoch_micros / 1_000_000, tz=utc)

323 

324 

def last_active_coarsen(dt):
    """
    Coarsens a "last active" time to the accuracy we use for last active times, currently the start of the hour

    E.g. if the current time is 27th June 2021, 16:53 UTC, this returns 27th June 2021, 16:00 UTC
    """
    # everything below the hour is zeroed; date, hour and tzinfo are kept
    start_of_hour = {"minute": 0, "second": 0, "microsecond": 0}
    return dt.replace(**start_of_hour)

330 

331 

def get_tz_as_text(tz_name):
    """
    Describes a timezone as "<abbreviation>/UTC<offset>", using the abbreviation and UTC offset in effect right now

    tz_name should be a tzdata identifier, e.g. America/New_York
    """
    local_now = datetime.now(tz=ZoneInfo(tz_name))
    return local_now.strftime("%Z/UTC%z")