Coverage for app/backend/src/tests/test_strong_verification.py: 99%

375 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import json 

2from datetime import date, timedelta 

3from unittest.mock import ANY, patch 

4from urllib.parse import urlencode 

5 

6import grpc 

7import pytest 

8from google.protobuf import empty_pb2 

9from sqlalchemy import select, update 

10from sqlalchemy.sql import or_ 

11 

12import couchers.jobs.handlers 

13import couchers.servicers.account 

14from couchers.config import config 

15from couchers.crypto import asym_decrypt, b64encode_unpadded 

16from couchers.db import session_scope 

17from couchers.jobs.handlers import update_badges 

18from couchers.jobs.worker import process_job 

19from couchers.materialized_views import refresh_materialized_views_rapid 

20from couchers.models import ( 

21 PassportSex, 

22 StrongVerificationAttempt, 

23 StrongVerificationAttemptStatus, 

24 StrongVerificationCallbackEvent, 

25 User, 

26) 

27from couchers.proto import account_pb2, admin_pb2, api_pb2 

28from couchers.proto.google.api import httpbody_pb2 

29from tests.fixtures.db import generate_user 

30from tests.fixtures.misc import PushCollector 

31from tests.fixtures.sessions import account_session, api_session, real_admin_session, real_iris_session 

32 

33 

34@pytest.fixture(autouse=True) 

35def _(testconfig): 

36 pass 

37 

38 

39def _emulate_iris_callback(session_id, session_state, reference): 

40 assert session_state in ["CREATED", "INITIATED", "FAILED", "ABORTED", "COMPLETED", "REJECTED", "APPROVED"] 

41 with real_iris_session() as iris: 

42 data = json.dumps( 

43 {"session_id": session_id, "session_state": session_state, "session_reference": reference} 

44 ).encode("ascii") 

45 iris.Webhook(httpbody_pb2.HttpBody(content_type="application/json", data=data)) 

46 

47 

48default_expiry = date.today() + timedelta(days=5 * 365) 

49 

50 

51def do_and_check_sv( 

52 user, 

53 token, 

54 verification_id, 

55 sex, 

56 dob, 

57 document_type, 

58 document_number, 

59 document_expiry, 

60 nationality, 

61 return_after=None, 

62): 

63 iris_token_data = { 

64 "merchant_id": 5731012934821982, 

65 "session_id": verification_id, 

66 "seed": 1674246339, 

67 "face_verification": False, 

68 "host": "https://passportreader.app", 

69 } 

70 iris_token = b64encode_unpadded(json.dumps(iris_token_data).encode("utf8")) 

71 

72 with account_session(token) as account: 

73 # start by initiation 

74 with patch("couchers.servicers.account.requests.post") as mock: 

75 json_resp1 = { 

76 "id": verification_id, 

77 "token": iris_token, 

78 } 

79 mock.return_value = type( 

80 "__MockResponse", 

81 (), 

82 { 

83 "status_code": 200, 

84 "text": json.dumps(json_resp1), 

85 "json": lambda: json_resp1, 

86 }, 

87 ) 

88 res = account.InitiateStrongVerification(empty_pb2.Empty()) 

89 mock.assert_called_once_with( 

90 "https://passportreader.app/api/v1/session.create", 

91 auth=("dummy_pubkey", "dummy_secret"), 

92 json={ 

93 "callback_url": "http://localhost:8888/iris/webhook", 

94 "face_verification": False, 

95 "passport_only": True, 

96 "reference": ANY, 

97 }, 

98 timeout=10, 

99 verify="/etc/ssl/certs/ca-certificates.crt", 

100 ) 

101 reference_data = mock.call_args.kwargs["json"]["reference"] 

102 verification_attempt_token = res.verification_attempt_token 

103 return_url = f"http://localhost:3000/complete-strong-verification?verification_attempt_token={verification_attempt_token}" 

104 assert res.redirect_url == "https://passportreader.app/open?" + urlencode( 

105 {"token": iris_token, "redirect_url": return_url} 

106 ) 

107 

108 assert ( 

109 account.GetStrongVerificationAttemptStatus( 

110 account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token) 

111 ).status 

112 == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_TO_OPEN_APP 

113 ) 

114 

