Coverage for app / backend / src / tests / fixtures / db.py: 100%

127 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 09:44 +0000

1import subprocess 

2from collections.abc import Sequence 

3from contextlib import contextmanager 

4from datetime import date, timedelta 

5from pathlib import Path 

6from typing import Any, cast 

7 

8from sqlalchemy import Connection, Engine, create_engine, func, or_, select, text, update 

9from sqlalchemy.orm import Session 

10 

11from couchers.constants import GUIDELINES_VERSION, TOS_VERSION 

12from couchers.context import CouchersContext 

13from couchers.crypto import random_hex 

14from couchers.db import _get_base_engine, session_scope 

15from couchers.helpers.completed_profile import has_completed_profile 

16from couchers.models import ( 

17 Base, 

18 FriendRelationship, 

19 FriendStatus, 

20 HostingStatus, 

21 LanguageAbility, 

22 LanguageFluency, 

23 ModerationObjectType, 

24 ModerationState, 

25 ModerationUserList, 

26 ModerationVisibility, 

27 PassportSex, 

28 PhotoGallery, 

29 PhotoGalleryItem, 

30 RegionLived, 

31 RegionVisited, 

32 StrongVerificationAttempt, 

33 StrongVerificationAttemptStatus, 

34 Upload, 

35 User, 

36 UserBlock, 

37 UserSession, 

38 Volunteer, 

39) 

40from couchers.servicers.auth import create_session 

41from couchers.utils import create_coordinate, now 

42from tests.fixtures.sessions import _MockCouchersContext 

43 

44 

def create_schema_from_models(engine: Engine | None = None) -> None:
    """
    Build the whole schema in one go from the current model definitions,
    instead of replaying the migrations one at a time.

    If no engine is passed, the one returned by _get_base_engine() is used.
    """
    target_engine = _get_base_engine() if engine is None else engine

    # the SQL functions are normally installed by migrations, so add them by hand first
    sql_path = Path(__file__).parent / "sql_functions.sql"
    with open(sql_path) as sql_file, target_engine.connect() as connection:
        functions_sql = sql_file.read()
        connection.execute(text(functions_sql))
        connection.commit()

    Base.metadata.create_all(target_engine)

60 

61 

def populate_testing_resources(conn: Connection) -> None:
    """
    Seed the fixed set of regions and languages used by the tests, then run the
    SQL from resources/timezone_areas.sql-fake.

    This is the testing counterpart of couchers.resources.copy_resources_to_database
    """
    insert_regions = text("""
        INSERT INTO regions (code, name) VALUES
        ('AUS', 'Australia'),
        ('CAN', 'Canada'),
        ('CHE', 'Switzerland'),
        ('CUB', 'Cuba'),
        ('CXR', 'Christmas Island'),
        ('CZE', 'Czechia'),
        ('DEU', 'Germany'),
        ('EGY', 'Egypt'),
        ('ESP', 'Spain'),
        ('EST', 'Estonia'),
        ('FIN', 'Finland'),
        ('FRA', 'France'),
        ('GBR', 'United Kingdom'),
        ('GEO', 'Georgia'),
        ('GHA', 'Ghana'),
        ('GRC', 'Greece'),
        ('HKG', 'Hong Kong'),
        ('IRL', 'Ireland'),
        ('ISR', 'Israel'),
        ('ITA', 'Italy'),
        ('JPN', 'Japan'),
        ('LAO', 'Laos'),
        ('MEX', 'Mexico'),
        ('MMR', 'Myanmar'),
        ('NAM', 'Namibia'),
        ('NLD', 'Netherlands'),
        ('NZL', 'New Zealand'),
        ('POL', 'Poland'),
        ('PRK', 'North Korea'),
        ('REU', 'Réunion'),
        ('SGP', 'Singapore'),
        ('SWE', 'Sweden'),
        ('THA', 'Thailand'),
        ('TUR', 'Turkey'),
        ('TWN', 'Taiwan'),
        ('USA', 'United States'),
        ('VNM', 'Vietnam');
    """)
    conn.execute(insert_regions)

    # Languages are inserted with plain textual SQL as well
    insert_languages = text("""
        INSERT INTO languages (code, name) VALUES
        ('arb', 'Arabic (Standard)'),
        ('deu', 'German'),
        ('eng', 'English'),
        ('fin', 'Finnish'),
        ('fra', 'French'),
        ('heb', 'Hebrew'),
        ('hun', 'Hungarian'),
        ('jpn', 'Japanese'),
        ('pol', 'Polish'),
        ('swe', 'Swedish'),
        ('cmn', 'Chinese (Mandarin)')
    """)
    conn.execute(insert_languages)

    # the resources directory sits four levels above this file
    base_dir = Path(__file__).parent.parent.parent.parent
    tz_path = base_dir / "resources" / "timezone_areas.sql-fake"
    with open(tz_path, "r") as tz_file:
        timezone_sql = tz_file.read()

    conn.execute(text(timezone_sql))

