Coverage for src/couchers/servicers/account.py: 95%

249 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-03-11 15:27 +0000

1import json 

2import logging 

3from datetime import timedelta 

4from urllib.parse import urlencode 

5 

6import grpc 

7import requests 

8from google.protobuf import empty_pb2 

9from sqlalchemy.sql import func, update 

10from user_agents import parse as user_agents_parse 

11 

12from couchers import errors, urls 

13from couchers.config import config 

14from couchers.constants import PHONE_REVERIFICATION_INTERVAL, SMS_CODE_ATTEMPTS, SMS_CODE_LIFETIME 

15from couchers.crypto import ( 

16 b64decode, 

17 b64encode, 

18 hash_password, 

19 simple_decrypt, 

20 simple_encrypt, 

21 urlsafe_secure_token, 

22 verify_password, 

23 verify_token, 

24) 

25from couchers.helpers.geoip import geoip_approximate_location 

26from couchers.jobs.enqueue import queue_job 

27from couchers.metrics import ( 

28 account_deletion_initiations_counter, 

29 strong_verification_data_deletions_counter, 

30 strong_verification_initiations_counter, 

31) 

32from couchers.models import ( 

33 AccountDeletionReason, 

34 AccountDeletionToken, 

35 ContributeOption, 

36 ContributorForm, 

37 ModNote, 

38 StrongVerificationAttempt, 

39 StrongVerificationAttemptStatus, 

40 StrongVerificationCallbackEvent, 

41 User, 

42 UserSession, 

43) 

44from couchers.notifications.notify import notify 

45from couchers.phone import sms 

46from couchers.phone.check import is_e164_format, is_known_operator 

47from couchers.sql import couchers_select as select 

48from couchers.tasks import ( 

49 maybe_send_contributor_form_email, 

50 send_account_deletion_report_email, 

51 send_email_changed_confirmation_to_new_email, 

52) 

53from couchers.utils import ( 

54 Timestamp_from_datetime, 

55 dt_from_page_token, 

56 dt_to_page_token, 

57 is_valid_email, 

58 now, 

59 to_aware_datetime, 

60) 

61from proto import account_pb2, account_pb2_grpc, api_pb2, auth_pb2, iris_pb2_grpc, notification_data_pb2 

62from proto.google.api import httpbody_pb2 

63from proto.internal import jobs_pb2, verification_pb2 

64 

65logger = logging.getLogger(__name__) 

66logger.setLevel(logging.DEBUG) 

67 

68contributeoption2sql = { 

69 auth_pb2.CONTRIBUTE_OPTION_UNSPECIFIED: None, 

70 auth_pb2.CONTRIBUTE_OPTION_YES: ContributeOption.yes, 

71 auth_pb2.CONTRIBUTE_OPTION_MAYBE: ContributeOption.maybe, 

72 auth_pb2.CONTRIBUTE_OPTION_NO: ContributeOption.no, 

73} 

74 

75contributeoption2api = { 

76 None: auth_pb2.CONTRIBUTE_OPTION_UNSPECIFIED, 

77 ContributeOption.yes: auth_pb2.CONTRIBUTE_OPTION_YES, 

78 ContributeOption.maybe: auth_pb2.CONTRIBUTE_OPTION_MAYBE, 

79 ContributeOption.no: auth_pb2.CONTRIBUTE_OPTION_NO, 

80} 

81 

82MAX_PAGINATION_LENGTH = 50 

83 

84 

85def has_strong_verification(session, user): 

86 attempt = session.execute( 

87 select(StrongVerificationAttempt) 

88 .where(StrongVerificationAttempt.user_id == user.id) 

89 .where(StrongVerificationAttempt.is_valid) 

90 .order_by(StrongVerificationAttempt.passport_expiry_datetime.desc()) 

91 .limit(1) 

92 ).scalar_one_or_none() 

93 if attempt: 

94 assert attempt.is_valid 

95 return attempt.has_strong_verification(user) 

96 return False 

97 

98 

99def mod_note_to_pb(note: ModNote): 

100 return account_pb2.ModNote( 

101 note_id=note.id, 

102 note_content=note.note_content, 

103 created=Timestamp_from_datetime(note.created), 

104 acknowledged=Timestamp_from_datetime(note.acknowledged) if note.acknowledged else None, 

105 ) 

