Coverage for src/couchers/servicers/account.py: 94%

257 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-06-01 15:07 +0000

1import json 

2import logging 

3from datetime import timedelta 

4from urllib.parse import urlencode 

5 

6import grpc 

7import requests 

8from google.protobuf import empty_pb2 

9from sqlalchemy.sql import func, update 

10from user_agents import parse as user_agents_parse 

11 

12from couchers import errors, urls 

13from couchers.config import config 

14from couchers.constants import PHONE_REVERIFICATION_INTERVAL, SMS_CODE_ATTEMPTS, SMS_CODE_LIFETIME 

15from couchers.crypto import ( 

16 b64decode, 

17 b64encode, 

18 hash_password, 

19 simple_decrypt, 

20 simple_encrypt, 

21 urlsafe_secure_token, 

22 verify_password, 

23 verify_token, 

24) 

25from couchers.helpers.geoip import geoip_approximate_location 

26from couchers.jobs.enqueue import queue_job 

27from couchers.metrics import ( 

28 account_deletion_initiations_counter, 

29 strong_verification_data_deletions_counter, 

30 strong_verification_initiations_counter, 

31) 

32from couchers.models import ( 

33 AccountDeletionReason, 

34 AccountDeletionToken, 

35 ContributeOption, 

36 ContributorForm, 

37 ModNote, 

38 ProfilePublicVisibility, 

39 StrongVerificationAttempt, 

40 StrongVerificationAttemptStatus, 

41 StrongVerificationCallbackEvent, 

42 User, 

43 UserSession, 

44) 

45from couchers.notifications.notify import notify 

46from couchers.phone import sms 

47from couchers.phone.check import is_e164_format, is_known_operator 

48from couchers.sql import couchers_select as select 

49from couchers.tasks import ( 

50 maybe_send_contributor_form_email, 

51 send_account_deletion_report_email, 

52 send_email_changed_confirmation_to_new_email, 

53) 

54from couchers.utils import ( 

55 Timestamp_from_datetime, 

56 dt_from_page_token, 

57 dt_to_page_token, 

58 is_valid_email, 

59 now, 

60 to_aware_datetime, 

61) 

62from proto import account_pb2, account_pb2_grpc, api_pb2, auth_pb2, iris_pb2_grpc, notification_data_pb2 

63from proto.google.api import httpbody_pb2 

64from proto.internal import jobs_pb2, verification_pb2 

65 

66logger = logging.getLogger(__name__) 

67logger.setLevel(logging.DEBUG) 

68 

69contributeoption2sql = { 

70 auth_pb2.CONTRIBUTE_OPTION_UNSPECIFIED: None, 

71 auth_pb2.CONTRIBUTE_OPTION_YES: ContributeOption.yes, 

72 auth_pb2.CONTRIBUTE_OPTION_MAYBE: ContributeOption.maybe, 

73 auth_pb2.CONTRIBUTE_OPTION_NO: ContributeOption.no, 

74} 

75 

76contributeoption2api = { 

77 None: auth_pb2.CONTRIBUTE_OPTION_UNSPECIFIED, 

78 ContributeOption.yes: auth_pb2.CONTRIBUTE_OPTION_YES, 

79 ContributeOption.maybe: auth_pb2.CONTRIBUTE_OPTION_MAYBE, 

80 ContributeOption.no: auth_pb2.CONTRIBUTE_OPTION_NO, 

81} 

82 

83profilepublicitysetting2sql = { 

84 account_pb2.PROFILE_PUBLIC_VISIBILITY_UNKNOWN: None, 

85 account_pb2.PROFILE_PUBLIC_VISIBILITY_NOTHING: ProfilePublicVisibility.nothing, 

86 account_pb2.PROFILE_PUBLIC_VISIBILITY_MAP_ONLY: ProfilePublicVisibility.map_only, 

87 account_pb2.PROFILE_PUBLIC_VISIBILITY_LIMITED: ProfilePublicVisibility.limited, 

88 account_pb2.PROFILE_PUBLIC_VISIBILITY_MOST: ProfilePublicVisibility.most, 

89 account_pb2.PROFILE_PUBLIC_VISIBILITY_FULL: ProfilePublicVisibility.full, 

90} 

