Coverage for src/tests/test_strong_verification.py: 99%

254 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-11-21 04:21 +0000

1import json 

2from datetime import date, timedelta 

3from unittest.mock import ANY, patch 

4 

5import grpc 

6import pytest 

7from google.protobuf import empty_pb2 

8from sqlalchemy.sql import or_ 

9 

10import couchers.jobs.handlers 

11import couchers.servicers.account 

12from couchers import errors 

13from couchers.config import config 

14from couchers.crypto import asym_decrypt, b64encode_unpadded 

15from couchers.db import session_scope 

16from couchers.jobs.handlers import update_badges 

17from couchers.jobs.worker import process_job 

18from couchers.models import ( 

19 PassportSex, 

20 StrongVerificationAttempt, 

21 StrongVerificationAttemptStatus, 

22 StrongVerificationCallbackEvent, 

23 User, 

24) 

25from couchers.sql import couchers_select as select 

26from couchers.utils import now 

27from proto import account_pb2, admin_pb2, api_pb2 

28from proto.google.api import httpbody_pb2 

29from tests.test_fixtures import ( # noqa 

30 account_session, 

31 api_session, 

32 db, 

33 generate_user, 

34 real_admin_session, 

35 real_iris_session, 

36 testconfig, 

37) 

38 

39 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""

43 

44 

def _emulate_iris_callback(session_id, session_state, reference):
    """
    Send a fake Iris ID webhook callback through the real Iris servicer session.

    The callback body is a JSON object with the session id, the session state and the session reference,
    ASCII-encoded and wrapped in an ``HttpBody`` with content type ``application/json``.

    ``session_state`` must be one of the states listed in the assertion below; anything else fails the
    calling test with an ``AssertionError`` before a request is made.
    """
    assert session_state in ["CREATED", "INITIATED", "FAILED", "ABORTED", "COMPLETED", "REJECTED", "APPROVED"]

    payload = {
        "session_id": session_id,
        "session_state": session_state,
        # This key used to be misspelled "session_referenace".
        # NOTE(review): assumes the webhook handler reads "session_reference" -- confirm against the Iris servicer.
        "session_reference": reference,
    }
    body = json.dumps(payload).encode("ascii")

    with real_iris_session() as iris:
        iris.Webhook(httpbody_pb2.HttpBody(content_type="application/json", data=body))

52 

53 

# Passport expiry date used by most tests: 5 * 365 days after today (computed once, at import time).
default_expiry = date.today() + timedelta(days=365 * 5)

55 

56 

def _mock_http_response(payload):
    """
    Build a stand-in for a successful ``requests`` response whose JSON body is ``payload``.

    The returned object is a class rather than an instance, which is why ``json`` takes no ``self`` argument.
    """
    return type(
        "__MockResponse",
        (),
        {
            "status_code": 200,
            "text": json.dumps(payload),
            "json": lambda: payload,
        },
    )


def _assert_sv_attempt_status(account, verification_attempt_token, expected_status):
    """Assert that ``GetStrongVerificationAttemptStatus`` reports ``expected_status`` for the given attempt."""
    res = account.GetStrongVerificationAttemptStatus(
        account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token)
    )
    assert res.status == expected_status