131 

132 

def drop_database() -> None:
    """
    Reset the database contents: drop and recreate the `public` and `logging`
    schemas and (re)create the extensions, all sent as one SQL string through
    a session from session_scope().
    """
    statements = (
        "DROP SCHEMA IF EXISTS public CASCADE;",
        "DROP SCHEMA IF EXISTS logging CASCADE;",
        "DROP EXTENSION IF EXISTS postgis CASCADE;",
        "CREATE SCHEMA IF NOT EXISTS public;",
        "CREATE SCHEMA IF NOT EXISTS logging;",
        # postgis: needed for all the Geographic Information System (GIS) stuff
        "CREATE EXTENSION postgis;",
        # pg_trgm: needed for trigram-based search
        "CREATE EXTENSION pg_trgm;",
        # btree_gist: needed for gist-based exclusion constraints
        "CREATE EXTENSION btree_gist;",
        "CREATE EXTENSION pg_stat_statements;",
    )
    # joined with no separator, so the statements run back to back as a single string
    reset_sql = "".join(statements)

    with session_scope() as session:
        session.execute(text(reset_sql))

151 

152 

@contextmanager
def autocommit_engine(url: str):
    """
    Context manager yielding an engine created with isolation_level="AUTOCOMMIT".

    In this mode statements are not wrapped in a transaction block. This is
    mainly needed because CREATE/DROP DATABASE cannot be executed any other way.

    The engine is disposed when the context exits, including when the body
    of the `with` block raises.
    """
    engine = create_engine(
        url,
        isolation_level="AUTOCOMMIT",
    )
    try:
        yield engine
    finally:
        # always release pooled connections, even if the caller's block failed
        engine.dispose()

165 

166 

def make_user(**kwargs: Any) -> User:
    """
    Build (but do not persist) a User filled with test defaults and a random username.

    Any keyword argument is applied on top of the defaults with setattr(), so
    callers can override or add arbitrary attributes.
    """
    handle = "test_user_" + random_hex(16)

    defaults = {
        "username": handle,
        "email": f"{handle}@dev.couchers.org",
        "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
        "name": handle.capitalize(),
        "hosting_status": HostingStatus.cant_host,
        "city": "Testing city",
        "hometown": "Test hometown",
        "community_standing": 0.5,
        "birthdate": date(year=2000, month=1, day=1),
        "gender": "Woman",
        "pronouns": "",
        "occupation": "Tester",
        "education": "UST(esting)",
        "about_me": "I test things",
        "things_i_like": "Code",
        "about_place": "My place has a lot of testing paraphenelia",
        "additional_information": "I can be a bit testy",
        "accepted_tos": TOS_VERSION,
        "geom": create_coordinate(40.7108, -73.9740),
        "geom_radius": 100,
        "last_onboarding_email_sent": now(),
        "last_donated": now(),
    }
    new_user = User(**defaults)
    new_user.accepted_community_guidelines = GUIDELINES_VERSION
    new_user.onboarding_emails_sent = 1

    # A DB constraint requires superusers to be editors too, so imply it unless
    # the caller said otherwise
    if kwargs.get("is_superuser"):
        kwargs.setdefault("is_editor", True)

    for attr_name, attr_value in kwargs.items():
        setattr(new_user, attr_name, attr_value)

    return new_user

