Coverage for src/tests/test_strong_verification.py: 99%

372 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-31 14:14 +0000

1import json 

2from datetime import date, timedelta 

3from unittest.mock import ANY, patch 

4from urllib.parse import urlencode 

5 

6import grpc 

7import pytest 

8from google.protobuf import empty_pb2 

9from sqlalchemy import select, update 

10from sqlalchemy.sql import or_ 

11 

12import couchers.jobs.handlers 

13import couchers.servicers.account 

14from couchers.config import config 

15from couchers.crypto import asym_decrypt, b64encode_unpadded 

16from couchers.db import session_scope 

17from couchers.jobs.handlers import update_badges 

18from couchers.jobs.worker import process_job 

19from couchers.materialized_views import refresh_materialized_views_rapid 

20from couchers.models import ( 

21 PassportSex, 

22 StrongVerificationAttempt, 

23 StrongVerificationAttemptStatus, 

24 StrongVerificationCallbackEvent, 

25 User, 

26) 

27from couchers.proto import account_pb2, admin_pb2, api_pb2 

28from couchers.proto.google.api import httpbody_pb2 

29from tests.test_fixtures import ( # noqa 

30 account_session, 

31 api_session, 

32 db, 

33 generate_user, 

34 push_collector, 

35 real_admin_session, 

36 real_iris_session, 

37 testconfig, 

38) 

39 

