Coverage for src/couchers/servicers/jail.py: 98%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

63 statements  

1import logging 

2 

3import grpc 

4 

5from couchers import errors 

6from couchers.constants import GUIDELINES_VERSION, TOS_VERSION 

7from couchers.crypto import hash_password 

8from couchers.db import session_scope 

9from couchers.models import User 

10from couchers.servicers.auth import abort_on_invalid_password 

11from couchers.sql import couchers_select as select 

12from couchers.tasks import send_password_changed_email 

13from couchers.utils import create_coordinate 

14from proto import jail_pb2, jail_pb2_grpc 

15 

16logger = logging.getLogger(__name__) 

17 

18 

19class Jail(jail_pb2_grpc.JailServicer): 

20 """ 

21 The Jail servicer. 

22 

23 API calls allowed for users who need to complete some tasks before being 

24 fully active 

25 """ 

26 

27 def _get_jail_info(self, user): 

28 res = jail_pb2.JailInfoRes( 

29 has_not_accepted_tos=user.accepted_tos < TOS_VERSION, 

30 has_not_added_location=user.is_missing_location, 

31 has_not_accepted_community_guidelines=user.accepted_community_guidelines < GUIDELINES_VERSION, 

32 has_not_set_password=not user.has_password, 

33 ) 

34 

35 # if any of the bools in res are true, we're jailed 

36 jailed = False 

37 for field in res.DESCRIPTOR.fields: 

38 if getattr(res, field.name): 

39 jailed = True 

40 res.jailed = jailed 

41 

42 # double check 

43 assert user.is_jailed == jailed 

44 

45 return res 

46 

47 def JailInfo(self, request, context): 

48 with session_scope() as session: 

49 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

50 return self._get_jail_info(user) 

51 

52 def AcceptTOS(self, request, context): 

53 with session_scope() as session: 

54 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

55 

56 if not request.accept: 

57 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNACCEPT_TOS) 

58 

59 user.accepted_tos = TOS_VERSION 

60 session.commit() 

61 

62 return self._get_jail_info(user) 

63 

64 def SetLocation(self, request, context): 

65 with session_scope() as session: 

66 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

67 

68 if request.lat == 0 and request.lng == 0: 

69 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE) 

70 

71 user.city = request.city 

72 user.geom = create_coordinate(request.lat, request.lng) 

73 user.geom_radius = request.radius 

74 

75 session.commit() 

76 

77 return self._get_jail_info(user) 

78 

79 def AcceptCommunityGuidelines(self, request, context): 

80 with session_scope() as session: 

81 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

82 

83 if not request.accept: 

84 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNACCEPT_COMMUNITY_GUIDELINES) 

85 

86 user.accepted_community_guidelines = GUIDELINES_VERSION 

87 session.commit() 

88 

89 return self._get_jail_info(user) 

90 

91 def SetPassword(self, request, context): 

92 with session_scope() as session: 

93 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

94 

95 # this is important so anybody can't just set your password through the jail API 

96 if user.has_password: 

97 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_HAS_PASSWORD) 

98 

99 abort_on_invalid_password(request.new_password, context) 

100 

101 user.hashed_password = hash_password(request.new_password) 

102 session.commit() 

103 

104 send_password_changed_email(user) 

105 

106 return self._get_jail_info(user)