Coverage for src / tests / fixtures / db.py: 97%
117 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-07 16:21 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-07 16:21 +0000
1import os
2from collections.abc import Sequence
3from contextlib import contextmanager
4from datetime import date, timedelta
5from pathlib import Path
6from typing import Any, cast
8from sqlalchemy import Connection, Engine, create_engine, or_, select, text, update
9from sqlalchemy.orm import Session
11from couchers.constants import GUIDELINES_VERSION, TOS_VERSION
12from couchers.context import CouchersContext
13from couchers.crypto import random_hex
14from couchers.db import _get_base_engine, session_scope
15from couchers.models import (
16 Base,
17 FriendRelationship,
18 FriendStatus,
19 HostingStatus,
20 LanguageAbility,
21 LanguageFluency,
22 ModerationUserList,
23 PassportSex,
24 PhotoGallery,
25 RegionLived,
26 RegionVisited,
27 StrongVerificationAttempt,
28 StrongVerificationAttemptStatus,
29 Upload,
30 User,
31 UserBlock,
32 UserSession,
33)
34from couchers.servicers.auth import create_session
35from couchers.utils import create_coordinate, now
36from tests.fixtures.sessions import _MockCouchersContext
def create_schema_from_models(engine: Engine | None = None) -> None:
    """
    Build the whole schema directly from the current model definitions
    instead of replaying the migration history.

    When no engine is passed, the one returned by ``_get_base_engine()`` is used.
    """
    target = _get_base_engine() if engine is None else engine

    # the SQL functions are normally created by migrations, so install them by hand
    sql_path = Path(__file__).parent / "sql_functions.sql"
    with open(sql_path) as sql_file:
        function_definitions = sql_file.read()

    with target.connect() as connection:
        connection.execute(text(function_definitions))
        connection.commit()

    Base.metadata.create_all(target)
def populate_testing_resources(conn: Connection) -> None:
    """
    Testing version of couchers.resources.copy_resources_to_database

    Inserts a fixed set of rows into the ``regions`` and ``languages`` tables and
    then executes the timezone areas SQL file, all on the given connection.
    No commit is issued here.
    """
    # Insert regions as textual SQL
    conn.execute(
        text("""
        INSERT INTO regions (code, name) VALUES
        ('AUS', 'Australia'),
        ('CAN', 'Canada'),
        ('CHE', 'Switzerland'),
        ('CUB', 'Cuba'),
        ('CXR', 'Christmas Island'),
        ('CZE', 'Czechia'),
        ('DEU', 'Germany'),
        ('EGY', 'Egypt'),
        ('ESP', 'Spain'),
        ('EST', 'Estonia'),
        ('FIN', 'Finland'),
        ('FRA', 'France'),
        ('GBR', 'United Kingdom'),
        ('GEO', 'Georgia'),
        ('GHA', 'Ghana'),
        ('GRC', 'Greece'),
        ('HKG', 'Hong Kong'),
        ('IRL', 'Ireland'),
        ('ISR', 'Israel'),
        ('ITA', 'Italy'),
        ('JPN', 'Japan'),
        ('LAO', 'Laos'),
        ('MEX', 'Mexico'),
        ('MMR', 'Myanmar'),
        ('NAM', 'Namibia'),
        ('NLD', 'Netherlands'),
        ('NZL', 'New Zealand'),
        ('POL', 'Poland'),
        ('PRK', 'North Korea'),
        ('REU', 'Réunion'),
        ('SGP', 'Singapore'),
        ('SWE', 'Sweden'),
        ('THA', 'Thailand'),
        ('TUR', 'Turkey'),
        ('TWN', 'Taiwan'),
        ('USA', 'United States'),
        ('VNM', 'Vietnam');
        """)
    )

    # Insert languages as textual SQL
    conn.execute(
        text("""
        INSERT INTO languages (code, name) VALUES
        ('arb', 'Arabic (Standard)'),
        ('deu', 'German'),
        ('eng', 'English'),
        ('fin', 'Finnish'),
        ('fra', 'French'),
        ('heb', 'Hebrew'),
        ('hun', 'Hungarian'),
        ('jpn', 'Japanese'),
        ('pol', 'Polish'),
        ('swe', 'Swedish'),
        ('cmn', 'Chinese (Mandarin)')
        """)
    )

    # The timezone SQL lives in the "resources" directory four levels above this module
    with open(Path(__file__).parent.parent.parent.parent / "resources" / "timezone_areas.sql-fake", "r") as f:
        tz_sql = f.read()

    conn.execute(text(tz_sql))
