Coverage for app/backend/src/couchers/materialized_views.py: 99%

83 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import logging 

2import typing 

3from collections.abc import Sequence 

4from datetime import datetime, timedelta 

5from typing import Any 

6 

7from google.protobuf import empty_pb2 

8from sqlalchemy import CompoundSelect, Connection, Float, Index, Integer, MetaData, Select, Table, Text, event 

9from sqlalchemy.dialects.postgresql import JSON 

10from sqlalchemy.orm import Mapped 

11from sqlalchemy.sql import ( 

12 and_, 

13 case, 

14 cast, 

15 func, 

16 literal, 

17 literal_column, 

18 select, 

19 union_all, 

20) 

21from sqlalchemy.sql.functions import percentile_disc 

22from sqlalchemy_utils.view import ( 

23 CreateView, 

24 DropView, 

25 create_materialized_view, 

26 create_table_from_selectable, 

27 refresh_materialized_view, 

28) 

29 

30from couchers.db import session_scope 

31from couchers.helpers.completed_profile import has_completed_profile_expression 

32from couchers.models import ( 

33 ActivenessProbe, 

34 ActivenessProbeStatus, 

35 Base, 

36 ClusterRole, 

37 ClusterSubscription, 

38 Geom, 

39 HostRequest, 

40 MatViewBase, 

41 Message, 

42 MessageType, 

43 StrongVerificationAttempt, 

44 Upload, 

45 User, 

46) 

47from couchers.models.uploads import get_avatar_photo_subquery 

48 

49logger = logging.getLogger(__name__) 

50 

51 

def create_materialized_view_with_different_ddl(
    name: str,
    select_selectable: Select[Any] | CompoundSelect[Any],
    create_selectable: Select[Any] | CompoundSelect[Any],
    metadata: MetaData,
    indexes: Sequence[Index] | None = None,
    aliases: dict[str, str] | None = None,
) -> Table:
    """
    Register a materialized view whose table definition and CREATE statement use two different selectables.

    Copied wholesale from sqlalchemy_utils (3-clause BSD), with a minor tweak in {select,create}_selectable

    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    The returned table is derived from ``select_selectable``, while ``create_selectable`` is the statement handed to
    ``CreateView`` when ``metadata`` is created. ``indexes`` and ``aliases`` are forwarded unchanged to
    ``create_table_from_selectable``.
    """
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        metadata=None,
        aliases=aliases,
    )

    def _create_view_indexes(target: Any, connection: Connection, **kw: Any) -> None:
        # Emit every index that was attached to the view's table object.
        for view_index in view_table.indexes:
            view_index.create(connection)

    # Registration order is kept from the original: the view itself first, then its indexes.
    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", _create_view_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))

    return typing.cast(Table, view_table)

78 

79 

# Per-cluster count of subscriptions held by visible users.
cluster_subscription_counts_selectable = (
    select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

# The unique index on cluster_id is what allows this view to be refreshed with concurrently=True.
cluster_subscription_counts = create_materialized_view(
    name="cluster_subscription_counts",
    selectable=cluster_subscription_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_cluster_subscription_counts_cluster_id",
            cluster_subscription_counts_selectable.subquery().c.cluster_id,
            unique=True,
        ),
    ],
)

103 

104 

class ClusterSubscriptionCount(MatViewBase):
    """ORM mapping over the ``cluster_subscription_counts`` materialized view."""

    __table__ = cluster_subscription_counts

    # Typed declarations of the view's two columns.
    cluster_id: Mapped[int]
    count: Mapped[int]

110 

111 

# Per-cluster count of admin-role subscriptions held by visible users.
cluster_admin_counts_selectable = (
    select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(
        ClusterSubscription.role == ClusterRole.admin,
        User.is_visible,
    )
    .group_by(ClusterSubscription.cluster_id)
)

# The unique index on cluster_id is what allows this view to be refreshed with concurrently=True.
cluster_admin_counts = create_materialized_view(
    name="cluster_admin_counts",
    selectable=cluster_admin_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_cluster_admin_counts_cluster_id",
            cluster_admin_counts_selectable.subquery().c.cluster_id,
            unique=True,
        ),
    ],
)

