Coverage for src/couchers/materialized_views.py: 98%

64 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-06-01 15:07 +0000

1import logging 

2from datetime import timedelta 

3 

4from google.protobuf import empty_pb2 

5from sqlalchemy import Float, Index, Integer, event 

6from sqlalchemy.sql import ( 

7 and_, 

8 case, 

9 cast, 

10 func, 

11 literal, 

12 literal_column, 

13 union_all, 

14) 

15from sqlalchemy.sql import select as sa_select 

16from sqlalchemy.sql.functions import percentile_disc 

17from sqlalchemy_utils.view import ( 

18 CreateView, 

19 DropView, 

20 create_materialized_view, 

21 create_table_from_selectable, 

22 refresh_materialized_view, 

23) 

24 

25from couchers.db import session_scope 

26from couchers.models import ( 

27 ActivenessProbe, 

28 ActivenessProbeStatus, 

29 Base, 

30 ClusterRole, 

31 ClusterSubscription, 

32 HostRequest, 

33 Message, 

34 MessageType, 

35 StrongVerificationAttempt, 

36 Upload, 

37 User, 

38) 

39 

40logger = logging.getLogger(__name__) 

41 

42 

def create_materialized_view_with_different_ddl(
    name, select_selectable, create_selectable, metadata, indexes=None, aliases=None
):
    """
    Register a materialized view whose table object and whose DDL are built from two different selectables.

    Copied wholesale from sqlalchemy_utils (3-clause BSD), with a minor tweak: ``select_selectable`` is used to
    build the returned table, while ``create_selectable`` is what gets handed to ``CreateView``.

    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124

    :param name: name of the materialized view
    :param select_selectable: selectable the returned table is derived from
    :param create_selectable: selectable passed to ``CreateView`` when ``metadata`` is created
    :param metadata: metadata object whose ``after_create`` / ``before_drop`` events manage the view
    :param indexes: optional indexes to attach to the table and create after the view
    :param aliases: optional aliases forwarded to ``create_table_from_selectable``
    :return: the table built from ``select_selectable``
    """
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        metadata=None,
        aliases=aliases,
    )

    def _create_view_indexes(target, connection, **kw):
        # the indexes live on the detached table, so they have to be emitted by hand
        for index in view_table.indexes:
            index.create(connection)

    # keep the view listener registered ahead of the index listener: the indexes need the view to exist
    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", _create_view_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))

    return view_table

64 

65 

# Per-cluster count of subscriptions whose user is visible.
# NOTE: the filter on User.is_visible discards rows where the outer join found no user (NULL is not true),
# so only subscriptions with a matching, visible user are counted.
cluster_subscription_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

# Materialized view over the selectable above, with a unique index on cluster_id.
cluster_subscription_counts = create_materialized_view(
    name="cluster_subscription_counts",
    selectable=cluster_subscription_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index(
            "uq_cluster_subscription_counts_cluster_id", cluster_subscription_counts_selectable.c.cluster_id, unique=True
        ),
    ],
)

89 

# Per-cluster count of admin subscriptions whose user is visible.
# NOTE: as the User.is_visible filter rejects NULLs, subscriptions without a matching user row are not counted
# even though the join is an outer join.
cluster_admin_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    # multiple criteria passed to a single where() are joined with AND
    .where(ClusterSubscription.role == ClusterRole.admin, User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

# Materialized view over the selectable above, with a unique index on cluster_id.
cluster_admin_counts = create_materialized_view(
    name="cluster_admin_counts",
    selectable=cluster_admin_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        Index("uq_cluster_admin_counts_cluster_id", cluster_admin_counts_selectable.c.cluster_id, unique=True),
    ],
)

108 

109 