115 # ok, now the user downloads the app, scans their id, and Iris ID sends callbacks to the server 

116 _emulate_iris_callback(verification_id, "INITIATED", reference_data) 

117 

118 with account_session(token) as account: 

119 assert ( 

120 account.GetStrongVerificationAttemptStatus( 

121 account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token) 

122 ).status 

123 == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_IN_APP 

124 ) 

125 

126 if return_after == "INITIATED": 

127 return reference_data 

128 

129 _emulate_iris_callback(verification_id, "COMPLETED", reference_data) 

130 

131 with account_session(token) as account: 

132 assert ( 

133 account.GetStrongVerificationAttemptStatus( 

134 account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token) 

135 ).status 

136 == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND 

137 ) 

138 

139 if return_after == "COMPLETED": 139 ↛ 140line 139 didn't jump to line 140 because the condition on line 139 was never true

140 return reference_data 

141 

142 _emulate_iris_callback(verification_id, "APPROVED", reference_data) 

143 

144 with account_session(token) as account: 

145 assert ( 

146 account.GetStrongVerificationAttemptStatus( 

147 account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token) 

148 ).status 

149 == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND 

150 ) 

151 

152 if return_after == "APPROVED": 

153 return reference_data 

154 

155 with patch("couchers.jobs.handlers.requests.post") as mock: 

156 json_resp2 = { 

157 "id": verification_id, 

158 "created": "2024-05-11T15:46:46Z", 

159 "expires": "2024-05-11T16:17:26Z", 

160 "state": "APPROVED", 

161 "reference": reference_data, 

162 "user_ip": "10.123.123.123", 

163 "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0", 

164 "given_names": "John Wayne", 

165 "surname": "Doe", 

166 "nationality": nationality, 

167 "sex": sex, 

168 "date_of_birth": dob, 

169 "document_type": document_type, 

170 "document_number": document_number, 

171 "expiry_date": document_expiry.isoformat(), 

172 "issuing_country": nationality, 

173 "issuer": "Department of State, U.S. Government", 

174 "portrait": "dGVzdHRlc3R0ZXN0...", 

175 } 

176 mock.return_value = type( 

177 "__MockResponse", 

178 (), 

179 { 

180 "status_code": 200, 

181 "text": json.dumps(json_resp2), 

182 "json": lambda: json_resp2, 

183 }, 

184 ) 

185 while process_job(): 

186 pass 

187 

188 mock.assert_called_once_with( 

189 "https://passportreader.app/api/v1/session.get", 

190 auth=("dummy_pubkey", "dummy_secret"), 

191 json={"id": verification_id}, 

192 timeout=10, 

193 verify="/etc/ssl/certs/ca-certificates.crt", 

194 ) 

195 

196 with account_session(token) as account: 

197 assert ( 

198 account.GetStrongVerificationAttemptStatus( 

199 account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token) 

200 ).status 

201 == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_SUCCEEDED 

202 ) 

203 

204 with session_scope() as session: 

205 verification_attempt = session.execute( 

206 select(StrongVerificationAttempt).where( 

207 StrongVerificationAttempt.verification_attempt_token == verification_attempt_token 

208 ) 

209 ).scalar_one() 

210 assert verification_attempt.user_id == user.id 

211 assert verification_attempt.status == StrongVerificationAttemptStatus.succeeded 

212 assert verification_attempt.has_full_data 

213 assert verification_attempt.passport_encrypted_data 

214 # assert verification_attempt.passport_date_of_birth == date(1988, 1, 1) 

215 # assert verification_attempt.passport_sex == PassportSex.male 

216 assert verification_attempt.has_minimal_data 

217 assert verification_attempt.passport_expiry_date == document_expiry 

218 assert verification_attempt.passport_nationality == nationality 

219 assert verification_attempt.passport_last_three_document_chars == document_number[-3:] 

220 assert verification_attempt.iris_token == iris_token 

221 assert verification_attempt.iris_session_id == verification_id 

222 

223 private_key = bytes.fromhex("e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272") 

224 decrypted_data = json.loads(asym_decrypt(private_key, verification_attempt.passport_encrypted_data)) 

225 assert decrypted_data == json_resp2 

226 