106 

107 

108def get_strong_verification_fields(session, db_user): 

109 out = dict( 

110 birthdate_verification_status=api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED, 

111 gender_verification_status=api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED, 

112 has_strong_verification=False, 

113 ) 

114 attempt = session.execute( 

115 select(StrongVerificationAttempt) 

116 .where(StrongVerificationAttempt.user_id == db_user.id) 

117 .where(StrongVerificationAttempt.is_valid) 

118 .order_by(StrongVerificationAttempt.passport_expiry_datetime.desc()) 

119 .limit(1) 

120 ).scalar_one_or_none() 

121 if attempt: 

122 assert attempt.is_valid 

123 if attempt.matches_birthdate(db_user): 

124 out["birthdate_verification_status"] = api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

125 else: 

126 out["birthdate_verification_status"] = api_pb2.BIRTHDATE_VERIFICATION_STATUS_MISMATCH 

127 

128 if attempt.matches_gender(db_user): 

129 out["gender_verification_status"] = api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

130 else: 

131 out["gender_verification_status"] = api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH 

132 

133 out["has_strong_verification"] = attempt.has_strong_verification(db_user) 

134 

135 assert out["has_strong_verification"] == ( 

136 out["birthdate_verification_status"] == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

137 and out["gender_verification_status"] == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

138 ) 

139 return out 

140 

141 

142def abort_on_invalid_password(password, context): 

143 """ 

144 Internal utility function: given a password, aborts if password is unforgivably insecure 

145 """ 

146 if len(password) < 8: 

147 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PASSWORD_TOO_SHORT) 

148 

149 if len(password) > 256: 

150 # Hey, what are you trying to do? Give us a DDOS attack? 

151 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PASSWORD_TOO_LONG) 

152 

153 # check for most common weak passwords (not meant to be an exhaustive check!) 

154 if password.lower() in ("password", "12345678", "couchers", "couchers1"): 

155 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INSECURE_PASSWORD) 

156 

157 

158class Account(account_pb2_grpc.AccountServicer): 

159 def GetAccountInfo(self, request, context, session): 

160 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

161 

162 return account_pb2.GetAccountInfoRes( 

163 username=user.username, 

164 email=user.email, 

165 phone=user.phone if (user.phone_is_verified or not user.phone_code_expired) else None, 

166 has_donated=user.has_donated, 

167 phone_verified=user.phone_is_verified, 

168 profile_complete=user.has_completed_profile, 

169 timezone=user.timezone, 

170 is_superuser=user.is_superuser, 

171 ui_language_preference=user.ui_language_preference, 

172 **get_strong_verification_fields(session, user), 

173 ) 

174 

175 def ChangePasswordV2(self, request, context, session): 

176 """ 

177 Changes the user's password. They have to confirm their old password just in case. 

178 

179 If they didn't have an old password previously, then we don't check that. 

180 """ 

181 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

182 

183 if not verify_password(user.hashed_password, request.old_password): 

184 # wrong password 

185 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_PASSWORD) 

186 

187 abort_on_invalid_password(request.new_password, context) 

188 user.hashed_password = hash_password(request.new_password) 

189 

190 session.commit() 

191 

192 notify( 

193 session, 

194 user_id=user.id, 

195 topic_action="password:change", 

196 ) 

197 

198 return empty_pb2.Empty() 

199 

200 def ChangeEmailV2(self, request, context, session): 

201 """ 

202 Change the user's email address. 

203 

204 If the user has a password, a notification is sent to the old email, and a confirmation is sent to the new one. 

205 

206 Otherwise they need to confirm twice, via an email sent to each of their old and new emails. 

207 

208 In all confirmation emails, the user must click on the confirmation link. 

209 """ 

210 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

211 

212 # check password first 

213 if not verify_password(user.hashed_password, request.password): 

214 # wrong password 

215 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_PASSWORD) 

216 

217 # not a valid email 

218 if not is_valid_email(request.new_email): 

219 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_EMAIL) 

220 

221 # email already in use (possibly by this user) 

222 if session.execute(select(User).where(User.email == request.new_email)).scalar_one_or_none(): 

