Coverage for src/couchers/materialized_views.py: 99%

90 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-08 06:07 +0000

1import logging 

2import typing 

3from collections.abc import Sequence 

4from datetime import timedelta 

5from typing import Any 

6 

7from google.protobuf import empty_pb2 

8from sqlalchemy import Column, CompoundSelect, Connection, Float, Index, Integer, MetaData, Select, Table, event 

9from sqlalchemy.sql import ( 

10 and_, 

11 case, 

12 cast, 

13 func, 

14 literal, 

15 literal_column, 

16 union_all, 

17) 

18from sqlalchemy.sql import select as sa_select 

19from sqlalchemy.sql.functions import percentile_disc 

20from sqlalchemy_utils.view import ( 

21 CreateView, 

22 DropView, 

23 create_materialized_view, 

24 create_table_from_selectable, 

25 refresh_materialized_view, 

26) 

27 

28from couchers.db import session_scope 

29from couchers.models import ( 

30 ActivenessProbe, 

31 ActivenessProbeStatus, 

32 Base, 

33 ClusterRole, 

34 ClusterSubscription, 

35 Geom, 

36 HostRequest, 

37 Message, 

38 MessageType, 

39 StrongVerificationAttempt, 

40 Upload, 

41 User, 

42) 

43 

# module-level logger, named after this module
logger = logging.getLogger(__name__)

45 

46 

def create_materialized_view_with_different_ddl(
    name: str,
    select_selectable: Select[Any] | CompoundSelect[Any],
    create_selectable: Select[Any] | CompoundSelect[Any],
    metadata: MetaData,
    indexes: Sequence[Index] | None = None,
    aliases: dict[str, str] | None = None,
) -> Table:
    """
    Declare a materialized view whose Table is derived from one selectable and whose DDL is emitted from another.

    Copied wholesale from sqlalchemy_utils (3-clause BSD), with minor tweak in {select,create}_selectable

    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    * `select_selectable` is handed to `create_table_from_selectable` to build the returned table
    * `create_selectable` is handed to `CreateView`, i.e. it is what the materialized view is created from
    * `metadata` is only used here to hook the create/drop events; the table itself is built with `metadata=None`
    * `indexes` and `aliases` are passed straight through to `create_table_from_selectable`
    """
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        metadata=None,
        aliases=aliases,
    )

    def _create_view_indexes(target: Any, connection: Connection, **kw: Any) -> None:
        # emit every index attached to the view's table on the connection used for the create
        for index in view_table.indexes:
            index.create(connection)

    # registration order is kept as in the original: first the view itself, then its indexes
    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", _create_view_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))

    return typing.cast(Table, view_table)

73 

74 

# one row per cluster: how many of its subscriptions belong to visible users
cluster_subscription_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

cluster_subscription_counts = create_materialized_view(
    name="cluster_subscription_counts",
    selectable=cluster_subscription_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_cluster_subscription_counts_cluster_id",
            cluster_subscription_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)


class ClusterSubscriptionCount(Base):
    """ORM class mapped onto the `cluster_subscription_counts` materialized view."""

    __table__ = cluster_subscription_counts

102 

103 

# one row per cluster: how many of its admin subscriptions belong to visible users
cluster_admin_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(
        ClusterSubscription.role == ClusterRole.admin,
        User.is_visible,
    )
    .group_by(ClusterSubscription.cluster_id)
)

cluster_admin_counts = create_materialized_view(
    name="cluster_admin_counts",
    selectable=cluster_admin_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_cluster_admin_counts_cluster_id",
            cluster_admin_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)


class ClusterAdminCount(Base):
    """ORM class mapped onto the `cluster_admin_counts` materialized view."""

    __table__ = cluster_admin_counts

126 

127 