91 

92profilepublicitysetting2api = { 

93 None: account_pb2.PROFILE_PUBLIC_VISIBILITY_UNKNOWN, 

94 ProfilePublicVisibility.nothing: account_pb2.PROFILE_PUBLIC_VISIBILITY_NOTHING, 

95 ProfilePublicVisibility.map_only: account_pb2.PROFILE_PUBLIC_VISIBILITY_MAP_ONLY, 

96 ProfilePublicVisibility.limited: account_pb2.PROFILE_PUBLIC_VISIBILITY_LIMITED, 

97 ProfilePublicVisibility.most: account_pb2.PROFILE_PUBLIC_VISIBILITY_MOST, 

98 ProfilePublicVisibility.full: account_pb2.PROFILE_PUBLIC_VISIBILITY_FULL, 

99} 

100 

101MAX_PAGINATION_LENGTH = 50 

102 

103 

104def has_strong_verification(session, user): 

105 attempt = session.execute( 

106 select(StrongVerificationAttempt) 

107 .where(StrongVerificationAttempt.user_id == user.id) 

108 .where(StrongVerificationAttempt.is_valid) 

109 .order_by(StrongVerificationAttempt.passport_expiry_datetime.desc()) 

110 .limit(1) 

111 ).scalar_one_or_none() 

112 if attempt: 

113 assert attempt.is_valid 

114 return attempt.has_strong_verification(user) 

115 return False 

116 

117 

118def mod_note_to_pb(note: ModNote): 

119 return account_pb2.ModNote( 

120 note_id=note.id, 

121 note_content=note.note_content, 

122 created=Timestamp_from_datetime(note.created), 

123 acknowledged=Timestamp_from_datetime(note.acknowledged) if note.acknowledged else None, 

124 ) 

125 

126 

127def get_strong_verification_fields(session, db_user): 

128 out = dict( 

129 birthdate_verification_status=api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED, 

130 gender_verification_status=api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED, 

131 has_strong_verification=False, 

132 ) 

133 attempt = session.execute( 

134 select(StrongVerificationAttempt) 

135 .where(StrongVerificationAttempt.user_id == db_user.id) 

136 .where(StrongVerificationAttempt.is_valid) 

137 .order_by(StrongVerificationAttempt.passport_expiry_datetime.desc()) 

138 .limit(1) 

139 ).scalar_one_or_none() 

140 if attempt: 

141 assert attempt.is_valid 

142 if attempt.matches_birthdate(db_user): 

143 out["birthdate_verification_status"] = api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

144 else: 

145 out["birthdate_verification_status"] = api_pb2.BIRTHDATE_VERIFICATION_STATUS_MISMATCH 

146 

147 if attempt.matches_gender(db_user): 

148 out["gender_verification_status"] = api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

149 else: 

150 out["gender_verification_status"] = api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH 

151 

152 out["has_strong_verification"] = attempt.has_strong_verification(db_user) 

153 

154 assert out["has_strong_verification"] == ( 

155 out["birthdate_verification_status"] == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED 

156 and out["gender_verification_status"] == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED 

157 ) 

158 return out 

159 

160 

161def abort_on_invalid_password(password, context): 

162 """ 

163 Internal utility function: given a password, aborts if password is unforgivably insecure 

164 """ 

165 if len(password) < 8: 

166 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PASSWORD_TOO_SHORT) 

167 

168 if len(password) > 256: 

169 # Hey, what are you trying to do? Give us a DDOS attack? 

170 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PASSWORD_TOO_LONG) 

171 

172 # check for most common weak passwords (not meant to be an exhaustive check!) 

