Coverage for app/backend/src/couchers/materialized_views.py: 99%
83 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1import logging
2import typing
3from collections.abc import Sequence
4from datetime import datetime, timedelta
5from typing import Any
7from google.protobuf import empty_pb2
8from sqlalchemy import CompoundSelect, Connection, Float, Index, Integer, MetaData, Select, Table, Text, event
9from sqlalchemy.dialects.postgresql import JSON
10from sqlalchemy.orm import Mapped
11from sqlalchemy.sql import (
12 and_,
13 case,
14 cast,
15 func,
16 literal,
17 literal_column,
18 select,
19 union_all,
20)
21from sqlalchemy.sql.functions import percentile_disc
22from sqlalchemy_utils.view import (
23 CreateView,
24 DropView,
25 create_materialized_view,
26 create_table_from_selectable,
27 refresh_materialized_view,
28)
30from couchers.db import session_scope
31from couchers.helpers.completed_profile import has_completed_profile_expression
32from couchers.models import (
33 ActivenessProbe,
34 ActivenessProbeStatus,
35 Base,
36 ClusterRole,
37 ClusterSubscription,
38 Geom,
39 HostRequest,
40 MatViewBase,
41 Message,
42 MessageType,
43 StrongVerificationAttempt,
44 Upload,
45 User,
46)
47from couchers.models.uploads import get_avatar_photo_subquery
49logger = logging.getLogger(__name__)
def create_materialized_view_with_different_ddl(
    name: str,
    select_selectable: Select[Any] | CompoundSelect[Any],
    create_selectable: Select[Any] | CompoundSelect[Any],
    metadata: MetaData,
    indexes: Sequence[Index] | None = None,
    aliases: dict[str, str] | None = None,
) -> Table:
    """
    Register a materialized view whose Table definition and CREATE DDL come from two different selectables.

    Adapted from sqlalchemy_utils (3-clause BSD); the only change is the split into
    {select,create}_selectable:

    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    Args:
        name: name of the materialized view.
        select_selectable: statement the returned Table (and its columns) is derived from.
        create_selectable: statement rendered into the CREATE MATERIALIZED VIEW DDL.
        metadata: MetaData whose "after_create"/"before_drop" events create and drop the view.
        indexes: optional indexes, created right after the view itself.
        aliases: optional column aliases, passed through to create_table_from_selectable.

    Returns:
        The Table object describing the view.
    """
    # The table is built with metadata=None, exactly as in the upstream helper.
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        metadata=None,
        aliases=aliases,
    )

    def _create_view_indexes(target: Any, connection: Connection, **kw: Any) -> None:
        for index in view_table.indexes:
            index.create(connection)

    # Registration order matters: the view has to exist before its indexes are created.
    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", _create_view_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))

    return typing.cast(Table, view_table)
# Number of subscriptions per cluster, counting only rows whose joined user satisfies User.is_visible.
# NOTE(review): the WHERE clause on the outer-joined User presumably drops subscriptions without a
# matching user, so this likely behaves like an inner join -- confirm that this is intended.
cluster_subscription_counts_selectable = (
    select(ClusterSubscription.cluster_id.label("cluster_id"), func.count().label("count"))
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

cluster_subscription_counts = create_materialized_view(
    name="cluster_subscription_counts",
    selectable=cluster_subscription_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        # one row per cluster; this view is refreshed with concurrently=True further down in this module
        Index(
            "uq_cluster_subscription_counts_cluster_id",
            cluster_subscription_counts_selectable.subquery().c.cluster_id,
            unique=True,
        ),
    ],
)
class ClusterSubscriptionCount(MatViewBase):
    """ORM mapping over the ``cluster_subscription_counts`` materialized view."""

    __table__ = cluster_subscription_counts

    # the grouping key of the view
    cluster_id: Mapped[int]
    # number of counted subscriptions for that cluster
    count: Mapped[int]