def do_and_check_sv(
    user,
    token,
    verification_id,
    sex,
    dob,
    document_type,
    document_number,
    document_expiry,
    nationality,
    return_after=None,
):
    """
    Drive a complete strong verification flow for ``user`` and check every intermediate state.

    Steps: initiate the verification (with the Iris ID ``session.create`` HTTP call mocked), emulate the
    INITIATED, COMPLETED and APPROVED webhook callbacks, run the background jobs (with the ``session.get``
    HTTP call mocked to return the passport data built from the arguments), and finally check the stored
    ``StrongVerificationAttempt`` row, its encrypted payload and the recorded callback events.

    Args:
        user: the user being verified; only ``user.id`` is read.
        token: session token used to open account sessions for that user.
        verification_id: id used as the Iris session id in the mocked responses and callbacks.
        sex, dob, document_type, document_number, nationality: values placed verbatim in the mocked passport data.
        document_expiry: a ``date``; sent as its ISO string and compared against the stored expiry date.
        return_after: if "INITIATED", "COMPLETED" or "APPROVED", return right after the checks that follow
            that callback instead of finishing the flow.
    """
    iris_token_data = {
        "merchant_id": 5731012934821982,
        "session_id": verification_id,
        "seed": 1674246339,
        "face_verification": False,
        "host": "https://passportreader.app",
    }
    iris_token = b64encode_unpadded(json.dumps(iris_token_data).encode("utf8"))

    with account_session(token) as account:
        # start by initiation
        with patch("couchers.servicers.account.requests.post") as mock:
            json_resp1 = {
                "id": verification_id,
                "token": iris_token,
            }
            mock.return_value = _mock_http_response(json_resp1)
            res = account.InitiateStrongVerification(empty_pb2.Empty())
            mock.assert_called_once_with(
                "https://passportreader.app/api/v1/session.create",
                auth=("dummy_pubkey", "dummy_secret"),
                json={
                    "callback_url": "http://localhost:8888/iris/webhook",
                    "face_verification": False,
                    "reference": ANY,
                },
                timeout=10,
            )
        # the reference is generated by the backend, so pick it out of the recorded call
        reference_data = mock.call_args.kwargs["json"]["reference"]
        verification_attempt_token = res.verification_attempt_token
        assert res.iris_url == f"iris:///?token={iris_token}"

        _assert_sv_attempt_status(
            account,
            verification_attempt_token,
            account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_TO_OPEN_APP,
        )

    # ok, now the user downloads the app, scans their id, and Iris ID sends callbacks to the server
    _emulate_iris_callback(verification_id, "INITIATED", reference_data)

    with account_session(token) as account:
        _assert_sv_attempt_status(
            account,
            verification_attempt_token,
            account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_IN_APP,
        )

    if return_after == "INITIATED":
        return

    _emulate_iris_callback(verification_id, "COMPLETED", reference_data)

    with account_session(token) as account:
        _assert_sv_attempt_status(
            account,
            verification_attempt_token,
            account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND,
        )

    if return_after == "COMPLETED":
        return

    _emulate_iris_callback(verification_id, "APPROVED", reference_data)

    # still waiting on the backend: the data is only fetched once the background job runs
    with account_session(token) as account:
        _assert_sv_attempt_status(
            account,
            verification_attempt_token,
            account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND,
        )

    if return_after == "APPROVED":
        return

    with patch("couchers.jobs.handlers.requests.post") as mock:
        json_resp2 = {
            "id": verification_id,
            "created": "2024-05-11T15:46:46Z",
            "expires": "2024-05-11T16:17:26Z",
            "state": "APPROVED",
            "reference": reference_data,
            "user_ip": "10.123.123.123",
            "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0",
            "given_names": "John Wayne",
            "surname": "Doe",
            "nationality": nationality,
            "sex": sex,
            "date_of_birth": dob,
            "document_type": document_type,
            "document_number": document_number,
            "expiry_date": document_expiry.isoformat(),
            "issuing_country": nationality,
            "issuer": "Department of State, U.S. Government",
            "portrait": "dGVzdHRlc3R0ZXN0...",
        }
        mock.return_value = _mock_http_response(json_resp2)
        # drain the job queue while the HTTP call is mocked
        while process_job():
            pass

    mock.assert_called_once_with(
        "https://passportreader.app/api/v1/session.get",
        auth=("dummy_pubkey", "dummy_secret"),
        json={"id": verification_id},
        timeout=10,
    )

    with account_session(token) as account:
        _assert_sv_attempt_status(
            account,
            verification_attempt_token,
            account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_SUCCEEDED,
        )

    with session_scope() as session:
        verification_attempt = session.execute(
            select(StrongVerificationAttempt).where(
                StrongVerificationAttempt.verification_attempt_token == verification_attempt_token
            )
        ).scalar_one()
        assert verification_attempt.user_id == user.id
        assert verification_attempt.status == StrongVerificationAttemptStatus.succeeded
        assert verification_attempt.has_full_data
        assert verification_attempt.passport_encrypted_data
        # NOTE: passport_date_of_birth and passport_sex are not checked here;
        # test_strong_verification_happy_path asserts them for its own inputs.
        assert verification_attempt.has_minimal_data
        assert verification_attempt.passport_expiry_date == document_expiry
        assert verification_attempt.passport_nationality == nationality
        assert verification_attempt.passport_last_three_document_chars == document_number[-3:]
        assert verification_attempt.iris_token == iris_token
        assert verification_attempt.iris_session_id == verification_id

        # the stored blob must decrypt back to exactly what the mocked session.get call returned
        private_key = bytes.fromhex("e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272")
        decrypted_data = json.loads(asym_decrypt(private_key, verification_attempt.passport_encrypted_data))
        assert decrypted_data == json_resp2

        callbacks = (
            session.execute(
                select(StrongVerificationCallbackEvent.iris_status)
                .where(StrongVerificationCallbackEvent.verification_attempt_id == verification_attempt.id)
                .order_by(StrongVerificationCallbackEvent.created.asc())
            )
            .scalars()
            .all()
        )
        assert callbacks == ["INITIATED", "COMPLETED", "APPROVED"]

