Coverage for src/couchers/servicers/admin.py: 79%
242 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
1import json
2import logging
3from datetime import timedelta
5import grpc
6from google.protobuf import empty_pb2
7from shapely.geometry import shape
8from sqlalchemy.sql import or_, select, update
10from couchers import errors, urls
11from couchers.helpers.badges import user_add_badge, user_remove_badge
12from couchers.helpers.clusters import create_cluster, create_node
13from couchers.jobs.enqueue import queue_job
14from couchers.models import (
15 ContentReport,
16 Event,
17 EventCommunityInviteRequest,
18 EventOccurrence,
19 GroupChat,
20 GroupChatSubscription,
21 HostRequest,
22 Message,
23 ModNote,
24 User,
25 UserBadge,
26)
27from couchers.notifications.notify import notify
28from couchers.resources import get_badge_dict
29from couchers.servicers.api import get_strong_verification_fields, user_model_to_pb
30from couchers.servicers.auth import create_session
31from couchers.servicers.communities import community_to_pb
32from couchers.servicers.events import get_users_to_notify_for_new_event
33from couchers.sql import couchers_select as select
34from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date
35from proto import admin_pb2, admin_pb2_grpc, notification_data_pb2
36from proto.internal import jobs_pb2
38logger = logging.getLogger(__name__)
40MAX_PAGINATION_LENGTH = 250
43def _user_to_details(session, user):
44 return admin_pb2.UserDetails(
45 user_id=user.id,
46 username=user.username,
47 name=user.name,
48 email=user.email,
49 gender=user.gender,
50 birthdate=date_to_api(user.birthdate),
51 banned=user.is_banned,
52 deleted=user.is_deleted,
53 do_not_email=user.do_not_email,
54 badges=[badge.badge_id for badge in user.badges],
55 **get_strong_verification_fields(session, user),
56 has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
57 admin_note=user.admin_note,
58 pending_mod_notes_count=user.mod_notes.where(ModNote.is_pending).count(),
59 acknowledged_mod_notes_count=user.mod_notes.where(~ModNote.is_pending).count(),
60 )
63def _content_report_to_pb(content_report: ContentReport):
64 return admin_pb2.ContentReport(
65 content_report_id=content_report.id,
66 time=Timestamp_from_datetime(content_report.time),
67 reporting_user_id=content_report.reporting_user_id,
68 author_user_id=content_report.author_user_id,
69 reason=content_report.reason,
70 description=content_report.description,
71 content_ref=content_report.content_ref,
72 user_agent=content_report.user_agent,
73 page=content_report.page,
74 )
77def append_admin_note(session, context, user, note):
78 if not note.strip():
79 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ADMIN_NOTE_CANT_BE_EMPTY)
80 admin = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
81 user.admin_note += f"\n[{now().isoformat()}] (id: {admin.id}, username: {admin.username}) {note}\n"
84class Admin(admin_pb2_grpc.AdminServicer):
85 def GetUserDetails(self, request, context, session):
86 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
87 if not user:
88 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
89 return _user_to_details(session, user)
91 def GetUser(self, request, context, session):
92 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
93 if not user:
94 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
95 return user_model_to_pb(user, session, context)
97 def ChangeUserGender(self, request, context, session):
98 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
99 if not user:
100 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
101 user.gender = request.gender
102 session.commit()
104 notify(
105 session,
106 user_id=user.id,
107 topic_action="gender:change",
108 data=notification_data_pb2.GenderChange(
109 gender=request.gender,
110 ),
111 )
113 return _user_to_details(session, user)
115 def ChangeUserBirthdate(self, request, context, session):
116 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
117 if not user:
118 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
119 user.birthdate = parse_date(request.birthdate)
120 session.commit()
122 notify(
123 session,
124 user_id=user.id,
125 topic_action="birthdate:change",
126 data=notification_data_pb2.BirthdateChange(
127 birthdate=request.birthdate,
128 ),
129 )
131 return _user_to_details(session, user)
133 def AddBadge(self, request, context, session):
134 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
135 if not user:
136 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
138 badge = get_badge_dict().get(request.badge_id)
139 if not badge:
140 context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)
142 if not badge["admin_editable"]:
143 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)
145 if badge["id"] in [b.badge_id for b in user.badges]:
146 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_HAS_BADGE)
148 user_add_badge(session, user.id, request.badge_id)
150 return _user_to_details(session, user)
152 def RemoveBadge(self, request, context, session):
153 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
154 if not user:
155 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
157 badge = get_badge_dict().get(request.badge_id)
158 if not badge:
159 context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)
161 if not badge["admin_editable"]:
162 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)
164 user_badge = session.execute(
165 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge["id"])
166 ).scalar_one_or_none()
167 if not user_badge:
168 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_DOES_NOT_HAVE_BADGE)
170 user_remove_badge(session, user.id, request.badge_id)
172 return _user_to_details(session, user)
174 def SetPassportSexGenderException(self, request, context, session):
175 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
176 if not user:
177 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
178 user.has_passport_sex_gender_exception = request.passport_sex_gender_exception
179 return _user_to_details(session, user)
181 def BanUser(self, request, context, session):
182 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
183 if not user:
184 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
185 append_admin_note(session, context, user, request.admin_note)
186 user.is_banned = True
187 return _user_to_details(session, user)
189 def UnbanUser(self, request, context, session):
190 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
191 if not user:
192 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
193 append_admin_note(session, context, user, request.admin_note)
194 user.is_banned = False
195 return _user_to_details(session, user)
197 def AddAdminNote(self, request, context, session):
198 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
199 if not user:
200 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
201 append_admin_note(session, context, user, request.admin_note)
202 return _user_to_details(session, user)
204 def GetContentReport(self, request, context, session):
205 content_report = session.execute(
206 select(ContentReport).where(ContentReport.id == request.content_report_id)
207 ).scalar_one_or_none()
208 if not content_report:
209 context.abort(grpc.StatusCode.NOT_FOUND, errors.CONTENT_REPORT_NOT_FOUND)
210 return admin_pb2.GetContentReportRes(
211 content_report=_content_report_to_pb(content_report),
212 )
214 def GetContentReportsForAuthor(self, request, context, session):
215 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
216 if not user:
217 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
218 content_reports = (
219 session.execute(
220 select(ContentReport).where(ContentReport.author_user_id == user.id).order_by(ContentReport.id.desc())
221 )
222 .scalars()
223 .all()
224 )
225 return admin_pb2.GetContentReportsForAuthorRes(
226 content_reports=[_content_report_to_pb(content_report) for content_report in content_reports],
227 )
229 def SendModNote(self, request, context, session):
230 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
231 if not user:
232 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
233 session.add(
234 ModNote(
235 user_id=user.id,
236 internal_id=request.internal_id,
237 creator_user_id=context.user_id,
238 note_content=request.content,
239 )
240 )
241 session.flush()
243 if not request.do_not_notify:
244 notify(
245 session,
246 user_id=user.id,
247 topic_action="modnote:create",
248 )
250 return _user_to_details(session, user)
252 def DeleteUser(self, request, context, session):
253 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
254 if not user:
255 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
256 user.is_deleted = True
257 return _user_to_details(session, user)
259 def CreateApiKey(self, request, context, session):
260 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
261 if not user:
262 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
263 token, expiry = create_session(
264 context, session, user, long_lived=True, is_api_key=True, duration=timedelta(days=365), set_cookie=False
265 )
267 notify(
268 session,
269 user_id=user.id,
270 topic_action="api_key:create",
271 data=notification_data_pb2.ApiKeyCreate(
272 api_key=token,
273 expiry=Timestamp_from_datetime(expiry),
274 ),
275 )
277 return _user_to_details(session, user)
279 def CreateCommunity(self, request, context, session):
280 geom = shape(json.loads(request.geojson))
282 if geom.type != "MultiPolygon":
283 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_MULTIPOLYGON)
285 parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None
286 node = create_node(session, geom, parent_node_id)
287 create_cluster(session, node.id, request.name, request.description, context.user_id, request.admin_ids, True)
289 return community_to_pb(session, node, context)
291 def GetChats(self, request, context, session):
292 def format_user(user):
293 return f"{user.name} ({user.username}, {user.id})"
295 def format_conversation(conversation_id):
296 out = ""
297 messages = (
298 session.execute(
299 select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
300 )
301 .scalars()
302 .all()
303 )
304 for message in messages:
305 out += f"Message {message.id} by {format_user(message.author)} at {message.time}\nType={message.message_type}, host_req_status_change={message.host_request_status_target}\n\n"
306 out += str(message.text)
307 out += "\n\n-----\n"
308 out += "\n\n\n\n"
309 return out
311 def format_host_request(host_request_id):
312 out = ""
313 host_request = session.execute(
314 select(HostRequest).where(HostRequest.conversation_id == host_request_id)
315 ).scalar_one()
316 out += "==============================\n"
317 out += f"Host request {host_request.conversation_id} from {format_user(host_request.surfer)} to {format_user(host_request.host)}.\nCurrent state = {host_request.status}\n\nMessages:\n"
318 out += format_conversation(host_request.conversation_id)
319 out += "\n\n\n\n"
320 return out
322 def format_group_chat(group_chat_id):
323 out = ""
324 group_chat = session.execute(
325 select(GroupChat).where(GroupChat.conversation_id == group_chat_id)
326 ).scalar_one()
327 out += "==============================\n"
328 out += f"Group chat {group_chat.conversation_id}. Created by {format_user(group_chat.creator)}, is_dm={group_chat.is_dm}\nName: {group_chat.title}\nMembers:\n"
329 subs = (
330 session.execute(
331 select(GroupChatSubscription)
332 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
333 .order_by(GroupChatSubscription.joined.asc())
334 )
335 .scalars()
336 .all()
337 )
338 for sub in subs:
339 out += f"{format_user(sub.user)} joined at {sub.joined} (left at {sub.left}), role={sub.role}\n"
340 out += "\n\nMessages:\n"
341 out += format_conversation(group_chat.conversation_id)
342 out += "\n\n\n\n"
343 return out
345 def format_all_chats_for_user(user_id):
346 out = ""
347 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
348 out += f"Chats for user {format_user(user)}\n"
349 host_request_ids = (
350 session.execute(
351 select(HostRequest.conversation_id)
352 .where(or_(HostRequest.host_user_id == user_id, HostRequest.surfer_user_id == user_id))
353 .order_by(HostRequest.conversation_id.desc())
354 )
355 .scalars()
356 .all()
357 )
358 out += f"************************************* Requests ({len(host_request_ids)})\n"
359 for host_request in host_request_ids:
360 out += format_host_request(host_request)
361 group_chat_ids = (
362 session.execute(
363 select(GroupChatSubscription.group_chat_id)
364 .where(GroupChatSubscription.user_id == user_id)
365 .order_by(GroupChatSubscription.joined.desc())
366 )
367 .scalars()
368 .all()
369 )
370 out += f"************************************* Group chats ({len(group_chat_ids)})\n"
371 for group_chat_id in group_chat_ids:
372 out += format_group_chat(group_chat_id)
373 return out
375 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
376 if not user:
377 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
379 return admin_pb2.GetChatsRes(response=format_all_chats_for_user(user.id))
381 def ListEventCommunityInviteRequests(self, request, context, session):
382 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
383 next_request_id = int(request.page_token) if request.page_token else 0
384 requests = (
385 session.execute(
386 select(EventCommunityInviteRequest)
387 .where(EventCommunityInviteRequest.approved.is_(None))
388 .where(EventCommunityInviteRequest.id >= next_request_id)
389 .order_by(EventCommunityInviteRequest.id)
390 .limit(page_size + 1)
391 )
392 .scalars()
393 .all()
394 )
396 def _request_to_pb(request):
397 users_to_notify, node_id = get_users_to_notify_for_new_event(session, request.occurrence)
398 return admin_pb2.EventCommunityInviteRequest(
399 event_community_invite_request_id=request.id,
400 user_id=request.user_id,
401 event_url=urls.event_link(occurrence_id=request.occurrence.id, slug=request.occurrence.event.slug),
402 approx_users_to_notify=len(users_to_notify),
403 community_id=node_id,
404 )
406 return admin_pb2.ListEventCommunityInviteRequestsRes(
407 requests=[_request_to_pb(request) for request in requests[:page_size]],
408 next_page_token=str(requests[-1].id) if len(requests) > page_size else None,
409 )
411 def DecideEventCommunityInviteRequest(self, request, context, session):
412 req = session.execute(
413 select(EventCommunityInviteRequest).where(
414 EventCommunityInviteRequest.id == request.event_community_invite_request_id
415 )
416 ).scalar_one_or_none()
418 if not req:
419 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_COMMUNITY_INVITE_NOT_FOUND)
421 if req.decided:
422 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_DECIDED)
424 decided = now()
425 req.decided = decided
426 req.decided_by_user_id = context.user_id
427 req.approved = request.approve
429 # deny other reqs for the same event
430 if request.approve:
431 session.execute(
432 update(EventCommunityInviteRequest)
433 .where(EventCommunityInviteRequest.occurrence_id == req.occurrence_id)
434 .where(EventCommunityInviteRequest.decided.is_(None))
435 .values(decided=decided, decided_by_user_id=context.user_id, approved=False)
436 )
438 session.flush()
440 if request.approve:
441 queue_job(
442 session,
443 "generate_event_create_notifications",
444 payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
445 inviting_user_id=req.user_id,
446 occurrence_id=req.occurrence_id,
447 approved=True,
448 ),
449 )
451 return admin_pb2.DecideEventCommunityInviteRequestRes()
453 def DeleteEvent(self, request, context, session):
454 res = session.execute(
455 select(Event, EventOccurrence)
456 .where(EventOccurrence.id == request.event_id)
457 .where(EventOccurrence.event_id == Event.id)
458 .where(~EventOccurrence.is_deleted)
459 ).one_or_none()
461 if not res:
462 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
464 event, occurrence = res
466 occurrence.is_deleted = True
468 queue_job(
469 session,
470 "generate_event_delete_notifications",
471 payload=jobs_pb2.GenerateEventDeleteNotificationsPayload(
472 occurrence_id=occurrence.id,
473 ),
474 )
476 return empty_pb2.Empty()
478 def ListUserIds(self, request, context, session):
479 start_date = request.start_time.ToDatetime()
480 end_date = request.end_time.ToDatetime()
482 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
483 next_user_id = int(request.page_token) if request.page_token else 0
485 user_ids = (
486 session.execute(
487 select(User.id)
488 .where(User.id >= next_user_id)
489 .where(User.joined >= start_date)
490 .where(User.joined <= end_date)
491 .order_by(User.joined.desc())
492 .limit(page_size + 1)
493 )
494 .scalars()
495 .all()
496 )
498 return admin_pb2.ListUserIdsRes(
499 user_ids=user_ids[:page_size],
500 next_page_token=str(user_ids[-1]) if len(user_ids) > page_size else None,
501 )