Coverage for app / backend / src / couchers / materialized_views.py: 99%
83 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
1import logging
2import typing
3from collections.abc import Sequence
4from datetime import timedelta
5from typing import Any
7from google.protobuf import empty_pb2
8from sqlalchemy import CompoundSelect, Connection, Float, Index, Integer, MetaData, Select, Table, Text, event
9from sqlalchemy.dialects.postgresql import JSON
10from sqlalchemy.orm import Mapped
11from sqlalchemy.sql import (
12 and_,
13 case,
14 cast,
15 func,
16 literal,
17 literal_column,
18 select,
19 union_all,
20)
21from sqlalchemy.sql.functions import percentile_disc
22from sqlalchemy_utils.view import (
23 CreateView,
24 DropView,
25 create_materialized_view,
26 create_table_from_selectable,
27 refresh_materialized_view,
28)
30from couchers.db import session_scope
31from couchers.helpers.completed_profile import has_completed_profile_expression
32from couchers.models import (
33 ActivenessProbe,
34 ActivenessProbeStatus,
35 Base,
36 ClusterRole,
37 ClusterSubscription,
38 Geom,
39 HostRequest,
40 MatViewBase,
41 Message,
42 MessageType,
43 StrongVerificationAttempt,
44 Upload,
45 User,
46)
47from couchers.models.uploads import get_avatar_photo_subquery
49logger = logging.getLogger(__name__)
def create_materialized_view_with_different_ddl(
    name: str,
    select_selectable: Select[Any] | CompoundSelect[Any],
    create_selectable: Select[Any] | CompoundSelect[Any],
    metadata: MetaData,
    indexes: Sequence[Index] | None = None,
    aliases: dict[str, str] | None = None,
) -> Table:
    """
    Register a materialized view whose table object and CREATE statement come from two different selectables.

    Copied wholesale from sqlalchemy_utils (3-clause BSD), with a minor tweak in {select,create}_selectable

    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    :param name: name of the materialized view
    :param select_selectable: selectable handed to ``create_table_from_selectable`` to build the returned table
    :param create_selectable: selectable handed to ``CreateView``, i.e. the one rendered into the view's DDL
    :param metadata: metadata whose "after_create" / "before_drop" events create and drop the view
    :param indexes: optional indexes; they are created right after the view itself
    :param aliases: forwarded unchanged to ``create_table_from_selectable``
    :return: the table object built from ``select_selectable``
    """
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        metadata=None,
        aliases=aliases,
    )

    def create_indexes(target: Any, connection: Connection, **kw: Any) -> None:
        # emit CREATE INDEX for every index attached to the view's table
        for index in view_table.indexes:
            index.create(connection)

    # Registration order matters: the view has to exist before its indexes can be created.
    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", create_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))

    return typing.cast(Table, view_table)
# One row per cluster: how many of its subscriptions belong to a user passing `User.is_visible`.
cluster_subscription_counts_selectable = (
    select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

cluster_subscription_counts = create_materialized_view(
    name="cluster_subscription_counts",
    selectable=cluster_subscription_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        # unique on cluster_id; this view is refreshed with concurrently=True in refresh_materialized_views
        Index(
            "uq_cluster_subscription_counts_cluster_id",
            cluster_subscription_counts_selectable.subquery().c.cluster_id,
            unique=True,
        ),
    ],
)
class ClusterSubscriptionCount(MatViewBase):
    """ORM class mapped onto the `cluster_subscription_counts` materialized view table."""

    __table__ = cluster_subscription_counts

    # Typed attributes mirroring the labels of cluster_subscription_counts_selectable.
    cluster_id: Mapped[int]
    # number of subscription rows counted for this cluster
    count: Mapped[int]
# One row per cluster: how many of its subscriptions have the admin role and a user passing `User.is_visible`.
cluster_admin_counts_selectable = (
    select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(
        ClusterSubscription.role == ClusterRole.admin,
        User.is_visible,
    )
    .group_by(ClusterSubscription.cluster_id)
)

