Coverage for src/couchers/servicers/account.py: 95%
245 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-02 20:27 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-02 20:27 +0000
1import json
2import logging
3from datetime import timedelta
5import grpc
6import requests
7from google.protobuf import empty_pb2
8from sqlalchemy.sql import func, update
9from user_agents import parse as user_agents_parse
11from couchers import errors
12from couchers.config import config
13from couchers.constants import PHONE_REVERIFICATION_INTERVAL, SMS_CODE_ATTEMPTS, SMS_CODE_LIFETIME
14from couchers.crypto import (
15 b64decode,
16 b64encode,
17 hash_password,
18 simple_decrypt,
19 simple_encrypt,
20 urlsafe_secure_token,
21 verify_password,
22 verify_token,
23)
24from couchers.helpers.geoip import geoip_approximate_location
25from couchers.jobs.enqueue import queue_job
26from couchers.metrics import (
27 account_deletion_initiations_counter,
28 strong_verification_completions_counter,
29 strong_verification_data_deletions_counter,
30 strong_verification_initiations_counter,
31)
32from couchers.models import (
33 AccountDeletionReason,
34 AccountDeletionToken,
35 ContributeOption,
36 ContributorForm,
37 ModNote,
38 StrongVerificationAttempt,
39 StrongVerificationAttemptStatus,
40 StrongVerificationCallbackEvent,
41 User,
42 UserSession,
43)
44from couchers.notifications.notify import notify
45from couchers.phone import sms
46from couchers.phone.check import is_e164_format, is_known_operator
47from couchers.sql import couchers_select as select
48from couchers.tasks import (
49 maybe_send_contributor_form_email,
50 send_account_deletion_report_email,
51 send_email_changed_confirmation_to_new_email,
52)
53from couchers.utils import (
54 Timestamp_from_datetime,
55 dt_from_page_token,
56 dt_to_page_token,
57 is_valid_email,
58 now,
59 to_aware_datetime,
60)
61from proto import account_pb2, account_pb2_grpc, api_pb2, auth_pb2, iris_pb2_grpc, notification_data_pb2
62from proto.google.api import httpbody_pb2
63from proto.internal import jobs_pb2, verification_pb2
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# protobuf contribute option -> database enum value (None when the option is unspecified)
contributeoption2sql = {
    auth_pb2.CONTRIBUTE_OPTION_UNSPECIFIED: None,
    auth_pb2.CONTRIBUTE_OPTION_YES: ContributeOption.yes,
    auth_pb2.CONTRIBUTE_OPTION_MAYBE: ContributeOption.maybe,
    auth_pb2.CONTRIBUTE_OPTION_NO: ContributeOption.no,
}

# database enum value -> protobuf contribute option; the exact inverse of the table above
contributeoption2api = {sql_value: api_value for api_value, sql_value in contributeoption2sql.items()}

# largest page size a paginated RPC in this module hands out
MAX_PAGINATION_LENGTH = 50
def has_strong_verification(session, user):
    """
    Returns whether the given user currently has strong verification.

    Picks the valid strong verification attempt with the latest passport expiry (if there is one) and defers to
    its `has_strong_verification(user)` check. Without any valid attempt the answer is False.
    """
    latest_valid_attempt_query = (
        select(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.user_id == user.id)
        .where(StrongVerificationAttempt.is_valid)
        .order_by(StrongVerificationAttempt.passport_expiry_datetime.desc())
        .limit(1)
    )
    latest_attempt = session.execute(latest_valid_attempt_query).scalar_one_or_none()
    if not latest_attempt:
        return False

    # the loaded object must agree with the SQL-side filter
    assert latest_attempt.is_valid
    return latest_attempt.has_strong_verification(user)
def mod_note_to_pb(note: ModNote):
    """
    Converts a ModNote database object into an account_pb2.ModNote message.

    The `acknowledged` timestamp is only set when the note has been acknowledged.
    """
    acknowledged = None
    if note.acknowledged:
        acknowledged = Timestamp_from_datetime(note.acknowledged)

    return account_pb2.ModNote(
        note_id=note.id,
        note_content=note.note_content,
        created=Timestamp_from_datetime(note.created),
        acknowledged=acknowledged,
    )