40 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture that requests ``testconfig`` for every test in this module."""

44 

45 

def _emulate_iris_callback(session_id, session_state, reference):
    """Send a fake Iris ID webhook callback through the ``Webhook`` RPC.

    The three arguments are serialised into a JSON body (``session_id``,
    ``session_state``, ``session_reference``) and delivered via a client
    obtained from ``real_iris_session()``.

    ``session_state`` must be one of the state names checked below, otherwise
    the assertion fails before anything is sent.
    """
    known_states = ["CREATED", "INITIATED", "FAILED", "ABORTED", "COMPLETED", "REJECTED", "APPROVED"]
    assert session_state in known_states

    with real_iris_session() as iris:
        payload = {
            "session_id": session_id,
            "session_state": session_state,
            "session_reference": reference,
        }
        body = httpbody_pb2.HttpBody(
            content_type="application/json",
            data=json.dumps(payload).encode("ascii"),
        )
        iris.Webhook(body)

53 

54 

# Passport expiry date shared by the tests below: 5 * 365 days after the day
# on which this module is imported (evaluated once, at import time).
default_expiry = date.today() + timedelta(days=5 * 365)

56 

57 

def do_and_check_sv(
    user,
    token,
    verification_id,
    sex,
    dob,
    document_type,
    document_number,
    document_expiry,
    nationality,
    return_after=None,
):
    """Drive a strong verification attempt against a mocked Iris ID API, asserting each step.

    The flow is:

    1. ``InitiateStrongVerification`` is called with ``requests.post`` patched
       in ``couchers.servicers.account``; the ``session.create`` call and the
       returned redirect URL are checked.
    2. ``INITIATED``, ``COMPLETED`` and ``APPROVED`` callbacks are delivered
       with ``_emulate_iris_callback`` and the attempt status is checked after
       each one.
    3. Background jobs are processed with ``requests.post`` patched in
       ``couchers.jobs.handlers`` so that it returns the passport data built
       from the arguments; the ``session.get`` call is checked.
    4. The stored ``StrongVerificationAttempt`` is loaded and its fields, its
       encrypted payload and the recorded callback events are checked.

    Args:
        user: user the resulting attempt must belong to (compared by ``id``).
        token: token handed to ``account_session``.
        verification_id: Iris session id used in every mocked response.
        sex, dob, document_type, document_number, nationality: values placed
            in the mocked ``session.get`` response.
        document_expiry: expiry date; its ``isoformat()`` goes into the mocked
            response and the stored expiry is compared against it.
        return_after: ``"INITIATED"``, ``"COMPLETED"`` or ``"APPROVED"`` to
            stop right after the status check that follows that callback.

    Returns:
        The ``reference`` value that was sent to ``session.create`` when the
        flow is stopped early via ``return_after``; ``None`` otherwise.
    """

    def fake_response(payload):
        # A class object (not an instance) stands in for the HTTP response, so
        # ``.json()`` invokes the zero-argument lambda directly.
        return type(
            "__MockResponse",
            (),
            {
                "status_code": 200,
                "text": json.dumps(payload),
                "json": lambda: payload,
            },
        )

    def assert_attempt_status(account, attempt_token, expected):
        req = account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=attempt_token)
        assert account.GetStrongVerificationAttemptStatus(req).status == expected

    iris_token = b64encode_unpadded(
        json.dumps(
            {
                "merchant_id": 5731012934821982,
                "session_id": verification_id,
                "seed": 1674246339,
                "face_verification": False,
                "host": "https://passportreader.app",
            }
        ).encode("utf8")
    )

    with account_session(token) as account:
        # kick things off: the backend asks Iris ID to create a session
        with patch("couchers.servicers.account.requests.post") as create_mock:
            create_mock.return_value = fake_response(
                {
                    "id": verification_id,
                    "token": iris_token,
                }
            )
            init_res = account.InitiateStrongVerification(empty_pb2.Empty())
        create_mock.assert_called_once_with(
            "https://passportreader.app/api/v1/session.create",
            auth=("dummy_pubkey", "dummy_secret"),
            json={
                "callback_url": "http://localhost:8888/iris/webhook",
                "face_verification": False,
                "passport_only": True,
                "reference": ANY,
            },
            timeout=10,
            verify="/etc/ssl/certs/ca-certificates.crt",
        )
        reference_data = create_mock.call_args.kwargs["json"]["reference"]
        verification_attempt_token = init_res.verification_attempt_token
        return_url = f"http://localhost:3000/complete-strong-verification?verification_attempt_token={verification_attempt_token}"
        expected_redirect = "https://passportreader.app/open?" + urlencode(
            {"token": iris_token, "redirect_url": return_url}
        )
        assert init_res.redirect_url == expected_redirect

        assert_attempt_status(
            account,
            verification_attempt_token,
            account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_TO_OPEN_APP,
        )

    # the user now opens the app and scans their document; Iris ID reports
    # progress through callbacks, each of which moves the attempt status along
    callback_stages = (
        ("INITIATED", account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_IN_APP),
        ("COMPLETED", account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND),
        ("APPROVED", account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND),
    )
    for iris_state, expected_status in callback_stages:
        _emulate_iris_callback(verification_id, iris_state, reference_data)

        with account_session(token) as account:
            assert_attempt_status(account, verification_attempt_token, expected_status)

        if return_after == iris_state:
            return reference_data

    # the queued background work fetches the passport data from Iris ID
    with patch("couchers.jobs.handlers.requests.post") as get_mock:
        json_resp2 = {
            "id": verification_id,
            "created": "2024-05-11T15:46:46Z",
            "expires": "2024-05-11T16:17:26Z",
            "state": "APPROVED",
            "reference": reference_data,
            "user_ip": "10.123.123.123",
            "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0",
            "given_names": "John Wayne",
            "surname": "Doe",
            "nationality": nationality,
            "sex": sex,
            "date_of_birth": dob,
            "document_type": document_type,
            "document_number": document_number,
            "expiry_date": document_expiry.isoformat(),
            "issuing_country": nationality,
            "issuer": "Department of State, U.S. Government",
            "portrait": "dGVzdHRlc3R0ZXN0...",
        }
        get_mock.return_value = fake_response(json_resp2)
        while process_job():
            pass

    get_mock.assert_called_once_with(
        "https://passportreader.app/api/v1/session.get",
        auth=("dummy_pubkey", "dummy_secret"),
        json={"id": verification_id},
        timeout=10,
        verify="/etc/ssl/certs/ca-certificates.crt",
    )

    with account_session(token) as account:
        assert_attempt_status(
            account,
            verification_attempt_token,
            account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_SUCCEEDED,
        )

    with session_scope() as session:
        attempt_query = select(StrongVerificationAttempt).where(
            StrongVerificationAttempt.verification_attempt_token == verification_attempt_token
        )
        attempt = session.execute(attempt_query).scalar_one()
        assert attempt.user_id == user.id
        assert attempt.status == StrongVerificationAttemptStatus.succeeded
        assert attempt.has_full_data
        assert attempt.passport_encrypted_data
        # NOTE: the stored date of birth and passport sex are not asserted here
        assert attempt.has_minimal_data
        assert attempt.passport_expiry_date == document_expiry
        assert attempt.passport_nationality == nationality
        assert attempt.passport_last_three_document_chars == document_number[-3:]
        assert attempt.iris_token == iris_token
        assert attempt.iris_session_id == verification_id

        # the encrypted blob must decrypt back to exactly what Iris ID returned
        private_key = bytes.fromhex("e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272")
        decrypted_data = json.loads(asym_decrypt(private_key, attempt.passport_encrypted_data))
        assert decrypted_data == json_resp2

        callback_query = (
            select(StrongVerificationCallbackEvent.iris_status)
            .where(StrongVerificationCallbackEvent.verification_attempt_id == attempt.id)
            .order_by(StrongVerificationCallbackEvent.created.asc())
        )
        callbacks = session.execute(callback_query).scalars().all()
        assert callbacks == ["INITIATED", "COMPLETED", "APPROVED"]

244 

245 

def monkeypatch_sv_config(monkeypatch):
    """Switch strong verification on, with dummy Iris ID settings, for one test.

    A copy of the global ``config`` is modified so that
    ``ENABLE_STRONG_VERIFICATION`` is true, the Iris ID credentials are the
    dummy values the tests assert on, and ``VERIFICATION_DATA_PUBLIC_KEY`` is a
    fixed key. The copy is then installed as ``config`` in
    ``couchers.servicers.account`` and ``couchers.jobs.handlers`` through
    ``monkeypatch``.

    Args:
        monkeypatch: pytest's ``monkeypatch`` fixture.
    """
    new_config = config.copy()
    new_config["ENABLE_STRONG_VERIFICATION"] = True
    new_config["IRIS_ID_PUBKEY"] = "dummy_pubkey"
    new_config["IRIS_ID_SECRET"] = "dummy_secret"
    # presumably the public counterpart of the private key that
    # do_and_check_sv decrypts the stored passport data with -- TODO confirm
    new_config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex(
        "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
    )

    monkeypatch.setattr(couchers.servicers.account, "config", new_config)
    monkeypatch.setattr(couchers.jobs.handlers, "config", new_config)

259 

260 

def test_strong_verification_happy_path(db, monkeypatch):
    """Full verification flow, then how profile changes affect the resulting badge.

    After a successful verification the test changes the user's birthdate and
    gender, and toggles the admin passport sex/gender exception, checking after
    each change what the public API, the account API and the admin API report.
    """
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    _, superuser_token = generate_user(is_superuser=True)

    birthdate_verified = api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
    gender_verified = api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED
    gender_mismatch = api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH

    def recompute():
        update_badges(empty_pb2.Empty())
        refresh_materialized_views_rapid(None)

    def set_user_fields(**values):
        with session_scope() as session:
            session.execute(update(User).where(User.id == user.id).values(**values))

    def check_public_profile(verified, birthdate_status, gender_status):
        with api_session(token) as api:
            res = api.GetUser(api_pb2.GetUserReq(user=user.username))
            assert ("strong_verification" in res.badges) == verified
            assert res.has_strong_verification == verified
            assert res.birthdate_verification_status == birthdate_status
            assert res.gender_verification_status == gender_status
            lite = api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username))
            assert lite.has_strong_verification == res.has_strong_verification

    def check_admin_view(admin, verified, gender_status):
        res = admin.GetUserDetails(admin_pb2.GetUserDetailsReq(user=user.username))
        assert ("strong_verification" in res.badges) == verified
        assert res.has_strong_verification == verified
        assert res.birthdate_verification_status == birthdate_verified
        assert res.gender_verification_status == gender_status

    def has_verification_reminder():
        with account_session(token) as account:
            return any(
                reminder.HasField("complete_verification_reminder")
                for reminder in account.GetReminders(empty_pb2.Empty()).reminders
            )

    recompute()

    # nothing is verified before the flow has run
    check_public_profile(
        False,
        api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED,
        api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED,
    )

    do_and_check_sv(
        user,
        token,
        verification_id=5731012934821983,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="31195855",
        document_expiry=default_expiry,
        nationality="US",
    )

    with session_scope() as session:
        attempt = session.execute(
            select(StrongVerificationAttempt).where(StrongVerificationAttempt.user_id == user.id)
        ).scalar_one()
        assert attempt.status == StrongVerificationAttemptStatus.succeeded
        assert attempt.passport_date_of_birth == date(1988, 1, 1)
        assert attempt.passport_sex == PassportSex.male
        assert attempt.passport_expiry_date == default_expiry
        assert attempt.passport_nationality == "US"
        assert attempt.passport_last_three_document_chars == "855"

    recompute()

    # verification succeeded, so the badge is there
    check_public_profile(True, birthdate_verified, gender_verified)

    # profile birthdate differs from the passport: badge goes away
    set_user_fields(birthdate=date(1988, 1, 2))
    recompute()
    check_public_profile(False, api_pb2.BIRTHDATE_VERIFICATION_STATUS_MISMATCH, gender_verified)

    # birthdate restored, but gender no longer corresponds to passport sex: still no badge
    set_user_fields(birthdate=date(1988, 1, 1), gender="Woman")
    recompute()
    check_public_profile(False, birthdate_verified, gender_mismatch)

    with account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert not info.has_strong_verification
        assert info.birthdate_verification_status == birthdate_verified
        assert info.gender_verification_status == gender_mismatch

    # gender restored: the badge comes back
    set_user_fields(gender="Man")
    recompute()
    check_public_profile(True, birthdate_verified, gender_verified)
    assert not has_verification_reminder()

    # with the passport sex/gender exception on, changing gender keeps the badge
    with real_admin_session(superuser_token) as admin:
        check_admin_view(admin, True, gender_verified)

        admin.SetPassportSexGenderException(
            admin_pb2.SetPassportSexGenderExceptionReq(user=user.username, passport_sex_gender_exception=True)
        )
        admin.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=user.username, gender="Woman"))

    recompute()
    check_public_profile(True, birthdate_verified, gender_verified)
    assert not has_verification_reminder()

    with real_admin_session(superuser_token) as admin:
        check_admin_view(admin, True, gender_verified)

        # switch the exception back off
        admin.SetPassportSexGenderException(
            admin_pb2.SetPassportSexGenderExceptionReq(user=user.username, passport_sex_gender_exception=False)
        )

    recompute()
    check_public_profile(False, birthdate_verified, gender_mismatch)
    assert has_verification_reminder()

    with real_admin_session(superuser_token) as admin:
        check_admin_view(admin, False, gender_mismatch)

453 

454 

def test_strong_verification_delete_data(db, monkeypatch):
    """Deleting strong verification data removes the verification and the stored passport fields.

    Deletion is also exercised before any data exists, where it is expected to
    go through without error.
    """
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    # a superuser account is created as well; its token is not needed here
    generate_user(is_superuser=True)

    def assert_has_sv(expected):
        user_req = api_pb2.GetUserReq(user=user.username)
        lite_req = api_pb2.GetLiteUserReq(user=user.username)
        with api_session(token) as api:
            assert api.GetUser(user_req).has_strong_verification == expected
            assert (
                api.GetLiteUser(lite_req).has_strong_verification
                == api.GetUser(user_req).has_strong_verification
            )

    def delete_sv_data():
        with account_session(token) as account:
            account.DeleteStrongVerificationData(empty_pb2.Empty())

    refresh_materialized_views_rapid(None)
    assert_has_sv(False)

    # deleting when nothing is stored must be accepted
    delete_sv_data()

    do_and_check_sv(
        user,
        token,
        verification_id=5731012934821983,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="31195855",
        document_expiry=default_expiry,
        nationality="US",
    )

    refresh_materialized_views_rapid(None)

    # verified now
    assert_has_sv(True)

    # remove the data again
    delete_sv_data()

    refresh_materialized_views_rapid(None)
    assert_has_sv(False)

    with session_scope() as session:
        # ``!= None`` is intentional: it is the SQLAlchemy column operator that
        # the query is built from, not a Python identity test
        leftover_query = select(StrongVerificationAttempt).where(
            or_(
                StrongVerificationAttempt.passport_encrypted_data != None,
                StrongVerificationAttempt.passport_date_of_birth != None,
                StrongVerificationAttempt.passport_sex != None,
            )
        )
        leftovers = session.execute(leftover_query).scalars().all()
        assert len(leftovers) == 0

526 

527 

def test_strong_verification_expiry(db, monkeypatch):
    """A verification stops counting once the passport expiry date has passed.

    The user verifies with a passport expiring in ten days, the stored expiry
    date is then moved two days into the past, and ``GetUser`` must report the
    user as unverified. Verifying again with a different, valid passport
    restores the verification.
    """
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    # a superuser account is created as well; its token is not needed here
    generate_user(is_superuser=True)

    refresh_materialized_views_rapid(None)

    with api_session(token) as api:
        assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
        assert (
            api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification
            == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
        )

    expiry = date.today() + timedelta(days=10)

    do_and_check_sv(
        user,
        token,
        verification_id=5731012934821983,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="31195855",
        document_expiry=expiry,
        nationality="US",
    )

    # the user should now have strong verification
    with api_session(token) as api:
        res = api.GetUser(api_pb2.GetUserReq(user=user.username))
        assert res.has_strong_verification
        assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
        assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED

    # pretend the passport expired two days ago
    with session_scope() as session:
        attempt = session.execute(select(StrongVerificationAttempt)).scalars().one()
        attempt.passport_expiry_date = date.today() - timedelta(days=2)

    with api_session(token) as api:
        res = api.GetUser(api_pb2.GetUserReq(user=user.username))
        assert not res.has_strong_verification
        assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
        assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED

        # a second read must agree with the first
        res = api.GetUser(api_pb2.GetUserReq(user=user.username))
        assert not res.has_strong_verification

    # verifying with another, still valid passport works
    do_and_check_sv(
        user,
        token,
        verification_id=5731012934821985,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="PA41323412",
        document_expiry=date.today() + timedelta(days=365),
        nationality="AU",
    )

    refresh_materialized_views_rapid(None)

    with api_session(token) as api:
        assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
        assert (
            api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification
            == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
        )

598 

599 

def test_strong_verification_regression(db, monkeypatch):
    """Regression test: ``Ping`` must succeed while a verification attempt is still in progress.

    The flow is stopped right after the ``INITIATED`` callback.
    """
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")

    pending_attempt = dict(
        verification_id=5731012934821983,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="31195855",
        document_expiry=default_expiry,
        nationality="US",
        return_after="INITIATED",
    )
    do_and_check_sv(user, token, **pending_attempt)

    with api_session(token) as api:
        api.Ping(api_pb2.PingReq())

620 

621 

def test_strong_verification_regression2(db, monkeypatch):
    """Regression test: an unfinished attempt must not block a later, complete one.

    A first attempt is left hanging after the ``INITIATED`` callback; a second
    attempt with a different passport then runs to completion and the user
    ends up verified.
    """
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")

    # first attempt: abandoned midway
    abandoned_attempt = dict(
        verification_id=5731012934821983,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="31195855",
        document_expiry=default_expiry,
        nationality="US",
        return_after="INITIATED",
    )
    do_and_check_sv(user, token, **abandoned_attempt)

    # second attempt: goes all the way through
    completed_attempt = dict(
        verification_id=5731012934821985,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="PA41323412",
        document_expiry=default_expiry,
        nationality="AU",
    )
    do_and_check_sv(user, token, **completed_attempt)

    refresh_materialized_views_rapid(None)

    user_req = api_pb2.GetUserReq(user=user.username)
    lite_req = api_pb2.GetLiteUserReq(user=user.username)
    with api_session(token) as api:
        assert api.GetUser(user_req).has_strong_verification
        assert (
            api.GetLiteUser(lite_req).has_strong_verification
            == api.GetUser(user_req).has_strong_verification
        )

660 

661 

def test_strong_verification_disabled(db):
    """Without ``monkeypatch_sv_config``, initiating strong verification fails with ``UNAVAILABLE``."""
    _, token = generate_user()

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as excinfo:
            account.InitiateStrongVerification(empty_pb2.Empty())

    rpc_error = excinfo.value
    assert rpc_error.code() == grpc.StatusCode.UNAVAILABLE
    assert rpc_error.details() == "Strong verification is currently disabled."

670 

671 

def test_strong_verification_delete_data_cant_reverify(db, monkeypatch, push_collector):
    """A passport cannot be reused for verification after its data was deleted.

    The user verifies, deletes their verification data, and then tries again
    with the same passport. The second attempt must end up with status
    ``duplicate``, the user must receive the "Strong Verification failed" push
    notification, and the user must remain unverified.
    """
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    # a superuser account is created as well; its token is not needed here
    generate_user(is_superuser=True)

    second_session_id = 5731012934821984

    def assert_has_sv(expected):
        user_req = api_pb2.GetUserReq(user=user.username)
        lite_req = api_pb2.GetLiteUserReq(user=user.username)
        with api_session(token) as api:
            assert api.GetUser(user_req).has_strong_verification == expected
            assert (
                api.GetLiteUser(lite_req).has_strong_verification
                == api.GetUser(user_req).has_strong_verification
            )

    refresh_materialized_views_rapid(None)
    assert_has_sv(False)

    do_and_check_sv(
        user,
        token,
        verification_id=5731012934821983,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="31195855",
        document_expiry=default_expiry,
        nationality="US",
    )

    refresh_materialized_views_rapid(None)

    # verified after the first run
    assert_has_sv(True)

    # wipe the verification data
    with account_session(token) as account:
        account.DeleteStrongVerificationData(empty_pb2.Empty())

    refresh_materialized_views_rapid(None)
    assert_has_sv(False)

    with session_scope() as session:
        # ``!= None`` is intentional: it is the SQLAlchemy column operator that
        # the query is built from, not a Python identity test
        leftover_query = select(StrongVerificationAttempt).where(
            or_(
                StrongVerificationAttempt.passport_encrypted_data != None,
                StrongVerificationAttempt.passport_date_of_birth != None,
                StrongVerificationAttempt.passport_sex != None,
            )
        )
        leftovers = session.execute(leftover_query).scalars().all()
        assert len(leftovers) == 0

    # second attempt with the very same passport, stopped before the backend
    # has fetched the passport data
    reference_data = do_and_check_sv(
        user,
        token,
        verification_id=5731012934821984,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="31195855",
        document_expiry=default_expiry,
        nationality="US",
        return_after="APPROVED",
    )

    with patch("couchers.jobs.handlers.requests.post") as get_mock:
        json_resp2 = {
            "id": 5731012934821984,
            "created": "2024-05-11T15:46:46Z",
            "expires": "2024-05-11T16:17:26Z",
            "state": "APPROVED",
            "reference": reference_data,
            "user_ip": "10.123.123.123",
            "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0",
            "given_names": "John Wayne",
            "surname": "Doe",
            "nationality": "US",
            "sex": "MALE",
            "date_of_birth": "1988-01-01",
            "document_type": "PASSPORT",
            "document_number": "31195855",
            "expiry_date": default_expiry.isoformat(),
            "issuing_country": "US",
            "issuer": "Department of State, U.S. Government",
            "portrait": "dGVzdHRlc3R0ZXN0...",
        }
        # a class object (not an instance) stands in for the HTTP response, so
        # ``.json()`` invokes the zero-argument lambda directly
        get_mock.return_value = type(
            "__MockResponse",
            (),
            {
                "status_code": 200,
                "text": json.dumps(json_resp2),
                "json": lambda: json_resp2,
            },
        )
        while process_job():
            pass

    get_mock.assert_called_once_with(
        "https://passportreader.app/api/v1/session.get",
        auth=("dummy_pubkey", "dummy_secret"),
        json={"id": 5731012934821984},
        timeout=10,
        verify="/etc/ssl/certs/ca-certificates.crt",
    )

    with session_scope() as session:
        second_attempt = session.execute(
            select(StrongVerificationAttempt).where(StrongVerificationAttempt.iris_session_id == second_session_id)
        ).scalar_one()
        assert second_attempt.user_id == user.id
        assert second_attempt.status == StrongVerificationAttemptStatus.duplicate

    push_collector.assert_user_push_matches_fields(
        user.id,
        ix=1,
        title="Strong Verification failed",
        body="You tried to verify with a passport that has already been used for verification. Please use another passport.",
    )

    refresh_materialized_views_rapid(None)
    assert_has_sv(False)

816 

817 

def test_strong_verification_duplicate_other_user(db, monkeypatch, push_collector):
    """
    A passport already used (and then deleted) by one user is flagged as a duplicate for another user.

    Steps exercised:

    1. ``user`` starts without strong verification (SV); deleting non-existent SV data does not raise.
    2. ``user`` completes SV with passport number 31195855 and is then reported as strongly verified.
    3. ``user`` deletes their SV data: the SV flag is gone and no attempt row keeps encrypted passport
       data, a passport date of birth, or a passport sex.
    4. ``user2`` runs SV with the same passport details; after the background jobs run, the attempt is
       stored with status ``duplicate`` and ``user2`` receives the "Strong Verification failed" push.
    """
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    user2, token2 = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    # A superuser is still created exactly as before; its token was never used, so it is no longer bound.
    generate_user(is_superuser=True)

    # Passport details shared by both users' verification attempts.
    passport = dict(
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="31195855",
        document_expiry=default_expiry,
        nationality="US",
    )
    second_session_id = 5731012934821984

    def assert_sv_flag(expected):
        """Refresh the rapid materialized views, then check `user`'s SV flag via GetUser and GetLiteUser."""
        refresh_materialized_views_rapid(None)
        with api_session(token) as api:
            assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification == expected
            # the lite endpoint must always agree with the full one
            assert (
                api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification
                == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
            )

    assert_sv_flag(False)

    # can remove SV data even if there is none, should do nothing
    with account_session(token) as account:
        account.DeleteStrongVerificationData(empty_pb2.Empty())

    do_and_check_sv(user, token, verification_id=5731012934821983, **passport)

    # the user should now have strong verification
    assert_sv_flag(True)

    # check removing SV data
    with account_session(token) as account:
        account.DeleteStrongVerificationData(empty_pb2.Empty())

    assert_sv_flag(False)

    # after deletion, no attempt may retain any of these passport-derived columns
    with session_scope() as session:
        attempts_with_passport_data = (
            session.execute(
                select(StrongVerificationAttempt).where(
                    or_(
                        StrongVerificationAttempt.passport_encrypted_data.is_not(None),
                        StrongVerificationAttempt.passport_date_of_birth.is_not(None),
                        StrongVerificationAttempt.passport_sex.is_not(None),
                    )
                )
            )
            .scalars()
            .all()
        )
        assert not attempts_with_passport_data

    # second user tries the very same passport; stop once the session is APPROVED so that the
    # follow-up job can be run below against a mocked HTTP response
    reference_data = do_and_check_sv(
        user2,
        token2,
        verification_id=second_session_id,
        return_after="APPROVED",
        **passport,
    )

    with patch("couchers.jobs.handlers.requests.post") as mock:
        json_resp2 = {
            "id": second_session_id,
            "created": "2024-05-11T15:46:46Z",
            "expires": "2024-05-11T16:17:26Z",
            "state": "APPROVED",
            "reference": reference_data,
            "user_ip": "10.123.123.123",
            "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0",
            "given_names": "John Wayne",
            "surname": "Doe",
            "nationality": "US",
            "sex": "MALE",
            "date_of_birth": "1988-01-01",
            "document_type": "PASSPORT",
            "document_number": "31195855",
            "expiry_date": default_expiry.isoformat(),
            "issuing_country": "US",
            "issuer": "Department of State, U.S. Government",
            "portrait": "dGVzdHRlc3R0ZXN0...",
        }
        # The mocked `requests.post` returns the class itself (it is never instantiated), which is
        # why `json` can be a zero-argument lambda.
        mock.return_value = type(
            "__MockResponse",
            (),
            {
                "status_code": 200,
                "text": json.dumps(json_resp2),
                "json": lambda: json_resp2,
            },
        )
        # keep running jobs until process_job() reports there was nothing left to do
        while process_job():
            pass

    mock.assert_called_once_with(
        "https://passportreader.app/api/v1/session.get",
        auth=("dummy_pubkey", "dummy_secret"),
        json={"id": second_session_id},
        timeout=10,
        verify="/etc/ssl/certs/ca-certificates.crt",
    )

    with session_scope() as session:
        verification_attempt = session.execute(
            select(StrongVerificationAttempt).where(StrongVerificationAttempt.iris_session_id == second_session_id)
        ).scalar_one()
        assert verification_attempt.user_id == user2.id
        assert verification_attempt.status == StrongVerificationAttemptStatus.duplicate

    push_collector.assert_user_push_matches_fields(
        user2.id,
        title="Strong Verification failed",
        body="You tried to verify with a passport that has already been used for verification. Please use another passport.",
    )

