Coverage for src/couchers/utils.py: 95%

129 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-11-21 04:21 +0000

1import http.cookies 

2import re 

3from datetime import date, datetime, timedelta 

4from email.utils import formatdate 

5from typing import List 

6from zoneinfo import ZoneInfo 

7 

8import pytz 

9from geoalchemy2.shape import from_shape, to_shape 

10from geoalchemy2.types import Geography, Geometry 

11from google.protobuf.duration_pb2 import Duration 

12from google.protobuf.timestamp_pb2 import Timestamp 

13from shapely.geometry import Point, Polygon, shape 

14from sqlalchemy.sql import cast, func 

15from sqlalchemy.types import DateTime 

16 

17from couchers.config import config 

18from couchers.constants import EMAIL_REGEX 

19from couchers.crypto import decrypt_page_token, encrypt_page_token 

20 

21utc = pytz.UTC 

22 

23 

24# When a user logs in, they can basically input one of three things: user id, username, or email 

25# These are three non-intersecting sets 

26# * user_ids are numeric representations in base 10 

27# * usernames are alphanumeric + underscores, at least 2 chars long, and don't start with a number, and don't start or end with underscore 

28# * emails are just whatever stack overflow says emails are ;) 

29 

30 

31def is_valid_user_id(field): 

32 """ 

33 Checks if it's a string representing a base 10 integer not starting with 0 

34 """ 

35 return re.match(r"[1-9][0-9]*$", field) is not None 

36 

37 

38def is_valid_username(field): 

39 """ 

40 Checks if it's an alphanumeric + underscore, lowercase string, at least 

41 two characters long, and starts with a letter, ends with alphanumeric 

42 """ 

43 return re.match(r"[a-z][0-9a-z_]*[a-z0-9]$", field) is not None 

44 

45 

46def is_valid_name(field): 

47 """ 

48 Checks if it has at least one non-whitespace character 

49 """ 

50 return re.match(r"\S+", field) is not None 

51 

52 

53def is_valid_email(field): 

54 return re.match(EMAIL_REGEX, field) is not None 

55 

56 

57def Timestamp_from_datetime(dt: datetime): 

58 pb_ts = Timestamp() 

59 pb_ts.FromDatetime(dt) 

60 return pb_ts 

61 

62 

63def Duration_from_timedelta(dt: datetime): 

64 pb_d = Duration() 

65 pb_d.FromTimedelta(dt) 

66 return pb_d 

67 

68 

69def parse_date(date_str: str): 

70 """ 

71 Parses a date-only string in the format "YYYY-MM-DD" returning None if it fails 

72 """ 

73 try: 

74 return date.fromisoformat(date_str) 

75 except ValueError: 

76 return None 

77 

78 

79def date_to_api(date: date): 

80 return date.isoformat() 

81 

82 

83def to_aware_datetime(ts: Timestamp): 

84 """ 

85 Turns a protobuf Timestamp object into a timezone-aware datetime 

86 """ 

87 return utc.localize(ts.ToDatetime()) 

88 

89 

90def now(): 

91 return datetime.now(utc) 

92 

93 

94def minimum_allowed_birthdate(): 

95 """ 

96 Most recent birthdate allowed to register (must be 18 years minimum) 

97 

98 This approximation works on leap days! 

99 """ 

100 return today() - timedelta(days=365.25 * 18) 

101 

102 

103def today(): 

104 """ 

105 Date only in UTC 

106 """ 

107 return now().date() 

108 

109 

110def now_in_timezone(tz): 

111 """ 

112 tz should be tzdata identifier, e.g. America/New_York 

113 """ 

114 return datetime.now(pytz.timezone(tz)) 

115 

116 

117def today_in_timezone(tz): 

118 """ 

119 tz should be tzdata identifier, e.g. America/New_York 

120 """ 

121 return now_in_timezone(tz).date() 

122 

123 

124# Note: be very careful with ordering of lat/lng! 

125# In a lot of cases they come as (lng, lat), but us humans tend to use them from GPS as (lat, lng)... 

126# When entering as EPSG4326, we also need it in (lng, lat) 

127 

128 

129def create_coordinate(lat, lng): 

130 """ 

131 Creates a WKT point from a (lat, lng) tuple in EPSG4326 coordinate system (normal GPS-coordinates) 

132 """ 

133 wkb_point = from_shape(Point(lng, lat), srid=4326) 

134 

135 # Casting to Geography and back here to ensure coordinate wrapping 

136 return cast( 

137 cast(wkb_point, Geography(geometry_type="POINT", srid=4326)), Geometry(geometry_type="POINT", srid=4326) 

138 ) 

139 

140 

141def create_polygon_lat_lng(points): 

142 """ 

143 Creates a EPSG4326 WKT polygon from a list of (lat, lng) tuples 

144 """ 

145 return from_shape(Polygon([(lng, lat) for (lat, lng) in points]), srid=4326) 

146 

147 

148def create_polygon_lng_lat(points): 

149 """ 

150 Creates a EPSG4326 WKT polygon from a list of (lng, lat) tuples 

151 """ 

152 return from_shape(Polygon(points), srid=4326) 

153 

154 

155def geojson_to_geom(geojson): 

156 """ 

157 Turns GeoJSON to PostGIS geom data in EPSG4326 

158 """ 

159 return from_shape(shape(geojson), srid=4326) 

160 

161 

162def to_multi(polygon): 

163 return func.ST_Multi(polygon) 

164 

165 

166def get_coordinates(geom): 

167 """ 

168 Returns EPSG4326 (lat, lng) pair for a given WKT geom point or None if the input is not truthy 

169 """ 

170 if geom: 

171 shp = to_shape(geom) 

