Coverage for src/tests/test_strong_verification.py: 99%

373 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-09 11:32 +0000

1import json 

2from datetime import date, timedelta 

3from unittest.mock import ANY, patch 

4from urllib.parse import urlencode 

5 

6import grpc 

7import pytest 

8from google.protobuf import empty_pb2 

9from sqlalchemy import update 

10from sqlalchemy.sql import or_ 

11 

12import couchers.jobs.handlers 

13import couchers.servicers.account 

14from couchers.config import config 

15from couchers.crypto import asym_decrypt, b64encode_unpadded 

16from couchers.db import session_scope 

17from couchers.jobs.handlers import update_badges 

18from couchers.jobs.worker import process_job 

19from couchers.materialized_views import refresh_materialized_views_rapid 

20from couchers.models import ( 

21 PassportSex, 

22 StrongVerificationAttempt, 

23 StrongVerificationAttemptStatus, 

24 StrongVerificationCallbackEvent, 

25 User, 

26) 

27from couchers.proto import account_pb2, admin_pb2, api_pb2 

28from couchers.proto.google.api import httpbody_pb2 

29from couchers.sql import couchers_select as select 

30from tests.test_fixtures import ( # noqa 

31 account_session, 

32 api_session, 

33 db, 

34 generate_user, 

35 push_collector, 

36 real_admin_session, 

37 real_iris_session, 

38 testconfig, 

39) 

40 

41 

42@pytest.fixture(autouse=True) 

43def _(testconfig): 

44 pass 

45 

46 

47def _emulate_iris_callback(session_id, session_state, reference): 

48 assert session_state in ["CREATED", "INITIATED", "FAILED", "ABORTED", "COMPLETED", "REJECTED", "APPROVED"] 

49 with real_iris_session() as iris: 

50 data = json.dumps( 

51 {"session_id": session_id, "session_state": session_state, "session_reference": reference} 

52 ).encode("ascii") 

53 iris.Webhook(httpbody_pb2.HttpBody(content_type="application/json", data=data)) 

54 

55 

56default_expiry = date.today() + timedelta(days=5 * 365) 

57 

58 

59def do_and_check_sv( 

60 user, 

61 token, 

62 verification_id, 

63 sex, 

64 dob, 

65 document_type, 

66 document_number, 

67 document_expiry, 

68 nationality, 

69 return_after=None, 

70): 

71 iris_token_data = { 

72 "merchant_id": 5731012934821982, 

73 "session_id": verification_id, 

74 "seed": 1674246339, 

75 "face_verification": False, 

76 "host": "https://passportreader.app", 

77 } 

78 iris_token = b64encode_unpadded(json.dumps(iris_token_data).encode("utf8")) 

79 

80 with account_session(token) as account: 

81 # start by initiation 

82 with patch("couchers.servicers.account.requests.post") as mock: 

83 json_resp1 = { 

84 "id": verification_id, 

85 "token": iris_token, 

86 } 

87 mock.return_value = type( 

88 "__MockResponse", 

89 (), 

90 { 

91 "status_code": 200, 

92 "text": json.dumps(json_resp1), 

93 "json": lambda: json_resp1, 

94 }, 

95 ) 

96 res = account.InitiateStrongVerification(empty_pb2.Empty()) 

97 mock.assert_called_once_with( 

98 "https://passportreader.app/api/v1/session.create", 

99 auth=("dummy_pubkey", "dummy_secret"), 

100 json={ 

101 "callback_url": "http://localhost:8888/iris/webhook", 

102 "face_verification": False, 

103 "passport_only": True, 

104 "reference": ANY, 

105 }, 

106 timeout=10, 

107 verify="/etc/ssl/certs/ca-certificates.crt", 

108 ) 

109 reference_data = mock.call_args.kwargs["json"]["reference"] 

110 verification_attempt_token = res.verification_attempt_token 

111 return_url = f"http://localhost:3000/complete-strong-verification?verification_attempt_token={verification_attempt_token}" 

112 assert res.redirect_url == "https://passportreader.app/open?" + urlencode( 

113 {"token": iris_token, "redirect_url": return_url} 

114 ) 

115 