def make_lite_users_selectable(create=False):
    """
    Build the SELECT behind the ``lite_users`` view: a slimmed-down row per user.

    Each row carries a handful of user columns, the avatar filename (via an outer join on ``Upload``) and a
    ``has_strong_verification`` flag that falls back to ``False`` when the user has no row in the strong
    verification subquery.

    :param create: pass ``True`` for the variant that is rendered into the view DDL; it references the geometry
        column as a literal ``users.geom`` instead of ``User.geom``
    :return: the select statement
    """
    # when this is rendered as a select while emitting the CREATE VIEW, User.geom would come out as
    # `ST_AsEWKB(users.geom)` instead of the literal column, so the DDL variant spells the column out
    geom = literal_column("users.geom") if create else User.geom

    # one (id, true) row per user for which StrongVerificationAttempt.has_strong_verification holds
    sv = (
        sa_select(User.id, literal(True).label("true"))
        .select_from(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.has_strong_verification(User))
        .distinct()
        .subquery(name="sv_subquery")
    )

    columns = [
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),
        Upload.filename.label("avatar_filename"),
        User.has_completed_profile.label("has_completed_profile"),
        # users missing from the subquery get NULL from the outer join, which is coalesced to False
        func.coalesce(sv.c.true, False).label("has_strong_verification"),
    ]

    lite_users_query = sa_select(*columns).select_from(User)
    lite_users_query = lite_users_query.outerjoin(Upload, Upload.key == User.avatar_key)
    lite_users_query = lite_users_query.outerjoin(sv, sv.c.id == User.id)
    return lite_users_query

144 

145 

# Two renderings of the same query: one backs the table object, the other is emitted as the view DDL.
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