# Number of admin subscriptions per cluster, restricted to rows whose joined user satisfies User.is_visible.
cluster_admin_counts_selectable = (
    select(ClusterSubscription.cluster_id.label("cluster_id"), func.count().label("count"))
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    # both criteria are ANDed together
    .where(ClusterSubscription.role == ClusterRole.admin, User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

cluster_admin_counts = create_materialized_view(
    name="cluster_admin_counts",
    selectable=cluster_admin_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        # one row per cluster; this view is refreshed with concurrently=True further down in this module
        Index(
            "uq_cluster_admin_counts_cluster_id",
            cluster_admin_counts_selectable.subquery().c.cluster_id,
            unique=True,
        ),
    ],
)
class ClusterAdminCount(MatViewBase):
    """ORM mapping over the ``cluster_admin_counts`` materialized view."""

    __table__ = cluster_admin_counts

    # the grouping key of the view
    cluster_id: Mapped[int]
    # number of counted admin subscriptions for that cluster
    count: Mapped[int]
def make_lite_users_selectable(create: bool = False) -> Select[Any]:
    """
    Build the SELECT statement behind the ``lite_users`` materialized view.

    Args:
        create: when True, the geometry column is referenced as the literal SQL ``users.geom`` (the form
            used for the view's DDL); when False the mapped ``User.geom`` attribute is used.

    Returns:
        A select over User, outer-joined to the avatar photo, its Upload and the strong verification
        subquery. Keep the LiteUser type in sync if columns are added or removed.
    """
    # When this statement is rendered inside the CREATE VIEW DDL, `User.geom` would come out as
    # `ST_AsEWKB(users.geom)` instead of the bare column; naming the column literally avoids that.
    geom_column: Any = literal_column("users.geom") if create else User.geom

    # distinct (id, true) rows for users matching StrongVerificationAttempt.has_strong_verification
    sv = (
        select(User.id, literal(True).label("true"))
        .select_from(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.has_strong_verification(User))
        .distinct()
        .subquery(name="sv_subquery")
    )

    avatar = get_avatar_photo_subquery(name="avatar_photo")

    # Pre-computed GeoJSON Feature per user, stored as text (used by GIS.GetUsers)
    feature_properties = func.json_build_object(
        "id", User.id, "has_completed_profile", has_completed_profile_expression()
    )
    feature_geometry = cast(func.ST_AsGeoJSON(geom_column, 5), JSON)
    geojson_feature = cast(
        func.json_build_object("type", "Feature", "geometry", feature_geometry, "properties", feature_properties),
        Text,
    )

    # Be sure to modify the LiteUser type if you add/remove columns!
    columns = [
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom_column.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),
        User.shadowed_at.label("shadowed_at"),
        Upload.filename.label("avatar_filename"),
        has_completed_profile_expression().label("has_completed_profile"),
        User.has_completed_my_home.label("has_completed_my_home"),
        # users without a row in the verification subquery fall back to False
        func.coalesce(sv.c.true, False).label("has_strong_verification"),
        geojson_feature.label("geojson"),
    ]

    return (
        select(*columns)
        .select_from(User)
        .outerjoin(avatar, avatar.c.gallery_id == User.profile_gallery_id)
        .outerjoin(Upload, Upload.key == avatar.c.upload_key)
        .outerjoin(sv, sv.c.id == User.id)
    )
# Two renderings of the same query: one to derive the Table columns, one to emit the view DDL.
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

# subquery form, used only to reference columns when declaring the indexes below
lite_users_subquery = lite_users_selectable_create.subquery()

