Coverage for src/couchers/utils.py: 95%

111 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-07-22 17:19 +0000

1import http.cookies 

2import re 

3from datetime import date, datetime, timedelta 

4from email.utils import formatdate 

5from zoneinfo import ZoneInfo 

6 

7import pytz 

8from geoalchemy2.shape import from_shape, to_shape 

9from geoalchemy2.types import Geography, Geometry 

10from google.protobuf.duration_pb2 import Duration 

11from google.protobuf.timestamp_pb2 import Timestamp 

12from shapely.geometry import Point, Polygon, shape 

13from sqlalchemy.sql import cast, func 

14from sqlalchemy.types import DateTime 

15 

16from couchers.config import config 

17from couchers.constants import EMAIL_REGEX 

18 

19utc = pytz.UTC 

20 

21 

22# When a user logs in, they can basically input one of three things: user id, username, or email 

23# These are three non-intersecting sets 

24# * user_ids are numeric representations in base 10 

25# * usernames are alphanumeric + underscores, at least 2 chars long, and don't start with a number, and don't start or end with underscore 

26# * emails are just whatever stack overflow says emails are ;) 

27 

28 

29def is_valid_user_id(field): 

30 """ 

31 Checks if it's a string representing a base 10 integer not starting with 0 

32 """ 

33 return re.match(r"[1-9][0-9]*$", field) is not None 

34 

35 

36def is_valid_username(field): 

37 """ 

38 Checks if it's an alphanumeric + underscore, lowercase string, at least 

39 two characters long, and starts with a letter, ends with alphanumeric 

40 """ 

41 return re.match(r"[a-z][0-9a-z_]*[a-z0-9]$", field) is not None 

42 

43 

44def is_valid_name(field): 

45 """ 

46 Checks if it has at least one non-whitespace character 

47 """ 

48 return re.match(r"\S+", field) is not None 

49 

50 

51def is_valid_email(field): 

52 return re.match(EMAIL_REGEX, field) is not None 

53 

54 

55def Timestamp_from_datetime(dt: datetime): 

56 pb_ts = Timestamp() 

57 pb_ts.FromDatetime(dt) 

58 return pb_ts 

59 

60 

61def Duration_from_timedelta(dt: datetime): 

62 pb_d = Duration() 

63 pb_d.FromTimedelta(dt) 

64 return pb_d 

65 

66 

67def parse_date(date_str: str): 

68 """ 

69 Parses a date-only string in the format "YYYY-MM-DD" returning None if it fails 

70 """ 

71 try: 

72 return date.fromisoformat(date_str) 

73 except ValueError: 

74 return None 

75 

76 

77def date_to_api(date: date): 

78 return date.isoformat() 

79 

80 

81def to_aware_datetime(ts: Timestamp): 

82 """ 

83 Turns a protobuf Timestamp object into a timezone-aware datetime 

84 """ 

85 return utc.localize(ts.ToDatetime()) 

86 

87 

88def now(): 

89 return datetime.now(utc) 

90 

91 

92def minimum_allowed_birthdate(): 

93 """ 

94 Most recent birthdate allowed to register (must be 18 years minimum) 

95 

96 This approximation works on leap days! 

97 """ 

98 return today() - timedelta(days=365.25 * 18) 

99 

100 

101def today(): 

102 """ 

103 Date only in UTC 

104 """ 

105 return now().date() 

106 

107 

108def now_in_timezone(tz): 

109 """ 

110 tz should be tzdata identifier, e.g. America/New_York 

111 """ 

112 return datetime.now(pytz.timezone(tz)) 

113 

114 

115def today_in_timezone(tz): 

116 """ 

117 tz should be tzdata identifier, e.g. America/New_York 

118 """ 

119 return now_in_timezone(tz).date() 

120 

121 

122# Note: be very careful with ordering of lat/lng! 

123# In a lot of cases they come as (lng, lat), but us humans tend to use them from GPS as (lat, lng)... 

124# When entering as EPSG4326, we also need it in (lng, lat) 

125 

126 

127def create_coordinate(lat, lng): 

128 """ 

129 Creates a WKT point from a (lat, lng) tuple in EPSG4326 coordinate system (normal GPS-coordinates) 

130 """ 

131 wkb_point = from_shape(Point(lng, lat), srid=4326) 

132 

133 # Casting to Geography and back here to ensure coordinate wrapping 

134 return cast( 

135 cast(wkb_point, Geography(geometry_type="POINT", srid=4326)), Geometry(geometry_type="POINT", srid=4326) 

136 ) 

137 

138 

139def create_polygon_lat_lng(points): 

140 """ 

141 Creates a EPSG4326 WKT polygon from a list of (lat, lng) tuples 

142 """ 

143 return from_shape(Polygon([(lng, lat) for (lat, lng) in points]), srid=4326) 

144 

145 

146def create_polygon_lng_lat(points): 

