Coverage for src/couchers/utils.py: 94%

127 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-10-15 13:03 +0000

1import http.cookies 

2import re 

3from datetime import date, datetime, timedelta 

4from email.utils import formatdate 

5from typing import List 

6from zoneinfo import ZoneInfo 

7 

8import pytz 

9from geoalchemy2.shape import from_shape, to_shape 

10from geoalchemy2.types import Geography, Geometry 

11from google.protobuf.duration_pb2 import Duration 

12from google.protobuf.timestamp_pb2 import Timestamp 

13from shapely.geometry import Point, Polygon, shape 

14from sqlalchemy.sql import cast, func 

15from sqlalchemy.types import DateTime 

16 

17from couchers.config import config 

18from couchers.constants import EMAIL_REGEX 

19from couchers.crypto import decrypt_page_token, encrypt_page_token 

20 

# UTC tzinfo object from pytz, shared by the datetime helpers below
utc = pytz.UTC

22 

23 

24# When a user logs in, they can basically input one of three things: user id, username, or email 

25# These are three non-intersecting sets 

26# * user_ids are numeric representations in base 10 

27# * usernames are alphanumeric + underscores, at least 2 chars long, and don't start with a number, and don't start or end with underscore 

28# * emails are just whatever stack overflow says emails are ;) 

29 

30 

def is_valid_user_id(field):
    """
    Checks if it's a string representing a base 10 integer not starting with 0

    Only the ASCII digits 0-9 are accepted and the whole string has to match, so surrounding whitespace
    (including a trailing newline) makes the field invalid.
    """
    # fullmatch instead of match + "$": "$" also matches right before a trailing "\n", which let "123\n" through
    return re.fullmatch(r"[1-9][0-9]*", field) is not None

36 

37 

def is_valid_username(field):
    """
    Checks if it's an alphanumeric + underscore, lowercase string, at least
    two characters long, and starts with a letter, ends with alphanumeric

    The whole string has to match, so a trailing newline makes the field invalid.
    """
    # fullmatch instead of match + "$": "$" also matches right before a trailing "\n", which let "ab\n" through
    return re.fullmatch(r"[a-z][0-9a-z_]*[a-z0-9]", field) is not None

44 

45 

def is_valid_name(field):
    """
    Checks that the name starts with a non-whitespace character

    The pattern is anchored at the start of the string, so empty strings, whitespace-only strings and strings
    with leading whitespace are all rejected.
    """
    # NOTE(review): if names with leading whitespace are meant to be accepted, this needs a search rather than
    # an anchored match -- confirm the intended contract with the callers
    leading_text = re.match(r"\S+", field)
    return leading_text is not None

51 

52 

def is_valid_email(field):
    """
    Checks if the field matches EMAIL_REGEX (matched from the start of the string)
    """
    matched = re.match(EMAIL_REGEX, field)
    return matched is not None

55 

56 

def Timestamp_from_datetime(dt: datetime):
    """
    Builds a protobuf Timestamp message from a datetime
    """
    timestamp = Timestamp()
    # FromDatetime fills the message in place
    timestamp.FromDatetime(dt)
    return timestamp

61 

62 

def Duration_from_timedelta(dt: timedelta):
    """
    Builds a protobuf Duration message from a timedelta

    Despite the parameter name, dt is a time span (timedelta), not a point in time.
    """
    duration = Duration()
    # FromTimedelta fills the message in place
    duration.FromTimedelta(dt)
    return duration

67 

68 

def parse_date(date_str: str):
    """
    Parses a date-only string in the format "YYYY-MM-DD" returning None if it fails

    Anything that is not exactly four digits, a dash, two digits, a dash and two digits is rejected, as are
    well-formed strings that don't name a real calendar date (e.g. "2021-02-30").
    """
    # date.fromisoformat accepts further ISO 8601 forms on newer Pythons (e.g. "20210131" or "2021-W04-7"), so
    # check the shape first to keep the accepted format the same on every Python version
    if re.fullmatch(r"[0-9]{4}-[0-9]{2}-[0-9]{2}", date_str) is None:
        return None
    try:
        return date.fromisoformat(date_str)
    except ValueError:
        return None

