Coverage for app / backend / src / couchers / materialized_views.py: 99%

82 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import logging 

2import typing 

3from collections.abc import Sequence 

4from datetime import timedelta 

5from typing import Any 

6 

7from google.protobuf import empty_pb2 

8from sqlalchemy import CompoundSelect, Connection, Float, Index, Integer, MetaData, Select, Table, event 

9from sqlalchemy.orm import Mapped 

10from sqlalchemy.sql import ( 

11 and_, 

12 case, 

13 cast, 

14 func, 

15 literal, 

16 literal_column, 

17 union_all, 

18) 

19from sqlalchemy.sql import select as sa_select 

20from sqlalchemy.sql.functions import percentile_disc 

21from sqlalchemy_utils.view import ( 

22 CreateView, 

23 DropView, 

24 create_materialized_view, 

25 create_table_from_selectable, 

26 refresh_materialized_view, 

27) 

28 

29from couchers.db import session_scope 

30from couchers.helpers.completed_profile import has_completed_profile_expression 

31from couchers.models import ( 

32 ActivenessProbe, 

33 ActivenessProbeStatus, 

34 Base, 

35 ClusterRole, 

36 ClusterSubscription, 

37 Geom, 

38 HostRequest, 

39 MatViewBase, 

40 Message, 

41 MessageType, 

42 StrongVerificationAttempt, 

43 Upload, 

44 User, 

45) 

46from couchers.models.uploads import get_avatar_photo_subquery 

47 

# Module-level logger; the refresh_materialized_views* functions below log through it.
logger = logging.getLogger(__name__)

49 

50 

def create_materialized_view_with_different_ddl(
    name: str,
    select_selectable: Select[Any] | CompoundSelect[Any],
    create_selectable: Select[Any] | CompoundSelect[Any],
    metadata: MetaData,
    indexes: Sequence[Index] | None = None,
    aliases: dict[str, str] | None = None,
) -> Table:
    """
    Define a materialized view whose Python-side table and emitted DDL come from two different selectables.

    Adapted from ``sqlalchemy_utils.view.create_materialized_view`` (3-clause BSD):
    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    Upstream uses a single selectable for everything. Here ``select_selectable`` determines the columns of the
    returned ``Table``, while ``create_selectable`` is what gets compiled into the CREATE MATERIALIZED VIEW.

    :param name: name of the materialized view
    :param select_selectable: selectable the returned ``Table`` is built from
    :param create_selectable: selectable rendered into the view's DDL
    :param metadata: metadata whose ``after_create`` / ``before_drop`` events create and drop the view
    :param indexes: indexes created on the view right after the view itself
    :param aliases: forwarded unchanged to ``create_table_from_selectable``
    :return: the ``Table`` describing the view (built with ``metadata=None``, i.e. not attached to ``metadata``)
    """
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        metadata=None,
        aliases=aliases,
    )

    def _create_view_indexes(target: Any, connection: Connection, **kw: Any) -> None:
        for view_index in view_table.indexes:
            view_index.create(connection)

    # Registration order matters: the CREATE MATERIALIZED VIEW listener has to run before the index listener.
    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", _create_view_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))

    return typing.cast(Table, view_table)

77 

78 

# Number of subscriptions per cluster, counting only rows where the joined user satisfies User.is_visible.
# Clusters with no such subscriber produce no row (GROUP BY only yields groups that exist).
cluster_subscription_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

# Materialized view over the query above. The unique index on cluster_id is what allows
# refresh_materialized_views to refresh it with concurrently=True (PostgreSQL requires a unique index for that).
cluster_subscription_counts = create_materialized_view(
    "cluster_subscription_counts",
    cluster_subscription_counts_selectable,
    Base.metadata,
    [
        Index(
            "uq_cluster_subscription_counts_cluster_id",
            cluster_subscription_counts_selectable.subquery().c.cluster_id,
            unique=True,
        )
    ],
)

102 

103 

class ClusterSubscriptionCount(MatViewBase):
    """ORM mapping onto the ``cluster_subscription_counts`` materialized view (one row per cluster_id)."""

    __table__ = cluster_subscription_counts

    cluster_id: Mapped[int]
    # number of subscriptions in the cluster whose user is visible
    count: Mapped[int]

109 

110 

# Number of ClusterRole.admin subscriptions per cluster, counting only rows where the joined user satisfies
# User.is_visible. Clusters without any such admin produce no row.
cluster_admin_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(ClusterSubscription.role == ClusterRole.admin)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

