Coverage for src/couchers/materialized_views.py: 98%

64 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-04-16 15:13 +0000

1import logging 

2from datetime import timedelta 

3 

4from google.protobuf import empty_pb2 

5from sqlalchemy import Float, Index, Integer, event 

6from sqlalchemy.sql import ( 

7 and_, 

8 case, 

9 cast, 

10 func, 

11 literal, 

12 literal_column, 

13 union_all, 

14) 

15from sqlalchemy.sql import select as sa_select 

16from sqlalchemy.sql.functions import percentile_disc 

17from sqlalchemy_utils.view import ( 

18 CreateView, 

19 DropView, 

20 create_materialized_view, 

21 create_table_from_selectable, 

22 refresh_materialized_view, 

23) 

24 

25from couchers.db import session_scope 

26from couchers.models import ( 

27 ActivenessProbe, 

28 ActivenessProbeStatus, 

29 Base, 

30 ClusterRole, 

31 ClusterSubscription, 

32 HostRequest, 

33 Message, 

34 MessageType, 

35 StrongVerificationAttempt, 

36 Upload, 

37 User, 

38) 

39 

# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)

41 

42 

def create_materialized_view_with_different_ddl(
    name, select_selectable, create_selectable, metadata, indexes=None, aliases=None
):
    """
    Register a materialized view whose Table definition and CREATE DDL come from different selectables.

    Adapted from ``sqlalchemy_utils.view.create_materialized_view`` (3-clause BSD license). The only change
    is that the returned Table's columns are derived from ``select_selectable``, while the
    ``CREATE MATERIALIZED VIEW`` statement is rendered from ``create_selectable``.

    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    Listeners attached to ``metadata``, in registration order:
      1. "after_create": create the materialized view from ``create_selectable``;
      2. "after_create": create every index collected on the returned Table;
      3. "before_drop": drop the materialized view.

    Args:
        name: name of the materialized view.
        select_selectable: selectable used to derive the Table's columns.
        create_selectable: selectable rendered into the CREATE MATERIALIZED VIEW statement.
        metadata: MetaData whose create/drop events trigger the DDL.
        indexes: optional Index objects to attach to the view's Table.
        aliases: optional column-name aliases, forwarded to ``create_table_from_selectable``.

    Returns:
        The Table describing the view. It is built with ``metadata=None``, so it is not registered
        on ``metadata`` itself.
    """
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        metadata=None,
        aliases=aliases,
    )

    def create_indexes(target, connection, **kw):
        # Registered after the CreateView listener below, so the view exists when this runs.
        for index in view_table.indexes:
            index.create(connection)

    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", create_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))
    return view_table

64 

65 