77 

78 

def date_to_api(date: date):
    """
    Serializes a date for the API as an ISO 8601 string, i.e. "YYYY-MM-DD" for a plain date
    """
    iso_text = date.isoformat()
    return iso_text

81 

82 

def to_aware_datetime(ts: Timestamp):
    """
    Converts a protobuf Timestamp message into a timezone-aware datetime in UTC
    """
    # ToDatetime() is called without a timezone here, so the UTC timezone still has to be attached
    unlocalized = ts.ToDatetime()
    return utc.localize(unlocalized)

88 

89 

def now():
    """
    Current time as a timezone-aware datetime in UTC
    """
    return datetime.now(tz=utc)

92 

93 

def minimum_allowed_birthdate(on_date=None, min_age_years=18):
    """
    Most recent birthdate allowed to register (must be 18 years minimum)

    on_date is the date the check is made for (defaults to today in UTC) and min_age_years is the required
    age. The result is computed on the calendar rather than with an average year length, so someone becomes
    eligible exactly on their birthday and never the day before.

    If on_date is 29 February and the target year has no leap day, 28 February of that year is returned:
    people born on 1 March of that year have not had their birthday yet.
    """
    if on_date is None:
        on_date = today()

    birth_year = on_date.year - min_age_years
    try:
        return on_date.replace(year=birth_year)
    except ValueError:
        # only 29 February can fail to exist in another year; anything else (e.g. year out of range) is an error
        if (on_date.month, on_date.day) != (2, 29):
            raise
        return on_date.replace(year=birth_year, day=28)

101 

102 

def today():
    """
    Current calendar date according to the UTC clock
    """
    current_utc_time = now()
    return current_utc_time.date()

108 

109 

def now_in_timezone(tz):
    """
    Current time as a timezone-aware datetime in the given timezone

    tz should be tzdata identifier, e.g. America/New_York
    """
    zone = pytz.timezone(tz)
    return datetime.now(zone)

115 

116 

def today_in_timezone(tz):
    """
    Current calendar date in the given timezone

    tz should be tzdata identifier, e.g. America/New_York
    """
    local_time = now_in_timezone(tz)
    return local_time.date()

122 

123 

124# Note: be very careful with ordering of lat/lng! 

125# In a lot of cases they come as (lng, lat), but we humans tend to use them from GPS as (lat, lng)... 

126# When entering as EPSG4326, we also need it in (lng, lat) 

127 

128 

def create_coordinate(lat, lng):
    """
    Creates a point expression in the EPSG4326 coordinate system (normal GPS-coordinates) from a latitude and
    a longitude

    Note the argument order is (lat, lng), while the point itself is built as (x, y) = (lng, lat).
    """
    geography_type = Geography(geometry_type="POINT", srid=4326)
    geometry_type = Geometry(geometry_type="POINT", srid=4326)

    point = from_shape(Point(lng, lat), srid=4326)

    # round trip through Geography and back to Geometry to ensure coordinate wrapping
    as_geography = cast(point, geography_type)
    return cast(as_geography, geometry_type)

139 

140 

def create_polygon_lat_lng(points):
    """
    Creates an EPSG4326 polygon from a list of (lat, lng) tuples

    Each pair is flipped to (lng, lat) before the polygon is built.
    """
    flipped = [(longitude, latitude) for latitude, longitude in points]
    polygon = Polygon(flipped)
    return from_shape(polygon, srid=4326)

146 

147 

def create_polygon_lng_lat(points):
    """
    Creates an EPSG4326 polygon from a list of (lng, lat) tuples

    The pairs are passed to the polygon as they are, without reordering.
    """
    polygon = Polygon(points)
    return from_shape(polygon, srid=4326)

153 

154 

