Coverage for src/tests/test_strong_verification.py: 99%

371 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-12-20 18:03 +0000

1import json 

2from datetime import date, timedelta 

3from unittest.mock import ANY, patch 

4from urllib.parse import urlencode 

5 

6import grpc 

7import pytest 

8from google.protobuf import empty_pb2 

9from sqlalchemy.sql import or_ 

10 

11import couchers.jobs.handlers 

12import couchers.servicers.account 

13from couchers import errors 

14from couchers.config import config 

15from couchers.crypto import asym_decrypt, b64encode_unpadded 

16from couchers.db import session_scope 

17from couchers.jobs.handlers import update_badges 

18from couchers.jobs.worker import process_job 

19from couchers.materialized_views import refresh_materialized_views_rapid 

20from couchers.models import ( 

21 PassportSex, 

22 StrongVerificationAttempt, 

23 StrongVerificationAttemptStatus, 

24 StrongVerificationCallbackEvent, 

25 User, 

26) 

27from couchers.sql import couchers_select as select 

28from proto import account_pb2, admin_pb2, api_pb2 

29from proto.google.api import httpbody_pb2 

30from tests.test_fixtures import ( # noqa 

31 account_session, 

32 api_session, 

33 db, 

34 generate_user, 

35 push_collector, 

36 real_admin_session, 

37 real_iris_session, 

38 testconfig, 

39) 

40 

