Coverage for app / backend / src / tests / fixtures / db.py: 97%

122 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import subprocess 

2from collections.abc import Sequence 

3from contextlib import contextmanager 

4from datetime import date, timedelta 

5from pathlib import Path 

6from typing import Any, cast 

7 

8from sqlalchemy import Connection, Engine, create_engine, or_, select, text, update 

9from sqlalchemy.orm import Session 

10 

11from couchers.constants import GUIDELINES_VERSION, TOS_VERSION 

12from couchers.context import CouchersContext 

13from couchers.crypto import random_hex 

14from couchers.db import _get_base_engine, session_scope 

15from couchers.helpers.completed_profile import has_completed_profile 

16from couchers.models import ( 

17 Base, 

18 FriendRelationship, 

19 FriendStatus, 

20 HostingStatus, 

21 LanguageAbility, 

22 LanguageFluency, 

23 ModerationUserList, 

24 PassportSex, 

25 PhotoGallery, 

26 PhotoGalleryItem, 

27 RegionLived, 

28 RegionVisited, 

29 StrongVerificationAttempt, 

30 StrongVerificationAttemptStatus, 

31 Upload, 

32 User, 

33 UserBlock, 

34 UserSession, 

35 Volunteer, 

36) 

37from couchers.servicers.auth import create_session 

38from couchers.utils import create_coordinate, now 

39from tests.fixtures.sessions import _MockCouchersContext 

40 

41 

def create_schema_from_models(engine: Engine | None = None) -> None:
    """
    Build the whole schema directly from the current model definitions
    instead of replaying the migrations one by one.

    When no engine is passed, the one returned by _get_base_engine() is used.
    """
    target = _get_base_engine() if engine is None else engine

    # the sql functions are normally created by migrations, so install them by hand first
    sql_path = Path(__file__).parent / "sql_functions.sql"
    with open(sql_path) as sql_file:
        functions_sql = sql_file.read()

    with target.connect() as conn:
        conn.execute(text(functions_sql))
        conn.commit()

    Base.metadata.create_all(target)

57 

58 

def populate_testing_resources(conn: Connection) -> None:
    """
    Testing version of couchers.resources.copy_resources_to_database

    Inserts a fixed set of regions and languages, then executes the fake
    timezone areas SQL file, all on the given connection.
    """
    regions_insert = text("""
        INSERT INTO regions (code, name) VALUES
        ('AUS', 'Australia'),
        ('CAN', 'Canada'),
        ('CHE', 'Switzerland'),
        ('CUB', 'Cuba'),
        ('CXR', 'Christmas Island'),
        ('CZE', 'Czechia'),
        ('DEU', 'Germany'),
        ('EGY', 'Egypt'),
        ('ESP', 'Spain'),
        ('EST', 'Estonia'),
        ('FIN', 'Finland'),
        ('FRA', 'France'),
        ('GBR', 'United Kingdom'),
        ('GEO', 'Georgia'),
        ('GHA', 'Ghana'),
        ('GRC', 'Greece'),
        ('HKG', 'Hong Kong'),
        ('IRL', 'Ireland'),
        ('ISR', 'Israel'),
        ('ITA', 'Italy'),
        ('JPN', 'Japan'),
        ('LAO', 'Laos'),
        ('MEX', 'Mexico'),
        ('MMR', 'Myanmar'),
        ('NAM', 'Namibia'),
        ('NLD', 'Netherlands'),
        ('NZL', 'New Zealand'),
        ('POL', 'Poland'),
        ('PRK', 'North Korea'),
        ('REU', 'Réunion'),
        ('SGP', 'Singapore'),
        ('SWE', 'Sweden'),
        ('THA', 'Thailand'),
        ('TUR', 'Turkey'),
        ('TWN', 'Taiwan'),
        ('USA', 'United States'),
        ('VNM', 'Vietnam');
        """)
    conn.execute(regions_insert)

    # Insert languages as textual SQL
    languages_insert = text("""
        INSERT INTO languages (code, name) VALUES
        ('arb', 'Arabic (Standard)'),
        ('deu', 'German'),
        ('eng', 'English'),
        ('fin', 'Finnish'),
        ('fra', 'French'),
        ('heb', 'Hebrew'),
        ('hun', 'Hungarian'),
        ('jpn', 'Japanese'),
        ('pol', 'Polish'),
        ('swe', 'Swedish'),
        ('cmn', 'Chinese (Mandarin)')
        """)
    conn.execute(languages_insert)

    # the fake timezone areas file lives in the "resources" directory four levels above this file
    tz_path = Path(__file__).parents[3] / "resources" / "timezone_areas.sql-fake"
    conn.execute(text(tz_path.read_text()))

128 

129 