227 callbacks = ( 

228 session.execute( 

229 select(StrongVerificationCallbackEvent.iris_status) 

230 .where(StrongVerificationCallbackEvent.verification_attempt_id == verification_attempt.id) 

231 .order_by(StrongVerificationCallbackEvent.created.asc()) 

232 ) 

233 .scalars() 

234 .all() 

235 ) 

236 assert callbacks == ["INITIATED", "COMPLETED", "APPROVED"] 

237 

238 

239def monkeypatch_sv_config(monkeypatch): 

240 new_config = config.copy() 

241 new_config.IRIS_ID_PUBKEY = "dummy_pubkey" 

242 new_config.IRIS_ID_SECRET = "dummy_secret" 

243 new_config.VERIFICATION_DATA_PUBLIC_KEY = bytes.fromhex( 

244 "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f" 

245 ) 

246 

247 private_key = bytes.fromhex("e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272") 

248 

249 monkeypatch.setattr(couchers.servicers.account, "config", new_config) 

250 monkeypatch.setattr(couchers.jobs.handlers, "config", new_config) 

251 

252 

253def test_strong_verification_happy_path(db, monkeypatch): 

254 monkeypatch_sv_config(monkeypatch) 

255 

256 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

257 _, superuser_token = generate_user(is_superuser=True) 

258 

259 update_badges(empty_pb2.Empty()) 

260 refresh_materialized_views_rapid(empty_pb2.Empty()) 

261 

262 with api_session(token) as api: 

263 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

264 assert "strong_verification" not in res.badges 

265 assert not res.has_strong_verification 

266 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED 

267 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED 

268 assert ( 

269 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

270 == res.has_strong_verification 

271 ) 

272 

273 do_and_check_sv( 

274 user, 

275 token, 

276 verification_id=5731012934821983, 

277 sex="MALE", 

278 dob="1988-01-01", 

279 document_type="PASSPORT", 

280 document_number="31195855", 

281 document_expiry=default_expiry, 

282 nationality="US", 

283 ) 

284 

285 with session_scope() as session: 

286 verification_attempt = session.execute( 

287 select(StrongVerificationAttempt).where(StrongVerificationAttempt.user_id == user.id) 

288 ).scalar_one() 

289 assert verification_attempt.status == StrongVerificationAttemptStatus.succeeded 

290 assert verification_attempt.passport_date_of_birth == date(1988, 1, 1) 

291 assert verification_attempt.passport_sex == PassportSex.male 

292 assert verification_attempt.passport_expiry_date == default_expiry 

293 assert verification_attempt.passport_nationality == "US" 

294 assert verification_attempt.passport_last_three_document_chars == "855" 

295 

296 update_badges(empty_pb2.Empty()) 

297 refresh_materialized_views_rapid(empty_pb2.Empty()) 

298 

299 # the user should now have strong verification 

300 with api_session(token) as api: 

301 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

302 assert "strong_verification" in res.badges 

303 assert res.has_strong_verification 

304 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

305 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

306 assert ( 

307 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

308 == res.has_strong_verification 

309 ) 

310 

311 # wrong dob = no badge 

312 with session_scope() as session: 

313 session.execute(update(User).where(User.id == user.id).values(birthdate=date(1988, 1, 2))) 

314 

315 update_badges(empty_pb2.Empty()) 

316 refresh_materialized_views_rapid(empty_pb2.Empty()) 

317 

318 with api_session(token) as api: 

319 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

320 assert "strong_verification" not in res.badges 

321 assert not res.has_strong_verification 

322 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_MISMATCH 

323 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

324 assert ( 

325 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

326 == res.has_strong_verification 

327 ) 

328 

329 # bad gender-sex correspondence = no badge 

330 with session_scope() as session: 

331 session.execute(update(User).where(User.id == user.id).values(birthdate=date(1988, 1, 1), gender="Woman")) 

332 

333 update_badges(empty_pb2.Empty()) 

334 refresh_materialized_views_rapid(empty_pb2.Empty()) 

335 

336 with api_session(token) as api: 

337 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

338 assert "strong_verification" not in res.badges 

339 assert not res.has_strong_verification 

340 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

341 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH 

342 assert ( 

343 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

344 == res.has_strong_verification 

345 ) 