cluster_admin_counts = create_materialized_view(
    name="cluster_admin_counts",
    selectable=cluster_admin_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        # unique on cluster_id; this view is refreshed with concurrently=True in refresh_materialized_views
        Index(
            "uq_cluster_admin_counts_cluster_id",
            cluster_admin_counts_selectable.subquery().c.cluster_id,
            unique=True,
        ),
    ],
)
class ClusterAdminCount(MatViewBase):
    """ORM class mapped onto the `cluster_admin_counts` materialized view table."""

    __table__ = cluster_admin_counts

    # Typed attributes mirroring the labels of cluster_admin_counts_selectable.
    cluster_id: Mapped[int]
    # number of admin subscription rows counted for this cluster
    count: Mapped[int]
def make_lite_users_selectable(create: bool = False) -> Select[Any]:
    """
    Build the query behind the `lite_users` materialized view.

    Every user yields one row with a handful of profile columns, the avatar filename (if any), a strong
    verification flag, and a pre-rendered GeoJSON feature.

    :param create: pass True for the variant that is rendered into the CREATE VIEW statement; the geometry is
        then referenced as the literal text `users.geom` rather than through `User.geom`
    :return: the (not yet executed) select
    """
    # because this is rendered as a select when emitting the CREATE VIEW, using User.geom would be rendered as
    # `ST_AsEWKB(users.geom)` instead of the literal column, the literal_column variant fixes it
    geom_column: Any = literal_column("users.geom") if create else User.geom

    # ids of users for which StrongVerificationAttempt.has_strong_verification holds, paired with a constant TRUE
    sv_subquery = (
        select(User.id, literal(True).label("true"))
        .select_from(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.has_strong_verification(User))
        .distinct()
        .subquery(name="sv_subquery")
    )

    avatar_subquery = get_avatar_photo_subquery(name="avatar_photo")

    # Pre-compute GeoJSON Feature for each user (used by GIS.GetUsers)
    feature_properties = func.json_build_object(
        "id", User.id, "has_completed_profile", has_completed_profile_expression()
    )
    feature_geometry = cast(func.ST_AsGeoJSON(geom_column, 5), JSON)
    geojson_feature = cast(
        func.json_build_object(
            "type",
            "Feature",
            "geometry",
            feature_geometry,
            "properties",
            feature_properties,
        ),
        Text,
    )

    # Be sure to modify the LiteUser type if you add/remove columns!
    columns = [
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom_column.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),
        Upload.filename.label("avatar_filename"),
        has_completed_profile_expression().label("has_completed_profile"),
        User.has_completed_my_home.label("has_completed_my_home"),
        # users without a row in the strong verification subquery get FALSE
        func.coalesce(sv_subquery.c.true, False).label("has_strong_verification"),
        geojson_feature.label("geojson"),
    ]

    # all joins are outer joins so that every user keeps a row, with or without avatar / verification
    return (
        select(*columns)
        .select_from(User)
        .outerjoin(avatar_subquery, avatar_subquery.c.gallery_id == User.profile_gallery_id)
        .outerjoin(Upload, Upload.key == avatar_subquery.c.upload_key)
        .outerjoin(sv_subquery, sv_subquery.c.id == User.id)
    )
# Two renderings of the same query: one to derive the table's columns, one to emit the CREATE VIEW statement.
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

# index definitions below reference columns of this subquery
lite_users_subquery = lite_users_selectable_create.subquery()

