Coverage for app / backend / src / couchers / sql.py: 95%

65 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1from typing import TYPE_CHECKING, Any 

2 

3from sqlalchemy import ColumnElement, and_, false, or_, select, true 

4from sqlalchemy.orm import InstrumentedAttribute, aliased 

5from sqlalchemy.sql import Select, exists, union 

6 

7from couchers.context import CouchersContext 

8from couchers.models import GroupChat, HostRequest, ModerationState, ModerationVisibility, SignupFlow, User, UserBlock 

9from couchers.utils import is_valid_email, is_valid_user_id, is_valid_username 

10 

if TYPE_CHECKING:
    # Only needed for the annotations in this module, so it is imported for type checkers only
    # (presumably to avoid a runtime import cycle with materialized_views -- confirm).
    from couchers.materialized_views import LiteUser

    # Aliases for the model *classes* (not instances) accepted by the helpers in this module.
    # Classes that may be looked up by username/email/id
    type _UserLike = type[User | LiteUser | SignupFlow]
    # Classes that may be passed to the user visibility filters
    type _User = type[User | LiteUser]
    # Classes that may be passed to the moderation visibility filters
    type _ModeratedContent = type[HostRequest | GroupChat]

17 

18 

def username_or_email(value: str, table: _UserLike = User) -> ColumnElement[bool]:
    """
    Builds a filter that looks `value` up by username or by email on `table`.

    A valid username is matched against `table.username`. Otherwise, a valid email is matched against
    `table.email`, but only if the table has an `email` attribute. Any other value gives a filter that
    matches no rows.
    """
    if is_valid_username(value):
        return table.username == value
    if is_valid_email(value) and hasattr(table, "email"):
        return table.email == value
    # neither a username nor a usable email: the filter matches nothing
    return false()

26 

27 

def username_or_id(value: str, table: _UserLike = User) -> ColumnElement[bool]:
    """
    Builds a filter that looks `value` up by username or by user ID on `table`.

    A valid username is matched against `table.username`; otherwise a valid user ID is matched against
    `table.id`. Any other value gives a filter that matches no rows.
    """
    if is_valid_username(value):
        return table.username == value
    if is_valid_user_id(value):
        return table.id == value
    # neither a username nor a user ID: the filter matches nothing
    return false()

35 

36 

def username_or_email_or_id(value: str) -> ColumnElement[bool]:
    """
    Builds a filter on `User` that looks `value` up by username, then email, then user ID.

    Should only be used for admin APIs, etc.

    The first validator that accepts `value` decides which column is compared. If none accepts it,
    the filter matches no rows.
    """
    if is_valid_username(value):
        return User.username == value
    if is_valid_email(value):
        return User.email == value
    if is_valid_user_id(value):
        return User.id == value
    # no fields match, this will return no rows
    return false()

47 

48 

def users_visible(context: CouchersContext, table: _User = User) -> ColumnElement[bool]:
    """
    Condition that keeps only users that should be visible: not blocked, deleted, or banned

    Applies to the given table, which is assumed to already be joined/selected from in the query
    """
    # users in a block relationship (either direction) with the current user are hidden
    not_blocked = ~table.id.in_(_relevant_user_blocks(context.user_id))
    return and_(table.is_visible, not_blocked)

57 

58 

59def where_users_column_visible[T: tuple[Any, ...]]( 

60 query: Select[T], context: CouchersContext, column: InstrumentedAttribute[int] 

61) -> Select[T]: 

62 """ 

63 Filters the given column, not yet joined/selected from 

64 """ 

65 hidden_users = _relevant_user_blocks(context.user_id) 

66 aliased_user = aliased(User) 

67 return ( 

68 query.join(aliased_user, aliased_user.id == column) 

69 .where(aliased_user.is_visible) 

70 .where(~aliased_user.id.in_(hidden_users)) 

71 ) 

72 

73 

def users_visible_to_each_other(user1: _User, user2: _User) -> ColumnElement[bool]:
    """
    Condition that holds when two users are mutually visible to each other.

    Requires that:
    - Both users are visible (not deleted/banned)
    - No UserBlock row exists between them, in either direction

    Use this when both User tables are already joined/selected in the query.
    """
    first_blocks_second = and_(UserBlock.blocking_user_id == user1.id, UserBlock.blocked_user_id == user2.id)
    second_blocks_first = and_(UserBlock.blocking_user_id == user2.id, UserBlock.blocked_user_id == user1.id)
    block_exists = exists(select(1).select_from(UserBlock).where(or_(first_blocks_second, second_blocks_first)))
    return and_(user1.is_visible, user2.is_visible, ~block_exists)

98 

99 

100def where_user_columns_visible_to_each_other[T: tuple[Any, ...]]( 

101 query: Select[T], column1: InstrumentedAttribute[int], column2: InstrumentedAttribute[int] 

102) -> Select[T]: 