41 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture: requests ``testconfig`` for every test in this module and does nothing else."""

45 

46 

def _emulate_iris_callback(session_id, session_state, reference):
    """Send a fake Iris ID webhook call through ``real_iris_session``.

    The JSON body carries ``session_id``, ``session_state`` and
    ``session_reference`` (taken from *reference*). *session_state* must be one
    of the state names listed below, otherwise the assertion fails.
    """
    known_states = ("CREATED", "INITIATED", "FAILED", "ABORTED", "COMPLETED", "REJECTED", "APPROVED")
    assert session_state in known_states
    with real_iris_session() as iris:
        payload = {
            "session_id": session_id,
            "session_state": session_state,
            "session_reference": reference,
        }
        body = httpbody_pb2.HttpBody(
            content_type="application/json",
            data=json.dumps(payload).encode("ascii"),
        )
        iris.Webhook(body)

54 

55 

# Passport expiry date used by most tests: five 365-day years after today,
# evaluated once when the module is imported.
default_expiry = date.today() + timedelta(days=365) * 5

57 

58 

def _mock_json_response(payload):
    """Build a minimal stand-in for a ``requests`` response whose JSON body is *payload*."""
    return type(
        "__MockResponse",
        (),
        {
            "status_code": 200,
            "text": json.dumps(payload),
            # the attribute is read off the class itself, so the callable takes no arguments
            "json": lambda: payload,
        },
    )


def _sv_attempt_status(account, verification_attempt_token):
    """Return the status reported by ``GetStrongVerificationAttemptStatus`` for the given attempt token."""
    return account.GetStrongVerificationAttemptStatus(
        account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token)
    ).status


def do_and_check_sv(
    user,
    token,
    verification_id,
    sex,
    dob,
    document_type,
    document_number,
    document_expiry,
    nationality,
    return_after=None,
):
    """Run one strong verification attempt against mocked Iris ID HTTP calls and check each step.

    The flow is: initiate the attempt (``session.create`` is mocked), emulate the
    ``INITIATED``, ``COMPLETED`` and ``APPROVED`` webhook callbacks, run the
    queued background jobs with ``session.get`` mocked, then check the stored
    ``StrongVerificationAttempt`` row and the recorded callback events.

    Args:
        user: the user the attempt must belong to (only ``user.id`` is read).
        token: session token passed to ``account_session``.
        verification_id: Iris session id used in the mocked responses and callbacks.
        sex, dob, document_type, document_number, nationality: values placed in
            the mocked ``session.get`` response (``nationality`` is also used
            as ``issuing_country``).
        document_expiry: expiry date; ``.isoformat()`` is called on it for the
            mocked response and it is compared with the stored expiry date.
        return_after: ``None`` to run the whole flow, or ``"INITIATED"``,
            ``"COMPLETED"`` or ``"APPROVED"`` to stop right after that callback
            has been emulated and its status checked.

    Returns:
        The ``reference`` value that was sent to the mocked ``session.create``
        call, both on early return and after the full flow.
    """
    iris_token_data = {
        "merchant_id": 5731012934821982,
        "session_id": verification_id,
        "seed": 1674246339,
        "face_verification": False,
        "host": "https://passportreader.app",
    }
    iris_token = b64encode_unpadded(json.dumps(iris_token_data).encode("utf8"))

    with account_session(token) as account:
        # start by initiation
        with patch("couchers.servicers.account.requests.post") as mock:
            mock.return_value = _mock_json_response({"id": verification_id, "token": iris_token})
            res = account.InitiateStrongVerification(empty_pb2.Empty())
        mock.assert_called_once_with(
            "https://passportreader.app/api/v1/session.create",
            auth=("dummy_pubkey", "dummy_secret"),
            json={
                "callback_url": "http://localhost:8888/iris/webhook",
                "face_verification": False,
                "reference": ANY,
            },
            timeout=10,
        )
        # the reference is generated by the backend, so capture whatever was actually sent
        reference_data = mock.call_args.kwargs["json"]["reference"]
        verification_attempt_token = res.verification_attempt_token
        return_url = f"http://localhost:3000/complete-strong-verification?verification_attempt_token={verification_attempt_token}"
        assert res.redirect_url == "https://passportreader.app/open?" + urlencode(
            {"token": iris_token, "redirect_url": return_url}
        )

        assert (
            _sv_attempt_status(account, verification_attempt_token)
            == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_TO_OPEN_APP
        )

    # ok, now the user downloads the app, scans their id, and Iris ID sends callbacks to the server
    status_after_callback = (
        ("INITIATED", account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_IN_APP),
        ("COMPLETED", account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND),
        ("APPROVED", account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND),
    )
    for iris_state, expected_status in status_after_callback:
        _emulate_iris_callback(verification_id, iris_state, reference_data)

        with account_session(token) as account:
            assert _sv_attempt_status(account, verification_attempt_token) == expected_status

        if return_after == iris_state:
            return reference_data

    # run the queued background jobs; the session.get request they make is mocked
    with patch("couchers.jobs.handlers.requests.post") as mock:
        json_resp2 = {
            "id": verification_id,
            "created": "2024-05-11T15:46:46Z",
            "expires": "2024-05-11T16:17:26Z",
            "state": "APPROVED",
            "reference": reference_data,
            "user_ip": "10.123.123.123",
            "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0",
            "given_names": "John Wayne",
            "surname": "Doe",
            "nationality": nationality,
            "sex": sex,
            "date_of_birth": dob,
            "document_type": document_type,
            "document_number": document_number,
            "expiry_date": document_expiry.isoformat(),
            "issuing_country": nationality,
            "issuer": "Department of State, U.S. Government",
            "portrait": "dGVzdHRlc3R0ZXN0...",
        }
        mock.return_value = _mock_json_response(json_resp2)
        while process_job():
            pass

    mock.assert_called_once_with(
        "https://passportreader.app/api/v1/session.get",
        auth=("dummy_pubkey", "dummy_secret"),
        json={"id": verification_id},
        timeout=10,
    )

    with account_session(token) as account:
        assert (
            _sv_attempt_status(account, verification_attempt_token)
            == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_SUCCEEDED
        )

    with session_scope() as session:
        verification_attempt = session.execute(
            select(StrongVerificationAttempt).where(
                StrongVerificationAttempt.verification_attempt_token == verification_attempt_token
            )
        ).scalar_one()
        assert verification_attempt.user_id == user.id
        assert verification_attempt.status == StrongVerificationAttemptStatus.succeeded
        assert verification_attempt.has_full_data
        assert verification_attempt.passport_encrypted_data
        # passport_date_of_birth and passport_sex are not checked here;
        # test_strong_verification_happy_path asserts them after calling this helper
        assert verification_attempt.has_minimal_data
        assert verification_attempt.passport_expiry_date == document_expiry
        assert verification_attempt.passport_nationality == nationality
        assert verification_attempt.passport_last_three_document_chars == document_number[-3:]
        assert verification_attempt.iris_token == iris_token
        assert verification_attempt.iris_session_id == verification_id

        # the encrypted blob must decrypt back to exactly the mocked session.get response
        private_key = bytes.fromhex("e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272")
        decrypted_data = json.loads(asym_decrypt(private_key, verification_attempt.passport_encrypted_data))
        assert decrypted_data == json_resp2

        callbacks = (
            session.execute(
                select(StrongVerificationCallbackEvent.iris_status)
                .where(StrongVerificationCallbackEvent.verification_attempt_id == verification_attempt.id)
                .order_by(StrongVerificationCallbackEvent.created.asc())
            )
            .scalars()
            .all()
        )
        assert callbacks == ["INITIATED", "COMPLETED", "APPROVED"]

    return reference_data

242 

243 

def monkeypatch_sv_config(monkeypatch):
    """Enable strong verification with dummy Iris ID credentials for the current test.

    Copies the global ``config``, sets the strong verification keys on the copy,
    and uses *monkeypatch* to install the copy as ``config`` in
    ``couchers.servicers.account`` and ``couchers.jobs.handlers``.
    """
    new_config = config.copy()
    new_config["ENABLE_STRONG_VERIFICATION"] = True
    new_config["IRIS_ID_PUBKEY"] = "dummy_pubkey"
    new_config["IRIS_ID_SECRET"] = "dummy_secret"
    # presumably the public counterpart of the private key do_and_check_sv decrypts with -- confirm
    new_config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex(
        "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
    )

    monkeypatch.setattr(couchers.servicers.account, "config", new_config)
    monkeypatch.setattr(couchers.jobs.handlers, "config", new_config)

257 

258 

def test_strong_verification_happy_path(db, monkeypatch):
    """Walk a user through strong verification and check how profile changes affect it.

    Covers: no verification before the flow, verified after it, loss of the
    badge on a birthdate or gender mismatch, recovery when the data matches
    again, and the admin passport sex/gender exception being switched on and off.
    """
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    _, superuser_token = generate_user(is_superuser=True)

    birthdate_verified = api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
    gender_verified = api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED
    gender_mismatch = api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH

    def recompute():
        # badges and materialized views are recalculated explicitly before each check
        update_badges(empty_pb2.Empty())
        refresh_materialized_views_rapid(None)

    def check_fields(res, verified, birthdate_status, gender_status):
        assert ("strong_verification" in res.badges) == verified
        assert res.has_strong_verification == verified
        assert res.birthdate_verification_status == birthdate_status
        assert res.gender_verification_status == gender_status

    def check_via_api(verified, birthdate_status, gender_status):
        with api_session(token) as api:
            res = api.GetUser(api_pb2.GetUserReq(user=user.username))
            check_fields(res, verified, birthdate_status, gender_status)
            lite = api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username))
            assert lite.has_strong_verification == res.has_strong_verification

    def check_via_admin(admin, verified, birthdate_status, gender_status):
        res = admin.GetUserDetails(admin_pb2.GetUserDetailsReq(user=user.username))
        check_fields(res, verified, birthdate_status, gender_status)

    def edit_user(**changes):
        with session_scope() as session:
            row = session.execute(select(User).where(User.id == user.id)).scalar_one()
            for attr, value in changes.items():
                setattr(row, attr, value)

    recompute()
    check_via_api(
        False,
        api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED,
        api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED,
    )

    do_and_check_sv(
        user,
        token,
        verification_id=5731012934821983,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="31195855",
        document_expiry=default_expiry,
        nationality="US",
    )

    with session_scope() as session:
        attempt = session.execute(
            select(StrongVerificationAttempt).where(StrongVerificationAttempt.user_id == user.id)
        ).scalar_one()
        assert attempt.status == StrongVerificationAttemptStatus.succeeded
        assert attempt.passport_date_of_birth == date(1988, 1, 1)
        assert attempt.passport_sex == PassportSex.male
        assert attempt.passport_expiry_date == default_expiry
        assert attempt.passport_nationality == "US"
        assert attempt.passport_last_three_document_chars == "855"

    recompute()

    # the user should now have strong verification
    check_via_api(True, birthdate_verified, gender_verified)

    # wrong dob = no badge
    edit_user(birthdate=date(1988, 1, 2))
    recompute()
    check_via_api(False, api_pb2.BIRTHDATE_VERIFICATION_STATUS_MISMATCH, gender_verified)

    # bad gender-sex correspondence = no badge
    edit_user(birthdate=date(1988, 1, 1), gender="Woman")
    recompute()
    check_via_api(False, birthdate_verified, gender_mismatch)

    with account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert not info.has_strong_verification
        assert info.birthdate_verification_status == birthdate_verified
        assert info.gender_verification_status == gender_mismatch

    # back to should have a badge
    edit_user(gender="Man")
    recompute()
    check_via_api(True, birthdate_verified, gender_verified)

    # check has_passport_sex_gender_exception
    with real_admin_session(superuser_token) as admin:
        check_via_admin(admin, True, birthdate_verified, gender_verified)

        admin.SetPassportSexGenderException(
            admin_pb2.SetPassportSexGenderExceptionReq(user=user.username, passport_sex_gender_exception=True)
        )
        admin.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=user.username, gender="Woman"))

    recompute()
    check_via_api(True, birthdate_verified, gender_verified)

    with real_admin_session(superuser_token) as admin:
        check_via_admin(admin, True, birthdate_verified, gender_verified)

        # now turn exception off
        admin.SetPassportSexGenderException(
            admin_pb2.SetPassportSexGenderExceptionReq(user=user.username, passport_sex_gender_exception=False)
        )

    recompute()
    check_via_api(False, birthdate_verified, gender_mismatch)

    with real_admin_session(superuser_token) as admin:
        check_via_admin(admin, False, birthdate_verified, gender_mismatch)

440 

441 

def test_strong_verification_delete_data(db, monkeypatch):
    """Deleting strong verification data removes the verification and the stored passport fields.

    Deleting when no data exists must be a no-op; after a successful
    verification, deleting must clear ``has_strong_verification`` and leave no
    attempt row with encrypted data, date of birth or sex set.
    """
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    # a superuser account is created as part of the setup, but its token is not used in this test
    generate_user(is_superuser=True)

    def has_sv():
        """Return GetUser's has_strong_verification after checking GetLiteUser agrees with it."""
        with api_session(token) as api:
            full = api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
            lite = api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification
        assert lite == full
        return full

    refresh_materialized_views_rapid(None)
    assert not has_sv()

    # can remove SV data even if there is none, should do nothing
    with account_session(token) as account:
        account.DeleteStrongVerificationData(empty_pb2.Empty())

    do_and_check_sv(
        user,
        token,
        verification_id=5731012934821983,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="31195855",
        document_expiry=default_expiry,
        nationality="US",
    )

    refresh_materialized_views_rapid(None)

    # the user should now have strong verification
    assert has_sv()

    # check removing SV data
    with account_session(token) as account:
        account.DeleteStrongVerificationData(empty_pb2.Empty())

    refresh_materialized_views_rapid(None)
    assert not has_sv()

    with session_scope() as session:
        # `!= None` is deliberate: SQLAlchemy turns it into an SQL "IS NOT NULL" test
        attempts_with_data = (
            session.execute(
                select(StrongVerificationAttempt).where(
                    or_(
                        StrongVerificationAttempt.passport_encrypted_data != None,  # noqa: E711
                        StrongVerificationAttempt.passport_date_of_birth != None,  # noqa: E711
                        StrongVerificationAttempt.passport_sex != None,  # noqa: E711
                    )
                )
            )
            .scalars()
            .all()
        )
        assert not attempts_with_data