def geojson_to_geom(geojson):
    """
    Converts GeoJSON into PostGIS geom data in EPSG4326
    """
    geometry = shape(geojson)
    return from_shape(geometry, srid=4326)

160 

161 

def to_multi(polygon):
    """
    Builds a SQL call to ST_Multi on the given geometry
    """
    multi_expression = func.ST_Multi(polygon)
    return multi_expression

164 

165 

def get_coordinates(geom):
    """
    Returns the EPSG4326 (lat, lng) pair of a geom point
    """
    point = to_shape(geom)
    # 4326 is normally (x, y) = (lng, lat), so latitude is y and longitude is x
    latitude = point.y
    longitude = point.x
    return (latitude, longitude)

173 

174 

def http_date(dt=None):
    """
    Formats a datetime as an HTTP date in GMT, as used for cookie expiry

    Falls back to the current time when no datetime is given.
    """
    moment = dt or now()
    return formatdate(moment.timestamp(), usegmt=True)

182 

183 

def _create_tasty_cookie(name: str, value, expiry: datetime, httponly: bool):
    """
    Builds the Set-Cookie header value for one cookie

    The value is stringified and stored as given. The cookie is bound to the configured COOKIE_DOMAIN and to the
    root path, and expires at expiry.
    """
    text = str(value)
    morsel = http.cookies.Morsel()
    morsel.set(name, text, text)

    # tell the browser when to stop sending the cookie
    morsel["expires"] = http_date(expiry)
    # restrict to our domain, note if there's no domain, it won't include subdomains
    morsel["domain"] = config["COOKIE_DOMAIN"]
    # root path so that all API requests get it, otherwise it defaults to something like /org.couchers.auth/
    morsel["path"] = "/"

    if not config["DEV"]:
        # send on all requests, which requires Secure
        morsel["samesite"] = "None"
        # only set cookie on HTTPS sites in production
        morsel["secure"] = True
    else:
        # send only on requests from first-party domains
        morsel["samesite"] = "Strict"

    # when set, the cookie is not accessible from javascript
    morsel["httponly"] = httponly

    return morsel.OutputString()

205 

206 

def create_session_cookies(token, user_id, expiry) -> List[str]:
    """
    Creates our two session cookies, both expiring at expiry

    The first is the secure session token (couchers-sesh), which is inaccessible to javascript. The second is
    the user id (couchers-user-id), which the javascript frontend can read so that it knows when it's logged
    in/out.
    """
    session_cookie = _create_tasty_cookie("couchers-sesh", token, expiry, httponly=True)
    user_id_cookie = _create_tasty_cookie("couchers-user-id", user_id, expiry, httponly=False)
    return [session_cookie, user_id_cookie]

217 

218 

def parse_session_cookie(headers):
    """
    Returns our session cookie value (aka token) or None

    None is returned when there is no cookie header, when the header has no couchers-sesh cookie, or when the
    header can't be parsed.
    """
    if "cookie" not in headers:
        return None

    try:
        # the header is client-controlled: SimpleCookie raises CookieError for e.g. an illegal cookie name, and
        # that must not bubble up as an unhandled error
        morsel = http.cookies.SimpleCookie(headers["cookie"]).get("couchers-sesh")
    except http.cookies.CookieError:
        return None

    if morsel is None:
        return None

    return morsel.value

233 

234 

def parse_user_id_cookie(headers):
    """
    Returns the value of our user id cookie (couchers-user-id) as a string, or None

    None is returned when there is no cookie header, when the header has no couchers-user-id cookie, or when
    the header can't be parsed.
    """
    if "cookie" not in headers:
        return None

    try:
        # the header is client-controlled: SimpleCookie raises CookieError for e.g. an illegal cookie name, and
        # that must not bubble up as an unhandled error
        morsel = http.cookies.SimpleCookie(headers["cookie"]).get("couchers-user-id")
    except http.cookies.CookieError:
        return None

    if morsel is None:
        return None

    return morsel.value