173 if password.lower() in ("password", "12345678", "couchers", "couchers1"): 

174 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INSECURE_PASSWORD) 

175 

176 

177class Account(account_pb2_grpc.AccountServicer): 

178 def GetAccountInfo(self, request, context, session): 

179 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

180 

181 return account_pb2.GetAccountInfoRes( 

182 username=user.username, 

183 email=user.email, 

184 phone=user.phone if (user.phone_is_verified or not user.phone_code_expired) else None, 

185 has_donated=user.has_donated, 

186 phone_verified=user.phone_is_verified, 

187 profile_complete=user.has_completed_profile, 

188 timezone=user.timezone, 

189 is_superuser=user.is_superuser, 

190 ui_language_preference=user.ui_language_preference, 

191 profile_public_visibility=profilepublicitysetting2api[user.public_visibility], 

192 **get_strong_verification_fields(session, user), 

193 ) 

194 

195 def ChangePasswordV2(self, request, context, session): 

196 """ 

197 Changes the user's password. They have to confirm their old password just in case. 

198 

199 If they didn't have an old password previously, then we don't check that. 

200 """ 

201 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

202 

203 if not verify_password(user.hashed_password, request.old_password): 

204 # wrong password 

205 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_PASSWORD) 

206 

207 abort_on_invalid_password(request.new_password, context) 

208 user.hashed_password = hash_password(request.new_password) 

209 

210 session.commit() 

211 

212 notify( 

213 session, 

214 user_id=user.id, 

215 topic_action="password:change", 

216 ) 

217 

218 return empty_pb2.Empty() 

219 

220 def ChangeEmailV2(self, request, context, session): 

221 """ 

222 Change the user's email address. 

223 

224 If the user has a password, a notification is sent to the old email, and a confirmation is sent to the new one. 

225 

226 Otherwise they need to confirm twice, via an email sent to each of their old and new emails. 

227 

228 In all confirmation emails, the user must click on the confirmation link. 

229 """ 

230 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

231 

232 # check password first 

233 if not verify_password(user.hashed_password, request.password): 

234 # wrong password 

235 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_PASSWORD) 

236 

237 # not a valid email 

238 if not is_valid_email(request.new_email): 

239 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_EMAIL) 

240 

241 # email already in use (possibly by this user) 

242 if session.execute(select(User).where(User.email == request.new_email)).scalar_one_or_none(): 

243 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_EMAIL) 

244 

245 user.new_email = request.new_email 

246 user.new_email_token = urlsafe_secure_token() 

247 user.new_email_token_created = now() 

248 user.new_email_token_expiry = now() + timedelta(hours=2) 

249 

250 send_email_changed_confirmation_to_new_email(session, user) 

251 

252 # will still go into old email 

253 notify( 

254 session, 

255 user_id=user.id, 

256 topic_action="email_address:change", 

257 data=notification_data_pb2.EmailAddressChange( 

258 new_email=request.new_email, 

259 ), 

260 ) 

261 

262 # session autocommit 

263 return empty_pb2.Empty() 

264 

265 def ChangeLanguagePreference(self, request, context, session): 

266 # select the user from the db 

267 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

268 

269 # update the user's preference 

270 user.ui_language_preference = request.ui_language_preference 

271 # setting this on context will update the cookie (via interceptors)? 

272 context.ui_language_preference = request.ui_language_preference 

273 

274 return empty_pb2.Empty() 

275 

276 def FillContributorForm(self, request, context, session): 

277 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

278 

279 form = request.contributor_form 

280 

281 form = ContributorForm( 

282 user=user, 

283 ideas=form.ideas or None, 

284 features=form.features or None, 

285 experience=form.experience or None, 

286 contribute=contributeoption2sql[form.contribute], 

287 contribute_ways=form.contribute_ways, 

288 expertise=form.expertise or None, 

289 ) 

290 

291 session.add(form) 

292 session.flush() 

