Coverage for app / backend / src / couchers / materialized_views.py: 99%
82 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1import logging
2import typing
3from collections.abc import Sequence
4from datetime import timedelta
5from typing import Any
7from google.protobuf import empty_pb2
8from sqlalchemy import CompoundSelect, Connection, Float, Index, Integer, MetaData, Select, Table, event
9from sqlalchemy.orm import Mapped
10from sqlalchemy.sql import (
11 and_,
12 case,
13 cast,
14 func,
15 literal,
16 literal_column,
17 union_all,
18)
19from sqlalchemy.sql import select as sa_select
20from sqlalchemy.sql.functions import percentile_disc
21from sqlalchemy_utils.view import (
22 CreateView,
23 DropView,
24 create_materialized_view,
25 create_table_from_selectable,
26 refresh_materialized_view,
27)
29from couchers.db import session_scope
30from couchers.helpers.completed_profile import has_completed_profile_expression
31from couchers.models import (
32 ActivenessProbe,
33 ActivenessProbeStatus,
34 Base,
35 ClusterRole,
36 ClusterSubscription,
37 Geom,
38 HostRequest,
39 MatViewBase,
40 Message,
41 MessageType,
42 StrongVerificationAttempt,
43 Upload,
44 User,
45)
46from couchers.models.uploads import get_avatar_photo_subquery
# module logger; used by the refresh jobs at the bottom of this file
logger = logging.getLogger(__name__)
def create_materialized_view_with_different_ddl(
    name: str,
    select_selectable: Select[Any] | CompoundSelect[Any],
    create_selectable: Select[Any] | CompoundSelect[Any],
    metadata: MetaData,
    indexes: Sequence[Index] | None = None,
    aliases: dict[str, str] | None = None,
) -> Table:
    """
    Register a materialized view whose CREATE DDL is rendered from a different selectable than its columns.

    Copied wholesale from sqlalchemy_utils (3-clause BSD), with a minor tweak in {select,create}_selectable

    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    Args:
        name: name of the materialized view.
        select_selectable: selectable whose columns define the returned ``Table``.
        create_selectable: selectable rendered inside ``CREATE MATERIALIZED VIEW``.
        metadata: metadata whose ``after_create`` creates the view and then its indexes, and whose
            ``before_drop`` drops the view.
        indexes: indexes to attach to the view's table and create after the view.
        aliases: optional column aliases passed to ``create_table_from_selectable``.

    Returns:
        The ``Table`` describing the view (built with ``metadata=None``, i.e. not attached to ``metadata``).
    """
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        metadata=None,
        aliases=aliases,
    )

    def create_indexes(target: Any, connection: Connection, **kw: Any) -> None:
        # indexes are defined on the view's columns, so they are created once the view exists
        for view_index in view_table.indexes:
            view_index.create(connection)

    # registered in this order so that, on after_create, the view is created before its indexes
    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", create_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))

    return typing.cast(Table, view_table)
# Number of visible subscribed users per cluster: one row per cluster_id (clusters whose subscribers are all
# invisible get no row).
# NOTE(review): the outer join followed by `.where(User.is_visible)` presumably behaves like an inner join
# (rows without a matching user are filtered out by the WHERE); left as-is to keep the view DDL unchanged.
cluster_subscription_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

# The unique index on cluster_id is what lets refresh_materialized_views use REFRESH ... CONCURRENTLY
# (PostgreSQL requires a unique index for a concurrent refresh).
cluster_subscription_counts = create_materialized_view(
    "cluster_subscription_counts",
    cluster_subscription_counts_selectable,
    Base.metadata,
    [
        Index(
            "uq_cluster_subscription_counts_cluster_id",
            cluster_subscription_counts_selectable.subquery().c.cluster_id,
            unique=True,
        )
    ],
)
class ClusterSubscriptionCount(MatViewBase):
    """ORM mapping of the ``cluster_subscription_counts`` materialized view (one row per cluster_id)."""

    __table__ = cluster_subscription_counts

    # cluster the count belongs to (unique in the view)
    cluster_id: Mapped[int]
    # number of visible users subscribed to the cluster
    count: Mapped[int]
# Number of visible admins per cluster: one row per cluster_id having at least one visible admin subscription.
# NOTE(review): as in cluster_subscription_counts_selectable, the outer join followed by
# `.where(User.is_visible)` presumably behaves like an inner join; left as-is to keep the view DDL unchanged.
cluster_admin_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(ClusterSubscription.role == ClusterRole.admin)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

