Coverage for src/couchers/servicers/public.py: 49%

37 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-06-02 01:08 +0000

1import logging 

2 

3import grpc 

4from sqlalchemy.sql import func, union_all 

5 

6from couchers import errors 

7from couchers.models import Cluster, Node, ProfilePublicVisibility, Reference, User 

8from couchers.servicers.api import fluency2api, hostingstatus2api, meetupstatus2api, user_model_to_pb 

9from couchers.servicers.gis import _statement_to_geojson_response 

10from couchers.sql import couchers_select as select 

11from couchers.utils import Timestamp_from_datetime, make_logged_out_context 

12from proto import api_pb2, public_pb2, public_pb2_grpc 

13 

14logger = logging.getLogger(__name__) 

15 

16 

17class Public(public_pb2_grpc.PublicServicer): 

18 """ 

19 Public (logged out) APIs for getting public info 

20 """ 

21 

22 def GetPublicUsers(self, request, context, session): 

23 with_geom = ( 

24 select(User.username, User.geom) 

25 .where(User.is_visible) 

26 .where(User.public_visibility != ProfilePublicVisibility.nothing) 

27 .where(User.public_visibility != ProfilePublicVisibility.map_only) 

28 ) 

29 

30 without_geom = ( 

31 select(None, User.randomized_geom) 

32 .where(User.is_visible) 

33 .where(User.randomized_geom != None) 

34 .where(User.public_visibility == ProfilePublicVisibility.map_only) 

35 ) 

36 return _statement_to_geojson_response(session, union_all(with_geom, without_geom)) 

37 

38 def GetPublicUser(self, request, context, session): 

39 user = session.execute( 

40 select(User) 

41 .where(User.is_visible) 

42 .where(User.username == request.user) 

43 .where( 

44 User.public_visibility.in_( 

45 [ProfilePublicVisibility.limited, ProfilePublicVisibility.most, ProfilePublicVisibility.full] 

46 ) 

47 ) 

48 ).scalar_one_or_none() 

49 

50 if not user: 

51 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

52 

53 if user.public_visibility == ProfilePublicVisibility.full: 

54 return public_pb2.GetPublicUserRes(full_user=user_model_to_pb(user, session, make_logged_out_context())) 

55 

56 num_references = session.execute( 

57 select(func.count()) 

58 .select_from(Reference) 

59 .join(User, User.id == Reference.from_user_id) 

60 .where(User.is_visible) 

61 .where(Reference.to_user_id == user.id) 

62 ).scalar_one() 

63 

64 if user.public_visibility == ProfilePublicVisibility.limited: 

65 return public_pb2.GetPublicUserRes( 

66 limited_user=public_pb2.LimitedUser( 

67 username=user.username, 

68 name=user.name, 

69 city=user.city, 

70 hometown=user.hometown, 

71 num_references=num_references, 

72 joined=Timestamp_from_datetime(user.display_joined), 

73 hosting_status=hostingstatus2api[user.hosting_status], 

74 meetup_status=meetupstatus2api[user.meetup_status], 

75 badges=[badge.badge_id for badge in user.badges], 

76 ) 

77 ) 

78 

79 if user.public_visibility == ProfilePublicVisibility.most: 

80 return public_pb2.GetPublicUserRes( 

81 most_user=public_pb2.MostUser( 

82 username=user.username, 

83 name=user.name, 

84 city=user.city, 

85 hometown=user.hometown, 

86 timezone=user.timezone, 

87 num_references=num_references, 

88 gender=user.gender, 

89 pronouns=user.pronouns, 

90 age=user.age, 

91 joined=Timestamp_from_datetime(user.display_joined), 

92 last_active=Timestamp_from_datetime(user.display_last_active), 

93 hosting_status=hostingstatus2api[user.hosting_status], 

94 meetup_status=meetupstatus2api[user.meetup_status], 

95 occupation=user.occupation, 

96 education=user.education, 

97 about_me=user.about_me, 

98 things_i_like=user.things_i_like, 

99 language_abilities=[ 

100 api_pb2.LanguageAbility(code=ability.language_code, fluency=fluency2api[ability.fluency]) 

101 for ability in user.language_abilities 

102 ], 

103 regions_visited=[region.code for region in user.regions_visited], 

104 regions_lived=[region.code for region in user.regions_lived], 

105 avatar_url=user.avatar.full_url if user.avatar else None, 

106 avatar_thumbnail_url=user.avatar.thumbnail_url if user.avatar else None, 

107 badges=[badge.badge_id for badge in user.badges], 

108 ) 

109 ) 

110 

111 def GetSignupPageInfo(self, request, context, session): 

112 # last user who signed up 

113 last_signup, geom = session.execute( 

114 select(User.joined, User.geom).where(User.is_visible).order_by(User.id.desc()).limit(1) 

115 ).one_or_none() 

116 

117 communities = ( 

118 session.execute( 

119 select(Cluster.name) 

120 .join(Node, Node.id == Cluster.parent_node_id) 

121 .where(Cluster.is_official_cluster) 

122 .where(func.ST_Contains(Node.geom, geom)) 

123 .order_by(Cluster.id.asc()) 

124 ) 

125 .scalars() 

126 .all() 

127 ) 

128 

129 if len(communities) <= 1: 

130 # either no community or just global community 

131 last_location = "The World" 

132 elif len(communities) == 3: 

133 # probably global, continent, region, so let's just return the region 

134 last_location = communities[-1] 

135 else: 

136 # probably global, continent, region, city 

137 last_location = f"{communities[-1]}, {communities[-2]}" 

138 

139 user_count = session.execute(select(func.count()).select_from(User).where(User.is_visible)).scalar_one() 

140 

141 return public_pb2.GetSignupPageInfoRes( 

142 last_signup=Timestamp_from_datetime(last_signup.replace(second=0, microsecond=0)), 

143 last_location=last_location, 

144 user_count=user_count, 

145 )