Coverage for src/couchers/servicers/admin.py: 81%
262 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
1import json
2import logging
3from datetime import timedelta
5import grpc
6from geoalchemy2.shape import from_shape
7from google.protobuf import empty_pb2
8from shapely.geometry import shape
9from sqlalchemy.sql import or_, select, update
11from couchers import errors, urls
12from couchers.helpers.badges import user_add_badge, user_remove_badge
13from couchers.helpers.clusters import create_cluster, create_node
14from couchers.jobs.enqueue import queue_job
15from couchers.models import (
16 ContentReport,
17 Event,
18 EventCommunityInviteRequest,
19 EventOccurrence,
20 GroupChat,
21 GroupChatSubscription,
22 HostRequest,
23 Message,
24 ModNote,
25 Node,
26 User,
27 UserBadge,
28)
29from couchers.notifications.notify import notify
30from couchers.resources import get_badge_dict
31from couchers.servicers.api import get_strong_verification_fields, user_model_to_pb
32from couchers.servicers.auth import create_session
33from couchers.servicers.communities import community_to_pb
34from couchers.servicers.events import get_users_to_notify_for_new_event
35from couchers.sql import couchers_select as select
36from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date
37from proto import admin_pb2, admin_pb2_grpc, notification_data_pb2
38from proto.internal import jobs_pb2
40logger = logging.getLogger(__name__)
42MAX_PAGINATION_LENGTH = 250
45def _user_to_details(session, user):
46 return admin_pb2.UserDetails(
47 user_id=user.id,
48 username=user.username,
49 name=user.name,
50 email=user.email,
51 gender=user.gender,
52 birthdate=date_to_api(user.birthdate),
53 banned=user.is_banned,
54 deleted=user.is_deleted,
55 do_not_email=user.do_not_email,
56 badges=[badge.badge_id for badge in user.badges],
57 **get_strong_verification_fields(session, user),
58 has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
59 admin_note=user.admin_note,
60 pending_mod_notes_count=user.mod_notes.where(ModNote.is_pending).count(),
61 acknowledged_mod_notes_count=user.mod_notes.where(~ModNote.is_pending).count(),
62 )
65def _content_report_to_pb(content_report: ContentReport):
66 return admin_pb2.ContentReport(
67 content_report_id=content_report.id,
68 time=Timestamp_from_datetime(content_report.time),
69 reporting_user_id=content_report.reporting_user_id,
70 author_user_id=content_report.author_user_id,
71 reason=content_report.reason,
72 description=content_report.description,
73 content_ref=content_report.content_ref,
74 user_agent=content_report.user_agent,
75 page=content_report.page,
76 )
79def append_admin_note(session, context, user, note):
80 if not note.strip():
81 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ADMIN_NOTE_CANT_BE_EMPTY)
82 admin = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
83 user.admin_note += f"\n[{now().isoformat()}] (id: {admin.id}, username: {admin.username}) {note}\n"
86def load_community_geom(geojson, context):
87 geom = shape(json.loads(geojson))
89 if geom.geom_type != "MultiPolygon":
90 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_MULTIPOLYGON)
92 return geom
95class Admin(admin_pb2_grpc.AdminServicer):
96 def GetUserDetails(self, request, context, session):
97 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
98 if not user:
99 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
100 return _user_to_details(session, user)
102 def GetUser(self, request, context, session):
103 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
104 if not user:
105 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
106 return user_model_to_pb(user, session, context)
108 def ChangeUserGender(self, request, context, session):
109 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
110 if not user:
111 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
112 user.gender = request.gender
113 session.commit()
115 notify(
116 session,
117 user_id=user.id,
118 topic_action="gender:change",
119 data=notification_data_pb2.GenderChange(
120 gender=request.gender,
121 ),
122 )
124 return _user_to_details(session, user)
126 def ChangeUserBirthdate(self, request, context, session):
127 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
128 if not user:
129 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
130 user.birthdate = parse_date(request.birthdate)
131 session.commit()
133 notify(
134 session,
135 user_id=user.id,
136 topic_action="birthdate:change",
137 data=notification_data_pb2.BirthdateChange(
138 birthdate=request.birthdate,
139 ),
140 )
142 return _user_to_details(session, user)
144 def AddBadge(self, request, context, session):
145 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
146 if not user:
147 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
149 badge = get_badge_dict().get(request.badge_id)
150 if not badge:
151 context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)
153 if not badge["admin_editable"]:
154 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)
156 if badge["id"] in [b.badge_id for b in user.badges]:
157 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_HAS_BADGE)
159 user_add_badge(session, user.id, request.badge_id)
161 return _user_to_details(session, user)
163 def RemoveBadge(self, request, context, session):
164 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
165 if not user:
166 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
168 badge = get_badge_dict().get(request.badge_id)
169 if not badge:
170 context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)
172 if not badge["admin_editable"]:
173 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)
175 user_badge = session.execute(
176 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge["id"])
177 ).scalar_one_or_none()
178 if not user_badge:
179 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_DOES_NOT_HAVE_BADGE)
181 user_remove_badge(session, user.id, request.badge_id)
183 return _user_to_details(session, user)
185 def SetPassportSexGenderException(self, request, context, session):
186 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
187 if not user:
188 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
189 user.has_passport_sex_gender_exception = request.passport_sex_gender_exception
190 return _user_to_details(session, user)
192 def BanUser(self, request, context, session):
193 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
194 if not user:
195 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
196 append_admin_note(session, context, user, request.admin_note)
197 user.is_banned = True
198 return _user_to_details(session, user)
200 def UnbanUser(self, request, context, session):
201 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
202 if not user:
203 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
204 append_admin_note(session, context, user, request.admin_note)
205 user.is_banned = False
206 return _user_to_details(session, user)
208 def AddAdminNote(self, request, context, session):
209 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
210 if not user:
211 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
212 append_admin_note(session, context, user, request.admin_note)
213 return _user_to_details(session, user)
215 def GetContentReport(self, request, context, session):
216 content_report = session.execute(
217 select(ContentReport).where(ContentReport.id == request.content_report_id)
218 ).scalar_one_or_none()
219 if not content_report:
220 context.abort(grpc.StatusCode.NOT_FOUND, errors.CONTENT_REPORT_NOT_FOUND)
221 return admin_pb2.GetContentReportRes(
222 content_report=_content_report_to_pb(content_report),
223 )
225 def GetContentReportsForAuthor(self, request, context, session):
226 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
227 if not user:
228 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
229 content_reports = (
230 session.execute(
231 select(ContentReport).where(ContentReport.author_user_id == user.id).order_by(ContentReport.id.desc())
232 )
233 .scalars()
234 .all()
235 )
236 return admin_pb2.GetContentReportsForAuthorRes(
237 content_reports=[_content_report_to_pb(content_report) for content_report in content_reports],
238 )
240 def SendModNote(self, request, context, session):
241 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
242 if not user:
243 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
244 session.add(
245 ModNote(
246 user_id=user.id,
247 internal_id=request.internal_id,
248 creator_user_id=context.user_id,
249 note_content=request.content,
250 )
251 )
252 session.flush()
254 if not request.do_not_notify:
255 notify(
256 session,
257 user_id=user.id,
258 topic_action="modnote:create",
259 )
261 return _user_to_details(session, user)
263 def DeleteUser(self, request, context, session):
264 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
265 if not user:
266 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
267 user.is_deleted = True
268 return _user_to_details(session, user)
270 def CreateApiKey(self, request, context, session):
271 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
272 if not user:
273 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
274 token, expiry = create_session(
275 context, session, user, long_lived=True, is_api_key=True, duration=timedelta(days=365), set_cookie=False
276 )
278 notify(
279 session,
280 user_id=user.id,
281 topic_action="api_key:create",
282 data=notification_data_pb2.ApiKeyCreate(
283 api_key=token,
284 expiry=Timestamp_from_datetime(expiry),
285 ),
286 )
288 return _user_to_details(session, user)
290 def CreateCommunity(self, request, context, session):
291 geom = load_community_geom(request.geojson, context)
293 parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None
294 node = create_node(session, geom, parent_node_id)
295 create_cluster(session, node.id, request.name, request.description, context.user_id, request.admin_ids, True)
297 return community_to_pb(session, node, context)
299 def UpdateCommunity(self, request, context, session):
300 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
301 if not node:
302 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
303 cluster = node.official_cluster
305 if request.name:
306 cluster.name = request.name
308 if request.description:
309 cluster.description = request.description
311 if request.geojson:
312 geom = load_community_geom(request.geojson, context)
314 node.geom = from_shape(geom)
316 if request.parent_node_id != 0:
317 node.parent_node_id = request.parent_node_id
319 session.flush()
321 return community_to_pb(session, cluster.parent_node, context)
323 def GetChats(self, request, context, session):
324 def format_user(user):
325 return f"{user.name} ({user.username}, {user.id})"
327 def format_conversation(conversation_id):
328 out = ""
329 messages = (
330 session.execute(
331 select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
332 )
333 .scalars()
334 .all()
335 )
336 for message in messages:
337 out += f"Message {message.id} by {format_user(message.author)} at {message.time}\nType={message.message_type}, host_req_status_change={message.host_request_status_target}\n\n"
338 out += str(message.text)
339 out += "\n\n-----\n"
340 out += "\n\n\n\n"
341 return out
343 def format_host_request(host_request_id):
344 out = ""
345 host_request = session.execute(
346 select(HostRequest).where(HostRequest.conversation_id == host_request_id)
347 ).scalar_one()
348 out += "==============================\n"
349 out += f"Host request {host_request.conversation_id} from {format_user(host_request.surfer)} to {format_user(host_request.host)}.\nCurrent state = {host_request.status}\n\nMessages:\n"
350 out += format_conversation(host_request.conversation_id)
351 out += "\n\n\n\n"
352 return out
354 def format_group_chat(group_chat_id):
355 out = ""
356 group_chat = session.execute(
357 select(GroupChat).where(GroupChat.conversation_id == group_chat_id)
358 ).scalar_one()
359 out += "==============================\n"
360 out += f"Group chat {group_chat.conversation_id}. Created by {format_user(group_chat.creator)}, is_dm={group_chat.is_dm}\nName: {group_chat.title}\nMembers:\n"
361 subs = (
362 session.execute(
363 select(GroupChatSubscription)
364 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
365 .order_by(GroupChatSubscription.joined.asc())
366 )
367 .scalars()
368 .all()
369 )
370 for sub in subs:
371 out += f"{format_user(sub.user)} joined at {sub.joined} (left at {sub.left}), role={sub.role}\n"
372 out += "\n\nMessages:\n"
373 out += format_conversation(group_chat.conversation_id)
374 out += "\n\n\n\n"
375 return out
377 def format_all_chats_for_user(user_id):
378 out = ""
379 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
380 out += f"Chats for user {format_user(user)}\n"
381 host_request_ids = (
382 session.execute(
383 select(HostRequest.conversation_id)
384 .where(or_(HostRequest.host_user_id == user_id, HostRequest.surfer_user_id == user_id))
385 .order_by(HostRequest.conversation_id.desc())
386 )
387 .scalars()
388 .all()
389 )
390 out += f"************************************* Requests ({len(host_request_ids)})\n"
391 for host_request in host_request_ids:
392 out += format_host_request(host_request)
393 group_chat_ids = (
394 session.execute(
395 select(GroupChatSubscription.group_chat_id)
396 .where(GroupChatSubscription.user_id == user_id)
397 .order_by(GroupChatSubscription.joined.desc())
398 )
399 .scalars()
400 .all()
401 )
402 out += f"************************************* Group chats ({len(group_chat_ids)})\n"
403 for group_chat_id in group_chat_ids:
404 out += format_group_chat(group_chat_id)
405 return out
407 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
408 if not user:
409 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
411 return admin_pb2.GetChatsRes(response=format_all_chats_for_user(user.id))
413 def ListEventCommunityInviteRequests(self, request, context, session):
414 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
415 next_request_id = int(request.page_token) if request.page_token else 0
416 requests = (
417 session.execute(
418 select(EventCommunityInviteRequest)
419 .where(EventCommunityInviteRequest.approved.is_(None))
420 .where(EventCommunityInviteRequest.id >= next_request_id)
421 .order_by(EventCommunityInviteRequest.id)
422 .limit(page_size + 1)
423 )
424 .scalars()
425 .all()
426 )
428 def _request_to_pb(request):
429 users_to_notify, node_id = get_users_to_notify_for_new_event(session, request.occurrence)
430 return admin_pb2.EventCommunityInviteRequest(
431 event_community_invite_request_id=request.id,
432 user_id=request.user_id,
433 event_url=urls.event_link(occurrence_id=request.occurrence.id, slug=request.occurrence.event.slug),
434 approx_users_to_notify=len(users_to_notify),
435 community_id=node_id,
436 )
438 return admin_pb2.ListEventCommunityInviteRequestsRes(
439 requests=[_request_to_pb(request) for request in requests[:page_size]],
440 next_page_token=str(requests[-1].id) if len(requests) > page_size else None,
441 )
443 def DecideEventCommunityInviteRequest(self, request, context, session):
444 req = session.execute(
445 select(EventCommunityInviteRequest).where(
446 EventCommunityInviteRequest.id == request.event_community_invite_request_id
447 )
448 ).scalar_one_or_none()
450 if not req:
451 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_COMMUNITY_INVITE_NOT_FOUND)
453 if req.decided:
454 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_DECIDED)
456 decided = now()
457 req.decided = decided
458 req.decided_by_user_id = context.user_id
459 req.approved = request.approve
461 # deny other reqs for the same event
462 if request.approve:
463 session.execute(
464 update(EventCommunityInviteRequest)
465 .where(EventCommunityInviteRequest.occurrence_id == req.occurrence_id)
466 .where(EventCommunityInviteRequest.decided.is_(None))
467 .values(decided=decided, decided_by_user_id=context.user_id, approved=False)
468 )
470 session.flush()
472 if request.approve:
473 queue_job(
474 session,
475 "generate_event_create_notifications",
476 payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
477 inviting_user_id=req.user_id,
478 occurrence_id=req.occurrence_id,
479 approved=True,
480 ),
481 )
483 return admin_pb2.DecideEventCommunityInviteRequestRes()
485 def DeleteEvent(self, request, context, session):
486 res = session.execute(
487 select(Event, EventOccurrence)
488 .where(EventOccurrence.id == request.event_id)
489 .where(EventOccurrence.event_id == Event.id)
490 .where(~EventOccurrence.is_deleted)
491 ).one_or_none()
493 if not res:
494 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
496 event, occurrence = res
498 occurrence.is_deleted = True
500 queue_job(
501 session,
502 "generate_event_delete_notifications",
503 payload=jobs_pb2.GenerateEventDeleteNotificationsPayload(
504 occurrence_id=occurrence.id,
505 ),
506 )
508 return empty_pb2.Empty()
510 def ListUserIds(self, request, context, session):
511 start_date = request.start_time.ToDatetime()
512 end_date = request.end_time.ToDatetime()
514 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
515 next_user_id = int(request.page_token) if request.page_token else 0
517 user_ids = (
518 session.execute(
519 select(User.id)
520 .where(User.id >= next_user_id)
521 .where(User.joined >= start_date)
522 .where(User.joined <= end_date)
523 .order_by(User.joined.desc())
524 .limit(page_size + 1)
525 )
526 .scalars()
527 .all()
528 )
530 return admin_pb2.ListUserIdsRes(
531 user_ids=user_ids[:page_size],
532 next_page_token=str(user_ids[-1]) if len(user_ids) > page_size else None,
533 )