Coverage for src/couchers/materialized_views.py: 98%
64 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-04-16 15:13 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-04-16 15:13 +0000
1import logging
2from datetime import timedelta
4from google.protobuf import empty_pb2
5from sqlalchemy import Float, Index, Integer, event
6from sqlalchemy.sql import (
7 and_,
8 case,
9 cast,
10 func,
11 literal,
12 literal_column,
13 union_all,
14)
15from sqlalchemy.sql import select as sa_select
16from sqlalchemy.sql.functions import percentile_disc
17from sqlalchemy_utils.view import (
18 CreateView,
19 DropView,
20 create_materialized_view,
21 create_table_from_selectable,
22 refresh_materialized_view,
23)
25from couchers.db import session_scope
26from couchers.models import (
27 ActivenessProbe,
28 ActivenessProbeStatus,
29 Base,
30 ClusterRole,
31 ClusterSubscription,
32 HostRequest,
33 Message,
34 MessageType,
35 StrongVerificationAttempt,
36 Upload,
37 User,
38)
# module-level logger, named after this module
logger = logging.getLogger(__name__)
def create_materialized_view_with_different_ddl(
    name, select_selectable, create_selectable, metadata, indexes=None, aliases=None
):
    """
    Register a materialized view whose table object and CREATE statement are built from different selectables.

    Copied wholesale from sqlalchemy_utils (3-clause BSD), with minor tweak in {select,create}_selectable

    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    * `select_selectable` is passed to `create_table_from_selectable` to build the returned table
    * `create_selectable` is passed to `CreateView`, which is attached to the "after_create" event of `metadata`
    * `indexes` and `aliases` are forwarded to `create_table_from_selectable`

    Returns the table object built from `select_selectable`.
    """
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        metadata=None,
        aliases=aliases,
    )

    def create_indexes(target, connection, **kw):
        # create every index attached to the view's table object
        for index in view_table.indexes:
            index.create(connection)

    # the index listener is registered after the CreateView listener, on the same event
    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", create_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))

    return view_table
# one row per cluster: how many of its subscriptions pass the User.is_visible filter
# NOTE(review): filtering on User.is_visible presumably makes the outer join act like an inner join — confirm
cluster_subscription_counts_selectable = (
    sa_select(ClusterSubscription.cluster_id.label("cluster_id"), func.count().label("count"))
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

# materialized view over the selectable above, with a unique index on cluster_id
cluster_subscription_counts = create_materialized_view(
    name="cluster_subscription_counts",
    selectable=cluster_subscription_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_cluster_subscription_counts_cluster_id",
            cluster_subscription_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)
# one row per cluster: how many of its admin-role subscriptions pass the User.is_visible filter
cluster_admin_counts_selectable = (
    sa_select(ClusterSubscription.cluster_id.label("cluster_id"), func.count().label("count"))
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    # both criteria are combined with AND
    .where(
        ClusterSubscription.role == ClusterRole.admin,
        User.is_visible,
    )
    .group_by(ClusterSubscription.cluster_id)
)

# materialized view over the selectable above, with a unique index on cluster_id
cluster_admin_counts = create_materialized_view(
    name="cluster_admin_counts",
    selectable=cluster_admin_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_cluster_admin_counts_cluster_id",
            cluster_admin_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)
def make_lite_users_selectable(create=False):
    """
    Build the selectable behind the `lite_users` materialized view: a slimmed-down row per user.

    Each row carries id, username, name, city, age, geom, radius, is_visible, avatar_filename (from the joined
    Upload), has_completed_profile and has_strong_verification.

    Pass `create=True` for the variant used when emitting the CREATE VIEW statement; it only differs in how the
    geom column is expressed.
    """
    # because this is rendered as a select when emitting the CREATE VIEW, using User.geom would be rendered as
    # `ST_AsEWKB(users.geom)` instead of the literal column, so the create variant uses a literal column instead
    geom_column = literal_column("users.geom") if create else User.geom

    # distinct (user id, TRUE) pairs for users matching StrongVerificationAttempt.has_strong_verification
    sv_subquery = (
        sa_select(User.id, literal(True).label("true"))
        .select_from(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.has_strong_verification(User))
        .distinct()
        .subquery(name="sv_subquery")
    )

    # users without a row in the subquery get NULL from the outer join, which is coalesced to FALSE
    has_strong_verification = func.coalesce(sv_subquery.c.true, False)

    view_columns = [
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom_column.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),
        Upload.filename.label("avatar_filename"),
        User.has_completed_profile.label("has_completed_profile"),
        has_strong_verification.label("has_strong_verification"),
    ]

    return (
        sa_select(*view_columns)
        .select_from(User)
        .outerjoin(Upload, Upload.key == User.avatar_key)
        .outerjoin(sv_subquery, sv_subquery.c.id == User.id)
    )
