Coverage for src/couchers/materialized_views.py: 99%

74 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-08-28 14:55 +0000

1import logging 

2from datetime import timedelta 

3 

4from google.protobuf import empty_pb2 

5from sqlalchemy import Float, Index, Integer, event 

6from sqlalchemy.sql import ( 

7 and_, 

8 case, 

9 cast, 

10 func, 

11 literal, 

12 literal_column, 

13 union_all, 

14) 

15from sqlalchemy.sql import select as sa_select 

16from sqlalchemy.sql.functions import percentile_disc 

17from sqlalchemy_utils.view import ( 

18 CreateView, 

19 DropView, 

20 create_materialized_view, 

21 create_table_from_selectable, 

22 refresh_materialized_view, 

23) 

24 

25from couchers.db import session_scope 

26from couchers.models import ( 

27 ActivenessProbe, 

28 ActivenessProbeStatus, 

29 Base, 

30 ClusterRole, 

31 ClusterSubscription, 

32 HostRequest, 

33 Message, 

34 MessageType, 

35 StrongVerificationAttempt, 

36 Upload, 

37 User, 

38) 

39 

40logger = logging.getLogger(__name__) 

41 

42 

def create_materialized_view_with_different_ddl(
    name, select_selectable, create_selectable, metadata, indexes=None, aliases=None
):
    """
    Registers a materialized view on `metadata` and returns a table object describing it.

    The returned table (columns, indexes) is derived from `select_selectable`, whereas the statement used to create
    the view is built from `create_selectable`, so the two may render differently.

    Copied wholesale from sqlalchemy_utils (3-clause BSD), with minor tweak in {select,create}_selectable

    https://github.com/kvesteri/sqlalchemy-utils/blob/baf53cd1a3e779fc127010543fed53cf4a97fe16/sqlalchemy_utils/view.py#L77-L124
    """
    # as in the upstream implementation, the table is built with metadata=None rather than with `metadata`
    view_table = create_table_from_selectable(
        name=name,
        selectable=select_selectable,
        indexes=indexes,
        metadata=None,
        aliases=aliases,
    )

    def _create_view_indexes(target, connection, **kw):
        # the view's indexes are created explicitly on the connection handed to the listener
        for index in view_table.indexes:
            index.create(connection)

    # registration order is kept: create the view first, then its indexes
    event.listen(metadata, "after_create", CreateView(name, create_selectable, materialized=True))
    event.listen(metadata, "after_create", _create_view_indexes)
    event.listen(metadata, "before_drop", DropView(name, materialized=True))

    return view_table

64 

65 

