Coverage for app/backend/src/couchers/servicers/jail.py: 96%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import logging 

2 

3import grpc 

4from google.protobuf import empty_pb2 

5from sqlalchemy import select 

6from sqlalchemy.orm import Session 

7 

8from couchers.constants import GUIDELINES_VERSION, TOS_VERSION 

9from couchers.context import CouchersContext 

10from couchers.models import ActivenessProbe, ActivenessProbeStatus, HostingStatus, ModNote, User 

11from couchers.proto import jail_pb2, jail_pb2_grpc 

12from couchers.servicers.account import mod_note_to_pb 

13from couchers.utils import create_coordinate, now 

14 

15logger = logging.getLogger(__name__) 

16 

17 

def _get_jail_info(user: User) -> jail_pb2.JailInfoRes:
    """
    Build the JailInfoRes message for the given user.

    Every "reason" flag is copied from the matching attribute on the user, the
    user's pending mod notes are converted with mod_note_to_pb, and the `jailed`
    field is set to whether any field of the message is truthy.
    """
    info = jail_pb2.JailInfoRes(
        has_not_accepted_tos=user.jailed_missing_tos,
        needs_to_update_location=user.is_missing_location,
        has_not_accepted_community_guidelines=user.jailed_missing_community_guidelines,
        has_pending_mod_notes=user.jailed_pending_mod_notes,
        pending_mod_notes=[mod_note_to_pb(note) for note in user.mod_notes.where(ModNote.is_pending)],
        has_pending_activeness_probe=user.jailed_pending_activeness_probe,
    )

    # a single truthy field on the message is enough to count as jailed; the
    # message's own `jailed` field has not been assigned yet at this point
    is_jailed = any(getattr(info, field.name) for field in info.DESCRIPTOR.fields)
    info.jailed = is_jailed

    # sanity check: the model's own verdict has to match what we just derived
    assert user.is_jailed == is_jailed

    return info

39 

40 

class Jail(jail_pb2_grpc.JailServicer):
    """
    The Jail servicer.

    These are the API calls available to users who still have to complete some
    tasks before being fully active. Every call answers with a fresh JailInfoRes
    for the calling user.
    """

    def JailInfo(self, request: empty_pb2.Empty, context: CouchersContext, session: Session) -> jail_pb2.JailInfoRes:
        """
        Return the jail info of the calling user without changing anything.
        """
        user_query = select(User).where(User.id == context.user_id)
        return _get_jail_info(session.execute(user_query).scalar_one())

    def AcceptTOS(
        self, request: jail_pb2.AcceptTOSReq, context: CouchersContext, session: Session
    ) -> jail_pb2.JailInfoRes:
        """
        Record that the calling user accepted the terms of service at TOS_VERSION.

        Aborts with FAILED_PRECONDITION ("cant_unaccept_tos") when request.accept
        is not set.
        """
        user_query = select(User).where(User.id == context.user_id)
        me = session.execute(user_query).scalar_one()

        if not request.accept:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unaccept_tos")

        me.accepted_tos = TOS_VERSION

        return _get_jail_info(me)

    def SetLocation(
        self, request: jail_pb2.SetLocationReq, context: CouchersContext, session: Session
    ) -> jail_pb2.JailInfoRes:
        """
        Store the city, coordinate and radius from the request on the calling user.

        Also resets the randomized geometry and clears needs_to_update_location.
        Aborts with INVALID_ARGUMENT ("invalid_coordinate") when both lat and lng
        are zero.
        """
        user_query = select(User).where(User.id == context.user_id)
        me = session.execute(user_query).scalar_one()

        # (0, 0) is rejected as a coordinate
        both_zero = request.lat == 0 and request.lng == 0
        if both_zero:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_coordinate")

        me.city = request.city
        me.geom = create_coordinate(request.lat, request.lng)
        me.randomized_geom = None
        me.geom_radius = request.radius
        me.needs_to_update_location = False

        return _get_jail_info(me)

    def AcceptCommunityGuidelines(
        self, request: jail_pb2.AcceptCommunityGuidelinesReq, context: CouchersContext, session: Session
    ) -> jail_pb2.JailInfoRes:
        """
        Record that the calling user accepted the community guidelines at
        GUIDELINES_VERSION.

        Aborts with FAILED_PRECONDITION ("cant_unaccept_community_guidelines")
        when request.accept is not set.
        """
        user_query = select(User).where(User.id == context.user_id)
        me = session.execute(user_query).scalar_one()

        if not request.accept:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unaccept_community_guidelines")

        me.accepted_community_guidelines = GUIDELINES_VERSION

        return _get_jail_info(me)

    def AcknowledgePendingModNote(
        self, request: jail_pb2.AcknowledgePendingModNoteReq, context: CouchersContext, session: Session
    ) -> jail_pb2.JailInfoRes:
        """
        Mark one of the calling user's pending mod notes as acknowledged now.

        Aborts with NOT_FOUND ("moderator_note_not_found") when no pending note
        with request.note_id belongs to the user, and with FAILED_PRECONDITION
        ("moderator_note_need_to_acknowledge") when request.acknowledge is not set.
        """
        user_query = select(User).where(User.id == context.user_id)
        me = session.execute(user_query).scalar_one()

        # the note has to belong to this user, still be pending, and match the requested id
        note_query = (
            select(ModNote)
            .where(ModNote.user_id == me.id)
            .where(ModNote.is_pending)
            .where(ModNote.id == request.note_id)
        )
        pending_note = session.execute(note_query).scalar_one_or_none()

        if not pending_note:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderator_note_not_found")

        if not request.acknowledge:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "moderator_note_need_to_acknowledge")

        pending_note.acknowledged = now()

        return _get_jail_info(me)

    def RespondToActivenessProbe(
        self, request: jail_pb2.RespondToActivenessProbeReq, context: CouchersContext, session: Session
    ) -> jail_pb2.JailInfoRes:
        """
        Store the calling user's answer to their pending activeness probe.

        A "no longer active" answer additionally sets the user's hosting status to
        cant_host. Aborts with FAILED_PRECONDITION when there is no pending probe
        ("probe_not_found") or the response is neither of the two known values
        ("probe_response_invalid").
        """
        user_query = select(User).where(User.id == context.user_id)
        me = session.execute(user_query).scalar_one()

        probe_query = select(ActivenessProbe).where(ActivenessProbe.user_id == me.id).where(ActivenessProbe.is_pending)
        pending_probe = session.execute(probe_query).scalar_one_or_none()

        if not pending_probe:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "probe_not_found")

        if request.response == jail_pb2.ACTIVENESS_PROBE_RESPONSE_STILL_ACTIVE:
            pending_probe.response = ActivenessProbeStatus.still_active
        elif request.response == jail_pb2.ACTIVENESS_PROBE_RESPONSE_NO_LONGER_ACTIVE:
            pending_probe.response = ActivenessProbeStatus.no_longer_active
            # a user who says they are no longer active stops being listed as a host
            me.hosting_status = HostingStatus.cant_host
        else:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "probe_response_invalid")

        pending_probe.responded = now()

        return _get_jail_info(me)