Coverage for src/couchers/servicers/admin.py: 69%
435 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
1import json
2import logging
3from datetime import timedelta
5import grpc
6from geoalchemy2.shape import from_shape
7from google.protobuf import empty_pb2
8from shapely.geometry import shape
9from sqlalchemy.sql import and_, func, or_, select, update
10from user_agents import parse as user_agents_parse
12from couchers import errors, urls
13from couchers.context import make_background_user_context
14from couchers.crypto import urlsafe_secure_token
15from couchers.db import session_scope
16from couchers.helpers.badges import user_add_badge, user_remove_badge
17from couchers.helpers.clusters import create_cluster, create_node
18from couchers.helpers.geoip import geoip_approximate_location, geoip_asn
19from couchers.helpers.strong_verification import get_strong_verification_fields
20from couchers.jobs.enqueue import queue_job
21from couchers.models import (
22 AccountDeletionToken,
23 Comment,
24 ContentReport,
25 Discussion,
26 Event,
27 EventCommunityInviteRequest,
28 EventOccurrence,
29 GroupChat,
30 GroupChatSubscription,
31 HostRequest,
32 LanguageAbility,
33 Message,
34 ModerationUserList,
35 ModNote,
36 Node,
37 Reference,
38 Reply,
39 User,
40 UserActivity,
41 UserBadge,
42)
43from couchers.notifications.notify import notify
44from couchers.resources import get_badge_dict
45from couchers.servicers.api import user_model_to_pb
46from couchers.servicers.auth import create_session
47from couchers.servicers.communities import community_to_pb
48from couchers.servicers.events import get_users_to_notify_for_new_event
49from couchers.servicers.threads import unpack_thread_id
50from couchers.sql import couchers_select as select
51from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date, to_aware_datetime
52from proto import admin_pb2, admin_pb2_grpc, notification_data_pb2
53from proto.internal import jobs_pb2
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Largest page size the paginated admin RPCs in this module will serve; it is also the page size used when the
# request does not specify one.
MAX_PAGINATION_LENGTH = 250
def _user_to_details(session, user):
    """Build the admin-facing ``admin_pb2.UserDetails`` message for ``user``.

    Strong verification fields are supplied by ``get_strong_verification_fields``; mod notes are reported as two
    counts, pending and non-pending.
    """
    badge_ids = [user_badge.badge_id for user_badge in user.badges]
    verification_fields = get_strong_verification_fields(session, user)
    pending_count = user.mod_notes.where(ModNote.is_pending).count()
    acknowledged_count = user.mod_notes.where(~ModNote.is_pending).count()

    return admin_pb2.UserDetails(
        user_id=user.id,
        username=user.username,
        name=user.name,
        email=user.email,
        gender=user.gender,
        birthdate=date_to_api(user.birthdate),
        banned=user.is_banned,
        deleted=user.is_deleted,
        do_not_email=user.do_not_email,
        badges=badge_ids,
        has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
        admin_note=user.admin_note,
        pending_mod_notes_count=pending_count,
        acknowledged_mod_notes_count=acknowledged_count,
        **verification_fields,
    )
def _content_report_to_pb(content_report: ContentReport):
    """Convert a ``ContentReport`` row into its ``admin_pb2.ContentReport`` message."""
    report = content_report
    reported_at = Timestamp_from_datetime(report.time)
    return admin_pb2.ContentReport(
        content_report_id=report.id,
        time=reported_at,
        reporting_user_id=report.reporting_user_id,
        author_user_id=report.author_user_id,
        reason=report.reason,
        description=report.description,
        content_ref=report.content_ref,
        user_agent=report.user_agent,
        page=report.page,
    )
def append_admin_note(session, context, user, note):
    """Append a timestamped entry, attributed to the calling admin, to ``user.admin_note``.

    Aborts with INVALID_ARGUMENT when ``note`` is empty or whitespace only. The calling admin is loaded via
    ``context.user_id`` so that their id and username can be recorded in the entry.
    """
    if not note.strip():
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ADMIN_NOTE_CANT_BE_EMPTY)

    acting_admin = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
    timestamp = now().isoformat()
    entry = f"\n[{timestamp}] (id: {acting_admin.id}, username: {acting_admin.username}) {note}\n"
    user.admin_note += entry
