Coverage for app / backend / src / tests / fixtures / db.py: 100%
127 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
1import subprocess
2from collections.abc import Sequence
3from contextlib import contextmanager
4from datetime import date, timedelta
5from pathlib import Path
6from typing import Any, cast
8from sqlalchemy import Connection, Engine, create_engine, func, or_, select, text, update
9from sqlalchemy.orm import Session
11from couchers.constants import GUIDELINES_VERSION, TOS_VERSION
12from couchers.context import CouchersContext
13from couchers.crypto import random_hex
14from couchers.db import _get_base_engine, session_scope
15from couchers.helpers.completed_profile import has_completed_profile
16from couchers.models import (
17 Base,
18 FriendRelationship,
19 FriendStatus,
20 HostingStatus,
21 LanguageAbility,
22 LanguageFluency,
23 ModerationObjectType,
24 ModerationState,
25 ModerationUserList,
26 ModerationVisibility,
27 PassportSex,
28 PhotoGallery,
29 PhotoGalleryItem,
30 RegionLived,
31 RegionVisited,
32 StrongVerificationAttempt,
33 StrongVerificationAttemptStatus,
34 Upload,
35 User,
36 UserBlock,
37 UserSession,
38 Volunteer,
39)
40from couchers.servicers.auth import create_session
41from couchers.utils import create_coordinate, now
42from tests.fixtures.sessions import _MockCouchersContext
def create_schema_from_models(engine: Engine | None = None) -> None:
    """
    Build the whole schema directly from the current models instead of
    replaying the migrations one at a time.

    When no engine is passed, the base engine from couchers.db is used.
    """
    target = _get_base_engine() if engine is None else engine

    # the sql functions are normally installed by migrations, so load them by hand first
    sql_path = Path(__file__).parent / "sql_functions.sql"
    with open(sql_path) as sql_file, target.connect() as connection:
        connection.execute(text(sql_file.read()))
        connection.commit()

    Base.metadata.create_all(target)
def populate_testing_resources(conn: Connection) -> None:
    """
    Testing version of couchers.resources.copy_resources_to_database

    Inserts a fixed set of regions and languages as textual SQL, then runs the
    contents of resources/timezone_areas.sql-fake on the same connection.
    """
    regions_insert = text("""
        INSERT INTO regions (code, name) VALUES
        ('AUS', 'Australia'),
        ('CAN', 'Canada'),
        ('CHE', 'Switzerland'),
        ('CUB', 'Cuba'),
        ('CXR', 'Christmas Island'),
        ('CZE', 'Czechia'),
        ('DEU', 'Germany'),
        ('EGY', 'Egypt'),
        ('ESP', 'Spain'),
        ('EST', 'Estonia'),
        ('FIN', 'Finland'),
        ('FRA', 'France'),
        ('GBR', 'United Kingdom'),
        ('GEO', 'Georgia'),
        ('GHA', 'Ghana'),
        ('GRC', 'Greece'),
        ('HKG', 'Hong Kong'),
        ('IRL', 'Ireland'),
        ('ISR', 'Israel'),
        ('ITA', 'Italy'),
        ('JPN', 'Japan'),
        ('LAO', 'Laos'),
        ('MEX', 'Mexico'),
        ('MMR', 'Myanmar'),
        ('NAM', 'Namibia'),
        ('NLD', 'Netherlands'),
        ('NZL', 'New Zealand'),
        ('POL', 'Poland'),
        ('PRK', 'North Korea'),
        ('REU', 'Réunion'),
        ('SGP', 'Singapore'),
        ('SWE', 'Sweden'),
        ('THA', 'Thailand'),
        ('TUR', 'Turkey'),
        ('TWN', 'Taiwan'),
        ('USA', 'United States'),
        ('VNM', 'Vietnam');
        """)
    conn.execute(regions_insert)

    # Insert languages as textual SQL
    languages_insert = text("""
        INSERT INTO languages (code, name) VALUES
        ('arb', 'Arabic (Standard)'),
        ('deu', 'German'),
        ('eng', 'English'),
        ('fin', 'Finnish'),
        ('fra', 'French'),
        ('heb', 'Hebrew'),
        ('hun', 'Hungarian'),
        ('jpn', 'Japanese'),
        ('pol', 'Polish'),
        ('swe', 'Swedish'),
        ('cmn', 'Chinese (Mandarin)')
        """)
    conn.execute(languages_insert)

    # the timezone areas file lives in the resources directory, four levels above this file
    timezone_sql_path = Path(__file__).parents[3] / "resources" / "timezone_areas.sql-fake"
    with open(timezone_sql_path, "r") as timezone_file:
        timezone_sql = timezone_file.read()

    conn.execute(text(timezone_sql))
