Coverage for src/couchers/servicers/account.py: 99%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from datetime import timedelta
3import grpc
4from google.protobuf import empty_pb2
5from sqlalchemy.sql import update
7from couchers import errors, urls
8from couchers.constants import PHONE_REVERIFICATION_INTERVAL, SMS_CODE_ATTEMPTS, SMS_CODE_LIFETIME
9from couchers.crypto import hash_password, urlsafe_secure_token, verify_password, verify_token
10from couchers.db import session_scope
11from couchers.models import AccountDeletionReason, ContributeOption, ContributorForm, User
12from couchers.notifications.notify import notify
13from couchers.phone import sms
14from couchers.phone.check import is_e164_format, is_known_operator
15from couchers.sql import couchers_select as select
16from couchers.tasks import (
17 maybe_send_contributor_form_email,
18 send_account_deletion_confirmation_email,
19 send_account_deletion_report_email,
20 send_email_changed_confirmation_to_new_email,
21 send_email_changed_confirmation_to_old_email,
22 send_email_changed_notification_email,
23 send_password_changed_email,
24)
25from couchers.utils import is_valid_email, now
26from proto import account_pb2, account_pb2_grpc, auth_pb2
# Translation table from the API-level contribute option to the value stored in
# the database; an unspecified option on the wire is stored as None.
contributeoption2sql = {
    auth_pb2.CONTRIBUTE_OPTION_UNSPECIFIED: None,
    auth_pb2.CONTRIBUTE_OPTION_YES: ContributeOption.yes,
    auth_pb2.CONTRIBUTE_OPTION_MAYBE: ContributeOption.maybe,
    auth_pb2.CONTRIBUTE_OPTION_NO: ContributeOption.no,
}

# Reverse lookup (database value -> API enum), derived from the forward table so
# the two mappings cannot drift apart.
contributeoption2api = {stored: wire for wire, stored in contributeoption2sql.items()}
def _check_password(user, field_name, request, context):
    """
    Internal helper: validates the password carried in the StringValue field `field_name` of `request`.

    A user with a password must supply it and it must verify against the stored hash; a user
    without a password must not supply one. Any violation aborts the call with INVALID_ARGUMENT.
    """
    if not user.has_password:
        # password-less account: supplying a password anyway is an error
        if request.HasField(field_name):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_PASSWORD)
        return

    # from here on the account is known to have a password
    if not request.HasField(field_name):
        # nothing was supplied
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PASSWORD)

    supplied_password = getattr(request, field_name).value
    if not verify_password(user.hashed_password, supplied_password):
        # supplied password does not match the stored hash
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_USERNAME_OR_PASSWORD)
def abort_on_invalid_password(password, context):
    """
    Internal helper: aborts the call with INVALID_ARGUMENT when `password` is unacceptably weak.

    Rejects passwords shorter than 8 characters, longer than 256 characters, or found
    (case-insensitively) in a tiny list of notoriously common passwords.
    """
    length = len(password)

    if length < 8:
        problem = errors.PASSWORD_TOO_SHORT
    elif length > 256:
        # absurdly long input is refused outright rather than processed
        problem = errors.PASSWORD_TOO_LONG
    elif password.lower() in ("password", "12345678", "couchers", "couchers1"):
        # only the most common weak passwords; deliberately not an exhaustive check
        problem = errors.INSECURE_PASSWORD
    else:
        return

    context.abort(grpc.StatusCode.INVALID_ARGUMENT, problem)