def load_community_geom(geojson, context):
    """Parse a GeoJSON string into a shapely geometry, accepting only a MultiPolygon.

    Aborts with INVALID_ARGUMENT when the parsed geometry has any other type.
    """
    parsed = json.loads(geojson)
    geometry = shape(parsed)

    if geometry.geom_type == "MultiPolygon":
        return geometry

    context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_MULTIPOLYGON)
    return geometry
def generate_new_blog_post_notifications(payload: jobs_pb2.GenerateNewBlogPostNotificationsPayload):
    """Send a ``general:new_blog_post`` notification to every visible user.

    The notification carries the URL, title and blurb taken from ``payload``.
    """
    with session_scope() as session:
        visible_users = session.execute(select(User).where(User.is_visible)).scalars().all()
        for recipient in visible_users:
            # NOTE(review): this context is built but never passed on below; kept as-is in case constructing it
            # matters — confirm whether it can be dropped.
            context = make_background_user_context(user_id=recipient.id)
            blog_post = notification_data_pb2.GeneralNewBlogPost(
                url=payload.url,
                title=payload.title,
                blurb=payload.blurb,
            )
            notify(
                session,
                user_id=recipient.id,
                topic_action="general:new_blog_post",
                data=blog_post,
            )
127class Admin(admin_pb2_grpc.AdminServicer):
def GetUserDetails(self, request, context, session):
    """Return the admin-level ``UserDetails`` for the user matching ``request.user``.

    Aborts with NOT_FOUND when no user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    found = session.execute(lookup).scalar_one_or_none()
    if not found:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
    return _user_to_details(session, found)
def GetUser(self, request, context, session):
    """Return the user matching ``request.user`` as built by ``user_model_to_pb``.

    Aborts with NOT_FOUND when no user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    found = session.execute(lookup).scalar_one_or_none()
    if not found:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
    return user_model_to_pb(found, session, context)
