Coverage for app / backend / src / tests / fixtures / db.py: 100%

127 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 14:14 +0000

1import subprocess 

2from collections.abc import Sequence 

3from contextlib import contextmanager 

4from datetime import date, timedelta 

5from pathlib import Path 

6from typing import Any, cast 

7 

8from sqlalchemy import Connection, Engine, create_engine, func, or_, select, text, update 

9from sqlalchemy.orm import Session 

10 

11from couchers.constants import GUIDELINES_VERSION, TOS_VERSION 

12from couchers.context import CouchersContext 

13from couchers.crypto import random_hex 

14from couchers.db import _get_base_engine, session_scope 

15from couchers.helpers.completed_profile import has_completed_profile 

16from couchers.models import ( 

17 Base, 

18 FriendRelationship, 

19 FriendStatus, 

20 HostingStatus, 

21 LanguageAbility, 

22 LanguageFluency, 

23 ModerationObjectType, 

24 ModerationState, 

25 ModerationUserList, 

26 ModerationVisibility, 

27 PassportSex, 

28 PhotoGallery, 

29 PhotoGalleryItem, 

30 RegionLived, 

31 RegionVisited, 

32 StrongVerificationAttempt, 

33 StrongVerificationAttemptStatus, 

34 Upload, 

35 User, 

36 UserBlock, 

37 UserSession, 

38 Volunteer, 

39) 

40from couchers.servicers.auth import create_session 

41from couchers.utils import create_coordinate, now 

42from tests.fixtures.sessions import _MockCouchersContext 

43 

44 

def create_schema_from_models(engine: Engine | None = None) -> None:
    """
    Build the full schema straight from the current models instead of
    replaying migrations one by one.

    When no engine is given, the base engine from couchers.db is used.
    """
    target = _get_base_engine() if engine is None else engine

    # the sql functions are normally installed by migrations, so load them by hand here
    functions_path = Path(__file__).parent / "sql_functions.sql"
    with open(functions_path) as sql_file, target.connect() as connection:
        connection.execute(text(sql_file.read()))
        connection.commit()

    Base.metadata.create_all(target)

60 

61 

def populate_testing_resources(conn: Connection) -> None:
    """
    Testing version of couchers.resources.copy_resources_to_database

    Inserts a fixed set of regions and languages, then runs the fake timezone areas SQL file.
    """
    regions_sql = text("""
        INSERT INTO regions (code, name) VALUES
        ('AUS', 'Australia'),
        ('CAN', 'Canada'),
        ('CHE', 'Switzerland'),
        ('CUB', 'Cuba'),
        ('CXR', 'Christmas Island'),
        ('CZE', 'Czechia'),
        ('DEU', 'Germany'),
        ('EGY', 'Egypt'),
        ('ESP', 'Spain'),
        ('EST', 'Estonia'),
        ('FIN', 'Finland'),
        ('FRA', 'France'),
        ('GBR', 'United Kingdom'),
        ('GEO', 'Georgia'),
        ('GHA', 'Ghana'),
        ('GRC', 'Greece'),
        ('HKG', 'Hong Kong'),
        ('IRL', 'Ireland'),
        ('ISR', 'Israel'),
        ('ITA', 'Italy'),
        ('JPN', 'Japan'),
        ('LAO', 'Laos'),
        ('MEX', 'Mexico'),
        ('MMR', 'Myanmar'),
        ('NAM', 'Namibia'),
        ('NLD', 'Netherlands'),
        ('NZL', 'New Zealand'),
        ('POL', 'Poland'),
        ('PRK', 'North Korea'),
        ('REU', 'Réunion'),
        ('SGP', 'Singapore'),
        ('SWE', 'Sweden'),
        ('THA', 'Thailand'),
        ('TUR', 'Turkey'),
        ('TWN', 'Taiwan'),
        ('USA', 'United States'),
        ('VNM', 'Vietnam');
        """)
    conn.execute(regions_sql)

    # Languages are inserted as textual SQL as well
    languages_sql = text("""
        INSERT INTO languages (code, name) VALUES
        ('arb', 'Arabic (Standard)'),
        ('deu', 'German'),
        ('eng', 'English'),
        ('fin', 'Finnish'),
        ('fra', 'French'),
        ('heb', 'Hebrew'),
        ('hun', 'Hungarian'),
        ('jpn', 'Japanese'),
        ('pol', 'Polish'),
        ('swe', 'Swedish'),
        ('cmn', 'Chinese (Mandarin)')
        """)
    conn.execute(languages_sql)

    # the fake timezone areas live in the resources directory, four levels above this file
    resources_dir = Path(__file__).parent.parent.parent.parent / "resources"
    tz_sql = (resources_dir / "timezone_areas.sql-fake").read_text()

    conn.execute(text(tz_sql))

131 

132 