lite_users = create_materialized_view_with_different_ddl(
    name="lite_users",
    select_selectable=lite_users_selectable_select,
    create_selectable=lite_users_selectable_create,
    metadata=Base.metadata,
    indexes=[
        # uniqueness of id and username
        Index("uq_lite_users_id", lite_users_subquery.c.id, unique=True),
        Index("uq_lite_users_username", lite_users_subquery.c.username, unique=True),
        # partial hash indexes covering only rows where is_visible holds
        Index(
            "ix_lite_users_id_visible",
            lite_users_subquery.c.id,
            postgresql_using="hash",
            postgresql_where=lite_users_subquery.c.is_visible,
        ),
        Index(
            "ix_lite_users_username_visible",
            lite_users_subquery.c.username,
            postgresql_using="hash",
            postgresql_where=lite_users_subquery.c.is_visible,
        ),
    ],
)
class LiteUser(MatViewBase):
    """ORM mapping over the ``lite_users`` materialized view."""

    __table__ = lite_users

    # Typing-only declarations (just enough for mypy); they mirror the labelled columns of
    # "make_lite_users_selectable" and must be kept in sync with it.
    id: Mapped[int]
    username: Mapped[str]
    name: Mapped[str]
    city: Mapped[str]
    age: Mapped[int]
    geom: Mapped[Geom]
    # labelled from User.geom_radius
    radius: Mapped[float]
    is_visible: Mapped[bool]
    shadowed_at: Mapped[datetime | None]
    # NOTE(review): Upload is outer-joined in the selectable, so this is presumably NULL for users
    # without an avatar -- confirm whether `str | None` would be the more accurate type.
    avatar_filename: Mapped[str]
    has_completed_profile: Mapped[bool]
    has_completed_my_home: Mapped[bool]
    # coalesced to False in the selectable when no strong verification row matches
    has_strong_verification: Mapped[bool]
    # GeoJSON Feature serialized as text
    geojson: Mapped[str]