116 assert ( 

117 account.GetStrongVerificationAttemptStatus( 

118 account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token) 

119 ).status 

120 == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_TO_OPEN_APP 

121 ) 

122 

123 # ok, now the user downloads the app, scans their id, and Iris ID sends callbacks to the server 

124 _emulate_iris_callback(verification_id, "INITIATED", reference_data) 

125 

126 with account_session(token) as account: 

127 assert ( 

128 account.GetStrongVerificationAttemptStatus( 

129 account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token) 

130 ).status 

131 == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_IN_APP 

132 ) 

133 

134 if return_after == "INITIATED": 

135 return reference_data 

136 

137 _emulate_iris_callback(verification_id, "COMPLETED", reference_data) 

138 

139 with account_session(token) as account: 

140 assert ( 

141 account.GetStrongVerificationAttemptStatus( 

142 account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token) 

143 ).status 

144 == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND 

145 ) 

146 

147 if return_after == "COMPLETED": 

148 return reference_data 

149 

150 _emulate_iris_callback(verification_id, "APPROVED", reference_data) 

151 

152 with account_session(token) as account: 

153 assert ( 

154 account.GetStrongVerificationAttemptStatus( 

155 account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token) 

156 ).status 

157 == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND 

158 ) 

159 

160 if return_after == "APPROVED": 

161 return reference_data 

162 

163 with patch("couchers.jobs.handlers.requests.post") as mock: 

164 json_resp2 = { 

165 "id": verification_id, 

166 "created": "2024-05-11T15:46:46Z", 

167 "expires": "2024-05-11T16:17:26Z", 

168 "state": "APPROVED", 

169 "reference": reference_data, 

170 "user_ip": "10.123.123.123", 

171 "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0", 

172 "given_names": "John Wayne", 

173 "surname": "Doe", 

174 "nationality": nationality, 

175 "sex": sex, 

176 "date_of_birth": dob, 

177 "document_type": document_type, 

178 "document_number": document_number, 

179 "expiry_date": document_expiry.isoformat(), 

180 "issuing_country": nationality, 

181 "issuer": "Department of State, U.S. Government", 

182 "portrait": "dGVzdHRlc3R0ZXN0...", 

183 } 

184 mock.return_value = type( 

185 "__MockResponse", 

186 (), 

187 { 

188 "status_code": 200, 

189 "text": json.dumps(json_resp2), 

190 "json": lambda: json_resp2, 

191 }, 

192 ) 

193 while process_job(): 

194 pass 

195 

196 mock.assert_called_once_with( 

197 "https://passportreader.app/api/v1/session.get", 

198 auth=("dummy_pubkey", "dummy_secret"), 

199 json={"id": verification_id}, 

200 timeout=10, 

201 verify="/etc/ssl/certs/ca-certificates.crt", 

202 ) 

203 

204 with account_session(token) as account: 

205 assert ( 

206 account.GetStrongVerificationAttemptStatus( 

207 account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token) 

208 ).status 

209 == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_SUCCEEDED 

210 ) 

211 

212 with session_scope() as session: 

213 verification_attempt = session.execute( 

214 select(StrongVerificationAttempt).where( 

215 StrongVerificationAttempt.verification_attempt_token == verification_attempt_token 

216 ) 

217 ).scalar_one() 

218 assert verification_attempt.user_id == user.id 

219 assert verification_attempt.status == StrongVerificationAttemptStatus.succeeded 

220 assert verification_attempt.has_full_data 

221 assert verification_attempt.passport_encrypted_data 

222 # assert verification_attempt.passport_date_of_birth == date(1988, 1, 1) 

223 # assert verification_attempt.passport_sex == PassportSex.male 

224 assert verification_attempt.has_minimal_data 

225 assert verification_attempt.passport_expiry_date == document_expiry 

226 assert verification_attempt.passport_nationality == nationality 

227 assert verification_attempt.passport_last_three_document_chars == document_number[-3:] 

228 assert verification_attempt.iris_token == iris_token 

229 assert verification_attempt.iris_session_id == verification_id 

230 

231 private_key = bytes.fromhex("e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272") 

232 decrypted_data = json.loads(asym_decrypt(private_key, verification_attempt.passport_encrypted_data)) 