# The unique index on cluster_id is what lets refresh_materialized_views use REFRESH ... CONCURRENTLY.
cluster_admin_counts = create_materialized_view(
    "cluster_admin_counts",
    cluster_admin_counts_selectable,
    Base.metadata,
    [
        Index(
            "uq_cluster_admin_counts_cluster_id",
            cluster_admin_counts_selectable.subquery().c.cluster_id,
            unique=True,
        )
    ],
)
class ClusterAdminCount(MatViewBase):
    """ORM mapping of the ``cluster_admin_counts`` materialized view (one row per cluster_id)."""

    __table__ = cluster_admin_counts

    # cluster the count belongs to (unique in the view)
    cluster_id: Mapped[int]
    # number of visible users subscribed to the cluster with the admin role
    count: Mapped[int]
def make_lite_users_selectable(create: bool = False) -> Select[Any]:
    """
    Build the SELECT behind the ``lite_users`` materialized view: one row per user with a compact set of columns.

    Besides plain User columns it exposes the avatar filename (via the avatar photo subquery and Upload),
    the profile-completion flag and a strong-verification flag (False when no verification row matches).

    Args:
        create: when True, ``geom`` is rendered as the literal column ``users.geom`` for the CREATE VIEW DDL;
            otherwise ``User.geom`` is used.

    Returns:
        The SELECT statement.
    """
    # Rendered as a select inside CREATE VIEW, User.geom would come out as `ST_AsEWKB(users.geom)`;
    # the literal column keeps the raw column in the DDL.
    geom: Any = literal_column("users.geom") if create else User.geom

    # distinct user ids for which some StrongVerificationAttempt satisfies has_strong_verification(User)
    strongly_verified = (
        sa_select(User.id, literal(True).label("true"))
        .select_from(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.has_strong_verification(User))
        .distinct()
        .subquery(name="sv_subquery")
    )

    avatar_photo = get_avatar_photo_subquery(name="avatar_photo")

    # Be sure to modify the LiteUser type if you add/remove columns!
    columns = (
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),
        Upload.filename.label("avatar_filename"),
        has_completed_profile_expression().label("has_completed_profile"),
        User.has_completed_my_home.label("has_completed_my_home"),
        func.coalesce(strongly_verified.c.true, False).label("has_strong_verification"),
    )

    query = sa_select(*columns).select_from(User)
    # outer joins keep users without an avatar photo / upload / strong verification
    query = query.outerjoin(avatar_photo, avatar_photo.c.gallery_id == User.profile_gallery_id)
    query = query.outerjoin(Upload, Upload.key == avatar_photo.c.upload_key)
    return query.outerjoin(strongly_verified, strongly_verified.c.id == User.id)
# Two renderings of the same query: `_select` defines the mapped table's columns, `_create` is emitted in the
# CREATE MATERIALIZED VIEW DDL (raw `users.geom` rather than `User.geom`).
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

# column handles used below to declare the view's indexes
lite_users_subquery = lite_users_selectable_create.subquery()

lite_users = create_materialized_view_with_different_ddl(
    "lite_users",
    lite_users_selectable_select,
    lite_users_selectable_create,
    Base.metadata,
    [
        # unique indexes on id and username (a unique index also permits REFRESH ... CONCURRENTLY)
        Index("uq_lite_users_id", lite_users_subquery.c.id, unique=True),
        Index("uq_lite_users_username", lite_users_subquery.c.username, unique=True),
        # partial hash indexes (equality lookups only) restricted to rows where is_visible is true
        Index(
            "ix_lite_users_id_visible",
            lite_users_subquery.c.id,
            postgresql_using="hash",
            postgresql_where=lite_users_subquery.c.is_visible,
        ),
        Index(
            "ix_lite_users_username_visible",
            lite_users_subquery.c.username,
            postgresql_using="hash",
            postgresql_where=lite_users_subquery.c.is_visible,
        ),
    ],
)
class LiteUser(MatViewBase):
    """ORM mapping of the ``lite_users`` materialized view: one row per user (id and username are unique)."""

    __table__ = lite_users

    # A subset enough to make mypy happy. Taken from "make_lite_users_selectable".
    id: Mapped[int]
    username: Mapped[str]
    name: Mapped[str]
    city: Mapped[str]
    age: Mapped[int]
    geom: Mapped[Geom]
    # User.geom_radius, exposed under the column name "radius"
    radius: Mapped[float]
    is_visible: Mapped[bool]
    # NOTE(review): comes from an outer join on the avatar photo / Upload, so it is NULL for users without an
    # avatar even though it is annotated as str — confirm callers handle None
    avatar_filename: Mapped[str]
    has_completed_profile: Mapped[bool]
    has_completed_my_home: Mapped[bool]
    # coalesced to False when no strong verification matches
    has_strong_verification: Mapped[bool]
