Coverage for src/couchers/utils.py: 94%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

109 statements  

1import http.cookies 

2import re 

3from datetime import date, datetime 

4from email.utils import formatdate 

5 

6import pytz 

7from geoalchemy2.shape import from_shape, to_shape 

8from geoalchemy2.types import Geography, Geometry 

9from google.protobuf.duration_pb2 import Duration 

10from google.protobuf.timestamp_pb2 import Timestamp 

11from shapely.geometry import Point, Polygon, shape 

12from sqlalchemy.sql import cast, func 

13from sqlalchemy.types import DateTime 

14 

15from couchers.config import config 

16from couchers.constants import EMAIL_REGEX 

17 

18utc = pytz.UTC 

19 

20 

21# When a user logs in, they can basically input one of three things: user id, username, or email 

22# These are three non-intersecting sets 

23# * user_ids are numeric representations in base 10 

24# * usernames are alphanumeric + underscores, at least 2 chars long, and don't start with a number, and don't start or end with underscore 

25# * emails are just whatever stack overflow says emails are ;) 

26 

27 

28def is_valid_user_id(field): 

29 """ 

30 Checks if it's a string representing a base 10 integer not starting with 0 

31 """ 

32 return re.match(r"[1-9][0-9]*$", field) is not None 

33 

34 

35def is_valid_username(field): 

36 """ 

37 Checks if it's an alphanumeric + underscore, lowercase string, at least 

38 two characters long, and starts with a letter, ends with alphanumeric 

39 """ 

40 return re.match(r"[a-z][0-9a-z_]*[a-z0-9]$", field) is not None 

41 

42 

43def is_valid_name(field): 

44 """ 

45 Checks if it has at least one non-whitespace character 

46 """ 

47 return re.match(r"\S+", field) is not None 

48 

49 

50def is_valid_email(field): 

51 return re.match(EMAIL_REGEX, field) is not None 

52 

53 

54def Timestamp_from_datetime(dt: datetime): 

55 pb_ts = Timestamp() 

56 pb_ts.FromDatetime(dt) 

57 return pb_ts 

58 

59 

60def Duration_from_timedelta(dt: datetime): 

61 pb_d = Duration() 

62 pb_d.FromTimedelta(dt) 

63 return pb_d 

64 

65 

66def parse_date(date_str: str): 

67 """ 

68 Parses a date-only string in the format "YYYY-MM-DD" returning None if it fails 

69 """ 

70 try: 

71 return date.fromisoformat(date_str) 

72 except ValueError: 

73 return None 

74 

75 

76def date_to_api(date: date): 

77 return date.isoformat() 

78 

79 

80def to_aware_datetime(ts: Timestamp): 

81 """ 

82 Turns a protobuf Timestamp object into a timezone-aware datetime 

83 """ 

84 return utc.localize(ts.ToDatetime()) 

85 

86 

87def now(): 

88 return datetime.now(utc) 

89 

90 

91def minimum_allowed_birthdate(): 

92 """ 

93 Most recent birthdate allowed to register (must be 18 years minimum) 

94 """ 

95 today_ = today() 

96 return today_.replace(today_.year - 18) 

97 

98 

99def today(): 

100 """ 

101 Date only in UTC 

102 """ 

103 return now().date() 

104 

105 

106def now_in_timezone(tz): 

107 """ 

108 tz should be tzdata identifier, e.g. America/New_York 

109 """ 

110 return datetime.now(pytz.timezone(tz)) 

111 

112 

113def today_in_timezone(tz): 

114 """ 

115 tz should be tzdata identifier, e.g. America/New_York 

116 """ 

117 return now_in_timezone(tz).date() 

118 

119 

120# Note: be very careful with ordering of lat/lng! 

121# In a lot of cases they come as (lng, lat), but us humans tend to use them from GPS as (lat, lng)... 

122# When entering as EPSG4326, we also need it in (lng, lat) 

123 

124 

125def create_coordinate(lat, lng): 

126 """ 

127 Creates a WKT point from a (lat, lng) tuple in EPSG4326 coordinate system (normal GPS-coordinates) 

128 """ 

129 wkb_point = from_shape(Point(lng, lat), srid=4326) 

130 

131 # Casting to Geography and back here to ensure coordinate wrapping 

132 return cast( 

133 cast(wkb_point, Geography(geometry_type="POINT", srid=4326)), Geometry(geometry_type="POINT", srid=4326) 

134 ) 

135 

136 

137def create_polygon_lat_lng(points): 

138 """ 

139 Creates a EPSG4326 WKT polygon from a list of (lat, lng) tuples 

140 """ 

141 return from_shape(Polygon([(lng, lat) for (lat, lng) in points]), srid=4326) 

142 