def make_clustered_users_selectable(create: bool = False) -> CompoundSelect[Any]:
    """
    Build the UNION ALL statement behind the ``clustered_users`` materialized view.

    Users passing ``User.is_visible`` are run through ST_ClusterDBSCAN inside a CTE named "clustered".
    The result has one row per cluster (centroid of the collected geometries plus member count) and one
    row with count 1 for every user that did not end up in a cluster.

    Args:
        create: when True, geometry expressions are emitted as literal SQL text (the form used for the
            view's DDL); when False they are built from the CTE's columns.
    """
    # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5; cluster_id is NULL for users that
    # are not part of any cluster
    dbscan_cluster_id = func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id)
    clustered = (
        select(User.id, User.geom, dbscan_cluster_id.label("cluster_id"))
        .where(User.is_visible)
        .cte("clustered")
    )

    centroid_geom: Any
    member_geom: Any
    if not create:
        member_geom = clustered.c.geom
        centroid_geom = func.ST_Centroid(func.ST_Collect(clustered.c.geom))
    else:
        # the literal SQL below refers to the CTE by its name, "clustered"
        member_geom = literal_column("clustered.geom")
        centroid_geom = literal_column("ST_Centroid(ST_Collect(clustered.geom))")

    # one row per cluster: centroid and number of members
    per_cluster = (
        select(centroid_geom.label("geom"), func.count().label("count"))
        .select_from(clustered)
        .where(clustered.c.cluster_id.is_not(None))
        .group_by(clustered.c.cluster_id)
    )

    # one row per unclustered user, with a constant count of 1
    per_isolated_user = (
        select(member_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(clustered)
        .where(clustered.c.cluster_id.is_(None))
    )

    return union_all(per_cluster, per_isolated_user)
# Two renderings of the same query: one to derive the Table columns, one to emit the view DDL.
clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# no indexes are declared for this view
clustered_users = create_materialized_view_with_different_ddl(
    name="clustered_users",
    select_selectable=clustered_users_selectable_select,
    create_selectable=clustered_users_selectable_create,
    metadata=Base.metadata,
)
class ClusteredUser(MatViewBase):
    """ORM mapping over the ``clustered_users`` materialized view."""

    __table__ = clustered_users

    # cluster centroid, or the user's own geometry for unclustered users
    geom: Mapped[Geom]
    # number of users in the cluster (1 for unclustered users)
    count: Mapped[int]
def float_(stmt: Any) -> Any:
    """Cast the SQL expression ``stmt`` to Float, substituting 0.0 when the result is NULL."""
    as_float = cast(stmt, Float)
    return func.coalesce(as_float, 0.0)
# time each request was sent: the time of the conversation's chat_created message
t = (
    select(Message.conversation_id, Message.time)
    .where(Message.message_type == MessageType.chat_created)
    .subquery()
)

# earliest message time per (conversation, author); joined below on the recipient to get the time the
# user responded to the request
s = (
    select(Message.conversation_id, Message.author_id, func.min(Message.time).label("time"))
    .group_by(Message.conversation_id, Message.author_id)
    .subquery()
)

# every (user_id, response_time) pair; response_time is NULL when there was no response
all_responses = union_all(
    # host request responses: the outer join keeps requests the recipient never answered
    select(
        HostRequest.recipient_user_id.label("user_id"),
        (s.c.time - t.c.time).label("response_time"),
    )
    .join(t, t.c.conversation_id == HostRequest.conversation_id)
    .outerjoin(
        s,
        (s.c.conversation_id == HostRequest.conversation_id) & (s.c.author_id == HostRequest.recipient_user_id),
    ),
    # activeness probes
    select(
        ActivenessProbe.user_id,
        # expired probes also carry a `responded` time (when they were marked as such), so only
        # non-expired probes contribute a response time; everything else yields NULL
        case(
            (
                ActivenessProbe.response != ActivenessProbeStatus.expired,
                ActivenessProbe.responded - ActivenessProbe.probe_initiated,
            ),
            else_=None,
        ).label("response_time"),
    ),
).subquery()
# Per-user aggregates over all_responses.
user_response_rates_selectable = (
    select(
        all_responses.c.user_id.label("user_id"),
        # number of requests received
        func.count().label("requests"),
        # fraction of requests responded to (count(column) skips NULL response times)
        # NOTE(review): relies on SQLAlchemy 2.0 rendering `/` as true division for integer
        # operands -- confirm if the SQLAlchemy version ever changes.
        (func.count(all_responses.c.response_time) / func.count()).label("response_rate"),
        # mean over answered requests only (NULL response times are ignored by AVG)
        func.avg(all_responses.c.response_time).label("avg_response_time"),
        # 33rd percentile response time; unanswered requests count as a 1000 day response
        percentile_disc(0.33)
        .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
        .label("response_time_33p"),
        # 66th percentile response time, with the same 1000 day stand-in
        percentile_disc(0.66)
        .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
        .label("response_time_66p"),
    )
    .group_by(all_responses.c.user_id)
)

user_response_rates = create_materialized_view(
    name="user_response_rates",
    selectable=user_response_rates_selectable,
    metadata=Base.metadata,
    indexes=[
        # one row per user; this view is refreshed with concurrently=True further down in this module
        Index("uq_user_response_rates_id", user_response_rates_selectable.subquery().c.user_id, unique=True),
    ],
)
class UserResponseRate(MatViewBase):
    """ORM mapping over the ``user_response_rates`` materialized view."""

    __table__ = user_response_rates

    user_id: Mapped[int]
    # number of requests received
    requests: Mapped[int]
    # answered requests divided by all requests
    response_rate: Mapped[float]
    # NOTE(review): the view computes this as AVG over a timestamp difference, so the database
    # presumably returns an interval (timedelta), and NULL when nothing was answered -- confirm
    # that `float` is the right annotation.
    avg_response_time: Mapped[float]
    # 33rd / 66th percentile response times; unanswered requests are counted as 1000 days
    response_time_33p: Mapped[timedelta]
    response_time_66p: Mapped[timedelta]
def refresh_materialized_views(payload: empty_pb2.Empty) -> None:
    """
    Refresh every materialized view except ``lite_users`` inside a single session scope.

    Args:
        payload: unused empty message.
    """
    logger.info("Refreshing materialized views")

    # (view name, refresh concurrently?) in refresh order; "clustered_users" declares no unique index
    # in this module, which PostgreSQL requires for REFRESH ... CONCURRENTLY, so it is refreshed plainly
    refresh_plan = (
        ("cluster_subscription_counts", True),
        ("cluster_admin_counts", True),
        ("clustered_users", False),
        ("user_response_rates", True),
    )

    with session_scope() as session:
        for view_name, concurrently in refresh_plan:
            refresh_materialized_view(session, view_name, concurrently=concurrently)
def refresh_materialized_views_rapid(payload: empty_pb2.Empty) -> None:
    """
    Concurrently refresh the ``lite_users`` materialized view.

    Args:
        payload: unused empty message.
    """
    logger.info("Refreshing materialized views (rapid)")
    with session_scope() as db:
        refresh_materialized_view(db, "lite_users", concurrently=True)