346 

347 with account_session(token) as account: 

348 res = account.GetAccountInfo(empty_pb2.Empty()) 

349 assert not res.has_strong_verification 

350 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

351 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH 

352 

353 # back to should have a badge 

354 with session_scope() as session: 

355 session.execute(update(User).where(User.id == user.id).values(gender="Man")) 

356 

357 update_badges(empty_pb2.Empty()) 

358 refresh_materialized_views_rapid(empty_pb2.Empty()) 

359 

360 with api_session(token) as api: 

361 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

362 assert "strong_verification" in res.badges 

363 assert res.has_strong_verification 

364 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

365 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

366 assert ( 

367 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

368 == res.has_strong_verification 

369 ) 

370 

371 # check has_passport_sex_gender_exception 

372 with real_admin_session(superuser_token) as admin: 

373 res = admin.GetUserDetails(admin_pb2.GetUserDetailsReq(user=user.username)) 

374 assert "strong_verification" in res.badges 

375 assert res.has_strong_verification 

376 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

377 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

378 

379 admin.SetPassportSexGenderException( 

380 admin_pb2.SetPassportSexGenderExceptionReq(user=user.username, passport_sex_gender_exception=True) 

381 ) 

382 admin.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=user.username, gender="Woman")) 

383 

384 update_badges(empty_pb2.Empty()) 

385 refresh_materialized_views_rapid(empty_pb2.Empty()) 

386 

387 with api_session(token) as api: 

388 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

389 assert "strong_verification" in res.badges 

390 assert res.has_strong_verification 

391 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

392 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

393 assert ( 

394 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

395 == res.has_strong_verification 

396 ) 

397 

398 with real_admin_session(superuser_token) as admin: 

399 res = admin.GetUserDetails(admin_pb2.GetUserDetailsReq(user=user.username)) 

400 assert "strong_verification" in res.badges 

401 assert res.has_strong_verification 

402 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

403 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

404 

405 # now turn exception off 

406 admin.SetPassportSexGenderException( 

407 admin_pb2.SetPassportSexGenderExceptionReq(user=user.username, passport_sex_gender_exception=False) 

408 ) 

409 

410 update_badges(empty_pb2.Empty()) 

411 refresh_materialized_views_rapid(empty_pb2.Empty()) 

412 

413 with api_session(token) as api: 

414 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

415 assert "strong_verification" not in res.badges 

416 assert not res.has_strong_verification 

417 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

418 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH 

419 assert ( 

420 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

421 == res.has_strong_verification 

422 ) 

423 

424 with real_admin_session(superuser_token) as admin: 

425 res = admin.GetUserDetails(admin_pb2.GetUserDetailsReq(user=user.username)) 

426 assert "strong_verification" not in res.badges 

427 assert not res.has_strong_verification 

428 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

429 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH 

430 

431 

432def test_strong_verification_delete_data(db, monkeypatch): 

433 monkeypatch_sv_config(monkeypatch) 

434 

435 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

436 _, superuser_token = generate_user(is_superuser=True) 

437 

438 refresh_materialized_views_rapid(empty_pb2.Empty()) 

439 

440 with api_session(token) as api: 

441 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

442 assert ( 

443 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

444 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

445 ) 

446 

447 # can remove SV data even if there is none, should do nothing 

448 with account_session(token) as account: 

449 account.DeleteStrongVerificationData(empty_pb2.Empty()) 

450 

451 do_and_check_sv( 

452 user, 

453 token, 

454 verification_id=5731012934821983, 

455 sex="MALE", 

456 dob="1988-01-01", 

457 document_type="PASSPORT", 

458 document_number="31195855", 

459 document_expiry=default_expiry, 

460 nationality="US", 

461 ) 

462 

463 refresh_materialized_views_rapid(empty_pb2.Empty()) 

464 

465 # the user should now have strong verification 

466 with api_session(token) as api: 

467 assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

468 assert ( 

469 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

470 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

471 ) 

472 

473 # check removing SV data 

474 with account_session(token) as account: 

475 account.DeleteStrongVerificationData(empty_pb2.Empty()) 

476 

477 refresh_materialized_views_rapid(empty_pb2.Empty()) 

478 

