Coverage for app/backend/src/couchers/sql.py: 96%

73 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1from typing import TYPE_CHECKING, Any 

2 

3from sqlalchemy import ColumnElement, and_, false, or_, select, true 

4from sqlalchemy.orm import InstrumentedAttribute, aliased 

5from sqlalchemy.sql import Select, exists, union 

6 

7from couchers.models import ( 

8 ModerationState, 

9 ModerationVisibility, 

10 SignupFlow, 

11 User, 

12 UserBlock, 

13 get_moderated_models, 

14) 

15from couchers.utils import is_valid_email, is_valid_user_id, is_valid_username 

16 

17if TYPE_CHECKING: 

18 from couchers.context import CouchersContext 

19 from couchers.materialized_views import LiteUser 

20 from couchers.models.moderation import ModeratedContent 

21 

22 type _UserLike = type[User | LiteUser | SignupFlow] 

23 type _User = type[User | LiteUser] 

24 

25 

def username_or_email(value: str, table: _UserLike = User) -> ColumnElement[bool]:
    """
    Builds a filter clause matching ``value`` against the username or the email column of ``table``.

    A valid username is compared with ``table.username``. Otherwise a valid email is compared with
    ``table.email``, but only if ``table`` has an ``email`` attribute. Any other value produces a
    clause that matches no rows.
    """
    if is_valid_username(value):
        return table.username == value

    if is_valid_email(value) and hasattr(table, "email"):
        return table.email == value

    # Neither lookup applies: a constant FALSE makes the query return nothing.
    return false()

33 

34 

def username_or_id(value: str, table: _UserLike = User) -> ColumnElement[bool]:
    """
    Builds a filter clause matching ``value`` against the username or the id column of ``table``.

    A valid username is compared with ``table.username``; otherwise a valid user id is converted to
    ``int`` and compared with ``table.id``. Any other value produces a clause that matches no rows.
    """
    if is_valid_username(value):
        return table.username == value

    if is_valid_user_id(value):
        return table.id == int(value)

    # Neither lookup applies: a constant FALSE makes the query return nothing.
    return false()

42 

43 

def username_or_email_or_id(value: str) -> ColumnElement[bool]:
    """
    Builds a filter clause on ``User`` matching ``value`` as a username, an email or a user id.

    The checks are tried in that order and the first one that validates decides which column is
    compared. A value that is none of the three produces a clause that matches no rows.

    Should only be used for admin APIs, etc.
    """
    if is_valid_username(value):
        return User.username == value

    if is_valid_email(value):
        return User.email == value

    if is_valid_user_id(value):
        return User.id == int(value)

    # Nothing validated: a constant FALSE makes the query return nothing.
    return false()

54 

55 

def _shadow_clause(context: CouchersContext, table: _User) -> ColumnElement[bool]:
    """
    Builds the clause that hides shadowed users.

    Rows with ``shadowed_at`` set are excluded, except that a logged-in user still matches their own row.
    """
    not_shadowed = table.shadowed_at.is_(None)
    if not context.is_logged_in():
        return not_shadowed
    # A logged-in user keeps seeing themselves even when shadowed.
    return or_(not_shadowed, table.id == context.user_id)

60 

61 

def _users_block_each_other(
    user_a: InstrumentedAttribute[int], user_b: InstrumentedAttribute[int]
) -> ColumnElement[bool]:
    """
    True when either of the two users (referenced by id columns) has blocked the other.

    Implemented as an EXISTS over ``UserBlock`` that checks both block directions.
    """
    a_blocks_b = and_(UserBlock.blocking_user_id == user_a, UserBlock.blocked_user_id == user_b)
    b_blocks_a = and_(UserBlock.blocking_user_id == user_b, UserBlock.blocked_user_id == user_a)
    block_rows = select(1).select_from(UserBlock).where(or_(a_blocks_b, b_blocks_a))
    return exists(block_rows)

76 

77 

