Coverage for src/couchers/materialized_views.py: 99%
74 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
1import logging
2from datetime import timedelta
4from google.protobuf import empty_pb2
5from sqlalchemy import Float, Index, Integer, event
6from sqlalchemy.sql import (
7 and_,
8 case,
9 cast,
10 func,
11 literal,
12 literal_column,
13 union_all,
14)
15from sqlalchemy.sql import select as sa_select
16from sqlalchemy.sql.functions import percentile_disc
17from sqlalchemy_utils.view import (
18 CreateView,
19 DropView,
20 create_materialized_view,
21 create_table_from_selectable,
22 refresh_materialized_view,
23)
25from couchers.db import session_scope
26from couchers.models import (
27 ActivenessProbe,
28 ActivenessProbeStatus,
29 Base,
30 ClusterRole,
31 ClusterSubscription,
32 HostRequest,
33 Message,
34 MessageType,
35 StrongVerificationAttempt,
36 Upload,
37 User,
38)
# module-level logger, named after this module
logger = logging.getLogger(__name__)
def create_materialized_view_with_different_ddl(
    name, select_selectable, create_selectable, metadata, indexes=None, aliases=None
):
    """
    Registers a materialized view whose table definition and whose CREATE statement come from two
    different selectables.

    Copied wholesale from sqlalchemy_utils (3-clause BSD), with minor tweak in {select,create}_selectable

    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    `select_selectable` is what the returned table is built from, `create_selectable` is what gets handed
    to `CreateView` for the DDL. `indexes` and `aliases` are passed straight through to
    `create_table_from_selectable`.

    Returns the table built from `select_selectable`.
    """
    # metadata=None is passed on purpose (as in the upstream code): the caller's `metadata` is only used to
    # hang the create/drop listeners on below
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        metadata=None,
        aliases=aliases,
    )

    def create_indexes(target, connection, **kw):
        # the indexes attached to the table are created by hand on the same connection
        for index in view_table.indexes:
            index.create(connection)

    # listeners are registered in the same order as before: the view itself first, then its indexes
    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", create_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))

    return view_table
# one row per cluster: how many of its subscriptions belong to a user with `is_visible` set
# NOTE: the WHERE on User.is_visible throws away the NULL-extended rows of the outer join, so in effect this
# behaves like an inner join
cluster_subscription_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

cluster_subscription_counts = create_materialized_view(
    name="cluster_subscription_counts",
    selectable=cluster_subscription_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        # unique on cluster_id: the grouping yields one row per cluster
        Index(
            "uq_cluster_subscription_counts_cluster_id",
            cluster_subscription_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)
class ClusterSubscriptionCount(Base):
    """
    ORM class mapped onto the `cluster_subscription_counts` materialized view
    """

    __table__ = cluster_subscription_counts
# one row per cluster: how many of its admin subscriptions belong to a user with `is_visible` set
cluster_admin_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    # both criteria are ANDed together
    .where(
        ClusterSubscription.role == ClusterRole.admin,
        User.is_visible,
    )
    .group_by(ClusterSubscription.cluster_id)
)

cluster_admin_counts = create_materialized_view(
    name="cluster_admin_counts",
    selectable=cluster_admin_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_cluster_admin_counts_cluster_id",
            cluster_admin_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)
class ClusterAdminCount(Base):
    """
    ORM class mapped onto the `cluster_admin_counts` materialized view
    """

    __table__ = cluster_admin_counts