479 with api_session(token) as api: 

480 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

481 assert ( 

482 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

483 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

484 ) 

485 

486 with session_scope() as session: 

487 assert ( 

488 len( 

489 session.execute( 

490 select(StrongVerificationAttempt).where( 

491 or_( 

492 StrongVerificationAttempt.passport_encrypted_data != None, 

493 StrongVerificationAttempt.passport_date_of_birth != None, 

494 StrongVerificationAttempt.passport_sex != None, 

495 ) 

496 ) 

497 ) 

498 .scalars() 

499 .all() 

500 ) 

501 == 0 

502 ) 

503 

504 

505def test_strong_verification_expiry(db, monkeypatch): 

506 monkeypatch_sv_config(monkeypatch) 

507 

508 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

509 _, superuser_token = generate_user(is_superuser=True) 

510 

511 refresh_materialized_views_rapid(empty_pb2.Empty()) 

512 

513 with api_session(token) as api: 

514 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

515 assert ( 

516 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

517 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

518 ) 

519 

520 expiry = date.today() + timedelta(days=10) 

521 

522 do_and_check_sv( 

523 user, 

524 token, 

525 verification_id=5731012934821983, 

526 sex="MALE", 

527 dob="1988-01-01", 

528 document_type="PASSPORT", 

529 document_number="31195855", 

530 document_expiry=expiry, 

531 nationality="US", 

532 ) 

533 

534 # the user should now have strong verification 

535 with api_session(token) as api: 

536 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

537 assert res.has_strong_verification 

538 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

539 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

540 

541 with session_scope() as session: 

542 attempt = session.execute(select(StrongVerificationAttempt)).scalars().one() 

543 attempt.passport_expiry_date = date.today() - timedelta(days=2) 

544 

545 with api_session(token) as api: 

546 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

547 assert not res.has_strong_verification 

548 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED 

549 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED 

550 

551 res = api.GetUser(api_pb2.GetUserReq(user=user.username)) 

552 assert not res.has_strong_verification 

553 assert not res.has_strong_verification 

554 

555 do_and_check_sv( 

556 user, 

557 token, 

558 verification_id=5731012934821985, 

559 sex="MALE", 

560 dob="1988-01-01", 

561 document_type="PASSPORT", 

562 document_number="PA41323412", 

563 document_expiry=date.today() + timedelta(days=365), 

564 nationality="AU", 

565 ) 

566 

567 refresh_materialized_views_rapid(empty_pb2.Empty()) 

568 

569 with api_session(token) as api: 

570 assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

571 assert ( 

572 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

573 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

574 ) 

575 

576 

577def test_strong_verification_regression(db, monkeypatch): 

578 monkeypatch_sv_config(monkeypatch) 

579 

580 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

581 

582 do_and_check_sv( 

583 user, 

584 token, 

585 verification_id=5731012934821983, 

586 sex="MALE", 

587 dob="1988-01-01", 

588 document_type="PASSPORT", 

589 document_number="31195855", 

590 document_expiry=default_expiry, 

591 nationality="US", 

592 return_after="INITIATED", 

593 ) 

594 

595 with api_session(token) as api: 

596 api.Ping(api_pb2.PingReq()) 

597 

598 

599def test_strong_verification_regression2(db, monkeypatch): 

600 monkeypatch_sv_config(monkeypatch) 

601 

602 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

603 

604 do_and_check_sv( 

605 user, 

606 token, 

607 verification_id=5731012934821983, 

608 sex="MALE", 

609 dob="1988-01-01", 

610 document_type="PASSPORT", 

611 document_number="31195855", 

612 document_expiry=default_expiry, 

613 nationality="US", 

614 return_after="INITIATED", 

615 ) 

616 

617 do_and_check_sv( 

618 user, 

619 token, 

620 verification_id=5731012934821985, 

621 sex="MALE", 

622 dob="1988-01-01", 

623 document_type="PASSPORT", 

624 document_number="PA41323412", 

625 document_expiry=default_expiry, 

626 nationality="AU", 

627 ) 

628 

629 refresh_materialized_views_rapid(empty_pb2.Empty()) 

630 

631 with api_session(token) as api: 

632 assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

633 assert ( 

634 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

635 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

636 ) 