def drop_database() -> None:
    """
    Reset the database contents in a single textual SQL batch.

    Drops the public and logging schemas and the postgis extension, then
    recreates both schemas and creates the required extensions.
    """
    statements = (
        "DROP SCHEMA IF EXISTS public CASCADE;",
        "DROP SCHEMA IF EXISTS logging CASCADE;",
        "DROP EXTENSION IF EXISTS postgis CASCADE;",
        "CREATE SCHEMA IF NOT EXISTS public;",
        "CREATE SCHEMA IF NOT EXISTS logging;",
        # postgis is required for all the Geographic Information System (GIS) stuff
        "CREATE EXTENSION postgis;",
        # pg_trgm is required for trigram-based search
        "CREATE EXTENSION pg_trgm;",
        # btree_gist is required for gist-based exclusion constraints
        "CREATE EXTENSION btree_gist;",
        "CREATE EXTENSION pg_stat_statements;",
    )
    reset_sql = "".join(statements)

    with session_scope() as session:
        session.execute(text(reset_sql))
@contextmanager
def autocommit_engine(url: str):
    """
    Yield an engine whose connections run in AUTOCOMMIT mode, i.e. statements
    are not wrapped in a transaction block. Mainly needed because
    CREATE/DROP DATABASE cannot be executed any other way.

    The engine is disposed when the ``with`` block exits, including when the
    block raises.
    """
    engine = create_engine(
        url,
        isolation_level="AUTOCOMMIT",
    )
    try:
        yield engine
    finally:
        # without the finally, an exception inside the with-body would skip dispose()
        engine.dispose()
def make_user(**kwargs: Any) -> User:
    """
    Build (but do not persist) a User with a random username and test defaults.

    Any keyword argument is set on the user as an attribute after the defaults
    are applied, so callers can override any default.
    """
    handle = "test_user_" + random_hex(16)

    defaults = {
        "username": handle,
        "email": f"{handle}@dev.couchers.org",
        "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
        "name": handle.capitalize(),
        "hosting_status": HostingStatus.cant_host,
        "city": "Testing city",
        "hometown": "Test hometown",
        "community_standing": 0.5,
        "birthdate": date(year=2000, month=1, day=1),
        "gender": "Woman",
        "pronouns": "",
        "occupation": "Tester",
        "education": "UST(esting)",
        "about_me": "I test things",
        "things_i_like": "Code",
        "about_place": "My place has a lot of testing paraphenelia",
        "additional_information": "I can be a bit testy",
        "accepted_tos": TOS_VERSION,
        "geom": create_coordinate(40.7108, -73.9740),
        "geom_radius": 100,
        "last_onboarding_email_sent": now(),
        "last_donated": now(),
    }
    user = User(**defaults)
    user.accepted_community_guidelines = GUIDELINES_VERSION
    user.onboarding_emails_sent = 1

    overrides = {**kwargs}
    # Ensure superusers are also editors (DB constraint)
    if overrides.get("is_superuser") and "is_editor" not in overrides:
        overrides["is_editor"] = True

    for attribute, value in overrides.items():
        setattr(user, attribute, value)

    return user
def generate_user(
    *,
    delete_user: bool = False,
    complete_profile: bool = True,
    strong_verification: bool = False,
    regions_visited: Sequence[str] = (),
    regions_lived: Sequence[str] = (),
    language_abilities: Sequence[tuple[str, LanguageFluency]] = (),
    **kwargs: Any,
) -> tuple[User, str]:
    """
    Create and persist a new user, return the user and a session token

    The user is detached from any session, and you can access its static attributes, but you can't modify it

    Use this most of the time

    delete_user: set deleted_at on the user after the session has been created
    complete_profile: add a profile photo and a long about_me; asserted against has_completed_profile
    strong_verification: add a succeeded StrongVerificationAttempt for the user
    regions_visited / regions_lived: region codes to attach to the user
    language_abilities: (language code, fluency) pairs to attach to the user
    kwargs: passed through to make_user to override its defaults
    """
    with session_scope() as session:
        user = make_user(**kwargs)

        session.add(user)
        # flush so that user.id is available for the rows created below
        session.flush()

        # Create a profile gallery for the user and link it
        profile_gallery = PhotoGallery(owner_user_id=user.id)
        session.add(profile_gallery)
        session.flush()
        user.profile_gallery_id = profile_gallery.id

        for region in regions_visited:
            session.add(RegionVisited(user_id=user.id, region_code=region))

        for region in regions_lived:
            session.add(RegionLived(user_id=user.id, region_code=region))

        for lang, fluency in language_abilities:
            session.add(LanguageAbility(user_id=user.id, language_code=lang, fluency=fluency))

        # this expires the user, so now it's "dirty"
        context = cast(CouchersContext, _MockCouchersContext())
        # NOTE(review): the meaning of the positional False is defined by create_session — confirm there
        token, _ = create_session(context, session, user, False, set_cookie=False)

        # deleted user aborts session creation, hence this follows and necessitates a second commit
        if delete_user:
            user.deleted_at = now()

        # lower ids (earlier users) get a higher score
        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            # an upload plus a gallery item at position 0 in the user's profile gallery
            key = random_hex(32)
            session.add(
                Upload(
                    key=key,
                    filename=random_hex(32) + ".jpg",
                    creator_user_id=user.id,
                )
            )
            session.add(
                PhotoGalleryItem(
                    gallery_id=profile_gallery.id,
                    upload_key=key,
                    position=0,
                )
            )
            session.flush()

            user.about_me = "I have a complete profile!\n" * 20

        if strong_verification:
            # fake passport data derived from the user's own id, birthdate and gender
            attempt = StrongVerificationAttempt(
                verification_attempt_token=f"verification_attempt_token_{user.id}",
                user_id=user.id,
                status=StrongVerificationAttemptStatus.succeeded,
                has_full_data=True,
                passport_encrypted_data=b"not real",
                passport_date_of_birth=user.birthdate,
                passport_sex={"Woman": PassportSex.female, "Man": PassportSex.male}.get(
                    user.gender, PassportSex.unspecified
                ),
                has_minimal_data=True,
                passport_expiry_date=date.today() + timedelta(days=10),
                passport_nationality="UTO",
                passport_last_three_document_chars=f"{user.id:03}",
                iris_token=f"iris_token_{user.id}",
                iris_session_id=user.id,
            )
            session.add(attempt)
            session.flush()
            assert attempt.has_strong_verification(user)

        session.commit()

        # sanity check: the fixture produced the profile state that was asked for
        assert has_completed_profile(session, user) == complete_profile

        # refresh it, undoes the expiry
        session.refresh(user)

        # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # detaches the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token
