Coverage for app / backend / src / couchers / sql.py: 94%

67 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 11:04 +0000

1from typing import TYPE_CHECKING, Any 

2 

3from sqlalchemy import ColumnElement, and_, false, or_, select, true 

4from sqlalchemy.orm import InstrumentedAttribute, aliased 

5from sqlalchemy.sql import Select, exists, union 

6 

7from couchers.context import CouchersContext 

8from couchers.models import ( 

9 EventOccurrence, 

10 FriendRelationship, 

11 GroupChat, 

12 HostRequest, 

13 ModerationObjectType, 

14 ModerationState, 

15 ModerationVisibility, 

16 SignupFlow, 

17 User, 

18 UserBlock, 

19) 

20from couchers.utils import is_valid_email, is_valid_user_id, is_valid_username 

21 

if TYPE_CHECKING:
    # Only needed for the annotations below, so it is not imported at runtime
    from couchers.materialized_views import LiteUser

    # Model classes (not instances) accepted by the username/email/id filters
    type _UserLike = type[User | LiteUser | SignupFlow]
    # Model classes (not instances) accepted by the user visibility filters
    type _User = type[User | LiteUser]
    # Model classes (not instances) accepted as `table` by the moderated content visibility filters
    type _ModeratedContent = type[HostRequest | GroupChat | FriendRelationship | EventOccurrence]

28 

29 

def username_or_email(value: str, table: _UserLike = User) -> ColumnElement[bool]:
    """
    Builds a filter matching `value` against the username or the email column of `table`

    A valid username is compared against `table.username`. Otherwise, a valid email is compared
    against `table.email`, provided the table has such an attribute. Anything else yields a
    filter that matches no rows.
    """
    if is_valid_username(value):
        return table.username == value

    if is_valid_email(value) and hasattr(table, "email"):
        return table.email == value

    # neither a username nor a usable email: match nothing
    return false()

37 

38 

def username_or_id(value: str, table: _UserLike = User) -> ColumnElement[bool]:
    """
    Builds a filter matching `value` against the username or the id column of `table`

    A valid username is compared against `table.username`, otherwise a valid user id is compared
    against `table.id`. Anything else yields a filter that matches no rows.
    """
    if is_valid_username(value):
        return table.username == value

    if is_valid_user_id(value):
        return table.id == value

    # neither a username nor a user id: match nothing
    return false()

46 

47 

def username_or_email_or_id(value: str) -> ColumnElement[bool]:
    """
    Builds a filter on User matching `value` as a username, an email, or a user id (tried in that order)

    Should only be used for admin APIs, etc.

    Yields a filter that matches no rows when `value` is none of the three.
    """
    if is_valid_username(value):
        return User.username == value

    if is_valid_email(value):
        return User.email == value

    if is_valid_user_id(value):
        return User.id == value

    # nothing recognised: match nothing
    return false()

58 

59 

def users_visible(context: CouchersContext, table: _User = User) -> ColumnElement[bool]:
    """
    Condition that hides users who should not be visible: blocked, deleted, or banned

    A row passes when `table.is_visible` holds and its id is not among the ids selected by
    `_relevant_user_blocks` for the current user.

    Applies to the given table, assuming it's already joined/selected from
    """
    not_blocked = ~table.id.in_(_relevant_user_blocks(context.user_id))
    return and_(table.is_visible, not_blocked)

68 

69 

70def where_users_column_visible[T: tuple[Any, ...]]( 

71 query: Select[T], context: CouchersContext, column: InstrumentedAttribute[int] 

72) -> Select[T]: 

73 """ 

74 Filters the given column, not yet joined/selected from 

75 """ 

76 hidden_users = _relevant_user_blocks(context.user_id) 

77 aliased_user = aliased(User) 

78 return ( 

79 query.join(aliased_user, aliased_user.id == column) 

80 .where(aliased_user.is_visible) 

81 .where(~aliased_user.id.in_(hidden_users)) 

82 ) 

83 

84 

def users_visible_to_each_other(user1: _User, user2: _User) -> ColumnElement[bool]:
    """
    Condition that holds when two users are mutually visible to each other.

    Requires that:
    - Both users are visible (not deleted/banned)
    - No UserBlock row exists between them, in either direction

    Use this when both User tables are already joined/selected in the query.
    """
    first_blocks_second = and_(UserBlock.blocking_user_id == user1.id, UserBlock.blocked_user_id == user2.id)
    second_blocks_first = and_(UserBlock.blocking_user_id == user2.id, UserBlock.blocked_user_id == user1.id)

    block_between_them = exists(
        select(1).select_from(UserBlock).where(or_(first_blocks_second, second_blocks_first))
    )

    return and_(user1.is_visible, user2.is_visible, ~block_between_them)

109 

110 

111def where_user_columns_visible_to_each_other[T: tuple[Any, ...]]( 

112 query: Select[T], column1: InstrumentedAttribute[int], column2: InstrumentedAttribute[int] 

113) -> Select[T]: 

114 """ 

115 Filters to ensure two users are mutually visible to each other. 

116 

117 Checks that: 

118 - Both users are visible (not deleted/banned) 

119 - Neither user has blocked the other (bidirectional check) 

120 

121 Use this when you have two user_id columns that haven't been joined yet. 

122 This will join both User tables and apply the visibility checks. 

123 """ 

124 user1 = aliased(User) 

125 user2 = aliased(User) 

