Coverage for src/couchers/materialized_views.py: 98%
64 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
1import logging
2from datetime import timedelta
4from google.protobuf import empty_pb2
5from sqlalchemy import Float, Index, Integer, event
6from sqlalchemy.sql import (
7 and_,
8 case,
9 cast,
10 func,
11 literal,
12 literal_column,
13 union_all,
14)
15from sqlalchemy.sql import select as sa_select
16from sqlalchemy.sql.functions import percentile_disc
17from sqlalchemy_utils.view import (
18 CreateView,
19 DropView,
20 create_materialized_view,
21 create_table_from_selectable,
22 refresh_materialized_view,
23)
25from couchers.db import session_scope
26from couchers.models import (
27 ActivenessProbe,
28 ActivenessProbeStatus,
29 Base,
30 ClusterRole,
31 ClusterSubscription,
32 HostRequest,
33 Message,
34 MessageType,
35 StrongVerificationAttempt,
36 Upload,
37 User,
38)
40logger = logging.getLogger(__name__)
def create_materialized_view_with_different_ddl(
    name, select_selectable, create_selectable, metadata, indexes=None, aliases=None
):
    """
    Register a materialized view whose table object and CREATE statement come from two selectables.

    Copied wholesale from sqlalchemy_utils (3-clause BSD), with minor tweak in {select,create}_selectable

    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    `select_selectable` is passed to `create_table_from_selectable` to build the returned table (together with
    `indexes` and `aliases`), while `create_selectable` is what gets passed to `CreateView`.

    Returns the table built from `select_selectable`.
    """
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        metadata=None,
        aliases=aliases,
    )

    def _create_view_indexes(target, connection, **kw):
        # create every index attached to the view's table object on the given connection
        for index in view_table.indexes:
            index.create(connection)

    # same registration order as before: the view DDL first, then the index creation, then the drop hook
    create_view_ddl = CreateView(name, create_selectable, materialized=True)
    event.listen(metadata, "after_create", create_view_ddl)
    event.listen(metadata, "after_create", _create_view_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))

    return view_table
# per-cluster count of subscriptions whose user satisfies User.is_visible
cluster_subscription_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

# materialized view over the selectable above, with a unique index on cluster_id
cluster_subscription_counts = create_materialized_view(
    name="cluster_subscription_counts",
    selectable=cluster_subscription_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_cluster_subscription_counts_cluster_id",
            cluster_subscription_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)
# per-cluster count of admin-role subscriptions whose user satisfies User.is_visible
cluster_admin_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    # multiple criteria passed to a single where() are ANDed together, same as chaining two where() calls
    .where(
        ClusterSubscription.role == ClusterRole.admin,
        User.is_visible,
    )
    .group_by(ClusterSubscription.cluster_id)
)

# materialized view over the selectable above, with a unique index on cluster_id
cluster_admin_counts = create_materialized_view(
    name="cluster_admin_counts",
    selectable=cluster_admin_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_cluster_admin_counts_cluster_id",
            cluster_admin_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)
def make_lite_users_selectable(create=False):
    """
    Build the SELECT behind the `lite_users` materialized view.

    With `create=True` the geometry column is emitted as the literal column `users.geom`; otherwise the mapped
    `User.geom` attribute is used. Everything else in the statement is the same in both variants.
    """
    # because this is rendered as a select when emitting the CREATE VIEW, using User.geom would be rendered as
    # `ST_AsEWKB(users.geom)` instead of the literal column, so the create variant names the column literally
    geom_column = literal_column("users.geom") if create else User.geom

    # distinct (user id, true) rows for users matched by StrongVerificationAttempt.has_strong_verification
    sv_rows = (
        sa_select(User.id, literal(True).label("true"))
        .select_from(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.has_strong_verification(User))
        .distinct()
        .subquery(name="sv_subquery")
    )

    lite_columns = [
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom_column.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),
        Upload.filename.label("avatar_filename"),
        User.has_completed_profile.label("has_completed_profile"),
        # users with no row in the subquery get False rather than NULL
        func.coalesce(sv_rows.c.true, False).label("has_strong_verification"),
    ]

    query = sa_select(*lite_columns).select_from(User)
    query = query.outerjoin(Upload, Upload.key == User.avatar_key)
    query = query.outerjoin(sv_rows, sv_rows.c.id == User.id)
    return query
# two renderings of the same query: one to describe the table's columns, one to emit in the CREATE statement
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

