Coverage for app / backend / src / couchers / materialized_views.py: 99%

83 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 09:44 +0000

1import logging 

2import typing 

3from collections.abc import Sequence 

4from datetime import timedelta 

5from typing import Any 

6 

7from google.protobuf import empty_pb2 

8from sqlalchemy import CompoundSelect, Connection, Float, Index, Integer, MetaData, Select, Table, Text, event 

9from sqlalchemy.dialects.postgresql import JSON 

10from sqlalchemy.orm import Mapped 

11from sqlalchemy.sql import ( 

12 and_, 

13 case, 

14 cast, 

15 func, 

16 literal, 

17 literal_column, 

18 select, 

19 union_all, 

20) 

21from sqlalchemy.sql.functions import percentile_disc 

22from sqlalchemy_utils.view import ( 

23 CreateView, 

24 DropView, 

25 create_materialized_view, 

26 create_table_from_selectable, 

27 refresh_materialized_view, 

28) 

29 

30from couchers.db import session_scope 

31from couchers.helpers.completed_profile import has_completed_profile_expression 

32from couchers.models import ( 

33 ActivenessProbe, 

34 ActivenessProbeStatus, 

35 Base, 

36 ClusterRole, 

37 ClusterSubscription, 

38 Geom, 

39 HostRequest, 

40 MatViewBase, 

41 Message, 

42 MessageType, 

43 StrongVerificationAttempt, 

44 Upload, 

45 User, 

46) 

47from couchers.models.uploads import get_avatar_photo_subquery 

48 

49logger = logging.getLogger(__name__) 

50 

51 

def create_materialized_view_with_different_ddl(
    name: str,
    select_selectable: Select[Any] | CompoundSelect[Any],
    create_selectable: Select[Any] | CompoundSelect[Any],
    metadata: MetaData,
    indexes: Sequence[Index] | None = None,
    aliases: dict[str, str] | None = None,
) -> Table:
    """
    Register a materialized view whose table object and whose CREATE statement come from two selectables.

    Adapted from sqlalchemy_utils (3-clause BSD); the only tweak is the {select,create}_selectable split:

    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    `select_selectable` is used to build the returned table (columns, indexes, aliases), while
    `create_selectable` is what is handed to CreateView when `metadata` is created.
    """
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        metadata=None,
        aliases=aliases,
    )

    def _create_indexes(target: Any, connection: Connection, **kw: Any) -> None:
        # runs after the view itself has been created (see listener order below)
        for index in view_table.indexes:
            index.create(connection)

    # the view is registered first and its indexes second, both on "after_create"
    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", _create_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))

    return typing.cast(Table, view_table)

78 

79 

