Coverage for src/couchers/materialized_views.py: 99%
90 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-08 06:07 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-08 06:07 +0000
1import logging
2import typing
3from collections.abc import Sequence
4from datetime import timedelta
5from typing import Any
7from google.protobuf import empty_pb2
8from sqlalchemy import Column, CompoundSelect, Connection, Float, Index, Integer, MetaData, Select, Table, event
9from sqlalchemy.sql import (
10 and_,
11 case,
12 cast,
13 func,
14 literal,
15 literal_column,
16 union_all,
17)
18from sqlalchemy.sql import select as sa_select
19from sqlalchemy.sql.functions import percentile_disc
20from sqlalchemy_utils.view import (
21 CreateView,
22 DropView,
23 create_materialized_view,
24 create_table_from_selectable,
25 refresh_materialized_view,
26)
28from couchers.db import session_scope
29from couchers.models import (
30 ActivenessProbe,
31 ActivenessProbeStatus,
32 Base,
33 ClusterRole,
34 ClusterSubscription,
35 Geom,
36 HostRequest,
37 Message,
38 MessageType,
39 StrongVerificationAttempt,
40 Upload,
41 User,
42)
logger = logging.getLogger(__name__)


def create_materialized_view_with_different_ddl(
    name: str,
    select_selectable: Select[Any] | CompoundSelect[Any],
    create_selectable: Select[Any] | CompoundSelect[Any],
    metadata: MetaData,
    indexes: Sequence[Index] | None = None,
    aliases: dict[str, str] | None = None,
) -> Table:
    """
    Register a materialized view whose mapped columns and CREATE DDL come from two different selectables.

    Adapted from ``sqlalchemy_utils.view.create_materialized_view`` (3-clause BSD); the only change is that its
    single ``selectable`` argument is split into ``select_selectable`` and ``create_selectable``:
    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    :param name: name of the materialized view in the database.
    :param select_selectable: selectable whose columns define the returned ``Table`` (the shape used for querying).
    :param create_selectable: selectable rendered into ``CREATE MATERIALIZED VIEW`` when ``metadata`` is created.
    :param metadata: metadata whose ``after_create`` / ``before_drop`` events emit the view DDL.
    :param indexes: indexes handed to ``create_table_from_selectable``; every index on the resulting table is
        created right after the view itself.
    :param aliases: column aliases forwarded to ``create_table_from_selectable``.
    :return: the ``Table`` describing the view's columns.
    """
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        # not bound to ``metadata``: the view is created by the DDL listeners below
        # (presumably so create_all does not also emit a CREATE TABLE for it)
        metadata=None,
        aliases=aliases,
    )

    def create_indexes(target: Any, connection: Connection, **kw: Any) -> None:
        for view_index in view_table.indexes:
            view_index.create(connection)

    # Listeners fire in registration order, so the view exists before its indexes are created.
    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", create_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))
    return typing.cast(Table, view_table)
def _make_cluster_counts_selectable(*criteria: Any) -> Select[Any]:
    """
    Build a ``(cluster_id, count)`` query counting cluster subscriptions held by visible users.

    :param criteria: optional extra WHERE criteria, ANDed ahead of the visibility filter, restricting which
        subscriptions are counted.
    :return: one row per cluster that has at least one matching subscription.
    """
    return (
        sa_select(
            ClusterSubscription.cluster_id.label("cluster_id"),
            func.count().label("count"),
        )
        .select_from(ClusterSubscription)
        .outerjoin(User, User.id == ClusterSubscription.user_id)
        # only subscriptions whose user is visible are counted
        .where(*criteria, User.is_visible)
        .group_by(ClusterSubscription.cluster_id)
    )


# Number of subscriptions held by visible users, per cluster.
cluster_subscription_counts_selectable = _make_cluster_counts_selectable()

cluster_subscription_counts = create_materialized_view(
    "cluster_subscription_counts",
    cluster_subscription_counts_selectable,
    Base.metadata,
    [
        Index(
            "uq_cluster_subscription_counts_cluster_id",
            cluster_subscription_counts_selectable.c.cluster_id,
            unique=True,
        )
    ],
)


