Coverage for src/couchers/servicers/jail.py: 97%

66 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-20 11:53 +0000

1import logging 

2 

3import grpc 

4 

5from couchers.constants import GUIDELINES_VERSION, TOS_VERSION 

6from couchers.models import ActivenessProbe, ActivenessProbeStatus, HostingStatus, ModNote, User 

7from couchers.proto import jail_pb2, jail_pb2_grpc 

8from couchers.servicers.account import mod_note_to_pb 

9from couchers.sql import couchers_select as select 

10from couchers.utils import create_coordinate, now 

11 

12logger = logging.getLogger(__name__) 

13 

14 

15def _get_jail_info(session, user): 

16 res = jail_pb2.JailInfoRes( 

17 has_not_accepted_tos=user.jailed_missing_tos, 

18 needs_to_update_location=user.is_missing_location, 

19 has_not_accepted_community_guidelines=user.jailed_missing_community_guidelines, 

20 has_pending_mod_notes=user.jailed_pending_mod_notes, 

21 pending_mod_notes=[mod_note_to_pb(note) for note in user.mod_notes.where(ModNote.is_pending)], 

22 has_pending_activeness_probe=user.jailed_pending_activeness_probe, 

23 ) 

24 

25 # if any of the bools in res are true, we're jailed 

26 jailed = False 

27 for field in res.DESCRIPTOR.fields: 

28 if getattr(res, field.name): 

29 jailed = True 

30 res.jailed = jailed 

31 

32 # double check 

33 assert user.is_jailed == jailed 

34 

35 return res 

36 

37 

38class Jail(jail_pb2_grpc.JailServicer): 

39 """ 

40 The Jail servicer. 

41 

42 API calls allowed for users who need to complete some tasks before being 

43 fully active 

44 """ 

45 

46 def JailInfo(self, request, context, session): 

47 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

48 return _get_jail_info(session, user) 

49 

50 def AcceptTOS(self, request, context, session): 

51 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

52 

53 if not request.accept: 

54 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unaccept_tos") 

55 

56 user.accepted_tos = TOS_VERSION 

57 

58 return _get_jail_info(session, user) 

59 

60 def SetLocation(self, request, context, session): 

61 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

62 

63 if request.lat == 0 and request.lng == 0: 

64 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_coordinate") 

65 

66 user.city = request.city 

67 user.geom = create_coordinate(request.lat, request.lng) 

68 user.randomized_geom = None 

69 user.geom_radius = request.radius 

70 user.needs_to_update_location = False 

71 

72 return _get_jail_info(session, user) 

73 

74 def AcceptCommunityGuidelines(self, request, context, session): 

75 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

76 

77 if not request.accept: 

78 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unaccept_community_guidelines") 

79 

80 user.accepted_community_guidelines = GUIDELINES_VERSION 

81 

82 return _get_jail_info(session, user) 

83 

84 def AcknowledgePendingModNote(self, request, context, session): 

85 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

86 

87 note = session.execute( 

88 select(ModNote) 

89 .where(ModNote.user_id == user.id) 

90 .where(ModNote.is_pending) 

91 .where(ModNote.id == request.note_id) 

92 ).scalar_one_or_none() 

93 

94 if not note: 

95 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "mod_note_not_found") 

96 

97 if not request.acknowledge: 

98 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "mod_note_need_to_acknoweldge") 

99 

100 note.acknowledged = now() 

101 

102 return _get_jail_info(session, user) 

103 

104 def RespondToActivenessProbe(self, request, context, session): 

105 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

106 

107 probe = session.execute( 

108 select(ActivenessProbe).where(ActivenessProbe.user_id == user.id).where(ActivenessProbe.is_pending) 

109 ).scalar_one_or_none() 

110 

111 if not probe: 

112 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "probe_not_found") 

113 

114 if request.response == jail_pb2.ACTIVENESS_PROBE_RESPONSE_STILL_ACTIVE: 

115 probe.response = ActivenessProbeStatus.still_active 

116 elif request.response == jail_pb2.ACTIVENESS_PROBE_RESPONSE_NO_LONGER_ACTIVE: 

117 probe.response = ActivenessProbeStatus.no_longer_active 

118 # disable hosting 

119 user.hosting_status = HostingStatus.cant_host 

120 else: 

121 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "probe_response_invalid") 

122 

123 probe.responded = now() 

124 

125 return _get_jail_info(session, user)