293 maybe_send_contributor_form_email(session, form) 

294 

295 user.filled_contributor_form = True 

296 

297 return empty_pb2.Empty() 

298 

299 def GetContributorFormInfo(self, request, context, session): 

300 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

301 

302 return account_pb2.GetContributorFormInfoRes( 

303 filled_contributor_form=user.filled_contributor_form, 

304 ) 

305 

306 def ChangePhone(self, request, context, session): 

307 phone = request.phone 

308 # early quick validation 

309 if phone and not is_e164_format(phone): 

310 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_PHONE) 

311 

312 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

313 if not user.has_donated: 

314 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_DONATED) 

315 

316 if not phone: 

317 user.phone = None 

318 user.phone_verification_verified = None 

319 user.phone_verification_token = None 

320 user.phone_verification_attempts = 0 

321 return empty_pb2.Empty() 

322 

323 if not is_known_operator(phone): 

324 context.abort(grpc.StatusCode.UNIMPLEMENTED, errors.UNRECOGNIZED_PHONE_NUMBER) 

325 

326 if now() - user.phone_verification_sent < PHONE_REVERIFICATION_INTERVAL: 

327 context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.REVERIFICATION_TOO_EARLY) 

328 

329 token = sms.generate_random_code() 

330 result = sms.send_sms(phone, sms.format_message(token)) 

331 

332 if result == "success": 

333 user.phone = phone 

334 user.phone_verification_verified = None 

335 user.phone_verification_token = token 

336 user.phone_verification_sent = now() 

337 user.phone_verification_attempts = 0 

338 

339 notify( 

340 session, 

341 user_id=user.id, 

342 topic_action="phone_number:change", 

343 data=notification_data_pb2.PhoneNumberChange( 

344 phone=phone, 

345 ), 

346 ) 

347 

348 return empty_pb2.Empty() 

349 

350 context.abort(grpc.StatusCode.UNIMPLEMENTED, result) 

351 

352 def VerifyPhone(self, request, context, session): 

353 if not sms.looks_like_a_code(request.token): 

354 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.WRONG_SMS_CODE) 

355 

356 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

357 if user.phone_verification_token is None: 

358 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NO_PENDING_VERIFICATION) 

359 

360 if now() - user.phone_verification_sent > SMS_CODE_LIFETIME: 

361 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NO_PENDING_VERIFICATION) 

362 

363 if user.phone_verification_attempts > SMS_CODE_ATTEMPTS: 

364 context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.TOO_MANY_SMS_CODE_ATTEMPTS) 

365 

366 if not verify_token(request.token, user.phone_verification_token): 

367 user.phone_verification_attempts += 1 

368 session.commit() 

369 context.abort(grpc.StatusCode.NOT_FOUND, errors.WRONG_SMS_CODE) 

370 

371 # Delete verifications from everyone else that has this number 

372 session.execute( 

373 update(User) 

374 .where(User.phone == user.phone) 

375 .where(User.id != context.user_id) 

376 .values( 

377 { 

378 "phone_verification_verified": None, 

379 "phone_verification_attempts": 0, 

380 "phone_verification_token": None, 

381 "phone": None, 

382 } 

383 ) 

384 .execution_options(synchronize_session=False) 

385 ) 

386 

387 user.phone_verification_token = None 

388 user.phone_verification_verified = now() 

389 user.phone_verification_attempts = 0 

390 

391 notify( 

392 session, 

393 user_id=user.id, 

394 topic_action="phone_number:verify", 

395 data=notification_data_pb2.PhoneNumberVerify( 

396 phone=user.phone, 

397 ), 

398 ) 

399 

400 return empty_pb2.Empty() 

401 

402 def InitiateStrongVerification(self, request, context, session): 

403 if not config["ENABLE_STRONG_VERIFICATION"]: 

404 context.abort(grpc.StatusCode.UNAVAILABLE, errors.STRONG_VERIFICATION_DISABLED) 

