Coverage for src/couchers/materialized_views.py: 100%
54 statements
coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
1import logging
3from google.protobuf import empty_pb2
4from sqlalchemy import Index, Integer, event
5from sqlalchemy.sql import func, literal, literal_column, union_all
6from sqlalchemy.sql import select as sa_select
7from sqlalchemy_utils.view import (
8 CreateView,
9 DropView,
10 create_materialized_view,
11 create_table_from_selectable,
12 refresh_materialized_view,
13)
15from couchers.db import session_scope
16from couchers.models import Base, ClusterRole, ClusterSubscription, StrongVerificationAttempt, Upload, User
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
def create_materialized_view_with_different_ddl(
    name, select_selectable, create_selectable, metadata, indexes=None, aliases=None
):
    """
    Register a materialized view whose Table columns and CREATE DDL come from two different selectables.

    Copied wholesale from sqlalchemy_utils (3-clause BSD), with minor tweak in {select,create}_selectable
    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    :param name: name of the materialized view.
    :param select_selectable: selectable whose columns define the returned Table (used when querying the view).
    :param create_selectable: selectable compiled into the CREATE MATERIALIZED VIEW statement.
    :param metadata: MetaData whose "after_create"/"before_drop" events create and drop the view.
    :param indexes: optional Index objects; passed to the Table and created on the view after it exists.
    :param aliases: optional aliases, forwarded unchanged to create_table_from_selectable.
    :returns: the Table object describing the view.
    """
    # Built with metadata=None, so the Table is not attached to `metadata` (presumably so create_all() does not
    # emit a CREATE TABLE for it); the view is created by the listeners registered below instead.
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        metadata=None,
        aliases=aliases,
    )

    def create_indexes(target, connection, **kw):
        # Indexes can only be built once the view exists.
        for index in view_table.indexes:
            index.create(connection)

    # Registration order matters: the view must be created before the listener that indexes it runs.
    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", create_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))
    return view_table
def _cluster_member_counts_selectable(*extra_filters):
    """
    Build a SELECT counting ClusterSubscription rows per cluster, restricted to subscriptions of visible users.

    Shared by the cluster_subscription_counts and cluster_admin_counts views so both always apply the same join,
    visibility filter and grouping.

    :param extra_filters: additional WHERE criteria, ANDed in ahead of the User.is_visible filter.
    :returns: a select with columns ``cluster_id`` and ``count``, grouped by cluster_id.
    """
    # Always at least one criterion, so .where() is never called with no arguments.
    # NOTE(review): User.is_visible is evaluated on the outer-joined users row; confirm a plain join isn't intended.
    criteria = (*extra_filters, User.is_visible)
    return (
        sa_select(
            ClusterSubscription.cluster_id.label("cluster_id"),
            func.count().label("count"),
        )
        .select_from(ClusterSubscription)
        .outerjoin(User, User.id == ClusterSubscription.user_id)
        .where(*criteria)
        .group_by(ClusterSubscription.cluster_id)
    )


# Number of subscriptions (by visible users) per cluster.
cluster_subscription_counts_selectable = _cluster_member_counts_selectable()

cluster_subscription_counts = create_materialized_view(
    "cluster_subscription_counts",
    cluster_subscription_counts_selectable,
    Base.metadata,
    [
        # A unique index is required by REFRESH MATERIALIZED VIEW CONCURRENTLY, which refresh_materialized_views uses.
        Index(
            "uq_cluster_subscription_counts_cluster_id",
            cluster_subscription_counts_selectable.c.cluster_id,
            unique=True,
        )
    ],
)

# Number of admin subscriptions (by visible users) per cluster.
cluster_admin_counts_selectable = _cluster_member_counts_selectable(ClusterSubscription.role == ClusterRole.admin)

cluster_admin_counts = create_materialized_view(
    "cluster_admin_counts",
    cluster_admin_counts_selectable,
    Base.metadata,
    # Unique so the view can be refreshed CONCURRENTLY.
    [Index("uq_cluster_admin_counts_cluster_id", cluster_admin_counts_selectable.c.cluster_id, unique=True)],
)
def make_lite_users_selectable(create=False):
    """
    Build the SELECT statement behind the ``lite_users`` materialized view.

    :param create: when True, build the variant rendered into the CREATE MATERIALIZED VIEW DDL; when False, build
        the variant whose columns describe the view for querying.
    :returns: a select over users, outer-joined to their avatar Upload and to their strong-verification status,
        with columns id, username, name, city, age, geom, radius, is_visible, avatar_filename,
        has_completed_profile and has_strong_verification.
    """
    # The DDL variant is rendered as a plain SELECT, where User.geom would come out as `ST_AsEWKB(users.geom)`
    # instead of the bare column; a literal column reference avoids that.
    geom_column = literal_column("users.geom") if create else User.geom

    # (user id, TRUE) for every user passing has_strong_verification; DISTINCT collapses repeated qualifying attempts.
    verified = (
        sa_select(User.id, literal(True).label("true"))
        .select_from(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.has_strong_verification(User))
        .distinct()
        .subquery(name="sv_subquery")
    )

    columns = [
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom_column.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),
        Upload.filename.label("avatar_filename"),
        User.has_completed_profile.label("has_completed_profile"),
        # Users with no row in the verification subquery get False rather than NULL.
        func.coalesce(verified.c.true, False).label("has_strong_verification"),
    ]

    # Outer joins keep users who have no avatar upload and users who are not strongly verified.
    query = sa_select(*columns).select_from(User)
    query = query.outerjoin(Upload, Upload.key == User.avatar_key)
    query = query.outerjoin(verified, verified.c.id == User.id)
    return query