233 assert decrypted_data == json_resp2 

234 

235 callbacks = ( 

236 session.execute( 

237 select(StrongVerificationCallbackEvent.iris_status) 

238 .where(StrongVerificationCallbackEvent.verification_attempt_id == verification_attempt.id) 

239 .order_by(StrongVerificationCallbackEvent.created.asc()) 

240 ) 

241 .scalars() 

242 .all() 

243 ) 

244 assert callbacks == ["INITIATED", "COMPLETED", "APPROVED"] 

245 

246 

247def monkeypatch_sv_config(monkeypatch): 

248 new_config = config.copy() 

249 new_config["ENABLE_STRONG_VERIFICATION"] = True 

250 new_config["IRIS_ID_PUBKEY"] = "dummy_pubkey" 

251 new_config["IRIS_ID_SECRET"] = "dummy_secret" 

252 new_config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex( 

253 "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f" 

254 ) 

255 

256 private_key = bytes.fromhex("e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272") 

257 

258 monkeypatch.setattr(couchers.servicers.account, "config", new_config) 

259 monkeypatch.setattr(couchers.jobs.handlers, "config", new_config) 

260 

261 

262def test_strong_verification_happy_path(db, monkeypatch): 

263 monkeypatch_sv_config(monkeypatch) 

264 

265 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

266 _, superuser_token = generate_user(is_superuser=True) 

267 

268 update_badges(empty_pb2.Empty()) 

269 refresh_materialized_views_rapid(None) 

270 

271 with api_session(token) as api: 

272 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

273 assert "strong_verification" not in res.badges 

274 assert not res.has_strong_verification 

275 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED 

276 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED 

277 assert ( 

278 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

279 == res.has_strong_verification 

280 ) 

281 

282 do_and_check_sv( 

283 user, 

284 token, 

285 verification_id=5731012934821983, 

286 sex="MALE", 

287 dob="1988-01-01", 

288 document_type="PASSPORT", 

289 document_number="31195855", 

290 document_expiry=default_expiry, 

291 nationality="US", 

292 ) 

293 

294 with session_scope() as session: 

295 verification_attempt = session.execute( 

296 select(StrongVerificationAttempt).where(StrongVerificationAttempt.user_id == user.id) 

297 ).scalar_one() 

298 assert verification_attempt.status == StrongVerificationAttemptStatus.succeeded 

299 assert verification_attempt.passport_date_of_birth == date(1988, 1, 1) 

300 assert verification_attempt.passport_sex == PassportSex.male 

301 assert verification_attempt.passport_expiry_date == default_expiry 

302 assert verification_attempt.passport_nationality == "US" 

303 assert verification_attempt.passport_last_three_document_chars == "855" 

304 

305 update_badges(empty_pb2.Empty()) 

306 refresh_materialized_views_rapid(None) 

307 

308 # the user should now have strong verification 

309 with api_session(token) as api: 

310 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

311 assert "strong_verification" in res.badges 

312 assert res.has_strong_verification 

313 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

314 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

315 assert ( 

316 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

317 == res.has_strong_verification 

318 ) 

319 

320 # wrong dob = no badge 

321 with session_scope() as session: 

322 session.execute(update(User).where(User.id == user.id).values(birthdate=date(1988, 1, 2))) 

323 

324 update_badges(empty_pb2.Empty()) 

325 refresh_materialized_views_rapid(None) 

326 

327 with api_session(token) as api: 

328 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

329 assert "strong_verification" not in res.badges 

330 assert not res.has_strong_verification 

331 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_MISMATCH 

332 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

333 assert ( 

334 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

335 == res.has_strong_verification 

336 ) 

337 

338 # bad gender-sex correspondence = no badge 

339 with session_scope() as session: 

340 session.execute(update(User).where(User.id == user.id).values(birthdate=date(1988, 1, 1), gender="Woman")) 

341 

342 update_badges(empty_pb2.Empty()) 

343 refresh_materialized_views_rapid(None) 

344 

345 with api_session(token) as api: 

346 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

347 assert "strong_verification" not in res.badges 

348 assert not res.has_strong_verification 

349 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

350 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH 