237 

238 

def monkeypatch_sv_config(monkeypatch):
    """
    Turn strong verification on for the current test, using dummy Iris ID credentials.

    Copies the global ``config``, sets ENABLE_STRONG_VERIFICATION, the dummy Iris ID public key and secret, and
    the verification data public key on the copy, then patches the copy in as ``config`` on the
    ``couchers.servicers.account`` and ``couchers.jobs.handlers`` modules through ``monkeypatch``.
    """
    new_config = config.copy()
    new_config["ENABLE_STRONG_VERIFICATION"] = True
    new_config["IRIS_ID_PUBKEY"] = "dummy_pubkey"
    new_config["IRIS_ID_SECRET"] = "dummy_secret"
    # do_and_check_sv decrypts the stored passport data with the private key
    # e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272 (presumably the pair of this public key).
    new_config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex(
        "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
    )

    monkeypatch.setattr(couchers.servicers.account, "config", new_config)
    monkeypatch.setattr(couchers.jobs.handlers, "config", new_config)

252 

253 

def test_strong_verification_happy_path(db, monkeypatch):
    """
    Walk a user through strong verification and check how the badge and verification statuses react.

    Covers: no verification yet, a successful verification, a birthdate mismatch, a gender/passport sex
    mismatch, going back to matching data, and switching the passport sex/gender exception on and off
    through the admin API.
    """
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    _, superuser_token = generate_user(is_superuser=True)

    def check_public_view(expect_sv, birthdate_status, gender_status):
        # what the user sees through the main API
        with api_session(token) as api:
            res = api.GetUser(api_pb2.GetUserReq(user=user.username))
            assert ("strong_verification" in res.badges) == expect_sv
            assert bool(res.has_strong_verification) == expect_sv
            assert res.birthdate_verification_status == birthdate_status
            assert res.gender_verification_status == gender_status

    def check_admin_view(admin, expect_sv, birthdate_status, gender_status):
        # what an admin sees through GetUserDetails
        res = admin.GetUserDetails(admin_pb2.GetUserDetailsReq(user=user.username))
        assert ("strong_verification" in res.badges) == expect_sv
        assert bool(res.has_strong_verification) == expect_sv
        assert res.birthdate_verification_status == birthdate_status
        assert res.gender_verification_status == gender_status

    def edit_user(**changes):
        # write the given attributes straight onto the user row
        with session_scope() as session:
            user_ = session.execute(select(User).where(User.id == user.id)).scalar_one()
            for attr, value in changes.items():
                setattr(user_, attr, value)

    update_badges(empty_pb2.Empty())

    check_public_view(
        False,
        api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED,
        api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED,
    )

    do_and_check_sv(
        user,
        token,
        verification_id=5731012934821983,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="31195855",
        document_expiry=default_expiry,
        nationality="US",
    )

    with session_scope() as session:
        attempt = session.execute(
            select(StrongVerificationAttempt).where(StrongVerificationAttempt.user_id == user.id)
        ).scalar_one()
        assert attempt.status == StrongVerificationAttemptStatus.succeeded
        assert attempt.passport_date_of_birth == date(1988, 1, 1)
        assert attempt.passport_sex == PassportSex.male
        assert attempt.passport_expiry_date == default_expiry
        assert attempt.passport_nationality == "US"
        assert attempt.passport_last_three_document_chars == "855"

    update_badges(empty_pb2.Empty())

    # the user should now have strong verification
    check_public_view(
        True,
        api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED,
        api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED,
    )

    # wrong dob = no badge
    edit_user(birthdate=date(1988, 1, 2))

    update_badges(empty_pb2.Empty())

    check_public_view(
        False,
        api_pb2.BIRTHDATE_VERIFICATION_STATUS_MISMATCH,
        api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED,
    )

    # bad gender-sex correspondence = no badge
    edit_user(birthdate=date(1988, 1, 1), gender="Woman")

    update_badges(empty_pb2.Empty())

    check_public_view(
        False,
        api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED,
        api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH,
    )

    with account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert not info.has_strong_verification
        assert info.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
        assert info.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH

    # back to should have a badge
    edit_user(gender="Man")

    update_badges(empty_pb2.Empty())

    check_public_view(
        True,
        api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED,
        api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED,
    )

    # check has_passport_sex_gender_exception
    with real_admin_session(superuser_token) as admin:
        check_admin_view(
            admin,
            True,
            api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED,
            api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED,
        )

        admin.SetPassportSexGenderException(
            admin_pb2.SetPassportSexGenderExceptionReq(user=user.username, passport_sex_gender_exception=True)
        )
        admin.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=user.username, gender="Woman"))

    update_badges(empty_pb2.Empty())

    # with the exception on, the gender change does not cost the user the badge
    check_public_view(
        True,
        api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED,
        api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED,
    )

    with real_admin_session(superuser_token) as admin:
        check_admin_view(
            admin,
            True,
            api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED,
            api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED,
        )

        # now turn exception off
        admin.SetPassportSexGenderException(
            admin_pb2.SetPassportSexGenderExceptionReq(user=user.username, passport_sex_gender_exception=False)
        )

    update_badges(empty_pb2.Empty())

    check_public_view(
        False,
        api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED,
        api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH,
    )

    with real_admin_session(superuser_token) as admin:
        check_admin_view(
            admin,
            False,
            api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED,
            api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH,
        )