def make_lite_users_selectable(create=False):
    """
    Builds the select behind the `lite_users` materialized view.

    With `create=True` the geometry column is referenced as the raw `users.geom` column (for use in the
    CREATE statement), otherwise as `User.geom`. Everything else is the same in both variants.
    """
    # because this is rendered as a select when emitting the CREATE VIEW, using User.geom would be rendered as
    # `ST_AsEWKB(users.geom)` instead of the literal column, so the DDL variant names the column literally
    geom_column = literal_column("users.geom") if create else User.geom

    # rows of (user id, TRUE) selected where `has_strong_verification(User)` holds, deduplicated
    sv_subquery = (
        sa_select(User.id, literal(True).label("true"))
        .select_from(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.has_strong_verification(User))
        .distinct()
        .subquery(name="sv_subquery")
    )

    columns = [
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom_column.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),
        Upload.filename.label("avatar_filename"),
        User.has_completed_profile.label("has_completed_profile"),
        User.has_completed_my_home.label("has_completed_my_home"),
        # users with no row in the subquery get NULL from the outer join, which becomes FALSE here
        func.coalesce(sv_subquery.c.true, False).label("has_strong_verification"),
    ]

    return (
        sa_select(*columns)
        .select_from(User)
        .outerjoin(Upload, Upload.key == User.avatar_key)
        .outerjoin(sv_subquery, sv_subquery.c.id == User.id)
    )
# two renderings of the same query: one to build the table definition from, one for the CREATE statement
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