147 """ 

148 Creates a EPSG4326 WKT polygon from a list of (lng, lat) tuples 

149 """ 

150 return from_shape(Polygon(points), srid=4326) 

151 

152 

153def geojson_to_geom(geojson): 

154 """ 

155 Turns GeoJSON to PostGIS geom data in EPSG4326 

156 """ 

157 return from_shape(shape(geojson), srid=4326) 

158 

159 

160def to_multi(polygon): 

161 return func.ST_Multi(polygon) 

162 

163 

164def get_coordinates(geom): 

165 """ 

166 Returns EPSG4326 (lat, lng) pair for a given WKT geom point 

167 """ 

168 shp = to_shape(geom) 

169 # note the funiness with 4326 normally being (x, y) = (lng, lat) 

170 return (shp.y, shp.x) 

171 

172 

173def http_date(dt=None): 

174 """ 

175 Format the datetime for HTTP cookies 

176 """ 

177 if not dt: 

178 dt = now() 

179 return formatdate(dt.timestamp(), usegmt=True) 

180 

181 

182def create_session_cookie(token, expiry): 

183 cookie = http.cookies.Morsel() 

184 cookie.set("couchers-sesh", token, token) 

185 # tell the browser when to stop sending the cookie 

186 cookie["expires"] = http_date(expiry) 

187 # restrict to our domain, note if there's no domain, it won't include subdomains 

188 cookie["domain"] = config["COOKIE_DOMAIN"] 

189 # path so that it's accessible for all API requests, otherwise defaults to something like /org.couchers.auth/ 

190 cookie["path"] = "/" 

191 if config["DEV"]: 

192 # send only on requests from first-party domains 

193 cookie["samesite"] = "Strict" 

194 else: 

195 # send on all requests, requires Secure 

196 cookie["samesite"] = "None" 

197 # only set cookie on HTTPS sites in production 

198 cookie["secure"] = True 

199 # not accessible from javascript 

200 cookie["httponly"] = True 

201 

202 return cookie.OutputString() 

203 

204 

205def parse_session_cookie(headers): 

206 """ 

207 Returns our session cookie value (aka token) or None 

208 """ 

209 if "cookie" not in headers: 

210 return None 

211 

212 # parse the cookie 

213 cookie = http.cookies.SimpleCookie(headers["cookie"]).get("couchers-sesh") 

214 

215 if not cookie: 

216 return None 

217 

218 return cookie.value 

219 

220 

221def parse_api_key(headers): 

222 """ 

223 Returns a bearer token (API key) from the `authorization` header, or None if invalid/not present 

224 """ 

225 if "authorization" not in headers: 

226 return None 

227 

228 authorization = headers["authorization"] 

229 if not authorization.startswith("Bearer "): 

230 return None 

231 

232 return authorization[7:] 

233 

234 

235def remove_duplicates_retain_order(list_): 

236 out = [] 

237 for item in list_: 

238 if item not in out: 

239 out.append(item) 

240 return out 

241 

242 

243def date_in_timezone(date_, timezone): 

244 """ 

245 Given a naive postgres date object (postgres doesn't have tzd dates), returns a timezone-aware timestamp for the 

246 start of that date in that timezone. E.g. if postgres is in 'America/New_York', 

247 

248 SET SESSION TIME ZONE 'America/New_York'; 

249 

250 CREATE TABLE tz_trouble (to_date date, timezone text); 

251 

252 INSERT INTO tz_trouble(to_date, timezone) VALUES 

253 ('2021-03-10'::date, 'Australia/Sydney'), 

254 ('2021-03-20'::date, 'Europe/Berlin'), 

255 ('2021-04-15'::date, 'America/New_York'); 

256 

257 SELECT timezone(timezone, to_date::timestamp) FROM tz_trouble; 

258 

259 The result is: 

260 

261 timezone 

262 ------------------------ 

263 2021-03-09 08:00:00-05 

264 2021-03-19 19:00:00-04 

265 2021-04-15 00:00:00-04 

266 """ 

267 return func.timezone(timezone, cast(date_, DateTime(timezone=False))) 

268 

269 

270def millis_from_dt(dt): 

271 return round(1000 * dt.timestamp()) 

272 

273 

274def dt_from_millis(millis): 

275 return datetime.fromtimestamp(millis / 1000, tz=utc) 

276 

277 

278def last_active_coarsen(dt): 

279 """ 

280 Coarsens a "last active" time to the accuracy we use for last active times, currently to the last hour, e.g. if the current time is 27th June 2021, 16:53 UTC, this returns 27th June 2021, 16:00 UTC 

281 """ 

282 return dt.replace(minute=0, second=0, microsecond=0) 

283 

284 

285def get_tz_as_text(tz_name): 

286 return datetime.now(tz=ZoneInfo(tz_name)).strftime("%Z/UTC%z")