Coverage for src/couchers/servicers/admin.py: 71%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

129 statements  

1import json 

2import logging 

3from datetime import timedelta 

4 

5import grpc 

6from shapely.geometry import shape 

7from sqlalchemy.sql import or_, select 

8 

9from couchers import errors, urls 

10from couchers.db import session_scope 

11from couchers.helpers.clusters import create_cluster, create_node 

12from couchers.models import GroupChat, GroupChatSubscription, HostRequest, Message, User 

13from couchers.notifications.notify import notify 

14from couchers.servicers.auth import create_session 

15from couchers.servicers.communities import community_to_pb 

16from couchers.sql import couchers_select as select 

17from couchers.tasks import send_api_key_email 

18from couchers.utils import date_to_api, parse_date 

19from proto import admin_pb2, admin_pb2_grpc 

20 

21logger = logging.getLogger(__name__) 

22 

23 

def _user_to_details(user):
    """Build the admin-facing ``UserDetails`` protobuf message for ``user``.

    The id, username, email, gender, ban flag and deletion flag are copied
    directly from the user object; the birthdate is passed through
    ``date_to_api`` before being stored on the message.
    """
    fields = {
        "user_id": user.id,
        "username": user.username,
        "email": user.email,
        "gender": user.gender,
        "birthdate": date_to_api(user.birthdate),
        "banned": user.is_banned,
        "deleted": user.is_deleted,
    }
    return admin_pb2.UserDetails(**fields)

34 

35 

