Coverage for src/couchers/servicers/account.py: 97%

213 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-07-22 16:44 +0000

1import json 

2from datetime import timedelta 

3 

4import grpc 

5import requests 

6from google.protobuf import empty_pb2 

7from sqlalchemy.sql import update 

8 

9from couchers import errors 

10from couchers.config import config 

11from couchers.constants import PHONE_REVERIFICATION_INTERVAL, SMS_CODE_ATTEMPTS, SMS_CODE_LIFETIME 

12from couchers.crypto import ( 

13 b64decode, 

14 b64encode, 

15 hash_password, 

16 simple_decrypt, 

17 simple_encrypt, 

18 urlsafe_secure_token, 

19 verify_password, 

20 verify_token, 

21) 

22from couchers.db import session_scope 

23from couchers.jobs.enqueue import queue_job 

24from couchers.models import ( 

25 AccountDeletionReason, 

26 AccountDeletionToken, 

27 ContributeOption, 

28 ContributorForm, 

29 StrongVerificationAttempt, 

30 StrongVerificationAttemptStatus, 

31 StrongVerificationCallbackEvent, 

32 User, 

33) 

34from couchers.notifications.notify import notify 

35from couchers.phone import sms 

36from couchers.phone.check import is_e164_format, is_known_operator 

37from couchers.sql import couchers_select as select 

38from couchers.tasks import ( 

39 maybe_send_contributor_form_email, 

40 send_account_deletion_report_email, 

41 send_email_changed_confirmation_to_new_email, 

42) 

43from couchers.utils import is_valid_email, now 

44from proto import account_pb2, account_pb2_grpc, api_pb2, auth_pb2, iris_pb2_grpc, notification_data_pb2 

45from proto.google.api import httpbody_pb2 

46from proto.internal import jobs_pb2, verification_pb2 

47 

# Maps the auth_pb2 CONTRIBUTE_OPTION_* constants to ContributeOption model members
# (the unspecified constant maps to None).
contributeoption2sql = {
    auth_pb2.CONTRIBUTE_OPTION_UNSPECIFIED: None,
    auth_pb2.CONTRIBUTE_OPTION_YES: ContributeOption.yes,
    auth_pb2.CONTRIBUTE_OPTION_MAYBE: ContributeOption.maybe,
    auth_pb2.CONTRIBUTE_OPTION_NO: ContributeOption.no,
}

# Exact inverse of contributeoption2sql: every value above is distinct, so flipping
# each pair yields the ContributeOption member (or None) -> auth_pb2 constant table.
contributeoption2api = {sql_value: api_value for api_value, sql_value in contributeoption2sql.items()}

61 

62 

def get_strong_verification_fields(session, db_user):
    """
    Builds the strong verification fields for a user as a dict with the keys
    birthdate_verification_status, gender_verification_status and has_strong_verification.

    Looks up the user's valid StrongVerificationAttempt with the latest passport expiry. Without one, both
    statuses are UNVERIFIED and has_strong_verification is False. With one, each status is VERIFIED or MISMATCH
    depending on whether the attempt matches the user's birthdate/gender.
    """
    latest_valid_attempt_query = (
        select(StrongVerificationAttempt)
        .where(StrongVerificationAttempt.user_id == db_user.id)
        .where(StrongVerificationAttempt.is_valid)
        .order_by(StrongVerificationAttempt.passport_expiry_datetime.desc())
        .limit(1)
    )
    attempt = session.execute(latest_valid_attempt_query).scalar_one_or_none()

    if not attempt:
        # nothing valid on record: everything stays unverified
        return dict(
            birthdate_verification_status=api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED,
            gender_verification_status=api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED,
            has_strong_verification=False,
        )

    # sanity check: the query above only selects valid attempts
    assert attempt.is_valid

    birthdate_status = (
        api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
        if attempt.matches_birthdate(db_user)
        else api_pb2.BIRTHDATE_VERIFICATION_STATUS_MISMATCH
    )
    gender_status = (
        api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED
        if attempt.matches_gender(db_user)
        else api_pb2.GENDER_VERIFICATION_STATUS_MISMATCH
    )
    strongly_verified = attempt.has_strong_verification(db_user)

    # sanity check: strong verification must coincide with both individual checks passing
    assert strongly_verified == (
        birthdate_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_VERIFIED
        and gender_status == api_pb2.GENDER_VERIFICATION_STATUS_VERIFIED
    )

    return dict(
        birthdate_verification_status=birthdate_status,
        gender_verification_status=gender_status,
        has_strong_verification=strongly_verified,
    )