# two variants of the same query: one to describe the table, one to render the CREATE statement
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

lite_users = create_materialized_view_with_different_ddl(
    name="lite_users",
    select_selectable=lite_users_selectable_select,
    create_selectable=lite_users_selectable_create,
    metadata=Base.metadata,
    indexes=[
        # unique index on id
        Index("uq_lite_users_id", lite_users_selectable_create.c.id, unique=True),
        # partial indexes restricted to rows where is_visible holds (not declared unique, despite the prefix)
        Index(
            "uq_lite_users_id_visible",
            lite_users_selectable_create.c.id,
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
        Index(
            "uq_lite_users_username_visible",
            lite_users_selectable_create.c.username,
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
    ],
)
def make_clustered_users_selectable(create=False):
    """
    Build the selectable behind the `clustered_users` materialized view.

    Users that have a geom are run through DBSCAN clustering. The result is the UNION ALL of:

    * one row per cluster: (centroid of the collected member geoms, number of members)
    * one row per user that is in no cluster: (the user's own geom, 1)

    With `create=True` the geometry expressions are written as literal SQL text instead of SQLAlchemy column and
    function objects; this is the variant passed as the CREATE statement of the view.
    NOTE(review): presumably this avoids the geometry column being wrapped (e.g. in `ST_AsEWKB(...)`) when the
    CREATE statement is rendered — confirm.
    """
    # emits something along the lines of
    # WITH clustered AS (
    #   SELECT id,
    #          geom,
    #          ST_ClusterDBSCAN(geom, eps := .15, minpoints := 5) OVER (ORDER BY id) AS cluster_id
    #   FROM users
    #   WHERE geom IS NOT NULL
    # )

    cluster_cte = (
        sa_select(
            User.id,
            User.geom,
            # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5, cluster will be NULL for not in any cluster
            func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id).label("cluster_id"),
        )
        # explicit IS NOT NULL rather than `!= None`
        .where(User.geom.is_not(None))
        # the literal columns below refer to this CTE by the name "clustered"
        .cte("clustered")
    )

    if create:
        centroid_geom = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        cluster_geom = literal_column("clustered.geom")
    else:
        centroid_geom = func.ST_Centroid(func.ST_Collect(cluster_cte.c.geom))
        cluster_geom = cluster_cte.c.geom

    # users that belong to a cluster, collapsed into one row per cluster
    clustered_users = (
        sa_select(centroid_geom.label("geom"), func.count().label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_not(None))
        .group_by(cluster_cte.c.cluster_id)
    )

    # users outside every cluster (cluster_id IS NULL), one row each with a count of 1
    isolated_users = (
        sa_select(cluster_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_(None))
    )

    return union_all(clustered_users, isolated_users)
# two variants of the same query: one to describe the table, one to render the CREATE statement
clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# no indexes are declared for this view
clustered_users = create_materialized_view_with_different_ddl(
    name="clustered_users",
    select_selectable=clustered_users_selectable_select,
    create_selectable=clustered_users_selectable_create,
    metadata=Base.metadata,
)
def float_(stmt):
    """
    Wrap `stmt` in a SQL cast to Float, falling back to 0.0 (via COALESCE) when the cast value is NULL.
    """
    as_float = cast(stmt, Float)
    return func.coalesce(as_float, 0.0)
# this subquery gets the time that the request was sent: the time of the chat_created message of each conversation
t = (
    sa_select(Message.conversation_id, Message.time)
    .where(Message.message_type == MessageType.chat_created)
    .subquery()
)
# this subquery gets the earliest message time per (conversation, author); joined on the host below, it gives the
# time that the user responded to the request
s = (
    sa_select(
        Message.conversation_id,
        Message.author_id,
        func.min(Message.time).label("time"),
    )
    .group_by(Message.conversation_id, Message.author_id)
    .subquery()
)

# host request responses: response_time is NULL when the outer join finds no message authored by the host
_host_request_responses = (
    sa_select(
        HostRequest.host_user_id.label("user_id"),
        (s.c.time - t.c.time).label("response_time"),
    )
    .join(t, t.c.conversation_id == HostRequest.conversation_id)
    .outerjoin(
        s,
        and_(
            s.c.conversation_id == HostRequest.conversation_id,
            s.c.author_id == HostRequest.host_user_id,
        ),
    )
)

# expired probes have a responded time for when they were marked responded, so they yield NULL instead
_probe_response_time = case(
    (
        ActivenessProbe.response != ActivenessProbeStatus.expired,
        ActivenessProbe.responded - ActivenessProbe.probe_initiated,
    ),
    else_=None,
)

# activeness probes
_activeness_probe_responses = sa_select(
    ActivenessProbe.user_id,
    _probe_response_time.label("response_time"),
)

# every (user_id, response_time) pair from both sources
all_responses = union_all(_host_request_responses, _activeness_probe_responses).subquery()
# stand-in response time for rows with no response (NULL response_time), used only in the percentile columns
_NO_RESPONSE_TIME = timedelta(days=1000)

user_response_rates_selectable = sa_select(
    all_responses.c.user_id.label("user_id"),
    # number of requests received
    func.count().label("requests"),
    # percentage of requests responded to: count(column) skips NULLs, count() counts every row
    (func.count(all_responses.c.response_time) / func.count()).label("response_rate"),
    # average over the non-NULL response times
    func.avg(all_responses.c.response_time).label("avg_response_time"),
    # the 33rd percentile response time
    percentile_disc(0.33)
    .within_group(func.coalesce(all_responses.c.response_time, _NO_RESPONSE_TIME))
    .label("response_time_33p"),
    # the 66th percentile response time
    percentile_disc(0.66)
    .within_group(func.coalesce(all_responses.c.response_time, _NO_RESPONSE_TIME))
    .label("response_time_66p"),
).group_by(all_responses.c.user_id)

# materialized view over the selectable above, with a unique index on user_id
user_response_rates = create_materialized_view(
    name="user_response_rates",
    selectable=user_response_rates_selectable,
    metadata=Base.metadata,
    indexes=[
        Index("uq_user_response_rates_id", user_response_rates_selectable.c.user_id, unique=True),
    ],
)
def refresh_materialized_views(payload: empty_pb2.Empty):
    """
    Refresh the cluster count, clustered users and user response rate materialized views in one session.

    `payload` is not used.
    """
    logger.info("Refreshing materialized views")

    # (view name, refresh concurrently?) in the order they are refreshed
    # NOTE(review): clustered_users is declared without indexes, presumably why it is not refreshed concurrently
    refresh_plan = (
        ("cluster_subscription_counts", True),
        ("cluster_admin_counts", True),
        ("clustered_users", False),
        ("user_response_rates", True),
    )

    with session_scope() as session:
        for view_name, concurrently in refresh_plan:
            refresh_materialized_view(session, view_name, concurrently=concurrently)
def refresh_materialized_views_rapid(payload: empty_pb2.Empty):
    """
    Refresh the `lite_users` materialized view concurrently.

    `payload` is not used.
    """
    logger.info("Refreshing materialized views (rapid)")

    view_name = "lite_users"
    with session_scope() as session:
        refresh_materialized_view(session, view_name, concurrently=True)