513 

514 

def test_strong_verification_expiry(db, monkeypatch):
    """Strong verification lapses once the stored passport expiry date is in the past.

    After a successful verification the expiry date on the attempt row is moved
    two days into the past; ``GetUser`` must then report the user as unverified.
    A new verification with a different, unexpired passport restores it.
    """
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    # a superuser account is created as part of the setup, but its token is not used in this test
    generate_user(is_superuser=True)

    refresh_materialized_views_rapid(None)

    with api_session(token) as api:
        has_sv = api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
        assert not has_sv
        assert api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification == has_sv

    expiry = date.today() + timedelta(days=10)

    do_and_check_sv(
        user,
        token,
        verification_id=5731012934821983,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="31195855",
        document_expiry=expiry,
        nationality="US",
    )

    # the user should now have strong verification
    with api_session(token) as api:
        res = api.GetUser(api_pb2.GetUserReq(user=user.username))
        assert res.has_strong_verification
        assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
        assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED

    # backdate the stored expiry so the passport counts as expired
    with session_scope() as session:
        attempt = session.execute(select(StrongVerificationAttempt)).scalars().one()
        attempt.passport_expiry_date = date.today() - timedelta(days=2)

    with api_session(token) as api:
        res = api.GetUser(api_pb2.GetUserReq(user=user.username))
        assert not res.has_strong_verification
        assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
        assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED

        # NOTE(review): this second fetch repeats the check above; it may have been
        # meant to query GetLiteUser instead -- confirm before changing it
        res = api.GetUser(api_pb2.GetUserReq(user=user.username))
        assert not res.has_strong_verification

    do_and_check_sv(
        user,
        token,
        verification_id=5731012934821985,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="PA41323412",
        document_expiry=date.today() + timedelta(days=365),
        nationality="AU",
    )

    refresh_materialized_views_rapid(None)

    with api_session(token) as api:
        has_sv = api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
        assert has_sv
        assert api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification == has_sv