lite_users = create_materialized_view_with_different_ddl(
    name="lite_users",
    select_selectable=lite_users_selectable_select,
    create_selectable=lite_users_selectable_create,
    metadata=Base.metadata,
    indexes=[
        # unique indexes over all rows
        Index("uq_lite_users_id", lite_users_subquery.c.id, unique=True),
        Index("uq_lite_users_username", lite_users_subquery.c.username, unique=True),
        # partial hash indexes restricted to rows where is_visible is true
        Index(
            "ix_lite_users_id_visible",
            lite_users_subquery.c.id,
            postgresql_using="hash",
            postgresql_where=lite_users_subquery.c.is_visible,
        ),
        Index(
            "ix_lite_users_username_visible",
            lite_users_subquery.c.username,
            postgresql_using="hash",
            postgresql_where=lite_users_subquery.c.is_visible,
        ),
    ],
)
class LiteUser(MatViewBase):
    """ORM class mapped onto the `lite_users` materialized view table."""

    __table__ = lite_users

    # A subset enough to make mypy happy. Taken from "make_lite_users_selectable".
    id: Mapped[int]
    username: Mapped[str]
    name: Mapped[str]
    city: Mapped[str]
    age: Mapped[int]
    geom: Mapped[Geom]
    # labelled from User.geom_radius
    radius: Mapped[float]
    is_visible: Mapped[bool]
    # NOTE(review): Upload is outer-joined in make_lite_users_selectable, so this is presumably NULL for users
    # without an avatar; confirm whether this should be Mapped[str | None]
    avatar_filename: Mapped[str]
    has_completed_profile: Mapped[bool]
    has_completed_my_home: Mapped[bool]
    has_strong_verification: Mapped[bool]
    # GeoJSON Feature pre-rendered as text
    geojson: Mapped[str]
def make_clustered_users_selectable(create: bool = False) -> CompoundSelect[Any]:
    """
    Build the query behind the `clustered_users` materialized view.

    Users passing `User.is_visible` are run through ST_ClusterDBSCAN. Each resulting cluster becomes one row
    holding the centroid of its members' geometries and the member count; each user that falls in no cluster
    becomes its own row with a count of 1.

    :param create: pass True for the variant that is rendered into the CREATE VIEW statement; the geometry
        expressions are then spelled as literal SQL text instead of column expressions (presumably for the same
        `ST_AsEWKB` rendering reason described in make_lite_users_selectable)
    :return: UNION ALL of the cluster rows and the isolated-user rows, both with columns "geom" and "count"
    """
    # the CTE emits something along the lines of
    # WITH clustered AS (
    #   SELECT id,
    #          geom,
    #          ST_ClusterDBSCAN(geom, 0.15, 5) OVER (ORDER BY id) AS cluster_id
    #   FROM users
    #   WHERE <User.is_visible>
    # )
    cluster_cte = (
        select(
            User.id,
            User.geom,
            # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5, cluster will be NULL for not in any cluster
            func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id).label("cluster_id"),
        )
        .where(User.is_visible)
        .cte("clustered")
    )

    if create:
        centroid_geom: Any = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        cluster_geom: Any = literal_column("clustered.geom")
    else:
        centroid_geom = func.ST_Centroid(func.ST_Collect(cluster_cte.c.geom))
        cluster_geom = cluster_cte.c.geom

    # Explicit .is_not(None) / .is_(None) render the same IS [NOT] NULL as `!= None` / `== None`, but cannot be
    # "fixed" by a linter into a Python identity test, which would silently break the query.
    clustered_users = (
        select(centroid_geom.label("geom"), func.count().label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_not(None))
        .group_by(cluster_cte.c.cluster_id)
    )

    # users outside every cluster are reported individually, each counting as one
    isolated_users = (
        select(cluster_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_(None))
    )

    return union_all(clustered_users, isolated_users)
# Two renderings of the same query: one to derive the table's columns, one to emit the CREATE VIEW statement.
clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# no indexes are defined on this view
clustered_users = create_materialized_view_with_different_ddl(
    name="clustered_users",
    select_selectable=clustered_users_selectable_select,
    create_selectable=clustered_users_selectable_create,
    metadata=Base.metadata,
)
class ClusteredUser(MatViewBase):
    """ORM class mapped onto the `clustered_users` materialized view table."""

    __table__ = clustered_users

    # cluster centroid, or the user's own geometry for rows of users outside any cluster
    geom: Mapped[Geom]
    # number of users in the cluster; 1 for rows of users outside any cluster
    count: Mapped[int]
def float_(stmt: Any) -> Any:
    """Cast `stmt` to a SQL Float and replace a NULL result with 0.0."""
    as_float = cast(stmt, Float)
    return func.coalesce(as_float, 0.0)
# this subquery gets the time that the request was sent
t = (
    select(Message.conversation_id, Message.time)
    .where(Message.message_type == MessageType.chat_created)
    .subquery()
)

