Coverage for src/couchers/servicers/jail.py: 98%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import logging
3import grpc
5from couchers import errors
6from couchers.constants import GUIDELINES_VERSION, TOS_VERSION
7from couchers.crypto import hash_password
8from couchers.db import session_scope
9from couchers.models import User
10from couchers.servicers.auth import abort_on_invalid_password
11from couchers.sql import couchers_select as select
12from couchers.tasks import send_password_changed_email
13from couchers.utils import create_coordinate
14from proto import jail_pb2, jail_pb2_grpc
16logger = logging.getLogger(__name__)
19class Jail(jail_pb2_grpc.JailServicer):
20 """
21 The Jail servicer.
23 API calls allowed for users who need to complete some tasks before being
24 fully active
25 """
27 def _get_jail_info(self, user):
28 res = jail_pb2.JailInfoRes(
29 has_not_accepted_tos=user.accepted_tos < TOS_VERSION,
30 has_not_added_location=user.is_missing_location,
31 has_not_accepted_community_guidelines=user.accepted_community_guidelines < GUIDELINES_VERSION,
32 has_not_set_password=not user.has_password,
33 )
35 # if any of the bools in res are true, we're jailed
36 jailed = False
37 for field in res.DESCRIPTOR.fields:
38 if getattr(res, field.name):
39 jailed = True
40 res.jailed = jailed
42 # double check
43 assert user.is_jailed == jailed
45 return res
47 def JailInfo(self, request, context):
48 with session_scope() as session:
49 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
50 return self._get_jail_info(user)
52 def AcceptTOS(self, request, context):
53 with session_scope() as session:
54 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
56 if not request.accept:
57 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNACCEPT_TOS)
59 user.accepted_tos = TOS_VERSION
60 session.commit()
62 return self._get_jail_info(user)
64 def SetLocation(self, request, context):
65 with session_scope() as session:
66 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
68 if request.lat == 0 and request.lng == 0:
69 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
71 user.city = request.city
72 user.geom = create_coordinate(request.lat, request.lng)
73 user.geom_radius = request.radius
75 session.commit()
77 return self._get_jail_info(user)
79 def AcceptCommunityGuidelines(self, request, context):
80 with session_scope() as session:
81 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
83 if not request.accept:
84 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNACCEPT_COMMUNITY_GUIDELINES)
86 user.accepted_community_guidelines = GUIDELINES_VERSION
87 session.commit()
89 return self._get_jail_info(user)
91 def SetPassword(self, request, context):
92 with session_scope() as session:
93 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
95 # this is important so anybody can't just set your password through the jail API
96 if user.has_password:
97 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_HAS_PASSWORD)
99 abort_on_invalid_password(request.new_password, context)
101 user.hashed_password = hash_password(request.new_password)
102 session.commit()
104 send_password_changed_email(user)
106 return self._get_jail_info(user)