585 

586 

def test_strong_verification_regression(db, monkeypatch):
    """Regression test: with an attempt left at ``INITIATED``, ``Ping`` must not raise."""
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(gender="Man", birthdate=date(1988, 1, 1))

    passport = {
        "verification_id": 5731012934821983,
        "sex": "MALE",
        "dob": "1988-01-01",
        "document_type": "PASSPORT",
        "document_number": "31195855",
        "document_expiry": default_expiry,
        "nationality": "US",
    }
    # stop the flow right after the INITIATED callback
    do_and_check_sv(user, token, return_after="INITIATED", **passport)

    with api_session(token) as api:
        api.Ping(api_pb2.PingReq())

607 

608 

def test_strong_verification_regression2(db, monkeypatch):
    """Regression test: an abandoned attempt must not block a later, complete verification.

    The first attempt is left at ``INITIATED``; a second attempt with another
    passport runs to completion and the user must end up strongly verified.
    """
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(gender="Man", birthdate=date(1988, 1, 1))

    abandoned = {
        "verification_id": 5731012934821983,
        "sex": "MALE",
        "dob": "1988-01-01",
        "document_type": "PASSPORT",
        "document_number": "31195855",
        "document_expiry": default_expiry,
        "nationality": "US",
    }
    do_and_check_sv(user, token, return_after="INITIATED", **abandoned)

    completed = {
        "verification_id": 5731012934821985,
        "sex": "MALE",
        "dob": "1988-01-01",
        "document_type": "PASSPORT",
        "document_number": "PA41323412",
        "document_expiry": default_expiry,
        "nationality": "AU",
    }
    do_and_check_sv(user, token, **completed)

    refresh_materialized_views_rapid(None)

    with api_session(token) as api:
        user_req = api_pb2.GetUserReq(user=user.username)
        assert api.GetUser(user_req).has_strong_verification
        lite = api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username))
        full = api.GetUser(user_req)
        assert lite.has_strong_verification == full.has_strong_verification