def make_clustered_users_selectable(create: bool = False) -> CompoundSelect[Any]:
    """
    Build the UNION ALL behind the ``clustered_users`` materialized view.

    Visible users are clustered by location with DBSCAN. Each cluster becomes one row (its centroid and member
    count), and every user outside any cluster becomes one row (their own geom, count 1). Emits something along
    the lines of::

        WITH clustered AS (
            SELECT id,
                   geom,
                   ST_ClusterDBSCAN(geom, eps := .15, minpoints := 5) OVER (ORDER BY id) AS cluster_id
            FROM users
            WHERE <User.is_visible condition>
        )
        SELECT ST_Centroid(ST_Collect(geom)) AS geom, count(*) AS count
        FROM clustered WHERE cluster_id IS NOT NULL GROUP BY cluster_id
        UNION ALL
        SELECT geom, 1 AS count
        FROM clustered WHERE cluster_id IS NULL

    NOTE(review): there is no ``geom IS NOT NULL`` filter; confirm visible users always have a geom.

    Args:
        create: when True, the geometry expressions are emitted as literal SQL against the ``clustered`` CTE
            so the CREATE VIEW DDL contains them verbatim; otherwise SQLAlchemy expressions are used.

    Returns:
        The UNION ALL of the per-cluster rows and the isolated-user rows.
    """
    cluster_cte = (
        sa_select(
            User.id,
            User.geom,
            # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5, cluster will be NULL for not in any cluster
            func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id).label("cluster_id"),
        )
        .where(User.is_visible)
        .cte("clustered")
    )

    if create:
        # literal SQL referencing the "clustered" CTE by name, for the DDL rendering
        centroid_geom: Any = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        cluster_geom: Any = literal_column("clustered.geom")
    else:
        centroid_geom = func.ST_Centroid(func.ST_Collect(cluster_cte.c.geom))
        cluster_geom = cluster_cte.c.geom

    # one row per DBSCAN cluster: centroid of its members and how many there are
    clustered_users = (
        sa_select(centroid_geom.label("geom"), func.count().label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_not(None))
        .group_by(cluster_cte.c.cluster_id)
    )

    # one row per user not in any cluster, at their own location, counted once
    isolated_users = (
        sa_select(cluster_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_(None))
    )

    return union_all(clustered_users, isolated_users)
# `_select` defines the mapped table's columns; `_create` (literal geometry SQL) is emitted in the CREATE VIEW DDL.
clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# No indexes are declared, so the view has no unique index; refresh_materialized_views therefore refreshes it
# without CONCURRENTLY.
clustered_users = create_materialized_view_with_different_ddl(
    "clustered_users", clustered_users_selectable_select, clustered_users_selectable_create, Base.metadata
)
class ClusteredUser(MatViewBase):
    """ORM mapping of the ``clustered_users`` materialized view: one row per cluster or per unclustered user."""

    __table__ = clustered_users

    # cluster centroid, or the user's own geom when they are not in any cluster
    geom: Mapped[Geom]
    # number of users represented by the row (1 for unclustered users)
    count: Mapped[int]
def float_(stmt: Any) -> Any:
    """
    Cast ``stmt`` to SQL FLOAT, substituting 0.0 when the result is NULL.

    Args:
        stmt: any SQL expression.

    Returns:
        ``COALESCE(CAST(stmt AS FLOAT), 0.0)`` as a SQLAlchemy expression.
    """
    as_float = cast(stmt, Float)
    return func.coalesce(as_float, 0.0)