95 

96 

def abort_on_invalid_password(password, context):
    """
    Internal helper: aborts the call via `context` with INVALID_ARGUMENT when `password` is unacceptable

    A password is rejected when it has fewer than 8 characters, more than 256 characters, or when its
    lowercased form is one of a handful of very common weak passwords. Returns nothing if it passes.
    """
    n_chars = len(password)

    if n_chars < 8:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PASSWORD_TOO_SHORT)

    if n_chars > 256:
        # Hey, what are you trying to do? Give us a DDOS attack?
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PASSWORD_TOO_LONG)

    # only the most common weak passwords are caught here (not meant to be an exhaustive check!)
    well_known_weak_passwords = {"password", "12345678", "couchers", "couchers1"}
    if password.lower() in well_known_weak_passwords:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INSECURE_PASSWORD)

111 

112 

class Account(account_pb2_grpc.AccountServicer):
    """
    Servicer for account-level operations of the calling user (identified by `context.user_id`): account info,
    password/email/phone changes, the contributor form, strong verification and account deletion.
    """

    def GetAccountInfo(self, request, context):
        """
        Returns the calling user's account info, including their strong verification fields.

        The phone number is only included if it is verified or its verification code has not expired yet.
        """
        with session_scope() as session:
            user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

            return account_pb2.GetAccountInfoRes(
                username=user.username,
                email=user.email,
                phone=user.phone if (user.phone_is_verified or not user.phone_code_expired) else None,
                phone_verified=user.phone_is_verified,
                profile_complete=user.has_completed_profile,
                timezone=user.timezone,
                **get_strong_verification_fields(session, user),
            )

    def ChangePasswordV2(self, request, context):
        """
        Changes the user's password. They always have to confirm their old password.

        Aborts with INVALID_ARGUMENT if the old password is wrong or the new password is rejected by
        `abort_on_invalid_password`. On success a "password:change" notification is sent.
        """
        with session_scope() as session:
            user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

            if not verify_password(user.hashed_password, request.old_password):
                # wrong password
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_PASSWORD)

            abort_on_invalid_password(request.new_password, context)
            user.hashed_password = hash_password(request.new_password)

            # commit the new password before notifying about the change
            session.commit()

            notify(
                user_id=user.id,
                topic_action="password:change",
            )

        return empty_pb2.Empty()

    def ChangeEmailV2(self, request, context):
        """
        Starts changing the user's email address.

        The user must supply their current password. The new address is stored as pending together with a
        confirmation token that expires after two hours, a confirmation email is sent to the new address, and an
        "email_address:change" notification is sent.

        Aborts with INVALID_ARGUMENT if the password is wrong, the new email is not valid, or the new email is
        already in use.
        """
        with session_scope() as session:
            user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

            # check password first
            if not verify_password(user.hashed_password, request.password):
                # wrong password
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_PASSWORD)

            # not a valid email
            if not is_valid_email(request.new_email):
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_EMAIL)

            # email already in use (possibly by this user)
            if session.execute(select(User).where(User.email == request.new_email)).scalar_one_or_none():
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_EMAIL)

            user.new_email = request.new_email
            user.new_email_token = urlsafe_secure_token()
            user.new_email_token_created = now()
            user.new_email_token_expiry = now() + timedelta(hours=2)

            send_email_changed_confirmation_to_new_email(user)

            # will still go into old email
            notify(
                user_id=user.id,
                topic_action="email_address:change",
                data=notification_data_pb2.EmailAddressChange(
                    new_email=request.new_email,
                ),
            )

        # session autocommit
        return empty_pb2.Empty()

    def FillContributorForm(self, request, context):
        """
        Stores a contributor form submitted by the user and marks the user as having filled one in.

        Empty text fields are stored as None. After the form is flushed to the database it is passed to
        `maybe_send_contributor_form_email`.
        """
        with session_scope() as session:
            user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

            submitted = request.contributor_form

            form = ContributorForm(
                user=user,
                ideas=submitted.ideas or None,
                features=submitted.features or None,
                experience=submitted.experience or None,
                contribute=contributeoption2sql[submitted.contribute],
                contribute_ways=submitted.contribute_ways,
                expertise=submitted.expertise or None,
            )

            session.add(form)
            session.flush()
            maybe_send_contributor_form_email(form)

            user.filled_contributor_form = True

        return empty_pb2.Empty()

    def GetContributorFormInfo(self, request, context):
        """
        Returns whether the user has filled in the contributor form.
        """
        with session_scope() as session:
            user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

            return account_pb2.GetContributorFormInfoRes(
                filled_contributor_form=user.filled_contributor_form,
            )

    def ChangePhone(self, request, context):
        """
        Sets or clears the user's phone number.

        An empty phone clears the number and all verification state. Otherwise the number must be in E.164 format
        and belong to a known operator, and the last verification SMS must be at least
        PHONE_REVERIFICATION_INTERVAL old. A fresh code is then sent by SMS; only if sending reports "success" is
        the new (unverified) number stored and a "phone_number:change" notification sent. Any other send result is
        used as the message of an UNIMPLEMENTED abort.
        """
        phone = request.phone
        # early quick validation
        if phone and not is_e164_format(phone):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_PHONE)

        with session_scope() as session:
            user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
            if not phone:
                # clearing the number also wipes any pending or completed verification
                user.phone = None
                user.phone_verification_verified = None
                user.phone_verification_token = None
                user.phone_verification_attempts = 0
                return empty_pb2.Empty()

            if not is_known_operator(phone):
                context.abort(grpc.StatusCode.UNIMPLEMENTED, errors.UNRECOGNIZED_PHONE_NUMBER)

            if now() - user.phone_verification_sent < PHONE_REVERIFICATION_INTERVAL:
                context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.REVERIFICATION_TOO_EARLY)

            token = sms.generate_random_code()
            result = sms.send_sms(phone, sms.format_message(token))

            if result == "success":
                user.phone = phone
                user.phone_verification_verified = None
                user.phone_verification_token = token
                user.phone_verification_sent = now()
                user.phone_verification_attempts = 0

                notify(
                    user_id=user.id,
                    topic_action="phone_number:change",
                    data=notification_data_pb2.PhoneNumberChange(
                        phone=phone,
                    ),
                )

                return empty_pb2.Empty()

            # sending failed: the result string is passed on as the error message
            context.abort(grpc.StatusCode.UNIMPLEMENTED, result)

    def VerifyPhone(self, request, context):
        """
        Verifies the user's pending phone number using the code they received by SMS.

        Aborts if the token does not look like a code, there is no pending verification, the code is older than
        SMS_CODE_LIFETIME, or too many attempts were made. A wrong code increments (and commits) the attempt
        counter before aborting. On success the number is removed from every other user that has it, the user's
        number is marked verified, and a "phone_number:verify" notification is sent.
        """
        if not sms.looks_like_a_code(request.token):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.WRONG_SMS_CODE)

        with session_scope() as session:
            user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
            if user.phone_verification_token is None:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NO_PENDING_VERIFICATION)

            if now() - user.phone_verification_sent > SMS_CODE_LIFETIME:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NO_PENDING_VERIFICATION)

            if user.phone_verification_attempts > SMS_CODE_ATTEMPTS:
                context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.TOO_MANY_SMS_CODE_ATTEMPTS)

            if not verify_token(request.token, user.phone_verification_token):
                user.phone_verification_attempts += 1
                # commit explicitly so the failed attempt is recorded before the call is aborted
                session.commit()
                context.abort(grpc.StatusCode.NOT_FOUND, errors.WRONG_SMS_CODE)

            # Delete verifications from everyone else that has this number
            session.execute(
                update(User)
                .where(User.phone == user.phone)
                .where(User.id != context.user_id)
                .values(
                    {
                        "phone_verification_verified": None,
                        "phone_verification_attempts": 0,
                        "phone_verification_token": None,
                        "phone": None,
                    }
                )
                .execution_options(synchronize_session=False)
            )

            user.phone_verification_token = None
            user.phone_verification_verified = now()
            user.phone_verification_attempts = 0

            notify(
                user_id=user.id,
                topic_action="phone_number:verify",
                data=notification_data_pb2.PhoneNumberVerify(
                    phone=user.phone,
                ),
            )

        return empty_pb2.Empty()

    def InitiateStrongVerification(self, request, context):
        """
        Starts a strong verification attempt for the user by creating a session with Iris.

        Aborts with UNAVAILABLE if strong verification is disabled in config, and with FAILED_PRECONDITION if the
        user already has a valid attempt. Raises an Exception if Iris does not answer with HTTP 200. On success
        the new attempt is stored and its token is returned together with the `iris:///` URL for the session.
        """
        if not config["ENABLE_STRONG_VERIFICATION"]:
            context.abort(grpc.StatusCode.UNAVAILABLE, errors.STRONG_VERIFICATION_DISABLED)
        with session_scope() as session:
            user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
            # this is only an existence check, so fetch at most one row: without the limit, more than one valid
            # attempt would make scalar_one_or_none() raise instead of giving the "already verified" error
            existing_verification = session.execute(
                select(StrongVerificationAttempt)
                .where(StrongVerificationAttempt.user_id == user.id)
                .where(StrongVerificationAttempt.is_valid)
                .limit(1)
            ).scalar_one_or_none()
            if existing_verification:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.STRONG_VERIFICATION_ALREADY_VERIFIED)
            verification_attempt_token = urlsafe_secure_token()
            # this is the iris reference data, they will return this on every callback, it also doubles as webhook auth given lack of it otherwise
            reference = b64encode(
                simple_encrypt(
                    "iris_callback",
                    verification_pb2.VerificationReferencePayload(
                        verification_attempt_token=verification_attempt_token,
                        user_id=user.id,
                    ).SerializeToString(),
                )
            )
            response = requests.post(
                "https://passportreader.app/api/v1/session.create",
                auth=(config["IRIS_ID_PUBKEY"], config["IRIS_ID_SECRET"]),
                json={
                    "callback_url": f"{config['BACKEND_BASE_URL']}/iris/webhook",
                    "face_verification": False,
                    "reference": reference,
                },
                timeout=10,
            )
            if response.status_code != 200:
                raise Exception(f"Iris didn't return 200: {response.text}")
            # parse the response body once and read both fields from it
            iris_session = response.json()
            iris_session_id = iris_session["id"]
            token = iris_session["token"]
            url = f"iris:///?token={token}"
            verification_attempt = StrongVerificationAttempt(
                user_id=user.id,
                verification_attempt_token=verification_attempt_token,
                iris_session_id=iris_session_id,
                iris_token=token,
            )
            session.add(verification_attempt)
            return account_pb2.InitiateStrongVerificationRes(
                verification_attempt_token=verification_attempt_token,
                iris_url=url,
            )

    def GetStrongVerificationAttemptStatus(self, request, context):
        """
        Returns the status of one of the user's visible strong verification attempts, looked up by its token.

        Aborts with NOT_FOUND if there is no such attempt. Statuses without an API counterpart are reported as
        STRONG_VERIFICATION_ATTEMPT_STATUS_UNKNOWN.
        """
        with session_scope() as session:
            verification_attempt = session.execute(
                select(StrongVerificationAttempt)
                .where(StrongVerificationAttempt.user_id == context.user_id)
                .where(StrongVerificationAttempt.is_visible)
                .where(StrongVerificationAttempt.verification_attempt_token == request.verification_attempt_token)
            ).scalar_one_or_none()
            if not verification_attempt:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.STRONG_VERIFICATION_ATTEMPT_NOT_FOUND)
            status_to_pb = {
                StrongVerificationAttemptStatus.succeeded: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_SUCCEEDED,
                StrongVerificationAttemptStatus.in_progress_waiting_on_user_to_open_app: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_TO_OPEN_APP,
                StrongVerificationAttemptStatus.in_progress_waiting_on_user_in_app: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_USER_IN_APP,
                StrongVerificationAttemptStatus.in_progress_waiting_on_backend: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_IN_PROGRESS_WAITING_ON_BACKEND,
                StrongVerificationAttemptStatus.failed: account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_FAILED,
            }
            return account_pb2.GetStrongVerificationAttemptStatusRes(
                status=status_to_pb.get(
                    verification_attempt.status, account_pb2.STRONG_VERIFICATION_ATTEMPT_STATUS_UNKNOWN
                ),
            )

    def DeleteStrongVerificationData(self, request, context):
        """
        Wipes the stored passport data from all of the user's strong verification attempts that still have it.

        Each such attempt is marked as deleted and its encrypted passport data, date of birth and sex are
        cleared. Afterwards the same query is run again to assert that nothing with full data is left.
        """
        with session_scope() as session:
            # the same statement is executed twice: once to find the attempts to wipe, once to double check
            attempts_with_full_data = (
                select(StrongVerificationAttempt)
                .where(StrongVerificationAttempt.user_id == context.user_id)
                .where(StrongVerificationAttempt.has_full_data)
            )
            verification_attempts = session.execute(attempts_with_full_data).scalars().all()
            for verification_attempt in verification_attempts:
                verification_attempt.status = StrongVerificationAttemptStatus.deleted
                verification_attempt.has_full_data = False
                verification_attempt.passport_encrypted_data = None
                verification_attempt.passport_date_of_birth = None
                verification_attempt.passport_sex = None
            session.flush()
            # double check:
            verification_attempts = session.execute(attempts_with_full_data).scalars().all()
            assert len(verification_attempts) == 0

        return empty_pb2.Empty()

    def DeleteAccount(self, request, context):
        """
        Triggers email with token to confirm deletion

        Frontend should confirm via unique string (i.e. username) before this is called

        Aborts with FAILED_PRECONDITION unless `request.confirm` is set. A non-blank reason is stored and reported
        via `send_account_deletion_report_email`. The deletion token expires after two hours and is sent to the
        user in an "account_deletion:start" notification.
        """
        if not request.confirm:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_CONFIRM_ACCOUNT_DELETE)

        with session_scope() as session:
            user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

            reason_text = request.reason.strip()
            if reason_text:
                deletion_reason = AccountDeletionReason(user_id=user.id, reason=reason_text)
                session.add(deletion_reason)
                session.flush()
                send_account_deletion_report_email(deletion_reason)

            token = AccountDeletionToken(token=urlsafe_secure_token(), user=user, expiry=now() + timedelta(hours=2))

            notify(
                user_id=user.id,
                topic_action="account_deletion:start",
                data=notification_data_pb2.AccountDeletionStart(
                    deletion_token=token.token,
                ),
            )
            session.add(token)

        return empty_pb2.Empty()