def get_strong_verification_fields(session, db_user):
    """
    Builds the strong verification keyword arguments shared by API messages describing a user.

    Returns a dict with the keys `birthdate_verification_status`, `gender_verification_status` and
    `has_strong_verification`. Without a valid attempt everything stays unverified; otherwise the valid attempt
    with the latest passport expiry decides between "verified" and "mismatch" for birthdate and gender.
    """
    fields = {
        "birthdate_verification_status": api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED,
        "gender_verification_status": api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED,
        "has_strong_verification": False,
    }

    latest_valid_attempt_query = (
        select(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.user_id == db_user.id)
        .where(StrongVerificationAttempt.is_valid)
        .order_by(StrongVerificationAttempt.passport_expiry_datetime.desc())
        .limit(1)
    )
    attempt = session.execute(latest_valid_attempt_query).scalar_one_or_none()

    if attempt:
        # the loaded object must agree with the SQL-side filter
        assert attempt.is_valid
        fields["birthdate_verification_status"] = (
            api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
            if attempt.matches_birthdate(db_user)
            else api_pb2.BIRTHDATE_VERIFICATION_STATUS_MISMATCH
        )
        fields["gender_verification_status"] = (
            api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED
            if attempt.matches_gender(db_user)
            else api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH
        )
        fields["has_strong_verification"] = attempt.has_strong_verification(db_user)

    # invariant: strongly verified exactly when both birthdate and gender are verified
    birthdate_verified = fields["birthdate_verification_status"] == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
    gender_verified = fields["gender_verification_status"] == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED
    assert fields["has_strong_verification"] == (birthdate_verified and gender_verified)
    return fields
def abort_on_invalid_password(password, context):
    """
    Internal utility function: aborts the request through `context` if the password is unforgivably insecure

    A password is rejected when it is shorter than 8 characters, longer than 256 characters, or (ignoring case)
    one of a handful of very common passwords. Acceptable passwords fall through without touching `context`.
    """
    length = len(password)

    if length < 8:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PASSWORD_TOO_SHORT)

    if length > 256:
        # an absurdly long password looks more like a DDOS attempt than a real password
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PASSWORD_TOO_LONG)

    # only the most common weak passwords, this is not meant to be exhaustive
    weak_passwords = ("password", "12345678", "couchers", "couchers1")
    if password.lower() in weak_passwords:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INSECURE_PASSWORD)