205 

206 

def generate_user(
    *,
    delete_user: bool = False,
    complete_profile: bool = True,
    strong_verification: bool = False,
    regions_visited: Sequence[str] = (),
    regions_lived: Sequence[str] = (),
    language_abilities: Sequence[tuple[str, LanguageFluency]] = (),
    **kwargs: Any,
) -> tuple[User, str]:
    """
    Create and persist a new user, returning it together with a session token.

    The user is detached from any session, and you can access its static attributes, but you can't modify it

    Use this most of the time

    Args:
        delete_user: if true, set ``deleted_at`` on the user after its session has been created.
        complete_profile: if true, add an upload as the first item of the profile gallery and set
            a long ``about_me``. The function asserts that has_completed_profile() matches this flag.
        strong_verification: if true, add a succeeded StrongVerificationAttempt derived from the
            user's birthdate and gender, and assert that it counts as strong verification.
        regions_visited: region codes to store as RegionVisited rows for the user.
        regions_lived: region codes to store as RegionLived rows for the user.
        language_abilities: (language code, fluency) pairs to store as LanguageAbility rows.
        **kwargs: attribute overrides forwarded to make_user().

    Returns:
        A tuple of the detached user and the token returned by create_session().
    """
    with session_scope() as session:
        user = make_user(**kwargs)

        session.add(user)
        # flush so that user.id is available for the rows created below
        session.flush()

        # Create a profile gallery for the user and link it
        profile_gallery = PhotoGallery(owner_user_id=user.id)
        session.add(profile_gallery)
        session.flush()
        user.profile_gallery_id = profile_gallery.id

        for region in regions_visited:
            session.add(RegionVisited(user_id=user.id, region_code=region))

        for region in regions_lived:
            session.add(RegionLived(user_id=user.id, region_code=region))

        for lang, fluency in language_abilities:
            session.add(LanguageAbility(user_id=user.id, language_code=lang, fluency=fluency))

        # this expires the user, so now it's "dirty"
        context = cast(CouchersContext, _MockCouchersContext())
        token, _ = create_session(context, session, user, False, set_cookie=False)

        # a deleted user aborts session creation, so the deletion is only applied after the
        # session has been created (the original author notes this necessitates a second commit)
        if delete_user:
            user.deleted_at = now()

        # the score shrinks as the id grows, so users with lower ids get higher scores
        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            # one upload, placed first in the profile gallery
            key = random_hex(32)
            session.add(
                Upload(
                    key=key,
                    filename=random_hex(32) + ".jpg",
                    creator_user_id=user.id,
                )
            )
            session.add(
                PhotoGalleryItem(
                    gallery_id=profile_gallery.id,
                    upload_key=key,
                    position=0,
                )
            )
            session.flush()

            user.about_me = "I have a complete profile!\n" * 20

        if strong_verification:
            # fake passport data, made unique per user by embedding user.id
            attempt = StrongVerificationAttempt(
                verification_attempt_token=f"verification_attempt_token_{user.id}",
                user_id=user.id,
                status=StrongVerificationAttemptStatus.succeeded,
                has_full_data=True,
                passport_encrypted_data=b"not real",
                passport_date_of_birth=user.birthdate,
                # any gender other than "Woman"/"Man" maps to PassportSex.unspecified
                passport_sex={"Woman": PassportSex.female, "Man": PassportSex.male}.get(
                    user.gender, PassportSex.unspecified
                ),
                has_minimal_data=True,
                passport_expiry_date=date.today() + timedelta(days=10),
                passport_nationality="UTO",
                # NOTE(review): zero-padded to at least 3 characters; ids >= 1000 give more — confirm that is acceptable
                passport_last_three_document_chars=f"{user.id:03}",
                iris_token=f"iris_token_{user.id}",
                iris_session_id=user.id,
            )
            session.add(attempt)
            session.flush()
            assert attempt.has_strong_verification(user)

        session.commit()

        # sanity check: the profile completeness must match what the caller asked for
        assert has_completed_profile(session, user) == complete_profile

        # refresh it, undoes the expiry
        session.refresh(user)

        # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # detaches the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token