def drop_database() -> None:
    """
    Drop the public and logging schemas (and postgis), then recreate the empty
    schemas and the extensions.
    """
    # postgis is required for all the Geographic Information System (GIS) stuff
    # pg_trgm is required for trigram-based search
    # btree_gist is required for gist-based exclusion constraints
    statements = (
        "DROP SCHEMA IF EXISTS public CASCADE;",
        "DROP SCHEMA IF EXISTS logging CASCADE;",
        "DROP EXTENSION IF EXISTS postgis CASCADE;",
        "CREATE SCHEMA IF NOT EXISTS public;",
        "CREATE SCHEMA IF NOT EXISTS logging;",
        "CREATE EXTENSION postgis;",
        "CREATE EXTENSION pg_trgm;",
        "CREATE EXTENSION btree_gist;",
    )
    # everything is sent as a single statement string, with no separator added
    reset_sql = "".join(statements)

    with session_scope() as session:
        session.execute(text(reset_sql))

150 

151 

@contextmanager
def autocommit_engine(url: str):
    """
    Yield an engine created with the AUTOCOMMIT isolation level, so statements
    are not wrapped in a transaction block. Mainly needed because
    CREATE/DROP DATABASE cannot be executed inside a transaction.

    The engine is disposed when the context exits, including when the body
    of the `with` block raises.
    """
    engine = create_engine(
        url,
        isolation_level="AUTOCOMMIT",
    )
    try:
        yield engine
    finally:
        # always release pooled connections, even if the caller's block failed
        engine.dispose()

164 

165 

def make_user(**kwargs: Any) -> User:
    """
    Build a User populated with testing defaults and a random username.

    Any keyword argument is set as an attribute on the user afterwards, overriding the defaults.
    The user is only constructed here; it is not added to any session.
    """
    handle = f"test_user_{random_hex(16)}"

    new_user = User(
        username=handle,
        email=f"{handle}@dev.couchers.org",
        hashed_password=b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
        name=handle.capitalize(),
        hosting_status=HostingStatus.cant_host,
        city="Testing city",
        hometown="Test hometown",
        community_standing=0.5,
        birthdate=date(year=2000, month=1, day=1),
        gender="Woman",
        pronouns="",
        occupation="Tester",
        education="UST(esting)",
        about_me="I test things",
        things_i_like="Code",
        about_place="My place has a lot of testing paraphenelia",
        additional_information="I can be a bit testy",
        accepted_tos=TOS_VERSION,
        geom=create_coordinate(40.7108, -73.9740),
        geom_radius=100,
        last_onboarding_email_sent=now(),
        last_donated=now(),
    )
    new_user.accepted_community_guidelines = GUIDELINES_VERSION
    new_user.onboarding_emails_sent = 1

    # Superusers must also be editors (DB constraint), unless the caller set is_editor explicitly
    if kwargs.get("is_superuser"):
        kwargs.setdefault("is_editor", True)

    # caller-supplied values win over the defaults above
    for attr_name, attr_value in kwargs.items():
        setattr(new_user, attr_name, attr_value)

    return new_user

204 

205 

def generate_user(
    *,
    delete_user=False,
    complete_profile=True,
    strong_verification=False,
    regions_visited: Sequence[str] = (),
    regions_lived: Sequence[str] = (),
    language_abilities: Sequence[tuple[str, LanguageFluency]] = (),
    **kwargs: Any,
) -> tuple[User, str]:
    """
    Create and persist a new user, returning it together with a session token.

    The returned user is detached from any session: its static attributes can be read, but it
    can't be modified.

    Extra keyword arguments are forwarded to make_user. Use this most of the time.
    """
    with session_scope() as session:
        user = make_user(**kwargs)
        session.add(user)
        session.flush()

        # Every user gets a profile gallery, linked back onto the user row
        gallery = PhotoGallery(owner_user_id=user.id)
        session.add(gallery)
        session.flush()
        user.profile_gallery_id = gallery.id

        session.add_all(RegionVisited(user_id=user.id, region_code=code) for code in regions_visited)
        session.add_all(RegionLived(user_id=user.id, region_code=code) for code in regions_lived)
        session.add_all(
            LanguageAbility(user_id=user.id, language_code=code, fluency=level)
            for code, level in language_abilities
        )

        # this expires the user, so now it's "dirty"
        mock_context = cast(CouchersContext, _MockCouchersContext())
        token, _ = create_session(mock_context, session, user, False, set_cookie=False)

        # deleted user aborts session creation, hence this follows and necessitates a second commit
        if delete_user:
            user.deleted_at = now()

        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            # a complete profile gets one uploaded photo in the profile gallery and a long about_me
            upload_key = random_hex(32)
            photo = Upload(
                key=upload_key,
                filename=random_hex(32) + ".jpg",
                creator_user_id=user.id,
            )
            session.add(photo)
            gallery_item = PhotoGalleryItem(
                gallery_id=gallery.id,
                upload_key=upload_key,
                position=0,
            )
            session.add(gallery_item)
            session.flush()

            user.about_me = "I have a complete profile!\n" * 20

        if strong_verification:
            uid = user.id
            sex_by_gender = {"Woman": PassportSex.female, "Man": PassportSex.male}
            sv_attempt = StrongVerificationAttempt(
                verification_attempt_token=f"verification_attempt_token_{uid}",
                user_id=uid,
                status=StrongVerificationAttemptStatus.succeeded,
                has_full_data=True,
                passport_encrypted_data=b"not real",
                passport_date_of_birth=user.birthdate,
                passport_sex=sex_by_gender.get(user.gender, PassportSex.unspecified),
                has_minimal_data=True,
                passport_expiry_date=date.today() + timedelta(days=10),
                passport_nationality="UTO",
                passport_last_three_document_chars=f"{uid:03}",
                iris_token=f"iris_token_{uid}",
                iris_session_id=uid,
            )
            session.add(sv_attempt)
            session.flush()
            assert sv_attempt.has_strong_verification(user)

        session.commit()

        # sanity check: the profile completeness must match what the caller asked for
        assert has_completed_profile(session, user) == complete_profile

        # refresh it, undoes the expiry
        session.refresh(user)

        # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # detaches the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token