172 # note the funiness with 4326 normally being (x, y) = (lng, lat) 

173 return (shp.y, shp.x) 

174 else: 

175 return None 

176 

177 

178def http_date(dt=None): 

179 """ 

180 Format the datetime for HTTP cookies 

181 """ 

182 if not dt: 

183 dt = now() 

184 return formatdate(dt.timestamp(), usegmt=True) 

185 

186 

187def _create_tasty_cookie(name: str, value, expiry: datetime, httponly: bool): 

188 cookie = http.cookies.Morsel() 

189 cookie.set(name, str(value), str(value)) 

190 # tell the browser when to stop sending the cookie 

191 cookie["expires"] = http_date(expiry) 

192 # restrict to our domain, note if there's no domain, it won't include subdomains 

193 cookie["domain"] = config["COOKIE_DOMAIN"] 

194 # path so that it's accessible for all API requests, otherwise defaults to something like /org.couchers.auth/ 

195 cookie["path"] = "/" 

196 if config["DEV"]: 

197 # send only on requests from first-party domains 

198 cookie["samesite"] = "Strict" 

199 else: 

200 # send on all requests, requires Secure 

201 cookie["samesite"] = "None" 

202 # only set cookie on HTTPS sites in production 

203 cookie["secure"] = True 

204 # not accessible from javascript 

205 cookie["httponly"] = httponly 

206 

207 return cookie.OutputString() 

208 

209 

210def create_session_cookies(token, user_id, expiry) -> List[str]: 

211 """ 

212 Creates our session cookies. 

213 

214 We have two: the secure session token (in couchers-sesh) that's inaccessible to javascript, and the user id (in couchers-user-id) which the javascript frontend can access, so that it knows when it's logged in/out 

215 """ 

216 return [ 

217 _create_tasty_cookie("couchers-sesh", token, expiry, httponly=True), 

218 _create_tasty_cookie("couchers-user-id", user_id, expiry, httponly=False), 

219 ] 

220 

221 

222def parse_session_cookie(headers): 

223 """ 

224 Returns our session cookie value (aka token) or None 

225 """ 

226 if "cookie" not in headers: 

227 return None 

228 

229 # parse the cookie 

230 cookie = http.cookies.SimpleCookie(headers["cookie"]).get("couchers-sesh") 

231 

232 if not cookie: 

233 return None 

234 

235 return cookie.value 

236 

237 

238def parse_user_id_cookie(headers): 

239 """ 

240 Returns our session cookie value (aka token) or None 

241 """ 

242 if "cookie" not in headers: 

243 return None 

244 

245 # parse the cookie 

246 cookie = http.cookies.SimpleCookie(headers["cookie"]).get("couchers-user-id") 

247 

248 if not cookie: 

249 return None 

250 

251 return cookie.value 

252 

253 

254def parse_api_key(headers): 

255 """ 

256 Returns a bearer token (API key) from the `authorization` header, or None if invalid/not present 

257 """ 

258 if "authorization" not in headers: 

259 return None 

260 

261 authorization = headers["authorization"] 

262 if not authorization.startswith("Bearer "): 

263 return None 

264 

265 return authorization[7:] 

266 

267 

268def remove_duplicates_retain_order(list_): 

269 out = [] 

270 for item in list_: 

271 if item not in out: 

272 out.append(item) 

273 return out 

274 

275 

276def date_in_timezone(date_, timezone): 

277 """ 

278 Given a naive postgres date object (postgres doesn't have tzd dates), returns a timezone-aware timestamp for the 

279 start of that date in that timezone. E.g. if postgres is in 'America/New_York', 

280 

281 SET SESSION TIME ZONE 'America/New_York'; 

282 

283 CREATE TABLE tz_trouble (to_date date, timezone text); 

284 

285 INSERT INTO tz_trouble(to_date, timezone) VALUES 

286 ('2021-03-10'::date, 'Australia/Sydney'), 

287 ('2021-03-20'::date, 'Europe/Berlin'), 

288 ('2021-04-15'::date, 'America/New_York'); 

289 

290 SELECT timezone(timezone, to_date::timestamp) FROM tz_trouble; 

291 

292 The result is: 

293 

294 timezone 

295 ------------------------ 

296 2021-03-09 08:00:00-05 

297 2021-03-19 19:00:00-04 

298 2021-04-15 00:00:00-04 

299 """ 

300 return func.timezone(timezone, cast(date_, DateTime(timezone=False))) 

301 

302 

303def millis_from_dt(dt): 

304 return round(1000 * dt.timestamp()) 

305 

306 

307def dt_from_millis(millis): 

308 return datetime.fromtimestamp(millis / 1000, tz=utc) 

309 

310 

311def dt_to_page_token(dt): 

312 """ 

313 Python has datetime resolution equal to 1 micro, as does postgres 

314 

315 We pray to deities that this never changes 

316 """ 

317 assert datetime.resolution == timedelta(microseconds=1) 

318 return encrypt_page_token(str(round(1_000_000 * dt.timestamp()))) 

319 

320 

321def dt_from_page_token(page_token): 

322 # see above comment 

323 return datetime.fromtimestamp(int(decrypt_page_token(page_token)) / 1_000_000, tz=utc) 

324 

325 

326def last_active_coarsen(dt): 

327 """ 

328 Coarsens a "last active" time to the accuracy we use for last active times, currently to the last hour, e.g. if the current time is 27th June 2021, 16:53 UTC, this returns 27th June 2021, 16:00 UTC 

329 """ 

330 return dt.replace(minute=0, second=0, microsecond=0) 

331 

332 

333def get_tz_as_text(tz_name): 

334 return datetime.now(tz=ZoneInfo(tz_name)).strftime("%Z/UTC%z")