class ClusterSubscriptionCount(Base):
    """ORM mapping of the ``cluster_subscription_counts`` materialized view."""

    __table__ = cluster_subscription_counts


# Number of admin subscriptions held by visible users, per cluster.
cluster_admin_counts_selectable = _make_cluster_counts_selectable(ClusterSubscription.role == ClusterRole.admin)

cluster_admin_counts = create_materialized_view(
    "cluster_admin_counts",
    cluster_admin_counts_selectable,
    Base.metadata,
    [Index("uq_cluster_admin_counts_cluster_id", cluster_admin_counts_selectable.c.cluster_id, unique=True)],
)


class ClusterAdminCount(Base):
    """ORM mapping of the ``cluster_admin_counts`` materialized view."""

    __table__ = cluster_admin_counts
def make_lite_users_selectable(create: bool = False) -> Select[Any]:
    """
    Build the SELECT behind the ``lite_users`` materialized view.

    :param create: when True, the geometry column is emitted as the literal ``users.geom``, for use in the
        CREATE VIEW DDL; when False, the mapped ``User.geom`` column is used.
    :return: one row per user with the "lite" profile fields, avatar filename and strong-verification flag.
    """
    # Because this is rendered as a select when emitting the CREATE VIEW, User.geom would come out as
    # `ST_AsEWKB(users.geom)` instead of the literal column; the literal_column avoids that.
    geom_column: Any = literal_column("users.geom") if create else User.geom

    # Distinct ids of users for whom has_strong_verification holds, each tagged with TRUE.
    sv_subq = (
        sa_select(User.id, literal(True).label("true"))
        .select_from(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.has_strong_verification(User))
        .distinct()
        .subquery(name="sv_subquery")
    )

    # Be sure to modify the LiteUser type if you add/remove columns!
    lite_columns = (
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom_column.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),  # type: ignore[attr-defined]
        Upload.filename.label("avatar_filename"),
        User.has_completed_profile.label("has_completed_profile"),  # type: ignore[attr-defined]
        User.has_completed_my_home.label("has_completed_my_home"),  # type: ignore[attr-defined]
        # users missing from the subquery get FALSE rather than NULL
        func.coalesce(sv_subq.c.true, False).label("has_strong_verification"),
    )

    return (
        sa_select(*lite_columns)
        .select_from(User)
        .outerjoin(Upload, Upload.key == User.avatar_key)
        .outerjoin(sv_subq, sv_subq.c.id == User.id)
    )
# Two renderings of the same query: the "select" variant defines the mapped columns, while the "create" variant
# is what gets rendered into the CREATE MATERIALIZED VIEW DDL.
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

lite_users = create_materialized_view_with_different_ddl(
    "lite_users",
    lite_users_selectable_select,
    lite_users_selectable_create,
    Base.metadata,
    [
        # unique indexes (PostgreSQL needs a unique index to REFRESH MATERIALIZED VIEW CONCURRENTLY)
        Index("uq_lite_users_id", lite_users_selectable_create.c.id, unique=True),
        Index("uq_lite_users_username", lite_users_selectable_create.c.username, unique=True),
        # partial hash indexes restricted to rows where is_visible holds
        Index(
            "ix_lite_users_id_visible",
            lite_users_selectable_create.c.id,
            postgresql_where=lite_users_selectable_create.c.is_visible,
            postgresql_using="hash",
        ),
        Index(
            "ix_lite_users_username_visible",
            lite_users_selectable_create.c.username,
            postgresql_where=lite_users_selectable_create.c.is_visible,
            postgresql_using="hash",
        ),
    ],
)


class LiteUser(Base):
    """ORM mapping of the ``lite_users`` materialized view: a compact per-user projection of profile fields."""

    __table__ = lite_users

    # to allow type annotations without affecting SQLAlchemy
    __allow_unmapped__ = True

    # Typing-only mirror of the columns chosen in make_lite_users_selectable (enough to make mypy happy);
    # keep the two in sync.
    id: Column[int]
    username: Column[str]
    name: Column[str]
    city: Column[str]
    age: Column[int]
    geom: Column[Geom]
    radius: Column[float]
    is_visible: Column[bool]
    # NOTE(review): comes from an outer join on Upload, so it can be NULL despite being typed as str
    avatar_filename: Column[str]
    has_completed_profile: Column[bool]
    has_completed_my_home: Column[bool]
    has_strong_verification: Column[bool]