405 

406 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

407 existing_verification = session.execute( 

408 select(StrongVerificationAttempt) 

409 .where(StrongVerificationAttempt.user_id == user.id) 

410 .where(StrongVerificationAttempt.is_valid) 

411 ).scalar_one_or_none() 

412 if existing_verification: 

413 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.STRONG_VERIFICATION_ALREADY_VERIFIED) 

414 

415 strong_verification_initiations_counter.labels(user.gender).inc() 

416 

417 verification_attempt_token = urlsafe_secure_token() 

418 # this is the iris reference data, they will return this on every callback, it also doubles as webhook auth given lack of it otherwise 

419 reference = b64encode( 

420 simple_encrypt( 

421 "iris_callback", 

422 verification_pb2.VerificationReferencePayload( 

423 verification_attempt_token=verification_attempt_token, 

424 user_id=user.id, 

425 ).SerializeToString(), 

426 ) 

427 ) 

428 response = requests.post( 

429 "https://passportreader.app/api/v1/session.create", 

430 auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]), 

431 json={ 

432 "callback_url": f"{config['BACKEND_BASE_URL']}/iris/webhook", 

433 "face_verification": False, 

434 "reference": reference, 

435 }, 

436 timeout=10, 

437 ) 

438 

439 if response.status_code != 200: 

440 raise Exception(f"Iris didn't return 200: {response.text}") 

441 

442 iris_session_id = response.json()["id"] 

443 token = response.json()["token"] 

444 session.add( 

445 StrongVerificationAttempt( 

446 user_id=user.id, 

447 verification_attempt_token=verification_attempt_token, 

448 iris_session_id=iris_session_id, 

449 iris_token=token, 

450 ) 

451 ) 

452 

453 redirect_params = { 

454 "token": token, 

455 "redirect_url": urls.complete_strong_verification_url( 

456 verification_attempt_token=verification_attempt_token 

457 ), 

458 } 

459 redirect_url = "https://passportreader.app/open?" + urlencode(redirect_params) 

460 

461 return account_pb2.InitiateStrongVerificationRes( 

462 verification_attempt_token=verification_attempt_token, 

463 redirect_url=redirect_url, 

464 ) 

465 

466 def GetStrongVerificationAttemptStatus(self, request, context, session): 

467 verification_attempt = session.execute( 

468 select(StrongVerificationAttempt) 

469 .where(StrongVerificationAttempt.user_id == context.user_id) 

470 .where(StrongVerificationAttempt.is_visible) 

471 .where(StrongVerificationAttempt.verification_attempt_token == request.verification_attempt_token) 

472 ).scalar_one_or_none() 

473 if not verification_attempt: 

474 context.abort(grpc.StatusCode.NOT_FOUND, errors.STRONG_VERIFICATION_ATTEMPT_NOT_FOUND) 

475 status_to_pb = { 

476 StrongVerificationAttemptStatus.succeeded: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_SUCCEEDED, 

477 StrongVerificationAttemptStatus.in_progress_waiting_on_user_to_open_app: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_TO_OPEN_APP, 

478 StrongVerificationAttemptStatus.in_progress_waiting_on_user_in_app: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_IN_APP, 

479 StrongVerificationAttemptStatus.in_progress_waiting_on_backend: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND, 

480 StrongVerificationAttemptStatus.failed: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_FAILED, 

481 } 

482 return account_pb2.GetStrongVerificationAttemptStatusRes( 

483 status=status_to_pb.get( 

484 verification_attempt.status, account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_UNKNOWN 

485 ), 

486 ) 

487 

488 def DeleteStrongVerificationData(self, request, context, session): 

489 verification_attempts = ( 

490 session.execute( 

491 select(StrongVerificationAttempt) 

492 .where(StrongVerificationAttempt.user_id == context.user_id) 

493 .where(StrongVerificationAttempt.has_full_data) 

494 ) 

495 .scalars() 

496 .all() 

497 ) 

