Coverage for app / backend / src / couchers / sql.py: 95%

67 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 14:14 +0000

1from typing import TYPE_CHECKING, Any 

2 

3from sqlalchemy import ColumnElement, and_, false, or_, select, true 

4from sqlalchemy.orm import InstrumentedAttribute, aliased 

5from sqlalchemy.sql import Select, exists, union 

6 

7from couchers.context import CouchersContext 

8from couchers.models import ( 

9 EventOccurrence, 

10 FriendRelationship, 

11 GroupChat, 

12 HostRequest, 

13 ModerationState, 

14 ModerationVisibility, 

15 SignupFlow, 

16 User, 

17 UserBlock, 

18) 

19from couchers.utils import is_valid_email, is_valid_user_id, is_valid_username 

20 

21if TYPE_CHECKING: 

22 from couchers.materialized_views import LiteUser 

23 

24 type _UserLike = type[User | LiteUser | SignupFlow] 

25 type _User = type[User | LiteUser] 

26 type _ModeratedContent = type[HostRequest | GroupChat | FriendRelationship | EventOccurrence] 

27 

28 

29def username_or_email(value: str, table: _UserLike = User) -> ColumnElement[bool]: 

30 if is_valid_username(value): 

31 return table.username == value 

32 elif is_valid_email(value) and hasattr(table, "email"): 

33 return table.email == value 

34 # no fields match, this will return no rows 

35 return false() 

36 

37 

38def username_or_id(value: str, table: _UserLike = User) -> ColumnElement[bool]: 

39 if is_valid_username(value): 

40 return table.username == value 

41 elif is_valid_user_id(value): 

42 return table.id == value 

43 # no fields match, this will return no rows 

44 return false() 

45 

46 

47def username_or_email_or_id(value: str) -> ColumnElement[bool]: 

48 # Should only be used for admin APIs, etc. 

49 if is_valid_username(value): 

50 return User.username == value 

51 elif is_valid_email(value): 

52 return User.email == value 

53 elif is_valid_user_id(value): 53 ↛ 56line 53 didn't jump to line 56 because the condition on line 53 was always true

54 return User.id == value 

55 # no fields match, this will return no rows 

56 return false() 

57 

58 

59def users_visible(context: CouchersContext, table: _User = User) -> ColumnElement[bool]: 

60 """ 

61 Filters out users that should not be visible: blocked, deleted, or banned 

62 

63 Filters the given table, assuming it's already joined/selected from 

64 """ 

65 hidden_users = _relevant_user_blocks(context.user_id) 

66 return and_(table.is_visible, ~table.id.in_(hidden_users)) 

67 

68 

69def where_users_column_visible[T: tuple[Any, ...]]( 

70 query: Select[T], context: CouchersContext, column: InstrumentedAttribute[int] 

71) -> Select[T]: 

72 """ 

73 Filters the given column, not yet joined/selected from 

74 """ 

75 hidden_users = _relevant_user_blocks(context.user_id) 

76 aliased_user = aliased(User) 

77 return ( 

78 query.join(aliased_user, aliased_user.id == column) 

79 .where(aliased_user.is_visible) 

80 .where(~aliased_user.id.in_(hidden_users)) 

81 ) 

82 

83 

84def users_visible_to_each_other(user1: _User, user2: _User) -> ColumnElement[bool]: 

85 """ 

86 Filters to ensure two users are mutually visible to each other. 

87 

88 Checks that: 

89 - Both users are visible (not deleted/banned) 

90 - Neither user has blocked the other (bidirectional check) 

91 

92 Use this when both User tables are already joined/selected in the query. 

93 """ 

94 return and_( 

95 user1.is_visible, 

96 user2.is_visible, 

97 ~exists( 

98 select(1) 

99 .select_from(UserBlock) 

100 .where( 

101 or_( 

102 and_(UserBlock.blocking_user_id == user1.id, UserBlock.blocked_user_id == user2.id), 

103 and_(UserBlock.blocking_user_id == user2.id, UserBlock.blocked_user_id == user1.id), 

104 ) 

105 ) 

106 ), 

107 ) 

108 

109 

110def where_user_columns_visible_to_each_other[T: tuple[Any, ...]]( 

111 query: Select[T], column1: InstrumentedAttribute[int], column2: InstrumentedAttribute[int] 

112) -> Select[T]: 

113 """ 

114 Filters to ensure two users are mutually visible to each other. 

115 

116 Checks that: 

117 - Both users are visible (not deleted/banned) 

118 - Neither user has blocked the other (bidirectional check) 

119 

120 Use this when you have two user_id columns that haven't been joined yet. 

121 This will join both User tables and apply the visibility checks. 

122 """ 

123 user1 = aliased(User) 

124 user2 = aliased(User) 

