Coverage for src/couchers/servicers/admin.py: 71%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import json
2import logging
3from datetime import timedelta
5import grpc
6from shapely.geometry import shape
7from sqlalchemy.sql import or_, select
9from couchers import errors, urls
10from couchers.db import session_scope
11from couchers.helpers.clusters import create_cluster, create_node
12from couchers.models import GroupChat, GroupChatSubscription, HostRequest, Message, User
13from couchers.notifications.notify import notify
14from couchers.servicers.auth import create_session
15from couchers.servicers.communities import community_to_pb
16from couchers.sql import couchers_select as select
17from couchers.tasks import send_api_key_email
18from couchers.utils import date_to_api, parse_date
19from proto import admin_pb2, admin_pb2_grpc
21logger = logging.getLogger(__name__)
24def _user_to_details(user):
25 return admin_pb2.UserDetails(
26 user_id=user.id,
27 username=user.username,
28 email=user.email,
29 gender=user.gender,
30 birthdate=date_to_api(user.birthdate),
31 banned=user.is_banned,
32 deleted=user.is_deleted,
33 )
36class Admin(admin_pb2_grpc.AdminServicer):
37 def GetUserDetails(self, request, context):
38 with session_scope() as session:
39 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
40 if not user:
41 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
42 return _user_to_details(user)
44 def ChangeUserGender(self, request, context):
45 with session_scope() as session:
46 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
47 if not user:
48 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
49 user.gender = request.gender
51 notify(
52 user_id=user.id,
53 topic="gender",
54 key="",
55 action="change",
56 icon="wrench",
57 title=f"An admin changed your gender",
58 link=urls.account_settings_link(),
59 )
61 return _user_to_details(user)
63 def ChangeUserBirthdate(self, request, context):
64 with session_scope() as session:
65 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
66 if not user:
67 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
68 user.birthdate = parse_date(request.birthdate)
70 notify(
71 user_id=user.id,
72 topic="birthdate",
73 key="",
74 action="change",
75 icon="wrench",
76 title=f"An admin changed your birth date",
77 link=urls.account_settings_link(),
78 )
80 return _user_to_details(user)
82 def BanUser(self, request, context):
83 with session_scope() as session:
84 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
85 if not user:
86 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
87 user.is_banned = True
88 return _user_to_details(user)
90 def DeleteUser(self, request, context):
91 with session_scope() as session:
92 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
93 if not user:
94 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
95 user.is_deleted = True
96 return _user_to_details(user)
98 def CreateApiKey(self, request, context):
99 with session_scope() as session:
100 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
101 if not user:
102 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
103 token, expiry = create_session(
104 context, session, user, long_lived=True, is_api_key=True, duration=timedelta(days=365)
105 )
106 send_api_key_email(session, user, token, expiry)
108 notify(
109 user_id=user.id,
110 topic="api_key",
111 key="",
112 action="create",
113 icon="wrench",
114 title=f"An admin created an API key for you, please check your email",
115 link=urls.account_settings_link(),
116 )
118 return _user_to_details(user)
120 def CreateCommunity(self, request, context):
121 with session_scope() as session:
122 geom = shape(json.loads(request.geojson))
124 if geom.type != "MultiPolygon":
125 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_MULTIPOLYGON)
127 parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None
128 node = create_node(session, geom, parent_node_id)
129 create_cluster(
130 session, node.id, request.name, request.description, context.user_id, request.admin_ids, True
131 )
133 return community_to_pb(node, context)
135 def GetChats(self, request, context):
136 with session_scope() as session:
138 def format_user(user):
139 return f"{user.name} ({user.username}, {user.id})"
141 def format_conversation(conversation_id):
142 out = ""
143 with session_scope() as session:
144 messages = (
145 session.execute(
146 select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
147 )
148 .scalars()
149 .all()
150 )
151 for message in messages:
152 out += f"Message {message.id} by {format_user(message.author)} at {message.time}\nType={message.message_type}, host_req_status_change={message.host_request_status_target}\n\n"
153 out += str(message.text)
154 out += "\n\n-----\n"
155 out += "\n\n\n\n"
156 return out
158 def format_host_request(host_request_id):
159 out = ""
160 with session_scope() as session:
161 host_request = session.execute(
162 select(HostRequest).where(HostRequest.conversation_id == host_request_id)
163 ).scalar_one()
164 out += "==============================\n"
165 out += f"Host request {host_request.conversation_id} from {format_user(host_request.surfer)} to {format_user(host_request.host)}.\nCurrent state = {host_request.status}\n\nMessages:\n"
166 out += format_conversation(host_request.conversation_id)
167 out += "\n\n\n\n"
168 return out
170 def format_group_chat(group_chat_id):
171 out = ""
172 with session_scope() as session:
173 group_chat = session.execute(
174 select(GroupChat).where(GroupChat.conversation_id == group_chat_id)
175 ).scalar_one()
176 out += "==============================\n"
177 out += f"Group chat {group_chat.conversation_id}. Created by {format_user(group_chat.creator)}, is_dm={group_chat.is_dm}\nName: {group_chat.title}\nMembers:\n"
178 subs = (
179 session.execute(
180 select(GroupChatSubscription)
181 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
182 .order_by(GroupChatSubscription.joined.asc())
183 )
184 .scalars()
185 .all()
186 )
187 for sub in subs:
188 out += f"{format_user(sub.user)} joined at {sub.joined} (left at {sub.left}), role={sub.role}\n"
189 out += "\n\nMessages:\n"
190 out += format_conversation(group_chat.conversation_id)
191 out += "\n\n\n\n"
192 return out
194 def format_all_chats_for_user(user_id):
195 out = ""
196 with session_scope() as session:
197 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
198 out += f"Chats for user {format_user(user)}\n"
199 host_request_ids = (
200 session.execute(
201 select(HostRequest.conversation_id).where(
202 or_(HostRequest.host_user_id == user_id, HostRequest.surfer_user_id == user_id)
203 )
204 )
205 .scalars()
206 .all()
207 )
208 out += f"************************************* Requests ({len(host_request_ids)})\n"
209 for host_request in host_request_ids:
210 out += format_host_request(host_request)
211 group_chat_ids = (
212 session.execute(
213 select(GroupChatSubscription.group_chat_id)
214 .where(GroupChatSubscription.user_id == user_id)
215 .order_by(GroupChatSubscription.joined.asc())
216 )
217 .scalars()
218 .all()
219 )
220 out += f"************************************* Group chats ({len(group_chat_ids)})\n"
221 for group_chat_id in group_chat_ids:
222 out += format_group_chat(group_chat_id)
223 return out
225 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
226 if not user:
227 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
229 return admin_pb2.GetChatsRes(response=format_all_chats_for_user(user.id))