311 

312 

def get_user_id_and_token(session: Session, username: str) -> tuple[int, str]:
    """
    Return the id of the user with the given username and the token of that
    user's session.

    Both lookups go through scalar_one(), so each must match exactly one row.
    """
    id_query = select(User.id).where(User.username == username)
    user_id = session.execute(id_query).scalar_one()

    token_query = select(UserSession.token).where(UserSession.user_id == user_id)
    session_token = session.execute(token_query).scalar_one()

    return user_id, session_token

317 

318 

def make_friends(user1: User, user2: User) -> None:
    """
    Persist an accepted friendship from user1 to user2 whose moderation state
    is visible (i.e. an approved friendship for tests).
    """
    with session_scope() as session:
        # The moderation state is created first so the relationship can point at it;
        # its object_id is only a placeholder until the relationship has an id.
        state = ModerationState(
            object_type=ModerationObjectType.friend_request,
            object_id=0,
            visibility=ModerationVisibility.visible,
        )
        session.add(state)
        session.flush()

        relationship = FriendRelationship(
            from_user_id=user1.id,
            to_user_id=user2.id,
            status=FriendStatus.accepted,
            moderation_state_id=state.id,
        )
        session.add(relationship)
        session.flush()

        # back-fill the real object id now that the flush has assigned one
        state.object_id = relationship.id

341 

342 

def make_user_block(user1: User, user2: User) -> None:
    """
    Persist a UserBlock in which user1 is the blocking user and user2 the blocked one.
    """
    with session_scope() as session:
        session.add(UserBlock(blocking_user_id=user1.id, blocked_user_id=user2.id))

350 

351 

def make_user_invisible(user_id: int) -> None:
    """
    Set banned_at to the database's now() for the user with the given id.
    """
    ban_statement = update(User).where(User.id == user_id).values(banned_at=func.now())
    with session_scope() as session:
        session.execute(ban_statement)

355 

356 

357# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship 

def get_friend_relationship(user1: User, user2: User) -> FriendRelationship | None:
    """
    Return the friend relationship between the two users, in either direction,
    or None if there is none.

    The returned object is expunged from the session so it can be used after
    the session has closed.
    """
    with session_scope() as session:
        # Use `&` rather than Python's `and`: `and` truth-tests its left operand and
        # returns only one of the two comparisons instead of building a SQL AND.
        friend_relationship = session.execute(
            select(FriendRelationship).where(
                or_(
                    (FriendRelationship.from_user_id == user1.id) & (FriendRelationship.to_user_id == user2.id),
                    (FriendRelationship.from_user_id == user2.id) & (FriendRelationship.to_user_id == user1.id),
                )
            )
        ).scalar_one_or_none()

        # expunge() rejects None, so only detach when a row was actually found
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship

371 

372 

def add_users_to_new_moderation_list(users: list[User]) -> int:
    """
    Group users as duplicated accounts by putting them all on one newly
    created ModerationUserList, and return that list's id.
    """
    with session_scope() as session:
        user_list = ModerationUserList()
        session.add(user_list)
        session.flush()

        for detached_user in users:
            # load each user again through this session before attaching it to the list
            attached_user = session.get_one(User, detached_user.id)
            user_list.users.append(attached_user)

        list_id = user_list.id
        return list_id

383 

384 

def pg_dump_is_available() -> bool:
    """
    Return True if a `pg_dump` executable can be found on the PATH.

    Uses shutil.which() instead of spawning the external `which` command, so
    the check does not itself depend on another binary being installed.
    """
    import shutil

    return shutil.which("pg_dump") is not None

388 

389 

def make_volunteer(started_volunteering: date, show_on_team_page: bool = True, **kwargs: Any) -> Volunteer:
    """
    Build (but do not persist) a Volunteer.

    show_on_team_page and any extra keyword arguments go to the constructor;
    started_volunteering is assigned on the instance afterwards.
    """
    constructor_args = {"show_on_team_page": show_on_team_page, **kwargs}
    volunteer = Volunteer(**constructor_args)
    volunteer.started_volunteering = started_volunteering
    return volunteer