# Subscriber count per cluster, counting only subscriptions whose user is visible.
# NOTE: the filter on User.is_visible discards rows where the outer join found no user
# (NULL is not true), so the outer join effectively behaves like an inner join here.
cluster_subscription_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .join(User, User.id == ClusterSubscription.user_id, isouter=True)
    .filter(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

cluster_subscription_counts = create_materialized_view(
    name="cluster_subscription_counts",
    selectable=cluster_subscription_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_cluster_subscription_counts_cluster_id",
            cluster_subscription_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)

89 

# Admin count per cluster, counting only admin subscriptions whose user is visible.
# As with cluster_subscription_counts, the filter on User.is_visible makes the outer join
# behave like an inner join.
cluster_admin_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .join(User, User.id == ClusterSubscription.user_id, isouter=True)
    .filter(ClusterSubscription.role == ClusterRole.admin, User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

cluster_admin_counts = create_materialized_view(
    name="cluster_admin_counts",
    selectable=cluster_admin_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_cluster_admin_counts_cluster_id",
            cluster_admin_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)

108 

109 

def make_lite_users_selectable(create=False):
    """
    Build the SELECT backing the ``lite_users`` materialized view.

    Each row is a slim per-user record: basic profile fields, location, visibility, avatar filename,
    profile completeness and a strong-verification flag.

    Args:
        create: when True, reference the geometry as the literal column ``users.geom`` (for the
            CREATE VIEW DDL); when False, use the ``User.geom`` ORM attribute.

    Returns:
        A Select over ``users`` left-joined to its avatar upload and to the strong-verification subquery.
    """
    # In the CREATE VIEW text, User.geom would be rendered as `ST_AsEWKB(users.geom)` rather than the
    # bare column; referencing the literal column text avoids that.
    geom_column = literal_column("users.geom") if create else User.geom

    # Users for whom StrongVerificationAttempt.has_strong_verification(User) holds, deduplicated,
    # each tagged with a constant TRUE column named "true".
    verified_users = (
        sa_select(User.id, literal(True).label("true"))
        .select_from(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.has_strong_verification(User))
        .distinct()
        .subquery(name="sv_subquery")
    )

    columns = (
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom_column.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),
        Upload.filename.label("avatar_filename"),
        User.has_completed_profile.label("has_completed_profile"),
        # Users missing from the subquery get NULL from the outer join; coalesce turns that into False.
        func.coalesce(verified_users.c.true, False).label("has_strong_verification"),
    )

    return (
        sa_select(*columns)
        .select_from(User)
        .outerjoin(Upload, Upload.key == User.avatar_key)
        .outerjoin(verified_users, verified_users.c.id == User.id)
    )

144 

145 

# The ORM-facing Table is derived from the "select" variant; the view DDL is emitted from the "create" variant.
lite_users_selectable_select, lite_users_selectable_create = (
    make_lite_users_selectable(create=False),
    make_lite_users_selectable(create=True),
)

# NOTE(review): the two partial indexes below carry a "uq_" prefix but are not declared unique=True;
# confirm whether that is intentional.
lite_users = create_materialized_view_with_different_ddl(
    name="lite_users",
    select_selectable=lite_users_selectable_select,
    create_selectable=lite_users_selectable_create,
    metadata=Base.metadata,
    indexes=[
        Index("uq_lite_users_id", lite_users_selectable_create.c.id, unique=True),
        # partial index restricted to visible users
        Index(
            "uq_lite_users_id_visible",
            lite_users_selectable_create.c.id,
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
        # partial index on username, restricted to visible users
        Index(
            "uq_lite_users_username_visible",
            lite_users_selectable_create.c.username,
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
    ],
)

168 

169 

def make_clustered_users_selectable(create=False):
    """
    Build the SELECT backing the ``clustered_users`` materialized view.

    Users with a location are grouped by ``ST_ClusterDBSCAN`` inside a CTE named ``clustered``,
    which emits something along the lines of::

        WITH clustered AS (
            SELECT id,
                   geom,
                   ST_ClusterDBSCAN(geom, eps := .15, minpoints := 5) OVER (ORDER BY id) AS cluster_id
            FROM users
            WHERE geom IS NOT NULL
        )

    The result is the UNION ALL of:
      * one row per cluster, holding the centroid of its members' points and the member count;
      * one row per user that belongs to no cluster (``cluster_id`` IS NULL), with a count of 1.

    Args:
        create: when True, emit the geometry expressions as literal SQL text referencing the
            ``clustered`` CTE, for the CREATE MATERIALIZED VIEW statement. When False, use SQLAlchemy
            column/function expressions. Presumably this keeps the geometry from being wrapped in a
            conversion function when rendered into the DDL; confirm.

    Returns:
        A ``union_all`` compound select with columns ``geom`` and ``count``.
    """
    cluster_cte = (
        sa_select(
            User.id,
            User.geom,
            # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5; cluster_id is NULL for points in no cluster
            func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id).label("cluster_id"),
        )
        # .is_not(None) renders IS NOT NULL, the same SQL as `!= None`, without the E711 lint hazard
        .where(User.geom.is_not(None))
        # the literal_column text below refers to this CTE by the name "clustered"
        .cte("clustered")
    )

    if create:
        centroid_geom = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        cluster_geom = literal_column("clustered.geom")
    else:
        centroid_geom = func.ST_Centroid(func.ST_Collect(cluster_cte.c.geom))
        cluster_geom = cluster_cte.c.geom

    # One row per DBSCAN cluster: the centroid of its points and how many users it contains.
    cluster_rows = (
        sa_select(centroid_geom.label("geom"), func.count().label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_not(None))
        .group_by(cluster_cte.c.cluster_id)
    )

    # One row per user outside every cluster, each counted once.
    isolated_rows = (
        sa_select(cluster_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(cluster_cte)
        .where(cluster_cte.c.cluster_id.is_(None))
    )

    return union_all(cluster_rows, isolated_rows)

212 

213 

# The ORM-facing Table comes from the "select" variant; the view DDL is emitted from the "create" variant.
clustered_users_selectable_select, clustered_users_selectable_create = (
    make_clustered_users_selectable(create=False),
    make_clustered_users_selectable(create=True),
)

# No indexes are declared for this view.
clustered_users = create_materialized_view_with_different_ddl(
    name="clustered_users",
    select_selectable=clustered_users_selectable_select,
    create_selectable=clustered_users_selectable_create,
    metadata=Base.metadata,
)

220 

221 

def float_(stmt):
    """
    Wrap a SQL expression as ``COALESCE(CAST(stmt AS <Float>), 0.0)``.

    Casts ``stmt`` to the SQLAlchemy ``Float`` type and substitutes ``0.0`` when the result is NULL.
    """
    as_float = cast(stmt, Float)
    return func.coalesce(as_float, 0.0)

224 

225 

# t: when each request was sent, i.e. the time of the conversation's chat_created message
t = (
    sa_select(Message.conversation_id, Message.time)
    .where(Message.message_type == MessageType.chat_created)
    .subquery()
)
# s: the earliest message time of each author in each conversation, i.e. when they first responded
s = (
    sa_select(
        Message.conversation_id,
        Message.author_id,
        func.min(Message.time).label("time"),
    )
    .group_by(Message.conversation_id, Message.author_id)
    .subquery()
)

# One row per request a user was expected to respond to, with the time they took.
# response_time is NULL when there is no qualifying response.
all_responses = union_all(
    # host requests: from the request being sent to the host's first message in that conversation;
    # the outer join leaves s.time (and hence response_time) NULL if the host never wrote
    sa_select(
        HostRequest.host_user_id.label("user_id"),
        (s.c.time - t.c.time).label("response_time"),
    )
    .join(t, t.c.conversation_id == HostRequest.conversation_id)
    .join(
        s,
        and_(s.c.conversation_id == HostRequest.conversation_id, s.c.author_id == HostRequest.host_user_id),
        isouter=True,
    ),
    # activeness probes: expired probes still carry a `responded` timestamp (when they were marked
    # responded), so they are explicitly mapped to NULL rather than to a response time
    sa_select(
        ActivenessProbe.user_id,
        case(
            (
                ActivenessProbe.response != ActivenessProbeStatus.expired,
                ActivenessProbe.responded - ActivenessProbe.probe_initiated,
            ),
            else_=None,
        ).label("response_time"),
    ),
).subquery()

257 

# Per-user responsiveness statistics over every row of all_responses.
# A NULL response_time means "no response". For the percentiles such rows are treated as a
# 1000-day response time, so they sort after every real response.
user_response_rates_selectable = sa_select(
    all_responses.c.user_id.label("user_id"),
    # how many requests/probes the user received (COUNT(*))
    func.count().label("requests"),
    # fraction of those that got a response: COUNT(response_time) skips NULLs, COUNT(*) does not
    # NOTE(review): relies on SQLAlchemy 2.x rendering `/` between integer expressions as true division;
    # under 1.x this would be integer division. Confirm the pinned version.
    (func.count(all_responses.c.response_time) / func.count()).label("response_rate"),
    func.avg(all_responses.c.response_time).label("avg_response_time"),
    # the 33rd and 66th percentile response times, emitted in that column order
    *(
        percentile_disc(fraction)
        .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
        .label(column_name)
        for fraction, column_name in ((0.33, "response_time_33p"), (0.66, "response_time_66p"))
    ),
).group_by(all_responses.c.user_id)

user_response_rates = create_materialized_view(
    name="user_response_rates",
    selectable=user_response_rates_selectable,
    metadata=Base.metadata,
    indexes=[Index("uq_user_response_rates_id", user_response_rates_selectable.c.user_id, unique=True)],
)

281 

282 

def refresh_materialized_views(payload: empty_pb2.Empty):
    """
    Refresh the regularly-updated materialized views, in order, within a single session scope.

    ``clustered_users`` is refreshed without CONCURRENTLY. Presumably this is because it has no unique
    index, which PostgreSQL requires for a concurrent refresh. ``payload`` is unused.
    """
    logger.info("Refreshing materialized views")
    # (view name, refresh concurrently?) in refresh order
    refresh_plan = (
        ("cluster_subscription_counts", True),
        ("cluster_admin_counts", True),
        ("clustered_users", False),
        ("user_response_rates", True),
    )
    with session_scope() as session:
        for view_name, concurrently in refresh_plan:
            refresh_materialized_view(session, view_name, concurrently=concurrently)

290 

291 

def refresh_materialized_views_rapid(payload: empty_pb2.Empty):
    """
    Concurrently refresh the ``lite_users`` materialized view.

    This is kept separate from ``refresh_materialized_views``, presumably so it can run on a faster
    schedule (per its name). ``payload`` is unused.
    """
    logger.info("Refreshing materialized views (rapid)")
    with session_scope() as db_session:
        refresh_materialized_view(db_session, "lite_users", concurrently=True)