def SearchUsers(self, request, context, session):
    """Search users with optional filters, paginated by ascending user id.

    Every filter is optional and all supplied filters are combined with AND. Text filters are passed to
    ``ilike`` unchanged, so any wildcards come from the caller. The page token is the id of the first user of
    the next page; one extra row is fetched to detect whether a further page exists.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    next_user_id = int(request.page_token) if request.page_token else 0

    statement = select(User)

    # Case-insensitive pattern filters on text columns.
    text_filters = (
        (User.username, request.username),
        (User.email, request.email),
        (User.name, request.name),
        (User.admin_note, request.admin_note),
        (User.city, request.city),
    )
    for column, pattern in text_filters:
        if pattern:
            statement = statement.where(column.ilike(pattern))

    # Inclusive range filters.
    if request.min_user_id:
        statement = statement.where(User.id >= request.min_user_id)
    if request.max_user_id:
        statement = statement.where(User.id <= request.max_user_id)
    if request.min_birthdate:
        statement = statement.where(User.birthdate >= parse_date(request.min_birthdate))
    if request.max_birthdate:
        statement = statement.where(User.birthdate <= parse_date(request.max_birthdate))
    if request.min_joined_date:
        statement = statement.where(User.joined >= parse_date(request.min_joined_date))
    if request.max_joined_date:
        statement = statement.where(User.joined <= parse_date(request.max_joined_date))
    if request.min_last_active_date:
        statement = statement.where(User.last_active >= parse_date(request.min_last_active_date))
    if request.max_last_active_date:
        statement = statement.where(User.last_active <= parse_date(request.max_last_active_date))

    # Applied once; the previous implementation added this identical clause twice.
    if request.genders:
        statement = statement.where(User.gender.in_(request.genders))

    if request.language_codes:
        # NOTE(review): a user with several matching language abilities may appear more than once through
        # this join — confirm whether results need de-duplicating.
        statement = statement.join(
            LanguageAbility,
            and_(LanguageAbility.user_id == User.id, LanguageAbility.language_code.in_(request.language_codes)),
        )

    # Wrapped boolean fields: only filter when the caller explicitly set them.
    if request.HasField("is_deleted"):
        statement = statement.where(User.is_deleted == request.is_deleted.value)
    if request.HasField("is_banned"):
        statement = statement.where(User.is_banned == request.is_banned.value)
    if request.HasField("has_avatar"):
        if request.has_avatar.value:
            statement = statement.where(User.avatar_key.is_not(None))
        else:
            statement = statement.where(User.avatar_key.is_(None))

    users = (
        session.execute(statement.where(User.id >= next_user_id).order_by(User.id).limit(page_size + 1))
        .scalars()
        .all()
    )
    # NOTE(review): this logs the matched user objects at INFO level on every search — confirm it is intended.
    logger.info(users)
    return admin_pb2.SearchUsersRes(
        users=[_user_to_details(session, user) for user in users[:page_size]],
        next_page_token=str(users[-1].id) if len(users) > page_size else None,
    )
def ChangeUserGender(self, request, context, session):
    """Set the gender of the user matching ``request.user`` and notify them of the change.

    Aborts with NOT_FOUND when no user matches. The change is committed before the notification is created.
    """
    target = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    target.gender = request.gender
    session.commit()

    change = notification_data_pb2.GenderChange(gender=request.gender)
    notify(session, user_id=target.id, topic_action="gender:change", data=change)

    return _user_to_details(session, target)
def ChangeUserBirthdate(self, request, context, session):
    """Set the birthdate of the user matching ``request.user`` and notify them of the change.

    Aborts with NOT_FOUND when no user matches. The stored value is ``parse_date(request.birthdate)``; the
    notification carries the raw request string. The change is committed before the notification is created.
    """
    target = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    target.birthdate = parse_date(request.birthdate)
    session.commit()

    change = notification_data_pb2.BirthdateChange(birthdate=request.birthdate)
    notify(session, user_id=target.id, topic_action="birthdate:change", data=change)

    return _user_to_details(session, target)
def AddBadge(self, request, context, session):
    """Give the badge ``request.badge_id`` to the user matching ``request.user``.

    Aborts with NOT_FOUND for an unknown user or badge, and with FAILED_PRECONDITION when the badge is not
    admin-editable or the user already holds it.
    """
    target = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    badge_info = get_badge_dict().get(request.badge_id)
    if not badge_info:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)

    if not badge_info["admin_editable"]:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)

    already_held = any(held.badge_id == badge_info["id"] for held in target.badges)
    if already_held:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_HAS_BADGE)

    user_add_badge(session, target.id, request.badge_id)

    return _user_to_details(session, target)
def RemoveBadge(self, request, context, session):
    """Take the badge ``request.badge_id`` away from the user matching ``request.user``.

    Aborts with NOT_FOUND for an unknown user or badge, and with FAILED_PRECONDITION when the badge is not
    admin-editable or the user does not hold it.
    """
    target = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    badge_info = get_badge_dict().get(request.badge_id)
    if not badge_info:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)

    if not badge_info["admin_editable"]:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)

    held_query = select(UserBadge).where(UserBadge.user_id == target.id, UserBadge.badge_id == badge_info["id"])
    held = session.execute(held_query).scalar_one_or_none()
    if not held:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_DOES_NOT_HAVE_BADGE)

    user_remove_badge(session, target.id, request.badge_id)

    return _user_to_details(session, target)
def SetPassportSexGenderException(self, request, context, session):
    """Set ``has_passport_sex_gender_exception`` on the user matching ``request.user``.

    Aborts with NOT_FOUND when no user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    target.has_passport_sex_gender_exception = request.passport_sex_gender_exception
    return _user_to_details(session, target)
def BanUser(self, request, context, session):
    """Ban the user matching ``request.user``, recording ``request.admin_note`` first.

    Aborts with NOT_FOUND when no user matches; ``append_admin_note`` aborts on an empty note.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    append_admin_note(session, context, target, request.admin_note)
    target.is_banned = True
    return _user_to_details(session, target)
def UnbanUser(self, request, context, session):
    """Lift the ban on the user matching ``request.user``, recording ``request.admin_note`` first.

    Aborts with NOT_FOUND when no user matches; ``append_admin_note`` aborts on an empty note.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    append_admin_note(session, context, target, request.admin_note)
    target.is_banned = False
    return _user_to_details(session, target)