158class Account(account_pb2_grpc.AccountServicer):
159 def GetAccountInfo(self, request, context, session):
160 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
162 return account_pb2.GetAccountInfoRes(
163 username=user.username,
164 email=user.email,
165 phone=user.phone if (user.phone_is_verified or not user.phone_code_expired) else None,
166 has_donated=user.has_donated,
167 phone_verified=user.phone_is_verified,
168 profile_complete=user.has_completed_profile,
169 timezone=user.timezone,
170 is_superuser=user.is_superuser,
171 **get_strong_verification_fields(session, user),
172 )
174 def ChangePasswordV2(self, request, context, session):
175 """
176 Changes the user's password. They have to confirm their old password just in case.
178 If they didn't have an old password previously, then we don't check that.
179 """
180 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
182 if not verify_password(user.hashed_password, request.old_password):
183 # wrong password
184 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_PASSWORD)
186 abort_on_invalid_password(request.new_password, context)
187 user.hashed_password = hash_password(request.new_password)
189 session.commit()
191 notify(
192 session,
193 user_id=user.id,
194 topic_action="password:change",
195 )
197 return empty_pb2.Empty()
199 def ChangeEmailV2(self, request, context, session):
200 """
201 Change the user's email address.
203 If the user has a password, a notification is sent to the old email, and a confirmation is sent to the new one.
205 Otherwise they need to confirm twice, via an email sent to each of their old and new emails.
207 In all confirmation emails, the user must click on the confirmation link.
208 """
209 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
211 # check password first
212 if not verify_password(user.hashed_password, request.password):
213 # wrong password
214 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_PASSWORD)
216 # not a valid email
217 if not is_valid_email(request.new_email):
218 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_EMAIL)
220 # email already in use (possibly by this user)
221 if session.execute(select(User).where(User.email == request.new_email)).scalar_one_or_none():
222 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_EMAIL)
224 user.new_email = request.new_email
225 user.new_email_token = urlsafe_secure_token()
226 user.new_email_token_created = now()
227 user.new_email_token_expiry = now() + timedelta(hours=2)
229 send_email_changed_confirmation_to_new_email(session, user)
231 # will still go into old email
232 notify(
233 session,
234 user_id=user.id,
235 topic_action="email_address:change",
236 data=notification_data_pb2.EmailAddressChange(
237 new_email=request.new_email,
238 ),
239 )
241 # session autocommit
242 return empty_pb2.Empty()
244 def FillContributorForm(self, request, context, session):
245 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
247 form = request.contributor_form
249 form = ContributorForm(
250 user=user,
251 ideas=form.ideas or None,
252 features=form.features or None,
253 experience=form.experience or None,
254 contribute=contributeoption2sql[form.contribute],
255 contribute_ways=form.contribute_ways,
256 expertise=form.expertise or None,
257 )
259 session.add(form)
260 session.flush()
261 maybe_send_contributor_form_email(session, form)
263 user.filled_contributor_form = True
265 return empty_pb2.Empty()
267 def GetContributorFormInfo(self, request, context, session):
268 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
270 return account_pb2.GetContributorFormInfoRes(
271 filled_contributor_form=user.filled_contributor_form,
272 )
274 def ChangePhone(self, request, context, session):
275 phone = request.phone
276 # early quick validation
277 if phone and not is_e164_format(phone):
278 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_PHONE)
280 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
281 if not user.has_donated:
282 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_DONATED)
284 if not phone:
285 user.phone = None
286 user.phone_verification_verified = None
287 user.phone_verification_token = None
288 user.phone_verification_attempts = 0
289 return empty_pb2.Empty()
291 if not is_known_operator(phone):
292 context.abort(grpc.StatusCode.UNIMPLEMENTED, errors.UNRECOGNIZED_PHONE_NUMBER)
294 if now() - user.phone_verification_sent < PHONE_REVERIFICATION_INTERVAL:
295 context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.REVERIFICATION_TOO_EARLY)
297 token = sms.generate_random_code()
298 result = sms.send_sms(phone, sms.format_message(token))
300 if result == "success":
301 user.phone = phone
302 user.phone_verification_verified = None
303 user.phone_verification_token = token
304 user.phone_verification_sent = now()
305 user.phone_verification_attempts = 0
307 notify(
308 session,
309 user_id=user.id,
310 topic_action="phone_number:change",
311 data=notification_data_pb2.PhoneNumberChange(
312 phone=phone,
313 ),
314 )
316 return empty_pb2.Empty()
318 context.abort(grpc.StatusCode.UNIMPLEMENTED, result)
320 def VerifyPhone(self, request, context, session):
321 if not sms.looks_like_a_code(request.token):
322 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.WRONG_SMS_CODE)
324 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
325 if user.phone_verification_token is None:
326 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NO_PENDING_VERIFICATION)
328 if now() - user.phone_verification_sent > SMS_CODE_LIFETIME:
329 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NO_PENDING_VERIFICATION)
331 if user.phone_verification_attempts > SMS_CODE_ATTEMPTS:
332 context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.TOO_MANY_SMS_CODE_ATTEMPTS)
334 if not verify_token(request.token, user.phone_verification_token):
335 user.phone_verification_attempts += 1
336 session.commit()
337 context.abort(grpc.StatusCode.NOT_FOUND, errors.WRONG_SMS_CODE)
339 # Delete verifications from everyone else that has this number
340 session.execute(
341 update(User)
342 .where(User.phone == user.phone)
343 .where(User.id != context.user_id)
344 .values(
345 {
346 "phone_verification_verified": None,
347 "phone_verification_attempts": 0,
348 "phone_verification_token": None,
349 "phone": None,
350 }
351 )
352 .execution_options(synchronize_session=False)
353 )
355 user.phone_verification_token = None
356 user.phone_verification_verified = now()
357 user.phone_verification_attempts = 0
359 notify(
360 session,
361 user_id=user.id,
362 topic_action="phone_number:verify",
363 data=notification_data_pb2.PhoneNumberVerify(
364 phone=user.phone,
365 ),
366 )
368 return empty_pb2.Empty()
370 def InitiateStrongVerification(self, request, context, session):
371 if not config["ENABLE_STRONG_VERIFICATION"]:
372 context.abort(grpc.StatusCode.UNAVAILABLE, errors.STRONG_VERIFICATION_DISABLED)
374 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
375 existing_verification = session.execute(
376 select(StrongVerificationAttempt)
377 .where(StrongVerificationAttempt.user_id == user.id)
378 .where(StrongVerificationAttempt.is_valid)
379 ).scalar_one_or_none()
380 if existing_verification:
381 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.STRONG_VERIFICATION_ALREADY_VERIFIED)
383 strong_verification_initiations_counter.labels(user.gender).inc()
385 verification_attempt_token = urlsafe_secure_token()
386 # this is the iris reference data, they will return this on every callback, it also doubles as webhook auth given lack of it otherwise
387 reference = b64encode(
388 simple_encrypt(
389 "iris_callback",
390 verification_pb2.VerificationReferencePayload(
391 verification_attempt_token=verification_attempt_token,
392 user_id=user.id,
393 ).SerializeToString(),
394 )
395 )
396 response = requests.post(
397 "https://passportreader.app/api/v1/session.create",
398 auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
399 json={
400 "callback_url": f"{config['BACKEND_BASE_URL']}/iris/webhook",
401 "face_verification": False,
402 "reference": reference,
403 },
404 timeout=10,
405 )
406 if response.status_code != 200:
407 raise Exception(f"Iris didn't return 200: {response.text}")
408 iris_session_id = response.json()["id"]
409 token = response.json()["token"]
410 url = f"iris:///?token={token}"
411 verification_attempt = StrongVerificationAttempt(
412 user_id=user.id,
413 verification_attempt_token=verification_attempt_token,
414 iris_session_id=iris_session_id,
415 iris_token=token,
416 )
417 session.add(verification_attempt)
419 return account_pb2.InitiateStrongVerificationRes(
420 verification_attempt_token=verification_attempt_token,
421 iris_url=url,
422 )
424 def GetStrongVerificationAttemptStatus(self, request, context, session):
425 verification_attempt = session.execute(
426 select(StrongVerificationAttempt)
427 .where(StrongVerificationAttempt.user_id == context.user_id)
428 .where(StrongVerificationAttempt.is_visible)
429 .where(StrongVerificationAttempt.verification_attempt_token == request.verification_attempt_token)
430 ).scalar_one_or_none()
431 if not verification_attempt:
432 context.abort(grpc.StatusCode.NOT_FOUND, errors.STRONG_VERIFICATION_ATTEMPT_NOT_FOUND)
433 status_to_pb = {
434 StrongVerificationAttemptStatus.succeeded: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_SUCCEEDED,
435 StrongVerificationAttemptStatus.in_progress_waiting_on_user_to_open_app: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_TO_OPEN_APP,
436 StrongVerificationAttemptStatus.in_progress_waiting_on_user_in_app: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_IN_APP,
437 StrongVerificationAttemptStatus.in_progress_waiting_on_backend: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND,
438 StrongVerificationAttemptStatus.failed: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_FAILED,
439 }
440 return account_pb2.GetStrongVerificationAttemptStatusRes(
441 status=status_to_pb.get(
442 verification_attempt.status, account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_UNKNOWN
443 ),
444 )
446 def DeleteStrongVerificationData(self, request, context, session):
447 verification_attempts = (
448 session.execute(
449 select(StrongVerificationAttempt)
450 .where(StrongVerificationAttempt.user_id == context.user_id)
451 .where(StrongVerificationAttempt.has_full_data)
452 )
453 .scalars()
454 .all()
455 )
456 for verification_attempt in verification_attempts:
457 verification_attempt.status = StrongVerificationAttemptStatus.deleted
458 verification_attempt.has_full_data = False
459 verification_attempt.passport_encrypted_data = None
460 verification_attempt.passport_date_of_birth = None
461 verification_attempt.passport_sex = None
462 session.flush()
463 # double check:
464 verification_attempts = (
465 session.execute(
466 select(StrongVerificationAttempt)
467 .where(StrongVerificationAttempt.user_id == context.user_id)
468 .where(StrongVerificationAttempt.has_full_data)
469 )
470 .scalars()
471 .all()
472 )
473 assert len(verification_attempts) == 0
475 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
476 strong_verification_data_deletions_counter.labels(user.gender).inc()
478 return empty_pb2.Empty()
480 def DeleteAccount(self, request, context, session):
481 """
482 Triggers email with token to confirm deletion
484 Frontend should confirm via unique string (i.e. username) before this is called
485 """
486 if not request.confirm:
487 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_CONFIRM_ACCOUNT_DELETE)
489 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
491 reason = request.reason.strip()
492 if reason:
493 reason = AccountDeletionReason(user_id=user.id, reason=reason)
494 session.add(reason)
495 session.flush()
496 send_account_deletion_report_email(session, reason)
498 token = AccountDeletionToken(token=urlsafe_secure_token(), user=user, expiry=now() + timedelta(hours=2))
500 notify(
501 session,
502 user_id=user.id,
503 topic_action="account_deletion:start",
504 data=notification_data_pb2.AccountDeletionStart(
505 deletion_token=token.token,
506 ),
507 )
508 session.add(token)
510 account_deletion_initiations_counter.labels(user.gender).inc()
512 return empty_pb2.Empty()
514 def ListModNotes(self, request, context, session):
515 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
517 notes = (
518 session.execute(select(ModNote).where(ModNote.user_id == user.id).order_by(ModNote.created.asc()))
519 .scalars()
520 .all()
521 )
523 return account_pb2.ListModNotesRes(mod_notes=[mod_note_to_pb(note) for note in notes])
525 def ListActiveSessions(self, request, context, session):
526 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
527 page_token = dt_from_page_token(request.page_token) if request.page_token else now()
529 user_sessions = (
530 session.execute(
531 select(UserSession)
532 .where(UserSession.user_id == context.user_id)
533 .where(UserSession.is_valid)
534 .where(UserSession.is_api_key == False)
535 .where(UserSession.last_seen <= page_token)
536 .order_by(UserSession.last_seen.desc())
537 .limit(page_size + 1)
538 )
539 .scalars()
540 .all()
541 )
543 (token, token_expiry) = context.token
545 def _active_session_to_pb(user_session):
546 user_agent = user_agents_parse(user_session.user_agent or "")
547 return account_pb2.ActiveSession(
548 created=Timestamp_from_datetime(user_session.created),
549 expiry=Timestamp_from_datetime(user_session.expiry),
550 last_seen=Timestamp_from_datetime(user_session.last_seen),
551 operating_system=user_agent.os.family,
552 browser=user_agent.browser.family,
553 device=user_agent.device.family,
554 approximate_location=geoip_approximate_location(user_session.ip_address) or "Unknown",
555 is_current_session=user_session.token == token,
556 )
558 return account_pb2.ListActiveSessionsRes(
559 active_sessions=list(map(_active_session_to_pb, user_sessions[:page_size])),
560 next_page_token=dt_to_page_token(user_sessions[-1].last_seen) if len(user_sessions) > page_size else None,
561 )
563 def LogOutSession(self, request, context, session):
564 (token, token_expiry) = context.token
566 session.execute(
567 update(UserSession)
568 .where(UserSession.token != token)
569 .where(UserSession.user_id == context.user_id)
570 .where(UserSession.is_valid)
571 .where(UserSession.is_api_key == False)
572 .where(UserSession.created == to_aware_datetime(request.created))
573 .values(expiry=func.now())
574 .execution_options(synchronize_session=False)
575 )
576 return empty_pb2.Empty()
578 def LogOutOtherSessions(self, request, context, session):
579 if not request.confirm:
580 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_CONFIRM_LOGOUT_OTHER_SESSIONS)
582 (token, token_expiry) = context.token
584 session.execute(
585 update(UserSession)
586 .where(UserSession.token != token)
587 .where(UserSession.user_id == context.user_id)
588 .where(UserSession.is_valid)
589 .where(UserSession.is_api_key == False)
590 .values(expiry=func.now())
591 .execution_options(synchronize_session=False)
592 )
593 return empty_pb2.Empty()
class Iris(iris_pb2_grpc.IrisServicer):
    """
    Servicer receiving the Iris callbacks for strong verification attempts.
    """

    def Webhook(self, request, context, session):
        """
        Handles one Iris callback: records the event and moves the matching attempt to its new status.

        The callback's reference is decrypted to recover the user id and attempt token; together with the Iris
        session id they must match exactly one StrongVerificationAttempt. An "APPROVED" callback additionally
        queues the "finalize_strong_verification" background job. Always answers with `{"success": true}`.
        """
        json_data = json.loads(request.data)
        # NOTE(review): the key really is spelled "session_referenace" here; presumably this mirrors the
        # payload Iris sends - confirm before changing it
        encrypted_reference = b64decode(json_data["session_referenace"])
        reference_payload = verification_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", encrypted_reference)
        )
        # if we make it past the decrypt, we consider this webhook authenticated
        verification_attempt_token = reference_payload.verification_attempt_token
        user_id = reference_payload.user_id

        attempt_query = (
            select(StrongVerificationAttempt)
            .where(StrongVerificationAttempt.user_id == user_id)
            .where(StrongVerificationAttempt.verification_attempt_token == verification_attempt_token)
            .where(StrongVerificationAttempt.iris_session_id == json_data["session_id"])
        )
        verification_attempt = session.execute(attempt_query).scalar_one()

        iris_status = json_data["session_state"]
        callback_event = StrongVerificationCallbackEvent(
            verification_attempt_id=verification_attempt.id,
            iris_status=iris_status,
        )
        session.add(callback_event)

        if iris_status == "INITIATED":
            # the user opened the session in the app
            verification_attempt.status = StrongVerificationAttemptStatus.in_progress_waiting_on_user_in_app
        elif iris_status == "COMPLETED":
            verification_attempt.status = StrongVerificationAttemptStatus.in_progress_waiting_on_backend
        elif iris_status == "APPROVED":
            strong_verification_completions_counter.inc()
            verification_attempt.status = StrongVerificationAttemptStatus.in_progress_waiting_on_backend
            session.commit()
            # background worker will go and sort this one out
            finalize_payload = jobs_pb2.FinalizeStrongVerificationPayload(
                verification_attempt_id=verification_attempt.id
            )
            queue_job(
                session,
                job_type="finalize_strong_verification",
                payload=finalize_payload,
            )
        elif iris_status in ("FAILED", "ABORTED", "REJECTED"):
            verification_attempt.status = StrongVerificationAttemptStatus.failed

        # json.dumps escapes non-ascii characters
        response_body = json.dumps({"success": True}).encode("ascii")
        return httpbody_pb2.HttpBody(
            content_type="application/json",
            data=response_body,
        )
641 )