def drop_database() -> None:
    """
    Wipe the database: drop the public and logging schemas and the postgis
    extension, then recreate the schemas and install the required extensions.
    """
    # Why each extension is needed:
    #  - postgis: all the Geographic Information System (GIS) stuff
    #  - pg_trgm: trigram-based search
    #  - btree_gist: gist-based exclusion constraints
    statements = (
        "DROP SCHEMA IF EXISTS public CASCADE;",
        "DROP SCHEMA IF EXISTS logging CASCADE;",
        "DROP EXTENSION IF EXISTS postgis CASCADE;",
        "CREATE SCHEMA IF NOT EXISTS public;",
        "CREATE SCHEMA IF NOT EXISTS logging;",
        "CREATE EXTENSION postgis;",
        "CREATE EXTENSION pg_trgm;",
        "CREATE EXTENSION btree_gist;",
    )
    # sent as one single string, exactly as a multi-statement script
    reset_sql = "".join(statements)

    with session_scope() as session:
        session.execute(text(reset_sql))

147 

148 

@contextmanager
def autocommit_engine(url: str):
    """
    Yield an engine created with isolation_level="AUTOCOMMIT", i.e. every
    statement takes effect immediately instead of being wrapped in a
    transaction block. Mainly needed because CREATE/DROP DATABASE cannot be
    executed inside a transaction block.

    The engine is disposed when the ``with`` block exits, including when the
    body raises.
    """
    engine = create_engine(
        url,
        isolation_level="AUTOCOMMIT",
    )
    try:
        yield engine
    finally:
        # without try/finally an exception in the caller's block would skip
        # the dispose and leak the engine's pooled connections
        engine.dispose()

161 

162 

def make_user(**kwargs: Any) -> User:
    """
    Build (but do not persist) a User filled with testing defaults.

    The username is randomised on every call. Any keyword argument is set as
    an attribute on the user after the defaults, so it overrides them.
    """
    username = "test_user_" + random_hex(16)

    defaults = {
        "username": username,
        "email": f"{username}@dev.couchers.org",
        "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
        "name": username.capitalize(),
        "hosting_status": HostingStatus.cant_host,
        "city": "Testing city",
        "hometown": "Test hometown",
        "community_standing": 0.5,
        "birthdate": date(year=2000, month=1, day=1),
        "gender": "Woman",
        "pronouns": "",
        "occupation": "Tester",
        "education": "UST(esting)",
        "about_me": "I test things",
        "things_i_like": "Code",
        "about_place": "My place has a lot of testing paraphenelia",
        "additional_information": "I can be a bit testy",
        "accepted_tos": TOS_VERSION,
        "geom": create_coordinate(40.7108, -73.9740),
        "geom_radius": 100,
        "last_onboarding_email_sent": now(),
        "last_donated": now(),
    }
    user = User(**defaults)
    user.accepted_community_guidelines = GUIDELINES_VERSION
    user.onboarding_emails_sent = 1

    # Ensure superusers are also editors (DB constraint), unless the caller said otherwise
    if kwargs.get("is_superuser"):
        kwargs.setdefault("is_editor", True)

    for attr_name, attr_value in kwargs.items():
        setattr(user, attr_name, attr_value)

    return user

201 

202 

def generate_user(
    *,
    delete_user=False,
    complete_profile=True,
    strong_verification=False,
    regions_visited: Sequence[str] = (),
    regions_lived: Sequence[str] = (),
    language_abilities: Sequence[tuple[str, LanguageFluency]] = (),
    **kwargs: Any,
) -> tuple[User, str]:
    """
    Create and persist a new user, returning it together with a session token.

    The returned user is detached from any session: its already-loaded
    attributes can be read, but it can't be modified.

    Extra keyword arguments are forwarded to make_user().

    Use this most of the time
    """
    with session_scope() as session:
        user = make_user(**kwargs)
        session.add(user)
        # flush so that user.id can be used below
        session.flush()

        # Every user gets a profile gallery, linked back from the user row
        gallery = PhotoGallery(owner_user_id=user.id)
        session.add(gallery)
        session.flush()
        user.profile_gallery_id = gallery.id

        session.add_all([RegionVisited(user_id=user.id, region_code=code) for code in regions_visited])
        session.add_all([RegionLived(user_id=user.id, region_code=code) for code in regions_lived])
        session.add_all(
            [
                LanguageAbility(user_id=user.id, language_code=language_code, fluency=fluency)
                for language_code, fluency in language_abilities
            ]
        )

        # this expires the user, so now it's "dirty"
        mock_context = cast(CouchersContext, _MockCouchersContext())
        token, _ = create_session(mock_context, session, user, False, set_cookie=False)

        # a deleted user aborts session creation, so the deletion flag can only be set afterwards,
        # which necessitates a second commit
        if delete_user:
            user.is_deleted = True

        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            upload_key = random_hex(32)
            upload = Upload(
                key=upload_key,
                filename=random_hex(32) + ".jpg",
                creator_user_id=user.id,
            )
            session.add(upload)
            gallery_item = PhotoGalleryItem(
                gallery_id=gallery.id,
                upload_key=upload_key,
                position=0,
            )
            session.add(gallery_item)
            session.flush()

            user.about_me = "I have a complete profile!\n" * 20

        if strong_verification:
            sex_by_gender = {"Woman": PassportSex.female, "Man": PassportSex.male}
            sv_attempt = StrongVerificationAttempt(
                verification_attempt_token=f"verification_attempt_token_{user.id}",
                user_id=user.id,
                status=StrongVerificationAttemptStatus.succeeded,
                has_full_data=True,
                passport_encrypted_data=b"not real",
                passport_date_of_birth=user.birthdate,
                passport_sex=sex_by_gender.get(user.gender, PassportSex.unspecified),
                has_minimal_data=True,
                passport_expiry_date=date.today() + timedelta(days=10),
                passport_nationality="UTO",
                passport_last_three_document_chars=f"{user.id:03}",
                iris_token=f"iris_token_{user.id}",
                iris_session_id=user.id,
            )
            session.add(sv_attempt)
            session.flush()
            assert sv_attempt.has_strong_verification(user)

        session.commit()

        assert has_completed_profile(session, user) == complete_profile

        # refreshing undoes the expiry
        session.refresh(user)

        # touch the lazily loaded timezone now, otherwise referring to it on the detached user causes issues
        user.timezone  # noqa: B018

        # detach the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token