498 for verification_attempt in verification_attempts: 

499 verification_attempt.status = StrongVerificationAttemptStatus.deleted 

500 verification_attempt.has_full_data = False 

501 verification_attempt.passport_encrypted_data = None 

502 verification_attempt.passport_date_of_birth = None 

503 verification_attempt.passport_sex = None 

504 session.flush() 

505 # double check: 

506 verification_attempts = ( 

507 session.execute( 

508 select(StrongVerificationAttempt) 

509 .where(StrongVerificationAttempt.user_id == context.user_id) 

510 .where(StrongVerificationAttempt.has_full_data) 

511 ) 

512 .scalars() 

513 .all() 

514 ) 

515 assert len(verification_attempts) == 0 

516 

517 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

518 strong_verification_data_deletions_counter.labels(user.gender).inc() 

519 

520 return empty_pb2.Empty() 

521 

522 def DeleteAccount(self, request, context, session): 

523 """ 

524 Triggers email with token to confirm deletion 

525 

526 Frontend should confirm via unique string (i.e. username) before this is called 

527 """ 

528 if not request.confirm: 

529 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_CONFIRM_ACCOUNT_DELETE) 

530 

531 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

532 

533 reason = request.reason.strip() 

534 if reason: 

535 reason = AccountDeletionReason(user_id=user.id, reason=reason) 

536 session.add(reason) 

537 session.flush() 

538 send_account_deletion_report_email(session, reason) 

539 

540 token = AccountDeletionToken(token=urlsafe_secure_token(), user=user, expiry=now() + timedelta(hours=2)) 

541 

542 notify( 

543 session, 

544 user_id=user.id, 

545 topic_action="account_deletion:start", 

546 data=notification_data_pb2.AccountDeletionStart( 

547 deletion_token=token.token, 

548 ), 

549 ) 

550 session.add(token) 

551 

552 account_deletion_initiations_counter.labels(user.gender).inc() 

553 

554 return empty_pb2.Empty() 

555 

556 def ListModNotes(self, request, context, session): 

557 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

558 

559 notes = ( 

560 session.execute(select(ModNote).where(ModNote.user_id == user.id).order_by(ModNote.created.asc())) 

561 .scalars() 

562 .all() 

563 ) 

564 

565 return account_pb2.ListModNotesRes(mod_notes=[mod_note_to_pb(note) for note in notes]) 

566 

567 def ListActiveSessions(self, request, context, session): 

568 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

569 page_token = dt_from_page_token(request.page_token) if request.page_token else now() 

570 

571 user_sessions = ( 

572 session.execute( 

573 select(UserSession) 

574 .where(UserSession.user_id == context.user_id) 

575 .where(UserSession.is_valid) 

576 .where(UserSession.is_api_key == False) 

577 .where(UserSession.last_seen <= page_token) 

578 .order_by(UserSession.last_seen.desc()) 

579 .limit(page_size + 1) 

580 ) 

581 .scalars() 

582 .all() 

583 ) 

584 

585 (token, token_expiry) = context.token 

586 

587 def _active_session_to_pb(user_session): 

588 user_agent = user_agents_parse(user_session.user_agent or "") 

589 return account_pb2.ActiveSession( 

590 created=Timestamp_from_datetime(user_session.created), 

591 expiry=Timestamp_from_datetime(user_session.expiry), 

592 last_seen=Timestamp_from_datetime(user_session.last_seen), 

593 operating_system=user_agent.os.family, 

594 browser=user_agent.browser.family, 

595 device=user_agent.device.family, 

596 approximate_location=geoip_approximate_location(user_session.ip_address) or "Unknown", 

597 is_current_session=user_session.token == token, 

598 ) 

599 

600 return account_pb2.ListActiveSessionsRes( 

601 active_sessions=list(map(_active_session_to_pb, user_sessions[:page_size])), 

602 next_page_token=dt_to_page_token(user_sessions[-1].last_seen) if len(user_sessions) > page_size else None, 

603 ) 