400 

401 

def test_strong_verification_delete_data(db, monkeypatch):
    """
    Check that DeleteStrongVerificationData removes a user's strong verification.

    Deleting before any verification exists must not raise; deleting after a successful verification must
    clear ``has_strong_verification`` and leave no attempt row holding encrypted passport data, a date of
    birth or a passport sex.
    """
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    # a superuser is created as well; its token is not needed in this test
    generate_user(is_superuser=True)

    with api_session(token) as api:
        assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification

    # can remove SV data even if there is none, should do nothing
    with account_session(token) as account:
        account.DeleteStrongVerificationData(empty_pb2.Empty())

    do_and_check_sv(
        user,
        token,
        verification_id=5731012934821983,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="31195855",
        document_expiry=default_expiry,
        nationality="US",
    )

    # the user should now have strong verification
    with api_session(token) as api:
        assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification

    # check removing SV data
    with account_session(token) as account:
        account.DeleteStrongVerificationData(empty_pb2.Empty())

    with api_session(token) as api:
        assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification

    with session_scope() as session:
        # `!= None` is deliberate: SQLAlchemy overloads it to build the SQL NULL comparison
        attempts_with_data = (
            session.execute(
                select(StrongVerificationAttempt).where(
                    or_(
                        StrongVerificationAttempt.passport_encrypted_data != None,  # noqa: E711
                        StrongVerificationAttempt.passport_date_of_birth != None,  # noqa: E711
                        StrongVerificationAttempt.passport_sex != None,  # noqa: E711
                    )
                )
            )
            .scalars()
            .all()
        )
        assert not attempts_with_data