136 

137 

class ClusterAdminCount(MatViewBase):
    """ORM mapping over the ``cluster_admin_counts`` materialized view."""

    __table__ = cluster_admin_counts

    # Typed declarations of the view's two columns.
    cluster_id: Mapped[int]
    count: Mapped[int]

143 

144 

def make_lite_users_selectable(create: bool = False) -> Select[Any]:
    """
    Build the SELECT that backs the ``lite_users`` materialized view.

    Pass ``create=True`` for the variant used to emit the CREATE statement; it refers to the geometry column as the
    literal ``users.geom``. With ``create=False`` the mapped ``User.geom`` attribute is used instead.
    """
    # because this is rendered as a select when emitting the CREATE VIEW, using User.geom would be rendered as
    # `ST_AsEWKB(users.geom)` instead of the literal column; the literal column avoids that in the DDL variant
    geom_column: Any = literal_column("users.geom") if create else User.geom

    strong_verification_subquery = (
        select(User.id, literal(True).label("true"))
        .select_from(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.has_strong_verification(User))
        .distinct()
        .subquery(name="sv_subquery")
    )

    avatar_photo_subquery = get_avatar_photo_subquery(name="avatar_photo")

    # Pre-compute GeoJSON Feature for each user (used by GIS.GetUsers)
    feature_geometry = cast(func.ST_AsGeoJSON(geom_column, 5), JSON)
    feature_properties = func.json_build_object(
        "id",
        User.id,
        "has_completed_profile",
        has_completed_profile_expression(),
    )
    geojson_feature = cast(
        func.json_build_object(
            "type",
            "Feature",
            "geometry",
            feature_geometry,
            "properties",
            feature_properties,
        ),
        Text,
    )

    # Be sure to modify the LiteUser type if you add/remove columns!
    view_columns = [
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom_column.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),
        User.shadowed_at.label("shadowed_at"),
        Upload.filename.label("avatar_filename"),
        has_completed_profile_expression().label("has_completed_profile"),
        User.has_completed_my_home.label("has_completed_my_home"),
        # users with no row in the verification subquery come out as False
        func.coalesce(strong_verification_subquery.c.true, False).label("has_strong_verification"),
        geojson_feature.label("geojson"),
    ]

    # All joins are outer joins, so users without an avatar or verification are still included.
    return (
        select(*view_columns)
        .select_from(User)
        .outerjoin(
            avatar_photo_subquery,
            avatar_photo_subquery.c.gallery_id == User.profile_gallery_id,
        )
        .outerjoin(Upload, Upload.key == avatar_photo_subquery.c.upload_key)
        .outerjoin(strong_verification_subquery, strong_verification_subquery.c.id == User.id)
    )

202 

203 

# Two renderings of the same query: one to derive the table's columns, one to emit the CREATE statement.
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

# Only used to obtain column objects for the index definitions below.
lite_users_subquery = lite_users_selectable_create.subquery()

lite_users = create_materialized_view_with_different_ddl(
    name="lite_users",
    select_selectable=lite_users_selectable_select,
    create_selectable=lite_users_selectable_create,
    metadata=Base.metadata,
    indexes=[
        # unique indexes on id and username
        Index("uq_lite_users_id", lite_users_subquery.c.id, unique=True),
        Index("uq_lite_users_username", lite_users_subquery.c.username, unique=True),
        # partial hash indexes restricted to visible users
        Index(
            "ix_lite_users_id_visible",
            lite_users_subquery.c.id,
            postgresql_using="hash",
            postgresql_where=lite_users_subquery.c.is_visible,
        ),
        Index(
            "ix_lite_users_username_visible",
            lite_users_subquery.c.username,
            postgresql_using="hash",
            postgresql_where=lite_users_subquery.c.is_visible,
        ),
    ],
)

231 

232 