# one row per cluster: the number of subscriptions whose user satisfies User.is_visible
cluster_subscription_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    .where(User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

cluster_subscription_counts = create_materialized_view(
    name="cluster_subscription_counts",
    selectable=cluster_subscription_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        # unique index on the grouping key
        Index(
            "uq_cluster_subscription_counts_cluster_id",
            cluster_subscription_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)


class ClusterSubscriptionCount(Base):
    # ORM class mapped onto the cluster_subscription_counts materialized view
    __table__ = cluster_subscription_counts

93 

94 

# one row per cluster: the number of admin-role subscriptions whose user satisfies User.is_visible
cluster_admin_counts_selectable = (
    sa_select(
        ClusterSubscription.cluster_id.label("cluster_id"),
        func.count().label("count"),
    )
    .select_from(ClusterSubscription)
    .outerjoin(User, User.id == ClusterSubscription.user_id)
    # multiple criteria passed to a single where() are joined with AND, same as chained where() calls
    .where(ClusterSubscription.role == ClusterRole.admin, User.is_visible)
    .group_by(ClusterSubscription.cluster_id)
)

cluster_admin_counts = create_materialized_view(
    name="cluster_admin_counts",
    selectable=cluster_admin_counts_selectable,
    metadata=Base.metadata,
    indexes=[
        # unique index on the grouping key
        Index(
            "uq_cluster_admin_counts_cluster_id",
            cluster_admin_counts_selectable.c.cluster_id,
            unique=True,
        ),
    ],
)


class ClusterAdminCount(Base):
    # ORM class mapped onto the cluster_admin_counts materialized view
    __table__ = cluster_admin_counts

117 

118 

def make_lite_users_selectable(create=False):
    """
    Builds the select behind the `lite_users` view: a reduced set of user columns plus the avatar filename and a
    strong verification flag.

    With `create=True` the geometry column is emitted as the literal `users.geom` (for use in the CREATE VIEW
    statement); otherwise the mapped `User.geom` column is used.
    """
    # because this is rendered as a select when emitting the CREATE VIEW, using User.geom would be rendered as
    # `ST_AsEWKB(users.geom)` instead of the literal column, so the DDL variant uses a literal column instead
    geom = literal_column("users.geom") if create else User.geom

    # one row (id, true) for every user for which has_strong_verification holds
    sv = (
        sa_select(User.id, literal(True).label("true"))
        .select_from(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.has_strong_verification(User))
        .distinct()
        .subquery(name="sv_subquery")
    )

    columns = [
        User.id.label("id"),
        User.username.label("username"),
        User.name.label("name"),
        User.city.label("city"),
        User.age.label("age"),
        geom.label("geom"),
        User.geom_radius.label("radius"),
        User.is_visible.label("is_visible"),
        Upload.filename.label("avatar_filename"),
        User.has_completed_profile.label("has_completed_profile"),
        User.has_completed_my_home.label("has_completed_my_home"),
        # users without a row in the subquery come out of the outer join as NULL, which becomes False
        func.coalesce(sv.c.true, False).label("has_strong_verification"),
    ]

    lite_users_query = sa_select(*columns).select_from(User)
    lite_users_query = lite_users_query.outerjoin(Upload, Upload.key == User.avatar_key)
    lite_users_query = lite_users_query.outerjoin(sv, sv.c.id == User.id)
    return lite_users_query

154 

155 

# two renderings of the same query: one to describe the table, one to emit the CREATE VIEW statement
lite_users_selectable_select = make_lite_users_selectable(create=False)
lite_users_selectable_create = make_lite_users_selectable(create=True)

lite_users = create_materialized_view_with_different_ddl(
    name="lite_users",
    select_selectable=lite_users_selectable_select,
    create_selectable=lite_users_selectable_create,
    metadata=Base.metadata,
    indexes=[
        # unique index on the user id
        Index("uq_lite_users_id", lite_users_selectable_create.c.id, unique=True),
        # partial hash indexes restricted to rows where is_visible holds
        Index(
            "ix_lite_users_id_visible",
            lite_users_selectable_create.c.id,
            postgresql_using="hash",
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
        Index(
            "ix_lite_users_username_visible",
            lite_users_selectable_create.c.username,
            postgresql_using="hash",
            postgresql_where=lite_users_selectable_create.c.is_visible,
        ),
    ],
)


class LiteUser(Base):
    # ORM class mapped onto the lite_users materialized view
    __table__ = lite_users

184 

185 

def make_clustered_users_selectable(create=False):
    """
    Builds the select behind the `clustered_users` view: one row per DBSCAN cluster of visible users (centroid of
    the members and member count), plus one row with count 1 for every visible user not in any cluster.

    With `create=True` the geometry expressions are emitted as literal SQL text for use in the CREATE VIEW
    statement; otherwise they are built from the CTE's columns.
    """
    # emits something along the lines of
    # WITH clustered AS (
    #     SELECT id,
    #         geom,
    #         ST_ClusterDBSCAN(geom, .15, 5) OVER (ORDER BY id) AS cluster_id
    #     FROM users
    #     WHERE <user is visible>
    # )
    cluster_cte = (
        sa_select(
            User.id,
            User.geom,
            # DBSCAN clustering with epsilon=.15 deg (~17 km), minpoints=5, cluster will be NULL for not in any cluster
            func.ST_ClusterDBSCAN(User.geom, 0.15, 5).over(order_by=User.id).label("cluster_id"),
        )
        .where(User.is_visible)
        .cte("clustered")
    )

    if create:
        # literal text referring to the CTE by its name ("clustered"); presumably this keeps the geometry from
        # being wrapped in a result-conversion function in the DDL, as with lite_users -- TODO confirm
        centroid_geom = literal_column("ST_Centroid(ST_Collect(clustered.geom))")
        cluster_geom = literal_column("clustered.geom")
    else:
        centroid_geom = func.ST_Centroid(func.ST_Collect(cluster_cte.c.geom))
        cluster_geom = cluster_cte.c.geom

    # users that were assigned to a cluster, collapsed into one row per cluster
    clustered_users = (
        sa_select(centroid_geom.label("geom"), func.count().label("count"))
        .select_from(cluster_cte)
        # is_not(None) renders IS NOT NULL, same as the `!= None` comparison, without tripping E711
        .where(cluster_cte.c.cluster_id.is_not(None))
        .group_by(cluster_cte.c.cluster_id)
    )

    # users outside of any cluster, each one emitted as its own row with a count of 1
    isolated_users = (
        sa_select(cluster_geom.label("geom"), literal(1, type_=Integer).label("count"))
        .select_from(cluster_cte)
        # is_(None) renders IS NULL, same as the `== None` comparison
        .where(cluster_cte.c.cluster_id.is_(None))
    )

    return union_all(clustered_users, isolated_users)

228 

229 

# two renderings of the same query: one to describe the table, one to emit the CREATE VIEW statement
clustered_users_selectable_select = make_clustered_users_selectable(create=False)
clustered_users_selectable_create = make_clustered_users_selectable(create=True)

# no indexes are declared for this view
clustered_users = create_materialized_view_with_different_ddl(
    name="clustered_users",
    select_selectable=clustered_users_selectable_select,
    create_selectable=clustered_users_selectable_create,
    metadata=Base.metadata,
)


class ClusteredUser(Base):
    # ORM class mapped onto the clustered_users materialized view
    __table__ = clustered_users

240 

241 

def float_(stmt):
    """
    Wraps `stmt` in a cast to Float, with a NULL result replaced by 0.0
    """
    as_float = cast(stmt, Float)
    return func.coalesce(as_float, 0.0)

244 

245 

# this subquery gets the time that the request was sent
t = (
    sa_select(Message.conversation_id, Message.time)
    .where(Message.message_type == MessageType.chat_created)
    .subquery()
)
# this subquery gets the time that the user responded to the request: the earliest message time per
# (conversation, author)
s = (
    sa_select(
        Message.conversation_id,
        Message.author_id,
        func.min(Message.time).label("time"),
    )
    .group_by(Message.conversation_id, Message.author_id)
    .subquery()
)

# host request responses; the outer join keeps requests the host never wrote in, with a NULL response_time
_host_request_responses = (
    sa_select(
        HostRequest.host_user_id.label("user_id"),
        (s.c.time - t.c.time).label("response_time"),
    )
    .join(t, t.c.conversation_id == HostRequest.conversation_id)
    .outerjoin(
        s,
        and_(
            s.c.conversation_id == HostRequest.conversation_id,
            s.c.author_id == HostRequest.host_user_id,
        ),
    )
)

# activeness probes
_activeness_probe_responses = sa_select(
    ActivenessProbe.user_id,
    # expired probes have a responded time for when they were marked responded, so they get a NULL
    # response_time instead of a computed one
    case(
        (
            ActivenessProbe.response != ActivenessProbeStatus.expired,
            ActivenessProbe.responded - ActivenessProbe.probe_initiated,
        ),
        else_=None,
    ).label("response_time"),
)

# one row per host request / activeness probe: (user_id, response_time)
all_responses = union_all(_host_request_responses, _activeness_probe_responses).subquery()

277 

# stand-in response time for rows without a response, so that they sort last in the percentile calculations
_no_response_time = timedelta(days=1000)

user_response_rates_selectable = (
    sa_select(
        all_responses.c.user_id.label("user_id"),
        # number of requests received
        func.count().label("requests"),
        # percentage of requests responded to: count(response_time) skips NULLs, count() counts every row
        # NOTE(review): both operands are integer counts; SQLAlchemy 2.0 renders `/` as true division, while
        # earlier versions emit a plain `/` that PostgreSQL evaluates as integer division -- confirm the
        # installed version
        (func.count(all_responses.c.response_time) / func.count()).label("response_rate"),
        func.avg(all_responses.c.response_time).label("avg_response_time"),
        # the 33rd percentile response time
        percentile_disc(0.33)
        .within_group(func.coalesce(all_responses.c.response_time, _no_response_time))
        .label("response_time_33p"),
        # the 66th percentile response time
        percentile_disc(0.66)
        .within_group(func.coalesce(all_responses.c.response_time, _no_response_time))
        .label("response_time_66p"),
    )
    .group_by(all_responses.c.user_id)
)

user_response_rates = create_materialized_view(
    name="user_response_rates",
    selectable=user_response_rates_selectable,
    metadata=Base.metadata,
    indexes=[
        # unique index on the grouping key
        Index("uq_user_response_rates_id", user_response_rates_selectable.c.user_id, unique=True),
    ],
)


class UserResponseRate(Base):
    # ORM class mapped onto the user_response_rates materialized view
    __table__ = user_response_rates

305 

306 

def refresh_materialized_views(payload: empty_pb2.Empty):
    """
    Refreshes the cluster count, clustered users and user response rate views, in that order, within one session
    scope. `payload` is not used.
    """
    logger.info("Refreshing materialized views")

    # (view name, refresh concurrently?), in the order the refreshes are issued
    # clustered_users is the only view declared without any index, presumably why it is not refreshed
    # concurrently -- TODO confirm
    refresh_plan = (
        ("cluster_subscription_counts", True),
        ("cluster_admin_counts", True),
        ("clustered_users", False),
        ("user_response_rates", True),
    )

    with session_scope() as session:
        for view_name, concurrently in refresh_plan:
            refresh_materialized_view(session, view_name, concurrently=concurrently)

314 

315 

def refresh_materialized_views_rapid(payload: empty_pb2.Empty):
    """
    Refreshes the lite_users view concurrently. `payload` is not used.
    """
    logger.info("Refreshing materialized views (rapid)")
    rapid_view_name = "lite_users"
    with session_scope() as db_session:
        refresh_materialized_view(db_session, rapid_view_name, concurrently=True)