637 

638 

639def test_strong_verification_disabled(db, feature_flags): 

640 feature_flags.set("strong_verification_enabled", False) 

641 user, token = generate_user() 

642 

643 with account_session(token) as account: 

644 with pytest.raises(grpc.RpcError) as e: 

645 account.InitiateStrongVerification(empty_pb2.Empty()) 

646 assert e.value.code() == grpc.StatusCode.UNAVAILABLE 

647 assert e.value.details() == "Strong verification is currently disabled." 

648 

649 

650def test_strong_verification_delete_data_cant_reverify(db, monkeypatch, push_collector: PushCollector): 

651 monkeypatch_sv_config(monkeypatch) 

652 

653 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

654 _, superuser_token = generate_user(is_superuser=True) 

655 

656 refresh_materialized_views_rapid(empty_pb2.Empty()) 

657 

658 with api_session(token) as api: 

659 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

660 assert ( 

661 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

662 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

663 ) 

664 

665 do_and_check_sv( 

666 user, 

667 token, 

668 verification_id=5731012934821983, 

669 sex="MALE", 

670 dob="1988-01-01", 

671 document_type="PASSPORT", 

672 document_number="31195855", 

673 document_expiry=default_expiry, 

674 nationality="US", 

675 ) 

676 

677 refresh_materialized_views_rapid(empty_pb2.Empty()) 

678 

679 # the user should now have strong verification 

680 with api_session(token) as api: 

681 assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

682 assert ( 

683 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

684 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

685 ) 

686 

687 # There should be a notification confirming it 

688 push_collector.pop_for_user(user.id, last=True) 

689 

690 # check removing SV data 

691 with account_session(token) as account: 

692 account.DeleteStrongVerificationData(empty_pb2.Empty()) 

693 

694 refresh_materialized_views_rapid(empty_pb2.Empty()) 

695 

696 with api_session(token) as api: 

697 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

698 assert ( 

699 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

700 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

701 ) 

702 

703 with session_scope() as session: 

704 assert ( 

705 len( 

706 session.execute( 

707 select(StrongVerificationAttempt).where( 

708 or_( 

709 StrongVerificationAttempt.passport_encrypted_data != None, 

710 StrongVerificationAttempt.passport_date_of_birth != None, 

711 StrongVerificationAttempt.passport_sex != None, 

712 ) 

713 ) 

714 ) 

715 .scalars() 

716 .all() 

717 ) 

718 == 0 

719 ) 

720 

721 reference_data = do_and_check_sv( 

722 user, 

723 token, 

724 verification_id=5731012934821984, 

725 sex="MALE", 

726 dob="1988-01-01", 

727 document_type="PASSPORT", 

728 document_number="31195855", 

729 document_expiry=default_expiry, 

730 nationality="US", 

731 return_after="APPROVED", 

732 ) 

733 

734 with patch("couchers.jobs.handlers.requests.post") as mock: 

735 json_resp2 = { 

736 "id": 5731012934821984, 

737 "created": "2024-05-11T15:46:46Z", 

738 "expires": "2024-05-11T16:17:26Z", 

739 "state": "APPROVED", 

740 "reference": reference_data, 

741 "user_ip": "10.123.123.123", 

742 "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0", 

743 "given_names": "John Wayne", 

744 "surname": "Doe", 

745 "nationality": "US", 

746 "sex": "MALE", 

747 "date_of_birth": "1988-01-01", 

748 "document_type": "PASSPORT", 

749 "document_number": "31195855", 

750 "expiry_date": default_expiry.isoformat(), 

751 "issuing_country": "US", 

752 "issuer": "Department of State, U.S. Government", 

753 "portrait": "dGVzdHRlc3R0ZXN0...", 

754 } 

755 mock.return_value = type( 

756 "__MockResponse", 

757 (), 

758 { 

759 "status_code": 200, 

760 "text": json.dumps(json_resp2), 

761 "json": lambda: json_resp2, 

762 }, 

763 ) 

764 while process_job(): 

765 pass 

766 

767 mock.assert_called_once_with( 

768 "https://passportreader.app/api/v1/session.get", 

769 auth=("dummy_pubkey", "dummy_secret"), 

770 json={"id": 5731012934821984}, 

771 timeout=10, 

772 verify="/etc/ssl/certs/ca-certificates.crt", 

773 ) 