# Two renderings of the same query: one describes the view's columns for querying, the other is emitted as DDL.
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

lite_users = create_materialized_view_with_different_ddl(
    name="lite_users",
    select_selectable=lite_users_selectable_select,
    create_selectable=lite_users_selectable_create,
    metadata=Base.metadata,
    indexes=[
        # Unique index on id; REFRESH MATERIALIZED VIEW CONCURRENTLY (used for this view) requires one.
        Index("uq_lite_users_id", lite_users_selectable_create.c.id, unique=True),
        # Partial indexes restricted to visible users.
        # NOTE(review): despite the "uq_" prefix these two are not declared unique=True — confirm intent.
        Index(
            "uq_lite_users_id_visible",
            lite_users_selectable_create.c.id,
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
        Index(
            "uq_lite_users_username_visible",
            lite_users_selectable_create.c.username,
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
    ],
)
def make_clustered_users_selectable(create=False):
    """
    Build the SELECT statement behind the ``clustered_users`` materialized view.

    Visible users with a location are grouped with PostGIS DBSCAN clustering. Each cluster becomes one row at the
    centroid of its members, carrying the member count. Every user that falls in no cluster becomes its own row with
    count 1.

    Only visible users are included, matching the other views in this module, which all filter on User.is_visible.
    NOTE: a materialized view's definition is fixed when it is created, so changing this query needs a migration that
    recreates clustered_users.

    :param create: when True, build the variant rendered into the CREATE MATERIALIZED VIEW DDL; when False, build
        the variant whose columns describe the view for querying.
    :returns: a UNION ALL of (geom, count) rows.
    """
    # Emits something along the lines of
    #   WITH clustered AS (
    #     SELECT id,
    #            geom,
    #            ST_ClusterDBSCAN(geom, 0.15, 5) OVER (ORDER BY id) AS cluster_id
    #     FROM users
    #     WHERE <user is visible> AND geom IS NOT NULL
    #   )
    #   SELECT ST_Centroid(ST_Collect(clustered.geom)) AS geom, count(*) AS count
    #   FROM clustered WHERE cluster_id IS NOT NULL GROUP BY cluster_id
    #   UNION ALL
    #   SELECT clustered.geom AS geom, 1 AS count
    #   FROM clustered WHERE cluster_id IS NULL

    cluster_cte = (
        sa_select(
            User.id,
            User.geom,
            # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5; cluster_id is NULL for users not in any cluster
            func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id).label("cluster_id"),
        )
        # Hidden (e.g. deleted/banned) users must not contribute locations to the map.
        .where(User.is_visible)
        .where(User.geom.is_not(None))
        .cte("clustered")
    )

    if create:
        # The DDL variant is rendered as plain SQL text; spelling these as literal columns keeps the geometry
        # expressions from being wrapped the way the ORM column type would render them. The "clustered" prefix must
        # match the CTE name above.
        centroid_geom = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        cluster_geom = literal_column("clustered.geom")
    else:
        centroid_geom = func.ST_Centroid(func.ST_Collect(cluster_cte.c.geom))
        cluster_geom = cluster_cte.c.geom

    # One row per cluster: centroid of its members plus member count.
    clustered_users = (
        sa_select(centroid_geom.label("geom"), func.count().label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_not(None))
        .group_by(cluster_cte.c.cluster_id)
    )

    # One row per user outside every cluster, each counting as a single user.
    isolated_users = (
        sa_select(cluster_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_(None))
    )

    return union_all(clustered_users, isolated_users)
# Two renderings of the same query: one describes the view's columns for querying, the other is emitted as DDL.
clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# No indexes are declared, so this view has no unique index and cannot be refreshed CONCURRENTLY.
clustered_users = create_materialized_view_with_different_ddl(
    name="clustered_users",
    select_selectable=clustered_users_selectable_select,
    create_selectable=clustered_users_selectable_create,
    metadata=Base.metadata,
)
def refresh_materialized_views(payload: empty_pb2.Empty):
    """
    Refresh the cluster count views and the clustered users view, in one session.

    :param payload: unused here (presumably required by the job-handler calling convention).
    """
    logger.info("Refreshing materialized views")
    # (view name, refresh concurrently?) in refresh order. clustered_users has no unique index, which
    # REFRESH MATERIALIZED VIEW CONCURRENTLY requires, so it is refreshed normally.
    views_to_refresh = (
        ("cluster_subscription_counts", True),
        ("cluster_admin_counts", True),
        ("clustered_users", False),
    )
    with session_scope() as session:
        for view_name, concurrent in views_to_refresh:
            refresh_materialized_view(session, view_name, concurrently=concurrent)
def refresh_materialized_views_rapid(payload: empty_pb2.Empty):
    """
    Refresh the lite_users materialized view.

    :param payload: unused here (presumably required by the job-handler calling convention).
    """
    logger.info("Refreshing materialized views (rapid)")
    with session_scope() as session:
        # CONCURRENTLY lets readers keep querying the view during the refresh; it relies on the unique index on id.
        refresh_materialized_view(session, name="lite_users", concurrently=True)