647 

648 

def test_strong_verification_disabled(db):
    """Without the strong verification config patch, initiating an attempt fails with ``UNAVAILABLE``."""
    _, token = generate_user()

    with account_session(token) as account, pytest.raises(grpc.RpcError) as exc_info:
        account.InitiateStrongVerification(empty_pb2.Empty())

    rpc_error = exc_info.value
    assert rpc_error.code() == grpc.StatusCode.UNAVAILABLE
    assert rpc_error.details() == errors.STRONG_VERIFICATION_DISABLED

657 

658 

def test_strong_verification_delete_data_cant_reverify(db, monkeypatch, push_collector):
    """A passport cannot be reused for verification after its data has been deleted.

    The user verifies, deletes their strong verification data, then tries again
    with the same passport details. The second attempt must end up with status
    ``duplicate``, a failure push notification must be recorded, and the user
    must stay unverified.
    """
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    # a superuser account is created as part of the setup, but its token is not used in this test
    generate_user(is_superuser=True)

    def has_sv():
        """Return GetUser's has_strong_verification after checking GetLiteUser agrees with it."""
        with api_session(token) as api:
            full = api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
            lite = api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification
        assert lite == full
        return full

    refresh_materialized_views_rapid(None)
    assert not has_sv()

    do_and_check_sv(
        user,
        token,
        verification_id=5731012934821983,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="31195855",
        document_expiry=default_expiry,
        nationality="US",
    )

    refresh_materialized_views_rapid(None)

    # the user should now have strong verification
    assert has_sv()

    # check removing SV data
    with account_session(token) as account:
        account.DeleteStrongVerificationData(empty_pb2.Empty())

    refresh_materialized_views_rapid(None)
    assert not has_sv()

    with session_scope() as session:
        # `!= None` is deliberate: SQLAlchemy turns it into an SQL "IS NOT NULL" test
        attempts_with_data = (
            session.execute(
                select(StrongVerificationAttempt).where(
                    or_(
                        StrongVerificationAttempt.passport_encrypted_data != None,  # noqa: E711
                        StrongVerificationAttempt.passport_date_of_birth != None,  # noqa: E711
                        StrongVerificationAttempt.passport_sex != None,  # noqa: E711
                    )
                )
            )
            .scalars()
            .all()
        )
        assert not attempts_with_data

    # second attempt: same passport details, new Iris session id; stop before the background job runs
    second_session_id = 5731012934821984
    reference_data = do_and_check_sv(
        user,
        token,
        verification_id=second_session_id,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="31195855",
        document_expiry=default_expiry,
        nationality="US",
        return_after="APPROVED",
    )

    with patch("couchers.jobs.handlers.requests.post") as mock:
        json_resp2 = {
            "id": second_session_id,
            "created": "2024-05-11T15:46:46Z",
            "expires": "2024-05-11T16:17:26Z",
            "state": "APPROVED",
            "reference": reference_data,
            "user_ip": "10.123.123.123",
            "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0",
            "given_names": "John Wayne",
            "surname": "Doe",
            "nationality": "US",
            "sex": "MALE",
            "date_of_birth": "1988-01-01",
            "document_type": "PASSPORT",
            "document_number": "31195855",
            "expiry_date": default_expiry.isoformat(),
            "issuing_country": "US",
            "issuer": "Department of State, U.S. Government",
            "portrait": "dGVzdHRlc3R0ZXN0...",
        }
        mock.return_value = type(
            "__MockResponse",
            (),
            {
                "status_code": 200,
                "text": json.dumps(json_resp2),
                # the attribute is read off the class itself, so the callable takes no arguments
                "json": lambda: json_resp2,
            },
        )
        while process_job():
            pass

    mock.assert_called_once_with(
        "https://passportreader.app/api/v1/session.get",
        auth=("dummy_pubkey", "dummy_secret"),
        json={"id": second_session_id},
        timeout=10,
    )

    with session_scope() as session:
        verification_attempt = session.execute(
            select(StrongVerificationAttempt).where(StrongVerificationAttempt.iris_session_id == second_session_id)
        ).scalar_one()
        assert verification_attempt.user_id == user.id
        assert verification_attempt.status == StrongVerificationAttemptStatus.duplicate

    push_collector.assert_user_push_matches_fields(
        user.id,
        ix=1,
        title="Strong Verification failed",
        body="You tried to verify with a passport that has already been used for verification. Please use another passport.",
    )

    refresh_materialized_views_rapid(None)
    assert not has_sv()