143 

144def create_polygon_lng_lat(points): 

145 """ 

146 Creates a EPSG4326 WKT polygon from a list of (lng, lat) tuples 

147 """ 

148 return from_shape(Polygon(points), srid=4326) 

149 

150 

151def geojson_to_geom(geojson): 

152 """ 

153 Turns GeoJSON to PostGIS geom data in EPSG4326 

154 """ 

155 return from_shape(shape(geojson), srid=4326) 

156 

157 

158def to_multi(polygon): 

159 return func.ST_Multi(polygon) 

160 

161 

162def get_coordinates(geom): 

163 """ 

164 Returns EPSG4326 (lat, lng) pair for a given WKT geom point 

165 """ 

166 shp = to_shape(geom) 

167 # note the funiness with 4326 normally being (x, y) = (lng, lat) 

168 return (shp.y, shp.x) 

169 

170 

171def http_date(dt=None): 

172 """ 

173 Format the datetime for HTTP cookies 

174 """ 

175 if not dt: 

176 dt = now() 

177 return formatdate(dt.timestamp(), usegmt=True) 

178 

179 

180def create_session_cookie(token, expiry): 

181 cookie = http.cookies.Morsel() 

182 cookie.set("couchers-sesh", token, token) 

183 # tell the browser when to stop sending the cookie 

184 cookie["expires"] = http_date(expiry) 

185 # restrict to our domain, note if there's no domain, it won't include subdomains 

186 cookie["domain"] = config["COOKIE_DOMAIN"] 

187 # path so that it's accessible for all API requests, otherwise defaults to something like /org.couchers.auth/ 

188 cookie["path"] = "/" 

189 if config["DEV"]: 

190 # send only on requests from first-party domains 

191 cookie["samesite"] = "Strict" 

192 else: 

193 # send on all requests, requires Secure 

194 cookie["samesite"] = "None" 

195 # only set cookie on HTTPS sites in production 

196 cookie["secure"] = True 

197 # not accessible from javascript 

198 cookie["httponly"] = True 

199 

200 return cookie.OutputString() 

201 

202 

203def parse_session_cookie(headers): 

204 """ 

205 Returns our session cookie value (aka token) or None 

206 """ 

207 if "cookie" not in headers: 

208 return None 

209 

210 # parse the cookie 

211 cookie = http.cookies.SimpleCookie(headers["cookie"]).get("couchers-sesh") 

212 

213 if not cookie: 

214 return None 

215 

216 return cookie.value 

217 

218 

219def parse_api_key(headers): 

220 """ 

221 Returns a bearer token (API key) from the `authorization` header, or None if invalid/not present 

222 """ 

223 if "authorization" not in headers: 

224 return None 

225 

226 authorization = headers["authorization"] 

227 if not authorization.startswith("Bearer "): 

228 return None 

229 

230 return authorization[7:] 

231 

232 

233def remove_duplicates_retain_order(list_): 

234 out = [] 

235 for item in list_: 

236 if item not in out: 

237 out.append(item) 

238 return out 

239 

240 

241def date_in_timezone(date_, timezone): 

242 """ 

243 Given a naive postgres date object (postgres doesn't have tzd dates), returns a timezone-aware timestamp for the 

244 start of that date in that timezone. E.g. if postgres is in 'America/New_York', 

245 

246 SET SESSION TIME ZONE 'America/New_York'; 

247 

248 CREATE TABLE tz_trouble (to_date date, timezone text); 

249 

250 INSERT INTO tz_trouble(to_date, timezone) VALUES 

251 ('2021-03-10'::date, 'Australia/Sydney'), 

252 ('2021-03-20'::date, 'Europe/Berlin'), 

253 ('2021-04-15'::date, 'America/New_York'); 

254 

255 SELECT timezone(timezone, to_date::timestamp) FROM tz_trouble; 

256 

257 The result is: 

258 

259 timezone 

260 ------------------------ 

261 2021-03-09 08:00:00-05 

262 2021-03-19 19:00:00-04 

263 2021-04-15 00:00:00-04 

264 """ 

265 return func.timezone(timezone, cast(date_, DateTime(timezone=False))) 

266 

267 

268def millis_from_dt(dt): 

269 return round(1000 * dt.timestamp()) 

270 

271 

272def dt_from_millis(millis): 

273 return datetime.fromtimestamp(millis / 1000, tz=utc) 

274 

275 

276def last_active_coarsen(dt): 

277 """ 

278 Coarsens a "last active" time to the accuracy we use for last active times, currently to the last hour, e.g. if the current time is 27th June 2021, 16:53 UTC, this returns 27th June 2021, 16:00 UTC 

279 """ 

280 return dt.replace(minute=0, second=0, microsecond=0)