459 

460 

class Iris(iris_pb2_grpc.IrisServicer):
    """
    Servicer that receives the callbacks Iris sends about strong verification sessions.
    """

    def Webhook(self, request, context):
        """
        Handles one Iris callback: records the event and updates the matching verification attempt's status.

        `request.data` is parsed as JSON. The reference in it is decoded and decrypted to get the user id and
        attempt token, which together with the Iris session id identify the attempt (exactly one must match).
        Every callback is stored as a StrongVerificationCallbackEvent. "APPROVED" additionally commits and queues
        a "finalize_strong_verification" job; session states not handled below leave the attempt's status as is.

        Always answers with the JSON body {"success": true} if no error occurred.
        """
        json_data = json.loads(request.data)
        # NOTE(review): the "session_referenace" key is kept exactly as it was; presumably this is the spelling
        # Iris uses in its payload -- confirm against their API before "correcting" it
        reference_payload = verification_pb2.VerificationReferencePayload.FromString(
            simple_decrypt("iris_callback", b64decode(json_data["session_referenace"]))
        )
        # if we make it past the decrypt, we consider this webhook authenticated
        verification_attempt_token = reference_payload.verification_attempt_token
        user_id = reference_payload.user_id
        with session_scope() as session:
            verification_attempt = session.execute(
                select(StrongVerificationAttempt)
                .where(StrongVerificationAttempt.user_id == user_id)
                .where(StrongVerificationAttempt.verification_attempt_token == verification_attempt_token)
                .where(StrongVerificationAttempt.iris_session_id == json_data["session_id"])
            ).scalar_one()
            iris_status = json_data["session_state"]
            # keep a record of every callback, whatever its state
            session.add(
                StrongVerificationCallbackEvent(
                    verification_attempt_id=verification_attempt.id,
                    iris_status=iris_status,
                )
            )
            if iris_status == "INITIATED":
                # the user opened the session in the app
                verification_attempt.status = StrongVerificationAttemptStatus.in_progress_waiting_on_user_in_app
            elif iris_status == "COMPLETED":
                verification_attempt.status = StrongVerificationAttemptStatus.in_progress_waiting_on_backend
            elif iris_status == "APPROVED":
                verification_attempt.status = StrongVerificationAttemptStatus.in_progress_waiting_on_backend
                session.commit()
                # background worker will go and sort this one out
                queue_job(
                    job_type="finalize_strong_verification",
                    payload=jobs_pb2.FinalizeStrongVerificationPayload(verification_attempt_id=verification_attempt.id),
                )
            elif iris_status in ["FAILED", "ABORTED", "REJECTED"]:
                verification_attempt.status = StrongVerificationAttemptStatus.failed

        return httpbody_pb2.HttpBody(
            content_type="application/json",
            # json.dumps escapes non-ascii characters
            data=json.dumps({"success": True}).encode("ascii"),
        )