307 

308 

def get_user_id_and_token(session: Session, username: str) -> tuple[int, str]:
    """
    Look up the id of the user with the given username and that user's session token.

    Both lookups use scalar_one(), so each must match exactly one row.
    """
    id_query = select(User.id).where(User.username == username)
    user_id = session.execute(id_query).scalar_one()

    token_query = select(UserSession.token).where(UserSession.user_id == user_id)
    token = session.execute(token_query).scalar_one()

    return user_id, token

313 

314 

def make_friends(user1: User, user2: User) -> None:
    """Store an already-accepted friend relationship from user1 to user2."""
    with session_scope() as session:
        session.add(
            FriendRelationship(
                from_user_id=user1.id,
                to_user_id=user2.id,
                status=FriendStatus.accepted,
            )
        )

323 

324 

def make_user_block(user1: User, user2: User) -> None:
    """Store a block where user1 is the blocking user and user2 the blocked user."""
    with session_scope() as session:
        session.add(UserBlock(blocking_user_id=user1.id, blocked_user_id=user2.id))

332 

333 

def make_user_invisible(user_id: int) -> None:
    """Mark the user with the given id as banned (is_banned=True)."""
    ban_statement = update(User).where(User.id == user_id).values(is_banned=True)
    with session_scope() as session:
        session.execute(ban_statement)

337 

338 

339# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship 

def get_friend_relationship(user1: User, user2: User) -> FriendRelationship | None:
    """
    Fetch the friend relationship between two users, in either direction.

    Returns the FriendRelationship detached from its session so it can be
    used after this function returns, or None if no relationship exists.
    scalar_one_or_none() raises if more than one row matches.
    """
    with session_scope() as session:
        # Column comparisons must be combined with `&` (SQL AND). Python's `and`
        # evaluates the truthiness of the first comparison and keeps only one
        # operand, which silently dropped half of each condition.
        user1_to_user2 = (FriendRelationship.from_user_id == user1.id) & (
            FriendRelationship.to_user_id == user2.id
        )
        user2_to_user1 = (FriendRelationship.from_user_id == user2.id) & (
            FriendRelationship.to_user_id == user1.id
        )

        friend_relationship = session.execute(
            select(FriendRelationship).where(or_(user1_to_user2, user2_to_user1))
        ).scalar_one_or_none()

        # expunge() cannot take None, so only detach when a row was found
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship

353 

354 

def add_users_to_new_moderation_list(users: list[User]) -> int:
    """
    Group users as duplicated accounts

    Creates a new ModerationUserList, attaches every given user to it and
    returns the id of the new list.
    """
    with session_scope() as session:
        new_list = ModerationUserList()
        session.add(new_list)
        session.flush()

        for member in users:
            # re-fetch each user by id within this session before attaching it
            attached_user = session.get_one(User, member.id)
            new_list.users.append(attached_user)

        list_id = new_list.id
        return list_id

365 

366 

def pg_dump_is_available() -> bool:
    """
    Return True if a `pg_dump` executable can be found on the PATH.

    Uses shutil.which instead of spawning the external `which` command, so it
    also works where `which` itself is not installed.
    """
    # local import: keeps this block self-contained without touching the file's import section
    import shutil

    return shutil.which("pg_dump") is not None

370 

371 

def make_volunteer(started_volunteering: date, show_on_team_page: bool = True, **kwargs: Any) -> Volunteer:
    """
    Build (but do not persist) a Volunteer.

    Extra keyword arguments are passed to the Volunteer constructor;
    started_volunteering is assigned as an attribute after construction.
    """
    volunteer = Volunteer(show_on_team_page=show_on_team_page, **kwargs)
    volunteer.started_volunteering = started_volunteering
    return volunteer