def make_lite_users_selectable(create: bool = False) -> Select[Any]:
    """
    Build the SELECT that backs the `lite_users` materialized view.

    Pass `create=True` for the variant used when emitting the CREATE VIEW statement and `create=False` for the
    variant used to derive the table's columns; the two only differ in how the geometry column is expressed.
    """
    # because this is rendered as a select when emitting the CREATE VIEW, using User.geom would be rendered as
    # `ST_AsEWKB(users.geom)` instead of the literal column, so the create variant refers to the column literally
    geom_column: Any = literal_column("users.geom") if create else User.geom

    # one (user id, true) row per user for which has_strong_verification holds
    sv_subquery = (
        sa_select(User.id, literal(True).label("true"))
        .select_from(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.has_strong_verification(User))
        .distinct()
        .subquery(name="sv_subquery")
    )

    # Be sure to modify the LiteUser type if you add/remove columns!
    columns = [
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom_column.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),  # type: ignore[attr-defined]
        Upload.filename.label("avatar_filename"),
        User.has_completed_profile.label("has_completed_profile"),  # type: ignore[attr-defined]
        User.has_completed_my_home.label("has_completed_my_home"),  # type: ignore[attr-defined]
        # users without a matching row in the subquery fall back to False
        func.coalesce(sv_subquery.c.true, False).label("has_strong_verification"),
    ]

    return (
        sa_select(*columns)
        .select_from(User)
        .outerjoin(Upload, Upload.key == User.avatar_key)
        .outerjoin(sv_subquery, sv_subquery.c.id == User.id)
    )

164 

165 

# two renderings of the same query: one to derive the table columns, one to emit the CREATE VIEW
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

lite_users = create_materialized_view_with_different_ddl(
    name="lite_users",
    select_selectable=lite_users_selectable_select,
    create_selectable=lite_users_selectable_create,
    metadata=Base.metadata,
    indexes=[
        # unique indexes on id and username
        Index("uq_lite_users_id", lite_users_selectable_create.c.id, unique=True),
        Index("uq_lite_users_username", lite_users_selectable_create.c.username, unique=True),
        # partial hash indexes covering only visible users
        Index(
            "ix_lite_users_id_visible",
            lite_users_selectable_create.c.id,
            postgresql_using="hash",
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
        Index(
            "ix_lite_users_username_visible",
            lite_users_selectable_create.c.username,
            postgresql_using="hash",
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
    ],
)

191 

192 

class LiteUser(Base):
    """ORM class mapped onto the `lite_users` materialized view."""

    __table__ = lite_users

    # lets the annotations below exist purely for type checking, without affecting SQLAlchemy
    __allow_unmapped__ = True

    # A subset enough to make mypy happy. Mirrors the labelled columns in "make_lite_users_selectable",
    # keep the two in sync.
    id: Column[int]
    username: Column[str]
    name: Column[str]
    city: Column[str]
    age: Column[int]
    geom: Column[Geom]
    radius: Column[float]
    is_visible: Column[bool]
    avatar_filename: Column[str]
    has_completed_profile: Column[bool]
    has_completed_my_home: Column[bool]
    has_strong_verification: Column[bool]

212 

213 

def make_clustered_users_selectable(create: bool = False) -> CompoundSelect[Any]:
    """
    Build the query that backs the `clustered_users` materialized view.

    Visible users are grouped with DBSCAN; the result is the UNION ALL of

    * one row per cluster: the centroid of the collected member geometries and the number of members
    * one row per user that is in no cluster: the user's own geometry and a count of 1

    Pass `create=True` for the variant used when emitting the CREATE VIEW statement and `create=False` for the
    variant used to derive the table's columns; the two only differ in how the geometry columns are expressed.
    """
    # the CTE emits something along the lines of
    #   WITH clustered AS (
    #       SELECT id,
    #              geom,
    #              ST_ClusterDBSCAN(geom, .15, 5) OVER (ORDER BY id) AS cluster_id
    #       FROM users
    #       WHERE <user is visible>
    #   )
    cluster_cte = (
        sa_select(
            User.id,
            User.geom,
            # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5, cluster will be NULL for not in any cluster
            func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id).label("cluster_id"),
        )
        .where(User.is_visible)
        .cte("clustered")
    )

    if create:
        # the create variant spells the geometry expressions out as literal SQL; presumably for the same reason as
        # in make_lite_users_selectable (avoiding the `ST_AsEWKB(...)` wrapping in the CREATE VIEW) -- TODO confirm
        centroid_geom: Any = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        cluster_geom: Any = literal_column("clustered.geom")
    else:
        centroid_geom = func.ST_Centroid(func.ST_Collect(cluster_cte.c.geom))
        cluster_geom = cluster_cte.c.geom

    # one row per cluster; named `clusters` so it does not shadow the module-level `clustered_users` table
    clusters = (
        sa_select(centroid_geom.label("geom"), func.count().label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_not(None))
        .group_by(cluster_cte.c.cluster_id)
    )

    # users outside of every cluster each stand for themselves with a count of 1
    isolated_users = (
        sa_select(cluster_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_(None))
    )

    return union_all(clusters, isolated_users)

256 

257 

# two renderings of the same query: one to derive the table columns, one to emit the CREATE VIEW
clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# note: no indexes are declared for this view
clustered_users = create_materialized_view_with_different_ddl(
    name="clustered_users",
    select_selectable=clustered_users_selectable_select,
    create_selectable=clustered_users_selectable_create,
    metadata=Base.metadata,
)


class ClusteredUser(Base):
    """ORM class mapped onto the `clustered_users` materialized view."""

    __table__ = clustered_users

268 

269 

def float_(stmt: Any) -> Any:
    """Cast `stmt` to Float in SQL, substituting 0.0 when the cast value is NULL."""
    as_float = cast(stmt, Float)
    return func.coalesce(as_float, 0.0)

272 

273 

# this subquery gets the time that the request was sent (the time of the conversation's chat_created message)
t = (
    sa_select(Message.conversation_id, Message.time)
    .where(Message.message_type == MessageType.chat_created)
    .subquery()
)
# this subquery gets the time that the user responded to the request (their earliest message per conversation)
s = (
    sa_select(
        Message.conversation_id,
        Message.author_id,
        func.min(Message.time).label("time"),
    )
    .group_by(Message.conversation_id, Message.author_id)
    .subquery()
)
# one row per (user, thing they were expected to respond to); response_time is NULL when there was no response
all_responses = union_all(
    # host request responses: the outer join keeps requests the host never wrote in
    sa_select(
        HostRequest.host_user_id.label("user_id"),
        (s.c.time - t.c.time).label("response_time"),
    )
    .join(t, t.c.conversation_id == HostRequest.conversation_id)
    .outerjoin(
        s,
        and_(
            s.c.conversation_id == HostRequest.conversation_id,
            s.c.author_id == HostRequest.host_user_id,
        ),
    ),
    # activeness probes
    sa_select(
        ActivenessProbe.user_id,
        # expired probes have a responded time for when they were marked responded, so they get no response time
        case(
            (
                ActivenessProbe.response != ActivenessProbeStatus.expired,
                ActivenessProbe.responded - ActivenessProbe.probe_initiated,
            ),
            else_=None,
        ).label("response_time"),
    ),
).subquery()

305 

# per-user aggregates over all_responses
user_response_rates_selectable = (
    sa_select(
        all_responses.c.user_id.label("user_id"),
        # number of requests received (rows in all_responses for this user)
        func.count().label("requests"),
        # fraction of requests responded to: rows with a non-NULL response time over all rows
        (func.count(all_responses.c.response_time) / func.count()).label("response_rate"),
        func.avg(all_responses.c.response_time).label("avg_response_time"),
        # the 33rd percentile response time; a missing response is ranked as a 1000 day response time
        percentile_disc(0.33)
        .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
        .label("response_time_33p"),
        # the 66th percentile response time, with the same treatment of missing responses
        percentile_disc(0.66)
        .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
        .label("response_time_66p"),
    )
    .group_by(all_responses.c.user_id)
)

user_response_rates = create_materialized_view(
    name="user_response_rates",
    selectable=user_response_rates_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_user_response_rates_id",
            user_response_rates_selectable.c.user_id,
            unique=True,
        ),
    ],
)