351 assert ( 

352 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

353 == res.has_strong_verification 

354 ) 

355 

356 with account_session(token) as account: 

357 res = account.GetAccountInfo(empty_pb2.Empty()) 

358 assert not res.has_strong_verification 

359 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

360 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH 

361 

362 # back to should have a badge 

363 with session_scope() as session: 

364 session.execute(update(User).where(User.id == user.id).values(gender="Man")) 

365 

366 update_badges(empty_pb2.Empty()) 

367 refresh_materialized_views_rapid(None) 

368 

369 with api_session(token) as api: 

370 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

371 assert "strong_verification" in res.badges 

372 assert res.has_strong_verification 

373 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

374 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

375 assert ( 

376 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

377 == res.has_strong_verification 

378 ) 

379 with account_session(token) as account: 

380 assert not any( 

381 reminder.HasField("complete_verification_reminder") 

382 for reminder in account.GetReminders(empty_pb2.Empty()).reminders 

383 ) 

384 

385 # check has_passport_sex_gender_exception 

386 with real_admin_session(superuser_token) as admin: 

387 res = admin.GetUserDetails(admin_pb2.GetUserDetailsReq(user=user.username)) 

388 assert "strong_verification" in res.badges 

389 assert res.has_strong_verification 

390 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

391 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

392 

393 admin.SetPassportSexGenderException( 

394 admin_pb2.SetPassportSexGenderExceptionReq(user=user.username, passport_sex_gender_exception=True) 

395 ) 

396 admin.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=user.username, gender="Woman")) 

397 

398 update_badges(empty_pb2.Empty()) 

399 refresh_materialized_views_rapid(None) 

400 

401 with api_session(token) as api: 

402 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

403 assert "strong_verification" in res.badges 

404 assert res.has_strong_verification 

405 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

406 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

407 assert ( 

408 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

409 == res.has_strong_verification 

410 ) 

411 with account_session(token) as account: 

412 assert not any( 

413 reminder.HasField("complete_verification_reminder") 

414 for reminder in account.GetReminders(empty_pb2.Empty()).reminders 

415 ) 

416 

417 with real_admin_session(superuser_token) as admin: 

418 res = admin.GetUserDetails(admin_pb2.GetUserDetailsReq(user=user.username)) 

419 assert "strong_verification" in res.badges 

420 assert res.has_strong_verification 

421 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

422 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

423 

424 # now turn exception off 

425 admin.SetPassportSexGenderException( 

426 admin_pb2.SetPassportSexGenderExceptionReq(user=user.username, passport_sex_gender_exception=False) 

427 ) 

428 

429 update_badges(empty_pb2.Empty()) 

430 refresh_materialized_views_rapid(None) 

431 

432 with api_session(token) as api: 

433 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

434 assert "strong_verification" not in res.badges 

435 assert not res.has_strong_verification 

436 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

437 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH 

438 assert ( 

439 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

440 == res.has_strong_verification 

441 ) 

442 with account_session(token) as account: 

443 assert any( 

444 reminder.HasField("complete_verification_reminder") 

445 for reminder in account.GetReminders(empty_pb2.Empty()).reminders 

446 ) 

447 

448 with real_admin_session(superuser_token) as admin: 

449 res = admin.GetUserDetails(admin_pb2.GetUserDetailsReq(user=user.username)) 

450 assert "strong_verification" not in res.badges 

451 assert not res.has_strong_verification 

452 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

453 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH 

454 

455 

456def test_strong_verification_delete_data(db, monkeypatch): 

457 monkeypatch_sv_config(monkeypatch) 

458 

459 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

460 _, superuser_token = generate_user(is_superuser=True) 

461 

462 refresh_materialized_views_rapid(None) 

463 

464 with api_session(token) as api: 

465 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

466 assert ( 

467 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

468 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

469 ) 

470 

471 # can remove SV data even if there is none, should do nothing 

472 with account_session(token) as account: 

473 account.DeleteStrongVerificationData(empty_pb2.Empty()) 

474 

475 do_and_check_sv( 

476 user, 

477 token, 

478 verification_id=5731012934821983, 

479 sex="MALE", 

480 dob="1988-01-01", 

481 document_type="PASSPORT", 

482 document_number="31195855", 

483 document_expiry=default_expiry, 

484 nationality="US", 

485 ) 