774 

775 with session_scope() as session: 

776 verification_attempt = session.execute( 

777 select(StrongVerificationAttempt).where(StrongVerificationAttempt.iris_session_id == 5731012934821984) 

778 ).scalar_one() 

779 assert verification_attempt.user_id == user.id 

780 assert verification_attempt.status == StrongVerificationAttemptStatus.duplicate 

781 

782 push = push_collector.pop_for_user(user.id, last=True) 

783 assert push.content.title == "Strong Verification failed" 

784 assert ( 

785 push.content.body 

786 == "You used a passport that has already been used for verification. Please use another passport." 

787 ) 

788 

789 refresh_materialized_views_rapid(empty_pb2.Empty()) 

790 

791 with api_session(token) as api: 

792 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

793 assert ( 

794 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

795 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

796 ) 

797 

798 

799def test_strong_verification_duplicate_other_user(db, monkeypatch, push_collector: PushCollector): 

800 monkeypatch_sv_config(monkeypatch) 

801 

802 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

803 user2, token2 = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

804 _, superuser_token = generate_user(is_superuser=True) 

805 

806 refresh_materialized_views_rapid(empty_pb2.Empty()) 

807 

808 with api_session(token) as api: 

809 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

810 assert ( 

811 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

812 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

813 ) 

814 

815 # can remove SV data even if there is none, should do nothing 

816 with account_session(token) as account: 

817 account.DeleteStrongVerificationData(empty_pb2.Empty()) 

818 

819 do_and_check_sv( 

820 user, 

821 token, 

822 verification_id=5731012934821983, 

823 sex="MALE", 

824 dob="1988-01-01", 

825 document_type="PASSPORT", 

826 document_number="31195855", 

827 document_expiry=default_expiry, 

828 nationality="US", 

829 ) 

830 

831 refresh_materialized_views_rapid(empty_pb2.Empty()) 

832 

833 # the user should now have strong verification 

834 with api_session(token) as api: 

835 assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

836 assert ( 

837 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

838 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

839 ) 

840 

841 # check removing SV data 

842 with account_session(token) as account: 

843 account.DeleteStrongVerificationData(empty_pb2.Empty()) 

844 

845 refresh_materialized_views_rapid(empty_pb2.Empty()) 

846 

847 with api_session(token) as api: 

848 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

849 assert ( 

850 api.GetLiteUser(api_pb2.GetLiteUserReq(user=user.username)).has_strong_verification 

851 == api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification 

852 ) 

853 

854 with session_scope() as session: 

855 assert ( 

856 len( 

857 session.execute( 

858 select(StrongVerificationAttempt).where( 

859 or_( 

860 StrongVerificationAttempt.passport_encrypted_data != None, 

861 StrongVerificationAttempt.passport_date_of_birth != None, 

862 StrongVerificationAttempt.passport_sex != None, 

863 ) 

864 ) 

865 ) 

866 .scalars() 

867 .all() 

868 ) 

869 == 0 

870 ) 

871 

872 reference_data = do_and_check_sv( 

873 user2, 

874 token2, 

875 verification_id=5731012934821984, 

876 sex="MALE", 

877 dob="1988-01-01", 

878 document_type="PASSPORT", 

879 document_number="31195855", 

880 document_expiry=default_expiry, 

881 nationality="US", 

882 return_after="APPROVED", 

883 ) 

884 

885 with patch("couchers.jobs.handlers.requests.post") as mock: 

886 json_resp2 = { 

887 "id": 5731012934821984, 

888 "created": "2024-05-11T15:46:46Z", 

889 "expires": "2024-05-11T16:17:26Z", 

890 "state": "APPROVED", 

891 "reference": reference_data, 

892 "user_ip": "10.123.123.123", 

893 "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0", 

894 "given_names": "John Wayne", 

895 "surname": "Doe", 

896 "nationality": "US", 

897 "sex": "MALE", 

898 "date_of_birth": "1988-01-01", 

899 "document_type": "PASSPORT", 

900 "document_number": "31195855", 

901 "expiry_date": default_expiry.isoformat(), 

902 "issuing_country": "US", 

903 "issuer": "Department of State, U.S. Government", 

904 "portrait": "dGVzdHRlc3R0ZXN0...", 

905 } 

