Coverage for src/couchers/servicers/admin.py: 80%
276 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-24 14:08 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-24 14:08 +0000
1import json
2import logging
3from datetime import timedelta
5import grpc
6from geoalchemy2.shape import from_shape
7from google.protobuf import empty_pb2
8from shapely.geometry import shape
9from sqlalchemy.sql import or_, select, update
11from couchers import errors, urls
12from couchers.helpers.badges import user_add_badge, user_remove_badge
13from couchers.helpers.clusters import create_cluster, create_node
14from couchers.jobs.enqueue import queue_job
15from couchers.models import (
16 ContentReport,
17 Event,
18 EventCommunityInviteRequest,
19 EventOccurrence,
20 GroupChat,
21 GroupChatSubscription,
22 HostRequest,
23 Message,
24 ModNote,
25 Node,
26 Reference,
27 User,
28 UserBadge,
29)
30from couchers.notifications.notify import notify
31from couchers.resources import get_badge_dict
32from couchers.servicers.api import get_strong_verification_fields, user_model_to_pb
33from couchers.servicers.auth import create_session
34from couchers.servicers.communities import community_to_pb
35from couchers.servicers.events import get_users_to_notify_for_new_event
36from couchers.sql import couchers_select as select
37from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date
38from proto import admin_pb2, admin_pb2_grpc, notification_data_pb2
39from proto.internal import jobs_pb2
# Module-level logger, named after this module.
41logger = logging.getLogger(__name__)
# Upper bound on the page size accepted by the paginated RPCs in this module
# (ListEventCommunityInviteRequests and ListUserIds clamp `page_size` to it and
# also use it as the default when no page size is given).
43MAX_PAGINATION_LENGTH = 250
def _user_to_details(session, user):
    """Build the `admin_pb2.UserDetails` message describing `user`.

    Besides the plain user columns, the message carries the ids of the user's
    badges, the fields returned by `get_strong_verification_fields`, and the
    counts of the user's pending and non-pending mod notes.
    """
    badge_ids = [user_badge.badge_id for user_badge in user.badges]
    verification_fields = get_strong_verification_fields(session, user)
    pending_notes = user.mod_notes.where(ModNote.is_pending).count()
    acknowledged_notes = user.mod_notes.where(~ModNote.is_pending).count()

    return admin_pb2.UserDetails(
        user_id=user.id,
        username=user.username,
        name=user.name,
        email=user.email,
        gender=user.gender,
        birthdate=date_to_api(user.birthdate),
        banned=user.is_banned,
        deleted=user.is_deleted,
        do_not_email=user.do_not_email,
        badges=badge_ids,
        has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
        admin_note=user.admin_note,
        pending_mod_notes_count=pending_notes,
        acknowledged_mod_notes_count=acknowledged_notes,
        **verification_fields,
    )
def _content_report_to_pb(content_report: ContentReport):
    """Convert a `ContentReport` row into its `admin_pb2.ContentReport` message."""
    fields = {
        "content_report_id": content_report.id,
        "time": Timestamp_from_datetime(content_report.time),
        "reporting_user_id": content_report.reporting_user_id,
        "author_user_id": content_report.author_user_id,
        "reason": content_report.reason,
        "description": content_report.description,
        "content_ref": content_report.content_ref,
        "user_agent": content_report.user_agent,
        "page": content_report.page,
    }
    return admin_pb2.ContentReport(**fields)
def append_admin_note(session, context, user, note):
    """Append a timestamped, attributed entry to `user.admin_note`.

    Aborts the call with INVALID_ARGUMENT when `note` is empty or only
    whitespace. The entry records the id and username of the user identified
    by `context.user_id`.
    """
    if note.strip() == "":
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ADMIN_NOTE_CANT_BE_EMPTY)

    acting_admin = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
    timestamp = now().isoformat()
    entry = f"\n[{timestamp}] (id: {acting_admin.id}, username: {acting_admin.username}) {note}\n"
    user.admin_note += entry
