Coverage for app / backend / src / tests / fixtures / db.py: 97%
122 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1import subprocess
2from collections.abc import Sequence
3from contextlib import contextmanager
4from datetime import date, timedelta
5from pathlib import Path
6from typing import Any, cast
8from sqlalchemy import Connection, Engine, create_engine, or_, select, text, update
9from sqlalchemy.orm import Session
11from couchers.constants import GUIDELINES_VERSION, TOS_VERSION
12from couchers.context import CouchersContext
13from couchers.crypto import random_hex
14from couchers.db import _get_base_engine, session_scope
15from couchers.helpers.completed_profile import has_completed_profile
16from couchers.models import (
17 Base,
18 FriendRelationship,
19 FriendStatus,
20 HostingStatus,
21 LanguageAbility,
22 LanguageFluency,
23 ModerationUserList,
24 PassportSex,
25 PhotoGallery,
26 PhotoGalleryItem,
27 RegionLived,
28 RegionVisited,
29 StrongVerificationAttempt,
30 StrongVerificationAttemptStatus,
31 Upload,
32 User,
33 UserBlock,
34 UserSession,
35 Volunteer,
36)
37from couchers.servicers.auth import create_session
38from couchers.utils import create_coordinate, now
39from tests.fixtures.sessions import _MockCouchersContext
42def create_schema_from_models(engine: Engine | None = None) -> None:
43 """
44 Create everything from the current models, not incrementally
45 through migrations.
46 """
47 if engine is None:
48 engine = _get_base_engine()
50 # create sql functions (these are created in migrations otherwise)
51 functions = Path(__file__).parent / "sql_functions.sql"
52 with open(functions) as f, engine.connect() as conn:
53 conn.execute(text(f.read()))
54 conn.commit()
56 Base.metadata.create_all(engine)
59def populate_testing_resources(conn: Connection) -> None:
60 """
61 Testing version of couchers.resources.copy_resources_to_database
62 """
63 conn.execute(
64 text("""
65 INSERT INTO regions (code, name) VALUES
66 ('AUS', 'Australia'),
67 ('CAN', 'Canada'),
68 ('CHE', 'Switzerland'),
69 ('CUB', 'Cuba'),
70 ('CXR', 'Christmas Island'),
71 ('CZE', 'Czechia'),
72 ('DEU', 'Germany'),
73 ('EGY', 'Egypt'),
74 ('ESP', 'Spain'),
75 ('EST', 'Estonia'),
76 ('FIN', 'Finland'),
77 ('FRA', 'France'),
78 ('GBR', 'United Kingdom'),
79 ('GEO', 'Georgia'),
80 ('GHA', 'Ghana'),
81 ('GRC', 'Greece'),
82 ('HKG', 'Hong Kong'),
83 ('IRL', 'Ireland'),
84 ('ISR', 'Israel'),
85 ('ITA', 'Italy'),
86 ('JPN', 'Japan'),
87 ('LAO', 'Laos'),
88 ('MEX', 'Mexico'),
89 ('MMR', 'Myanmar'),
90 ('NAM', 'Namibia'),
91 ('NLD', 'Netherlands'),
92 ('NZL', 'New Zealand'),
93 ('POL', 'Poland'),
94 ('PRK', 'North Korea'),
95 ('REU', 'Réunion'),
96 ('SGP', 'Singapore'),
97 ('SWE', 'Sweden'),
98 ('THA', 'Thailand'),
99 ('TUR', 'Turkey'),
100 ('TWN', 'Taiwan'),
101 ('USA', 'United States'),
102 ('VNM', 'Vietnam');
103 """)
104 )
106 # Insert languages as textual SQL
107 conn.execute(
108 text("""
109 INSERT INTO languages (code, name) VALUES
110 ('arb', 'Arabic (Standard)'),
111 ('deu', 'German'),
112 ('eng', 'English'),
113 ('fin', 'Finnish'),
114 ('fra', 'French'),
115 ('heb', 'Hebrew'),
116 ('hun', 'Hungarian'),
117 ('jpn', 'Japanese'),
118 ('pol', 'Polish'),
119 ('swe', 'Swedish'),
120 ('cmn', 'Chinese (Mandarin)')
121 """)
122 )
124 with open(Path(__file__).parent.parent.parent.parent / "resources" / "timezone_areas.sql-fake", "r") as f:
125 tz_sql = f.read()
127 conn.execute(text(tz_sql))
130def drop_database() -> None:
131 with session_scope() as session:
132 # postgis is required for all the Geographic Information System (GIS) stuff
133 # pg_trgm is required for trigram-based search
134 # btree_gist is required for gist-based exclusion constraints
135 session.execute(
136 text(
137 "DROP SCHEMA IF EXISTS public CASCADE;"
138 "DROP SCHEMA IF EXISTS logging CASCADE;"
139 "DROP EXTENSION IF EXISTS postgis CASCADE;"
140 "CREATE SCHEMA IF NOT EXISTS public;"
141 "CREATE SCHEMA IF NOT EXISTS logging;"
142 "CREATE EXTENSION postgis;"
143 "CREATE EXTENSION pg_trgm;"
144 "CREATE EXTENSION btree_gist;"
145 )
146 )
149@contextmanager
150def autocommit_engine(url: str):
151 """
152 An engine that executes every statement in a transaction. Mainly needed
153 because CREATE/DROP DATABASE cannot be executed any other way.
154 """
155 engine = create_engine(
156 url,
157 isolation_level="AUTOCOMMIT",
158 )
159 yield engine
160 engine.dispose()
163def make_user(**kwargs: Any) -> User:
164 username = "test_user_" + random_hex(16)
166 user = User(
167 username=username,
168 email=f"{username}@dev.couchers.org",
169 hashed_password=b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
170 name=username.capitalize(),
171 hosting_status=HostingStatus.cant_host,
172 city="Testing city",
173 hometown="Test hometown",
174 community_standing=0.5,
175 birthdate=date(year=2000, month=1, day=1),
176 gender="Woman",
177 pronouns="",
178 occupation="Tester",
179 education="UST(esting)",
180 about_me="I test things",
181 things_i_like="Code",
182 about_place="My place has a lot of testing paraphenelia",
183 additional_information="I can be a bit testy",
184 accepted_tos=TOS_VERSION,
185 geom=create_coordinate(40.7108, -73.9740),
186 geom_radius=100,
187 last_onboarding_email_sent=now(),
188 last_donated=now(),
189 )
190 user.accepted_community_guidelines = GUIDELINES_VERSION
191 user.onboarding_emails_sent = 1
193 # Ensure superusers are also editors (DB constraint)
194 if kwargs.get("is_superuser") and "is_editor" not in kwargs:
195 kwargs["is_editor"] = True
197 for key, value in kwargs.items():
198 setattr(user, key, value)
200 return user
203def generate_user(
204 *,
205 delete_user=False,
206 complete_profile=True,
207 strong_verification=False,
208 regions_visited: Sequence[str] = (),
209 regions_lived: Sequence[str] = (),
210 language_abilities: Sequence[tuple[str, LanguageFluency]] = (),
211 **kwargs: Any,
212) -> tuple[User, str]:
213 """
214 Create a new user, return session token
216 The user is detached from any session, and you can access its static attributes, but you can't modify it
218 Use this most of the time
219 """
220 with session_scope() as session:
221 user = make_user(**kwargs)
223 session.add(user)
224 session.flush()
226 # Create a profile gallery for the user and link it
227 profile_gallery = PhotoGallery(owner_user_id=user.id)
228 session.add(profile_gallery)
229 session.flush()
230 user.profile_gallery_id = profile_gallery.id
232 for region in regions_visited:
233 session.add(RegionVisited(user_id=user.id, region_code=region))
235 for region in regions_lived:
236 session.add(RegionLived(user_id=user.id, region_code=region))
238 for lang, fluency in language_abilities:
239 session.add(LanguageAbility(user_id=user.id, language_code=lang, fluency=fluency))
241 # this expires the user, so now it's "dirty"
242 context = cast(CouchersContext, _MockCouchersContext())
243 token, _ = create_session(context, session, user, False, set_cookie=False)
245 # deleted user aborts session creation, hence this follows and necessitates a second commit
246 if delete_user:
247 user.is_deleted = True
249 user.recommendation_score = 1e10 - user.id
251 if complete_profile:
252 key = random_hex(32)
253 session.add(
254 Upload(
255 key=key,
256 filename=random_hex(32) + ".jpg",
257 creator_user_id=user.id,
258 )
259 )
260 session.add(
261 PhotoGalleryItem(
262 gallery_id=profile_gallery.id,
263 upload_key=key,
264 position=0,
265 )
266 )
267 session.flush()
269 user.about_me = "I have a complete profile!\n" * 20
271 if strong_verification:
272 attempt = StrongVerificationAttempt(
273 verification_attempt_token=f"verification_attempt_token_{user.id}",
274 user_id=user.id,
275 status=StrongVerificationAttemptStatus.succeeded,
276 has_full_data=True,
277 passport_encrypted_data=b"not real",
278 passport_date_of_birth=user.birthdate,
279 passport_sex={"Woman": PassportSex.female, "Man": PassportSex.male}.get(
280 user.gender, PassportSex.unspecified
281 ),
282 has_minimal_data=True,
283 passport_expiry_date=date.today() + timedelta(days=10),
284 passport_nationality="UTO",
285 passport_last_three_document_chars=f"{user.id:03}",
286 iris_token=f"iris_token_{user.id}",
287 iris_session_id=user.id,
288 )
289 session.add(attempt)
290 session.flush()
291 assert attempt.has_strong_verification(user)
293 session.commit()
295 assert has_completed_profile(session, user) == complete_profile
297 # refresh it, undoes the expiry
298 session.refresh(user)
300 # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
301 user.timezone # noqa: B018
303 # allows detaches the user from the session, allowing its use outside this session
304 session.expunge(user)
306 return user, token
309def get_user_id_and_token(session: Session, username: str) -> tuple[int, str]:
310 user_id = session.execute(select(User.id).where(User.username == username)).scalar_one()
311 token = session.execute(select(UserSession.token).where(UserSession.user_id == user_id)).scalar_one()
312 return user_id, token
315def make_friends(user1: User, user2: User) -> None:
316 with session_scope() as session:
317 friend_relationship = FriendRelationship(
318 from_user_id=user1.id,
319 to_user_id=user2.id,
320 status=FriendStatus.accepted,
321 )
322 session.add(friend_relationship)
325def make_user_block(user1: User, user2: User) -> None:
326 with session_scope() as session:
327 user_block = UserBlock(
328 blocking_user_id=user1.id,
329 blocked_user_id=user2.id,
330 )
331 session.add(user_block)
334def make_user_invisible(user_id: int) -> None:
335 with session_scope() as session:
336 session.execute(update(User).where(User.id == user_id).values(is_banned=True))
339# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
340def get_friend_relationship(user1: User, user2: User) -> FriendRelationship | None:
341 with session_scope() as session:
342 friend_relationship = session.execute(
343 select(FriendRelationship).where(
344 or_(
345 (FriendRelationship.from_user_id == user1.id and FriendRelationship.to_user_id == user2.id),
346 (FriendRelationship.from_user_id == user2.id and FriendRelationship.to_user_id == user1.id),
347 )
348 )
349 ).scalar_one_or_none()
351 session.expunge(friend_relationship)
352 return friend_relationship
355def add_users_to_new_moderation_list(users: list[User]) -> int:
356 """Group users as duplicated accounts"""
357 with session_scope() as session:
358 moderation_user_list = ModerationUserList()
359 session.add(moderation_user_list)
360 session.flush()
361 for user in users:
362 refreshed_user = session.get_one(User, user.id)
363 moderation_user_list.users.append(refreshed_user)
364 return moderation_user_list.id
367def pg_dump_is_available() -> bool:
368 result = subprocess.run(["which", "pg_dump"], stdout=subprocess.PIPE, encoding="ascii")
369 return result.returncode == 0
372def make_volunteer(started_volunteering: date, show_on_team_page: bool = True, **kwargs: Any) -> Volunteer:
373 vol = Volunteer(show_on_team_page=show_on_team_page, **kwargs)
374 vol.started_volunteering = started_volunteering
376 return vol