def get_user_id_and_token(session: Session, username: str) -> tuple[int, str]:
    """
    Look up the id of the user with the given username and the token of that
    user's session, using the given database session.
    """
    id_query = select(User.id).where(User.username == username)
    found_id = session.execute(id_query).scalar_one()

    token_query = select(UserSession.token).where(UserSession.user_id == found_id)
    found_token = session.execute(token_query).scalar_one()

    return found_id, found_token
def make_friends(user1: User, user2: User) -> None:
    """
    Insert an accepted friend relationship from user1 to user2, backed by a
    visible moderation state.
    """
    with session_scope() as session:
        # Approved (VISIBLE) moderation state; its object_id is only known once the
        # relationship exists, so start with a placeholder of 0
        state = ModerationState(
            object_type=ModerationObjectType.friend_request,
            object_id=0,
            visibility=ModerationVisibility.visible,
        )
        session.add(state)
        session.flush()

        relationship = FriendRelationship(
            from_user_id=user1.id,
            to_user_id=user2.id,
            status=FriendStatus.accepted,
            moderation_state_id=state.id,
        )
        session.add(relationship)
        session.flush()

        # Point the moderation state at the relationship it moderates
        state.object_id = relationship.id
def make_user_block(user1: User, user2: User) -> None:
    """
    Record that user1 blocks user2.
    """
    with session_scope() as session:
        session.add(UserBlock(blocking_user_id=user1.id, blocked_user_id=user2.id))
def make_user_invisible(user_id: int) -> None:
    """
    Set banned_at to the database's current time for the user with the given id.
    """
    ban_statement = update(User).where(User.id == user_id).values(banned_at=func.now())
    with session_scope() as session:
        session.execute(ban_statement)
# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1: User, user2: User) -> FriendRelationship | None:
    """
    Return the friend relationship between the two users, in either direction,
    or None if there is none.

    The returned object is expunged from the session so it can be used after
    the session closes.
    """
    # SQL conditions must be combined with `&` (or and_()), not Python's `and`:
    # `and` evaluates the truthiness of the first expression and keeps only one
    # operand, so the second condition would never reach the query.
    user1_to_user2 = (FriendRelationship.from_user_id == user1.id) & (FriendRelationship.to_user_id == user2.id)
    user2_to_user1 = (FriendRelationship.from_user_id == user2.id) & (FriendRelationship.to_user_id == user1.id)

    with session_scope() as session:
        friend_relationship = session.execute(
            select(FriendRelationship).where(or_(user1_to_user2, user2_to_user1))
        ).scalar_one_or_none()

        # expunge() does not accept None, so only detach when a row was found
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship
def add_users_to_new_moderation_list(users: list[User]) -> int:
    """
    Group users as duplicated accounts

    Creates a new ModerationUserList, adds each given user to it, and returns
    the id of the new list.
    """
    with session_scope() as session:
        new_list = ModerationUserList()
        session.add(new_list)
        session.flush()

        for detached_user in users:
            # the passed-in users may be detached, so load each one in this session
            attached_user = session.get_one(User, detached_user.id)
            new_list.users.append(attached_user)

        return new_list.id
def pg_dump_is_available() -> bool:
    """
    Return True if a pg_dump executable can be found on the PATH.

    Uses shutil.which rather than spawning the external `which` command, so it
    also works where `which` itself is not installed.
    """
    import shutil

    return shutil.which("pg_dump") is not None
def make_volunteer(started_volunteering: date, show_on_team_page: bool = True, **kwargs: Any) -> Volunteer:
    """
    Build a Volunteer from the given keyword arguments, then set its
    started_volunteering date. The object is returned without being persisted.
    """
    volunteer = Volunteer(show_on_team_page=show_on_team_page, **kwargs)
    # set after construction, not passed to the constructor
    volunteer.started_volunteering = started_volunteering

    return volunteer