Coverage for src / tests / fixtures / db.py: 97%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-07 16:21 +0000

1import os 

2from collections.abc import Sequence 

3from contextlib import contextmanager 

4from datetime import date, timedelta 

5from pathlib import Path 

6from typing import Any, cast 

7 

8from sqlalchemy import Connection, Engine, create_engine, or_, select, text, update 

9from sqlalchemy.orm import Session 

10 

11from couchers.constants import GUIDELINES_VERSION, TOS_VERSION 

12from couchers.context import CouchersContext 

13from couchers.crypto import random_hex 

14from couchers.db import _get_base_engine, session_scope 

15from couchers.models import ( 

16 Base, 

17 FriendRelationship, 

18 FriendStatus, 

19 HostingStatus, 

20 LanguageAbility, 

21 LanguageFluency, 

22 ModerationUserList, 

23 PassportSex, 

24 PhotoGallery, 

25 RegionLived, 

26 RegionVisited, 

27 StrongVerificationAttempt, 

28 StrongVerificationAttemptStatus, 

29 Upload, 

30 User, 

31 UserBlock, 

32 UserSession, 

33) 

34from couchers.servicers.auth import create_session 

35from couchers.utils import create_coordinate, now 

36from tests.fixtures.sessions import _MockCouchersContext 

37 

38 

def create_schema_from_models(engine: Engine | None = None) -> None:
    """
    Build the whole schema in one go from the current model definitions,
    instead of stepping through the migrations.

    When no engine is passed, the one returned by _get_base_engine() is used.
    """
    target_engine = _get_base_engine() if engine is None else engine

    # the SQL functions are otherwise created by migrations, so install them by hand first
    sql_path = Path(__file__).parent / "sql_functions.sql"
    with open(sql_path) as sql_file:
        functions_sql = sql_file.read()

    with target_engine.connect() as conn:
        conn.execute(text(functions_sql))
        conn.commit()

    Base.metadata.create_all(target_engine)

54 

55 

def populate_testing_resources(conn: Connection) -> None:
    """
    Testing version of couchers.resources.copy_resources_to_database

    Inserts a fixed set of regions and languages as textual SQL, then executes
    the contents of the timezone_areas.sql-fake resource file on `conn`.
    """
    regions_sql = text("""
        INSERT INTO regions (code, name) VALUES
        ('AUS', 'Australia'),
        ('CAN', 'Canada'),
        ('CHE', 'Switzerland'),
        ('CUB', 'Cuba'),
        ('CXR', 'Christmas Island'),
        ('CZE', 'Czechia'),
        ('DEU', 'Germany'),
        ('EGY', 'Egypt'),
        ('ESP', 'Spain'),
        ('EST', 'Estonia'),
        ('FIN', 'Finland'),
        ('FRA', 'France'),
        ('GBR', 'United Kingdom'),
        ('GEO', 'Georgia'),
        ('GHA', 'Ghana'),
        ('GRC', 'Greece'),
        ('HKG', 'Hong Kong'),
        ('IRL', 'Ireland'),
        ('ISR', 'Israel'),
        ('ITA', 'Italy'),
        ('JPN', 'Japan'),
        ('LAO', 'Laos'),
        ('MEX', 'Mexico'),
        ('MMR', 'Myanmar'),
        ('NAM', 'Namibia'),
        ('NLD', 'Netherlands'),
        ('NZL', 'New Zealand'),
        ('POL', 'Poland'),
        ('PRK', 'North Korea'),
        ('REU', 'Réunion'),
        ('SGP', 'Singapore'),
        ('SWE', 'Sweden'),
        ('THA', 'Thailand'),
        ('TUR', 'Turkey'),
        ('TWN', 'Taiwan'),
        ('USA', 'United States'),
        ('VNM', 'Vietnam');
    """)
    conn.execute(regions_sql)

    # Insert languages as textual SQL
    languages_sql = text("""
        INSERT INTO languages (code, name) VALUES
        ('arb', 'Arabic (Standard)'),
        ('deu', 'German'),
        ('eng', 'English'),
        ('fin', 'Finnish'),
        ('fra', 'French'),
        ('heb', 'Hebrew'),
        ('hun', 'Hungarian'),
        ('jpn', 'Japanese'),
        ('pol', 'Polish'),
        ('swe', 'Swedish'),
        ('cmn', 'Chinese (Mandarin)')
    """)
    conn.execute(languages_sql)

    # the timezone data lives in a resource file four directories above this module
    tz_path = Path(__file__).parents[3] / "resources" / "timezone_areas.sql-fake"
    with tz_path.open("r") as tz_file:
        conn.execute(text(tz_file.read()))

125 

126 