def make_clustered_users_selectable(create: bool = False) -> CompoundSelect[Any]:
    """
    Build the SELECT behind the ``clustered_users`` materialized view.

    Visible users are grouped with DBSCAN. Each cluster becomes one row at the centroid of its members, with
    ``count`` = number of members. Every user outside all clusters becomes its own row with ``count`` = 1.

    :param create: when True, the geometry expressions are emitted as literal SQL referencing the ``clustered``
        CTE, for the CREATE VIEW DDL; when False, the equivalent SQLAlchemy expressions are used.
    :return: ``UNION ALL`` of the per-cluster rows and the isolated-user rows, with columns ``geom`` and ``count``.
    """
    # Emits something along the lines of
    #   WITH clustered AS (
    #       SELECT id,
    #              geom,
    #              ST_ClusterDBSCAN(geom, 0.15, 5) OVER (ORDER BY id) AS cluster_id
    #       FROM users
    #       WHERE <User.is_visible criteria>
    #   )
    #   SELECT ST_Centroid(ST_Collect(geom)) AS geom, count(*) AS count
    #   FROM clustered WHERE cluster_id IS NOT NULL GROUP BY cluster_id
    #   UNION ALL
    #   SELECT geom, 1 AS count FROM clustered WHERE cluster_id IS NULL
    # NOTE(review): there is no explicit `geom IS NOT NULL` filter; confirm is_visible implies a non-null geom.
    cluster_cte = (
        sa_select(
            User.id,
            User.geom,
            # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5;
            # cluster_id is NULL for users that are not in any cluster
            func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id).label("cluster_id"),
        )
        .where(User.is_visible)
        .cte("clustered")
    )

    if create:
        # literal SQL for the DDL, presumably for the same reason as make_lite_users_selectable
        # (avoid the geometry being wrapped as ST_AsEWKB(...) in the CREATE VIEW) -- confirm
        centroid_geom: Any = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        cluster_geom: Any = literal_column("clustered.geom")
    else:
        centroid_geom = func.ST_Centroid(func.ST_Collect(cluster_cte.c.geom))
        cluster_geom = cluster_cte.c.geom

    # one row per cluster, positioned at the centroid of its members
    clustered_users = (
        sa_select(centroid_geom.label("geom"), func.count().label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_not(None))
        .group_by(cluster_cte.c.cluster_id)
    )

    # one row per user that DBSCAN left outside every cluster
    isolated_users = (
        sa_select(cluster_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_(None))
    )

    return union_all(clustered_users, isolated_users)


clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# No indexes are passed for this view. PostgreSQL's REFRESH MATERIALIZED VIEW CONCURRENTLY requires a unique
# index, so this view has to be refreshed non-concurrently.
clustered_users = create_materialized_view_with_different_ddl(
    "clustered_users", clustered_users_selectable_select, clustered_users_selectable_create, Base.metadata
)


class ClusteredUser(Base):
    """ORM mapping of the ``clustered_users`` materialized view (columns ``geom`` and ``count``)."""

    __table__ = clustered_users
def float_(stmt: Any) -> Any:
    """
    Cast a SQL expression to ``Float``, substituting ``0.0`` when the result is NULL.

    :param stmt: SQL expression (or value coercible to one) to convert.
    :return: the ``coalesce(CAST(stmt AS FLOAT), 0.0)`` expression.
    """
    as_float = cast(stmt, Float)
    return func.coalesce(as_float, 0.0)