def AddAdminNote(self, request, context, session):
    """Append ``request.admin_note`` to the admin note of the user matching ``request.user``.

    Aborts with NOT_FOUND when no user matches; ``append_admin_note`` aborts on an empty note.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    append_admin_note(session, context, target, request.admin_note)
    return _user_to_details(session, target)
def GetContentReport(self, request, context, session):
    """Return the content report with id ``request.content_report_id``.

    Aborts with NOT_FOUND when no such report exists.
    """
    query = select(ContentReport).where(ContentReport.id == request.content_report_id)
    report = session.execute(query).scalar_one_or_none()
    if not report:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CONTENT_REPORT_NOT_FOUND)

    report_pb = _content_report_to_pb(report)
    return admin_pb2.GetContentReportRes(content_report=report_pb)
def GetContentReportsForAuthor(self, request, context, session):
    """Return all content reports whose author is the user matching ``request.user``, newest id first.

    Aborts with NOT_FOUND when no user matches.
    """
    author = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
    if not author:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    query = select(ContentReport).where(ContentReport.author_user_id == author.id).order_by(ContentReport.id.desc())
    reports = session.execute(query).scalars().all()

    return admin_pb2.GetContentReportsForAuthorRes(
        content_reports=[_content_report_to_pb(report) for report in reports],
    )
def SendModNote(self, request, context, session):
    """Create a mod note for the user matching ``request.user``, created by the calling admin.

    Aborts with NOT_FOUND when no user matches. A ``modnote:create`` notification is sent unless
    ``request.do_not_notify`` is set.
    """
    recipient = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
    if not recipient:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    mod_note = ModNote(
        user_id=recipient.id,
        internal_id=request.internal_id,
        creator_user_id=context.user_id,
        note_content=request.content,
    )
    session.add(mod_note)
    session.flush()

    if not request.do_not_notify:
        notify(session, user_id=recipient.id, topic_action="modnote:create")

    return _user_to_details(session, recipient)
def MarkUserNeedsLocationUpdate(self, request, context, session):
    """Set ``needs_to_update_location`` on the user matching ``request.user``.

    Aborts with NOT_FOUND when no user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    target.needs_to_update_location = True
    return _user_to_details(session, target)
def DeleteUser(self, request, context, session):
    """Set ``is_deleted`` to True on the user matching ``request.user``.

    Aborts with NOT_FOUND when no user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    target.is_deleted = True
    return _user_to_details(session, target)
def RecoverDeletedUser(self, request, context, session):
    """Set ``is_deleted`` back to False on the user matching ``request.user``.

    Aborts with NOT_FOUND when no user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    target.is_deleted = False
    return _user_to_details(session, target)
def CreateApiKey(self, request, context, session):
    """Create a 365-day API-key session for the user matching ``request.user``.

    Aborts with NOT_FOUND when no user matches. The token and its expiry are delivered to the user through an
    ``api_key:create`` notification; the RPC response contains only the user's details.
    """
    owner = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
    if not owner:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    api_key, expires_at = create_session(
        context,
        session,
        owner,
        long_lived=True,
        is_api_key=True,
        duration=timedelta(days=365),
        set_cookie=False,
    )

    key_data = notification_data_pb2.ApiKeyCreate(
        api_key=api_key,
        expiry=Timestamp_from_datetime(expires_at),
    )
    notify(session, user_id=owner.id, topic_action="api_key:create", data=key_data)

    return _user_to_details(session, owner)
def CreateCommunity(self, request, context, session):
    """Create a community: a node from ``request.geojson`` plus a cluster created by the calling admin.

    A ``parent_node_id`` of 0 means the node has no parent. ``load_community_geom`` aborts when the geometry is
    not a MultiPolygon.
    """
    geometry = load_community_geom(request.geojson, context)

    # 0 is mapped to None (no parent).
    parent_id = request.parent_node_id or None
    new_node = create_node(session, geometry, parent_id)
    create_cluster(
        session, new_node.id, request.name, request.description, context.user_id, request.admin_ids, True
    )

    return community_to_pb(session, new_node, context)
def UpdateCommunity(self, request, context, session):
    """Update the name, description, geometry and/or parent of the community ``request.community_id``.

    Only fields that are set (non-empty strings, non-zero ``parent_node_id``) are changed. Aborts with
    NOT_FOUND when the community node does not exist.
    """
    community_node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
    if not community_node:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
    official = community_node.official_cluster

    if request.name:
        official.name = request.name

    if request.description:
        official.description = request.description

    if request.geojson:
        new_geometry = load_community_geom(request.geojson, context)
        community_node.geom = from_shape(new_geometry)

    if request.parent_node_id != 0:
        community_node.parent_node_id = request.parent_node_id

    session.flush()

    return community_to_pb(session, official.parent_node, context)