lite_users = create_materialized_view_with_different_ddl(
    name="lite_users",
    select_selectable=lite_users_selectable_select,
    create_selectable=lite_users_selectable_create,
    metadata=Base.metadata,
    indexes=[
        Index("uq_lite_users_id", lite_users_selectable_create.c.id, unique=True),
        # partial hash indexes, restricted to rows where is_visible is set
        Index(
            "ix_lite_users_id_visible",
            lite_users_selectable_create.c.id,
            postgresql_using="hash",
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
        Index(
            "ix_lite_users_username_visible",
            lite_users_selectable_create.c.username,
            postgresql_using="hash",
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
    ],
)
class LiteUser(Base):
    """
    ORM class mapped onto the `lite_users` materialized view
    """

    __table__ = lite_users
def make_clustered_users_selectable(create=False):
    """
    Builds the select behind the `clustered_users` materialized view.

    Users are grouped with DBSCAN on their geometry; every cluster becomes one row holding the centroid of
    its members and the member count, and every user outside any cluster becomes a row of its own with a
    count of 1.

    With `create=True` the geometry expressions are written as literal SQL (for use in the CREATE
    statement), otherwise they are built from the CTE's columns.
    """
    # emits something along the lines of
    # WITH clustered AS (
    #   SELECT id,
    #          geom,
    #          ST_ClusterDBSCAN(geom, eps := .15, minpoints := 5) OVER (ORDER BY id) AS cluster_id
    #   FROM users
    #   WHERE <users.is_visible>
    # )
    cluster_cte = (
        sa_select(
            User.id,
            User.geom,
            # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5, cluster will be NULL for not in any cluster
            func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id).label("cluster_id"),
        )
        .where(User.is_visible)
        .cte("clustered")
    )
    cluster_id = cluster_cte.c.cluster_id

    if create:
        # these literals refer to the CTE by its name, so they must stay in sync with `.cte("clustered")` above
        # (presumably literal for the same reason as the geom column in make_lite_users_selectable)
        centroid_geom = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        cluster_geom = literal_column("clustered.geom")
    else:
        centroid_geom = func.ST_Centroid(func.ST_Collect(cluster_cte.c.geom))
        cluster_geom = cluster_cte.c.geom

    # one row per cluster: centroid of the members and how many there are
    clustered_users = (
        sa_select(centroid_geom.label("geom"), func.count().label("count"))
        .select_from(cluster_cte)
        .where(cluster_id.is_not(None))
        .group_by(cluster_id)
    )

    # users that DBSCAN left out of every cluster are reported individually
    isolated_users = (
        sa_select(cluster_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(cluster_cte)
        .where(cluster_id.is_(None))
    )

    return union_all(clustered_users, isolated_users)
# two renderings of the same query: one to build the table definition from, one for the CREATE statement
clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# no indexes are declared for this view
clustered_users = create_materialized_view_with_different_ddl(
    name="clustered_users",
    select_selectable=clustered_users_selectable_select,
    create_selectable=clustered_users_selectable_create,
    metadata=Base.metadata,
)
class ClusteredUser(Base):
    """
    ORM class mapped onto the `clustered_users` materialized view
    """

    __table__ = clustered_users
def float_(stmt):
    """
    Casts `stmt` to Float, with 0.0 substituted when the cast value is NULL
    """
    as_float = cast(stmt, Float)
    return func.coalesce(as_float, 0.0)
# this subquery gets the time that the request was sent
t = (
    sa_select(Message.conversation_id, Message.time)
    .where(Message.message_type == MessageType.chat_created)
    .subquery()
)

# this subquery gets the time that the user responded to the request
# (the earliest message time per conversation and author)
s = (
    sa_select(
        Message.conversation_id,
        Message.author_id,
        func.min(Message.time).label("time"),
    )
    .group_by(Message.conversation_id, Message.author_id)
    .subquery()
)

# one row per thing a user was expected to respond to; response_time is NULL when there was no response
all_responses = union_all(
    # host request responses
    sa_select(
        HostRequest.host_user_id.label("user_id"),
        (s.c.time - t.c.time).label("response_time"),
    )
    .join(t, t.c.conversation_id == HostRequest.conversation_id)
    # outer join: a host who never wrote in the conversation still yields a row, with a NULL response_time
    .outerjoin(
        s,
        and_(
            s.c.conversation_id == HostRequest.conversation_id,
            s.c.author_id == HostRequest.host_user_id,
        ),
    ),
    # activeness probes
    sa_select(
        ActivenessProbe.user_id,
        (
            # expired probes have a responded time for when they were marked responded, so their
            # response time is forced to NULL here
            case(
                (
                    ActivenessProbe.response != ActivenessProbeStatus.expired,
                    ActivenessProbe.responded - ActivenessProbe.probe_initiated,
                ),
                else_=None,
            )
        ).label("response_time"),
    ),
).subquery()

user_response_rates_selectable = sa_select(
    all_responses.c.user_id.label("user_id"),
    # number of requests received
    func.count().label("requests"),
    # fraction of requests responded to: count(response_time) skips the NULLs, count() does not
    # NOTE(review): both operands are integer counts; whether this comes out as a true division depends on
    # how the installed SQLAlchemy version renders `/` for integers -- confirm
    (func.count(all_responses.c.response_time) / func.count()).label("response_rate"),
    # NULL response times are ignored by avg
    func.avg(all_responses.c.response_time).label("avg_response_time"),
    # the 33rd percentile response time, with missing responses counted as 1000 days
    percentile_disc(0.33)
    .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
    .label("response_time_33p"),
    # the 66th percentile response time, with missing responses counted as 1000 days
    percentile_disc(0.66)
    .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
    .label("response_time_66p"),
).group_by(all_responses.c.user_id)

user_response_rates = create_materialized_view(
    name="user_response_rates",
    selectable=user_response_rates_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_user_response_rates_id",
            user_response_rates_selectable.c.user_id,
            unique=True,
        ),
    ],
)
class UserResponseRate(Base):
    """
    ORM class mapped onto the `user_response_rates` materialized view
    """

    __table__ = user_response_rates
def refresh_materialized_views(payload: empty_pb2.Empty):
    """
    Refreshes every materialized view except `lite_users`, all within one session scope.

    `payload` is not used.
    """
    logger.info("Refreshing materialized views")

    # (view name, refresh concurrently?), in the order the refreshes are issued
    # clustered_users is declared without any index and is the only one refreshed non-concurrently
    refresh_plan = (
        ("cluster_subscription_counts", True),
        ("cluster_admin_counts", True),
        ("clustered_users", False),
        ("user_response_rates", True),
    )

    with session_scope() as session:
        for view_name, concurrently in refresh_plan:
            refresh_materialized_view(session, view_name, concurrently=concurrently)
def refresh_materialized_views_rapid(payload: empty_pb2.Empty):
    """
    Refreshes only the `lite_users` materialized view, concurrently.

    `payload` is not used.
    """
    logger.info("Refreshing materialized views (rapid)")

    with session_scope() as db_session:
        refresh_materialized_view(db_session, "lite_users", concurrently=True)