Coverage for src/couchers/materialized_views.py: 100%

53 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-11-04 02:51 +0000

1import logging 

2 

3from google.protobuf import empty_pb2 

4from sqlalchemy import Index, Integer, event 

5from sqlalchemy.sql import func, literal, literal_column, union_all 

6from sqlalchemy.sql import select as sa_select 

7from sqlalchemy_utils.view import ( 

8 CreateView, 

9 DropView, 

10 create_materialized_view, 

11 create_table_from_selectable, 

12 refresh_materialized_view, 

13) 

14 

15from couchers.db import session_scope 

16from couchers.models import Base, ClusterRole, ClusterSubscription, Upload, User 

17 

18logger = logging.getLogger(__name__) 

19 

20 

def create_materialized_view_with_different_ddl(
    name, select_selectable, create_selectable, metadata, indexes=None, aliases=None
):
    """
    Register a materialized view whose table object and CREATE statement are built from two different selectables.

    Copied wholesale from sqlalchemy_utils (3-clause BSD), with minor tweak in {select,create}_selectable

    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    Arguments:
      name: name of the materialized view
      select_selectable: selectable passed to `create_table_from_selectable` to build the returned table object
      create_selectable: selectable passed to `CreateView`, i.e. the one used when the view is created
      metadata: metadata object the create/drop listeners are attached to
      indexes: optional list of indexes, forwarded to `create_table_from_selectable`
      aliases: optional aliases, forwarded to `create_table_from_selectable`

    Returns the table object built from `select_selectable`.
    """
    # note the table object is built with metadata=None, not with the `metadata` argument
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        metadata=None,
        aliases=aliases,
    )

    def create_indexes(target, connection, **kw):
        # create every index declared on the view's table object on the given connection
        for view_index in view_table.indexes:
            view_index.create(connection)

    # keep the view-creation listener registered ahead of the index listener, as in the upstream implementation
    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", create_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))

    return view_table

42 

43 

# number of subscriptions per cluster, restricted to rows for which User.is_visible holds
# NOTE(review): the outer join is followed by a filter on User, so it presumably behaves like an inner join here;
# confirm against the definition of User.is_visible before changing the join type
cluster_subscription_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .join(User, User.id == ClusterSubscription.user_id, isouter=True)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

# materialized view over the selectable above, with a unique index on cluster_id
cluster_subscription_counts = create_materialized_view(
    name="cluster_subscription_counts",
    selectable=cluster_subscription_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_cluster_subscription_counts_cluster_id",
            cluster_subscription_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)

67 

# number of admin subscriptions per cluster, restricted to rows for which User.is_visible holds
cluster_admin_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .join(User, User.id == ClusterSubscription.user_id, isouter=True)
    # both criteria are combined with AND
    .where(
        ClusterSubscription.role == ClusterRole.admin,
        User.is_visible,
    )
    .group_by(ClusterSubscription.cluster_id)
)

# materialized view over the selectable above, with a unique index on cluster_id
cluster_admin_counts = create_materialized_view(
    name="cluster_admin_counts",
    selectable=cluster_admin_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_cluster_admin_counts_cluster_id",
            cluster_admin_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)

86 

87 

def make_lite_users_selectable(create=False):
    """
    Build the select behind the `lite_users` view: a handful of User columns plus the avatar's filename.

    With `create=True` the geom column is written as the literal `users.geom`, otherwise `User.geom` is used; every
    other column is the same in both variants.
    """
    # because this is rendered as a select when emitting the CREATE VIEW, using User.geom would be rendered as
    # `ST_AsEWKB(users.geom)` instead of the literal column, the literal column avoids that
    geom_source = literal_column("users.geom") if create else User.geom

    lite_columns = [
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom_source.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),
        Upload.filename.label("avatar_filename"),
        User.has_completed_profile.label("has_completed_profile"),
    ]

    users_query = sa_select(*lite_columns).select_from(User)
    # outer join so that users without a matching upload row are still selected
    return users_query.join(Upload, Upload.key == User.avatar_key, isouter=True)