def drop_database() -> None:
    """
    Reset the database contents: drop the ``public`` and ``logging`` schemas
    (and postgis), then recreate the schemas and the extensions.
    """
    # Extensions created at the end of the script:
    #   postgis    - required for all the Geographic Information System (GIS) stuff
    #   pg_trgm    - required for trigram-based search
    #   btree_gist - required for gist-based exclusion constraints
    reset_statements = (
        "DROP SCHEMA IF EXISTS public CASCADE;",
        "DROP SCHEMA IF EXISTS logging CASCADE;",
        "DROP EXTENSION IF EXISTS postgis CASCADE;",
        "CREATE SCHEMA IF NOT EXISTS public;",
        "CREATE SCHEMA IF NOT EXISTS logging;",
        "CREATE EXTENSION postgis;",
        "CREATE EXTENSION pg_trgm;",
        "CREATE EXTENSION btree_gist;",
    )
    reset_script = text("".join(reset_statements))

    with session_scope() as session:
        session.execute(reset_script)
@contextmanager
def autocommit_engine(url: str):
    """
    Yield an engine created with the AUTOCOMMIT isolation level, so each statement
    is committed on its own instead of running inside an enclosing transaction
    block. Mainly needed because CREATE/DROP DATABASE cannot be executed any
    other way.

    The engine is disposed when the ``with`` block ends, including when the
    body of the block raises.
    """
    engine = create_engine(
        url,
        isolation_level="AUTOCOMMIT",
    )
    try:
        yield engine
    finally:
        # without the finally, an exception in the caller's block would skip dispose()
        engine.dispose()
def make_user(**kwargs: Any) -> User:
    """
    Build a User populated with testing defaults and a random username.

    Every keyword argument is applied with ``setattr`` after construction, so it
    overrides the matching default. Nothing is added to a session here.
    """
    handle = "test_user_" + random_hex(16)

    defaults = dict(
        username=handle,
        email=f"{handle}@dev.couchers.org",
        hashed_password=b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
        name=handle.capitalize(),
        hosting_status=HostingStatus.cant_host,
        city="Testing city",
        hometown="Test hometown",
        community_standing=0.5,
        birthdate=date(year=2000, month=1, day=1),
        gender="Woman",
        pronouns="",
        occupation="Tester",
        education="UST(esting)",
        about_me="I test things",
        things_i_like="Code",
        about_place="My place has a lot of testing paraphenelia",
        additional_information="I can be a bit testy",
        accepted_tos=TOS_VERSION,
        geom=create_coordinate(40.7108, -73.9740),
        geom_radius=100,
        last_onboarding_email_sent=now(),
        last_donated=now(),
    )
    user = User(**defaults)
    user.accepted_community_guidelines = GUIDELINES_VERSION
    user.onboarding_emails_sent = 1

    # A DB constraint requires superusers to be editors as well; only fill the
    # flag in when the caller did not pass is_editor explicitly.
    overrides = dict(kwargs)
    if overrides.get("is_superuser"):
        overrides.setdefault("is_editor", True)

    for attribute, override in overrides.items():
        setattr(user, attribute, override)

    return user
def generate_user(
    *,
    delete_user: bool = False,
    complete_profile: bool = True,
    strong_verification: bool = False,
    regions_visited: Sequence[str] = (),
    regions_lived: Sequence[str] = (),
    language_abilities: Sequence[tuple[str, LanguageFluency]] = (),
    **kwargs: Any,
) -> tuple[User, str]:
    """
    Create a new user, return session token

    The user is detached from any session, and you can access its static attributes, but you can't modify it

    Use this most of the time

    Args:
        delete_user: if true, the user is marked ``is_deleted`` after its session token was created.
        complete_profile: if true, an Upload is created and used as the avatar, and ``about_me`` is
            replaced with a long text (this overrides any ``about_me`` passed through ``kwargs``).
            The function asserts that ``user.has_completed_profile`` equals this flag.
        strong_verification: if true, a succeeded StrongVerificationAttempt is added for the user.
        regions_visited: region codes; one RegionVisited row is added per code.
        regions_lived: region codes; one RegionLived row is added per code.
        language_abilities: ``(language_code, fluency)`` pairs; one LanguageAbility row is added per pair.
        **kwargs: forwarded to ``make_user`` as attribute overrides.

    Returns:
        ``(user, token)`` where ``token`` is the first value returned by ``create_session``.
    """
    with session_scope() as session:
        user = make_user(**kwargs)

        session.add(user)
        # flush so that user.id is available for the rows created below
        session.flush()

        # Create a profile gallery for the user and link it
        profile_gallery = PhotoGallery(owner_user_id=user.id)
        session.add(profile_gallery)
        session.flush()
        user.profile_gallery_id = profile_gallery.id

        for region in regions_visited:
            session.add(RegionVisited(user_id=user.id, region_code=region))

        for region in regions_lived:
            session.add(RegionLived(user_id=user.id, region_code=region))

        for lang, fluency in language_abilities:
            session.add(LanguageAbility(user_id=user.id, language_code=lang, fluency=fluency))

        # this expires the user, so now it's "dirty"
        context = cast(CouchersContext, _MockCouchersContext())
        token, _ = create_session(context, session, user, False, set_cookie=False)

        # a deleted user aborts session creation, hence this follows and necessitates a second commit
        if delete_user:
            user.is_deleted = True

        # a larger id yields a smaller score
        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            key = random_hex(32)
            filename = random_hex(32) + ".jpg"
            session.add(
                Upload(
                    key=key,
                    filename=filename,
                    creator_user_id=user.id,
                )
            )
            # flush the upload before the user row references its key
            session.flush()
            user.avatar_key = key
            user.about_me = "I have a complete profile!\n" * 20

        if strong_verification:
            attempt = StrongVerificationAttempt(
                verification_attempt_token=f"verification_attempt_token_{user.id}",
                user_id=user.id,
                status=StrongVerificationAttemptStatus.succeeded,
                has_full_data=True,
                passport_encrypted_data=b"not real",
                passport_date_of_birth=user.birthdate,
                # any gender other than "Woman"/"Man" maps to PassportSex.unspecified
                passport_sex={"Woman": PassportSex.female, "Man": PassportSex.male}.get(
                    user.gender, PassportSex.unspecified
                ),
                has_minimal_data=True,
                passport_expiry_date=date.today() + timedelta(days=10),
                passport_nationality="UTO",
                passport_last_three_document_chars=f"{user.id:03}",
                iris_token=f"iris_token_{user.id}",
                iris_session_id=user.id,
            )
            session.add(attempt)
            session.flush()
            # sanity check that the attempt built above counts as strong verification
            assert attempt.has_strong_verification(user)

        session.commit()

        assert user.has_completed_profile == complete_profile

        # refresh it, undoes the expiry
        session.refresh(user)

        # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # this detaches the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token