class Admin(admin_pb2_grpc.AdminServicer):
    """gRPC servicer implementing the admin API.

    Every RPC that targets a user resolves ``request.user`` through
    ``where_username_or_email_or_id`` and aborts with ``NOT_FOUND`` /
    ``errors.USER_NOT_FOUND`` when nothing matches.
    """

    @staticmethod
    def _get_user_or_abort(session, request, context):
        """Return the user identified by ``request.user``.

        Calls ``context.abort`` with ``NOT_FOUND`` and
        ``errors.USER_NOT_FOUND`` when the query yields no row.
        """
        user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
        if not user:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
        return user

    def GetUserDetails(self, request, context):
        """Return the ``UserDetails`` message for the requested user."""
        with session_scope() as session:
            user = self._get_user_or_abort(session, request, context)
            return _user_to_details(user)

    def ChangeUserGender(self, request, context):
        """Set the user's gender to ``request.gender`` and notify them.

        Returns the updated ``UserDetails`` message.
        """
        with session_scope() as session:
            user = self._get_user_or_abort(session, request, context)
            user.gender = request.gender

            notify(
                user_id=user.id,
                topic="gender",
                key="",
                action="change",
                icon="wrench",
                title="An admin changed your gender",
                link=urls.account_settings_link(),
            )

            return _user_to_details(user)

    def ChangeUserBirthdate(self, request, context):
        """Set the user's birthdate from ``request.birthdate`` and notify them.

        The incoming value is converted with ``parse_date`` before it is
        stored. Returns the updated ``UserDetails`` message.
        """
        with session_scope() as session:
            user = self._get_user_or_abort(session, request, context)
            user.birthdate = parse_date(request.birthdate)

            notify(
                user_id=user.id,
                topic="birthdate",
                key="",
                action="change",
                icon="wrench",
                title="An admin changed your birth date",
                link=urls.account_settings_link(),
            )

            return _user_to_details(user)

    def BanUser(self, request, context):
        """Mark the user as banned and return their ``UserDetails``."""
        with session_scope() as session:
            user = self._get_user_or_abort(session, request, context)
            user.is_banned = True
            return _user_to_details(user)

    def DeleteUser(self, request, context):
        """Mark the user as deleted and return their ``UserDetails``."""
        with session_scope() as session:
            user = self._get_user_or_abort(session, request, context)
            user.is_deleted = True
            return _user_to_details(user)

    def CreateApiKey(self, request, context):
        """Create a 365-day API-key session for the user.

        The token and expiry returned by ``create_session`` are handed to
        ``send_api_key_email``, and the user is additionally notified.
        Returns the user's ``UserDetails`` message.
        """
        with session_scope() as session:
            user = self._get_user_or_abort(session, request, context)
            token, expiry = create_session(
                context, session, user, long_lived=True, is_api_key=True, duration=timedelta(days=365)
            )
            send_api_key_email(session, user, token, expiry)

            notify(
                user_id=user.id,
                topic="api_key",
                key="",
                action="create",
                icon="wrench",
                title="An admin created an API key for you, please check your email",
                link=urls.account_settings_link(),
            )

            return _user_to_details(user)

    def CreateCommunity(self, request, context):
        """Create a community node and its cluster from a GeoJSON MultiPolygon.

        Aborts with ``INVALID_ARGUMENT`` / ``errors.NO_MULTIPOLYGON`` when
        the parsed geometry is not a ``MultiPolygon``. A
        ``request.parent_node_id`` of ``0`` is treated as "no parent".
        Returns ``community_to_pb(node, context)`` for the new node.
        """
        with session_scope() as session:
            # NOTE(review): json.loads raises json.JSONDecodeError on malformed
            # input and shape() may reject JSON that is not a geometry; neither
            # is translated into INVALID_ARGUMENT here -- confirm that is intended.
            geom = shape(json.loads(request.geojson))

            # `geom_type` is the supported accessor; `.type` is deprecated in Shapely 2.0.
            if geom.geom_type != "MultiPolygon":
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_MULTIPOLYGON)

            parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None
            node = create_node(session, geom, parent_node_id)
            create_cluster(
                session, node.id, request.name, request.description, context.user_id, request.admin_ids, True
            )

            return community_to_pb(node, context)

    def GetChats(self, request, context):
        """Return a plain-text dump of all host requests and group chats of a user.

        The dump lists every host request where the user is host or surfer,
        followed by every group chat the user has a subscription to (ordered
        by join time), each with its messages ordered by message id.

        All queries run on the single session opened for this RPC; the nested
        formatters close over it instead of opening one session per chat.
        """
        with session_scope() as session:

            def format_user(user):
                return f"{user.name} ({user.username}, {user.id})"

            def format_conversation(conversation_id):
                messages = (
                    session.execute(
                        select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
                    )
                    .scalars()
                    .all()
                )
                parts = []
                for message in messages:
                    parts.append(
                        f"Message {message.id} by {format_user(message.author)} at {message.time}\nType={message.message_type}, host_req_status_change={message.host_request_status_target}\n\n"
                    )
                    parts.append(str(message.text))
                    parts.append("\n\n-----\n")
                parts.append("\n\n\n\n")
                return "".join(parts)

            def format_host_request(host_request_id):
                host_request = session.execute(
                    select(HostRequest).where(HostRequest.conversation_id == host_request_id)
                ).scalar_one()
                parts = ["==============================\n"]
                parts.append(
                    f"Host request {host_request.conversation_id} from {format_user(host_request.surfer)} to {format_user(host_request.host)}.\nCurrent state = {host_request.status}\n\nMessages:\n"
                )
                parts.append(format_conversation(host_request.conversation_id))
                parts.append("\n\n\n\n")
                return "".join(parts)

            def format_group_chat(group_chat_id):
                group_chat = session.execute(
                    select(GroupChat).where(GroupChat.conversation_id == group_chat_id)
                ).scalar_one()
                parts = ["==============================\n"]
                parts.append(
                    f"Group chat {group_chat.conversation_id}. Created by {format_user(group_chat.creator)}, is_dm={group_chat.is_dm}\nName: {group_chat.title}\nMembers:\n"
                )
                subs = (
                    session.execute(
                        select(GroupChatSubscription)
                        .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
                        .order_by(GroupChatSubscription.joined.asc())
                    )
                    .scalars()
                    .all()
                )
                for sub in subs:
                    parts.append(
                        f"{format_user(sub.user)} joined at {sub.joined} (left at {sub.left}), role={sub.role}\n"
                    )
                parts.append("\n\nMessages:\n")
                parts.append(format_conversation(group_chat.conversation_id))
                parts.append("\n\n\n\n")
                return "".join(parts)

            def format_all_chats_for_user(user):
                parts = [f"Chats for user {format_user(user)}\n"]
                host_request_ids = (
                    session.execute(
                        select(HostRequest.conversation_id).where(
                            or_(HostRequest.host_user_id == user.id, HostRequest.surfer_user_id == user.id)
                        )
                    )
                    .scalars()
                    .all()
                )
                parts.append(f"************************************* Requests ({len(host_request_ids)})\n")
                for host_request_id in host_request_ids:
                    parts.append(format_host_request(host_request_id))
                group_chat_ids = (
                    session.execute(
                        select(GroupChatSubscription.group_chat_id)
                        .where(GroupChatSubscription.user_id == user.id)
                        .order_by(GroupChatSubscription.joined.asc())
                    )
                    .scalars()
                    .all()
                )
                parts.append(f"************************************* Group chats ({len(group_chat_ids)})\n")
                for group_chat_id in group_chat_ids:
                    parts.append(format_group_chat(group_chat_id))
                return "".join(parts)

            user = self._get_user_or_abort(session, request, context)

            return admin_pb2.GetChatsRes(response=format_all_chats_for_user(user))