249 

250 

def parse_api_key(headers):
    """
    Extracts the bearer token (API key) from the `authorization` header

    Returns None when the header is missing or does not start with "Bearer ".
    """
    scheme = "Bearer "
    if "authorization" in headers:
        header_value = headers["authorization"]
        if header_value.startswith(scheme):
            # everything after the scheme prefix is the key
            return header_value[len(scheme) :]
    return None

263 

264 

def remove_duplicates_retain_order(list_):
    """
    Returns a new list with the first occurrence of each item, in the original order

    Items are compared by equality, so they don't need to be hashable.
    """
    unique = []
    for candidate in list_:
        if candidate in unique:
            continue
        unique.append(candidate)
    return unique

271 

272 

def date_in_timezone(date_, timezone):
    """
    Builds a SQL expression for the timezone-aware timestamp at which the given naive postgres date starts in
    the given timezone (postgres doesn't have tzd dates).

    E.g. if postgres is in 'America/New_York',

        SET SESSION TIME ZONE 'America/New_York';

        CREATE TABLE tz_trouble (to_date date, timezone text);

        INSERT INTO tz_trouble(to_date, timezone) VALUES
            ('2021-03-10'::date, 'Australia/Sydney'),
            ('2021-03-20'::date, 'Europe/Berlin'),
            ('2021-04-15'::date, 'America/New_York');

        SELECT timezone(timezone, to_date::timestamp) FROM tz_trouble;

    The result is:

                timezone
        ------------------------
         2021-03-09 08:00:00-05
         2021-03-19 19:00:00-04
         2021-04-15 00:00:00-04
    """
    # the date is first cast to a timestamp without timezone, which timezone() then places in the given zone
    naive_timestamp = cast(date_, DateTime(timezone=False))
    return func.timezone(timezone, naive_timestamp)

298 

299 

def millis_from_dt(dt):
    """
    Converts a datetime into whole milliseconds since the unix epoch, rounded to the nearest millisecond
    """
    epoch_seconds = dt.timestamp()
    return round(epoch_seconds * 1000)

302 

303 

def dt_from_millis(millis):
    """
    Converts milliseconds since the unix epoch into a timezone-aware datetime in UTC
    """
    epoch_seconds = millis / 1000
    return datetime.fromtimestamp(epoch_seconds, tz=utc)

306 

307 

def dt_to_page_token(dt):
    """
    Turns a datetime into an encrypted page token holding its unix timestamp in whole microseconds

    Python has datetime resolution equal to 1 micro, as does postgres, so no precision is lost here.

    We pray to deities that this never changes
    """
    # guard against the resolution assumption above silently breaking
    assert datetime.resolution == timedelta(microseconds=1)
    epoch_micros = round(dt.timestamp() * 1_000_000)
    return encrypt_page_token(str(epoch_micros))

316 

317 

def dt_from_page_token(page_token):
    """
    Decrypts a page token holding a unix timestamp in whole microseconds and returns it as a timezone-aware
    datetime in UTC
    """
    epoch_micros = int(decrypt_page_token(page_token))
    return datetime.fromtimestamp(epoch_micros / 1_000_000, tz=utc)

321 

322 

def last_active_coarsen(dt):
    """
    Rounds a "last active" time down to the accuracy we use for last active times, currently the start of the
    hour

    E.g. 27th June 2021, 16:53 UTC becomes 27th June 2021, 16:00 UTC
    """
    start_of_hour = {"minute": 0, "second": 0, "microsecond": 0}
    return dt.replace(**start_of_hour)

328 

329 

def get_tz_as_text(tz_name):
    """
    Describes a timezone as "<abbreviation>/UTC<offset>" based on the current time there, e.g. "UTC/UTC+0000"

    tz_name should be a tzdata identifier, e.g. America/New_York
    """
    zone = ZoneInfo(tz_name)
    local_now = datetime.now(tz=zone)
    return local_now.strftime("%Z/UTC%z")