class UserResponseRate(Base):
    """ORM class mapped onto the `user_response_rates` materialized view."""

    __table__ = user_response_rates

333 

334 

def refresh_materialized_views(payload: empty_pb2.Empty) -> None:
    """
    Refresh every materialized view except `lite_users`, inside a single session scope.

    `payload` is not used by this function.
    """
    logger.info("Refreshing materialized views")

    # (view name, refresh concurrently?) in the order the refreshes are issued
    # clustered_users is the only non-concurrent one; presumably because no unique index is declared for it in this
    # module, which PostgreSQL requires for REFRESH ... CONCURRENTLY -- TODO confirm
    refresh_plan = (
        ("cluster_subscription_counts", True),
        ("cluster_admin_counts", True),
        ("clustered_users", False),
        ("user_response_rates", True),
    )

    with session_scope() as session:
        for view_name, concurrently in refresh_plan:
            refresh_materialized_view(session, view_name, concurrently=concurrently)

342 

343 

def refresh_materialized_views_rapid(payload: empty_pb2.Empty) -> None:
    """
    Concurrently refresh the `lite_users` materialized view, the only view handled by the "rapid" refresh.

    `payload` is not used by this function.
    """
    logger.info("Refreshing materialized views (rapid)")
    with session_scope() as session:
        refresh_materialized_view(session, name="lite_users", concurrently=True)