103 """ 

104 Filters to ensure two users are mutually visible to each other. 

105 

106 Checks that: 

107 - Both users are visible (not deleted/banned) 

108 - Neither user has blocked the other (bidirectional check) 

109 

110 Use this when you have two user_id columns that haven't been joined yet. 

111 This will join both User tables and apply the visibility checks. 

112 """ 

113 user1 = aliased(User) 

114 user2 = aliased(User) 

115 return ( 

116 query.join(user1, user1.id == column1) 

117 .join(user2, user2.id == column2) 

118 .where(user1.is_visible) 

119 .where(user2.is_visible) 

120 .where( 

121 ~exists( 

122 select(1) 

123 .select_from(UserBlock) 

124 .where( 

125 or_( 

126 and_(UserBlock.blocking_user_id == user1.id, UserBlock.blocked_user_id == user2.id), 

127 and_(UserBlock.blocking_user_id == user2.id, UserBlock.blocked_user_id == user1.id), 

128 ) 

129 ) 

130 ) 

131 ) 

132 ) 

133 

134 

135def where_moderated_content_visible_to_user_column[T: tuple[Any, ...]]( 

136 query: Select[T], 

137 table: _ModeratedContent, 

138 user_id_column: InstrumentedAttribute[int], 

139 is_list_operation: bool = False, 

140) -> Select[T]: 

141 aliased_mod_state = aliased(ModerationState) 

142 conditions = [aliased_mod_state.visibility == ModerationVisibility.VISIBLE] 

143 

144 # UNLISTED content is visible in single-item operations but not in lists 

145 if not is_list_operation: 145 ↛ 149line 145 didn't jump to line 149 because the condition on line 145 was always true

146 conditions.append(aliased_mod_state.visibility == ModerationVisibility.UNLISTED) 

147 

148 # Authors can always see their own SHADOWED content 

149 conditions.append( 

150 and_( 

151 aliased_mod_state.visibility == ModerationVisibility.SHADOWED, 

152 getattr(table, table.__moderation_author_column__) == user_id_column, 

153 ) 

154 ) 

155 

156 return query.join(aliased_mod_state, aliased_mod_state.id == table.moderation_state_id).where(or_(*conditions)) 

157 

158 

159def where_moderated_content_visible[T: tuple[Any, ...]]( 

160 query: Select[T], 

161 context: CouchersContext, 

162 table: _ModeratedContent, 

163 is_list_operation: bool = False, 

164) -> Select[T]: 

165 aliased_mod_state = aliased(ModerationState) 

166 conditions = [aliased_mod_state.visibility == ModerationVisibility.VISIBLE] 

167 

168 # UNLISTED content is visible in single-item operations but not in lists 

169 if not is_list_operation: 

170 conditions.append(aliased_mod_state.visibility == ModerationVisibility.UNLISTED) 

171 

172 # Authors can always see their own SHADOWED content 

173 if context.is_logged_in(): 173 ↛ 181line 173 didn't jump to line 181 because the condition on line 173 was always true

174 conditions.append( 

175 and_( 

176 aliased_mod_state.visibility == ModerationVisibility.SHADOWED, 

177 getattr(table, table.__moderation_author_column__) == context.user_id, 

178 ) 

179 ) 

180 

181 return query.join(aliased_mod_state, aliased_mod_state.id == table.moderation_state_id).where(or_(*conditions)) 

182 

183 

def moderation_state_column_visible(
    context: CouchersContext,
    column: InstrumentedAttribute[int | None],
) -> ColumnElement[bool]:
    """
    Condition on whether the moderation state referenced by `column` is visible.

    Meant for a moderation_state_id column that lives on a table other than the moderated content
    itself (e.g., Notification.moderation_state_id).

    The condition is True when:
    - The column is NULL (non-moderated content), OR
    - A HostRequest or GroupChat with that moderation state exists and passes
      where_moderated_content_visible

    TODO: if you use this with a non-null column, check what's going on
    """
    # one EXISTS subquery per moderated content type, in the order HostRequest, GroupChat
    linked_content_visible = [
        exists(
            where_moderated_content_visible(
                select(model).where(model.moderation_state_id == column), context, model
            )
        )
        for model in (HostRequest, GroupChat)
    ]
    return or_(column.is_(None), *linked_content_visible)

211 

212 

def _relevant_user_blocks(user_id: int) -> Select[tuple[int]]:
    """
    Builds a select of the user IDs that should be hidden from the given user: the users they have
    blocked, together with the users that have blocked them
    """
    blocked_by_user = select(UserBlock.blocked_user_id).where(UserBlock.blocking_user_id == user_id)
    blockers_of_user = select(UserBlock.blocking_user_id).where(UserBlock.blocked_user_id == user_id)

    either_direction = union(blocked_by_user, blockers_of_user).subquery()
    return select(either_direction)

221 

222 

def to_bool(value: bool) -> ColumnElement[bool]:
    """
    Turns a Python truth value into the corresponding SQL expression: true() or false()
    """
    if value:
        return true()
    return false()