# Per-cluster number of subscriptions whose user is visible.
cluster_subscription_counts_selectable = (
    select(ClusterSubscription.cluster_id.label("cluster_id"), func.count().label("count"))
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

# The unique index on cluster_id gives the view one row per cluster.
cluster_subscription_counts = create_materialized_view(
    "cluster_subscription_counts",
    cluster_subscription_counts_selectable,
    Base.metadata,
    indexes=[
        Index(
            "uq_cluster_subscription_counts_cluster_id",
            cluster_subscription_counts_selectable.subquery().c.cluster_id,
            unique=True,
        ),
    ],
)

103 

104 

class ClusterSubscriptionCount(MatViewBase):
    """ORM mapping over the `cluster_subscription_counts` materialized view."""

    __table__ = cluster_subscription_counts

    # columns mirror the labels used in cluster_subscription_counts_selectable
    cluster_id: Mapped[int]
    # number of subscriptions to the cluster held by visible users
    count: Mapped[int]

110 

111 

# Per-cluster number of admin subscriptions whose user is visible.
cluster_admin_counts_selectable = (
    select(ClusterSubscription.cluster_id.label("cluster_id"), func.count().label("count"))
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(ClusterSubscription.role == ClusterRole.admin, User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

# The unique index on cluster_id gives the view one row per cluster.
cluster_admin_counts = create_materialized_view(
    "cluster_admin_counts",
    cluster_admin_counts_selectable,
    Base.metadata,
    indexes=[
        Index(
            "uq_cluster_admin_counts_cluster_id",
            cluster_admin_counts_selectable.subquery().c.cluster_id,
            unique=True,
        ),
    ],
)

136 

137 

class ClusterAdminCount(MatViewBase):
    """ORM mapping over the `cluster_admin_counts` materialized view."""

    __table__ = cluster_admin_counts

    # columns mirror the labels used in cluster_admin_counts_selectable
    cluster_id: Mapped[int]
    # number of admin-role subscriptions to the cluster held by visible users
    count: Mapped[int]

143 

144 

def make_lite_users_selectable(create: bool = False) -> Select[Any]:
    """
    Build the SELECT that backs the `lite_users` materialized view.

    With `create=True` the geometry column is emitted as the literal `users.geom`, which is the form needed
    when the statement is rendered into the CREATE VIEW DDL; otherwise the mapped `User.geom` is used.
    """
    # because this is rendered as a select when emitting the CREATE VIEW, using User.geom would be rendered as
    # `ST_AsEWKB(users.geom)` instead of the literal column, so the create variant refers to the column literally
    geom_column: Any = literal_column("users.geom") if create else User.geom

    # one (id, true) row per user for whom has_strong_verification holds
    sv_subquery = (
        select(User.id, literal(True).label("true"))
        .select_from(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.has_strong_verification(User))
        .distinct()
        .subquery(name="sv_subquery")
    )

    avatar_subquery = get_avatar_photo_subquery(name="avatar_photo")

    # Pre-compute GeoJSON Feature for each user (used by GIS.GetUsers)
    feature_geometry = cast(func.ST_AsGeoJSON(geom_column, 5), JSON)
    feature_properties = func.json_build_object(
        "id", User.id, "has_completed_profile", has_completed_profile_expression()
    )
    geojson_feature = cast(
        func.json_build_object(
            "type", "Feature", "geometry", feature_geometry, "properties", feature_properties
        ),
        Text,
    )

    # Be sure to modify the LiteUser type if you add/remove columns!
    columns = [
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom_column.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),
        Upload.filename.label("avatar_filename"),
        has_completed_profile_expression().label("has_completed_profile"),
        User.has_completed_my_home.label("has_completed_my_home"),
        # users with no row in the strong verification subquery come out as False
        func.coalesce(sv_subquery.c.true, False).label("has_strong_verification"),
        geojson_feature.label("geojson"),
    ]

    return (
        select(*columns)
        .select_from(User)
        .outerjoin(avatar_subquery, avatar_subquery.c.gallery_id == User.profile_gallery_id)
        .outerjoin(Upload, Upload.key == avatar_subquery.c.upload_key)
        .outerjoin(sv_subquery, sv_subquery.c.id == User.id)
    )

201 

202 

# Two renderings of the same query: one to derive the table definition, one to emit in the CREATE VIEW.
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

# Only used as a source of column objects for the index definitions below.
lite_users_subquery = lite_users_selectable_create.subquery()

lite_users = create_materialized_view_with_different_ddl(
    name="lite_users",
    select_selectable=lite_users_selectable_select,
    create_selectable=lite_users_selectable_create,
    metadata=Base.metadata,
    indexes=[
        # unique lookups by id and by username
        Index("uq_lite_users_id", lite_users_subquery.c.id, unique=True),
        Index("uq_lite_users_username", lite_users_subquery.c.username, unique=True),
        # partial hash indexes restricted to visible users
        Index(
            "ix_lite_users_id_visible",
            lite_users_subquery.c.id,
            postgresql_using="hash",
            postgresql_where=lite_users_subquery.c.is_visible,
        ),
        Index(
            "ix_lite_users_username_visible",
            lite_users_subquery.c.username,
            postgresql_using="hash",
            postgresql_where=lite_users_subquery.c.is_visible,
        ),
    ],
)

230 

231 

class LiteUser(MatViewBase):
    """ORM mapping over the `lite_users` materialized view."""

    __table__ = lite_users

    # A subset enough to make mypy happy. Taken from "make_lite_users_selectable".
    # Keep these in sync with the labels selected there.
    id: Mapped[int]
    username: Mapped[str]
    name: Mapped[str]
    city: Mapped[str]
    age: Mapped[int]
    geom: Mapped[Geom]
    # labelled from User.geom_radius
    radius: Mapped[float]
    is_visible: Mapped[bool]
    # NOTE(review): comes from an outer join on Upload, so this may be NULL for users without an avatar — confirm
    avatar_filename: Mapped[str]
    has_completed_profile: Mapped[bool]
    has_completed_my_home: Mapped[bool]
    # coalesced to False when the user has no row in the strong verification subquery
    has_strong_verification: Mapped[bool]
    # GeoJSON Feature for the user, stored as text
    geojson: Mapped[str]

249 

250 

def make_clustered_users_selectable(create: bool = False) -> CompoundSelect[Any]:
    """
    Build the UNION ALL query that backs the `clustered_users` materialized view.

    Visible users are grouped with DBSCAN. Each cluster contributes one row holding the centroid of its
    members and the member count; every user that is not part of any cluster contributes one row holding
    their own geometry and a count of 1.

    With `create=True` the geometry expressions are emitted as literal SQL text instead of mapped columns.
    This is presumably for the same reason as in `make_lite_users_selectable`, i.e. to stop the geometry
    from being wrapped in a conversion function when rendered into the CREATE VIEW — TODO confirm.
    """
    # emits something along the lines of
    # WITH clustered AS (
    #   SELECT id,
    #          geom,
    #          ST_ClusterDBSCAN(geom, 0.15, 5) OVER (ORDER BY id) AS cluster_id
    #   FROM users
    #   WHERE <User.is_visible>
    # )
    cluster_cte = (
        select(
            User.id,
            User.geom,
            # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5, cluster will be NULL for not in any cluster
            func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id).label("cluster_id"),
        )
        .where(User.is_visible)
        .cte("clustered")
    )

    if create:
        # the literal text below refers to the CTE by the name given to .cte() above
        centroid_geom: Any = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        cluster_geom: Any = literal_column("clustered.geom")
    else:
        centroid_geom = func.ST_Centroid(func.ST_Collect(cluster_cte.c.geom))
        cluster_geom = cluster_cte.c.geom

    # one row per cluster: centroid of the members and how many there are
    clustered_users = (
        select(centroid_geom.label("geom"), func.count().label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_not(None))
        .group_by(cluster_cte.c.cluster_id)
    )

    # one row per user outside any cluster: their own geometry with a count of 1
    isolated_users = (
        select(cluster_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_(None))
    )

    return union_all(clustered_users, isolated_users)

293 

294 

# Two renderings of the same query: one to derive the table definition, one to emit in the CREATE VIEW.
clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# No indexes are declared for this view.
clustered_users = create_materialized_view_with_different_ddl(
    name="clustered_users",
    select_selectable=clustered_users_selectable_select,
    create_selectable=clustered_users_selectable_create,
    metadata=Base.metadata,
)

301 

302 

class ClusteredUser(MatViewBase):
    """ORM mapping over the `clustered_users` materialized view."""

    __table__ = clustered_users

    # cluster centroid, or the user's own geometry for users outside any cluster
    geom: Mapped[Geom]
    # number of users in the cluster; 1 for users outside any cluster
    count: Mapped[int]

308 

309 

def float_(stmt: Any) -> Any:
    """Cast `stmt` to Float and substitute 0.0 when the result is NULL."""
    as_float = cast(stmt, Float)
    return func.coalesce(as_float, 0.0)

312 

313 

# this subquery gets the time that the request was sent
# (the time of each conversation's chat_created message)
t = select(Message.conversation_id, Message.time).where(Message.message_type == MessageType.chat_created).subquery()
# this subquery gets the time that the user responded to the request
# (the earliest message time per conversation and author)
s = (
    select(Message.conversation_id, Message.author_id, func.min(Message.time).label("time"))
    .group_by(Message.conversation_id, Message.author_id)
    .subquery()
)
# One row per request/probe: (user_id, response_time). response_time is NULL when there was no response.
all_responses = union_all(
    # host request responses
    select(
        HostRequest.recipient_user_id.label("user_id"),
        (s.c.time - t.c.time).label("response_time"),
    )
    .join(t, t.c.conversation_id == HostRequest.conversation_id)
    # outer join: a recipient who never wrote in the conversation has no row in s, leaving response_time NULL
    .outerjoin(
        s, and_(s.c.conversation_id == HostRequest.conversation_id, s.c.author_id == HostRequest.recipient_user_id)
    ),
    # activeness probes
    select(
        ActivenessProbe.user_id,
        (
            # expired probes have a responded time for when they were marked responded
            # so they are mapped to NULL here rather than being treated as a response
            case(
                (
                    ActivenessProbe.response != ActivenessProbeStatus.expired,
                    ActivenessProbe.responded - ActivenessProbe.probe_initiated,
                ),
                else_=None,
            )
        ).label("response_time"),
    ),
).subquery()

# Per-user aggregates over all_responses.
user_response_rates_selectable = select(
    all_responses.c.user_id.label("user_id"),
    # number of requests received
    func.count().label("requests"),
    # percentage of requests responded to
    # (count(response_time) skips NULLs, count() counts every row)
    (func.count(all_responses.c.response_time) / func.count()).label("response_rate"),
    func.avg(all_responses.c.response_time).label("avg_response_time"),
    # the 33rd percentile response time
    # (missing responses are replaced by 1000 days so they sort after every real response)
    percentile_disc(0.33)
    .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
    .label("response_time_33p"),
    # the 66th percentile response time
    percentile_disc(0.66)
    .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
    .label("response_time_66p"),
).group_by(all_responses.c.user_id)

# The unique index on user_id gives the view one row per user.
user_response_rates = create_materialized_view(
    "user_response_rates",
    user_response_rates_selectable,
    Base.metadata,
    [Index("uq_user_response_rates_id", user_response_rates_selectable.subquery().c.user_id, unique=True)],
)

371 

372 

class UserResponseRate(MatViewBase):
    """ORM mapping over the `user_response_rates` materialized view."""

    __table__ = user_response_rates

    # columns mirror the labels used in user_response_rates_selectable
    user_id: Mapped[int]
    # number of requests received
    requests: Mapped[int]
    # share of requests that have a non-NULL response time
    response_rate: Mapped[float]
    # NOTE(review): this is avg() over an interval difference, so the value is presumably a timedelta
    # (and NULL when the user never responded) rather than a float — confirm and adjust the annotation
    avg_response_time: Mapped[float]
    # 33rd / 66th percentile response times; missing responses count as 1000 days
    response_time_33p: Mapped[timedelta]
    response_time_66p: Mapped[timedelta]

382 

383 

def refresh_materialized_views(payload: empty_pb2.Empty) -> None:
    """Refresh every materialized view except `lite_users`; `payload` is not used."""
    logger.info("Refreshing materialized views")
    # (view name, refresh concurrently?) in the order they are refreshed
    # clustered_users is declared without any index and is the only one refreshed non-concurrently
    refresh_plan = (
        ("cluster_subscription_counts", True),
        ("cluster_admin_counts", True),
        ("clustered_users", False),
        ("user_response_rates", True),
    )
    with session_scope() as session:
        for view_name, concurrently in refresh_plan:
            refresh_materialized_view(session, view_name, concurrently=concurrently)

391 

392 

def refresh_materialized_views_rapid(payload: empty_pb2.Empty) -> None:
    """Concurrently refresh the `lite_users` materialized view; `payload` is not used."""
    logger.info("Refreshing materialized views (rapid)")
    with session_scope() as db_session:
        refresh_materialized_view(db_session, "lite_users", concurrently=True)