def users_visible(context: CouchersContext, table: _User = User) -> ColumnElement[bool]:
    """
    Filters out users that should not be visible: blocked, deleted, banned, or shadowed (to others).

    Filters the given table, assuming it's already joined/selected from.

    The clause is the conjunction of ``table.is_visible``, the shadow clause for ``context``, and
    "id is not in the set of users in a block relationship with the current user".
    """
    # NOTE(review): context.user_id is read unconditionally here, whereas _shadow_clause only reads it
    # after context.is_logged_in() -- confirm this helper is never reached with a logged-out context.
    not_blocked = ~table.id.in_(_relevant_user_blocks(context.user_id))
    return and_(table.is_visible, _shadow_clause(context, table), not_blocked)

86 

87 

88def where_users_column_visible[T: tuple[Any, ...]]( 

89 query: Select[T], context: CouchersContext, column: InstrumentedAttribute[int] 

90) -> Select[T]: 

91 """ 

92 Filters the given column, not yet joined/selected from 

93 """ 

94 # EXISTS on the bare User table rather than aliased(User): aliasing the wide User entity per call 

95 # is a major CPU hotspot (rebuilds the ORM proxy index). correlate_except keeps this User local to 

96 # the subquery so it doesn't bind to a bare User already in the outer query (e.g. jobs/handlers.py). 

97 return query.where( 

98 exists( 

99 select(1) 

100 .select_from(User) 

101 .where(User.id == column) 

102 .where(users_visible(context, User)) 

103 .correlate_except(User) 

104 ) 

105 ) 

106 

107 

def users_visible_to_each_other(*, self_user: _User, other_user: _User) -> ColumnElement[bool]:
    """
    Filters to ensure other_user is visible to self_user, and that they haven't blocked each other.

    Use this when both User tables are already joined/selected in the query.

    Both users must be ``is_visible``; only ``other_user`` is additionally required to be un-shadowed.
    """
    criteria = (
        self_user.is_visible,
        other_user.is_visible,
        other_user.shadowed_at.is_(None),
        ~_users_block_each_other(self_user.id, other_user.id),
    )
    return and_(*criteria)

120 

121 

122def where_user_columns_visible_to_each_other[T: tuple[Any, ...]]( 

123 query: Select[T], *, self_column: InstrumentedAttribute[int], other_column: InstrumentedAttribute[int] 

124) -> Select[T]: 

125 """ 

126 Filters to ensure the user in other_column is visible to the user in self_column, and that they 

127 haven't blocked each other. 

128 

129 Use this when you have two user_id columns that haven't been joined yet. This will join both 

130 User tables and apply the visibility checks. 

131 """ 

132 self_user = aliased(User) 

133 other_user = aliased(User) 

134 return ( 

135 query.join(self_user, self_user.id == self_column) 

136 .join(other_user, other_user.id == other_column) 

137 .where(self_user.is_visible) 

138 .where(other_user.is_visible) 

139 .where(other_user.shadowed_at.is_(None)) 

140 .where(~_users_block_each_other(self_user.id, other_user.id)) 

141 ) 

142 

143 

144def where_moderated_content_visible_to_user_column[T: tuple[Any, ...]]( 

145 query: Select[T], 

146 table: type[ModeratedContent], 

147 user_id_column: InstrumentedAttribute[int], 

148 is_list_operation: bool = False, 

149) -> Select[T]: 

150 entry = get_moderated_models()[table.__moderation_object_type__] 

151 aliased_mod_state = aliased(ModerationState) 

152 conditions = [aliased_mod_state.visibility == ModerationVisibility.visible] 

153 

154 # UNLISTED content is visible in single-item operations but not in lists 

155 if not is_list_operation: 155 ↛ 159line 155 didn't jump to line 159 because the condition on line 155 was always true

156 conditions.append(aliased_mod_state.visibility == ModerationVisibility.unlisted) 

157 

158 # Authors can always see their own SHADOWED content 

159 conditions.append( 

160 and_( 

161 aliased_mod_state.visibility == ModerationVisibility.shadowed, 

162 entry.author_column == user_id_column, 

163 ) 

164 ) 

165 