223 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_EMAIL) 

224 

225 user.new_email = request.new_email 

226 user.new_email_token = urlsafe_secure_token() 

227 user.new_email_token_created = now() 

228 user.new_email_token_expiry = now() + timedelta(hours=2) 

229 

230 send_email_changed_confirmation_to_new_email(session, user) 

231 

232 # will still go into old email 

233 notify( 

234 session, 

235 user_id=user.id, 

236 topic_action="email_address:change", 

237 data=notification_data_pb2.EmailAddressChange( 

238 new_email=request.new_email, 

239 ), 

240 ) 

241 

242 # session autocommit 

243 return empty_pb2.Empty() 

244 

245 def ChangeLanguagePreference(self, request, context, session): 

246 # select the user from the db 

247 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

248 

249 # update the user's preference 

250 user.ui_language_preference = request.ui_language_preference 

251 

252 return empty_pb2.Empty() 

253 

254 def FillContributorForm(self, request, context, session): 

255 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

256 

257 form = request.contributor_form 

258 

259 form = ContributorForm( 

260 user=user, 

261 ideas=form.ideas or None, 

262 features=form.features or None, 

263 experience=form.experience or None, 

264 contribute=contributeoption2sql[form.contribute], 

265 contribute_ways=form.contribute_ways, 

266 expertise=form.expertise or None, 

267 ) 

268 

269 session.add(form) 

270 session.flush() 

271 maybe_send_contributor_form_email(session, form) 

272 

273 user.filled_contributor_form = True 

274 

275 return empty_pb2.Empty() 

276 

277 def GetContributorFormInfo(self, request, context, session): 

278 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

279 

280 return account_pb2.GetContributorFormInfoRes( 

281 filled_contributor_form=user.filled_contributor_form, 

282 ) 

283 

284 def ChangePhone(self, request, context, session): 

285 phone = request.phone 

286 # early quick validation 

287 if phone and not is_e164_format(phone): 

288 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_PHONE) 

289 

290 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

291 if not user.has_donated: 

292 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_DONATED) 

293 

294 if not phone: 

295 user.phone = None 

296 user.phone_verification_verified = None 

297 user.phone_verification_token = None 

298 user.phone_verification_attempts = 0 

299 return empty_pb2.Empty() 

300 

301 if not is_known_operator(phone): 

302 context.abort(grpc.StatusCode.UNIMPLEMENTED, errors.UNRECOGNIZED_PHONE_NUMBER) 

303 

304 if now() - user.phone_verification_sent < PHONE_REVERIFICATION_INTERVAL: 

305 context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.REVERIFICATION_TOO_EARLY) 

306 

307 token = sms.generate_random_code() 

308 result = sms.send_sms(phone, sms.format_message(token)) 

309 

310 if result == "success": 

311 user.phone = phone 

312 user.phone_verification_verified = None 

313 user.phone_verification_token = token 

314 user.phone_verification_sent = now() 

315 user.phone_verification_attempts = 0 

316 

317 notify( 

318 session, 

319 user_id=user.id, 

320 topic_action="phone_number:change", 

321 data=notification_data_pb2.PhoneNumberChange( 

322 phone=phone, 

323 ), 

324 ) 

325 

326 return empty_pb2.Empty() 

327 

328 context.abort(grpc.StatusCode.UNIMPLEMENTED, result) 

329 

330 def VerifyPhone(self, request, context, session): 

331 if not sms.looks_like_a_code(request.token): 

332 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.WRONG_SMS_CODE) 

333 

334 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

335 if user.phone_verification_token is None: 

336 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NO_PENDING_VERIFICATION) 

337 

338 if now() - user.phone_verification_sent > SMS_CODE_LIFETIME: 

339 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NO_PENDING_VERIFICATION) 

340 

341 if user.phone_verification_attempts > SMS_CODE_ATTEMPTS: 

342 context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.TOO_MANY_SMS_CODE_ATTEMPTS) 

343 

344 if not verify_token(request.token, user.phone_verification_token): 

345 user.phone_verification_attempts += 1 

346 session.commit() 

347 context.abort(grpc.StatusCode.NOT_FOUND, errors.WRONG_SMS_CODE) 

