Coverage for src/couchers/materialized_views.py: 100%
53 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-11-21 04:21 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-11-21 04:21 +0000
1import logging
3from google.protobuf import empty_pb2
4from sqlalchemy import Index, Integer, event
5from sqlalchemy.sql import func, literal, literal_column, union_all
6from sqlalchemy.sql import select as sa_select
7from sqlalchemy_utils.view import (
8 CreateView,
9 DropView,
10 create_materialized_view,
11 create_table_from_selectable,
12 refresh_materialized_view,
13)
15from couchers.db import session_scope
16from couchers.models import Base, ClusterRole, ClusterSubscription, Upload, User
18logger = logging.getLogger(__name__)
def create_materialized_view_with_different_ddl(
    name, select_selectable, create_selectable, metadata, indexes=None, aliases=None
):
    """
    Copied wholesale from sqlalchemy_utils (3-clause BSD), with minor tweak in {select,create}_selectable

    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    Unlike the upstream helper, two selectables are accepted:

    * ``select_selectable`` is handed to ``create_table_from_selectable`` to build the returned table object
    * ``create_selectable`` is handed to ``CreateView`` and is what gets emitted when the view is created

    ``indexes`` and ``aliases`` are forwarded untouched to ``create_table_from_selectable``. Returns the table
    built from ``select_selectable``.
    """
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        # same as upstream: the caller's metadata is only used for the DDL events below, not for the table
        metadata=None,
        aliases=aliases,
    )

    def create_indexes(target, connection, **kw):
        # build every index declared on the view's table, using the connection handed to the event
        for index in view_table.indexes:
            index.create(connection)

    # registration order is kept as in upstream: the view is created first, then its indexes
    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", create_indexes)

    # drop the view before the rest of the metadata is dropped
    event.listen(metadata, "before_drop", DropView(name, materialized=True))

    return view_table
# per-cluster count of subscriptions whose user is visible, one row per cluster_id
cluster_subscription_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .join(User, User.id == ClusterSubscription.user_id, isouter=True)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

# materialized as `cluster_subscription_counts`; the unique index on cluster_id goes with the
# `concurrently=True` refresh done in refresh_materialized_views (PostgreSQL needs a unique index for that)
cluster_subscription_counts = create_materialized_view(
    name="cluster_subscription_counts",
    selectable=cluster_subscription_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_cluster_subscription_counts_cluster_id",
            cluster_subscription_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)
# per-cluster count of subscriptions with the admin role whose user is visible, one row per cluster_id
cluster_admin_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .join(User, User.id == ClusterSubscription.user_id, isouter=True)
    .where(ClusterSubscription.role == ClusterRole.admin)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

# materialized as `cluster_admin_counts`; the unique index on cluster_id goes with the
# `concurrently=True` refresh done in refresh_materialized_views (PostgreSQL needs a unique index for that)
cluster_admin_counts = create_materialized_view(
    name="cluster_admin_counts",
    selectable=cluster_admin_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_cluster_admin_counts_cluster_id",
            cluster_admin_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)
def make_lite_users_selectable(create=False):
    """
    Builds the select behind the `lite_users` view: a handful of User columns plus the avatar's filename.

    With ``create=True`` the geom column is referenced as a literal column (for use in the view's DDL), otherwise
    the mapped ``User.geom`` attribute is used.
    """
    # because this is rendered as a select when emitting the CREATE VIEW, using User.geom would be rendered as
    # `ST_AsEWKB(users.geom)` instead of the literal column, the following fixes it
    geom_column = literal_column("users.geom") if create else User.geom

    columns = [
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom_column.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),
        Upload.filename.label("avatar_filename"),
        User.has_completed_profile.label("has_completed_profile"),
    ]

    # outer join so that users without a matching upload row are still included
    users_with_avatar = sa_select(*columns).select_from(User).join(
        Upload, Upload.key == User.avatar_key, isouter=True
    )
    return users_with_avatar