486 

487 refresh_materialized_views_rapid(None) 

488 

489 # the user should now have strong verification 

490 with api_session(token) as api: 

491 assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

492 assert ( 

493 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

494 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

495 ) 

496 

497 # check removing SV data 

498 with account_session(token) as account: 

499 account.DeleteStrongVerificationData(empty_pb2.Empty()) 

500 

501 refresh_materialized_views_rapid(None) 

502 

503 with api_session(token) as api: 

504 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

505 assert ( 

506 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

507 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

508 ) 

509 

510 with session_scope() as session: 

511 assert ( 

512 len( 

513 session.execute( 

514 select(StrongVerificationAttempt).where( 

515 or_( 

516 StrongVerificationAttempt.passport_encrypted_data != None, 

517 StrongVerificationAttempt.passport_date_of_birth != None, 

518 StrongVerificationAttempt.passport_sex != None, 

519 ) 

520 ) 

521 ) 

522 .scalars() 

523 .all() 

524 ) 

525 == 0 

526 ) 

527 

528 

529def test_strong_verification_expiry(db, monkeypatch): 

530 monkeypatch_sv_config(monkeypatch) 

531 

532 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

533 _, superuser_token = generate_user(is_superuser=True) 

534 

535 refresh_materialized_views_rapid(None) 

536 

537 with api_session(token) as api: 

538 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

539 assert ( 

540 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

541 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

542 ) 

543 

544 expiry = date.today() + timedelta(days=10) 

545 

546 do_and_check_sv( 

547 user, 

548 token, 

549 verification_id=5731012934821983, 

550 sex="MALE", 

551 dob="1988-01-01", 

552 document_type="PASSPORT", 

553 document_number="31195855", 

554 document_expiry=expiry, 

555 nationality="US", 

556 ) 

557 

558 # the user should now have strong verification 

559 with api_session(token) as api: 

560 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

561 assert res.has_strong_verification 

562 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

563 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

564 

565 with session_scope() as session: 

566 attempt = session.execute(select(StrongVerificationAttempt)).scalars().one() 

567 attempt.passport_expiry_date = date.today() - timedelta(days=2) 

568 

569 with api_session(token) as api: 

570 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

571 assert not res.has_strong_verification 

572 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED 

573 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED 

574 

575 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

576 assert not res.has_strong_verification 

577 assert not res.has_strong_verification 

578 

579 do_and_check_sv( 

580 user, 

581 token, 

582 verification_id=5731012934821985, 

583 sex="MALE", 

584 dob="1988-01-01", 

585 document_type="PASSPORT", 

586 document_number="PA41323412", 

587 document_expiry=date.today() + timedelta(days=365), 

588 nationality="AU", 

589 ) 

590 

591 refresh_materialized_views_rapid(None) 

592 

593 with api_session(token) as api: 

594 assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

595 assert ( 

596 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

597 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

598 ) 

599 

600 

601def test_strong_verification_regression(db, monkeypatch): 

602 monkeypatch_sv_config(monkeypatch) 

603 

604 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

605 

606 do_and_check_sv( 

607 user, 

608 token, 

609 verification_id=5731012934821983, 

610 sex="MALE", 

611 dob="1988-01-01", 

612 document_type="PASSPORT", 

613 document_number="31195855", 

614 document_expiry=default_expiry, 

615 nationality="US", 

616 return_after="INITIATED", 

617 ) 

618 

619 with api_session(token) as api: 

620 api.Ping(api_pb2.PingReq()) 

621 

622 

623def test_strong_verification_regression2(db, monkeypatch): 

624 monkeypatch_sv_config(monkeypatch) 

625 

626 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

627 

628 do_and_check_sv( 

629 user, 

630 token, 

631 verification_id=5731012934821983, 

632 sex="MALE", 

633 dob="1988-01-01", 

634 document_type="PASSPORT", 

635 document_number="31195855", 

636 document_expiry=default_expiry, 

637 nationality="US", 

638 return_after="INITIATED", 

639 ) 

640 