112 

113 

# two renderings of the same query: one to describe the table object, one to emit the view's DDL
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

# materialized view with a unique index on id
lite_users = create_materialized_view_with_different_ddl(
    name="lite_users",
    select_selectable=lite_users_selectable_select,
    create_selectable=lite_users_selectable_create,
    metadata=Base.metadata,
    indexes=[
        Index("uq_lite_users_id", lite_users_selectable_create.c.id, unique=True),
    ],
)

124 

125 

def make_clustered_users_selectable(create=False):
    """
    Build the select behind the `clustered_users` view: one (geom, count) row per cluster of users, plus one row
    with count 1 for every user that is not part of any cluster.

    With `create=True` the geometry expressions are written as literal SQL text, otherwise they are built from the
    CTE's columns; the rest of the query is the same in both variants.
    """
    # the CTE emits something along the lines of
    # WITH clustered AS (
    #   SELECT id,
    #          geom,
    #          ST_ClusterDBSCAN(geom, eps := .15, minpoints := 5) OVER (ORDER BY id) AS cluster_id
    #   FROM users
    #   WHERE geom IS NOT NULL
    # )

    # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5, cluster will be NULL for not in any cluster
    dbscan_cluster_id = func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id)

    located_users = sa_select(
        User.id,
        User.geom,
        dbscan_cluster_id.label("cluster_id"),
    ).where(User.geom.is_not(None))
    clustered = located_users.cte("clustered")

    if not create:
        centroid_expr = func.ST_Centroid(func.ST_Collect(clustered.c.geom))
        point_expr = clustered.c.geom
    else:
        # literal text referring to the CTE by its name, "clustered"
        centroid_expr = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        point_expr = literal_column("clustered.geom")

    # one row per cluster: centroid of the collected member geometries and the number of members
    cluster_rows = (
        sa_select(centroid_expr.label("geom"), func.count().label("count"))
        .select_from(clustered)
        .where(clustered.c.cluster_id.is_not(None))
        .group_by(clustered.c.cluster_id)
    )

    # one row per user outside any cluster: their own geometry with a constant count of 1
    singleton_rows = (
        sa_select(point_expr.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(clustered)
        .where(clustered.c.cluster_id.is_(None))
    )

    return union_all(cluster_rows, singleton_rows)

168 

169 

# two renderings of the same query: one to describe the table object, one to emit the view's DDL
clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# materialized view declared without any indexes
clustered_users = create_materialized_view_with_different_ddl(
    name="clustered_users",
    select_selectable=clustered_users_selectable_select,
    create_selectable=clustered_users_selectable_create,
    metadata=Base.metadata,
)

176 

177 

def refresh_materialized_views(payload: empty_pb2.Empty):
    """
    Refresh the cluster_subscription_counts, cluster_admin_counts and clustered_users materialized views, in that
    order, within a single session scope.

    `payload` is not used.
    """
    logger.info("Refreshing materialized views")

    # (view name, refresh concurrently?) in refresh order; clustered_users is declared without any index in this
    # module and is refreshed non-concurrently (PostgreSQL needs a unique index for a concurrent refresh)
    refresh_plan = (
        ("cluster_subscription_counts", True),
        ("cluster_admin_counts", True),
        ("clustered_users", False),
    )

    with session_scope() as session:
        for view_name, use_concurrently in refresh_plan:
            refresh_materialized_view(session, view_name, concurrently=use_concurrently)

184 

185 

def refresh_materialized_views_rapid(payload: empty_pb2.Empty):
    """
    Refresh the lite_users materialized view concurrently.

    `payload` is not used.
    """
    logger.info("Refreshing materialized views (rapid)")

    view_name = "lite_users"
    with session_scope() as session:
        refresh_materialized_view(session, view_name, concurrently=True)