lite_users = create_materialized_view_with_different_ddl(
    name="lite_users",
    select_selectable=lite_users_selectable_select,
    create_selectable=lite_users_selectable_create,
    metadata=Base.metadata,
    indexes=[
        # unique index on id
        Index(
            "uq_lite_users_id",
            lite_users_selectable_create.c.id,
            unique=True,
        ),
        # partial hash indexes on id and username, restricted to rows where is_visible holds
        Index(
            "ix_lite_users_id_visible",
            lite_users_selectable_create.c.id,
            postgresql_using="hash",
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
        Index(
            "ix_lite_users_username_visible",
            lite_users_selectable_create.c.username,
            postgresql_using="hash",
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
    ],
)
def make_clustered_users_selectable(create=False):
    """
    Build the UNION ALL query behind the `clustered_users` materialized view.

    The result has two columns, `geom` and `count`: one row per DBSCAN cluster (centroid of the collected
    geometries and number of members) plus one row with count 1 for every user that is in no cluster.

    With `create=True` the geometry expressions are emitted as literal SQL text referencing the `clustered` CTE;
    otherwise they are built from the CTE's columns.
    """
    # emits something along the lines of
    # WITH clustered AS (
    #   SELECT id,
    #          geom,
    #          ST_ClusterDBSCAN(geom, eps := .15, minpoints := 5) OVER (ORDER BY id) AS cluster_id
    #   FROM users
    #   WHERE geom IS NOT NULL
    # )

    # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5, cluster will be NULL for not in any cluster
    dbscan_cluster_id = func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id)
    clustered = (
        sa_select(User.id, User.geom, dbscan_cluster_id.label("cluster_id"))
        .where(User.geom.is_not(None))
        .cte("clustered")
    )

    # NOTE(review): the literal-SQL variant presumably avoids geometry columns being wrapped when the CREATE
    # statement is rendered -- confirm
    if create:
        centroid_geom = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        point_geom = literal_column("clustered.geom")
    else:
        centroid_geom = func.ST_Centroid(func.ST_Collect(clustered.c.geom))
        point_geom = clustered.c.geom

    # one row per cluster: centroid of its members and how many there are
    in_clusters = (
        sa_select(centroid_geom.label("geom"), func.count().label("count"))
        .select_from(clustered)
        .where(clustered.c.cluster_id.is_not(None))
        .group_by(clustered.c.cluster_id)
    )

    # one row per user outside any cluster, with a fixed count of 1
    on_their_own = (
        sa_select(point_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(clustered)
        .where(clustered.c.cluster_id.is_(None))
    )

    return union_all(in_clusters, on_their_own)
# two renderings of the same query: one to describe the table's columns, one to emit in the CREATE statement
clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# no indexes are declared for this view
clustered_users = create_materialized_view_with_different_ddl(
    name="clustered_users",
    select_selectable=clustered_users_selectable_select,
    create_selectable=clustered_users_selectable_create,
    metadata=Base.metadata,
)
def float_(stmt):
    """
    Wrap `stmt` as `coalesce(CAST(stmt AS Float), 0.0)`, i.e. a float expression that falls back to 0.0 on NULL.
    """
    as_float = cast(stmt, Float)
    return func.coalesce(as_float, 0.0)
# this subquery gets the time that the request was sent (the time of the chat_created message)
t = (
    sa_select(Message.conversation_id, Message.time)
    .where(Message.message_type == MessageType.chat_created)
    .subquery()
)

# this subquery gets the time that the user responded to the request: the earliest message time
# per (conversation, author) pair
s = (
    sa_select(
        Message.conversation_id,
        Message.author_id,
        func.min(Message.time).label("time"),
    )
    .group_by(Message.conversation_id, Message.author_id)
    .subquery()
)

# host request responses: one row per host request, response_time is NULL when the outer join on `s`
# finds no message authored by the host in that conversation
_host_request_responses = (
    sa_select(
        HostRequest.host_user_id.label("user_id"),
        (s.c.time - t.c.time).label("response_time"),
    )
    .join(t, t.c.conversation_id == HostRequest.conversation_id)
    .outerjoin(
        s,
        and_(
            s.c.conversation_id == HostRequest.conversation_id,
            s.c.author_id == HostRequest.host_user_id,
        ),
    )
)

# expired probes have a responded time for when they were marked responded, so only non-expired probes
# yield a response time; every other probe gets NULL
_probe_response_time = case(
    (
        ActivenessProbe.response != ActivenessProbeStatus.expired,
        ActivenessProbe.responded - ActivenessProbe.probe_initiated,
    ),
    else_=None,
)

# activeness probes: one row per probe
_activeness_probe_responses = sa_select(
    ActivenessProbe.user_id,
    _probe_response_time.label("response_time"),
)

all_responses = union_all(
    _host_request_responses,
    _activeness_probe_responses,
).subquery()
# per-user aggregates over every row of `all_responses`
user_response_rates_selectable = sa_select(
    all_responses.c.user_id.label("user_id"),
    # number of requests received
    func.count().label("requests"),
    # fraction of requests responded to: count(response_time) skips NULLs, count() does not
    # NOTE(review): both operands are integer counts -- confirm the SQLAlchemy version in use renders `/`
    # as true division rather than integer division
    (func.count(all_responses.c.response_time) / func.count()).label("response_rate"),
    # mean response time
    func.avg(all_responses.c.response_time).label("avg_response_time"),
    # the 33rd percentile response time, missing response times are replaced by 1000 days
    percentile_disc(0.33)
    .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
    .label("response_time_33p"),
    # the 66th percentile response time, with the same 1000 day substitute
    percentile_disc(0.66)
    .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
    .label("response_time_66p"),
).group_by(all_responses.c.user_id)

# materialized view over the selectable above, with a unique index on user_id
user_response_rates = create_materialized_view(
    name="user_response_rates",
    selectable=user_response_rates_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_user_response_rates_id",
            user_response_rates_selectable.c.user_id,
            unique=True,
        ),
    ],
)
def refresh_materialized_views(payload: empty_pb2.Empty):
    """
    Refresh the cluster count, clustered users and user response rate materialized views in one session.

    `payload` is not read.
    """
    logger.info("Refreshing materialized views")

    # (view name, refresh concurrently?), in refresh order
    # clustered_users is the only view declared without an index in this module and is refreshed non-concurrently
    refresh_plan = (
        ("cluster_subscription_counts", True),
        ("cluster_admin_counts", True),
        ("clustered_users", False),
        ("user_response_rates", True),
    )

    with session_scope() as session:
        for view_name, concurrently in refresh_plan:
            refresh_materialized_view(session, view_name, concurrently=concurrently)
def refresh_materialized_views_rapid(payload: empty_pb2.Empty):
    """
    Concurrently refresh the `lite_users` materialized view.

    `payload` is not read.
    """
    logger.info("Refreshing materialized views (rapid)")

    rapid_view = "lite_users"
    with session_scope() as session:
        refresh_materialized_view(session, rapid_view, concurrently=True)