604 

605 def LogOutSession(self, request, context, session): 

606 (token, token_expiry) = context.token 

607 

608 session.execute( 

609 update(UserSession) 

610 .where(UserSession.token != token) 

611 .where(UserSession.user_id == context.user_id) 

612 .where(UserSession.is_valid) 

613 .where(UserSession.is_api_key == False) 

614 .where(UserSession.created == to_aware_datetime(request.created)) 

615 .values(expiry=func.now()) 

616 .execution_options(synchronize_session=False) 

617 ) 

618 return empty_pb2.Empty() 

619 

620 def LogOutOtherSessions(self, request, context, session): 

621 if not request.confirm: 

622 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_CONFIRM_LOGOUT_OTHER_SESSIONS) 

623 

624 (token, token_expiry) = context.token 

625 

626 session.execute( 

627 update(UserSession) 

628 .where(UserSession.token != token) 

629 .where(UserSession.user_id == context.user_id) 

630 .where(UserSession.is_valid) 

631 .where(UserSession.is_api_key == False) 

632 .values(expiry=func.now()) 

633 .execution_options(synchronize_session=False) 

634 ) 

635 return empty_pb2.Empty() 

636 

637 def SetProfilePublicVisibility(self, request, context, session): 

638 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

639 user.public_visibility = profilepublicitysetting2sql[request.profile_public_visibility] 

640 user.has_modified_public_visibility = True 

641 return empty_pb2.Empty() 

642 

643 

644class Iris(iris_pb2_grpc.IrisServicer): 

645 def Webhook(self, request, context, session): 

646 json_data = json.loads(request.data) 

647 reference_payload = verification_pb2.VerificationReferencePayload.FromString( 

648 simple_decrypt("iris_callback", b64decode(json_data["session_reference"])) 

649 ) 

650 # if we make it past the decrypt, we consider this webhook authenticated 

651 verification_attempt_token = reference_payload.verification_attempt_token 

652 user_id = reference_payload.user_id 

653 

654 verification_attempt = session.execute( 

655 select(StrongVerificationAttempt) 

656 .where(StrongVerificationAttempt.user_id == reference_payload.user_id) 

657 .where(StrongVerificationAttempt.verification_attempt_token == reference_payload.verification_attempt_token) 

658 .where(StrongVerificationAttempt.iris_session_id == json_data["session_id"]) 

659 ).scalar_one() 

660 iris_status = json_data["session_state"] 

661 session.add( 

662 StrongVerificationCallbackEvent( 

663 verification_attempt_id=verification_attempt.id, 

664 iris_status=iris_status, 

665 ) 

666 ) 

667 if iris_status == "INITIATED": 

668 # the user opened the session in the app 

669 verification_attempt.status = StrongVerificationAttemptStatus.in_progress_waiting_on_user_in_app 

670 elif iris_status == "COMPLETED": 

671 verification_attempt.status = StrongVerificationAttemptStatus.in_progress_waiting_on_backend 

672 elif iris_status == "APPROVED": 

673 verification_attempt.status = StrongVerificationAttemptStatus.in_progress_waiting_on_backend 

674 session.commit() 

675 # background worker will go and sort this one out 

676 queue_job( 

677 session, 

678 job_type="finalize_strong_verification", 

679 payload=jobs_pb2.FinalizeStrongVerificationPayload(verification_attempt_id=verification_attempt.id), 

680 priority=8, 

681 ) 

682 elif iris_status in ["FAILED", "ABORTED", "REJECTED"]: 

683 verification_attempt.status = StrongVerificationAttemptStatus.failed 

684 

685 return httpbody_pb2.HttpBody( 

686 content_type="application/json", 

687 # json.dumps escapes non-ascii characters 

688 data=json.dumps({"success": True}).encode("ascii"), 

689 )