166 return query.join(aliased_mod_state, aliased_mod_state.id == entry.moderation_state_id_column).where( 

167 or_(*conditions) 

168 ) 

169 

170 

171def where_moderated_content_visible[T: tuple[Any, ...]]( 

172 query: Select[T], 

173 context: CouchersContext, 

174 table: type[ModeratedContent], 

175 is_list_operation: bool = False, 

176) -> Select[T]: 

177 entry = get_moderated_models()[table.__moderation_object_type__] 

178 aliased_mod_state = aliased(ModerationState) 

179 conditions = [aliased_mod_state.visibility == ModerationVisibility.visible] 

180 

181 # UNLISTED content is visible in single-item operations but not in lists 

182 if not is_list_operation: 

183 conditions.append(aliased_mod_state.visibility == ModerationVisibility.unlisted) 

184 

185 # Authors can always see their own SHADOWED content 

186 if context.is_logged_in(): 

187 conditions.append( 

188 and_( 

189 aliased_mod_state.visibility == ModerationVisibility.shadowed, 

190 entry.author_column == context.user_id, 

191 ) 

192 ) 

193 

194 return query.join(aliased_mod_state, aliased_mod_state.id == entry.moderation_state_id_column).where( 

195 or_(*conditions) 

196 ) 

197 

198 

def moderation_state_column_visible(
    context: CouchersContext,
    column: InstrumentedAttribute[int | None],
) -> ColumnElement[bool]:
    """
    Filters based on whether the moderation state referenced by the column is visible.

    Use this when you have a moderation_state_id column on a table that's not the moderated
    content itself (e.g., Notification.moderation_state_id).

    The condition evaluates to True when:
    - The column is NULL (non-moderated content), OR
    - The linked moderation state has visibility 'visible' or 'unlisted', OR
    - The linked moderation state has visibility 'shadowed' and the current user is the author
    """
    aliased_mod_state = aliased(ModerationState)

    visibility_conditions: list[ColumnElement[bool]] = [
        aliased_mod_state.visibility == ModerationVisibility.visible,
        aliased_mod_state.visibility == ModerationVisibility.unlisted,
    ]

    if context.is_logged_in():
        # For 'shadowed' content, look up the moderated content via object_type/object_id to check the
        # author: one EXISTS per registered moderated model.
        authored_by_current_user = [
            and_(
                aliased_mod_state.object_type == entry.object_type,
                exists(
                    select(1)
                    .select_from(entry.model)
                    .where(entry.object_id_column == aliased_mod_state.object_id)
                    .where(entry.author_column == context.user_id)
                ),
            )
            for entry in get_moderated_models().values()
        ]
        # The author exception applies to 'shadowed' states only. Without the explicit visibility check,
        # authorship alone would let a state through whatever its visibility is. Skipped when there are
        # no moderated models, so or_() is never called without arguments.
        if authored_by_current_user:
            visibility_conditions.append(
                and_(
                    aliased_mod_state.visibility == ModerationVisibility.shadowed,
                    or_(*authored_by_current_user),
                )
            )

    return or_(
        column.is_(None),
        exists(
            select(aliased_mod_state.id).where(
                aliased_mod_state.id == column,
                or_(*visibility_conditions),
            )
        ),
    )

245 

246 

def _relevant_user_blocks(user_id: int) -> Select[tuple[int]]:
    """
    Builds a SELECT of the user ids that should be hidden from ``user_id`` because of blocks.

    The result is the UNION of the users blocked by ``user_id`` and the users that have blocked
    ``user_id``, wrapped as a subquery.
    """
    blocked_by_user = select(UserBlock.blocked_user_id).where(UserBlock.blocking_user_id == user_id)
    blocking_the_user = select(UserBlock.blocking_user_id).where(UserBlock.blocked_user_id == user_id)

    either_direction = union(blocked_by_user, blocking_the_user).subquery()
    return select(either_direction)

255 

256 

def to_bool(value: bool) -> ColumnElement[bool]:
    """
    Converts a Python truth value into the corresponding SQL boolean constant.
    """
    if value:
        return true()
    return false()