455 

456 

def test_strong_verification_expiry(db, monkeypatch):
    """
    Check that strong verification stops counting once the passport expiry date has passed.

    The user verifies with a passport expiring in 10 days. With ``couchers.models.now`` patched to 15 days
    in the future the verification must no longer be reported, and a fresh verification with a passport
    valid for another 365 days must bring it back.
    """
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
    # a superuser is created as well; its token is not needed in this test
    generate_user(is_superuser=True)

    with api_session(token) as api:
        assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification

    expiry = date.today() + timedelta(days=10)

    do_and_check_sv(
        user,
        token,
        verification_id=5731012934821983,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="31195855",
        document_expiry=expiry,
        nationality="US",
    )

    # the user should now have strong verification
    with api_session(token) as api:
        res = api.GetUser(api_pb2.GetUserReq(user=user.username))
        assert res.has_strong_verification
        assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
        assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED

    def after_expiry():
        # 15 days from now, i.e. past the 10 day expiry set above
        return now() + timedelta(days=15)

    with patch("couchers.models.now", after_expiry):
        with api_session(token) as api:
            res = api.GetUser(api_pb2.GetUserReq(user=user.username))
            assert not res.has_strong_verification
            assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
            assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED

            # asking a second time must give the same answer
            res = api.GetUser(api_pb2.GetUserReq(user=user.username))
            assert not res.has_strong_verification

    do_and_check_sv(
        user,
        token,
        verification_id=5731012934821985,
        sex="MALE",
        dob="1988-01-01",
        document_type="PASSPORT",
        document_number="PA41323412",
        document_expiry=date.today() + timedelta(days=365),
        nationality="AU",
    )

    with api_session(token) as api:
        assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification

515 

516 

def test_strong_verification_regression(db, monkeypatch):
    """Run the verification flow only up to the INITIATED callback, then check that Ping still works."""
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")

    passport = {
        "verification_id": 5731012934821983,
        "sex": "MALE",
        "dob": "1988-01-01",
        "document_type": "PASSPORT",
        "document_number": "31195855",
        "document_expiry": default_expiry,
        "nationality": "US",
    }
    # leave the attempt unfinished
    do_and_check_sv(user, token, return_after="INITIATED", **passport)

    with api_session(token) as api:
        api.Ping(api_pb2.PingReq())

537 

538 

def test_strong_verification_regression2(db, monkeypatch):
    """Leave one attempt stuck after INITIATED, then check that a second, complete attempt still verifies."""
    monkeypatch_sv_config(monkeypatch)

    user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")

    abandoned_attempt = {
        "verification_id": 5731012934821983,
        "sex": "MALE",
        "dob": "1988-01-01",
        "document_type": "PASSPORT",
        "document_number": "31195855",
        "document_expiry": default_expiry,
        "nationality": "US",
        "return_after": "INITIATED",
    }
    do_and_check_sv(user, token, **abandoned_attempt)

    completed_attempt = {
        "verification_id": 5731012934821985,
        "sex": "MALE",
        "dob": "1988-01-01",
        "document_type": "PASSPORT",
        "document_number": "PA41323412",
        "document_expiry": default_expiry,
        "nationality": "AU",
    }
    do_and_check_sv(user, token, **completed_attempt)

    with api_session(token) as api:
        res = api.GetUser(api_pb2.GetUserReq(user=user.username))
        assert res.has_strong_verification

571 

572 

def test_strong_verification_disabled(db):
    """Without the strong verification config patch, initiating must fail with UNAVAILABLE."""
    _, token = generate_user()

    with account_session(token) as account, pytest.raises(grpc.RpcError) as excinfo:
        account.InitiateStrongVerification(empty_pb2.Empty())

    rpc_error = excinfo.value
    assert rpc_error.code() == grpc.StatusCode.UNAVAILABLE
    assert rpc_error.details() == errors.STRONG_VERIFICATION_DISABLED