def get_user_id_and_token(session: Session, username: str) -> tuple[int, str]:
    """
    Look up a user's id by username, then the token of that user's session.

    Both lookups use ``scalar_one()``, so each expects exactly one matching row.
    """
    id_query = select(User.id).where(User.username == username)
    user_id = session.execute(id_query).scalar_one()

    token_query = select(UserSession.token).where(UserSession.user_id == user_id)
    return user_id, session.execute(token_query).scalar_one()
def make_friends(user1: User, user2: User) -> None:
    """Store an accepted friend relationship going from ``user1`` to ``user2``."""
    accepted = FriendRelationship(
        from_user_id=user1.id,
        to_user_id=user2.id,
        status=FriendStatus.accepted,
    )
    with session_scope() as db:
        db.add(accepted)
def make_user_block(user1: User, user2: User) -> None:
    """Store a block in which ``user1`` is the blocking user and ``user2`` the blocked one."""
    block = UserBlock(
        blocking_user_id=user1.id,
        blocked_user_id=user2.id,
    )
    with session_scope() as db:
        db.add(block)
def make_user_invisible(user_id: int) -> None:
    """Set ``is_banned`` to true on the user with the given id."""
    ban_statement = update(User).where(User.id == user_id).values(is_banned=True)
    with session_scope() as db:
        db.execute(ban_statement)
# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1: User, user2: User) -> FriendRelationship | None:
    """
    Fetch the friend relationship between two users, in either direction.

    Returns the FriendRelationship detached from its session, or None when
    no relationship exists between the two users.
    """
    with session_scope() as session:
        # Column expressions have to be combined with `&` (or and_()). Python's `and`
        # evaluates the truthiness of its first operand and returns only one of the
        # two clauses, which silently removes half of the condition from the query.
        user1_to_user2 = (FriendRelationship.from_user_id == user1.id) & (
            FriendRelationship.to_user_id == user2.id
        )
        user2_to_user1 = (FriendRelationship.from_user_id == user2.id) & (
            FriendRelationship.to_user_id == user1.id
        )

        friend_relationship = session.execute(
            select(FriendRelationship).where(or_(user1_to_user2, user2_to_user1))
        ).scalar_one_or_none()

        # expunge() only accepts mapped instances, so skip it when nothing was found
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship
def add_users_to_new_moderation_list(users: list[User]) -> int:
    """
    Group users as duplicated accounts

    Creates a new ModerationUserList, attaches every given user to it and
    returns the id of the new list.
    """
    with session_scope() as db:
        new_list = ModerationUserList()
        db.add(new_list)
        db.flush()

        for member in users:
            # load the user through this session by id rather than attaching the passed-in object
            new_list.users.append(db.get_one(User, member.id))

        return new_list.id
def run_migration_test():
    """
    Tell whether the migration test is enabled.

    True only when the RUN_MIGRATION_TEST environment variable equals "true"
    (compared case-insensitively); an unset variable counts as "false".
    """
    flag = os.getenv("RUN_MIGRATION_TEST", "false")
    return flag.lower() == "true"