def drop_database() -> None:
    """
    Reset the database: drop the public and logging schemas and the postgis
    extension, then recreate both schemas and create the extensions the app needs.
    """
    # postgis is required for all the Geographic Information System (GIS) stuff
    # pg_trgm is required for trigram-based search
    # btree_gist is required for gist-based exclusion constraints
    reset_statements = (
        "DROP SCHEMA IF EXISTS public CASCADE;",
        "DROP SCHEMA IF EXISTS logging CASCADE;",
        "DROP EXTENSION IF EXISTS postgis CASCADE;",
        "CREATE SCHEMA IF NOT EXISTS public;",
        "CREATE SCHEMA IF NOT EXISTS logging;",
        "CREATE EXTENSION postgis;",
        "CREATE EXTENSION pg_trgm;",
        "CREATE EXTENSION btree_gist;",
    )

    with session_scope() as session:
        # sent as one single textual statement, exactly as concatenated
        session.execute(text("".join(reset_statements)))

144 

145 

@contextmanager
def autocommit_engine(url: str):
    """
    Yield an engine created with the AUTOCOMMIT isolation level.

    With AUTOCOMMIT, statements are not wrapped in an enclosing transaction
    block. This is mainly needed because CREATE/DROP DATABASE cannot be
    executed inside a transaction block.

    The engine is disposed when the ``with`` block exits, including when the
    block raises.
    """
    engine = create_engine(
        url,
        isolation_level="AUTOCOMMIT",
    )
    try:
        yield engine
    finally:
        # must run even if the caller's block raised, otherwise the engine's
        # pooled connections are never released
        engine.dispose()

158 

159 

def make_user(**kwargs: Any) -> User:
    """
    Build a User object filled with test defaults and a random username.

    Every keyword argument is set as an attribute on the user after the
    defaults have been applied, so callers can override any default.
    The user is only constructed here; it is not added to any session.
    """
    # Ensure superusers are also editors (DB constraint), unless the caller set is_editor explicitly
    if kwargs.get("is_superuser"):
        kwargs.setdefault("is_editor", True)

    username = "test_user_" + random_hex(16)

    defaults: dict[str, Any] = dict(
        username=username,
        email=f"{username}@dev.couchers.org",
        hashed_password=b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
        name=username.capitalize(),
        hosting_status=HostingStatus.cant_host,
        city="Testing city",
        hometown="Test hometown",
        community_standing=0.5,
        birthdate=date(year=2000, month=1, day=1),
        gender="Woman",
        pronouns="",
        occupation="Tester",
        education="UST(esting)",
        about_me="I test things",
        things_i_like="Code",
        about_place="My place has a lot of testing paraphenelia",
        additional_information="I can be a bit testy",
        accepted_tos=TOS_VERSION,
        geom=create_coordinate(40.7108, -73.9740),
        geom_radius=100,
        last_onboarding_email_sent=now(),
        last_donated=now(),
    )
    user = User(**defaults)

    # these two are assigned after construction rather than passed to the constructor
    user.accepted_community_guidelines = GUIDELINES_VERSION
    user.onboarding_emails_sent = 1

    # caller-supplied values win over everything set above
    for attr_name, attr_value in kwargs.items():
        setattr(user, attr_name, attr_value)

    return user

198 

199 

def generate_user(
    *,
    delete_user: bool = False,
    complete_profile: bool = True,
    strong_verification: bool = False,
    regions_visited: Sequence[str] = (),
    regions_lived: Sequence[str] = (),
    language_abilities: Sequence[tuple[str, LanguageFluency]] = (),
    **kwargs: Any,
) -> tuple[User, str]:
    """
    Create a new user, return the user and a session token

    The user is detached from any session, and you can access its static attributes, but you can't modify it

    Use this most of the time

    Arguments:
        delete_user: mark the user as deleted (done after the session token has been created)
        complete_profile: add an Upload used as avatar and a long about_me text; the resulting
            has_completed_profile value is asserted to equal this flag
        strong_verification: add a succeeded StrongVerificationAttempt for the user
        regions_visited: region codes to add as RegionVisited rows
        regions_lived: region codes to add as RegionLived rows
        language_abilities: (language code, fluency) pairs to add as LanguageAbility rows
        **kwargs: forwarded to make_user

    Returns:
        A (user, token) tuple, where token comes from create_session
    """
    with session_scope() as session:
        user = make_user(**kwargs)

        session.add(user)
        # flush so that user.id is available below
        session.flush()

        # Create a profile gallery for the user and link it
        profile_gallery = PhotoGallery(owner_user_id=user.id)
        session.add(profile_gallery)
        # flush so that profile_gallery.id is available
        session.flush()
        user.profile_gallery_id = profile_gallery.id

        for region in regions_visited:
            session.add(RegionVisited(user_id=user.id, region_code=region))

        for region in regions_lived:
            session.add(RegionLived(user_id=user.id, region_code=region))

        for lang, fluency in language_abilities:
            session.add(LanguageAbility(user_id=user.id, language_code=lang, fluency=fluency))

        # this expires the user, so now it's "dirty"
        context = cast(CouchersContext, _MockCouchersContext())
        token, _ = create_session(context, session, user, False, set_cookie=False)

        # deleted user aborts session creation, hence this follows and necessitates a second commit
        if delete_user:
            user.is_deleted = True

        # the score decreases as the user id increases
        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            key = random_hex(32)
            filename = random_hex(32) + ".jpg"
            session.add(
                Upload(
                    key=key,
                    filename=filename,
                    creator_user_id=user.id,
                )
            )
            # the upload is flushed before the user's avatar_key is pointed at it
            session.flush()
            user.avatar_key = key
            user.about_me = "I have a complete profile!\n" * 20

        if strong_verification:
            # placeholder passport data; token, document chars and iris ids are derived from the user id
            attempt = StrongVerificationAttempt(
                verification_attempt_token=f"verification_attempt_token_{user.id}",
                user_id=user.id,
                status=StrongVerificationAttemptStatus.succeeded,
                has_full_data=True,
                passport_encrypted_data=b"not real",
                passport_date_of_birth=user.birthdate,
                passport_sex={"Woman": PassportSex.female, "Man": PassportSex.male}.get(
                    user.gender, PassportSex.unspecified
                ),
                has_minimal_data=True,
                passport_expiry_date=date.today() + timedelta(days=10),
                passport_nationality="UTO",
                passport_last_three_document_chars=f"{user.id:03}",
                iris_token=f"iris_token_{user.id}",
                iris_session_id=user.id,
            )
            session.add(attempt)
            session.flush()
            # sanity check that the attempt just created counts as strong verification for this user
            assert attempt.has_strong_verification(user)

        session.commit()

        assert user.has_completed_profile == complete_profile

        # refresh it, undoes the expiry
        session.refresh(user)

        # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # this detaches the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token