78class Account(account_pb2_grpc.AccountServicer):
79 def GetAccountInfo(self, request, context):
80 with session_scope() as session:
81 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
83 if not user.has_password:
84 auth_info = dict(
85 login_method=account_pb2.GetAccountInfoRes.LoginMethod.MAGIC_LINK,
86 has_password=False,
87 )
88 else:
89 auth_info = dict(
90 login_method=account_pb2.GetAccountInfoRes.LoginMethod.PASSWORD,
91 has_password=True,
92 )
93 return account_pb2.GetAccountInfoRes(
94 username=user.username,
95 email=user.email,
96 phone=user.phone if (user.phone_is_verified or not user.phone_code_expired) else None,
97 phone_verified=user.phone_is_verified,
98 profile_complete=user.has_completed_profile,
99 timezone=user.timezone,
100 **auth_info,
101 )
103 def ChangePassword(self, request, context):
104 """
105 Changes the user's password. They have to confirm their old password just in case.
107 If they didn't have an old password previously, then we don't check that.
108 """
109 with session_scope() as session:
110 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
112 if not request.HasField("old_password") and not request.HasField("new_password"):
113 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_BOTH_PASSWORDS)
115 _check_password(user, "old_password", request, context)
117 # password correct or no password
119 if not request.HasField("new_password"):
120 # the user wants to unset their password
121 user.hashed_password = None
122 else:
123 abort_on_invalid_password(request.new_password.value, context)
124 user.hashed_password = hash_password(request.new_password.value)
126 session.commit()
128 send_password_changed_email(user)
130 notify(
131 user_id=user.id,
132 topic="password",
133 key="",
134 action="change",
135 icon="wrench",
136 title=f"Your password was changed",
137 link=urls.account_settings_link(),
138 )
140 return empty_pb2.Empty()
142 def ChangeEmail(self, request, context):
143 """
144 Change the user's email address.
146 If the user has a password, a notification is sent to the old email, and a confirmation is sent to the new one.
148 Otherwise they need to confirm twice, via an email sent to each of their old and new emails.
150 In all confirmation emails, the user must click on the confirmation link.
151 """
152 with session_scope() as session:
153 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
155 # check password first
156 _check_password(user, "password", request, context)
158 # not a valid email
159 if not is_valid_email(request.new_email):
160 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_EMAIL)
162 # email already in use (possibly by this user)
163 if session.execute(select(User).where(User.email == request.new_email)).scalar_one_or_none():
164 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_EMAIL)
166 user.new_email = request.new_email
167 user.new_email_token = urlsafe_secure_token()
168 user.new_email_token_created = now()
169 user.new_email_token_expiry = now() + timedelta(hours=2)
170 user.need_to_confirm_via_new_email = True
172 if user.has_password:
173 user.old_email_token = None
174 user.old_email_token_created = None
175 user.old_email_token_expiry = None
176 user.need_to_confirm_via_old_email = False
177 send_email_changed_notification_email(user)
178 send_email_changed_confirmation_to_new_email(user)
180 notify(
181 user_id=user.id,
182 topic="email_address",
183 key="",
184 action="change",
185 icon="wrench",
186 title=f"Your email was changed",
187 link=urls.account_settings_link(),
188 )
189 else:
190 user.old_email_token = urlsafe_secure_token()
191 user.old_email_token_created = now()
192 user.old_email_token_expiry = now() + timedelta(hours=2)
193 user.need_to_confirm_via_old_email = True
194 send_email_changed_confirmation_to_old_email(user)
195 send_email_changed_confirmation_to_new_email(user)
197 # session autocommit
198 return empty_pb2.Empty()
200 def FillContributorForm(self, request, context):
201 with session_scope() as session:
202 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
204 form = request.contributor_form
206 form = ContributorForm(
207 user=user,
208 ideas=form.ideas or None,
209 features=form.features or None,
210 experience=form.experience or None,
211 contribute=contributeoption2sql[form.contribute],
212 contribute_ways=form.contribute_ways,
213 expertise=form.expertise or None,
214 )
216 session.add(form)
217 session.flush()
218 maybe_send_contributor_form_email(form)
220 user.filled_contributor_form = True
222 return empty_pb2.Empty()
224 def GetContributorFormInfo(self, request, context):
225 with session_scope() as session:
226 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
228 return account_pb2.GetContributorFormInfoRes(
229 filled_contributor_form=user.filled_contributor_form,
230 )
232 def ChangePhone(self, request, context):
233 phone = request.phone
234 # early quick validation
235 if phone and not is_e164_format(phone):
236 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_PHONE)
238 with session_scope() as session:
239 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
240 if not phone:
241 user.phone = None
242 user.phone_verification_verified = None
243 user.phone_verification_token = None
244 user.phone_verification_attempts = 0
245 return empty_pb2.Empty()
247 if not is_known_operator(phone):
248 context.abort(grpc.StatusCode.UNIMPLEMENTED, errors.UNRECOGNIZED_PHONE_NUMBER)
250 if now() - user.phone_verification_sent < PHONE_REVERIFICATION_INTERVAL:
251 context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.REVERIFICATION_TOO_EARLY)
253 token = sms.generate_random_code()
254 result = sms.send_sms(phone, sms.format_message(token))
256 if result == "success":
257 user.phone = phone
258 user.phone_verification_verified = None
259 user.phone_verification_token = token
260 user.phone_verification_sent = now()
261 user.phone_verification_attempts = 0
263 notify(
264 user_id=user.id,
265 topic="phone_number",
266 key="",
267 action="change",
268 icon="wrench",
269 title=f"Your phone number was changed",
270 link=urls.account_settings_link(),
271 )
273 return empty_pb2.Empty()
275 context.abort(grpc.StatusCode.UNIMPLEMENTED, result)
277 def VerifyPhone(self, request, context):
278 if not sms.looks_like_a_code(request.token):
279 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.WRONG_SMS_CODE)
281 with session_scope() as session:
282 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
283 if user.phone_verification_token is None:
284 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NO_PENDING_VERIFICATION)
286 if now() - user.phone_verification_sent > SMS_CODE_LIFETIME:
287 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NO_PENDING_VERIFICATION)
289 if user.phone_verification_attempts > SMS_CODE_ATTEMPTS:
290 context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.TOO_MANY_SMS_CODE_ATTEMPTS)
292 if not verify_token(request.token, user.phone_verification_token):
293 user.phone_verification_attempts += 1
294 session.commit()
295 context.abort(grpc.StatusCode.NOT_FOUND, errors.WRONG_SMS_CODE)
297 # Delete verifications from everyone else that has this number
298 session.execute(
299 update(User)
300 .where(User.phone == user.phone)
301 .where(User.id != context.user_id)
302 .values(
303 {
304 "phone_verification_verified": None,
305 "phone_verification_attempts": 0,
306 "phone_verification_token": None,
307 "phone": None,
308 }
309 )
310 .execution_options(synchronize_session=False)
311 )
313 user.phone_verification_token = None
314 user.phone_verification_verified = now()
315 user.phone_verification_attempts = 0
317 notify(
318 user_id=user.id,
319 topic="phone_number",
320 key="",
321 action="verify",
322 icon="wrench",
323 title=f"Your phone number was verified",
324 link=urls.account_settings_link(),
325 )
327 return empty_pb2.Empty()
329 def DeleteAccount(self, request, context):
330 """
331 Triggers email with token to confirm deletion
333 Frontend should confirm via unique string (i.e. username) before this is called
334 """
335 if not request.confirm:
336 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_CONFIRM_ACCOUNT_DELETE)
338 with session_scope() as session:
339 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
341 reason = request.reason.strip()
342 if reason:
343 reason = AccountDeletionReason(user_id=user.id, reason=reason)
344 session.add(reason)
345 session.commit()
346 send_account_deletion_report_email(reason)
348 token = send_account_deletion_confirmation_email(user)
349 session.add(token)
351 return empty_pb2.Empty()