def load_community_geom(geojson, context):
    """Parse a GeoJSON string into a shapely geometry, requiring a MultiPolygon.

    Args:
        geojson: GeoJSON document as a string.
        context: gRPC context used to abort the call on invalid input.

    Returns:
        The shapely geometry built from `geojson`.

    Aborts with INVALID_ARGUMENT (`errors.NO_MULTIPOLYGON`) when the string is
    not valid JSON or when the parsed geometry is not a MultiPolygon.
    """
    try:
        geometry_mapping = json.loads(geojson)
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass. An empty or malformed
        # string used to escape as an unhandled exception; it certainly is not
        # a MultiPolygon, so reject it the same way as any other bad geometry.
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_MULTIPOLYGON)

    geom = shape(geometry_mapping)

    if geom.geom_type != "MultiPolygon":
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_MULTIPOLYGON)

    return geom
def _load_user_or_abort(session, context, user_ref):
    """Look up a user via `where_username_or_email_or_id`, aborting with NOT_FOUND if none matches."""
    user = session.execute(select(User).where_username_or_email_or_id(user_ref)).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
    return user


class Admin(admin_pb2_grpc.AdminServicer):
    """Servicer for the Admin gRPC service.

    Every RPC receives the request message, the gRPC context and a database
    session. RPCs that target a user resolve `request.user` through
    `_load_user_or_abort` and abort with NOT_FOUND when no user matches.
    """

    def GetUserDetails(self, request, context, session):
        """Return the admin details (`admin_pb2.UserDetails`) of the requested user."""
        user = _load_user_or_abort(session, context, request.user)
        return _user_to_details(session, user)

    def GetUser(self, request, context, session):
        """Return the requested user as produced by `user_model_to_pb`."""
        user = _load_user_or_abort(session, context, request.user)
        return user_model_to_pb(user, session, context)

    def ChangeUserGender(self, request, context, session):
        """Set the user's gender, commit, and send a `gender:change` notification."""
        user = _load_user_or_abort(session, context, request.user)
        user.gender = request.gender
        session.commit()

        notify(
            session,
            user_id=user.id,
            topic_action="gender:change",
            data=notification_data_pb2.GenderChange(
                gender=request.gender,
            ),
        )

        return _user_to_details(session, user)

    def ChangeUserBirthdate(self, request, context, session):
        """Set the user's birthdate, commit, and send a `birthdate:change` notification."""
        user = _load_user_or_abort(session, context, request.user)
        user.birthdate = parse_date(request.birthdate)
        session.commit()

        notify(
            session,
            user_id=user.id,
            topic_action="birthdate:change",
            data=notification_data_pb2.BirthdateChange(
                birthdate=request.birthdate,
            ),
        )

        return _user_to_details(session, user)

    def AddBadge(self, request, context, session):
        """Give the user a badge.

        Aborts with NOT_FOUND for an unknown badge id, and with
        FAILED_PRECONDITION when the badge is not admin-editable or the user
        already has it.
        """
        user = _load_user_or_abort(session, context, request.user)

        badge = get_badge_dict().get(request.badge_id)
        if not badge:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)

        if not badge["admin_editable"]:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)

        if badge["id"] in [b.badge_id for b in user.badges]:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_HAS_BADGE)

        user_add_badge(session, user.id, request.badge_id)

        return _user_to_details(session, user)

    def RemoveBadge(self, request, context, session):
        """Remove a badge from the user.

        Aborts with NOT_FOUND for an unknown badge id, and with
        FAILED_PRECONDITION when the badge is not admin-editable or the user
        does not have it.
        """
        user = _load_user_or_abort(session, context, request.user)

        badge = get_badge_dict().get(request.badge_id)
        if not badge:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)

        if not badge["admin_editable"]:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)

        user_badge = session.execute(
            select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge["id"])
        ).scalar_one_or_none()
        if not user_badge:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_DOES_NOT_HAVE_BADGE)

        user_remove_badge(session, user.id, request.badge_id)

        return _user_to_details(session, user)

    def SetPassportSexGenderException(self, request, context, session):
        """Set the user's `has_passport_sex_gender_exception` flag from the request."""
        user = _load_user_or_abort(session, context, request.user)
        user.has_passport_sex_gender_exception = request.passport_sex_gender_exception
        return _user_to_details(session, user)

    def BanUser(self, request, context, session):
        """Append the given admin note to the user and mark them as banned."""
        user = _load_user_or_abort(session, context, request.user)
        append_admin_note(session, context, user, request.admin_note)
        user.is_banned = True
        return _user_to_details(session, user)

    def UnbanUser(self, request, context, session):
        """Append the given admin note to the user and clear their banned flag."""
        user = _load_user_or_abort(session, context, request.user)
        append_admin_note(session, context, user, request.admin_note)
        user.is_banned = False
        return _user_to_details(session, user)

    def AddAdminNote(self, request, context, session):
        """Append the given admin note to the user."""
        user = _load_user_or_abort(session, context, request.user)
        append_admin_note(session, context, user, request.admin_note)
        return _user_to_details(session, user)

    def GetContentReport(self, request, context, session):
        """Return a single content report by id, aborting with NOT_FOUND if it does not exist."""
        content_report = session.execute(
            select(ContentReport).where(ContentReport.id == request.content_report_id)
        ).scalar_one_or_none()
        if not content_report:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CONTENT_REPORT_NOT_FOUND)
        return admin_pb2.GetContentReportRes(
            content_report=_content_report_to_pb(content_report),
        )

    def GetContentReportsForAuthor(self, request, context, session):
        """Return all content reports whose author is the given user, highest report id first."""
        user = _load_user_or_abort(session, context, request.user)
        content_reports = (
            session.execute(
                select(ContentReport).where(ContentReport.author_user_id == user.id).order_by(ContentReport.id.desc())
            )
            .scalars()
            .all()
        )
        return admin_pb2.GetContentReportsForAuthorRes(
            content_reports=[_content_report_to_pb(content_report) for content_report in content_reports],
        )

    def SendModNote(self, request, context, session):
        """Create a mod note for the user and, unless `do_not_notify` is set, send `modnote:create`."""
        user = _load_user_or_abort(session, context, request.user)
        session.add(
            ModNote(
                user_id=user.id,
                internal_id=request.internal_id,
                creator_user_id=context.user_id,
                note_content=request.content,
            )
        )
        session.flush()

        if not request.do_not_notify:
            notify(
                session,
                user_id=user.id,
                topic_action="modnote:create",
            )

        return _user_to_details(session, user)

    def DeleteUser(self, request, context, session):
        """Mark the user as deleted by setting `is_deleted`."""
        user = _load_user_or_abort(session, context, request.user)
        user.is_deleted = True
        return _user_to_details(session, user)

    def CreateApiKey(self, request, context, session):
        """Create a 365-day API-key session for the user and send the token in an `api_key:create` notification."""
        user = _load_user_or_abort(session, context, request.user)
        token, expiry = create_session(
            context, session, user, long_lived=True, is_api_key=True, duration=timedelta(days=365), set_cookie=False
        )

        notify(
            session,
            user_id=user.id,
            topic_action="api_key:create",
            data=notification_data_pb2.ApiKeyCreate(
                api_key=token,
                expiry=Timestamp_from_datetime(expiry),
            ),
        )

        return _user_to_details(session, user)

    def CreateCommunity(self, request, context, session):
        """Create a node from the request's GeoJSON plus a cluster for it, and return the community.

        A `parent_node_id` of 0 is treated as "no parent".
        """
        geom = load_community_geom(request.geojson, context)

        parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None
        node = create_node(session, geom, parent_node_id)
        create_cluster(session, node.id, request.name, request.description, context.user_id, request.admin_ids, True)

        return community_to_pb(session, node, context)

    def UpdateCommunity(self, request, context, session):
        """Update a community's name, description, geometry and/or parent node.

        Only fields that are set (non-empty strings, non-zero `parent_node_id`)
        are applied. Aborts with NOT_FOUND when the community does not exist.
        """
        node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
        if not node:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
        cluster = node.official_cluster

        if request.name:
            cluster.name = request.name

        if request.description:
            cluster.description = request.description

        if request.geojson:
            geom = load_community_geom(request.geojson, context)

            node.geom = from_shape(geom)

        if request.parent_node_id != 0:
            node.parent_node_id = request.parent_node_id

        session.flush()

        return community_to_pb(session, cluster.parent_node, context)

    def GetChats(self, request, context, session):
        """Return a plain-text dump of all host requests and group chats of the given user."""

        def format_user(user):
            return f"{user.name} ({user.username}, {user.id})"

        def format_conversation(conversation_id):
            messages = (
                session.execute(
                    select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
                )
                .scalars()
                .all()
            )
            # Collect the pieces and join once instead of repeated string +=.
            parts = []
            for message in messages:
                parts.append(
                    f"Message {message.id} by {format_user(message.author)} at {message.time}\nType={message.message_type}, host_req_status_change={message.host_request_status_target}\n\n"
                )
                parts.append(str(message.text))
                parts.append("\n\n-----\n")
            parts.append("\n\n\n\n")
            return "".join(parts)

        def format_host_request(host_request_id):
            host_request = session.execute(
                select(HostRequest).where(HostRequest.conversation_id == host_request_id)
            ).scalar_one()
            parts = [
                "==============================\n",
                f"Host request {host_request.conversation_id} from {format_user(host_request.surfer)} to {format_user(host_request.host)}.\nCurrent state = {host_request.status}\n\nMessages:\n",
                format_conversation(host_request.conversation_id),
                "\n\n\n\n",
            ]
            return "".join(parts)

        def format_group_chat(group_chat_id):
            group_chat = session.execute(
                select(GroupChat).where(GroupChat.conversation_id == group_chat_id)
            ).scalar_one()
            parts = [
                "==============================\n",
                f"Group chat {group_chat.conversation_id}. Created by {format_user(group_chat.creator)}, is_dm={group_chat.is_dm}\nName: {group_chat.title}\nMembers:\n",
            ]
            subs = (
                session.execute(
                    select(GroupChatSubscription)
                    .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
                    .order_by(GroupChatSubscription.joined.asc())
                )
                .scalars()
                .all()
            )
            parts.extend(
                f"{format_user(sub.user)} joined at {sub.joined} (left at {sub.left}), role={sub.role}\n" for sub in subs
            )
            parts.append("\n\nMessages:\n")
            parts.append(format_conversation(group_chat.conversation_id))
            parts.append("\n\n\n\n")
            return "".join(parts)

        def format_all_chats_for_user(user_id):
            user = session.execute(select(User).where(User.id == user_id)).scalar_one()
            parts = [f"Chats for user {format_user(user)}\n"]

            host_request_ids = (
                session.execute(
                    select(HostRequest.conversation_id)
                    .where(or_(HostRequest.host_user_id == user_id, HostRequest.surfer_user_id == user_id))
                    .order_by(HostRequest.conversation_id.desc())
                )
                .scalars()
                .all()
            )
            parts.append(f"************************************* Requests ({len(host_request_ids)})\n")
            parts.extend(format_host_request(host_request_id) for host_request_id in host_request_ids)

            group_chat_ids = (
                session.execute(
                    select(GroupChatSubscription.group_chat_id)
                    .where(GroupChatSubscription.user_id == user_id)
                    .order_by(GroupChatSubscription.joined.desc())
                )
                .scalars()
                .all()
            )
            parts.append(f"************************************* Group chats ({len(group_chat_ids)})\n")
            parts.extend(format_group_chat(group_chat_id) for group_chat_id in group_chat_ids)
            return "".join(parts)

        user = _load_user_or_abort(session, context, request.user)

        return admin_pb2.GetChatsRes(response=format_all_chats_for_user(user.id))

    def ListEventCommunityInviteRequests(self, request, context, session):
        """List undecided event community invite requests, paginated by ascending request id.

        `page_token` is the id of the first request of the page; `page_size`
        is clamped to `MAX_PAGINATION_LENGTH`.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        next_request_id = int(request.page_token) if request.page_token else 0
        # Fetch one row beyond the page: its presence signals a further page
        # and its id becomes the next page token.
        invite_requests = (
            session.execute(
                select(EventCommunityInviteRequest)
                .where(EventCommunityInviteRequest.approved.is_(None))
                .where(EventCommunityInviteRequest.id >= next_request_id)
                .order_by(EventCommunityInviteRequest.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )

        def _request_to_pb(invite_request):
            users_to_notify, node_id = get_users_to_notify_for_new_event(session, invite_request.occurrence)
            return admin_pb2.EventCommunityInviteRequest(
                event_community_invite_request_id=invite_request.id,
                user_id=invite_request.user_id,
                event_url=urls.event_link(
                    occurrence_id=invite_request.occurrence.id, slug=invite_request.occurrence.event.slug
                ),
                approx_users_to_notify=len(users_to_notify),
                community_id=node_id,
            )

        return admin_pb2.ListEventCommunityInviteRequestsRes(
            requests=[_request_to_pb(invite_request) for invite_request in invite_requests[:page_size]],
            next_page_token=str(invite_requests[-1].id) if len(invite_requests) > page_size else None,
        )

    def DecideEventCommunityInviteRequest(self, request, context, session):
        """Approve or deny an event community invite request.

        Aborts with NOT_FOUND for an unknown request and FAILED_PRECONDITION
        when it was already decided. On approval, all other undecided requests
        for the same occurrence are denied and a
        `generate_event_create_notifications` job is queued.
        """
        req = session.execute(
            select(EventCommunityInviteRequest).where(
                EventCommunityInviteRequest.id == request.event_community_invite_request_id
            )
        ).scalar_one_or_none()

        if not req:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_COMMUNITY_INVITE_NOT_FOUND)

        if req.decided:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_DECIDED)

        decided = now()
        req.decided = decided
        req.decided_by_user_id = context.user_id
        req.approved = request.approve

        # deny other reqs for the same event
        if request.approve:
            session.execute(
                update(EventCommunityInviteRequest)
                .where(EventCommunityInviteRequest.occurrence_id == req.occurrence_id)
                .where(EventCommunityInviteRequest.decided.is_(None))
                .values(decided=decided, decided_by_user_id=context.user_id, approved=False)
            )

        session.flush()

        if request.approve:
            queue_job(
                session,
                "generate_event_create_notifications",
                payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
                    inviting_user_id=req.user_id,
                    occurrence_id=req.occurrence_id,
                    approved=True,
                ),
            )

        return admin_pb2.DecideEventCommunityInviteRequestRes()

    def DeleteEvent(self, request, context, session):
        """Mark an event occurrence as deleted and queue its delete notifications.

        `request.event_id` is matched against `EventOccurrence.id`. Aborts with
        NOT_FOUND when no non-deleted occurrence has that id.
        """
        res = session.execute(
            select(Event, EventOccurrence)
            .where(EventOccurrence.id == request.event_id)
            .where(EventOccurrence.event_id == Event.id)
            .where(~EventOccurrence.is_deleted)
        ).one_or_none()

        if not res:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

        event, occurrence = res

        occurrence.is_deleted = True

        queue_job(
            session,
            "generate_event_delete_notifications",
            payload=jobs_pb2.GenerateEventDeleteNotificationsPayload(
                occurrence_id=occurrence.id,
            ),
        )

        return empty_pb2.Empty()

    def ListUserIds(self, request, context, session):
        """List ids of users who joined within [start_time, end_time], paginated by descending user id.

        `page_token` is the id of the first user of the page; `page_size` is
        clamped to `MAX_PAGINATION_LENGTH`.
        """
        start_date = request.start_time.ToDatetime()
        end_date = request.end_time.ToDatetime()

        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

        query = select(User.id).where(User.joined >= start_date).where(User.joined <= end_date)
        if request.page_token:
            query = query.where(User.id <= int(request.page_token))

        # The cursor and the sort must use the same column. Previously rows
        # were filtered on `id >= token` but sorted by `joined` descending, so
        # following the token returned rows that had already been served.
        # NOTE(review): descending id is used as the closest stand-in for the
        # old "newest first" order; confirm no caller needs exact `joined` order.
        user_ids = session.execute(query.order_by(User.id.desc()).limit(page_size + 1)).scalars().all()

        return admin_pb2.ListUserIdsRes(
            user_ids=user_ids[:page_size],
            next_page_token=str(user_ids[-1]) if len(user_ids) > page_size else None,
        )

    def EditReferenceText(self, request, context, session):
        """Replace a reference's text with the stripped `new_text`.

        Aborts with NOT_FOUND for an unknown reference and INVALID_ARGUMENT
        when the new text is empty or only whitespace.
        """
        reference = session.execute(select(Reference).where(Reference.id == request.reference_id)).scalar_one_or_none()

        if reference is None:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.REFERENCE_NOT_FOUND)

        if not request.new_text.strip():
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.REFERENCE_NO_TEXT)

        reference.text = request.new_text.strip()
        return empty_pb2.Empty()

    def DeleteReference(self, request, context, session):
        """Mark a reference as deleted, aborting with NOT_FOUND if it does not exist."""
        reference = session.execute(select(Reference).where(Reference.id == request.reference_id)).scalar_one_or_none()

        if reference is None:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.REFERENCE_NOT_FOUND)

        reference.is_deleted = True
        return empty_pb2.Empty()