802 

803 

def test_strong_verification_duplicate_other_user(db, monkeypatch, push_collector):
    """
    A passport already used by one user must be rejected as a duplicate when a second user tries to verify with it,
    even after the first user has deleted their strong verification data.

    Flow exercised:
    1. ``user`` starts without strong verification; deleting non-existent SV data is a no-op.
    2. ``user`` completes strong verification and is reported as verified.
    3. ``user`` deletes their SV data, is no longer verified, and no attempt row retains passport data.
    4. ``user2`` goes through verification with the same document number; the attempt ends up with status
       ``duplicate`` and ``user2`` receives a failure push notification.
    """
    monkeypatch_sv_config(monkeypatch)

    first_verification_id = 5731012934821983
    second_verification_id = 5731012934821984
    # both users present the same passport number
    shared_document_number = "31195855"

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    user2, token2 = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    # the superuser is still created to keep the database setup unchanged, its credentials are not used in this test
    generate_user(is_superuser=True)

    refresh_materialized_views_rapid(None)

    with api_session(token) as api:
        assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
        # the lite user (refreshed above via the materialized views) must agree with the full user
        assert (
            api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification
            == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
        )

    # can remove SV data even if there is none, should do nothing
    with account_session(token) as account:
        account.DeleteStrongVerificationData(empty_pb2.Empty())

    do_and_check_sv(
        user,
        token,
        verification_id=first_verification_id,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number=shared_document_number,
        document_expiry=default_expiry,
        nationality="US",
    )

    refresh_materialized_views_rapid(None)

    # the user should now have strong verification
    with api_session(token) as api:
        assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
        assert (
            api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification
            == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
        )

    # check removing SV data
    with account_session(token) as account:
        account.DeleteStrongVerificationData(empty_pb2.Empty())

    refresh_materialized_views_rapid(None)

    with api_session(token) as api:
        assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
        assert (
            api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification
            == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
        )

    # after deletion no attempt may still hold any of these passport-derived columns
    with session_scope() as session:
        attempts_with_passport_data = (
            session.execute(
                select(StrongVerificationAttempt).where(
                    or_(
                        StrongVerificationAttempt.passport_encrypted_data.is_not(None),
                        StrongVerificationAttempt.passport_date_of_birth.is_not(None),
                        StrongVerificationAttempt.passport_sex.is_not(None),
                    )
                )
            )
            .scalars()
            .all()
        )
        assert not attempts_with_passport_data

    # second user starts verification with the same passport; stop once the "APPROVED" step is reached so that the
    # follow-up job (which fetches the session details) is run below against our own mocked response
    reference_data = do_and_check_sv(
        user2,
        token2,
        verification_id=second_verification_id,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number=shared_document_number,
        document_expiry=default_expiry,
        nationality="US",
        return_after="APPROVED",
    )

    with patch("couchers.jobs.handlers.requests.post") as mock:
        json_resp2 = {
            "id": second_verification_id,
            "created": "2024-05-11T15:46:46Z",
            "expires": "2024-05-11T16:17:26Z",
            "state": "APPROVED",
            "reference": reference_data,
            "user_ip": "10.123.123.123",
            "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0",
            "given_names": "John Wayne",
            "surname": "Doe",
            "nationality": "US",
            "sex": "MALE",
            "date_of_birth": "1988-01-01",
            "document_type": "PASSPORT",
            "document_number": shared_document_number,
            "expiry_date": default_expiry.isoformat(),
            "issuing_country": "US",
            "issuer": "Department of State, U.S. Government",
            "portrait": "dGVzdHRlc3R0ZXN0...",
        }
        # the class object itself (not an instance) stands in for the response, hence the zero-argument `json`
        mock.return_value = type(
            "__MockResponse",
            (),
            {
                "status_code": 200,
                "text": json.dumps(json_resp2),
                "json": lambda: json_resp2,
            },
        )
        # drain the job queue
        while process_job():
            pass

    mock.assert_called_once_with(
        "https://passportreader.app/api/v1/session.get",
        auth=("dummy_pubkey", "dummy_secret"),
        json={"id": second_verification_id},
        timeout=10,
    )

    with session_scope() as session:
        verification_attempt = session.execute(
            select(StrongVerificationAttempt).where(StrongVerificationAttempt.iris_session_id == second_verification_id)
        ).scalar_one()
        assert verification_attempt.user_id == user2.id
        assert verification_attempt.status == StrongVerificationAttemptStatus.duplicate

    push_collector.assert_user_push_matches_fields(
        user2.id,
        title="Strong Verification failed",
        body="You tried to verify with a passport that has already been used for verification. Please use another passport.",
    )