# Materialized view over the query above; the unique index on cluster_id allows concurrent refreshes.
cluster_admin_counts = create_materialized_view(
    "cluster_admin_counts",
    cluster_admin_counts_selectable,
    Base.metadata,
    [
        Index(
            "uq_cluster_admin_counts_cluster_id",
            cluster_admin_counts_selectable.subquery().c.cluster_id,
            unique=True,
        )
    ],
)

135 

136 

class ClusterAdminCount(MatViewBase):
    """ORM mapping onto the ``cluster_admin_counts`` materialized view (one row per cluster_id)."""

    __table__ = cluster_admin_counts

    cluster_id: Mapped[int]
    # number of visible users holding ClusterRole.admin in the cluster
    count: Mapped[int]

142 

143 

def make_lite_users_selectable(create: bool = False) -> Select[Any]:
    """
    Build the query behind the ``lite_users`` materialized view: one row per user with a compact set of fields.

    :param create: ``True`` for the variant compiled into the CREATE MATERIALIZED VIEW statement, ``False`` for the
        variant the Python-side ``Table`` is derived from; the two differ only in how ``geom`` is referenced
    :return: the SELECT (keep the ``LiteUser`` annotations in sync with its columns)
    """
    # Rendered inside the CREATE VIEW, User.geom would come out as `ST_AsEWKB(users.geom)` instead of the plain
    # column, so the DDL variant refers to the column by its literal name.
    geom_column: Any = literal_column("users.geom") if create else User.geom

    # One row per user id for which StrongVerificationAttempt.has_strong_verification(User) holds, tagged with a
    # constant `true` column; after the outer join below that column is NULL for every other user.
    sv_subquery = (
        sa_select(User.id, literal(True).label("true"))
        .select_from(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.has_strong_verification(User))
        .distinct()
        .subquery(name="sv_subquery")
    )

    avatar_photo = get_avatar_photo_subquery(name="avatar_photo")

    # Be sure to modify the LiteUser type if you add/remove columns!
    lite_user_columns = (
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom_column.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),
        Upload.filename.label("avatar_filename"),
        has_completed_profile_expression().label("has_completed_profile"),
        User.has_completed_my_home.label("has_completed_my_home"),
        func.coalesce(sv_subquery.c.true, False).label("has_strong_verification"),
    )

    query = sa_select(*lite_user_columns).select_from(User)
    # outer joins keep users that have no avatar photo / upload / strong verification
    query = query.outerjoin(avatar_photo, avatar_photo.c.gallery_id == User.profile_gallery_id)
    query = query.outerjoin(Upload, Upload.key == avatar_photo.c.upload_key)
    query = query.outerjoin(sv_subquery, sv_subquery.c.id == User.id)
    return query

186 

187 

# Table-definition (create=False) and DDL (create=True) variants of the lite users query.
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

# Column handles used below to declare the view's indexes.
lite_users_subquery = lite_users_selectable_create.subquery()

lite_users = create_materialized_view_with_different_ddl(
    "lite_users",
    lite_users_selectable_select,
    lite_users_selectable_create,
    Base.metadata,
    [
        # unique indexes; a unique index is also what allows refresh_materialized_views_rapid to refresh concurrently
        Index("uq_lite_users_id", lite_users_subquery.c.id, unique=True),
        Index("uq_lite_users_username", lite_users_subquery.c.username, unique=True),
        # partial hash indexes (equality lookups only), restricted to rows where is_visible is true
        Index(
            "ix_lite_users_id_visible",
            lite_users_subquery.c.id,
            postgresql_using="hash",
            postgresql_where=lite_users_subquery.c.is_visible,
        ),
        Index(
            "ix_lite_users_username_visible",
            lite_users_subquery.c.username,
            postgresql_using="hash",
            postgresql_where=lite_users_subquery.c.is_visible,
        ),
    ],
)

215 

216 

class LiteUser(MatViewBase):
    """ORM mapping onto the ``lite_users`` materialized view: one row per user (unique on id and on username)."""

    __table__ = lite_users

    # One annotation per column produced by "make_lite_users_selectable"; keep the two in sync.
    id: Mapped[int]
    username: Mapped[str]
    name: Mapped[str]
    city: Mapped[str]
    age: Mapped[int]
    geom: Mapped[Geom]
    # User.geom_radius
    radius: Mapped[float]
    is_visible: Mapped[bool]
    # Upload.filename reached through two outer joins, so presumably NULL for users without an avatar photo.
    # NOTE(review): annotated as str; consider str | None once callers handle the missing-avatar case.
    avatar_filename: Mapped[str]
    has_completed_profile: Mapped[bool]
    has_completed_my_home: Mapped[bool]
    # coalesced to False, so never NULL
    has_strong_verification: Mapped[bool]

