Coverage for src/couchers/servicers/public.py: 49%
37 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-02 01:08 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-02 01:08 +0000
1import logging
3import grpc
4from sqlalchemy.sql import func, union_all
6from couchers import errors
7from couchers.models import Cluster, Node, ProfilePublicVisibility, Reference, User
8from couchers.servicers.api import fluency2api, hostingstatus2api, meetupstatus2api, user_model_to_pb
9from couchers.servicers.gis import _statement_to_geojson_response
10from couchers.sql import couchers_select as select
11from couchers.utils import Timestamp_from_datetime, make_logged_out_context
12from proto import api_pb2, public_pb2, public_pb2_grpc
# Module-level logger, named after this module so log records can be traced back to it.
logger = logging.getLogger(__name__)
class Public(public_pb2_grpc.PublicServicer):
    """
    Public (logged out) APIs for getting public info
    """

    def GetPublicUsers(self, request, context, session):
        """
        Build the response listing publicly visible users and their geometry.

        Two selects are combined with ``UNION ALL`` and handed to
        ``_statement_to_geojson_response``:

        * visible users whose public visibility is neither ``nothing`` nor ``map_only``
          contribute their username and ``User.geom``;
        * visible ``map_only`` users contribute no username (``None``) and
          ``User.randomized_geom``; those without a randomized geometry are left out.
        """
        with_geom = (
            select(User.username, User.geom)
            .where(User.is_visible)
            .where(User.public_visibility != ProfilePublicVisibility.nothing)
            .where(User.public_visibility != ProfilePublicVisibility.map_only)
        )

        without_geom = (
            select(None, User.randomized_geom)
            .where(User.is_visible)
            # `!= None` is deliberate: SQLAlchemy renders it as `IS NOT NULL`
            .where(User.randomized_geom != None)  # noqa: E711
            .where(User.public_visibility == ProfilePublicVisibility.map_only)
        )

        return _statement_to_geojson_response(session, union_all(with_geom, without_geom))

    def GetPublicUser(self, request, context, session):
        """
        Return the public profile of the user whose username is ``request.user``.

        Only visible users whose public visibility is ``limited``, ``most`` or ``full`` are
        considered; anything else aborts the call with ``NOT_FOUND`` / ``errors.USER_NOT_FOUND``.

        The populated field of ``GetPublicUserRes`` depends on the user's public visibility:
        ``full_user`` (built by ``user_model_to_pb`` with a logged-out context), ``limited_user``
        or ``most_user``.
        """
        user = session.execute(
            select(User)
            .where(User.is_visible)
            .where(User.username == request.user)
            .where(
                User.public_visibility.in_(
                    [ProfilePublicVisibility.limited, ProfilePublicVisibility.most, ProfilePublicVisibility.full]
                )
            )
        ).scalar_one_or_none()

        if not user:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        if user.public_visibility == ProfilePublicVisibility.full:
            return public_pb2.GetPublicUserRes(full_user=user_model_to_pb(user, session, make_logged_out_context()))

        # number of references received by this user that were written by visible users
        num_references = session.execute(
            select(func.count())
            .select_from(Reference)
            .join(User, User.id == Reference.from_user_id)
            .where(User.is_visible)
            .where(Reference.to_user_id == user.id)
        ).scalar_one()

        if user.public_visibility == ProfilePublicVisibility.limited:
            return public_pb2.GetPublicUserRes(
                limited_user=public_pb2.LimitedUser(
                    username=user.username,
                    name=user.name,
                    city=user.city,
                    hometown=user.hometown,
                    num_references=num_references,
                    joined=Timestamp_from_datetime(user.display_joined),
                    hosting_status=hostingstatus2api[user.hosting_status],
                    meetup_status=meetupstatus2api[user.meetup_status],
                    badges=[badge.badge_id for badge in user.badges],
                )
            )

        if user.public_visibility == ProfilePublicVisibility.most:
            return public_pb2.GetPublicUserRes(
                most_user=public_pb2.MostUser(
                    username=user.username,
                    name=user.name,
                    city=user.city,
                    hometown=user.hometown,
                    timezone=user.timezone,
                    num_references=num_references,
                    gender=user.gender,
                    pronouns=user.pronouns,
                    age=user.age,
                    joined=Timestamp_from_datetime(user.display_joined),
                    last_active=Timestamp_from_datetime(user.display_last_active),
                    hosting_status=hostingstatus2api[user.hosting_status],
                    meetup_status=meetupstatus2api[user.meetup_status],
                    occupation=user.occupation,
                    education=user.education,
                    about_me=user.about_me,
                    things_i_like=user.things_i_like,
                    language_abilities=[
                        api_pb2.LanguageAbility(code=ability.language_code, fluency=fluency2api[ability.fluency])
                        for ability in user.language_abilities
                    ],
                    regions_visited=[region.code for region in user.regions_visited],
                    regions_lived=[region.code for region in user.regions_lived],
                    avatar_url=user.avatar.full_url if user.avatar else None,
                    avatar_thumbnail_url=user.avatar.thumbnail_url if user.avatar else None,
                    badges=[badge.badge_id for badge in user.badges],
                )
            )

    def GetSignupPageInfo(self, request, context, session):
        """
        Return summary info for the signup page: when and where the newest visible user signed
        up, and how many visible users there are.

        ``last_location`` is derived from the names of the official clusters whose parent node
        geometry contains the newest user's ``geom``. ``last_signup`` is that user's ``joined``
        time with seconds and microseconds zeroed.

        If there is no visible user at all, the response carries ``"The World"`` as location, a
        user count of zero, and no ``last_signup``.
        """
        # last user who signed up (visible user with the highest id)
        newest = session.execute(
            select(User.joined, User.geom).where(User.is_visible).order_by(User.id.desc()).limit(1)
        ).one_or_none()

        if newest is None:
            # `one_or_none()` yields None when no visible user exists; unpacking it would raise
            # TypeError, so answer with an empty-state response instead
            return public_pb2.GetSignupPageInfoRes(last_location="The World", user_count=0)

        last_signup, geom = newest

        communities = (
            session.execute(
                select(Cluster.name)
                .join(Node, Node.id == Cluster.parent_node_id)
                .where(Cluster.is_official_cluster)
                .where(func.ST_Contains(Node.geom, geom))
                .order_by(Cluster.id.asc())
            )
            .scalars()
            .all()
        )

        if len(communities) <= 1:
            # either no community or just global community
            last_location = "The World"
        elif len(communities) == 3:
            # probably global, continent, region, so let's just return the region
            last_location = communities[-1]
        else:
            # probably global, continent, region, city
            # NOTE(review): exactly two communities also land here and produce
            # "<second>, <first>" -- confirm that is the intended output
            last_location = f"{communities[-1]}, {communities[-2]}"

        user_count = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one()

        return public_pb2.GetSignupPageInfoRes(
            # truncate the signup time to the minute
            last_signup=Timestamp_from_datetime(last_signup.replace(second=0, microsecond=0)),
            last_location=last_location,
            user_count=user_count,
        )