# this subquery gets the time that the user responded to the request
# (the earliest message time per conversation and author)
s = (
    select(
        Message.conversation_id,
        Message.author_id,
        func.min(Message.time).label("time"),
    )
    .group_by(Message.conversation_id, Message.author_id)
    .subquery()
)

# One row per thing a user was expected to respond to, with a NULL response_time when they did not respond.
all_responses = union_all(
    # host request responses
    select(
        HostRequest.recipient_user_id.label("user_id"),
        (s.c.time - t.c.time).label("response_time"),
    )
    .join(t, t.c.conversation_id == HostRequest.conversation_id)
    # outer join: requests the recipient never wrote in keep their row, with a NULL response time
    .outerjoin(
        s,
        (s.c.conversation_id == HostRequest.conversation_id) & (s.c.author_id == HostRequest.recipient_user_id),
    ),
    # activeness probes
    select(
        ActivenessProbe.user_id,
        # expired probes have a responded time for when they were marked responded
        # so they are reported as NULL, i.e. not responded to
        case(
            (
                ActivenessProbe.response != ActivenessProbeStatus.expired,
                ActivenessProbe.responded - ActivenessProbe.probe_initiated,
            ),
            else_=None,
        ).label("response_time"),
    ),
).subquery()
# Per-user aggregates over all_responses.
user_response_rates_selectable = select(
    all_responses.c.user_id.label("user_id"),
    # number of requests received
    func.count().label("requests"),
    # percentage of requests responded to (count(column) skips the NULL response times)
    (func.count(all_responses.c.response_time) / func.count()).label("response_rate"),
    func.avg(all_responses.c.response_time).label("avg_response_time"),
    # the 33rd percentile response time
    # (missing responses are substituted with 1000 days so they sort last)
    percentile_disc(0.33)
    .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
    .label("response_time_33p"),
    # the 66th percentile response time, with the same substitution
    percentile_disc(0.66)
    .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
    .label("response_time_66p"),
).group_by(all_responses.c.user_id)

user_response_rates = create_materialized_view(
    name="user_response_rates",
    selectable=user_response_rates_selectable,
    metadata=Base.metadata,
    indexes=[
        # unique on user_id; this view is refreshed with concurrently=True in refresh_materialized_views
        Index(
            "uq_user_response_rates_id",
            user_response_rates_selectable.subquery().c.user_id,
            unique=True,
        ),
    ],
)
class UserResponseRate(MatViewBase):
    """ORM class mapped onto the `user_response_rates` materialized view table."""

    __table__ = user_response_rates

    # Typed attributes mirroring the labels of user_response_rates_selectable.
    user_id: Mapped[int]
    # number of rows (requests / probes) counted for the user
    requests: Mapped[int]
    # responded rows divided by all rows
    response_rate: Mapped[float]
    # NOTE(review): this is avg() over the same response_time expression whose percentiles are typed as
    # timedelta below; confirm whether this should be Mapped[timedelta | None] rather than float
    avg_response_time: Mapped[float]
    # percentiles; missing responses are counted as 1000 days in the view's query
    response_time_33p: Mapped[timedelta]
    response_time_66p: Mapped[timedelta]
def refresh_materialized_views(payload: empty_pb2.Empty) -> None:
    """
    Refresh every materialized view except `lite_users`, all within one session scope.

    :param payload: unused
    """
    logger.info("Refreshing materialized views")

    # (view name, refresh concurrently?) in refresh order; clustered_users is defined without any index
    # and is the only one refreshed non-concurrently
    refresh_plan = (
        ("cluster_subscription_counts", True),
        ("cluster_admin_counts", True),
        ("clustered_users", False),
        ("user_response_rates", True),
    )

    with session_scope() as session:
        for view_name, concurrently in refresh_plan:
            refresh_materialized_view(session, view_name, concurrently=concurrently)
def refresh_materialized_views_rapid(payload: empty_pb2.Empty) -> None:
    """
    Refresh the `lite_users` materialized view concurrently.

    :param payload: unused
    """
    logger.info("Refreshing materialized views (rapid)")

    with session_scope() as db_session:
        refresh_materialized_view(db_session, "lite_users", concurrently=True)