# this subquery gets the time that the request was sent
# (conversation_id and time of each chat_created message)
t = sa_select(Message.conversation_id, Message.time).where(Message.message_type == MessageType.chat_created).subquery()
# this subquery gets the time that the user responded to the request
# (earliest message time per (conversation, author), over messages of any type)
s = (
    sa_select(Message.conversation_id, Message.author_id, func.min(Message.time).label("time"))
    .group_by(Message.conversation_id, Message.author_id)
    .subquery()
)
# One row per request a user received, with how long they took to respond (NULL = no response time).
all_responses = union_all(
    # host request responses: host's first message in the conversation minus the chat_created time;
    # the outer join leaves response_time NULL when the host never wrote in the conversation
    sa_select(
        HostRequest.host_user_id.label("user_id"),
        (s.c.time - t.c.time).label("response_time"),
    )
    .join(t, t.c.conversation_id == HostRequest.conversation_id)
    .outerjoin(s, and_(s.c.conversation_id == HostRequest.conversation_id, s.c.author_id == HostRequest.host_user_id)),
    # activeness probes
    sa_select(
        ActivenessProbe.user_id,
        (
            # expired probes have a responded time for when they were marked responded
            # so they are mapped to NULL (no response) instead of being counted as a response
            case(
                (
                    ActivenessProbe.response != ActivenessProbeStatus.expired,
                    ActivenessProbe.responded - ActivenessProbe.probe_initiated,
                ),
                else_=None,
            )
        ).label("response_time"),
    ),
).subquery()
# Response statistics per user over all_responses (host requests + activeness probes), one row per user_id.
user_response_rates_selectable = sa_select(
    all_responses.c.user_id.label("user_id"),
    # number of requests received
    func.count().label("requests"),
    # fraction (0-1) of requests responded to: count(response_time) skips NULL (unanswered) rows
    # NOTE(review): relies on SQLAlchemy 2.x rendering `/` between integer expressions as true division;
    # plain PostgreSQL bigint / bigint would truncate to 0 or 1
    (func.count(all_responses.c.response_time) / func.count()).label("response_rate"),
    # mean over answered requests only (avg() ignores NULL)
    func.avg(all_responses.c.response_time).label("avg_response_time"),
    # the 33rd percentile response time; unanswered requests count as taking 1000 days
    percentile_disc(0.33)
    .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
    .label("response_time_33p"),
    # the 66th percentile response time (same 1000-day stand-in for unanswered requests)
    percentile_disc(0.66)
    .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
    .label("response_time_66p"),
).group_by(all_responses.c.user_id)

# The unique index on user_id is what lets refresh_materialized_views use REFRESH ... CONCURRENTLY.
user_response_rates = create_materialized_view(
    "user_response_rates",
    user_response_rates_selectable,
    Base.metadata,
    [Index("uq_user_response_rates_id", user_response_rates_selectable.subquery().c.user_id, unique=True)],
)
class UserResponseRate(MatViewBase):
    """ORM mapping of the ``user_response_rates`` materialized view: one row per user_id."""

    __table__ = user_response_rates

    user_id: Mapped[int]
    # number of host requests + activeness probes received
    requests: Mapped[int]
    # fraction (0-1) of those with a response time
    # NOTE(review): computed by integer true division (NUMERIC in PostgreSQL), so the driver may return
    # Decimal rather than float — confirm before relying on float arithmetic
    response_rate: Mapped[float]
    # avg() over the same interval column as the percentiles below yields an interval, i.e. a timedelta
    # (previously mis-annotated as float)
    avg_response_time: Mapped[timedelta]
    # 33rd / 66th percentile response times, with unanswered requests counted as 1000 days
    response_time_33p: Mapped[timedelta]
    response_time_66p: Mapped[timedelta]
def refresh_materialized_views(payload: empty_pb2.Empty) -> None:
    """
    Refresh the cluster-count, clustered-user and response-rate materialized views in one session.

    ``clustered_users`` declares no indexes, so it cannot be refreshed CONCURRENTLY (PostgreSQL requires a
    unique index for that); the other views have unique indexes and are refreshed concurrently.
    ``payload`` is unused.
    """
    logger.info("Refreshing materialized views")
    # (view name, refresh concurrently?) in refresh order
    views_to_refresh = (
        ("cluster_subscription_counts", True),
        ("cluster_admin_counts", True),
        ("clustered_users", False),
        ("user_response_rates", True),
    )
    with session_scope() as session:
        for view_name, concurrent in views_to_refresh:
            refresh_materialized_view(session, view_name, concurrently=concurrent)
def refresh_materialized_views_rapid(payload: empty_pb2.Empty) -> None:
    """
    Refresh only the ``lite_users`` materialized view, concurrently (it has unique indexes on id and username).

    ``payload`` is unused.
    """
    logger.info("Refreshing materialized views (rapid)")
    view_name = "lite_users"
    with session_scope() as session:
        refresh_materialized_view(session, view_name, concurrently=True)