641 do_and_check_sv( 

642 user, 

643 token, 

644 verification_id=5731012934821985, 

645 sex="MALE", 

646 dob="1988-01-01", 

647 document_type="PASSPORT", 

648 document_number="PA41323412", 

649 document_expiry=default_expiry, 

650 nationality="AU", 

651 ) 

652 

653 refresh_materialized_views_rapid(None) 

654 

655 with api_session(token) as api: 

656 assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

657 assert ( 

658 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

659 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

660 ) 

661 

662 

663def test_strong_verification_disabled(db): 

664 user, token = generate_user() 

665 

666 with account_session(token) as account: 

667 with pytest.raises(grpc.RpcError) as e: 

668 account.InitiateStrongVerification(empty_pb2.Empty()) 

669 assert e.value.code() == grpc.StatusCode.UNAVAILABLE 

670 assert e.value.details() == "Strong verification is currently disabled." 

671 

672 

673def test_strong_verification_delete_data_cant_reverify(db, monkeypatch, push_collector): 

674 monkeypatch_sv_config(monkeypatch) 

675 

676 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

677 _, superuser_token = generate_user(is_superuser=True) 

678 

679 refresh_materialized_views_rapid(None) 

680 

681 with api_session(token) as api: 

682 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

683 assert ( 

684 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

685 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

686 ) 

687 

688 do_and_check_sv( 

689 user, 

690 token, 

691 verification_id=5731012934821983, 

692 sex="MALE", 

693 dob="1988-01-01", 

694 document_type="PASSPORT", 

695 document_number="31195855", 

696 document_expiry=default_expiry, 

697 nationality="US", 

698 ) 

699 

700 refresh_materialized_views_rapid(None) 

701 

702 # the user should now have strong verification 

703 with api_session(token) as api: 

704 assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

705 assert ( 

706 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

707 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

708 ) 

709 

710 # check removing SV data 

711 with account_session(token) as account: 

712 account.DeleteStrongVerificationData(empty_pb2.Empty()) 

713 

714 refresh_materialized_views_rapid(None) 

715 

716 with api_session(token) as api: 

717 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

718 assert ( 

719 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

720 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

721 ) 

722 

723 with session_scope() as session: 

724 assert ( 

725 len( 

726 session.execute( 

727 select(StrongVerificationAttempt).where( 

728 or_( 

729 StrongVerificationAttempt.passport_encrypted_data != None, 

730 StrongVerificationAttempt.passport_date_of_birth != None, 

731 StrongVerificationAttempt.passport_sex != None, 

732 ) 

733 ) 

734 ) 

735 .scalars() 

736 .all() 

737 ) 

738 == 0 

739 ) 

740 

741 reference_data = do_and_check_sv( 

742 user, 

743 token, 

744 verification_id=5731012934821984, 

745 sex="MALE", 

746 dob="1988-01-01", 

747 document_type="PASSPORT", 

748 document_number="31195855", 

749 document_expiry=default_expiry, 

750 nationality="US", 

751 return_after="APPROVED", 

752 ) 

753 

754 with patch("couchers.jobs.handlers.requests.post") as mock: 

755 json_resp2 = { 

756 "id": 5731012934821984, 

757 "created": "2024-05-11T15:46:46Z", 

758 "expires": "2024-05-11T16:17:26Z", 

759 "state": "APPROVED", 

760 "reference": reference_data, 

761 "user_ip": "10.123.123.123", 

762 "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0", 

763 "given_names": "John Wayne", 

764 "surname": "Doe", 

765 "nationality": "US", 

766 "sex": "MALE", 

767 "date_of_birth": "1988-01-01", 

768 "document_type": "PASSPORT", 

769 "document_number": "31195855", 

770 "expiry_date": default_expiry.isoformat(), 

771 "issuing_country": "US", 

772 "issuer": "Department of State, U.S. Government", 

773 "portrait": "dGVzdHRlc3R0ZXN0...", 

774 } 

775 mock.return_value = type( 

776 "__MockResponse", 

777 (), 

778 { 

779 "status_code": 200, 

780 "text": json.dumps(json_resp2), 

781 "json": lambda: json_resp2, 

782 }, 

783 ) 

784 while process_job(): 

