Coverage for app / backend / src / couchers / sql.py: 95%
65 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1from typing import TYPE_CHECKING, Any
3from sqlalchemy import ColumnElement, and_, false, or_, select, true
4from sqlalchemy.orm import InstrumentedAttribute, aliased
5from sqlalchemy.sql import Select, exists, union
7from couchers.context import CouchersContext
8from couchers.models import GroupChat, HostRequest, ModerationState, ModerationVisibility, SignupFlow, User, UserBlock
9from couchers.utils import is_valid_email, is_valid_user_id, is_valid_username
11if TYPE_CHECKING:
12 from couchers.materialized_views import LiteUser
14 type _UserLike = type[User | LiteUser | SignupFlow]
15 type _User = type[User | LiteUser]
16 type _ModeratedContent = type[HostRequest | GroupChat]
19def username_or_email(value: str, table: _UserLike = User) -> ColumnElement[bool]:
20 if is_valid_username(value):
21 return table.username == value
22 elif is_valid_email(value) and hasattr(table, "email"):
23 return table.email == value
24 # no fields match, this will return no rows
25 return false()
28def username_or_id(value: str, table: _UserLike = User) -> ColumnElement[bool]:
29 if is_valid_username(value):
30 return table.username == value
31 elif is_valid_user_id(value):
32 return table.id == value
33 # no fields match, this will return no rows
34 return false()
37def username_or_email_or_id(value: str) -> ColumnElement[bool]:
38 # Should only be used for admin APIs, etc.
39 if is_valid_username(value):
40 return User.username == value
41 elif is_valid_email(value):
42 return User.email == value
43 elif is_valid_user_id(value): 43 ↛ 46line 43 didn't jump to line 46 because the condition on line 43 was always true
44 return User.id == value
45 # no fields match, this will return no rows
46 return false()
49def users_visible(context: CouchersContext, table: _User = User) -> ColumnElement[bool]:
50 """
51 Filters out users that should not be visible: blocked, deleted, or banned
53 Filters the given table, assuming it's already joined/selected from
54 """
55 hidden_users = _relevant_user_blocks(context.user_id)
56 return and_(table.is_visible, ~table.id.in_(hidden_users))
59def where_users_column_visible[T: tuple[Any, ...]](
60 query: Select[T], context: CouchersContext, column: InstrumentedAttribute[int]
61) -> Select[T]:
62 """
63 Filters the given column, not yet joined/selected from
64 """
65 hidden_users = _relevant_user_blocks(context.user_id)
66 aliased_user = aliased(User)
67 return (
68 query.join(aliased_user, aliased_user.id == column)
69 .where(aliased_user.is_visible)
70 .where(~aliased_user.id.in_(hidden_users))
71 )
74def users_visible_to_each_other(user1: _User, user2: _User) -> ColumnElement[bool]:
75 """
76 Filters to ensure two users are mutually visible to each other.
78 Checks that:
79 - Both users are visible (not deleted/banned)
80 - Neither user has blocked the other (bidirectional check)
82 Use this when both User tables are already joined/selected in the query.
83 """
84 return and_(
85 user1.is_visible,
86 user2.is_visible,
87 ~exists(
88 select(1)
89 .select_from(UserBlock)
90 .where(
91 or_(
92 and_(UserBlock.blocking_user_id == user1.id, UserBlock.blocked_user_id == user2.id),
93 and_(UserBlock.blocking_user_id == user2.id, UserBlock.blocked_user_id == user1.id),
94 )
95 )
96 ),
97 )
100def where_user_columns_visible_to_each_other[T: tuple[Any, ...]](
101 query: Select[T], column1: InstrumentedAttribute[int], column2: InstrumentedAttribute[int]
102) -> Select[T]:
103 """
104 Filters to ensure two users are mutually visible to each other.
106 Checks that:
107 - Both users are visible (not deleted/banned)
108 - Neither user has blocked the other (bidirectional check)
110 Use this when you have two user_id columns that haven't been joined yet.
111 This will join both User tables and apply the visibility checks.
112 """
113 user1 = aliased(User)
114 user2 = aliased(User)
115 return (
116 query.join(user1, user1.id == column1)
117 .join(user2, user2.id == column2)
118 .where(user1.is_visible)
119 .where(user2.is_visible)
120 .where(
121 ~exists(
122 select(1)
123 .select_from(UserBlock)
124 .where(
125 or_(
126 and_(UserBlock.blocking_user_id == user1.id, UserBlock.blocked_user_id == user2.id),
127 and_(UserBlock.blocking_user_id == user2.id, UserBlock.blocked_user_id == user1.id),
128 )
129 )
130 )
131 )
132 )
135def where_moderated_content_visible_to_user_column[T: tuple[Any, ...]](
136 query: Select[T],
137 table: _ModeratedContent,
138 user_id_column: InstrumentedAttribute[int],
139 is_list_operation: bool = False,
140) -> Select[T]:
141 aliased_mod_state = aliased(ModerationState)
142 conditions = [aliased_mod_state.visibility == ModerationVisibility.VISIBLE]
144 # UNLISTED content is visible in single-item operations but not in lists
145 if not is_list_operation: 145 ↛ 149line 145 didn't jump to line 149 because the condition on line 145 was always true
146 conditions.append(aliased_mod_state.visibility == ModerationVisibility.UNLISTED)
148 # Authors can always see their own SHADOWED content
149 conditions.append(
150 and_(
151 aliased_mod_state.visibility == ModerationVisibility.SHADOWED,
152 getattr(table, table.__moderation_author_column__) == user_id_column,
153 )
154 )
156 return query.join(aliased_mod_state, aliased_mod_state.id == table.moderation_state_id).where(or_(*conditions))
159def where_moderated_content_visible[T: tuple[Any, ...]](
160 query: Select[T],
161 context: CouchersContext,
162 table: _ModeratedContent,
163 is_list_operation: bool = False,
164) -> Select[T]:
165 aliased_mod_state = aliased(ModerationState)
166 conditions = [aliased_mod_state.visibility == ModerationVisibility.VISIBLE]
168 # UNLISTED content is visible in single-item operations but not in lists
169 if not is_list_operation:
170 conditions.append(aliased_mod_state.visibility == ModerationVisibility.UNLISTED)
172 # Authors can always see their own SHADOWED content
173 if context.is_logged_in(): 173 ↛ 181line 173 didn't jump to line 181 because the condition on line 173 was always true
174 conditions.append(
175 and_(
176 aliased_mod_state.visibility == ModerationVisibility.SHADOWED,
177 getattr(table, table.__moderation_author_column__) == context.user_id,
178 )
179 )
181 return query.join(aliased_mod_state, aliased_mod_state.id == table.moderation_state_id).where(or_(*conditions))
184def moderation_state_column_visible(
185 context: CouchersContext,
186 column: InstrumentedAttribute[int | None],
187) -> ColumnElement[bool]:
188 """
189 Filters based on whether the moderation state referenced by the column is visible.
191 Use this when you have a moderation_state_id column on a table that's not the moderated
192 content itself (e.g., Notification.moderation_state_id).
194 The condition evaluates to True when:
195 - The column is NULL (non-moderated content), OR
196 - The linked content (HostRequest/GroupChat) is visible per where_moderated_content_visible
198 TODO: if you use this with a non-null column, check what's going on
199 """
200 hr_visible = exists(
201 where_moderated_content_visible(
202 select(HostRequest).where(HostRequest.moderation_state_id == column), context, HostRequest
203 )
204 )
205 gc_visible = exists(
206 where_moderated_content_visible(
207 select(GroupChat).where(GroupChat.moderation_state_id == column), context, GroupChat
208 )
209 )
210 return or_(column.is_(None), hr_visible, gc_visible)
213def _relevant_user_blocks(user_id: int) -> Select[tuple[int]]:
214 """
215 Gets a list of blocked user IDs or users that have blocked this user: those should be hidden
216 """
217 blocked_users = select(UserBlock.blocked_user_id).where(UserBlock.blocking_user_id == user_id)
218 blocking_users = select(UserBlock.blocking_user_id).where(UserBlock.blocked_user_id == user_id)
220 return select(union(blocked_users, blocking_users).subquery())
223def to_bool(value: bool) -> ColumnElement[bool]:
224 return true() if value else false()