def GetChats(self, request, context, session):
    """Render every host request and group chat of the user matching ``request.user`` as one text dump.

    Aborts with NOT_FOUND when no user matches. Host requests are listed first (newest conversation id first),
    then group chats (most recently joined first); each includes its full message history in ascending id order.
    """

    def format_user(user):
        return f"{user.name} ({user.username}, {user.id})"

    def format_conversation(conversation_id):
        # All messages of one conversation, oldest first.
        history = (
            session.execute(
                select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
            )
            .scalars()
            .all()
        )
        pieces = []
        for msg in history:
            pieces.append(
                f"Message {msg.id} by {format_user(msg.author)} at {msg.time}\nType={msg.message_type}, host_req_status_change={msg.host_request_status_target}\n\n"
            )
            pieces.append(str(msg.text))
            pieces.append("\n\n-----\n")
        pieces.append("\n\n\n\n")
        return "".join(pieces)

    def format_host_request(host_request_id):
        # Header for one host request followed by its conversation.
        hr = session.execute(
            select(HostRequest).where(HostRequest.conversation_id == host_request_id)
        ).scalar_one()
        pieces = [
            "==============================\n",
            f"Host request {hr.conversation_id} from {format_user(hr.surfer)} to {format_user(hr.host)}.\nCurrent state = {hr.status}\n\nMessages:\n",
            format_conversation(hr.conversation_id),
            "\n\n\n\n",
        ]
        return "".join(pieces)

    def format_group_chat(group_chat_id):
        # Header, membership list (ordered by join time) and conversation for one group chat.
        chat = session.execute(select(GroupChat).where(GroupChat.conversation_id == group_chat_id)).scalar_one()
        pieces = [
            "==============================\n",
            f"Group chat {chat.conversation_id}. Created by {format_user(chat.creator)}, is_dm={chat.is_dm}\nName: {chat.title}\nMembers:\n",
        ]
        memberships = (
            session.execute(
                select(GroupChatSubscription)
                .where(GroupChatSubscription.group_chat_id == chat.conversation_id)
                .order_by(GroupChatSubscription.joined.asc())
            )
            .scalars()
            .all()
        )
        for membership in memberships:
            pieces.append(
                f"{format_user(membership.user)} joined at {membership.joined} (left at {membership.left}), role={membership.role}\n"
            )
        pieces.append("\n\nMessages:\n")
        pieces.append(format_conversation(chat.conversation_id))
        pieces.append("\n\n\n\n")
        return "".join(pieces)

    def format_all_chats_for_user(user_id):
        subject = session.execute(select(User).where(User.id == user_id)).scalar_one()
        pieces = [f"Chats for user {format_user(subject)}\n"]

        host_request_ids = (
            session.execute(
                select(HostRequest.conversation_id)
                .where(or_(HostRequest.host_user_id == user_id, HostRequest.surfer_user_id == user_id))
                .order_by(HostRequest.conversation_id.desc())
            )
            .scalars()
            .all()
        )
        pieces.append(f"************************************* Requests ({len(host_request_ids)})\n")
        for host_request_id in host_request_ids:
            pieces.append(format_host_request(host_request_id))

        group_chat_ids = (
            session.execute(
                select(GroupChatSubscription.group_chat_id)
                .where(GroupChatSubscription.user_id == user_id)
                .order_by(GroupChatSubscription.joined.desc())
            )
            .scalars()
            .all()
        )
        pieces.append(f"************************************* Group chats ({len(group_chat_ids)})\n")
        for group_chat_id in group_chat_ids:
            pieces.append(format_group_chat(group_chat_id))

        return "".join(pieces)

    target = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    return admin_pb2.GetChatsRes(response=format_all_chats_for_user(target.id))