126 return ( 

127 query.join(user1, user1.id == column1) 

128 .join(user2, user2.id == column2) 

129 .where(user1.is_visible) 

130 .where(user2.is_visible) 

131 .where( 

132 ~exists( 

133 select(1) 

134 .select_from(UserBlock) 

135 .where( 

136 or_( 

137 and_(UserBlock.blocking_user_id == user1.id, UserBlock.blocked_user_id == user2.id), 

138 and_(UserBlock.blocking_user_id == user2.id, UserBlock.blocked_user_id == user1.id), 

139 ) 

140 ) 

141 ) 

142 ) 

143 ) 

144 

145 

146def where_moderated_content_visible_to_user_column[T: tuple[Any, ...]]( 

147 query: Select[T], 

148 table: _ModeratedContent, 

149 user_id_column: InstrumentedAttribute[int], 

150 is_list_operation: bool = False, 

151) -> Select[T]: 

152 aliased_mod_state = aliased(ModerationState) 

153 conditions = [aliased_mod_state.visibility == ModerationVisibility.visible] 

154 

155 # UNLISTED content is visible in single-item operations but not in lists 

156 if not is_list_operation: 156 ↛ 160line 156 didn't jump to line 160 because the condition on line 156 was always true

157 conditions.append(aliased_mod_state.visibility == ModerationVisibility.unlisted) 

158 

159 # Authors can always see their own SHADOWED content 

160 conditions.append( 

161 and_( 

162 aliased_mod_state.visibility == ModerationVisibility.shadowed, 

163 getattr(table, table.__moderation_author_column__) == user_id_column, 

164 ) 

165 ) 

166 

167 return query.join(aliased_mod_state, aliased_mod_state.id == table.moderation_state_id).where(or_(*conditions)) 

168 

169 

170def where_moderated_content_visible[T: tuple[Any, ...]]( 

171 query: Select[T], 

172 context: CouchersContext, 

173 table: _ModeratedContent, 

174 is_list_operation: bool = False, 

175) -> Select[T]: 

176 aliased_mod_state = aliased(ModerationState) 

177 conditions = [aliased_mod_state.visibility == ModerationVisibility.visible] 

178 

179 # UNLISTED content is visible in single-item operations but not in lists 

180 if not is_list_operation: 

181 conditions.append(aliased_mod_state.visibility == ModerationVisibility.unlisted) 

182 

183 # Authors can always see their own SHADOWED content 

184 if context.is_logged_in(): 184 ↛ 192line 184 didn't jump to line 192 because the condition on line 184 was always true

185 conditions.append( 

186 and_( 

187 aliased_mod_state.visibility == ModerationVisibility.shadowed, 

188 getattr(table, table.__moderation_author_column__) == context.user_id, 

189 ) 

190 ) 

191 

192 return query.join(aliased_mod_state, aliased_mod_state.id == table.moderation_state_id).where(or_(*conditions)) 

193 

194 

def moderation_state_column_visible(
    context: CouchersContext,
    column: InstrumentedAttribute[int | None],
) -> ColumnElement[bool]:
    """
    Condition on whether the moderation state referenced by `column` is visible to the current user.

    Meant for a moderation_state_id column living on a table that is not the moderated content
    itself (e.g., Notification.moderation_state_id).

    The condition is True when:
    - The column is NULL (non-moderated content), OR
    - The linked moderation state has visibility 'visible' or 'unlisted', OR
    - The linked moderation state has visibility 'shadowed' and the current user is the author
      (only checked when the context is logged in)
    """
    mod_state = aliased(ModerationState)

    allowed: list[ColumnElement[bool]] = [
        mod_state.visibility == ModerationVisibility.visible,
        mod_state.visibility == ModerationVisibility.unlisted,
    ]

    if context.is_logged_in():
        # For 'shadowed' content the author is found by looking up the content table through
        # object_type and object_id on the moderation_state row. Each entry is:
        # (object type, content column matched against object_id, content column holding the author)
        authored_content = [
            (ModerationObjectType.host_request, HostRequest.conversation_id, HostRequest.initiator_user_id),
            (ModerationObjectType.group_chat, GroupChat.conversation_id, GroupChat.creator_id),
            (ModerationObjectType.friend_request, FriendRelationship.id, FriendRelationship.from_user_id),
            (ModerationObjectType.event_occurrence, EventOccurrence.id, EventOccurrence.creator_user_id),
        ]
        is_author = or_(
            *(
                and_(
                    mod_state.object_type == object_type,
                    exists(
                        select(key_column).where(
                            key_column == mod_state.object_id,
                            author_column == context.user_id,
                        )
                    ),
                )
                for object_type, key_column, author_column in authored_content
            )
        )
        allowed.append(and_(mod_state.visibility == ModerationVisibility.shadowed, is_author))

    state_is_visible = exists(select(mod_state.id).where(mod_state.id == column, or_(*allowed)))
    return or_(column.is_(None), state_is_visible)

273 

274 

def _relevant_user_blocks(user_id: int) -> Select[tuple[int]]:
    """
    Selects the IDs of users that should be hidden from `user_id`: users they have blocked and
    users that have blocked them
    """
    blocked_by_user = select(UserBlock.blocked_user_id).where(UserBlock.blocking_user_id == user_id)
    blocking_the_user = select(UserBlock.blocking_user_id).where(UserBlock.blocked_user_id == user_id)

    either_direction = union(blocked_by_user, blocking_the_user).subquery()
    return select(either_direction)

283 

284 

def to_bool(value: bool) -> ColumnElement[bool]:
    """
    Converts a Python truth value into the matching SQL boolean constant
    """
    if value:
        return true()
    return false()
286 return true() if value else false()