Coverage for src/couchers/servicers/account.py: 95%
245 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-29 01:26 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-29 01:26 +0000
1import json
2import logging
3from datetime import timedelta
4from urllib.parse import urlencode
6import grpc
7import requests
8from google.protobuf import empty_pb2
9from sqlalchemy.sql import func, update
10from user_agents import parse as user_agents_parse
12from couchers import errors, urls
13from couchers.config import config
14from couchers.constants import PHONE_REVERIFICATION_INTERVAL, SMS_CODE_ATTEMPTS, SMS_CODE_LIFETIME
15from couchers.crypto import (
16 b64decode,
17 b64encode,
18 hash_password,
19 simple_decrypt,
20 simple_encrypt,
21 urlsafe_secure_token,
22 verify_password,
23 verify_token,
24)
25from couchers.helpers.geoip import geoip_approximate_location
26from couchers.jobs.enqueue import queue_job
27from couchers.metrics import (
28 account_deletion_initiations_counter,
29 strong_verification_data_deletions_counter,
30 strong_verification_initiations_counter,
31)
32from couchers.models import (
33 AccountDeletionReason,
34 AccountDeletionToken,
35 ContributeOption,
36 ContributorForm,
37 ModNote,
38 StrongVerificationAttempt,
39 StrongVerificationAttemptStatus,
40 StrongVerificationCallbackEvent,
41 User,
42 UserSession,
43)
44from couchers.notifications.notify import notify
45from couchers.phone import sms
46from couchers.phone.check import is_e164_format, is_known_operator
47from couchers.sql import couchers_select as select
48from couchers.tasks import (
49 maybe_send_contributor_form_email,
50 send_account_deletion_report_email,
51 send_email_changed_confirmation_to_new_email,
52)
53from couchers.utils import (
54 Timestamp_from_datetime,
55 dt_from_page_token,
56 dt_to_page_token,
57 is_valid_email,
58 now,
59 to_aware_datetime,
60)
61from proto import account_pb2, account_pb2_grpc, api_pb2, auth_pb2, iris_pb2_grpc, notification_data_pb2
62from proto.google.api import httpbody_pb2
63from proto.internal import jobs_pb2, verification_pb2
65logger = logging.getLogger(__name__)
66logger.setLevel(logging.DEBUG)
68contributeoption2sql = {
69 auth_pb2.CONTRIBUTE_OPTION_UNSPECIFIED: None,
70 auth_pb2.CONTRIBUTE_OPTION_YES: ContributeOption.yes,
71 auth_pb2.CONTRIBUTE_OPTION_MAYBE: ContributeOption.maybe,
72 auth_pb2.CONTRIBUTE_OPTION_NO: ContributeOption.no,
73}
75contributeoption2api = {
76 None: auth_pb2.CONTRIBUTE_OPTION_UNSPECIFIED,
77 ContributeOption.yes: auth_pb2.CONTRIBUTE_OPTION_YES,
78 ContributeOption.maybe: auth_pb2.CONTRIBUTE_OPTION_MAYBE,
79 ContributeOption.no: auth_pb2.CONTRIBUTE_OPTION_NO,
80}
82MAX_PAGINATION_LENGTH = 50
85def has_strong_verification(session, user):
86 attempt = session.execute(
87 select(StrongVerificationAttempt)
88 .where(StrongVerificationAttempt.user_id == user.id)
89 .where(StrongVerificationAttempt.is_valid)
90 .order_by(StrongVerificationAttempt.passport_expiry_datetime.desc())
91 .limit(1)
92 ).scalar_one_or_none()
93 if attempt:
94 assert attempt.is_valid
95 return attempt.has_strong_verification(user)
96 return False
99def mod_note_to_pb(note: ModNote):
100 return account_pb2.ModNote(
101 note_id=note.id,
102 note_content=note.note_content,
103 created=Timestamp_from_datetime(note.created),
104 acknowledged=Timestamp_from_datetime(note.acknowledged) if note.acknowledged else None,
105 )
108def get_strong_verification_fields(session, db_user):
109 out = dict(
110 birthdate_verification_status=api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED,
111 gender_verification_status=api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED,
112 has_strong_verification=False,
113 )
114 attempt = session.execute(
115 select(StrongVerificationAttempt)
116 .where(StrongVerificationAttempt.user_id == db_user.id)
117 .where(StrongVerificationAttempt.is_valid)
118 .order_by(StrongVerificationAttempt.passport_expiry_datetime.desc())
119 .limit(1)
120 ).scalar_one_or_none()
121 if attempt:
122 assert attempt.is_valid
123 if attempt.matches_birthdate(db_user):
124 out["birthdate_verification_status"] = api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
125 else:
126 out["birthdate_verification_status"] = api_pb2.BIRTHDATE_VERIFICATION_STATUS_MISMATCH
128 if attempt.matches_gender(db_user):
129 out["gender_verification_status"] = api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED
130 else:
131 out["gender_verification_status"] = api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH
133 out["has_strong_verification"] = attempt.has_strong_verification(db_user)
135 assert out["has_strong_verification"] == (
136 out["birthdate_verification_status"] == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
137 and out["gender_verification_status"] == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED
138 )
139 return out
142def abort_on_invalid_password(password, context):
143 """
144 Internal utility function: given a password, aborts if password is unforgivably insecure
145 """
146 if len(password) < 8:
147 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PASSWORD_TOO_SHORT)
149 if len(password) > 256:
150 # Hey, what are you trying to do? Give us a DDOS attack?
151 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PASSWORD_TOO_LONG)
153 # check for most common weak passwords (not meant to be an exhaustive check!)
154 if password.lower() in ("password", "12345678", "couchers", "couchers1"):
155 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INSECURE_PASSWORD)
158class Account(account_pb2_grpc.AccountServicer):
159 def GetAccountInfo(self, request, context, session):
160 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
162 return account_pb2.GetAccountInfoRes(
163 username=user.username,
164 email=user.email,
165 phone=user.phone if (user.phone_is_verified or not user.phone_code_expired) else None,
166 has_donated=user.has_donated,
167 phone_verified=user.phone_is_verified,
168 profile_complete=user.has_completed_profile,
169 timezone=user.timezone,
170 is_superuser=user.is_superuser,
171 **get_strong_verification_fields(session, user),
172 )
174 def ChangePasswordV2(self, request, context, session):
175 """
176 Changes the user's password. They have to confirm their old password just in case.
178 If they didn't have an old password previously, then we don't check that.
179 """
180 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
182 if not verify_password(user.hashed_password, request.old_password):
183 # wrong password
184 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_PASSWORD)
186 abort_on_invalid_password(request.new_password, context)
187 user.hashed_password = hash_password(request.new_password)
189 session.commit()
191 notify(
192 session,
193 user_id=user.id,
194 topic_action="password:change",
195 )
197 return empty_pb2.Empty()
199 def ChangeEmailV2(self, request, context, session):
200 """
201 Change the user's email address.
203 If the user has a password, a notification is sent to the old email, and a confirmation is sent to the new one.
205 Otherwise they need to confirm twice, via an email sent to each of their old and new emails.
207 In all confirmation emails, the user must click on the confirmation link.
208 """
209 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
211 # check password first
212 if not verify_password(user.hashed_password, request.password):
213 # wrong password
214 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_PASSWORD)
216 # not a valid email
217 if not is_valid_email(request.new_email):
218 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_EMAIL)
220 # email already in use (possibly by this user)
221 if session.execute(select(User).where(User.email == request.new_email)).scalar_one_or_none():
222 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_EMAIL)
224 user.new_email = request.new_email
225 user.new_email_token = urlsafe_secure_token()
226 user.new_email_token_created = now()
227 user.new_email_token_expiry = now() + timedelta(hours=2)
229 send_email_changed_confirmation_to_new_email(session, user)
231 # will still go into old email
232 notify(
233 session,
234 user_id=user.id,
235 topic_action="email_address:change",
236 data=notification_data_pb2.EmailAddressChange(
237 new_email=request.new_email,
238 ),
239 )
241 # session autocommit
242 return empty_pb2.Empty()
244 def FillContributorForm(self, request, context, session):
245 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
247 form = request.contributor_form
249 form = ContributorForm(
250 user=user,
251 ideas=form.ideas or None,
252 features=form.features or None,
253 experience=form.experience or None,
254 contribute=contributeoption2sql[form.contribute],
255 contribute_ways=form.contribute_ways,
256 expertise=form.expertise or None,
257 )
259 session.add(form)
260 session.flush()
261 maybe_send_contributor_form_email(session, form)
263 user.filled_contributor_form = True
265 return empty_pb2.Empty()
267 def GetContributorFormInfo(self, request, context, session):
268 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
270 return account_pb2.GetContributorFormInfoRes(
271 filled_contributor_form=user.filled_contributor_form,
272 )
274 def ChangePhone(self, request, context, session):
275 phone = request.phone
276 # early quick validation
277 if phone and not is_e164_format(phone):
278 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_PHONE)
280 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
281 if not user.has_donated:
282 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_DONATED)
284 if not phone:
285 user.phone = None
286 user.phone_verification_verified = None
287 user.phone_verification_token = None
288 user.phone_verification_attempts = 0
289 return empty_pb2.Empty()
291 if not is_known_operator(phone):
292 context.abort(grpc.StatusCode.UNIMPLEMENTED, errors.UNRECOGNIZED_PHONE_NUMBER)
294 if now() - user.phone_verification_sent < PHONE_REVERIFICATION_INTERVAL:
295 context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.REVERIFICATION_TOO_EARLY)
297 token = sms.generate_random_code()
298 result = sms.send_sms(phone, sms.format_message(token))
300 if result == "success":
301 user.phone = phone
302 user.phone_verification_verified = None
303 user.phone_verification_token = token
304 user.phone_verification_sent = now()
305 user.phone_verification_attempts = 0
307 notify(
308 session,
309 user_id=user.id,
310 topic_action="phone_number:change",
311 data=notification_data_pb2.PhoneNumberChange(
312 phone=phone,
313 ),
314 )
316 return empty_pb2.Empty()
318 context.abort(grpc.StatusCode.UNIMPLEMENTED, result)
320 def VerifyPhone(self, request, context, session):
321 if not sms.looks_like_a_code(request.token):
322 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.WRONG_SMS_CODE)
324 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
325 if user.phone_verification_token is None:
326 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NO_PENDING_VERIFICATION)
328 if now() - user.phone_verification_sent > SMS_CODE_LIFETIME:
329 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NO_PENDING_VERIFICATION)
331 if user.phone_verification_attempts > SMS_CODE_ATTEMPTS:
332 context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.TOO_MANY_SMS_CODE_ATTEMPTS)
334 if not verify_token(request.token, user.phone_verification_token):
335 user.phone_verification_attempts += 1
336 session.commit()
337 context.abort(grpc.StatusCode.NOT_FOUND, errors.WRONG_SMS_CODE)
339 # Delete verifications from everyone else that has this number
340 session.execute(
341 update(User)
342 .where(User.phone == user.phone)
343 .where(User.id != context.user_id)
344 .values(
345 {
346 "phone_verification_verified": None,
347 "phone_verification_attempts": 0,
348 "phone_verification_token": None,
349 "phone": None,
350 }
351 )
352 .execution_options(synchronize_session=False)
353 )
355 user.phone_verification_token = None
356 user.phone_verification_verified = now()
357 user.phone_verification_attempts = 0
359 notify(
360 session,
361 user_id=user.id,
362 topic_action="phone_number:verify",
363 data=notification_data_pb2.PhoneNumberVerify(
364 phone=user.phone,
365 ),
366 )
368 return empty_pb2.Empty()
370 def InitiateStrongVerification(self, request, context, session):
371 if not config["ENABLE_STRONG_VERIFICATION"]:
372 context.abort(grpc.StatusCode.UNAVAILABLE, errors.STRONG_VERIFICATION_DISABLED)
374 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
375 existing_verification = session.execute(
376 select(StrongVerificationAttempt)
377 .where(StrongVerificationAttempt.user_id == user.id)
378 .where(StrongVerificationAttempt.is_valid)
379 ).scalar_one_or_none()
380 if existing_verification:
381 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.STRONG_VERIFICATION_ALREADY_VERIFIED)
383 strong_verification_initiations_counter.labels(user.gender).inc()
385 verification_attempt_token = urlsafe_secure_token()
386 # this is the iris reference data, they will return this on every callback, it also doubles as webhook auth given lack of it otherwise
387 reference = b64encode(
388 simple_encrypt(
389 "iris_callback",
390 verification_pb2.VerificationReferencePayload(
391 verification_attempt_token=verification_attempt_token,
392 user_id=user.id,
393 ).SerializeToString(),
394 )
395 )
396 response = requests.post(
397 "https://passportreader.app/api/v1/session.create",
398 auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
399 json={
400 "callback_url": f"{config['BACKEND_BASE_URL']}/iris/webhook",
401 "face_verification": False,
402 "reference": reference,
403 },
404 timeout=10,
405 )
407 if response.status_code != 200:
408 raise Exception(f"Iris didn't return 200: {response.text}")
410 iris_session_id = response.json()["id"]
411 token = response.json()["token"]
412 session.add(
413 StrongVerificationAttempt(
414 user_id=user.id,
415 verification_attempt_token=verification_attempt_token,
416 iris_session_id=iris_session_id,
417 iris_token=token,
418 )
419 )
421 redirect_params = {
422 "token": token,
423 "redirect_url": urls.complete_strong_verification_url(
424 verification_attempt_token=verification_attempt_token
425 ),
426 }
427 redirect_url = "https://passportreader.app/open?" + urlencode(redirect_params)
429 return account_pb2.InitiateStrongVerificationRes(
430 verification_attempt_token=verification_attempt_token,
431 redirect_url=redirect_url,
432 )
434 def GetStrongVerificationAttemptStatus(self, request, context, session):
435 verification_attempt = session.execute(
436 select(StrongVerificationAttempt)
437 .where(StrongVerificationAttempt.user_id == context.user_id)
438 .where(StrongVerificationAttempt.is_visible)
439 .where(StrongVerificationAttempt.verification_attempt_token == request.verification_attempt_token)
440 ).scalar_one_or_none()
441 if not verification_attempt:
442 context.abort(grpc.StatusCode.NOT_FOUND, errors.STRONG_VERIFICATION_ATTEMPT_NOT_FOUND)
443 status_to_pb = {
444 StrongVerificationAttemptStatus.succeeded: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_SUCCEEDED,
445 StrongVerificationAttemptStatus.in_progress_waiting_on_user_to_open_app: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_TO_OPEN_APP,
446 StrongVerificationAttemptStatus.in_progress_waiting_on_user_in_app: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_IN_APP,
447 StrongVerificationAttemptStatus.in_progress_waiting_on_backend: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND,
448 StrongVerificationAttemptStatus.failed: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_FAILED,
449 }
450 return account_pb2.GetStrongVerificationAttemptStatusRes(
451 status=status_to_pb.get(
452 verification_attempt.status, account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_UNKNOWN
453 ),
454 )
456 def DeleteStrongVerificationData(self, request, context, session):
457 verification_attempts = (
458 session.execute(
459 select(StrongVerificationAttempt)
460 .where(StrongVerificationAttempt.user_id == context.user_id)
461 .where(StrongVerificationAttempt.has_full_data)
462 )
463 .scalars()
464 .all()
465 )
466 for verification_attempt in verification_attempts:
467 verification_attempt.status = StrongVerificationAttemptStatus.deleted
468 verification_attempt.has_full_data = False
469 verification_attempt.passport_encrypted_data = None
470 verification_attempt.passport_date_of_birth = None
471 verification_attempt.passport_sex = None
472 session.flush()
473 # double check:
474 verification_attempts = (
475 session.execute(
476 select(StrongVerificationAttempt)
477 .where(StrongVerificationAttempt.user_id == context.user_id)
478 .where(StrongVerificationAttempt.has_full_data)
479 )
480 .scalars()
481 .all()
482 )
483 assert len(verification_attempts) == 0
485 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
486 strong_verification_data_deletions_counter.labels(user.gender).inc()
488 return empty_pb2.Empty()
490 def DeleteAccount(self, request, context, session):
491 """
492 Triggers email with token to confirm deletion
494 Frontend should confirm via unique string (i.e. username) before this is called
495 """
496 if not request.confirm:
497 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_CONFIRM_ACCOUNT_DELETE)
499 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
501 reason = request.reason.strip()
502 if reason:
503 reason = AccountDeletionReason(user_id=user.id, reason=reason)
504 session.add(reason)
505 session.flush()
506 send_account_deletion_report_email(session, reason)
508 token = AccountDeletionToken(token=urlsafe_secure_token(), user=user, expiry=now() + timedelta(hours=2))
510 notify(
511 session,
512 user_id=user.id,
513 topic_action="account_deletion:start",
514 data=notification_data_pb2.AccountDeletionStart(
515 deletion_token=token.token,
516 ),
517 )
518 session.add(token)
520 account_deletion_initiations_counter.labels(user.gender).inc()
522 return empty_pb2.Empty()
524 def ListModNotes(self, request, context, session):
525 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
527 notes = (
528 session.execute(select(ModNote).where(ModNote.user_id == user.id).order_by(ModNote.created.asc()))
529 .scalars()
530 .all()
531 )
533 return account_pb2.ListModNotesRes(mod_notes=[mod_note_to_pb(note) for note in notes])
535 def ListActiveSessions(self, request, context, session):
536 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
537 page_token = dt_from_page_token(request.page_token) if request.page_token else now()
539 user_sessions = (
540 session.execute(
541 select(UserSession)
542 .where(UserSession.user_id == context.user_id)
543 .where(UserSession.is_valid)
544 .where(UserSession.is_api_key == False)
545 .where(UserSession.last_seen <= page_token)
546 .order_by(UserSession.last_seen.desc())
547 .limit(page_size + 1)
548 )
549 .scalars()
550 .all()
551 )
553 (token, token_expiry) = context.token
555 def _active_session_to_pb(user_session):
556 user_agent = user_agents_parse(user_session.user_agent or "")
557 return account_pb2.ActiveSession(
558 created=Timestamp_from_datetime(user_session.created),
559 expiry=Timestamp_from_datetime(user_session.expiry),
560 last_seen=Timestamp_from_datetime(user_session.last_seen),
561 operating_system=user_agent.os.family,
562 browser=user_agent.browser.family,
563 device=user_agent.device.family,
564 approximate_location=geoip_approximate_location(user_session.ip_address) or "Unknown",
565 is_current_session=user_session.token == token,
566 )
568 return account_pb2.ListActiveSessionsRes(
569 active_sessions=list(map(_active_session_to_pb, user_sessions[:page_size])),
570 next_page_token=dt_to_page_token(user_sessions[-1].last_seen) if len(user_sessions) > page_size else None,
571 )
573 def LogOutSession(self, request, context, session):
574 (token, token_expiry) = context.token
576 session.execute(
577 update(UserSession)
578 .where(UserSession.token != token)
579 .where(UserSession.user_id == context.user_id)
580 .where(UserSession.is_valid)
581 .where(UserSession.is_api_key == False)
582 .where(UserSession.created == to_aware_datetime(request.created))
583 .values(expiry=func.now())
584 .execution_options(synchronize_session=False)
585 )
586 return empty_pb2.Empty()
588 def LogOutOtherSessions(self, request, context, session):
589 if not request.confirm:
590 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_CONFIRM_LOGOUT_OTHER_SESSIONS)
592 (token, token_expiry) = context.token
594 session.execute(
595 update(UserSession)
596 .where(UserSession.token != token)
597 .where(UserSession.user_id == context.user_id)
598 .where(UserSession.is_valid)
599 .where(UserSession.is_api_key == False)
600 .values(expiry=func.now())
601 .execution_options(synchronize_session=False)
602 )
603 return empty_pb2.Empty()
606class Iris(iris_pb2_grpc.IrisServicer):
607 def Webhook(self, request, context, session):
608 json_data = json.loads(request.data)
609 reference_payload = verification_pb2.VerificationReferencePayload.FromString(
610 simple_decrypt("iris_callback", b64decode(json_data["session_reference"]))
611 )
612 # if we make it past the decrypt, we consider this webhook authenticated
613 verification_attempt_token = reference_payload.verification_attempt_token
614 user_id = reference_payload.user_id
616 verification_attempt = session.execute(
617 select(StrongVerificationAttempt)
618 .where(StrongVerificationAttempt.user_id == reference_payload.user_id)
619 .where(StrongVerificationAttempt.verification_attempt_token == reference_payload.verification_attempt_token)
620 .where(StrongVerificationAttempt.iris_session_id == json_data["session_id"])
621 ).scalar_one()
622 iris_status = json_data["session_state"]
623 session.add(
624 StrongVerificationCallbackEvent(
625 verification_attempt_id=verification_attempt.id,
626 iris_status=iris_status,
627 )
628 )
629 if iris_status == "INITIATED":
630 # the user opened the session in the app
631 verification_attempt.status = StrongVerificationAttemptStatus.in_progress_waiting_on_user_in_app
632 elif iris_status == "COMPLETED":
633 verification_attempt.status = StrongVerificationAttemptStatus.in_progress_waiting_on_backend
634 elif iris_status == "APPROVED":
635 verification_attempt.status = StrongVerificationAttemptStatus.in_progress_waiting_on_backend
636 session.commit()
637 # background worker will go and sort this one out
638 queue_job(
639 session,
640 job_type="finalize_strong_verification",
641 payload=jobs_pb2.FinalizeStrongVerificationPayload(verification_attempt_id=verification_attempt.id),
642 )
643 elif iris_status in ["FAILED", "ABORTED", "REJECTED"]:
644 verification_attempt.status = StrongVerificationAttemptStatus.failed
646 return httpbody_pb2.HttpBody(
647 content_type="application/json",
648 # json.dumps escapes non-ascii characters
649 data=json.dumps({"success": True}).encode("ascii"),
650 )