Coverage for src/couchers/materialized_views.py: 100%

54 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-12-20 18:03 +0000

1import logging 

2 

3from google.protobuf import empty_pb2 

4from sqlalchemy import Index, Integer, event 

5from sqlalchemy.sql import func, literal, literal_column, union_all 

6from sqlalchemy.sql import select as sa_select 

7from sqlalchemy_utils.view import ( 

8 CreateView, 

9 DropView, 

10 create_materialized_view, 

11 create_table_from_selectable, 

12 refresh_materialized_view, 

13) 

14 

15from couchers.db import session_scope 

16from couchers.models import Base, ClusterRole, ClusterSubscription, StrongVerificationAttempt, Upload, User 

17 

18logger = logging.getLogger(__name__) 

19 

20 

def create_materialized_view_with_different_ddl(
    name, select_selectable, create_selectable, metadata, indexes=None, aliases=None
):
    """
    Register a materialized view whose Table description and CREATE statement come from two selectables.

    Copied wholesale from sqlalchemy_utils (3-clause BSD), with minor tweak in {select,create}_selectable

    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    `select_selectable` is handed to `create_table_from_selectable` to build the returned Table, while
    `create_selectable` is the one passed to `CreateView` when `metadata` is created. `indexes` and `aliases`
    are forwarded unchanged to `create_table_from_selectable`.
    """
    # metadata=None is deliberate (same as upstream): the Table is not attached to the caller's metadata
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        metadata=None,
        aliases=aliases,
    )

    def _create_view_indexes(target, connection, **kw):
        # runs on the same "after_create" event as CreateView, registered after it
        for index in view_table.indexes:
            index.create(connection)

    # registration order is kept as: create the view, create its indexes, and drop the view before drop
    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", _create_view_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))

    return view_table

42 

43 