233 

234 

def make_clustered_users_selectable(create: bool = False) -> CompoundSelect[Any]:
    """
    Build the query behind the ``clustered_users`` materialized view.

    Visible users are clustered with ``ST_ClusterDBSCAN``. Each cluster collapses into a single row holding the
    centroid of its members and their count. Users that DBSCAN leaves outside every cluster each get their own
    row with their own geometry and a count of 1.

    :param create: ``True`` for the variant compiled into the CREATE MATERIALIZED VIEW statement, ``False`` for the
        variant the Python-side ``Table`` is derived from; they differ only in how geometries are referenced
    :return: ``UNION ALL`` of the clustered and the isolated rows, with columns ``geom`` and ``count``
    """
    # emits something along the lines of
    # WITH clustered AS (
    #     SELECT users.id, users.geom,
    #            ST_ClusterDBSCAN(users.geom, 0.15, 5) OVER (ORDER BY users.id) AS cluster_id
    #     FROM users
    #     WHERE <User.is_visible>
    # )
    # SELECT ST_Centroid(ST_Collect(clustered.geom)) AS geom, count(*) AS count
    # FROM clustered WHERE clustered.cluster_id IS NOT NULL GROUP BY clustered.cluster_id
    # UNION ALL
    # SELECT clustered.geom AS geom, 1 AS count
    # FROM clustered WHERE clustered.cluster_id IS NULL
    #
    # NOTE(review): the CTE filters on User.is_visible only. If a visible user can have a NULL geom, it would
    # presumably get a NULL cluster_id and appear as an isolated row with a NULL geom. Confirm that User.geom
    # is non-nullable.

    cluster_cte = (
        sa_select(
            User.id,
            User.geom,
            # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5, cluster will be NULL for not in any cluster
            func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id).label("cluster_id"),
        )
        .where(User.is_visible)
        .cte("clustered")
    )

    if create:
        # The DDL variant references geometries literally. Presumably this avoids the geometry type wrapping
        # select-list expressions (e.g. in ST_AsEWKB) when the CREATE VIEW is rendered.
        centroid_geom: Any = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        cluster_geom: Any = literal_column("clustered.geom")
    else:
        centroid_geom = func.ST_Centroid(func.ST_Collect(cluster_cte.c.geom))
        cluster_geom = cluster_cte.c.geom

    # one row per DBSCAN cluster: centroid of its members and how many members it has
    clustered_users = (
        sa_select(centroid_geom.label("geom"), func.count().label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_not(None))
        .group_by(cluster_cte.c.cluster_id)
    )

    # one row per user outside every cluster, each representing a single user
    isolated_users = (
        sa_select(cluster_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_(None))
    )

    return union_all(clustered_users, isolated_users)

277 

278 

# Table-definition (create=False) and DDL (create=True) variants of the clustered users query.
clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# No indexes are declared for this view. PostgreSQL only allows REFRESH ... CONCURRENTLY with a unique index,
# which is why refresh_materialized_views refreshes this view without concurrently=True.
clustered_users = create_materialized_view_with_different_ddl(
    "clustered_users", clustered_users_selectable_select, clustered_users_selectable_create, Base.metadata
)

285 

286 

class ClusteredUser(MatViewBase):
    """ORM mapping onto ``clustered_users``: one row per DBSCAN cluster or per user outside any cluster."""

    __table__ = clustered_users

    # cluster centroid, or the user's own geometry for users outside any cluster
    geom: Mapped[Geom]
    # number of users represented by the row (1 for users outside any cluster)
    count: Mapped[int]

292 

293 

def float_(stmt: Any) -> Any:
    """Return a SQL expression that casts ``stmt`` to FLOAT and replaces NULL with ``0.0``."""
    as_float = cast(stmt, Float)
    return func.coalesce(as_float, 0.0)

296 

297 

