Coverage for app / backend / src / tests / fixtures / db.py: 100%
127 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1import subprocess
2from collections.abc import Sequence
3from contextlib import contextmanager
4from datetime import date, timedelta
5from pathlib import Path
6from typing import Any, cast
8from sqlalchemy import Connection, Engine, create_engine, func, or_, select, text, update
9from sqlalchemy.orm import Session
11from couchers.constants import GUIDELINES_VERSION, TOS_VERSION
12from couchers.context import CouchersContext
13from couchers.crypto import random_hex
14from couchers.db import _get_base_engine, session_scope
15from couchers.helpers.completed_profile import has_completed_profile
16from couchers.models import (
17 Base,
18 FriendRelationship,
19 FriendStatus,
20 HostingStatus,
21 LanguageAbility,
22 LanguageFluency,
23 ModerationObjectType,
24 ModerationState,
25 ModerationUserList,
26 ModerationVisibility,
27 PassportSex,
28 PhotoGallery,
29 PhotoGalleryItem,
30 RegionLived,
31 RegionVisited,
32 StrongVerificationAttempt,
33 StrongVerificationAttemptStatus,
34 Upload,
35 User,
36 UserBlock,
37 UserSession,
38 Volunteer,
39)
40from couchers.servicers.auth import create_session
41from couchers.utils import create_coordinate, now
42from tests.fixtures.sessions import _MockCouchersContext
45def create_schema_from_models(engine: Engine | None = None) -> None:
46 """
47 Create everything from the current models, not incrementally
48 through migrations.
49 """
50 if engine is None:
51 engine = _get_base_engine()
53 # create sql functions (these are created in migrations otherwise)
54 functions = Path(__file__).parent / "sql_functions.sql"
55 with open(functions) as f, engine.connect() as conn:
56 conn.execute(text(f.read()))
57 conn.commit()
59 Base.metadata.create_all(engine)
62def populate_testing_resources(conn: Connection) -> None:
63 """
64 Testing version of couchers.resources.copy_resources_to_database
65 """
66 conn.execute(
67 text("""
68 INSERT INTO regions (code, name) VALUES
69 ('AUS', 'Australia'),
70 ('CAN', 'Canada'),
71 ('CHE', 'Switzerland'),
72 ('CUB', 'Cuba'),
73 ('CXR', 'Christmas Island'),
74 ('CZE', 'Czechia'),
75 ('DEU', 'Germany'),
76 ('EGY', 'Egypt'),
77 ('ESP', 'Spain'),
78 ('EST', 'Estonia'),
79 ('FIN', 'Finland'),
80 ('FRA', 'France'),
81 ('GBR', 'United Kingdom'),
82 ('GEO', 'Georgia'),
83 ('GHA', 'Ghana'),
84 ('GRC', 'Greece'),
85 ('HKG', 'Hong Kong'),
86 ('IRL', 'Ireland'),
87 ('ISR', 'Israel'),
88 ('ITA', 'Italy'),
89 ('JPN', 'Japan'),
90 ('LAO', 'Laos'),
91 ('MEX', 'Mexico'),
92 ('MMR', 'Myanmar'),
93 ('NAM', 'Namibia'),
94 ('NLD', 'Netherlands'),
95 ('NZL', 'New Zealand'),
96 ('POL', 'Poland'),
97 ('PRK', 'North Korea'),
98 ('REU', 'Réunion'),
99 ('SGP', 'Singapore'),
100 ('SWE', 'Sweden'),
101 ('THA', 'Thailand'),
102 ('TUR', 'Turkey'),
103 ('TWN', 'Taiwan'),
104 ('USA', 'United States'),
105 ('VNM', 'Vietnam');
106 """)
107 )
109 # Insert languages as textual SQL
110 conn.execute(
111 text("""
112 INSERT INTO languages (code, name) VALUES
113 ('arb', 'Arabic (Standard)'),
114 ('deu', 'German'),
115 ('eng', 'English'),
116 ('fin', 'Finnish'),
117 ('fra', 'French'),
118 ('heb', 'Hebrew'),
119 ('hun', 'Hungarian'),
120 ('jpn', 'Japanese'),
121 ('pol', 'Polish'),
122 ('swe', 'Swedish'),
123 ('cmn', 'Chinese (Mandarin)')
124 """)
125 )
127 with open(Path(__file__).parent.parent.parent.parent / "resources" / "timezone_areas.sql-fake", "r") as f:
128 tz_sql = f.read()
130 conn.execute(text(tz_sql))
133def drop_database() -> None:
134 with session_scope() as session:
135 # postgis is required for all the Geographic Information System (GIS) stuff
136 # pg_trgm is required for trigram-based search
137 # btree_gist is required for gist-based exclusion constraints
138 session.execute(
139 text(
140 "DROP SCHEMA IF EXISTS public CASCADE;"
141 "DROP SCHEMA IF EXISTS logging CASCADE;"
142 "DROP EXTENSION IF EXISTS postgis CASCADE;"
143 "CREATE SCHEMA IF NOT EXISTS public;"
144 "CREATE SCHEMA IF NOT EXISTS logging;"
145 "CREATE EXTENSION postgis;"
146 "CREATE EXTENSION pg_trgm;"
147 "CREATE EXTENSION btree_gist;"
148 )
149 )
152@contextmanager
153def autocommit_engine(url: str):
154 """
155 An engine that executes every statement in a transaction. Mainly needed
156 because CREATE/DROP DATABASE cannot be executed any other way.
157 """
158 engine = create_engine(
159 url,
160 isolation_level="AUTOCOMMIT",
161 )
162 yield engine
163 engine.dispose()
166def make_user(**kwargs: Any) -> User:
167 username = "test_user_" + random_hex(16)
169 user = User(
170 username=username,
171 email=f"{username}@dev.couchers.org",
172 hashed_password=b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
173 name=username.capitalize(),
174 hosting_status=HostingStatus.cant_host,
175 city="Testing city",
176 hometown="Test hometown",
177 community_standing=0.5,
178 birthdate=date(year=2000, month=1, day=1),
179 gender="Woman",
180 pronouns="",
181 occupation="Tester",
182 education="UST(esting)",
183 about_me="I test things",
184 things_i_like="Code",
185 about_place="My place has a lot of testing paraphenelia",
186 additional_information="I can be a bit testy",
187 accepted_tos=TOS_VERSION,
188 geom=create_coordinate(40.7108, -73.9740),
189 geom_radius=100,
190 last_onboarding_email_sent=now(),
191 last_donated=now(),
192 )
193 user.accepted_community_guidelines = GUIDELINES_VERSION
194 user.onboarding_emails_sent = 1
196 # Ensure superusers are also editors (DB constraint)
197 if kwargs.get("is_superuser") and "is_editor" not in kwargs:
198 kwargs["is_editor"] = True
200 for key, value in kwargs.items():
201 setattr(user, key, value)
203 return user
206def generate_user(
207 *,
208 delete_user=False,
209 complete_profile=True,
210 strong_verification=False,
211 regions_visited: Sequence[str] = (),
212 regions_lived: Sequence[str] = (),
213 language_abilities: Sequence[tuple[str, LanguageFluency]] = (),
214 **kwargs: Any,
215) -> tuple[User, str]:
216 """
217 Create a new user, return session token
219 The user is detached from any session, and you can access its static attributes, but you can't modify it
221 Use this most of the time
222 """
223 with session_scope() as session:
224 user = make_user(**kwargs)
226 session.add(user)
227 session.flush()
229 # Create a profile gallery for the user and link it
230 profile_gallery = PhotoGallery(owner_user_id=user.id)
231 session.add(profile_gallery)
232 session.flush()
233 user.profile_gallery_id = profile_gallery.id
235 for region in regions_visited:
236 session.add(RegionVisited(user_id=user.id, region_code=region))
238 for region in regions_lived:
239 session.add(RegionLived(user_id=user.id, region_code=region))
241 for lang, fluency in language_abilities:
242 session.add(LanguageAbility(user_id=user.id, language_code=lang, fluency=fluency))
244 # this expires the user, so now it's "dirty"
245 context = cast(CouchersContext, _MockCouchersContext())
246 token, _ = create_session(context, session, user, False, set_cookie=False)
248 # deleted user aborts session creation, hence this follows and necessitates a second commit
249 if delete_user:
250 user.deleted_at = now()
252 user.recommendation_score = 1e10 - user.id
254 if complete_profile:
255 key = random_hex(32)
256 session.add(
257 Upload(
258 key=key,
259 filename=random_hex(32) + ".jpg",
260 creator_user_id=user.id,
261 )
262 )
263 session.add(
264 PhotoGalleryItem(
265 gallery_id=profile_gallery.id,
266 upload_key=key,
267 position=0,
268 )
269 )
270 session.flush()
272 user.about_me = "I have a complete profile!\n" * 20
274 if strong_verification:
275 attempt = StrongVerificationAttempt(
276 verification_attempt_token=f"verification_attempt_token_{user.id}",
277 user_id=user.id,
278 status=StrongVerificationAttemptStatus.succeeded,
279 has_full_data=True,
280 passport_encrypted_data=b"not real",
281 passport_date_of_birth=user.birthdate,
282 passport_sex={"Woman": PassportSex.female, "Man": PassportSex.male}.get(
283 user.gender, PassportSex.unspecified
284 ),
285 has_minimal_data=True,
286 passport_expiry_date=date.today() + timedelta(days=10),
287 passport_nationality="UTO",
288 passport_last_three_document_chars=f"{user.id:03}",
289 iris_token=f"iris_token_{user.id}",
290 iris_session_id=user.id,
291 )
292 session.add(attempt)
293 session.flush()
294 assert attempt.has_strong_verification(user)
296 session.commit()
298 assert has_completed_profile(session, user) == complete_profile
300 # refresh it, undoes the expiry
301 session.refresh(user)
303 # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
304 user.timezone # noqa: B018
306 # allows detaches the user from the session, allowing its use outside this session
307 session.expunge(user)
309 return user, token
312def get_user_id_and_token(session: Session, username: str) -> tuple[int, str]:
313 user_id = session.execute(select(User.id).where(User.username == username)).scalar_one()
314 token = session.execute(select(UserSession.token).where(UserSession.user_id == user_id)).scalar_one()
315 return user_id, token
318def make_friends(user1: User, user2: User) -> None:
319 with session_scope() as session:
320 # Create moderation state with VISIBLE status (approved friendship for tests)
321 moderation_state = ModerationState(
322 object_type=ModerationObjectType.friend_request,
323 object_id=0, # Placeholder, will be updated
324 visibility=ModerationVisibility.visible,
325 )
326 session.add(moderation_state)
327 session.flush()
329 friend_relationship = FriendRelationship(
330 from_user_id=user1.id,
331 to_user_id=user2.id,
332 status=FriendStatus.accepted,
333 moderation_state_id=moderation_state.id,
334 )
335 session.add(friend_relationship)
336 session.flush()
338 # Update the moderation state with the actual object id
339 moderation_state.object_id = friend_relationship.id
342def make_user_block(user1: User, user2: User) -> None:
343 with session_scope() as session:
344 user_block = UserBlock(
345 blocking_user_id=user1.id,
346 blocked_user_id=user2.id,
347 )
348 session.add(user_block)
351def make_user_invisible(user_id: int) -> None:
352 with session_scope() as session:
353 session.execute(update(User).where(User.id == user_id).values(banned_at=func.now()))
356# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
357def get_friend_relationship(user1: User, user2: User) -> FriendRelationship | None:
358 with session_scope() as session:
359 friend_relationship = session.execute(
360 select(FriendRelationship).where(
361 or_(
362 (FriendRelationship.from_user_id == user1.id and FriendRelationship.to_user_id == user2.id),
363 (FriendRelationship.from_user_id == user2.id and FriendRelationship.to_user_id == user1.id),
364 )
365 )
366 ).scalar_one_or_none()
368 session.expunge(friend_relationship)
369 return friend_relationship
372def add_users_to_new_moderation_list(users: list[User]) -> int:
373 """Group users as duplicated accounts"""
374 with session_scope() as session:
375 moderation_user_list = ModerationUserList()
376 session.add(moderation_user_list)
377 session.flush()
378 for user in users:
379 refreshed_user = session.get_one(User, user.id)
380 moderation_user_list.users.append(refreshed_user)
381 return moderation_user_list.id
384def pg_dump_is_available() -> bool:
385 result = subprocess.run(["which", "pg_dump"], stdout=subprocess.PIPE, encoding="ascii")
386 return result.returncode == 0
389def make_volunteer(started_volunteering: date, show_on_team_page: bool = True, **kwargs: Any) -> Volunteer:
390 vol = Volunteer(show_on_team_page=show_on_team_page, **kwargs)
391 vol.started_volunteering = started_volunteering
393 return vol