Coverage for app/backend/src/couchers/servicers/jail.py: 96%
69 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1import logging
3import grpc
4from google.protobuf import empty_pb2
5from sqlalchemy import select
6from sqlalchemy.orm import Session
8from couchers.constants import GUIDELINES_VERSION, TOS_VERSION
9from couchers.context import CouchersContext
10from couchers.models import ActivenessProbe, ActivenessProbeStatus, HostingStatus, ModNote, User
11from couchers.proto import jail_pb2, jail_pb2_grpc
12from couchers.servicers.account import mod_note_to_pb
13from couchers.utils import create_coordinate, now
15logger = logging.getLogger(__name__)
def _get_jail_info(user: User) -> jail_pb2.JailInfoRes:
    """
    Build the `JailInfoRes` message describing why (if at all) `user` is jailed.

    The individual reason fields are copied from the user's attributes, the
    pending mod notes are converted to protobuf, and the `jailed` field is set
    if any other field of the message ends up truthy.
    """
    info = jail_pb2.JailInfoRes(
        has_not_accepted_tos=user.jailed_missing_tos,
        needs_to_update_location=user.is_missing_location,
        has_not_accepted_community_guidelines=user.jailed_missing_community_guidelines,
        has_pending_mod_notes=user.jailed_pending_mod_notes,
        pending_mod_notes=[mod_note_to_pb(pending) for pending in user.mod_notes.where(ModNote.is_pending)],
        has_pending_activeness_probe=user.jailed_pending_activeness_probe,
    )

    # a single truthy field on the message is enough to count as jailed
    is_jailed = any(getattr(info, descriptor.name) for descriptor in info.DESCRIPTOR.fields)
    info.jailed = is_jailed

    # sanity check: the user's own is_jailed must agree with what we computed
    assert user.is_jailed == is_jailed

    return info
class Jail(jail_pb2_grpc.JailServicer):
    """
    Servicer for the Jail API.

    These are the calls open to users who still have tasks to complete before
    being fully active. Every call responds with the caller's current jail info.
    """

    @staticmethod
    def _current_user(context: CouchersContext, session: Session) -> User:
        """Load the `User` row whose id is `context.user_id` (exactly one row is expected)."""
        user_query = select(User).where(User.id == context.user_id)
        return session.execute(user_query).scalar_one()

    def JailInfo(self, request: empty_pb2.Empty, context: CouchersContext, session: Session) -> jail_pb2.JailInfoRes:
        """Return the calling user's jail info without modifying anything."""
        return _get_jail_info(Jail._current_user(context, session))

    def AcceptTOS(
        self, request: jail_pb2.AcceptTOSReq, context: CouchersContext, session: Session
    ) -> jail_pb2.JailInfoRes:
        """
        Record that the calling user accepted the terms of service at `TOS_VERSION`.

        Aborts with FAILED_PRECONDITION when `request.accept` is not set.
        """
        user = Jail._current_user(context, session)

        accepted = request.accept
        if not accepted:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unaccept_tos")

        user.accepted_tos = TOS_VERSION

        return _get_jail_info(user)

    def SetLocation(
        self, request: jail_pb2.SetLocationReq, context: CouchersContext, session: Session
    ) -> jail_pb2.JailInfoRes:
        """
        Store the city, coordinate and radius from the request on the calling user.

        Aborts with INVALID_ARGUMENT when both latitude and longitude are zero.
        """
        user = Jail._current_user(context, session)

        lat, lng = request.lat, request.lng
        if lat == 0 and lng == 0:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_coordinate")

        user.city = request.city
        user.geom = create_coordinate(lat, lng)
        # clear the randomized geometry now that the real one has changed
        user.randomized_geom = None
        user.geom_radius = request.radius
        user.needs_to_update_location = False

        return _get_jail_info(user)

    def AcceptCommunityGuidelines(
        self, request: jail_pb2.AcceptCommunityGuidelinesReq, context: CouchersContext, session: Session
    ) -> jail_pb2.JailInfoRes:
        """
        Record that the calling user accepted the community guidelines at `GUIDELINES_VERSION`.

        Aborts with FAILED_PRECONDITION when `request.accept` is not set.
        """
        user = Jail._current_user(context, session)

        accepted = request.accept
        if not accepted:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unaccept_community_guidelines")

        user.accepted_community_guidelines = GUIDELINES_VERSION

        return _get_jail_info(user)

    def AcknowledgePendingModNote(
        self, request: jail_pb2.AcknowledgePendingModNoteReq, context: CouchersContext, session: Session
    ) -> jail_pb2.JailInfoRes:
        """
        Stamp one of the calling user's pending mod notes as acknowledged.

        Aborts with NOT_FOUND when no pending note with `request.note_id` belongs
        to the user, and with FAILED_PRECONDITION when `request.acknowledge` is
        not set.
        """
        user = Jail._current_user(context, session)

        # only a note that is pending, owned by this user and has the given id qualifies
        pending_note_query = (
            select(ModNote)
            .where(ModNote.user_id == user.id)
            .where(ModNote.is_pending)
            .where(ModNote.id == request.note_id)
        )
        note = session.execute(pending_note_query).scalar_one_or_none()

        if not note:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderator_note_not_found")

        if not request.acknowledge:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "moderator_note_need_to_acknowledge")

        note.acknowledged = now()

        return _get_jail_info(user)

    def RespondToActivenessProbe(
        self, request: jail_pb2.RespondToActivenessProbeReq, context: CouchersContext, session: Session
    ) -> jail_pb2.JailInfoRes:
        """
        Record the calling user's answer to their pending activeness probe.

        Answering "no longer active" also sets the user's hosting status to
        `cant_host`. Aborts with FAILED_PRECONDITION when the user has no
        pending probe or the response is neither of the two known values.
        """
        user = Jail._current_user(context, session)

        pending_probe_query = (
            select(ActivenessProbe).where(ActivenessProbe.user_id == user.id).where(ActivenessProbe.is_pending)
        )
        probe = session.execute(pending_probe_query).scalar_one_or_none()

        if not probe:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "probe_not_found")

        answer = request.response
        if answer == jail_pb2.ACTIVENESS_PROBE_RESPONSE_STILL_ACTIVE:
            probe.response = ActivenessProbeStatus.still_active
        elif answer == jail_pb2.ACTIVENESS_PROBE_RESPONSE_NO_LONGER_ACTIVE:
            probe.response = ActivenessProbeStatus.no_longer_active
            # the user said they are no longer active, so mark them as unable to host
            user.hosting_status = HostingStatus.cant_host
        else:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "probe_response_invalid")

        probe.responded = now()

        return _get_jail_info(user)