785 pass 

786 

787 mock.assert_called_once_with( 

788 "https://passportreader.app/api/v1/session.get", 

789 auth=("dummy_pubkey", "dummy_secret"), 

790 json={"id": 5731012934821984}, 

791 timeout=10, 

792 verify="/etc/ssl/certs/ca-certificates.crt", 

793 ) 

794 

795 with session_scope() as session: 

796 verification_attempt = session.execute( 

797 select(StrongVerificationAttempt).where(StrongVerificationAttempt.iris_session_id == 5731012934821984) 

798 ).scalar_one() 

799 assert verification_attempt.user_id == user.id 

800 assert verification_attempt.status == StrongVerificationAttemptStatus.duplicate 

801 

802 push_collector.assert_user_push_matches_fields( 

803 user.id, 

804 ix=1, 

805 title="Strong Verification failed", 

806 body="You tried to verify with a passport that has already been used for verification. Please use another passport.", 

807 ) 

808 

809 refresh_materialized_views_rapid(None) 

810 

811 with api_session(token) as api: 

812 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

813 assert ( 

814 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

815 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

816 ) 

817 

818 

819def test_strong_verification_duplicate_other_user(db, monkeypatch, push_collector): 

820 monkeypatch_sv_config(monkeypatch) 

821 

822 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

823 user2, token2 = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

824 _, superuser_token = generate_user(is_superuser=True) 

825 

826 refresh_materialized_views_rapid(None) 

827 

828 with api_session(token) as api: 

829 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

830 assert ( 

831 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

832 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

833 ) 

834 

835 # can remove SV data even if there is none, should do nothing 

836 with account_session(token) as account: 

837 account.DeleteStrongVerificationData(empty_pb2.Empty()) 

838 

839 do_and_check_sv( 

840 user, 

841 token, 

842 verification_id=5731012934821983, 

843 sex="MALE", 

844 dob="1988-01-01", 

845 document_type="PASSPORT", 

846 document_number="31195855", 

847 document_expiry=default_expiry, 

848 nationality="US", 

849 ) 

850 

851 refresh_materialized_views_rapid(None) 

852 

853 # the user should now have strong verification 

854 with api_session(token) as api: 

855 assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

856 assert ( 

857 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

858 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

859 ) 

860 

861 # check removing SV data 

862 with account_session(token) as account: 

863 account.DeleteStrongVerificationData(empty_pb2.Empty()) 

864 

865 refresh_materialized_views_rapid(None) 

866 

867 with api_session(token) as api: 

868 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

869 assert ( 

870 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

871 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

872 ) 

873 

874 with session_scope() as session: 

875 assert ( 

876 len( 

877 session.execute( 

878 select(StrongVerificationAttempt).where( 

879 or_( 

880 StrongVerificationAttempt.passport_encrypted_data != None, 

881 StrongVerificationAttempt.passport_date_of_birth != None, 

882 StrongVerificationAttempt.passport_sex != None, 

883 ) 

884 ) 

885 ) 

886 .scalars() 

887 .all() 

888 ) 

889 == 0 

890 ) 

891 

892 reference_data = do_and_check_sv( 

893 user2, 

894 token2, 

895 verification_id=5731012934821984, 

896 sex="MALE", 

897 dob="1988-01-01", 

898 document_type="PASSPORT", 

899 document_number="31195855", 

900 document_expiry=default_expiry, 

901 nationality="US", 

902 return_after="APPROVED", 

903 ) 

904 

905 with patch("couchers.jobs.handlers.requests.post") as mock: 

906 json_resp2 = { 

907 "id": 5731012934821984, 

908 "created": "2024-05-11T15:46:46Z", 

909 "expires": "2024-05-11T16:17:26Z", 

910 "state": "APPROVED", 

911 "reference": reference_data, 

912 "user_ip": "10.123.123.123", 

913 "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0", 

914 "given_names": "John Wayne", 

915 "surname": "Doe", 

916 "nationality": "US", 

917 "sex": "MALE", 

918 "date_of_birth": "1988-01-01", 

919 "document_type": "PASSPORT", 

920 "document_number": "31195855", 

921 "expiry_date": default_expiry.isoformat(), 

922 "issuing_country": "US", 

923 "issuer": "Department of State, U.S. Government", 

924 "portrait": "dGVzdHRlc3R0ZXN0...", 

925 } 