348 

349 # Delete verifications from everyone else that has this number 

350 session.execute( 

351 update(User) 

352 .where(User.phone == user.phone) 

353 .where(User.id != context.user_id) 

354 .values( 

355 { 

356 "phone_verification_verified": None, 

357 "phone_verification_attempts": 0, 

358 "phone_verification_token": None, 

359 "phone": None, 

360 } 

361 ) 

362 .execution_options(synchronize_session=False) 

363 ) 

364 

365 user.phone_verification_token = None 

366 user.phone_verification_verified = now() 

367 user.phone_verification_attempts = 0 

368 

369 notify( 

370 session, 

371 user_id=user.id, 

372 topic_action="phone_number:verify", 

373 data=notification_data_pb2.PhoneNumberVerify( 

374 phone=user.phone, 

375 ), 

376 ) 

377 

378 return empty_pb2.Empty() 

379 

380 def InitiateStrongVerification(self, request, context, session): 

381 if not config["ENABLE_STRONG_VERIFICATION"]: 

382 context.abort(grpc.StatusCode.UNAVAILABLE, errors.STRONG_VERIFICATION_DISABLED) 

383 

384 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

385 existing_verification = session.execute( 

386 select(StrongVerificationAttempt) 

387 .where(StrongVerificationAttempt.user_id == user.id) 

388 .where(StrongVerificationAttempt.is_valid) 

389 ).scalar_one_or_none() 

390 if existing_verification: 

391 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.STRONG_VERIFICATION_ALREADY_VERIFIED) 

392 

393 strong_verification_initiations_counter.labels(user.gender).inc() 

394 

395 verification_attempt_token = urlsafe_secure_token() 

396 # this is the iris reference data, they will return this on every callback, it also doubles as webhook auth given lack of it otherwise 

397 reference = b64encode( 

398 simple_encrypt( 

399 "iris_callback", 

400 verification_pb2.VerificationReferencePayload( 

401 verification_attempt_token=verification_attempt_token, 

402 user_id=user.id, 

403 ).SerializeToString(), 

404 ) 

405 ) 

406 response = requests.post( 

407 "https://passportreader.app/api/v1/session.create", 

408 auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]), 

409 json={ 

410 "callback_url": f"{config['BACKEND_BASE_URL']}/iris/webhook", 

411 "face_verification": False, 

412 "reference": reference, 

413 }, 

414 timeout=10, 

415 ) 

416 

417 if response.status_code != 200: 

418 raise Exception(f"Iris didn't return 200: {response.text}") 

419 

420 iris_session_id = response.json()["id"] 

421 token = response.json()["token"] 

422 session.add( 

423 StrongVerificationAttempt( 

424 user_id=user.id, 

425 verification_attempt_token=verification_attempt_token, 

426 iris_session_id=iris_session_id, 

427 iris_token=token, 

428 ) 

429 ) 

430 

431 redirect_params = { 

432 "token": token, 

433 "redirect_url": urls.complete_strong_verification_url( 

434 verification_attempt_token=verification_attempt_token 

435 ), 

436 } 

437 redirect_url = "https://passportreader.app/open?" + urlencode(redirect_params) 

438 

439 return account_pb2.InitiateStrongVerificationRes( 

440 verification_attempt_token=verification_attempt_token, 

441 redirect_url=redirect_url, 

442 ) 

443 

444 def GetStrongVerificationAttemptStatus(self, request, context, session): 

445 verification_attempt = session.execute( 

446 select(StrongVerificationAttempt) 

447 .where(StrongVerificationAttempt.user_id == context.user_id) 

448 .where(StrongVerificationAttempt.is_visible) 

449 .where(StrongVerificationAttempt.verification_attempt_token == request.verification_attempt_token) 

450 ).scalar_one_or_none() 

451 if not verification_attempt: 

452 context.abort(grpc.StatusCode.NOT_FOUND, errors.STRONG_VERIFICATION_ATTEMPT_NOT_FOUND) 