942 

943 

def test_strong_verification_non_passport(db, monkeypatch, push_collector):
    """
    Verifying with a document that is not a passport must fail.

    The user goes through the verification flow with an ``IDENTITY_CARD``; once the session details are fetched by
    the background job, the attempt ends up with status ``failed`` and the user receives a push notification
    explaining that only passports are accepted.
    """
    monkeypatch_sv_config(monkeypatch)

    verification_id = 5731012934821984
    non_passport_document_type = "IDENTITY_CARD"

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    # the superuser is still created to keep the database setup unchanged, its credentials are not used in this test
    generate_user(is_superuser=True)

    # stop once the "APPROVED" step is reached so that the follow-up job (which fetches the session details) is run
    # below against our own mocked response
    reference_data = do_and_check_sv(
        user,
        token,
        verification_id=verification_id,
        sex="MALE",
        dob="1988-01-01",
        document_type=non_passport_document_type,
        document_number="31195855",
        document_expiry=default_expiry,
        nationality="US",
        return_after="APPROVED",
    )

    with patch("couchers.jobs.handlers.requests.post") as mock:
        json_resp2 = {
            "id": verification_id,
            "created": "2024-05-11T15:46:46Z",
            "expires": "2024-05-11T16:17:26Z",
            "state": "APPROVED",
            "reference": reference_data,
            "user_ip": "10.123.123.123",
            "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0",
            "given_names": "John Wayne",
            "surname": "Doe",
            "nationality": "US",
            "sex": "MALE",
            "date_of_birth": "1988-01-01",
            "document_type": non_passport_document_type,
            "document_number": "31195855",
            "expiry_date": default_expiry.isoformat(),
            "issuing_country": "US",
            "issuer": "Department of State, U.S. Government",
            "portrait": "dGVzdHRlc3R0ZXN0...",
        }
        # the class object itself (not an instance) stands in for the response, hence the zero-argument `json`
        mock.return_value = type(
            "__MockResponse",
            (),
            {
                "status_code": 200,
                "text": json.dumps(json_resp2),
                "json": lambda: json_resp2,
            },
        )
        # drain the job queue
        while process_job():
            pass

    mock.assert_called_once_with(
        "https://passportreader.app/api/v1/session.get",
        auth=("dummy_pubkey", "dummy_secret"),
        json={"id": verification_id},
        timeout=10,
    )

    with session_scope() as session:
        verification_attempt = session.execute(
            select(StrongVerificationAttempt).where(StrongVerificationAttempt.iris_session_id == verification_id)
        ).scalar_one()
        assert verification_attempt.user_id == user.id
        assert verification_attempt.status == StrongVerificationAttemptStatus.failed

    push_collector.assert_user_push_matches_fields(
        user.id,
        title="Strong Verification failed",
        body="You tried to verify with a document that is not a passport. You can only use a passport for Strong Verification.",
    )