926 mock.return_value = type( 

927 "__MockResponse", 

928 (), 

929 { 

930 "status_code": 200, 

931 "text": json.dumps(json_resp2), 

932 "json": lambda: json_resp2, 

933 }, 

934 ) 

935 while process_job(): 

936 pass 

937 

938 mock.assert_called_once_with( 

939 "https://passportreader.app/api/v1/session.get", 

940 auth=("dummy_pubkey", "dummy_secret"), 

941 json={"id": 5731012934821984}, 

942 timeout=10, 

943 verify="/etc/ssl/certs/ca-certificates.crt", 

944 ) 

945 

946 with session_scope() as session: 

947 verification_attempt = session.execute( 

948 select(StrongVerificationAttempt).where(StrongVerificationAttempt.iris_session_id == 5731012934821984) 

949 ).scalar_one() 

950 assert verification_attempt.user_id == user2.id 

951 assert verification_attempt.status == StrongVerificationAttemptStatus.duplicate 

952 

953 push_collector.assert_user_push_matches_fields( 

954 user2.id, 

955 title="Strong Verification failed", 

956 body="You tried to verify with a passport that has already been used for verification. Please use another passport.", 

957 ) 

958 

959 

960def test_strong_verification_non_passport(db, monkeypatch, push_collector): 

961 monkeypatch_sv_config(monkeypatch) 

962 

963 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

964 _, superuser_token = generate_user(is_superuser=True) 

965 

966 reference_data = do_and_check_sv( 

967 user, 

968 token, 

969 verification_id=5731012934821984, 

970 sex="MALE", 

971 dob="1988-01-01", 

972 document_type="IDENTITY_CARD", 

973 document_number="31195855", 

974 document_expiry=default_expiry, 

975 nationality="US", 

976 return_after="APPROVED", 

977 ) 

978 

979 with patch("couchers.jobs.handlers.requests.post") as mock: 

980 json_resp2 = { 

981 "id": 5731012934821984, 

982 "created": "2024-05-11T15:46:46Z", 

983 "expires": "2024-05-11T16:17:26Z", 

984 "state": "APPROVED", 

985 "reference": reference_data, 

986 "user_ip": "10.123.123.123", 

987 "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0", 

988 "given_names": "John Wayne", 

989 "surname": "Doe", 

990 "nationality": "US", 

991 "sex": "MALE", 

992 "date_of_birth": "1988-01-01", 

993 "document_type": "IDENTITY_CARD", 

994 "document_number": "31195855", 

995 "expiry_date": default_expiry.isoformat(), 

996 "issuing_country": "US", 

997 "issuer": "Department of State, U.S. Government", 

998 "portrait": "dGVzdHRlc3R0ZXN0...", 

999 } 

1000 mock.return_value = type( 

1001 "__MockResponse", 

1002 (), 

1003 { 

1004 "status_code": 200, 

1005 "text": json.dumps(json_resp2), 

1006 "json": lambda: json_resp2, 

1007 }, 

1008 ) 

1009 while process_job(): 

1010 pass 

1011 

1012 mock.assert_called_once_with( 

1013 "https://passportreader.app/api/v1/session.get", 

1014 auth=("dummy_pubkey", "dummy_secret"), 

1015 json={"id": 5731012934821984}, 

1016 timeout=10, 

1017 verify="/etc/ssl/certs/ca-certificates.crt", 

1018 ) 

1019 

1020 with session_scope() as session: 

1021 verification_attempt = session.execute( 

1022 select(StrongVerificationAttempt).where(StrongVerificationAttempt.iris_session_id == 5731012934821984) 

1023 ).scalar_one() 

1024 assert verification_attempt.user_id == user.id 

1025 assert verification_attempt.status == StrongVerificationAttemptStatus.failed 

1026 

1027 push_collector.assert_user_push_matches_fields( 

1028 user.id, 

1029 title="Strong Verification failed", 

1030 body="You tried to verify with a document that is not a passport. You can only use a passport for Strong Verification.", 

1031 )