Coverage for src/tests/test_strong_verification.py: 99%
254 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-11-21 04:21 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-11-21 04:21 +0000
1import json
2from datetime import date, timedelta
3from unittest.mock import ANY, patch
5import grpc
6import pytest
7from google.protobuf import empty_pb2
8from sqlalchemy.sql import or_
10import couchers.jobs.handlers
11import couchers.servicers.account
12from couchers import errors
13from couchers.config import config
14from couchers.crypto import asym_decrypt, b64encode_unpadded
15from couchers.db import session_scope
16from couchers.jobs.handlers import update_badges
17from couchers.jobs.worker import process_job
18from couchers.models import (
19 PassportSex,
20 StrongVerificationAttempt,
21 StrongVerificationAttemptStatus,
22 StrongVerificationCallbackEvent,
23 User,
24)
25from couchers.sql import couchers_select as select
26from couchers.utils import now
27from proto import account_pb2, admin_pb2, api_pb2
28from proto.google.api import httpbody_pb2
29from tests.test_fixtures import ( # noqa
30 account_session,
31 api_session,
32 db,
33 generate_user,
34 real_admin_session,
35 real_iris_session,
36 testconfig,
37)
40@pytest.fixture(autouse=True)
41def _(testconfig):
42 pass
45def _emulate_iris_callback(session_id, session_state, reference):
46 assert session_state in ["CREATED", "INITIATED", "FAILED", "ABORTED", "COMPLETED", "REJECTED", "APPROVED"]
47 with real_iris_session() as iris:
48 data = json.dumps(
49 {"session_id": session_id, "session_state": session_state, "session_referenace": reference}
50 ).encode("ascii")
51 iris.Webhook(httpbody_pb2.HttpBody(content_type="application/json", data=data))
54default_expiry = date.today() + timedelta(days=5 * 365)
57def do_and_check_sv(
58 user,
59 token,
60 verification_id,
61 sex,
62 dob,
63 document_type,
64 document_number,
65 document_expiry,
66 nationality,
67 return_after=None,
68):
69 iris_token_data = {
70 "merchant_id": 5731012934821982,
71 "session_id": verification_id,
72 "seed": 1674246339,
73 "face_verification": False,
74 "host": "https://passportreader.app",
75 }
76 iris_token = b64encode_unpadded(json.dumps(iris_token_data).encode("utf8"))
78 with account_session(token) as account:
79 # start by initiation
80 with patch("couchers.servicers.account.requests.post") as mock:
81 json_resp1 = {
82 "id": verification_id,
83 "token": iris_token,
84 }
85 mock.return_value = type(
86 "__MockResponse",
87 (),
88 {
89 "status_code": 200,
90 "text": json.dumps(json_resp1),
91 "json": lambda: json_resp1,
92 },
93 )
94 res = account.InitiateStrongVerification(empty_pb2.Empty())
95 mock.assert_called_once_with(
96 "https://passportreader.app/api/v1/session.create",
97 auth=("dummy_pubkey", "dummy_secret"),
98 json={
99 "callback_url": "http://localhost:8888/iris/webhook",
100 "face_verification": False,
101 "reference": ANY,
102 },
103 timeout=10,
104 )
105 reference_data = mock.call_args.kwargs["json"]["reference"]
106 verification_attempt_token = res.verification_attempt_token
107 assert res.iris_url == f"iris:///?token={iris_token}"
109 assert (
110 account.GetStrongVerificationAttemptStatus(
111 account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token)
112 ).status
113 == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_TO_OPEN_APP
114 )
116 # ok, now the user downloads the app, scans their id, and Iris ID sends callbacks to the server
117 _emulate_iris_callback(verification_id, "INITIATED", reference_data)
119 with account_session(token) as account:
120 assert (
121 account.GetStrongVerificationAttemptStatus(
122 account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token)
123 ).status
124 == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_IN_APP
125 )
127 if return_after == "INITIATED":
128 return
130 _emulate_iris_callback(verification_id, "COMPLETED", reference_data)
132 with account_session(token) as account:
133 assert (
134 account.GetStrongVerificationAttemptStatus(
135 account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token)
136 ).status
137 == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND
138 )
140 if return_after == "COMPLETED":
141 return
143 _emulate_iris_callback(verification_id, "APPROVED", reference_data)
145 with account_session(token) as account:
146 assert (
147 account.GetStrongVerificationAttemptStatus(
148 account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token)
149 ).status
150 == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND
151 )
153 if return_after == "APPROVED":
154 return
156 with patch("couchers.jobs.handlers.requests.post") as mock:
157 json_resp2 = {
158 "id": verification_id,
159 "created": "2024-05-11T15:46:46Z",
160 "expires": "2024-05-11T16:17:26Z",
161 "state": "APPROVED",
162 "reference": reference_data,
163 "user_ip": "10.123.123.123",
164 "user_agent": "Iris%20ID/168357896 CFNetwork/1494.0.7 Darwin/23.4.0",
165 "given_names": "John Wayne",
166 "surname": "Doe",
167 "nationality": nationality,
168 "sex": sex,
169 "date_of_birth": dob,
170 "document_type": document_type,
171 "document_number": document_number,
172 "expiry_date": document_expiry.isoformat(),
173 "issuing_country": nationality,
174 "issuer": "Department of State, U.S. Government",
175 "portrait": "dGVzdHRlc3R0ZXN0...",
176 }
177 mock.return_value = type(
178 "__MockResponse",
179 (),
180 {
181 "status_code": 200,
182 "text": json.dumps(json_resp2),
183 "json": lambda: json_resp2,
184 },
185 )
186 while process_job():
187 pass
189 mock.assert_called_once_with(
190 "https://passportreader.app/api/v1/session.get",
191 auth=("dummy_pubkey", "dummy_secret"),
192 json={"id": verification_id},
193 timeout=10,
194 )
196 with account_session(token) as account:
197 assert (
198 account.GetStrongVerificationAttemptStatus(
199 account_pb2.GetStrongVerificationAttemptStatusReq(verification_attempt_token=verification_attempt_token)
200 ).status
201 == account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_SUCCEEDED
202 )
204 with session_scope() as session:
205 verification_attempt = session.execute(
206 select(StrongVerificationAttempt).where(
207 StrongVerificationAttempt.verification_attempt_token == verification_attempt_token
208 )
209 ).scalar_one()
210 assert verification_attempt.user_id == user.id
211 assert verification_attempt.status == StrongVerificationAttemptStatus.succeeded
212 assert verification_attempt.has_full_data
213 assert verification_attempt.passport_encrypted_data
214 # assert verification_attempt.passport_date_of_birth == date(1988, 1, 1)
215 # assert verification_attempt.passport_sex == PassportSex.male
216 assert verification_attempt.has_minimal_data
217 assert verification_attempt.passport_expiry_date == document_expiry
218 assert verification_attempt.passport_nationality == nationality
219 assert verification_attempt.passport_last_three_document_chars == document_number[-3:]
220 assert verification_attempt.iris_token == iris_token
221 assert verification_attempt.iris_session_id == verification_id
223 private_key = bytes.fromhex("e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272")
224 decrypted_data = json.loads(asym_decrypt(private_key, verification_attempt.passport_encrypted_data))
225 assert decrypted_data == json_resp2
227 callbacks = (
228 session.execute(
229 select(StrongVerificationCallbackEvent.iris_status)
230 .where(StrongVerificationCallbackEvent.verification_attempt_id == verification_attempt.id)
231 .order_by(StrongVerificationCallbackEvent.created.asc())
232 )
233 .scalars()
234 .all()
235 )
236 assert callbacks == ["INITIATED", "COMPLETED", "APPROVED"]
239def monkeypatch_sv_config(monkeypatch):
240 new_config = config.copy()
241 new_config["ENABLE_STRONG_VERIFICATION"] = True
242 new_config["IRIS_ID_PUBKEY"] = "dummy_pubkey"
243 new_config["IRIS_ID_SECRET"] = "dummy_secret"
244 new_config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex(
245 "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
246 )
248 private_key = bytes.fromhex("e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272")
250 monkeypatch.setattr(couchers.servicers.account, "config", new_config)
251 monkeypatch.setattr(couchers.jobs.handlers, "config", new_config)
254def test_strong_verification_happy_path(db, monkeypatch):
255 monkeypatch_sv_config(monkeypatch)
257 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
258 _, superuser_token = generate_user(is_superuser=True)
260 update_badges(empty_pb2.Empty())
262 with api_session(token) as api:
263 res = api.GetUser(api_pb2.GetUserReq(user=user.username))
264 assert "strong_verification" not in res.badges
265 assert not res.has_strong_verification
266 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
267 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED
269 do_and_check_sv(
270 user,
271 token,
272 verification_id=5731012934821983,
273 sex="MALE",
274 dob="1988-01-01",
275 document_type="PASSPORT",
276 document_number="31195855",
277 document_expiry=default_expiry,
278 nationality="US",
279 )
281 with session_scope() as session:
282 verification_attempt = session.execute(
283 select(StrongVerificationAttempt).where(StrongVerificationAttempt.user_id == user.id)
284 ).scalar_one()
285 assert verification_attempt.status == StrongVerificationAttemptStatus.succeeded
286 assert verification_attempt.passport_date_of_birth == date(1988, 1, 1)
287 assert verification_attempt.passport_sex == PassportSex.male
288 assert verification_attempt.passport_expiry_date == default_expiry
289 assert verification_attempt.passport_nationality == "US"
290 assert verification_attempt.passport_last_three_document_chars == "855"
292 update_badges(empty_pb2.Empty())
294 # the user should now have strong verification
295 with api_session(token) as api:
296 res = api.GetUser(api_pb2.GetUserReq(user=user.username))
297 assert "strong_verification" in res.badges
298 assert res.has_strong_verification
299 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
300 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED
302 # wrong dob = no badge
303 with session_scope() as session:
304 user_ = session.execute(select(User).where(User.id == user.id)).scalar_one()
305 user_.birthdate = date(1988, 1, 2)
307 update_badges(empty_pb2.Empty())
309 with api_session(token) as api:
310 res = api.GetUser(api_pb2.GetUserReq(user=user.username))
311 assert "strong_verification" not in res.badges
312 assert not res.has_strong_verification
313 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_MISMATCH
314 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED
316 # bad gender-sex correspondence = no badge
317 with session_scope() as session:
318 user_ = session.execute(select(User).where(User.id == user.id)).scalar_one()
319 user_.birthdate = date(1988, 1, 1)
320 user_.gender = "Woman"
322 update_badges(empty_pb2.Empty())
324 with api_session(token) as api:
325 res = api.GetUser(api_pb2.GetUserReq(user=user.username))
326 assert "strong_verification" not in res.badges
327 assert not res.has_strong_verification
328 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
329 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH
331 with account_session(token) as account:
332 res = account.GetAccountInfo(empty_pb2.Empty())
333 assert not res.has_strong_verification
334 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
335 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH
337 # back to should have a badge
338 with session_scope() as session:
339 user_ = session.execute(select(User).where(User.id == user.id)).scalar_one()
340 user_.gender = "Man"
342 update_badges(empty_pb2.Empty())
344 with api_session(token) as api:
345 res = api.GetUser(api_pb2.GetUserReq(user=user.username))
346 assert "strong_verification" in res.badges
347 assert res.has_strong_verification
348 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
349 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED
351 # check has_passport_sex_gender_exception
352 with real_admin_session(superuser_token) as admin:
353 res = admin.GetUserDetails(admin_pb2.GetUserDetailsReq(user=user.username))
354 assert "strong_verification" in res.badges
355 assert res.has_strong_verification
356 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
357 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED
359 admin.SetPassportSexGenderException(
360 admin_pb2.SetPassportSexGenderExceptionReq(user=user.username, passport_sex_gender_exception=True)
361 )
362 admin.ChangeUserGender(admin_pb2.ChangeUserGenderReq(user=user.username, gender="Woman"))
364 update_badges(empty_pb2.Empty())
366 with api_session(token) as api:
367 res = api.GetUser(api_pb2.GetUserReq(user=user.username))
368 assert "strong_verification" in res.badges
369 assert res.has_strong_verification
370 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
371 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED
373 with real_admin_session(superuser_token) as admin:
374 res = admin.GetUserDetails(admin_pb2.GetUserDetailsReq(user=user.username))
375 assert "strong_verification" in res.badges
376 assert res.has_strong_verification
377 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
378 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED
380 # now turn exception off
381 admin.SetPassportSexGenderException(
382 admin_pb2.SetPassportSexGenderExceptionReq(user=user.username, passport_sex_gender_exception=False)
383 )
385 update_badges(empty_pb2.Empty())
387 with api_session(token) as api:
388 res = api.GetUser(api_pb2.GetUserReq(user=user.username))
389 assert "strong_verification" not in res.badges
390 assert not res.has_strong_verification
391 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
392 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH
394 with real_admin_session(superuser_token) as admin:
395 res = admin.GetUserDetails(admin_pb2.GetUserDetailsReq(user=user.username))
396 assert "strong_verification" not in res.badges
397 assert not res.has_strong_verification
398 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
399 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH
402def test_strong_verification_delete_data(db, monkeypatch):
403 monkeypatch_sv_config(monkeypatch)
405 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
406 _, superuser_token = generate_user(is_superuser=True)
408 with api_session(token) as api:
409 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
411 # can remove SV data even if there is none, should do nothing
412 with account_session(token) as account:
413 account.DeleteStrongVerificationData(empty_pb2.Empty())
415 do_and_check_sv(
416 user,
417 token,
418 verification_id=5731012934821983,
419 sex="MALE",
420 dob="1988-01-01",
421 document_type="PASSPORT",
422 document_number="31195855",
423 document_expiry=default_expiry,
424 nationality="US",
425 )
427 # the user should now have strong verification
428 with api_session(token) as api:
429 assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
431 # check removing SV data
432 with account_session(token) as account:
433 account.DeleteStrongVerificationData(empty_pb2.Empty())
435 with api_session(token) as api:
436 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
438 with session_scope() as session:
439 assert (
440 len(
441 session.execute(
442 select(StrongVerificationAttempt).where(
443 or_(
444 StrongVerificationAttempt.passport_encrypted_data != None,
445 StrongVerificationAttempt.passport_date_of_birth != None,
446 StrongVerificationAttempt.passport_sex != None,
447 )
448 )
449 )
450 .scalars()
451 .all()
452 )
453 == 0
454 )
457def test_strong_verification_expiry(db, monkeypatch):
458 monkeypatch_sv_config(monkeypatch)
460 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
461 _, superuser_token = generate_user(is_superuser=True)
463 with api_session(token) as api:
464 assert not api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
466 expiry = date.today() + timedelta(days=10)
468 do_and_check_sv(
469 user,
470 token,
471 verification_id=5731012934821983,
472 sex="MALE",
473 dob="1988-01-01",
474 document_type="PASSPORT",
475 document_number="31195855",
476 document_expiry=expiry,
477 nationality="US",
478 )
480 # the user should now have strong verification
481 with api_session(token) as api:
482 res = api.GetUser(api_pb2.GetUserReq(user=user.username))
483 assert res.has_strong_verification
484 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
485 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED
487 def after_expiry():
488 return now() + timedelta(days=15)
490 with patch("couchers.models.now", after_expiry):
491 with api_session(token) as api:
492 res = api.GetUser(api_pb2.GetUserReq(user=user.username))
493 assert not res.has_strong_verification
494 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
495 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED
497 res = api.GetUser(api_pb2.GetUserReq(user=user.username))
498 assert not res.has_strong_verification
499 assert not res.has_strong_verification
501 do_and_check_sv(
502 user,
503 token,
504 verification_id=5731012934821985,
505 sex="MALE",
506 dob="1988-01-01",
507 document_type="PASSPORT",
508 document_number="PA41323412",
509 document_expiry=date.today() + timedelta(days=365),
510 nationality="AU",
511 )
513 with api_session(token) as api:
514 assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
517def test_strong_verification_regression(db, monkeypatch):
518 monkeypatch_sv_config(monkeypatch)
520 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
522 do_and_check_sv(
523 user,
524 token,
525 verification_id=5731012934821983,
526 sex="MALE",
527 dob="1988-01-01",
528 document_type="PASSPORT",
529 document_number="31195855",
530 document_expiry=default_expiry,
531 nationality="US",
532 return_after="INITIATED",
533 )
535 with api_session(token) as api:
536 api.Ping(api_pb2.PingReq())
539def test_strong_verification_regression2(db, monkeypatch):
540 monkeypatch_sv_config(monkeypatch)
542 user, token = generate_user(birthdate=date(1988, 1, 1), gender="Man")
544 do_and_check_sv(
545 user,
546 token,
547 verification_id=5731012934821983,
548 sex="MALE",
549 dob="1988-01-01",
550 document_type="PASSPORT",
551 document_number="31195855",
552 document_expiry=default_expiry,
553 nationality="US",
554 return_after="INITIATED",
555 )
557 do_and_check_sv(
558 user,
559 token,
560 verification_id=5731012934821985,
561 sex="MALE",
562 dob="1988-01-01",
563 document_type="PASSPORT",
564 document_number="PA41323412",
565 document_expiry=default_expiry,
566 nationality="AU",
567 )
569 with api_session(token) as api:
570 assert api.GetUser(api_pb2.GetUserReq(user=user.username)).has_strong_verification
573def test_strong_verification_disabled(db):
574 user, token = generate_user()
576 with account_session(token) as account:
577 with pytest.raises(grpc.RpcError) as e:
578 account.InitiateStrongVerification(empty_pb2.Empty())
579 assert e.value.code() == grpc.StatusCode.UNAVAILABLE
580 assert e.value.details() == errors.STRONG_VERIFICATION_DISABLED