957 

958 

def test_strong_verification_non_passport(db, monkeypatch, push_collector):
    """
    An approved Iris session for an ``IDENTITY_CARD`` document ends up as a failed SV attempt.

    The SV flow is driven up to the ``APPROVED`` callback, the session lookup made through
    ``requests.post`` is mocked to report a non-passport document, and the background jobs are run.
    Afterwards the stored attempt must have status ``failed`` and the user must have received the
    "not a passport" push notification.
    """
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    # NOTE(review): `superuser_token` is not used in this part of the test.
    _, superuser_token = generate_user(is_superuser=True)

    iris_session_id = 5731012934821984
    document_type = "IDENTITY_CARD"
    birth_date = "1988-01-01"
    passport_number = "31195855"

    sv_reference = do_and_check_sv(
        user,
        token,
        verification_id=iris_session_id,
        sex="MALE",
        dob=birth_date,
        document_type=document_type,
        document_number=passport_number,
        document_expiry=default_expiry,
        nationality="US",
        return_after="APPROVED",
    )

    # Body the mocked session lookup hands back to the job handler.
    session_payload = {
        "id": iris_session_id,
        "created": "2024-05-11T15:46:46Z",
        "expires": "2024-05-11T16:17:26Z",
        "state": "APPROVED",
        "reference": sv_reference,
        "user_ip": "10.123.123.123",
        "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0",
        "given_names": "John Wayne",
        "surname": "Doe",
        "nationality": "US",
        "sex": "MALE",
        "date_of_birth": birth_date,
        "document_type": document_type,
        "document_number": passport_number,
        "expiry_date": default_expiry.isoformat(),
        "issuing_country": "US",
        "issuer": "Department of State, U.S. Government",
        "portrait": "dGVzdHRlc3R0ZXN0...",
    }
    # Attributes of the throwaway response class; the class itself (not an instance) is returned
    # by the mock, so `json` is a plain zero-argument callable.
    response_attrs = {
        "status_code": 200,
        "text": json.dumps(session_payload),
        "json": lambda: session_payload,
    }

    with patch("couchers.jobs.handlers.requests.post") as post_mock:
        post_mock.return_value = type("__MockResponse", (), response_attrs)
        # run jobs until process_job() returns a falsy value
        while process_job():
            pass

    expected_call_kwargs = {
        "auth": ("dummy_pubkey", "dummy_secret"),
        "json": {"id": iris_session_id},
        "timeout": 10,
        "verify": "/etc/ssl/certs/ca-certificates.crt",
    }
    post_mock.assert_called_once_with("https://passportreader.app/api/v1/session.get", **expected_call_kwargs)

    with session_scope() as session:
        attempt_query = select(StrongVerificationAttempt).where(
            StrongVerificationAttempt.iris_session_id == iris_session_id
        )
        attempt = session.execute(attempt_query).scalar_one()
        assert attempt.user_id == user.id
        assert attempt.status == StrongVerificationAttemptStatus.failed

    push_collector.assert_user_push_matches_fields(
        user.id,
        title="Strong Verification failed",
        body="You tried to verify with a document that is not a passport. You can only use a passport for Strong Verification.",
    )