class LiteUser(MatViewBase):
    """ORM mapping over the ``lite_users`` materialized view."""

    __table__ = lite_users

    # A subset enough to make mypy happy. Taken from "make_lite_users_selectable".
    id: Mapped[int]
    username: Mapped[str]
    name: Mapped[str]
    city: Mapped[str]
    age: Mapped[int]
    geom: Mapped[Geom]
    radius: Mapped[float]
    is_visible: Mapped[bool]
    shadowed_at: Mapped[datetime | None]
    # Upload is OUTER JOINed in make_lite_users_selectable, so this is NULL for users without an avatar photo.
    avatar_filename: Mapped[str | None]
    has_completed_profile: Mapped[bool]
    has_completed_my_home: Mapped[bool]
    # coalesced to False in the view, so never NULL
    has_strong_verification: Mapped[bool]
    # GeoJSON Feature serialized as text
    geojson: Mapped[str]

251 

252 

def make_clustered_users_selectable(create: bool = False) -> CompoundSelect[Any]:
    """
    Build the UNION ALL query that backs the ``clustered_users`` materialized view.

    Visible users are grouped with DBSCAN. Each cluster becomes one row holding the centroid of its members and the
    member count; every user outside any cluster becomes its own row with a count of 1.

    Pass ``create=True`` for the variant used to emit the CREATE statement; it refers to the geometry through literal
    column text against the ``clustered`` CTE instead of the CTE's column objects.
    """
    # emits something along the lines of
    # WITH clustered AS (
    #     SELECT id,
    #            geom,
    #            ST_ClusterDBSCAN(geom, 0.15, 5) OVER (ORDER BY id) AS cluster_id
    #     FROM users
    #     WHERE <user is visible>
    # )

    cluster_cte = (
        select(
            User.id,
            User.geom,
            # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5, cluster will be NULL for not in any cluster
            func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id).label("cluster_id"),
        )
        .where(User.is_visible)
        .cte("clustered")
    )

    if create:
        # Literal text that must keep matching the CTE name ("clustered") given above.
        centroid_geom: Any = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        cluster_geom: Any = literal_column("clustered.geom")
    else:
        centroid_geom = func.ST_Centroid(func.ST_Collect(cluster_cte.c.geom))
        cluster_geom = cluster_cte.c.geom

    # One row per cluster: centroid of the members plus how many members there are.
    clustered_users = (
        select(centroid_geom.label("geom"), func.count().label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_not(None))
        .group_by(cluster_cte.c.cluster_id)
    )

    # One row per user that DBSCAN left unclustered, each counted as 1.
    isolated_users = (
        select(cluster_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_(None))
    )

    return union_all(clustered_users, isolated_users)

295 

296 

# Two renderings of the same query: one to derive the table's columns, one to emit the CREATE statement.
clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# No indexes are declared for this view.
clustered_users = create_materialized_view_with_different_ddl(
    name="clustered_users",
    select_selectable=clustered_users_selectable_select,
    create_selectable=clustered_users_selectable_create,
    metadata=Base.metadata,
)

303 

304 

class ClusteredUser(MatViewBase):
    """ORM mapping over the ``clustered_users`` materialized view."""

    __table__ = clustered_users

    # Cluster centroid, or the user's own geometry for unclustered users.
    geom: Mapped[Geom]
    # Number of users in the cluster; 1 for unclustered users.
    count: Mapped[int]

310 

311 

def float_(stmt: Any) -> Any:
    """Return ``stmt`` cast to ``Float``, wrapped in a COALESCE that falls back to 0.0."""
    as_float = cast(stmt, Float)
    return func.coalesce(as_float, 0.0)

314 

315 

# this subquery gets the time that the request was sent
t = (
    select(Message.conversation_id, Message.time)
    .where(Message.message_type == MessageType.chat_created)
    .subquery()
)
# this subquery gets the time that the user responded to the request
# (the earliest message time per conversation and author)
s = (
    select(
        Message.conversation_id,
        Message.author_id,
        func.min(Message.time).label("time"),
    )
    .group_by(Message.conversation_id, Message.author_id)
    .subquery()
)

