Coverage for src/couchers/servicers/admin.py: 67%
393 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-02 02:47 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-02 02:47 +0000
1import json
2import logging
3from datetime import timedelta
5import grpc
6from geoalchemy2.shape import from_shape
7from google.protobuf import empty_pb2
8from shapely.geometry import shape
9from sqlalchemy.sql import and_, func, or_, select, update
10from user_agents import parse as user_agents_parse
12from couchers import errors, urls
13from couchers.crypto import urlsafe_secure_token
14from couchers.db import session_scope
15from couchers.helpers.badges import user_add_badge, user_remove_badge
16from couchers.helpers.clusters import create_cluster, create_node
17from couchers.helpers.geoip import geoip_approximate_location, geoip_asn
18from couchers.jobs.enqueue import queue_job
19from couchers.models import (
20 AccountDeletionToken,
21 Comment,
22 ContentReport,
23 Discussion,
24 Event,
25 EventCommunityInviteRequest,
26 EventOccurrence,
27 GroupChat,
28 GroupChatSubscription,
29 HostRequest,
30 LanguageAbility,
31 Message,
32 ModNote,
33 Node,
34 Reference,
35 Reply,
36 User,
37 UserActivity,
38 UserBadge,
39)
40from couchers.notifications.notify import notify
41from couchers.resources import get_badge_dict
42from couchers.servicers.api import get_strong_verification_fields, user_model_to_pb
43from couchers.servicers.auth import create_session
44from couchers.servicers.communities import community_to_pb
45from couchers.servicers.events import get_users_to_notify_for_new_event
46from couchers.servicers.threads import unpack_thread_id
47from couchers.sql import couchers_select as select
48from couchers.utils import Timestamp_from_datetime, date_to_api, make_user_context, now, parse_date, to_aware_datetime
49from proto import admin_pb2, admin_pb2_grpc, notification_data_pb2
50from proto.internal import jobs_pb2
52logger = logging.getLogger(__name__)
54MAX_PAGINATION_LENGTH = 250
57def _user_to_details(session, user):
58 return admin_pb2.UserDetails(
59 user_id=user.id,
60 username=user.username,
61 name=user.name,
62 email=user.email,
63 gender=user.gender,
64 birthdate=date_to_api(user.birthdate),
65 banned=user.is_banned,
66 deleted=user.is_deleted,
67 do_not_email=user.do_not_email,
68 badges=[badge.badge_id for badge in user.badges],
69 **get_strong_verification_fields(session, user),
70 has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
71 admin_note=user.admin_note,
72 pending_mod_notes_count=user.mod_notes.where(ModNote.is_pending).count(),
73 acknowledged_mod_notes_count=user.mod_notes.where(~ModNote.is_pending).count(),
74 )
77def _content_report_to_pb(content_report: ContentReport):
78 return admin_pb2.ContentReport(
79 content_report_id=content_report.id,
80 time=Timestamp_from_datetime(content_report.time),
81 reporting_user_id=content_report.reporting_user_id,
82 author_user_id=content_report.author_user_id,
83 reason=content_report.reason,
84 description=content_report.description,
85 content_ref=content_report.content_ref,
86 user_agent=content_report.user_agent,
87 page=content_report.page,
88 )
91def append_admin_note(session, context, user, note):
92 if not note.strip():
93 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ADMIN_NOTE_CANT_BE_EMPTY)
94 admin = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
95 user.admin_note += f"\n[{now().isoformat()}] (id: {admin.id}, username: {admin.username}) {note}\n"
98def load_community_geom(geojson, context):
99 geom = shape(json.loads(geojson))
101 if geom.geom_type != "MultiPolygon":
102 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_MULTIPOLYGON)
104 return geom
107def generate_new_blog_post_notifications(payload: jobs_pb2.GenerateNewBlogPostNotificationsPayload):
108 with session_scope() as session:
109 all_users = session.execute(select(User).where(User.is_visible)).scalars().all()
110 for user in all_users:
111 context = make_user_context(user_id=user.id)
112 notify(
113 session,
114 user_id=user.id,
115 topic_action="general:new_blog_post",
116 data=notification_data_pb2.GeneralNewBlogPost(
117 url=payload.url,
118 title=payload.title,
119 blurb=payload.blurb,
120 ),
121 )
124class Admin(admin_pb2_grpc.AdminServicer):
125 def GetUserDetails(self, request, context, session):
126 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
127 if not user:
128 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
129 return _user_to_details(session, user)
131 def GetUser(self, request, context, session):
132 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
133 if not user:
134 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
135 return user_model_to_pb(user, session, context)
137 def SearchUsers(self, request, context, session):
138 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
139 next_user_id = int(request.page_token) if request.page_token else 0
140 statement = select(User)
141 if request.username:
142 statement = statement.where(User.username.ilike(request.username))
143 if request.email:
144 statement = statement.where(User.email.ilike(request.email))
145 if request.name:
146 statement = statement.where(User.name.ilike(request.name))
147 if request.admin_note:
148 statement = statement.where(User.admin_note.ilike(request.admin_note))
149 if request.city:
150 statement = statement.where(User.city.ilike(request.city))
151 if request.min_user_id:
152 statement = statement.where(User.id >= request.min_user_id)
153 if request.max_user_id:
154 statement = statement.where(User.id <= request.max_user_id)
155 if request.min_birthdate:
156 statement = statement.where(User.birthdate >= parse_date(request.min_birthdate))
157 if request.max_birthdate:
158 statement = statement.where(User.birthdate <= parse_date(request.max_birthdate))
159 if request.genders:
160 statement = statement.where(User.gender.in_(request.genders))
161 if request.min_joined_date:
162 statement = statement.where(User.joined >= parse_date(request.min_joined_date))
163 if request.max_joined_date:
164 statement = statement.where(User.joined <= parse_date(request.max_joined_date))
165 if request.min_last_active_date:
166 statement = statement.where(User.last_active >= parse_date(request.min_last_active_date))
167 if request.max_last_active_date:
168 statement = statement.where(User.last_active <= parse_date(request.max_last_active_date))
169 if request.genders:
170 statement = statement.where(User.gender.in_(request.genders))
171 if request.language_codes:
172 statement = statement.join(
173 LanguageAbility,
174 and_(LanguageAbility.user_id == User.id, LanguageAbility.language_code.in_(request.language_codes)),
175 )
176 if request.HasField("is_deleted"):
177 statement = statement.where(User.is_deleted == request.is_deleted.value)
178 if request.HasField("is_banned"):
179 statement = statement.where(User.is_banned == request.is_banned.value)
180 if request.HasField("has_avatar"):
181 if request.has_avatar.value:
182 statement = statement.where(User.avatar_key != None)
183 else:
184 statement = statement.where(User.avatar_key == None)
185 users = (
186 session.execute(statement.where(User.id >= next_user_id).order_by(User.id).limit(page_size + 1))
187 .scalars()
188 .all()
189 )
190 logger.info(users)
191 return admin_pb2.SearchUsersRes(
192 users=[_user_to_details(session, user) for user in users[:page_size]],
193 next_page_token=str(users[-1].id) if len(users) > page_size else None,
194 )
196 def ChangeUserGender(self, request, context, session):
197 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
198 if not user:
199 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
200 user.gender = request.gender
201 session.commit()
203 notify(
204 session,
205 user_id=user.id,
206 topic_action="gender:change",
207 data=notification_data_pb2.GenderChange(
208 gender=request.gender,
209 ),
210 )
212 return _user_to_details(session, user)
214 def ChangeUserBirthdate(self, request, context, session):
215 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
216 if not user:
217 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
218 user.birthdate = parse_date(request.birthdate)
219 session.commit()
221 notify(
222 session,
223 user_id=user.id,
224 topic_action="birthdate:change",
225 data=notification_data_pb2.BirthdateChange(
226 birthdate=request.birthdate,
227 ),
228 )
230 return _user_to_details(session, user)
232 def AddBadge(self, request, context, session):
233 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
234 if not user:
235 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
237 badge = get_badge_dict().get(request.badge_id)
238 if not badge:
239 context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)
241 if not badge["admin_editable"]:
242 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)
244 if badge["id"] in [b.badge_id for b in user.badges]:
245 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_HAS_BADGE)
247 user_add_badge(session, user.id, request.badge_id)
249 return _user_to_details(session, user)
251 def RemoveBadge(self, request, context, session):
252 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
253 if not user:
254 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
256 badge = get_badge_dict().get(request.badge_id)
257 if not badge:
258 context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)
260 if not badge["admin_editable"]:
261 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)
263 user_badge = session.execute(
264 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge["id"])
265 ).scalar_one_or_none()
266 if not user_badge:
267 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_DOES_NOT_HAVE_BADGE)
269 user_remove_badge(session, user.id, request.badge_id)
271 return _user_to_details(session, user)
273 def SetPassportSexGenderException(self, request, context, session):
274 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
275 if not user:
276 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
277 user.has_passport_sex_gender_exception = request.passport_sex_gender_exception
278 return _user_to_details(session, user)
280 def BanUser(self, request, context, session):
281 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
282 if not user:
283 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
284 append_admin_note(session, context, user, request.admin_note)
285 user.is_banned = True
286 return _user_to_details(session, user)
288 def UnbanUser(self, request, context, session):
289 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
290 if not user:
291 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
292 append_admin_note(session, context, user, request.admin_note)
293 user.is_banned = False
294 return _user_to_details(session, user)
296 def AddAdminNote(self, request, context, session):
297 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
298 if not user:
299 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
300 append_admin_note(session, context, user, request.admin_note)
301 return _user_to_details(session, user)
303 def GetContentReport(self, request, context, session):
304 content_report = session.execute(
305 select(ContentReport).where(ContentReport.id == request.content_report_id)
306 ).scalar_one_or_none()
307 if not content_report:
308 context.abort(grpc.StatusCode.NOT_FOUND, errors.CONTENT_REPORT_NOT_FOUND)
309 return admin_pb2.GetContentReportRes(
310 content_report=_content_report_to_pb(content_report),
311 )
313 def GetContentReportsForAuthor(self, request, context, session):
314 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
315 if not user:
316 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
317 content_reports = (
318 session.execute(
319 select(ContentReport).where(ContentReport.author_user_id == user.id).order_by(ContentReport.id.desc())
320 )
321 .scalars()
322 .all()
323 )
324 return admin_pb2.GetContentReportsForAuthorRes(
325 content_reports=[_content_report_to_pb(content_report) for content_report in content_reports],
326 )
328 def SendModNote(self, request, context, session):
329 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
330 if not user:
331 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
332 session.add(
333 ModNote(
334 user_id=user.id,
335 internal_id=request.internal_id,
336 creator_user_id=context.user_id,
337 note_content=request.content,
338 )
339 )
340 session.flush()
342 if not request.do_not_notify:
343 notify(
344 session,
345 user_id=user.id,
346 topic_action="modnote:create",
347 )
349 return _user_to_details(session, user)
351 def MarkUserNeedsLocationUpdate(self, request, context, session):
352 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
353 if not user:
354 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
355 user.needs_to_update_location = True
356 return _user_to_details(session, user)
358 def DeleteUser(self, request, context, session):
359 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
360 if not user:
361 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
362 user.is_deleted = True
363 return _user_to_details(session, user)
365 def RecoverDeletedUser(self, request, context, session):
366 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
367 if not user:
368 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
369 user.is_deleted = False
370 return _user_to_details(session, user)
372 def CreateApiKey(self, request, context, session):
373 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
374 if not user:
375 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
376 token, expiry = create_session(
377 context, session, user, long_lived=True, is_api_key=True, duration=timedelta(days=365), set_cookie=False
378 )
380 notify(
381 session,
382 user_id=user.id,
383 topic_action="api_key:create",
384 data=notification_data_pb2.ApiKeyCreate(
385 api_key=token,
386 expiry=Timestamp_from_datetime(expiry),
387 ),
388 )
390 return _user_to_details(session, user)
392 def CreateCommunity(self, request, context, session):
393 geom = load_community_geom(request.geojson, context)
395 parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None
396 node = create_node(session, geom, parent_node_id)
397 create_cluster(session, node.id, request.name, request.description, context.user_id, request.admin_ids, True)
399 return community_to_pb(session, node, context)
401 def UpdateCommunity(self, request, context, session):
402 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
403 if not node:
404 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
405 cluster = node.official_cluster
407 if request.name:
408 cluster.name = request.name
410 if request.description:
411 cluster.description = request.description
413 if request.geojson:
414 geom = load_community_geom(request.geojson, context)
416 node.geom = from_shape(geom)
418 if request.parent_node_id != 0:
419 node.parent_node_id = request.parent_node_id
421 session.flush()
423 return community_to_pb(session, cluster.parent_node, context)
425 def GetChats(self, request, context, session):
426 def format_user(user):
427 return f"{user.name} ({user.username}, {user.id})"
429 def format_conversation(conversation_id):
430 out = ""
431 messages = (
432 session.execute(
433 select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
434 )
435 .scalars()
436 .all()
437 )
438 for message in messages:
439 out += f"Message {message.id} by {format_user(message.author)} at {message.time}\nType={message.message_type}, host_req_status_change={message.host_request_status_target}\n\n"
440 out += str(message.text)
441 out += "\n\n-----\n"
442 out += "\n\n\n\n"
443 return out
445 def format_host_request(host_request_id):
446 out = ""
447 host_request = session.execute(
448 select(HostRequest).where(HostRequest.conversation_id == host_request_id)
449 ).scalar_one()
450 out += "==============================\n"
451 out += f"Host request {host_request.conversation_id} from {format_user(host_request.surfer)} to {format_user(host_request.host)}.\nCurrent state = {host_request.status}\n\nMessages:\n"
452 out += format_conversation(host_request.conversation_id)
453 out += "\n\n\n\n"
454 return out
456 def format_group_chat(group_chat_id):
457 out = ""
458 group_chat = session.execute(
459 select(GroupChat).where(GroupChat.conversation_id == group_chat_id)
460 ).scalar_one()
461 out += "==============================\n"
462 out += f"Group chat {group_chat.conversation_id}. Created by {format_user(group_chat.creator)}, is_dm={group_chat.is_dm}\nName: {group_chat.title}\nMembers:\n"
463 subs = (
464 session.execute(
465 select(GroupChatSubscription)
466 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
467 .order_by(GroupChatSubscription.joined.asc())
468 )
469 .scalars()
470 .all()
471 )
472 for sub in subs:
473 out += f"{format_user(sub.user)} joined at {sub.joined} (left at {sub.left}), role={sub.role}\n"
474 out += "\n\nMessages:\n"
475 out += format_conversation(group_chat.conversation_id)
476 out += "\n\n\n\n"
477 return out
479 def format_all_chats_for_user(user_id):
480 out = ""
481 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
482 out += f"Chats for user {format_user(user)}\n"
483 host_request_ids = (
484 session.execute(
485 select(HostRequest.conversation_id)
486 .where(or_(HostRequest.host_user_id == user_id, HostRequest.surfer_user_id == user_id))
487 .order_by(HostRequest.conversation_id.desc())
488 )
489 .scalars()
490 .all()
491 )
492 out += f"************************************* Requests ({len(host_request_ids)})\n"
493 for host_request in host_request_ids:
494 out += format_host_request(host_request)
495 group_chat_ids = (
496 session.execute(
497 select(GroupChatSubscription.group_chat_id)
498 .where(GroupChatSubscription.user_id == user_id)
499 .order_by(GroupChatSubscription.joined.desc())
500 )
501 .scalars()
502 .all()
503 )
504 out += f"************************************* Group chats ({len(group_chat_ids)})\n"
505 for group_chat_id in group_chat_ids:
506 out += format_group_chat(group_chat_id)
507 return out
509 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
510 if not user:
511 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
513 return admin_pb2.GetChatsRes(response=format_all_chats_for_user(user.id))
515 def ListEventCommunityInviteRequests(self, request, context, session):
516 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
517 next_request_id = int(request.page_token) if request.page_token else 0
518 requests = (
519 session.execute(
520 select(EventCommunityInviteRequest)
521 .where(EventCommunityInviteRequest.approved.is_(None))
522 .where(EventCommunityInviteRequest.id >= next_request_id)
523 .order_by(EventCommunityInviteRequest.id)
524 .limit(page_size + 1)
525 )
526 .scalars()
527 .all()
528 )
530 def _request_to_pb(request):
531 users_to_notify, node_id = get_users_to_notify_for_new_event(session, request.occurrence)
532 return admin_pb2.EventCommunityInviteRequest(
533 event_community_invite_request_id=request.id,
534 user_id=request.user_id,
535 event_url=urls.event_link(occurrence_id=request.occurrence.id, slug=request.occurrence.event.slug),
536 approx_users_to_notify=len(users_to_notify),
537 community_id=node_id,
538 )
540 return admin_pb2.ListEventCommunityInviteRequestsRes(
541 requests=[_request_to_pb(request) for request in requests[:page_size]],
542 next_page_token=str(requests[-1].id) if len(requests) > page_size else None,
543 )
545 def DecideEventCommunityInviteRequest(self, request, context, session):
546 req = session.execute(
547 select(EventCommunityInviteRequest).where(
548 EventCommunityInviteRequest.id == request.event_community_invite_request_id
549 )
550 ).scalar_one_or_none()
552 if not req:
553 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_COMMUNITY_INVITE_NOT_FOUND)
555 if req.decided:
556 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_DECIDED)
558 decided = now()
559 req.decided = decided
560 req.decided_by_user_id = context.user_id
561 req.approved = request.approve
563 # deny other reqs for the same event
564 if request.approve:
565 session.execute(
566 update(EventCommunityInviteRequest)
567 .where(EventCommunityInviteRequest.occurrence_id == req.occurrence_id)
568 .where(EventCommunityInviteRequest.decided.is_(None))
569 .values(decided=decided, decided_by_user_id=context.user_id, approved=False)
570 )
572 session.flush()
574 if request.approve:
575 queue_job(
576 session,
577 "generate_event_create_notifications",
578 payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
579 inviting_user_id=req.user_id,
580 occurrence_id=req.occurrence_id,
581 approved=True,
582 ),
583 )
585 return admin_pb2.DecideEventCommunityInviteRequestRes()
587 def DeleteEvent(self, request, context, session):
588 res = session.execute(
589 select(Event, EventOccurrence)
590 .where(EventOccurrence.id == request.event_id)
591 .where(EventOccurrence.event_id == Event.id)
592 .where(~EventOccurrence.is_deleted)
593 ).one_or_none()
595 if not res:
596 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
598 event, occurrence = res
600 occurrence.is_deleted = True
602 queue_job(
603 session,
604 "generate_event_delete_notifications",
605 payload=jobs_pb2.GenerateEventDeleteNotificationsPayload(
606 occurrence_id=occurrence.id,
607 ),
608 )
610 return empty_pb2.Empty()
612 def ListUserIds(self, request, context, session):
613 start_date = request.start_time.ToDatetime()
614 end_date = request.end_time.ToDatetime()
616 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
617 next_user_id = int(request.page_token) if request.page_token else 0
619 user_ids = (
620 session.execute(
621 select(User.id)
622 .where(or_(User.id <= next_user_id, next_user_id == 0))
623 .where(User.joined >= start_date)
624 .where(User.joined <= end_date)
625 .order_by(User.id.desc())
626 .limit(page_size + 1)
627 )
628 .scalars()
629 .all()
630 )
632 return admin_pb2.ListUserIdsRes(
633 user_ids=user_ids[:page_size],
634 next_page_token=str(user_ids[-1]) if len(user_ids) > page_size else None,
635 )
637 def EditReferenceText(self, request, context, session):
638 reference = session.execute(select(Reference).where(Reference.id == request.reference_id)).scalar_one_or_none()
640 if reference is None:
641 context.abort(grpc.StatusCode.NOT_FOUND, errors.REFERENCE_NOT_FOUND)
643 if not request.new_text.strip():
644 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.REFERENCE_NO_TEXT)
646 reference.text = request.new_text.strip()
647 return empty_pb2.Empty()
649 def DeleteReference(self, request, context, session):
650 reference = session.execute(select(Reference).where(Reference.id == request.reference_id)).scalar_one_or_none()
652 if reference is None:
653 context.abort(grpc.StatusCode.NOT_FOUND, errors.REFERENCE_NOT_FOUND)
655 reference.is_deleted = True
656 return empty_pb2.Empty()
658 def EditDiscussion(self, request, context, session):
659 discussion = session.execute(
660 select(Discussion).where(Discussion.id == request.discussion_id)
661 ).scalar_one_or_none()
662 if not discussion:
663 context.abort(grpc.StatusCode.NOT_FOUND, errors.DISCUSSION_NOT_FOUND)
664 if request.new_title:
665 discussion.title = request.new_title.strip()
666 if request.new_content:
667 discussion.content = request.new_content.strip()
668 return empty_pb2.Empty()
670 def EditReply(self, request, context, session):
671 database_id, depth = unpack_thread_id(request.reply_id)
672 if depth == 1:
673 obj = session.execute(select(Comment).where(Comment.id == database_id)).scalar_one_or_none()
674 elif depth == 2:
675 obj = session.execute(select(Reply).where(Reply.id == database_id)).scalar_one_or_none()
676 if not obj:
677 context.abort(grpc.StatusCode.NOT_FOUND, errors.OBJECT_NOT_FOUND)
678 obj.content = request.new_content.strip()
679 return empty_pb2.Empty()
681 def CreateAccountDeletionLink(self, request, context, session):
682 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
683 if not user:
684 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
685 expiry_days = request.expiry_days or 7
686 token = AccountDeletionToken(token=urlsafe_secure_token(), user=user, expiry=now() + timedelta(hours=2))
687 session.add(token)
688 return admin_pb2.CreateAccountDeletionLinkRes(
689 account_deletion_confirm_url=urls.delete_account_link(account_deletion_token=token.token)
690 )
692 def AccessStats(self, request, context, session):
693 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
694 if not user:
695 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
697 start_time = to_aware_datetime(request.start_time) if request.start_time else now() - timedelta(days=90)
698 end_time = to_aware_datetime(request.end_time) if request.end_time else now()
700 user_activity = session.execute(
701 select(
702 UserActivity.ip_address,
703 UserActivity.user_agent,
704 func.sum(UserActivity.api_calls),
705 func.count(UserActivity.period),
706 func.min(UserActivity.period),
707 func.max(UserActivity.period),
708 )
709 .where(UserActivity.user_id == user.id)
710 .where(UserActivity.period >= start_time)
711 .where(UserActivity.period >= end_time)
712 .order_by(func.max(UserActivity.period).desc())
713 .group_by(UserActivity.ip_address, UserActivity.user_agent)
714 ).all()
716 out = admin_pb2.AccessStatsRes()
718 for ip_address, user_agent, api_call_count, periods_count, first_seen, last_seen in user_activity:
719 user_agent_data = user_agents_parse(user_agent or "")
720 asn = geoip_asn(ip_address)
721 out.stats.append(
722 admin_pb2.AccessStat(
723 ip_address=ip_address,
724 asn=str(asn[0]) if asn else None,
725 asorg=str(asn[1]) if asn else None,
726 asnetwork=str(asn[2]) if asn else None,
727 user_agent=user_agent,
728 operating_system=user_agent_data.os.family,
729 browser=user_agent_data.browser.family,
730 device=user_agent_data.device.family,
731 approximate_location=geoip_approximate_location(ip_address) or "Unknown",
732 api_call_count=api_call_count,
733 periods_count=periods_count,
734 first_seen=Timestamp_from_datetime(first_seen),
735 last_seen=Timestamp_from_datetime(last_seen),
736 )
737 )
739 return out
741 def SendBlogPostNotification(self, request, context, session):
742 if len(request.title) > 50:
743 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_BLOG_TITLE_TOO_LONG)
744 if len(request.blurb) > 100:
745 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_BLOG_BLURB_TOO_LONG)
746 queue_job(
747 session,
748 "generate_new_blog_post_notifications",
749 payload=jobs_pb2.GenerateNewBlogPostNotificationsPayload(
750 url=request.url,
751 title=request.title,
752 blurb=request.blurb,
753 ),
754 )
755 return empty_pb2.Empty()