298 

299 

def get_user_id_and_token(session: Session, username: str) -> tuple[int, str]:
    """
    Look up the id of the user called `username` and the token of that user's session.

    Both lookups use scalar_one(), so each must match exactly one row.
    """
    id_query = select(User.id).where(User.username == username)
    user_id = session.execute(id_query).scalar_one()

    token_query = select(UserSession.token).where(UserSession.user_id == user_id)
    token = session.execute(token_query).scalar_one()

    return user_id, token

304 

305 

def make_friends(user1: User, user2: User) -> None:
    """Add an accepted friend relationship from user1 to user2, in its own session scope."""
    with session_scope() as session:
        session.add(
            FriendRelationship(
                from_user_id=user1.id,
                to_user_id=user2.id,
                status=FriendStatus.accepted,
            )
        )

314 

315 

def make_user_block(user1: User, user2: User) -> None:
    """Add a UserBlock where user1 is the blocking user and user2 the blocked one, in its own session scope."""
    with session_scope() as session:
        session.add(UserBlock(blocking_user_id=user1.id, blocked_user_id=user2.id))

323 

324 

def make_user_invisible(user_id: int) -> None:
    """Set is_banned on the user with the given id, in its own session scope."""
    ban_statement = update(User).where(User.id == user_id).values(is_banned=True)
    with session_scope() as session:
        session.execute(ban_statement)

328 

329 

330# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship 

def get_friend_relationship(user1: User, user2: User) -> FriendRelationship | None:
    """
    Fetch the friend relationship between two users, in either direction.

    Returns the FriendRelationship detached from its session, or None if the
    two users have no relationship. scalar_one_or_none() raises if more than
    one row matches.
    """
    with session_scope() as session:
        friend_relationship = session.execute(
            select(FriendRelationship).where(
                or_(
                    # `&` builds a SQL AND. Python's `and` must not be used here: it evaluates the
                    # truthiness of the first comparison and drops the other one from the query.
                    (FriendRelationship.from_user_id == user1.id) & (FriendRelationship.to_user_id == user2.id),
                    (FriendRelationship.from_user_id == user2.id) & (FriendRelationship.to_user_id == user1.id),
                )
            )
        ).scalar_one_or_none()

        # only a real instance can be expunged; with no match there is nothing to detach
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship

344 

345 

def add_users_to_new_moderation_list(users: list[User]) -> int:
    """
    Group users as duplicated accounts

    Creates a new ModerationUserList, appends each of the given users to it
    (re-fetched by id in this session), and returns the id of the new list.
    """
    with session_scope() as session:
        new_list = ModerationUserList()
        session.add(new_list)
        # flush so the new list has an id
        session.flush()

        for detached_user in users:
            # the users passed in may be detached, so load them again in this session
            attached_user = session.get_one(User, detached_user.id)
            new_list.users.append(attached_user)

        return new_list.id

356 

357 

def run_migration_test():
    """Tell whether the RUN_MIGRATION_TEST environment variable is set to "true" (case-insensitive)."""
    flag = os.getenv("RUN_MIGRATION_TEST", "false")
    return flag.lower() == "true"