125 return ( 

126 query.join(user1, user1.id == column1) 

127 .join(user2, user2.id == column2) 

128 .where(user1.is_visible) 

129 .where(user2.is_visible) 

130 .where( 

131 ~exists( 

132 select(1) 

133 .select_from(UserBlock) 

134 .where( 

135 or_( 

136 and_(UserBlock.blocking_user_id == user1.id, UserBlock.blocked_user_id == user2.id), 

137 and_(UserBlock.blocking_user_id == user2.id, UserBlock.blocked_user_id == user1.id), 

138 ) 

139 ) 

140 ) 

141 ) 

142 ) 

143 

144 

145def where_moderated_content_visible_to_user_column[T: tuple[Any, ...]]( 

146 query: Select[T], 

147 table: _ModeratedContent, 

148 user_id_column: InstrumentedAttribute[int], 

149 is_list_operation: bool = False, 

150) -> Select[T]: 

151 aliased_mod_state = aliased(ModerationState) 

152 conditions = [aliased_mod_state.visibility == ModerationVisibility.visible] 

153 

154 # UNLISTED content is visible in single-item operations but not in lists 

155 if not is_list_operation: 155 ↛ 159line 155 didn't jump to line 159 because the condition on line 155 was always true

156 conditions.append(aliased_mod_state.visibility == ModerationVisibility.unlisted) 

157 

158 # Authors can always see their own SHADOWED content 

159 conditions.append( 

160 and_( 

161 aliased_mod_state.visibility == ModerationVisibility.shadowed, 

162 getattr(table, table.__moderation_author_column__) == user_id_column, 

163 ) 

164 ) 

165 

166 return query.join(aliased_mod_state, aliased_mod_state.id == table.moderation_state_id).where(or_(*conditions)) 

167 

168 

169def where_moderated_content_visible[T: tuple[Any, ...]]( 

170 query: Select[T], 

171 context: CouchersContext, 

172 table: _ModeratedContent, 

173 is_list_operation: bool = False, 

174) -> Select[T]: 

175 aliased_mod_state = aliased(ModerationState) 

176 conditions = [aliased_mod_state.visibility == ModerationVisibility.visible] 

177 

178 # UNLISTED content is visible in single-item operations but not in lists 

179 if not is_list_operation: 

180 conditions.append(aliased_mod_state.visibility == ModerationVisibility.unlisted) 

181 

182 # Authors can always see their own SHADOWED content 

183 if context.is_logged_in(): 183 ↛ 191line 183 didn't jump to line 191 because the condition on line 183 was always true

184 conditions.append( 

185 and_( 

186 aliased_mod_state.visibility == ModerationVisibility.shadowed, 

187 getattr(table, table.__moderation_author_column__) == context.user_id, 

188 ) 

189 ) 

190 

191 return query.join(aliased_mod_state, aliased_mod_state.id == table.moderation_state_id).where(or_(*conditions)) 

192 

193 

194def moderation_state_column_visible( 

195 context: CouchersContext, 

196 column: InstrumentedAttribute[int | None], 

197) -> ColumnElement[bool]: 

198 """ 

199 Filters based on whether the moderation state referenced by the column is visible. 

200 

201 Use this when you have a moderation_state_id column on a table that's not the moderated 

202 content itself (e.g., Notification.moderation_state_id). 

203 

204 The condition evaluates to True when: 

205 - The column is NULL (non-moderated content), OR 

206 - The linked content (HostRequest/GroupChat) is visible per where_moderated_content_visible 

207 

208 TODO: if you use this with a non-null column, check what's going on 

209 """ 

210 hr_visible = exists( 

211 where_moderated_content_visible( 

212 select(HostRequest).where(HostRequest.moderation_state_id == column), context, HostRequest 

213 ) 

214 ) 

215 gc_visible = exists( 

216 where_moderated_content_visible( 

217 select(GroupChat).where(GroupChat.moderation_state_id == column), context, GroupChat 

218 ) 

219 ) 

220 fr_visible = exists( 

221 where_moderated_content_visible( 

222 select(FriendRelationship).where(FriendRelationship.moderation_state_id == column), 

223 context, 

224 FriendRelationship, 

225 ) 

226 ) 

227 eo_visible = exists( 

228 where_moderated_content_visible( 

229 select(EventOccurrence).where(EventOccurrence.moderation_state_id == column), 

230 context, 

231 EventOccurrence, 

232 ) 

233 ) 

234 return or_(column.is_(None), hr_visible, gc_visible, fr_visible, eo_visible) 

235 

236 

237def _relevant_user_blocks(user_id: int) -> Select[tuple[int]]: 

238 """ 

239 Gets a list of blocked user IDs or users that have blocked this user: those should be hidden 

240 """ 

241 blocked_users = select(UserBlock.blocked_user_id).where(UserBlock.blocking_user_id == user_id) 

242 blocking_users = select(UserBlock.blocking_user_id).where(UserBlock.blocked_user_id == user_id) 

243 

244 return select(union(blocked_users, blocking_users).subquery()) 

245 

246 

247def to_bool(value: bool) -> ColumnElement[bool]: 

248 return true() if value else false()