# two renderings of the same query: one to derive the table object, one to emit in the view's DDL
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

# the unique index on id goes with the `concurrently=True` refresh done in refresh_materialized_views_rapid
lite_users = create_materialized_view_with_different_ddl(
    name="lite_users",
    select_selectable=lite_users_selectable_select,
    create_selectable=lite_users_selectable_create,
    metadata=Base.metadata,
    indexes=[
        Index("uq_lite_users_id", lite_users_selectable_create.c.id, unique=True),
    ],
)
def make_clustered_users_selectable(create=False):
    """
    Builds the select behind the `clustered_users` view: rows of (geom, count).

    Users with a non-NULL geom are grouped with ST_ClusterDBSCAN. Each cluster yields one row holding the centroid
    of its members' geoms and the member count; every user that is in no cluster yields its own row with a count
    of 1. The two sets of rows are combined with UNION ALL.

    With ``create=True`` the geometry expressions are written as literal SQL (for use in the view's DDL) rather than
    built from the CTE's columns -- presumably for the same rendering reason documented in
    make_lite_users_selectable.
    """
    # emits something along the lines of
    # WITH anon_1 AS (
    #   SELECT id,
    #          geom,
    #          ST_ClusterDBSCAN(geom, eps := .15, minpoints := 5) OVER (ORDER BY id) AS cluster_id
    #   FROM users
    #   WHERE geom IS NOT NULL
    # )

    cluster_cte = (
        sa_select(
            User.id,
            User.geom,
            # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5, cluster will be NULL for not in any cluster
            func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id).label("cluster_id"),
        )
        # explicit NULL tests via is_()/is_not() instead of `== None`/`!= None`; renders the same IS [NOT] NULL
        .where(User.geom.is_not(None))
        .cte("clustered")
    )

    if create:
        # the literal SQL below refers to the CTE by the name given to .cte() above
        centroid_geom = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        cluster_geom = literal_column("clustered.geom")
    else:
        centroid_geom = func.ST_Centroid(func.ST_Collect(cluster_cte.c.geom))
        cluster_geom = cluster_cte.c.geom

    # one row per cluster: centroid of the members and how many there are
    clustered_users = (
        sa_select(centroid_geom.label("geom"), func.count().label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_not(None))
        .group_by(cluster_cte.c.cluster_id)
    )

    # one row per user outside any cluster, with a constant count of 1
    isolated_users = (
        sa_select(cluster_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_(None))
    )

    return union_all(clustered_users, isolated_users)
# two renderings of the same query: one to derive the table object, one to emit in the view's DDL
clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# no indexes are declared on this view
clustered_users = create_materialized_view_with_different_ddl(
    name="clustered_users",
    select_selectable=clustered_users_selectable_select,
    create_selectable=clustered_users_selectable_create,
    metadata=Base.metadata,
)
def refresh_materialized_views(payload: empty_pb2.Empty):
    """
    Refreshes the `cluster_subscription_counts`, `cluster_admin_counts` and `clustered_users` views, in that order,
    within a single session scope. The payload is not used.
    """
    logger.info("Refreshing materialized views")
    with session_scope() as db_session:
        # these two views are declared with a unique index and are refreshed concurrently
        for view_name in ("cluster_subscription_counts", "cluster_admin_counts"):
            refresh_materialized_view(db_session, view_name, concurrently=True)
        # clustered_users is declared without any index, so it gets a plain (non-concurrent) refresh
        refresh_materialized_view(db_session, "clustered_users")
def refresh_materialized_views_rapid(payload: empty_pb2.Empty):
    """
    Refreshes the `lite_users` view concurrently within its own session scope. The payload is not used.
    """
    logger.info("Refreshing materialized views (rapid)")
    with session_scope() as db_session:
        view_name = "lite_users"
        refresh_materialized_view(db_session, view_name, concurrently=True)