906 mock.return_value = type( 

907 "__MockResponse", 

908 (), 

909 { 

910 "status_code": 200, 

911 "text": json.dumps(json_resp2), 

912 "json": lambda: json_resp2, 

913 }, 

914 ) 

915 while process_job(): 

916 pass 

917 

918 mock.assert_called_once_with( 

919 "https://passportreader.app/api/v1/session.get", 

920 auth=("dummy_pubkey", "dummy_secret"), 

921 json={"id": 5731012934821984}, 

922 timeout=10, 

923 verify="/etc/ssl/certs/ca-certificates.crt", 

924 ) 

925 

926 with session_scope() as session: 

927 verification_attempt = session.execute( 

928 select(StrongVerificationAttempt).where(StrongVerificationAttempt.iris_session_id == 5731012934821984) 

929 ).scalar_one() 

930 assert verification_attempt.user_id == user2.id 

931 assert verification_attempt.status == StrongVerificationAttemptStatus.duplicate 

932 

933 push = push_collector.pop_for_user(user2.id, last=True) 

934 assert push.content.title == "Strong Verification failed" 

935 assert ( 

936 push.content.body 

937 == "You used a passport that has already been used for verification. Please use another passport." 

938 ) 

939 

940 

941def test_strong_verification_non_passport(db, monkeypatch, push_collector: PushCollector): 

942 monkeypatch_sv_config(monkeypatch) 

943 

944 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man") 

945 _, superuser_token = generate_user(is_superuser=True) 

946 

947 reference_data = do_and_check_sv( 

948 user, 

949 token, 

950 verification_id=5731012934821984, 

951 sex="MALE", 

952 dob="1988-01-01", 

953 document_type="IDENTITY_CARD", 

954 document_number="31195855", 

955 document_expiry=default_expiry, 

956 nationality="US", 

957 return_after="APPROVED", 

958 ) 

959 

960 with patch("couchers.jobs.handlers.requests.post") as mock: 

961 json_resp2 = { 

962 "id": 5731012934821984, 

963 "created": "2024-05-11T15:46:46Z", 

964 "expires": "2024-05-11T16:17:26Z", 

965 "state": "APPROVED", 

966 "reference": reference_data, 

967 "user_ip": "10.123.123.123", 

968 "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0", 

969 "given_names": "John Wayne", 

970 "surname": "Doe", 

971 "nationality": "US", 

972 "sex": "MALE", 

973 "date_of_birth": "1988-01-01", 

974 "document_type": "IDENTITY_CARD", 

975 "document_number": "31195855", 

976 "expiry_date": default_expiry.isoformat(), 

977 "issuing_country": "US", 

978 "issuer": "Department of State, U.S. Government", 

979 "portrait": "dGVzdHRlc3R0ZXN0...", 

980 } 

981 mock.return_value = type( 

982 "__MockResponse", 

983 (), 

984 { 

985 "status_code": 200, 

986 "text": json.dumps(json_resp2), 

987 "json": lambda: json_resp2, 

988 }, 

989 ) 

990 while process_job(): 

991 pass 

992 

993 mock.assert_called_once_with( 

994 "https://passportreader.app/api/v1/session.get", 

995 auth=("dummy_pubkey", "dummy_secret"), 

996 json={"id": 5731012934821984}, 

997 timeout=10, 

998 verify="/etc/ssl/certs/ca-certificates.crt", 

999 ) 

1000 

1001 with session_scope() as session: 

1002 verification_attempt = session.execute( 

1003 select(StrongVerificationAttempt).where(StrongVerificationAttempt.iris_session_id == 5731012934821984) 

1004 ).scalar_one() 

1005 assert verification_attempt.user_id == user.id 

1006 assert verification_attempt.status == StrongVerificationAttemptStatus.failed 

1007 

1008 push = push_collector.pop_for_user(user.id, last=True) 

1009 assert push.content.title == "Strong Verification failed" 

1010 assert ( 

1011 push.content.body 

1012 == "You used a document other than a passport. You can only use a passport for Strong Verification." 

1013 )