lite_users = create_materialized_view_with_different_ddl(
    name="lite_users",
    select_selectable=lite_users_selectable_select,
    create_selectable=lite_users_selectable_create,
    metadata=Base.metadata,
    indexes=[
        # unique index on the user id
        Index("uq_lite_users_id", lite_users_selectable_create.c.id, unique=True),
        # partial hash indexes restricted to rows where is_visible is true
        Index(
            "ix_lite_users_id_visible",
            lite_users_selectable_create.c.id,
            postgresql_using="hash",
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
        Index(
            "ix_lite_users_username_visible",
            lite_users_selectable_create.c.username,
            postgresql_using="hash",
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
    ],
)

170 

171 

def make_clustered_users_selectable(create=False):
    """
    Build the SELECT behind the ``clustered_users`` view: users grouped into DBSCAN clusters.

    The result is the UNION ALL of two selects over the same CTE:

    * one row per cluster, with the centroid of the collected member geometries and the member count
    * one row per user that DBSCAN left unclustered, with that user's own geometry and a count of 1

    :param create: pass ``True`` for the variant that is rendered into the view DDL; it spells the geometry
        expressions out as literal SQL instead of going through ``func`` and the CTE columns
    :return: the UNION ALL of the two selects
    """
    # the CTE emits something along the lines of
    # WITH clustered AS (
    #   SELECT id,
    #     geom,
    #     ST_ClusterDBSCAN(geom, eps := .15, minpoints := 5) OVER (ORDER BY id) AS cluster_id
    #   FROM users
    #   WHERE geom IS NOT NULL
    # )
    # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5, cluster will be NULL for not in any cluster
    dbscan_cluster_id = func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id)
    clustered = (
        sa_select(User.id, User.geom, dbscan_cluster_id.label("cluster_id"))
        .where(User.geom.is_not(None))
        .cte("clustered")
    )

    if create:
        centroid = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        member_geom = literal_column("clustered.geom")
    else:
        centroid = func.ST_Centroid(func.ST_Collect(clustered.c.geom))
        member_geom = clustered.c.geom

    # one row per cluster: centroid of its members plus how many members it has
    in_cluster = (
        sa_select(centroid.label("geom"), func.count().label("count"))
        .select_from(clustered)
        .where(clustered.c.cluster_id.is_not(None))
        .group_by(clustered.c.cluster_id)
    )

    # one row per unclustered user, counted as a group of one
    not_in_cluster = (
        sa_select(member_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(clustered)
        .where(clustered.c.cluster_id.is_(None))
    )

    return union_all(in_cluster, not_in_cluster)

214 

215 

# Two renderings of the same query: one backs the table object, the other is emitted as the view DDL.
clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# no indexes are declared for this view
clustered_users = create_materialized_view_with_different_ddl(
    name="clustered_users",
    select_selectable=clustered_users_selectable_select,
    create_selectable=clustered_users_selectable_create,
    metadata=Base.metadata,
)

222 

223 

def float_(stmt):
    """
    Wrap ``stmt`` so it is cast to ``Float``, with a NULL result replaced by ``0.0``.
    """
    as_float = cast(stmt, Float)
    return func.coalesce(as_float, 0.0)

226 

227 

# time of the `chat_created` message of each conversation, i.e. when the request was sent
t = (
    sa_select(Message.conversation_id, Message.time)
    .where(Message.message_type == MessageType.chat_created)
    .subquery()
)
# earliest message time per (conversation, author), i.e. when that user first responded to the request
s = (
    sa_select(
        Message.conversation_id,
        Message.author_id,
        func.min(Message.time).label("time"),
    )
    .group_by(Message.conversation_id, Message.author_id)
    .subquery()
)
# one row per thing a user was expected to respond to; response_time is NULL when there was no response
all_responses = union_all(
    # host request responses: the outer join leaves response_time NULL if the host never wrote a message
    sa_select(
        HostRequest.host_user_id.label("user_id"),
        (s.c.time - t.c.time).label("response_time"),
    )
    .join(t, t.c.conversation_id == HostRequest.conversation_id)
    .outerjoin(
        s,
        (s.c.conversation_id == HostRequest.conversation_id) & (s.c.author_id == HostRequest.host_user_id),
    ),
    # activeness probes
    sa_select(
        ActivenessProbe.user_id,
        # expired probes have a responded time for when they were marked responded, so they are
        # explicitly mapped to NULL rather than to a time difference
        case(
            (
                ActivenessProbe.response != ActivenessProbeStatus.expired,
                ActivenessProbe.responded - ActivenessProbe.probe_initiated,
            ),
            else_=None,
        ).label("response_time"),
    ),
).subquery()

259 

# Per-user response statistics aggregated over `all_responses`.
user_response_rates_selectable = sa_select(
    all_responses.c.user_id.label("user_id"),
    # number of requests received
    func.count().label("requests"),
    # fraction of requests responded to (0.0 to 1.0); count(response_time) skips the NULLs of unanswered requests.
    # Both counts are integers and PostgreSQL truncates integer division, which would collapse the rate to 0 or 1,
    # so the numerator is cast to a float to force floating-point division however `/` gets rendered.
    # The denominator is never zero: every group produced by GROUP BY has at least one row.
    (cast(func.count(all_responses.c.response_time), Float) / func.count()).label("response_rate"),
    # average over answered requests only (avg ignores NULLs)
    func.avg(all_responses.c.response_time).label("avg_response_time"),
    # the 33rd percentile response time; unanswered requests are ranked as a 1000 day response time
    percentile_disc(0.33)
    .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
    .label("response_time_33p"),
    # the 66th percentile response time, with the same treatment of unanswered requests
    percentile_disc(0.66)
    .within_group(func.coalesce(all_responses.c.response_time, timedelta(days=1000)))
    .label("response_time_66p"),
).group_by(all_responses.c.user_id)

# Materialized view over the selectable above, with a unique index on user_id.
user_response_rates = create_materialized_view(
    "user_response_rates",
    user_response_rates_selectable,
    Base.metadata,
    [Index("uq_user_response_rates_id", user_response_rates_selectable.c.user_id, unique=True)],
)

283 

284 

def refresh_materialized_views(payload: empty_pb2.Empty):
    """
    Refresh the cluster count, clustered user and user response rate materialized views.

    All refreshes run inside a single ``session_scope()``. ``payload`` is not used by the body.
    """
    logger.info("Refreshing materialized views")

    # (view name, refresh concurrently?) in the order the refreshes are issued
    # clustered_users is declared without a unique index, so it is refreshed non-concurrently
    refresh_plan = (
        ("cluster_subscription_counts", True),
        ("cluster_admin_counts", True),
        ("clustered_users", False),
        ("user_response_rates", True),
    )

    with session_scope() as session:
        for view_name, concurrently in refresh_plan:
            refresh_materialized_view(session, view_name, concurrently=concurrently)

292 

293 

def refresh_materialized_views_rapid(payload: empty_pb2.Empty):
    """
    Concurrently refresh the ``lite_users`` materialized view.

    ``payload`` is not used by the body.
    """
    logger.info("Refreshing materialized views (rapid)")

    rapid_view = "lite_users"
    with session_scope() as session:
        refresh_materialized_view(session, rapid_view, concurrently=True)