453 status_to_pb = { 

454 StrongVerificationAttemptStatus.succeeded: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_SUCCEEDED, 

455 StrongVerificationAttemptStatus.in_progress_waiting_on_user_to_open_app: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_TO_OPEN_APP, 

456 StrongVerificationAttemptStatus.in_progress_waiting_on_user_in_app: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_IN_APP, 

457 StrongVerificationAttemptStatus.in_progress_waiting_on_backend: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND, 

458 StrongVerificationAttemptStatus.failed: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_FAILED, 

459 } 

460 return account_pb2.GetStrongVerificationAttemptStatusRes( 

461 status=status_to_pb.get( 

462 verification_attempt.status, account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_UNKNOWN 

463 ), 

464 ) 

465 

466 def DeleteStrongVerificationData(self, request, context, session): 

467 verification_attempts = ( 

468 session.execute( 

469 select(StrongVerificationAttempt) 

470 .where(StrongVerificationAttempt.user_id == context.user_id) 

471 .where(StrongVerificationAttempt.has_full_data) 

472 ) 

473 .scalars() 

474 .all() 

475 ) 

476 for verification_attempt in verification_attempts: 

477 verification_attempt.status = StrongVerificationAttemptStatus.deleted 

478 verification_attempt.has_full_data = False 

479 verification_attempt.passport_encrypted_data = None 

480 verification_attempt.passport_date_of_birth = None 

481 verification_attempt.passport_sex = None 

482 session.flush() 

483 # double check: 

484 verification_attempts = ( 

485 session.execute( 

486 select(StrongVerificationAttempt) 

487 .where(StrongVerificationAttempt.user_id == context.user_id) 

488 .where(StrongVerificationAttempt.has_full_data) 

489 ) 

490 .scalars() 

491 .all() 

492 ) 

493 assert len(verification_attempts) == 0 

494 

495 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

496 strong_verification_data_deletions_counter.labels(user.gender).inc() 

497 

498 return empty_pb2.Empty() 

499 

500 def DeleteAccount(self, request, context, session): 

501 """ 

502 Triggers email with token to confirm deletion 

503 

504 Frontend should confirm via unique string (i.e. username) before this is called 

505 """ 

506 if not request.confirm: 

507 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_CONFIRM_ACCOUNT_DELETE) 

508 

509 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

510 

511 reason = request.reason.strip() 

512 if reason: 

513 reason = AccountDeletionReason(user_id=user.id, reason=reason) 

514 session.add(reason) 

515 session.flush() 

516 send_account_deletion_report_email(session, reason) 

517 

518 token = AccountDeletionToken(token=urlsafe_secure_token(), user=user, expiry=now() + timedelta(hours=2)) 

519 

520 notify( 

521 session, 

522 user_id=user.id, 

523 topic_action="account_deletion:start", 

524 data=notification_data_pb2.AccountDeletionStart( 

525 deletion_token=token.token, 

526 ), 

527 ) 

528 session.add(token) 

529 

530 account_deletion_initiations_counter.labels(user.gender).inc() 

531 

532 return empty_pb2.Empty() 

533 

534 def ListModNotes(self, request, context, session): 

535 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

536 

537 notes = ( 

538 session.execute(select(ModNote).where(ModNote.user_id == user.id).order_by(ModNote.created.asc())) 

539 .scalars() 

540 .all() 

541 ) 

542 

543 return account_pb2.ListModNotesRes(mod_notes=[mod_note_to_pb(note) for note in notes]) 

544 

545 def ListActiveSessions(self, request, context, session): 

546 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

547 page_token = dt_from_page_token(request.page_token) if request.page_token else now() 

548 

549 user_sessions = ( 

550 session.execute( 

551 select(UserSession) 

552 .where(UserSession.user_id == context.user_id) 

553 .where(UserSession.is_valid) 

554 .where(UserSession.is_api_key == False) 

555 .where(UserSession.last_seen <= page_token) 

556 .order_by(UserSession.last_seen.desc()) 

557 .limit(page_size + 1) 

558 ) 

559 .scalars() 

560 .all() 

561 ) 

562 

563 (token, token_expiry) = context.token 

564 

565 def _active_session_to_pb(user_session): 

566 user_agent = user_agents_parse(user_session.user_agent or "") 