# this subquery gets the time that the request was sent (the time of the conversation's chat_created message)
t = sa_select(Message.conversation_id, Message.time).where(Message.message_type == MessageType.chat_created).subquery()
# this subquery gets the earliest message time per (conversation, author); joined on the host below, it is the time
# the host first responded to the request
s = (
    sa_select(Message.conversation_id, Message.author_id, func.min(Message.time).label("time"))
    .group_by(Message.conversation_id, Message.author_id)
    .subquery()
)
# One row per request a user received, with how long they took to respond.
# A NULL response_time means the request counts as unanswered.
all_responses = union_all(
    # host request responses; NULL when the host has no message in the conversation (the outer join finds no row)
    sa_select(
        HostRequest.host_user_id.label("user_id"),
        (s.c.time - t.c.time).label("response_time"),
    )
    .join(t, t.c.conversation_id == HostRequest.conversation_id)
    .outerjoin(s, and_(s.c.conversation_id == HostRequest.conversation_id, s.c.author_id == HostRequest.host_user_id)),
    # activeness probes
    sa_select(
        ActivenessProbe.user_id,
        (
            # expired probes still carry a `responded` timestamp (set when they were marked), so they are mapped to
            # NULL (unanswered) rather than to a response delay
            case(
                (
                    ActivenessProbe.response != ActivenessProbeStatus.expired,
                    ActivenessProbe.responded - ActivenessProbe.probe_initiated,
                ),
                else_=None,
            )
        ).label("response_time"),
    ),
).subquery()

# Unanswered requests are ranked as if answered after this long, placing them at the slow end of the percentiles
# (presumably longer than any real response).
_UNANSWERED_RESPONSE_TIME = timedelta(days=1000)

user_response_rates_selectable = sa_select(
    all_responses.c.user_id.label("user_id"),
    # number of requests received (host requests plus activeness probes)
    func.count().label("requests"),
    # fraction (0-1, not a percentage) of requests responded to; count(column) skips NULL, i.e. unanswered, rows.
    # NOTE: relies on SQLAlchemy 2.x rendering `/` as true division; integer division would truncate to 0 or 1.
    (func.count(all_responses.c.response_time) / func.count()).label("response_rate"),
    # mean over answered requests only (avg ignores NULLs)
    func.avg(all_responses.c.response_time).label("avg_response_time"),
    # the 33rd percentile response time, counting unanswered requests as _UNANSWERED_RESPONSE_TIME
    percentile_disc(0.33)
    .within_group(func.coalesce(all_responses.c.response_time, _UNANSWERED_RESPONSE_TIME))
    .label("response_time_33p"),
    # the 66th percentile response time, likewise
    percentile_disc(0.66)
    .within_group(func.coalesce(all_responses.c.response_time, _UNANSWERED_RESPONSE_TIME))
    .label("response_time_66p"),
).group_by(all_responses.c.user_id)

user_response_rates = create_materialized_view(
    "user_response_rates",
    user_response_rates_selectable,
    Base.metadata,
    [Index("uq_user_response_rates_id", user_response_rates_selectable.c.user_id, unique=True)],
)


class UserResponseRate(Base):
    """ORM mapping of the ``user_response_rates`` materialized view: per-user response statistics."""

    __table__ = user_response_rates
def refresh_materialized_views(payload: empty_pb2.Empty) -> None:
    """
    Refresh the non-rapid materialized views, in order, within one session.

    :param payload: unused here (presumably required by the background-job handler signature).
    """
    # (view name, refresh concurrently?) -- clustered_users has no unique index, which CONCURRENTLY requires
    views_to_refresh = (
        ("cluster_subscription_counts", True),
        ("cluster_admin_counts", True),
        ("clustered_users", False),
        ("user_response_rates", True),
    )
    logger.info("Refreshing materialized views")
    with session_scope() as session:
        for view_name, concurrently in views_to_refresh:
            refresh_materialized_view(session, view_name, concurrently=concurrently)
def refresh_materialized_views_rapid(payload: empty_pb2.Empty) -> None:
    """
    Refresh the materialized views handled by the rapid job (currently only ``lite_users``).

    :param payload: unused here (presumably required by the background-job handler signature).
    """
    # lite_users has unique indexes (uq_lite_users_id / uq_lite_users_username), so it can refresh concurrently
    view_name = "lite_users"
    logger.info("Refreshing materialized views (rapid)")
    with session_scope() as session:
        refresh_materialized_view(session, view_name, concurrently=True)