# One row per cluster: the number of subscriptions whose user is visible.
# The filter on User.is_visible drops rows where the outer join found no user, so only
# subscriptions that belong to a visible user are counted.
cluster_subscription_counts_selectable = (
    sa_select(ClusterSubscription.cluster_id.label("cluster_id"), func.count().label("count"))
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

cluster_subscription_counts = create_materialized_view(
    "cluster_subscription_counts",
    cluster_subscription_counts_selectable,
    Base.metadata,
    indexes=[
        # unique index on cluster_id; this view is refreshed with concurrently=True further down
        Index("uq_cluster_subscription_counts_cluster_id", cluster_subscription_counts_selectable.c.cluster_id, unique=True),
    ],
)

67 

# One row per cluster: the number of admin-role subscriptions whose user is visible.
cluster_admin_counts_selectable = (
    sa_select(ClusterSubscription.cluster_id.label("cluster_id"), func.count().label("count"))
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(ClusterSubscription.role == ClusterRole.admin)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

cluster_admin_counts = create_materialized_view(
    "cluster_admin_counts",
    cluster_admin_counts_selectable,
    Base.metadata,
    indexes=[
        # unique index on cluster_id; this view is refreshed with concurrently=True further down
        Index(
            "uq_cluster_admin_counts_cluster_id",
            cluster_admin_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)

86 

87 

def make_lite_users_selectable(create=False):
    """
    Build the SELECT over users (with avatar upload and strong-verification flag) used for `lite_users`.

    With create=True the geometry column is emitted as the raw `users.geom` text so it can be used in the
    CREATE VIEW statement; with create=False the mapped `User.geom` attribute is used.
    """
    # because this is rendered as a select when emitting the CREATE VIEW, using User.geom would be rendered as
    # `ST_AsEWKB(users.geom)` instead of the literal column, the following fixes it
    geom_column = literal_column("users.geom") if create else User.geom

    # (user id, TRUE) pairs for users matching StrongVerificationAttempt.has_strong_verification(User);
    # the predicate itself is defined on the model, outside this module
    sv_subquery = (
        sa_select(User.id, literal(True).label("true"))
        .select_from(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.has_strong_verification(User))
        .distinct()
        .subquery(name="sv_subquery")
    )

    # users with no row in the subquery get NULL from the outer join, which coalesce turns into False
    has_strong_verification = func.coalesce(sv_subquery.c.true, False).label("has_strong_verification")

    columns = [
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom_column.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),
        Upload.filename.label("avatar_filename"),
        User.has_completed_profile.label("has_completed_profile"),
        has_strong_verification,
    ]

    query = sa_select(*columns).select_from(User)
    # both joins are outer joins so that every user row is kept
    query = query.outerjoin(Upload, Upload.key == User.avatar_key)
    query = query.outerjoin(sv_subquery, sv_subquery.c.id == User.id)
    return query

122 

123 

# Two renderings of the same query: one describes the table, the other is emitted in the CREATE statement
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

lite_users = create_materialized_view_with_different_ddl(
    "lite_users",
    select_selectable=lite_users_selectable_select,
    create_selectable=lite_users_selectable_create,
    metadata=Base.metadata,
    indexes=[
        # unique index on id
        Index("uq_lite_users_id", lite_users_selectable_create.c.id, unique=True),
        # NOTE(review): the two partial indexes below carry a "uq_" prefix but are not declared unique
        Index(
            "uq_lite_users_id_visible",
            lite_users_selectable_create.c.id,
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
        Index(
            "uq_lite_users_username_visible",
            lite_users_selectable_create.c.username,
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
    ],
)

146 

147 

def make_clustered_users_selectable(create=False):
    """
    Build the UNION ALL of per-cluster centroids (with member counts) and unclustered users (count 1).

    With create=True the geometry expressions are emitted as raw SQL text; with create=False they are built
    from SQLAlchemy constructs. Presumably this mirrors the `ST_AsEWKB` workaround described in
    make_lite_users_selectable -- TODO confirm.
    """
    # emits something along the lines of
    # WITH anon_1 AS (
    #   SELECT id,
    #     geom,
    #     ST_ClusterDBSCAN(geom, eps := .15, minpoints := 5) OVER (ORDER BY id) AS cluster_id
    #   FROM users
    #   WHERE geom IS NOT NULL
    # )
    located_users = sa_select(
        User.id,
        User.geom,
        # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5, cluster will be NULL for not in any cluster
        func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id).label("cluster_id"),
    ).where(User.geom != None)  # noqa: E711 -- SQLAlchemy turns this into IS NOT NULL
    cluster_cte = located_users.cte("clustered")
    cluster_id = cluster_cte.c.cluster_id

    if not create:
        centroid_geom = func.ST_Centroid(func.ST_Collect(cluster_cte.c.geom))
        cluster_geom = cluster_cte.c.geom
    else:
        # raw text referring to the CTE by the name given above ("clustered")
        centroid_geom = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        cluster_geom = literal_column("clustered.geom")

    # one row per cluster: centroid of the collected member geometries and the number of members
    grouped_rows = (
        sa_select(centroid_geom.label("geom"), func.count().label("count"))
        .select_from(cluster_cte)
        .where(cluster_id != None)  # noqa: E711
        .group_by(cluster_id)
    )

    # one row per user that did not end up in any cluster, always with a count of 1
    lone_rows = (
        sa_select(cluster_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(cluster_cte)
        .where(cluster_id == None)  # noqa: E711
    )

    return union_all(grouped_rows, lone_rows)

190 

191 

# Two renderings of the same query: one describes the table, the other is emitted in the CREATE statement
clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# no indexes are declared for this view
clustered_users = create_materialized_view_with_different_ddl(
    "clustered_users",
    select_selectable=clustered_users_selectable_select,
    create_selectable=clustered_users_selectable_create,
    metadata=Base.metadata,
)

198 

199 

def refresh_materialized_views(payload: empty_pb2.Empty):
    """
    Refresh the cluster count views and the clustered users view inside a single session scope.

    `payload` is not read.
    """
    logger.info("Refreshing materialized views")

    # (view name, refresh concurrently?) in the order the refreshes are issued
    # clustered_users is refreshed non-concurrently; this module declares no unique index for it, which
    # PostgreSQL requires for REFRESH ... CONCURRENTLY
    refresh_plan = (
        ("cluster_subscription_counts", True),
        ("cluster_admin_counts", True),
        ("clustered_users", False),
    )

    with session_scope() as session:
        for view_name, concurrently in refresh_plan:
            refresh_materialized_view(session, view_name, concurrently=concurrently)

206 

207 

def refresh_materialized_views_rapid(payload: empty_pb2.Empty):
    """
    Refresh only the `lite_users` materialized view, concurrently.

    `payload` is not read.
    """
    logger.info("Refreshing materialized views (rapid)")

    view_name = "lite_users"
    with session_scope() as db_session:
        refresh_materialized_view(db_session, view_name, concurrently=True)