567 return account_pb2.ActiveSession( 

568 created=Timestamp_from_datetime(user_session.created), 

569 expiry=Timestamp_from_datetime(user_session.expiry), 

570 last_seen=Timestamp_from_datetime(user_session.last_seen), 

571 operating_system=user_agent.os.family, 

572 browser=user_agent.browser.family, 

573 device=user_agent.device.family, 

574 approximate_location=geoip_approximate_location(user_session.ip_address) or "Unknown", 

575 is_current_session=user_session.token == token, 

576 ) 

577 

578 return account_pb2.ListActiveSessionsRes( 

579 active_sessions=list(map(_active_session_to_pb, user_sessions[:page_size])), 

580 next_page_token=dt_to_page_token(user_sessions[-1].last_seen) if len(user_sessions) > page_size else None, 

581 ) 

582 

583 def LogOutSession(self, request, context, session): 

584 (token, token_expiry) = context.token 

585 

586 session.execute( 

587 update(UserSession) 

588 .where(UserSession.token != token) 

589 .where(UserSession.user_id == context.user_id) 

590 .where(UserSession.is_valid) 

591 .where(UserSession.is_api_key == False) 

592 .where(UserSession.created == to_aware_datetime(request.created)) 

593 .values(expiry=func.now()) 

594 .execution_options(synchronize_session=False) 

595 ) 

596 return empty_pb2.Empty() 

597 

598 def LogOutOtherSessions(self, request, context, session): 

599 if not request.confirm: 

600 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_CONFIRM_LOGOUT_OTHER_SESSIONS) 

601 

602 (token, token_expiry) = context.token 

603 

604 session.execute( 

605 update(UserSession) 

606 .where(UserSession.token != token) 

607 .where(UserSession.user_id == context.user_id) 

608 .where(UserSession.is_valid) 

609 .where(UserSession.is_api_key == False) 

610 .values(expiry=func.now()) 

611 .execution_options(synchronize_session=False) 

612 ) 

613 return empty_pb2.Empty() 

614 

615 

616class Iris(iris_pb2_grpc.IrisServicer): 

617 def Webhook(self, request, context, session): 

618 json_data = json.loads(request.data) 

619 reference_payload = verification_pb2.VerificationReferencePayload.FromString( 

620 simple_decrypt("iris_callback", b64decode(json_data["session_reference"])) 

621 ) 

622 # if we make it past the decrypt, we consider this webhook authenticated 

623 verification_attempt_token = reference_payload.verification_attempt_token 

624 user_id = reference_payload.user_id 

625 

626 verification_attempt = session.execute( 

627 select(StrongVerificationAttempt) 

628 .where(StrongVerificationAttempt.user_id == reference_payload.user_id) 

629 .where(StrongVerificationAttempt.verification_attempt_token == reference_payload.verification_attempt_token) 

630 .where(StrongVerificationAttempt.iris_session_id == json_data["session_id"]) 

631 ).scalar_one() 

632 iris_status = json_data["session_state"] 

633 session.add( 

634 StrongVerificationCallbackEvent( 

635 verification_attempt_id=verification_attempt.id, 

636 iris_status=iris_status, 

637 ) 

638 ) 

639 if iris_status == "INITIATED": 

640 # the user opened the session in the app 

641 verification_attempt.status = StrongVerificationAttemptStatus.in_progress_waiting_on_user_in_app 

642 elif iris_status == "COMPLETED": 

643 verification_attempt.status = StrongVerificationAttemptStatus.in_progress_waiting_on_backend 

644 elif iris_status == "APPROVED": 

645 verification_attempt.status = StrongVerificationAttemptStatus.in_progress_waiting_on_backend 

646 session.commit() 

647 # background worker will go and sort this one out 

648 queue_job( 

649 session, 

650 job_type="finalize_strong_verification", 

651 payload=jobs_pb2.FinalizeStrongVerificationPayload(verification_attempt_id=verification_attempt.id), 

652 ) 

653 elif iris_status in ["FAILED", "ABORTED", "REJECTED"]: 

654 verification_attempt.status = StrongVerificationAttemptStatus.failed 

655 

656 return httpbody_pb2.HttpBody( 

657 content_type="application/json", 

658 # json.dumps escapes non-ascii characters 

659 data=json.dumps({"success": True}).encode("ascii"), 

660 )