def ListEventCommunityInviteRequests(self, request, context, session):
    """List event community invite requests whose ``approved`` is still NULL, paginated by ascending id.

    The page token is the id of the first request of the next page; one extra row is fetched to detect whether
    a further page exists.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    first_id = int(request.page_token) if request.page_token else 0

    query = (
        select(EventCommunityInviteRequest)
        .where(EventCommunityInviteRequest.approved.is_(None))
        .where(EventCommunityInviteRequest.id >= first_id)
        .order_by(EventCommunityInviteRequest.id)
        .limit(page_size + 1)
    )
    pending = session.execute(query).scalars().all()

    def _to_pb(invite):
        # The would-be recipients are computed only to report their count and the community they belong to.
        recipients, node_id = get_users_to_notify_for_new_event(session, invite.occurrence)
        link = urls.event_link(occurrence_id=invite.occurrence.id, slug=invite.occurrence.event.slug)
        return admin_pb2.EventCommunityInviteRequest(
            event_community_invite_request_id=invite.id,
            user_id=invite.user_id,
            event_url=link,
            approx_users_to_notify=len(recipients),
            community_id=node_id,
        )

    has_more = len(pending) > page_size
    return admin_pb2.ListEventCommunityInviteRequestsRes(
        requests=[_to_pb(invite) for invite in pending[:page_size]],
        next_page_token=str(pending[-1].id) if has_more else None,
    )
def DecideEventCommunityInviteRequest(self, request, context, session):
    """Approve or deny an event community invite request on behalf of the calling admin.

    Aborts with NOT_FOUND when the request does not exist and with FAILED_PRECONDITION when it was already
    decided. On approval, every other undecided request for the same occurrence is denied and a
    ``generate_event_create_notifications`` job is queued.
    """
    invite = session.execute(
        select(EventCommunityInviteRequest).where(
            EventCommunityInviteRequest.id == request.event_community_invite_request_id
        )
    ).scalar_one_or_none()

    if not invite:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_COMMUNITY_INVITE_NOT_FOUND)

    if invite.decided:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_DECIDED)

    approving = request.approve
    decided_at = now()
    invite.decided = decided_at
    invite.decided_by_user_id = context.user_id
    invite.approved = approving

    if approving:
        # deny other reqs for the same event
        deny_others = (
            update(EventCommunityInviteRequest)
            .where(EventCommunityInviteRequest.occurrence_id == invite.occurrence_id)
            .where(EventCommunityInviteRequest.decided.is_(None))
            .values(decided=decided_at, decided_by_user_id=context.user_id, approved=False)
        )
        session.execute(deny_others)

    session.flush()

    if approving:
        job_payload = jobs_pb2.GenerateEventCreateNotificationsPayload(
            inviting_user_id=invite.user_id,
            occurrence_id=invite.occurrence_id,
            approved=True,
        )
        queue_job(session, "generate_event_create_notifications", payload=job_payload)

    return admin_pb2.DecideEventCommunityInviteRequestRes()
def DeleteEvent(self, request, context, session):
    """Mark the event occurrence with id ``request.event_id`` as deleted and queue delete notifications.

    Aborts with NOT_FOUND when no non-deleted occurrence with that id exists.
    """
    query = (
        select(Event, EventOccurrence)
        .where(EventOccurrence.id == request.event_id)
        .where(EventOccurrence.event_id == Event.id)
        .where(~EventOccurrence.is_deleted)
    )
    row = session.execute(query).one_or_none()

    if not row:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    _event, occurrence = row

    occurrence.is_deleted = True

    job_payload = jobs_pb2.GenerateEventDeleteNotificationsPayload(occurrence_id=occurrence.id)
    queue_job(session, "generate_event_delete_notifications", payload=job_payload)

    return empty_pb2.Empty()
def ListUserIds(self, request, context, session):
    """List ids of users who joined between ``request.start_time`` and ``request.end_time``, highest id first.

    The page token is the id of the first user of the next page; one extra row is fetched to detect whether a
    further page exists.
    """
    joined_from = request.start_time.ToDatetime()
    joined_until = request.end_time.ToDatetime()

    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    next_user_id = int(request.page_token) if request.page_token else 0

    query = (
        select(User.id)
        # A token of 0 (no token) disables the id ceiling.
        .where(or_(User.id <= next_user_id, next_user_id == 0))
        .where(User.joined >= joined_from)
        .where(User.joined <= joined_until)
        .order_by(User.id.desc())
        .limit(page_size + 1)
    )
    ids = session.execute(query).scalars().all()

    has_more = len(ids) > page_size
    return admin_pb2.ListUserIdsRes(
        user_ids=ids[:page_size],
        next_page_token=str(ids[-1]) if has_more else None,
    )
def EditReferenceText(self, request, context, session):
    """Replace the text of reference ``request.reference_id`` with the stripped ``request.new_text``.

    Aborts with NOT_FOUND when the reference does not exist and with INVALID_ARGUMENT when the new text is
    empty after stripping.
    """
    query = select(Reference).where(Reference.id == request.reference_id)
    reference = session.execute(query).scalar_one_or_none()

    if reference is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.REFERENCE_NOT_FOUND)

    cleaned_text = request.new_text.strip()
    if not cleaned_text:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.REFERENCE_NO_TEXT)

    reference.text = cleaned_text
    return empty_pb2.Empty()
def DeleteReference(self, request, context, session):
    """Mark the reference ``request.reference_id`` as deleted.

    Aborts with NOT_FOUND when the reference does not exist.
    """
    query = select(Reference).where(Reference.id == request.reference_id)
    reference = session.execute(query).scalar_one_or_none()

    if reference is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.REFERENCE_NOT_FOUND)

    reference.is_deleted = True
    return empty_pb2.Empty()
def EditDiscussion(self, request, context, session):
    """Update the title and/or content of discussion ``request.discussion_id``.

    Only non-empty request fields are applied, each stripped of surrounding whitespace. Aborts with NOT_FOUND
    when the discussion does not exist.
    """
    query = select(Discussion).where(Discussion.id == request.discussion_id)
    discussion = session.execute(query).scalar_one_or_none()
    if not discussion:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.DISCUSSION_NOT_FOUND)

    title, content = request.new_title, request.new_content
    if title:
        discussion.title = title.strip()
    if content:
        discussion.content = content.strip()
    return empty_pb2.Empty()
def EditReply(self, request, context, session):
    """Replace the content of a comment or reply with the stripped ``request.new_content``.

    ``request.reply_id`` is unpacked into a database id and a depth: depth 1 addresses a ``Comment``, depth 2 a
    ``Reply``. Aborts with NOT_FOUND when the row does not exist or when the depth is neither 1 nor 2.
    """
    database_id, depth = unpack_thread_id(request.reply_id)

    # Any depth other than 1 or 2 has no matching model. Previously this left ``obj`` unbound and the handler
    # failed with UnboundLocalError; it is now reported as NOT_FOUND like a missing row.
    model = {1: Comment, 2: Reply}.get(depth)
    obj = None
    if model is not None:
        obj = session.execute(select(model).where(model.id == database_id)).scalar_one_or_none()

    if not obj:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.OBJECT_NOT_FOUND)

    obj.content = request.new_content.strip()
    return empty_pb2.Empty()
def AddUsersToModerationUserList(self, request, context, session):
    """Add several users to a moderation user list and return the list's id.

    When ``request.moderation_list_id`` is not set, a new list is created. Aborts with NOT_FOUND when any
    requested user, or the requested list, does not exist. Users already on the list are left untouched.
    """
    # Resolve every requested user up front so an unknown one aborts before anything is modified.
    resolved = []
    for identifier in request.users:
        match = session.execute(select(User).where_username_or_email_or_id(identifier)).scalar_one_or_none()
        if not match:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
        resolved.append(match)

    if request.moderation_list_id:
        moderation_user_list = session.get(ModerationUserList, request.moderation_list_id)
        if not moderation_user_list:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.MODERATION_USER_LIST_NOT_FOUND)
    else:
        # No list given: create one and flush so it has an id to return.
        moderation_user_list = ModerationUserList()
        session.add(moderation_user_list)
        session.flush()

    for member in resolved:
        if member not in moderation_user_list.users:
            moderation_user_list.users.append(member)

    return admin_pb2.AddUsersToModerationUserListRes(moderation_list_id=moderation_user_list.id)
def ListModerationUserLists(self, request, context, session):
    """Return every moderation user list containing the user matching ``request.user``, with member ids.

    Aborts with NOT_FOUND when no user matches.
    """
    target = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    result = []
    for user_list in target.moderation_user_lists:
        member_ids = [member.id for member in user_list.users]
        result.append(admin_pb2.ModerationList(moderation_list_id=user_list.id, member_ids=member_ids))
    return admin_pb2.ListModerationUserListsRes(moderation_lists=result)
def RemoveUserFromModerationUserList(self, request, context, session):
    """Remove the user matching ``request.user`` from a moderation user list.

    Aborts with NOT_FOUND for an unknown user or list, INVALID_ARGUMENT when no list id is given, and
    FAILED_PRECONDITION when the user is not on the list. A list left with no members is deleted.
    """
    target = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
    if not request.moderation_list_id:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_MODERATION_USER_LIST_ID)

    user_list = session.get(ModerationUserList, request.moderation_list_id)
    if not user_list:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.MODERATION_USER_LIST_NOT_FOUND)
    if target not in user_list.users:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_THE_MODERATION_USER_LIST)

    user_list.users.remove(target)

    # Drop the list once its last member is gone.
    if not user_list.users:
        session.delete(user_list)

    return empty_pb2.Empty()
def CreateAccountDeletionLink(self, request, context, session):
    """Create an account deletion token for the user matching ``request.user`` and return its confirm URL.

    The token expires ``request.expiry_days`` days from now, defaulting to 7 days when the field is unset or 0.
    Aborts with NOT_FOUND when no user matches.
    """
    user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    expiry_days = request.expiry_days or 7
    # Honour the requested lifetime; previously it was computed but ignored in favour of a fixed 2 hours.
    token = AccountDeletionToken(
        token=urlsafe_secure_token(),
        user=user,
        expiry=now() + timedelta(days=expiry_days),
    )
    session.add(token)
    return admin_pb2.CreateAccountDeletionLinkRes(
        account_deletion_confirm_url=urls.delete_account_link(account_deletion_token=token.token)
    )
def AccessStats(self, request, context, session):
    """Summarise recorded activity of the user matching ``request.user`` per (IP address, user agent) pair.

    Only activity periods between ``start_time`` and ``end_time`` (inclusive) are considered; when a bound is
    not set it defaults to 90 days ago and to now respectively. Rows are ordered by most recent activity
    first. Each row is enriched with ASN data, parsed user-agent families and an approximate location.
    Aborts with NOT_FOUND when no user matches.
    """
    user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    # Use an explicit presence check on the timestamp sub-messages rather than their truthiness, so that an
    # unset bound really falls back to the default window.
    if request.HasField("start_time"):
        start_time = to_aware_datetime(request.start_time)
    else:
        start_time = now() - timedelta(days=90)
    if request.HasField("end_time"):
        end_time = to_aware_datetime(request.end_time)
    else:
        end_time = now()

    user_activity = session.execute(
        select(
            UserActivity.ip_address,
            UserActivity.user_agent,
            func.sum(UserActivity.api_calls),
            func.count(UserActivity.period),
            func.min(UserActivity.period),
            func.max(UserActivity.period),
        )
        .where(UserActivity.user_id == user.id)
        .where(UserActivity.period >= start_time)
        # Upper bound of the window; this comparison was previously ``>=``, which excluded the window itself.
        .where(UserActivity.period <= end_time)
        .order_by(func.max(UserActivity.period).desc())
        .group_by(UserActivity.ip_address, UserActivity.user_agent)
    ).all()

    out = admin_pb2.AccessStatsRes()

    for ip_address, user_agent, api_call_count, periods_count, first_seen, last_seen in user_activity:
        user_agent_data = user_agents_parse(user_agent or "")
        asn = geoip_asn(ip_address)
        out.stats.append(
            admin_pb2.AccessStat(
                ip_address=ip_address,
                asn=str(asn[0]) if asn else None,
                asorg=str(asn[1]) if asn else None,
                asnetwork=str(asn[2]) if asn else None,
                user_agent=user_agent,
                operating_system=user_agent_data.os.family,
                browser=user_agent_data.browser.family,
                device=user_agent_data.device.family,
                approximate_location=geoip_approximate_location(ip_address) or "Unknown",
                api_call_count=api_call_count,
                periods_count=periods_count,
                first_seen=Timestamp_from_datetime(first_seen),
                last_seen=Timestamp_from_datetime(last_seen),
            )
        )

    return out
def SendBlogPostNotification(self, request, context, session):
    """Queue a ``generate_new_blog_post_notifications`` job for the given blog post.

    Aborts with FAILED_PRECONDITION when the title is longer than 50 characters or the blurb is longer than
    100 characters.
    """
    title, blurb = request.title, request.blurb
    if len(title) > 50:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_BLOG_TITLE_TOO_LONG)
    if len(blurb) > 100:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_BLOG_BLURB_TOO_LONG)

    job_payload = jobs_pb2.GenerateNewBlogPostNotificationsPayload(
        url=request.url,
        title=title,
        blurb=blurb,
    )
    queue_job(session, "generate_new_blog_post_notifications", payload=job_payload)
    return empty_pb2.Empty()