# host request responses: the outer join keeps requests the recipient never wrote in, with a NULL response_time
_host_request_responses = (
    select(
        HostRequest.recipient_user_id.label("user_id"),
        (s.c.time - t.c.time).label("response_time"),
    )
    .join(t, t.c.conversation_id == HostRequest.conversation_id)
    .outerjoin(
        s,
        and_(
            s.c.conversation_id == HostRequest.conversation_id,
            s.c.author_id == HostRequest.recipient_user_id,
        ),
    )
)

# activeness probes
_activeness_probe_responses = select(
    ActivenessProbe.user_id,
    # expired probes have a responded time for when they were marked responded, so they are mapped to NULL here
    case(
        (
            ActivenessProbe.response != ActivenessProbeStatus.expired,
            ActivenessProbe.responded - ActivenessProbe.probe_initiated,
        ),
        else_=None,
    ).label("response_time"),
)

all_responses = union_all(_host_request_responses, _activeness_probe_responses).subquery()

# stand-in response time used in the percentile columns for rows with no response
_NO_RESPONSE_PLACEHOLDER = timedelta(days=1000)

user_response_rates_selectable = (
    select(
        all_responses.c.user_id.label("user_id"),
        # number of requests received
        func.count().label("requests"),
        # fraction of requests responded to (count over the column skips NULL response times)
        (func.count(all_responses.c.response_time) / func.count()).label("response_rate"),
        func.avg(all_responses.c.response_time).label("avg_response_time"),
        # the 33rd percentile response time
        percentile_disc(0.33)
        .within_group(func.coalesce(all_responses.c.response_time, _NO_RESPONSE_PLACEHOLDER))
        .label("response_time_33p"),
        # the 66th percentile response time
        percentile_disc(0.66)
        .within_group(func.coalesce(all_responses.c.response_time, _NO_RESPONSE_PLACEHOLDER))
        .label("response_time_66p"),
    )
    .group_by(all_responses.c.user_id)
)

# The unique index on user_id is what allows this view to be refreshed with concurrently=True.
user_response_rates = create_materialized_view(
    name="user_response_rates",
    selectable=user_response_rates_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_user_response_rates_id",
            user_response_rates_selectable.subquery().c.user_id,
            unique=True,
        ),
    ],
)

373 

374 

class UserResponseRate(MatViewBase):
    """ORM mapping over the ``user_response_rates`` materialized view."""

    __table__ = user_response_rates

    user_id: Mapped[int]
    # number of requests received
    requests: Mapped[int]
    # responded requests divided by all requests
    response_rate: Mapped[float]
    # avg() over interval values yields an interval, and NULL when the user has no response at all
    avg_response_time: Mapped[timedelta | None]
    # Percentiles are computed over COALESCE(response_time, 1000 days), so they are never NULL.
    response_time_33p: Mapped[timedelta]
    response_time_66p: Mapped[timedelta]

384 

385 

def refresh_materialized_views(payload: empty_pb2.Empty) -> None:
    """
    Refresh every materialized view except ``lite_users`` inside a single session scope.

    ``payload`` is not read; presumably it exists to match a job-handler signature (TODO confirm).
    """
    logger.info("Refreshing materialized views")
    with session_scope() as session:
        for view_name in ("cluster_subscription_counts", "cluster_admin_counts"):
            refresh_materialized_view(session, view_name, concurrently=True)
        # clustered_users is declared without any index, so it is refreshed without the concurrently flag
        refresh_materialized_view(session, "clustered_users")
        refresh_materialized_view(session, "user_response_rates", concurrently=True)

393 

394 

def refresh_materialized_views_rapid(payload: empty_pb2.Empty) -> None:
    """
    Refresh only the ``lite_users`` materialized view, concurrently.

    ``payload`` is not read; presumably it exists to match a job-handler signature (TODO confirm).
    """
    logger.info("Refreshing materialized views (rapid)")
    with session_scope() as session:
        refresh_materialized_view(
            session,
            "lite_users",
            concurrently=True,
        )