310 

311 

def get_user_id_and_token(session: Session, username: str) -> tuple[int, str]:
    """
    Return the id of the user with the given username along with that user's session token.

    Both lookups use scalar_one(), so each must match exactly one row.
    """
    id_query = select(User.id).where(User.username == username)
    user_id = session.execute(id_query).scalar_one()

    token_query = select(UserSession.token).where(UserSession.user_id == user_id)
    token = session.execute(token_query).scalar_one()

    return user_id, token

316 

317 

def make_friends(user1: User, user2: User) -> None:
    """
    Create an accepted friend relationship from user1 to user2, backed by a visible moderation state.
    """
    with session_scope() as session:
        # The moderation state is created first (VISIBLE, i.e. an approved friendship for tests);
        # object_id is a placeholder until the relationship row has an id
        state = ModerationState(
            object_type=ModerationObjectType.friend_request,
            object_id=0,
            visibility=ModerationVisibility.visible,
        )
        session.add(state)
        session.flush()

        relationship = FriendRelationship(
            from_user_id=user1.id,
            to_user_id=user2.id,
            status=FriendStatus.accepted,
            moderation_state_id=state.id,
        )
        session.add(relationship)
        session.flush()

        # Now point the moderation state at the actual relationship id
        state.object_id = relationship.id

340 

341 

def make_user_block(user1: User, user2: User) -> None:
    """
    Record that user1 blocks user2.
    """
    with session_scope() as session:
        session.add(UserBlock(blocking_user_id=user1.id, blocked_user_id=user2.id))

349 

350 

def make_user_invisible(user_id: int) -> None:
    """
    Set banned_at to the database's current time for the user with the given id.
    """
    ban_stmt = update(User).where(User.id == user_id).values(banned_at=func.now())
    with session_scope() as session:
        session.execute(ban_stmt)

354 

355 

# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1: User, user2: User) -> FriendRelationship | None:
    """
    Look up the friend relationship between two users, in either direction.

    Returns the FriendRelationship detached from its session, or None when the two users have
    no relationship. scalar_one_or_none() raises if more than one row matches.
    """
    with session_scope() as session:
        # Column conditions must be combined with `&` (SQL AND). Python's `and` evaluates the
        # truthiness of the first comparison and keeps only one operand, silently dropping the
        # other condition from the query.
        forward = (FriendRelationship.from_user_id == user1.id) & (FriendRelationship.to_user_id == user2.id)
        backward = (FriendRelationship.from_user_id == user2.id) & (FriendRelationship.to_user_id == user1.id)

        friend_relationship = session.execute(
            select(FriendRelationship).where(or_(forward, backward))
        ).scalar_one_or_none()

        # Only detach when something was found; expunge() does not accept None
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship

370 

371 

def add_users_to_new_moderation_list(users: list[User]) -> int:
    """
    Group users as duplicated accounts

    Creates a new ModerationUserList, attaches each given user to it and returns the list's id.
    """
    with session_scope() as session:
        new_list = ModerationUserList()
        session.add(new_list)
        session.flush()

        for detached_user in users:
            # re-fetch the user by id in this session before attaching it to the list
            attached_user = session.get_one(User, detached_user.id)
            new_list.users.append(attached_user)

        return new_list.id

382 

383 

def pg_dump_is_available() -> bool:
    """
    Return True if a pg_dump executable can be found on the PATH.

    Uses shutil.which rather than spawning the external `which` command, so it does not
    depend on `which` being installed.
    """
    # imported locally so this function does not depend on a module-level import
    import shutil

    return shutil.which("pg_dump") is not None

387 

388 

def make_volunteer(started_volunteering: date, show_on_team_page: bool = True, **kwargs: Any) -> Volunteer:
    """
    Build a Volunteer from the given keyword arguments.

    started_volunteering is assigned after construction rather than passed to the constructor.
    """
    volunteer = Volunteer(show_on_team_page=show_on_team_page, **kwargs)
    volunteer.started_volunteering = started_volunteering
    return volunteer