# this subquery gets the time that the request was sent: (conversation_id, time) of each chat_created message
t = sa_select(Message.conversation_id, Message.time).where(Message.message_type == MessageType.chat_created).subquery()
# this subquery gets the time that the user responded to the request: the earliest message time per
# (conversation, author) pair, i.e. when each participant first posted in the conversation
s = (
    sa_select(Message.conversation_id, Message.author_id, func.min(Message.time).label("time"))
    .group_by(Message.conversation_id, Message.author_id)
    .subquery()
)
# One row per "request" a user received, with response_time NULL when it went unanswered.
all_responses = union_all(
    # host request responses: time from the request being sent to the host's first message; the outer join leaves
    # response_time NULL when the host never posted in the conversation
    sa_select(
        HostRequest.host_user_id.label("user_id"),
        (s.c.time - t.c.time).label("response_time"),
    )
    .join(t, t.c.conversation_id == HostRequest.conversation_id)
    .outerjoin(s, and_(s.c.conversation_id == HostRequest.conversation_id, s.c.author_id == HostRequest.host_user_id)),
    # activeness probes
    sa_select(
        ActivenessProbe.user_id,
        (
            # expired probes have a responded time for when they were marked responded, so their response_time is
            # forced to NULL (counted as unanswered) instead of measuring the expiry time
            case(
                (
                    ActivenessProbe.response != ActivenessProbeStatus.expired,
                    ActivenessProbe.responded - ActivenessProbe.probe_initiated,
                ),
                else_=None,
            )
        ).label("response_time"),
    ),
).subquery()

329 

# Per-user responsiveness statistics over all_responses (host requests and activeness probes together).
user_response_rates_selectable = sa_select(
    all_responses.c.user_id.label("user_id"),
    # number of requests received
    func.count().label("requests"),
    # fraction (0..1, not a percentage) of requests responded to; count(col) skips the NULL response_times of
    # unanswered requests.
    # NOTE(review): both operands are integer counts. This presumably relies on SQLAlchemy 2.x rendering `/` as
    # true division on PostgreSQL rather than integer division; confirm against the generated view DDL.
    (func.count(all_responses.c.response_time) / func.count()).label("response_rate"),
    # mean over answered requests only (avg ignores NULLs); NULL when the user answered none
    func.avg(all_responses.c.response_time).label("avg_response_time"),
    # the 33rd percentile response time; unanswered requests count as taking 1000 days
    percentile_disc(0.33)
    .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
    .label("response_time_33p"),
    # the 66th percentile response time (same treatment of unanswered requests)
    percentile_disc(0.66)
    .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
    .label("response_time_66p"),
).group_by(all_responses.c.user_id)

# One row per user_id; the unique index allows refreshing the view with concurrently=True.
user_response_rates = create_materialized_view(
    "user_response_rates",
    user_response_rates_selectable,
    Base.metadata,
    [Index("uq_user_response_rates_id", user_response_rates_selectable.subquery().c.user_id, unique=True)],
)

353 

354 

class UserResponseRate(MatViewBase):
    """ORM mapping onto the ``user_response_rates`` materialized view (one row per user_id)."""

    __table__ = user_response_rates

    user_id: Mapped[int]
    # number of host requests plus activeness probes received
    requests: Mapped[int]
    # fraction (0..1) of those that got a response.
    # NOTE(review): the division presumably yields PostgreSQL NUMERIC, which may load as Decimal; confirm.
    response_rate: Mapped[float]
    # avg() over the interval-typed response_time yields an interval, loaded as a timedelta rather than a float.
    # It is NULL when the user answered none of their requests (avg ignores NULLs).
    avg_response_time: Mapped[timedelta | None]
    # percentile response times; unanswered requests are coalesced to 1000 days, so these are never NULL
    response_time_33p: Mapped[timedelta]
    response_time_66p: Mapped[timedelta]

364 

365 

def refresh_materialized_views(payload: empty_pb2.Empty) -> None:
    """
    Refresh every materialized view except ``lite_users`` (handled by ``refresh_materialized_views_rapid``).

    :param payload: unused; presumably present to match the expected job-handler signature
    """
    logger.info("Refreshing materialized views")
    # (view name, refresh concurrently?). clustered_users has no unique index, which PostgreSQL requires for
    # REFRESH ... CONCURRENTLY, so it gets a plain (blocking) refresh.
    views_to_refresh = (
        ("cluster_subscription_counts", True),
        ("cluster_admin_counts", True),
        ("clustered_users", False),
        ("user_response_rates", True),
    )
    with session_scope() as session:
        for view_name, concurrently in views_to_refresh:
            refresh_materialized_view(session, view_name, concurrently=concurrently)

373 

374 

def refresh_materialized_views_rapid(payload: empty_pb2.Empty) -> None:
    """
    Refresh the ``lite_users`` materialized view concurrently (readers are not blocked during the refresh).

    :param payload: unused; presumably present to match the expected job-handler signature
    """
    logger.info("Refreshing materialized views (rapid)")
    view_name = "lite_users"
    with session_scope() as session:
        refresh_materialized_view(session, view_name, concurrently=True)