Coverage for src/couchers/servicers/admin.py: 76%
338 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
1import json
2import logging
3from datetime import timedelta
5import grpc
6from geoalchemy2.shape import from_shape
7from google.protobuf import empty_pb2
8from shapely.geometry import shape
9from sqlalchemy.sql import func, or_, select, update
10from user_agents import parse as user_agents_parse
12from couchers import errors, urls
13from couchers.crypto import urlsafe_secure_token
14from couchers.db import session_scope
15from couchers.helpers.badges import user_add_badge, user_remove_badge
16from couchers.helpers.clusters import create_cluster, create_node
17from couchers.helpers.geoip import geoip_approximate_location, geoip_asn
18from couchers.jobs.enqueue import queue_job
19from couchers.models import (
20 AccountDeletionToken,
21 Comment,
22 ContentReport,
23 Discussion,
24 Event,
25 EventCommunityInviteRequest,
26 EventOccurrence,
27 GroupChat,
28 GroupChatSubscription,
29 HostRequest,
30 Message,
31 ModNote,
32 Node,
33 Reference,
34 Reply,
35 User,
36 UserActivity,
37 UserBadge,
38)
39from couchers.notifications.notify import notify
40from couchers.resources import get_badge_dict
41from couchers.servicers.api import get_strong_verification_fields, user_model_to_pb
42from couchers.servicers.auth import create_session
43from couchers.servicers.communities import community_to_pb
44from couchers.servicers.events import get_users_to_notify_for_new_event
45from couchers.servicers.threads import unpack_thread_id
46from couchers.sql import couchers_select as select
47from couchers.utils import Timestamp_from_datetime, date_to_api, make_user_context, now, parse_date, to_aware_datetime
48from proto import admin_pb2, admin_pb2_grpc, notification_data_pb2
49from proto.internal import jobs_pb2
51logger = logging.getLogger(__name__)
53MAX_PAGINATION_LENGTH = 250
def _user_to_details(session, user):
    """Build the ``admin_pb2.UserDetails`` message describing ``user``.

    Combines the user's basic account fields, the ids of their badges, the
    fields returned by ``get_strong_verification_fields`` and the counts of
    pending and non-pending mod notes.
    """
    badge_ids = [user_badge.badge_id for user_badge in user.badges]
    verification_fields = get_strong_verification_fields(session, user)
    pending_count = user.mod_notes.where(ModNote.is_pending).count()
    acknowledged_count = user.mod_notes.where(~ModNote.is_pending).count()

    return admin_pb2.UserDetails(
        user_id=user.id,
        username=user.username,
        name=user.name,
        email=user.email,
        gender=user.gender,
        birthdate=date_to_api(user.birthdate),
        banned=user.is_banned,
        deleted=user.is_deleted,
        do_not_email=user.do_not_email,
        badges=badge_ids,
        has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
        admin_note=user.admin_note,
        pending_mod_notes_count=pending_count,
        acknowledged_mod_notes_count=acknowledged_count,
        **verification_fields,
    )
def _content_report_to_pb(content_report: ContentReport):
    """Convert a ``ContentReport`` model row into its ``admin_pb2.ContentReport`` message."""
    report = content_report
    fields = {
        "content_report_id": report.id,
        "time": Timestamp_from_datetime(report.time),
        "reporting_user_id": report.reporting_user_id,
        "author_user_id": report.author_user_id,
        "reason": report.reason,
        "description": report.description,
        "content_ref": report.content_ref,
        "user_agent": report.user_agent,
        "page": report.page,
    }
    return admin_pb2.ContentReport(**fields)
def append_admin_note(session, context, user, note):
    """Append a timestamped, attributed entry to ``user.admin_note``.

    Aborts the RPC with INVALID_ARGUMENT when ``note`` is empty or only
    whitespace. The entry records the id and username of the user identified
    by ``context.user_id``.
    """
    if not note.strip():
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ADMIN_NOTE_CANT_BE_EMPTY)

    acting_admin = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
    timestamp = now().isoformat()
    entry = f"\n[{timestamp}] (id: {acting_admin.id}, username: {acting_admin.username}) {note}\n"
    user.admin_note = user.admin_note + entry
def load_community_geom(geojson, context):
    """Parse a GeoJSON string into a shapely geometry, requiring a MultiPolygon.

    Aborts the RPC with INVALID_ARGUMENT if the parsed geometry has any other
    geometry type.
    """
    parsed = json.loads(geojson)
    geometry = shape(parsed)
    if geometry.geom_type == "MultiPolygon":
        return geometry

    context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_MULTIPOLYGON)
    return geometry
def generate_new_blog_post_notifications(payload: jobs_pb2.GenerateNewBlogPostNotificationsPayload):
    """Send a ``general:new_blog_post`` notification to every visible user.

    The url, title and blurb of the notification are taken from ``payload``.
    """
    with session_scope() as session:
        visible_users = session.execute(select(User).where(User.is_visible)).scalars().all()
        for recipient in visible_users:
            # NOTE(review): this context is built but never passed on; kept in
            # case make_user_context has side effects - confirm and remove.
            context = make_user_context(user_id=recipient.id)
            blog_post = notification_data_pb2.GeneralNewBlogPost(
                url=payload.url,
                title=payload.title,
                blurb=payload.blurb,
            )
            notify(
                session,
                user_id=recipient.id,
                topic_action="general:new_blog_post",
                data=blog_post,
            )
123class Admin(admin_pb2_grpc.AdminServicer):
124 def GetUserDetails(self, request, context, session):
125 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
126 if not user:
127 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
128 return _user_to_details(session, user)
130 def GetUser(self, request, context, session):
131 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
132 if not user:
133 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
134 return user_model_to_pb(user, session, context)
136 def ChangeUserGender(self, request, context, session):
137 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
138 if not user:
139 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
140 user.gender = request.gender
141 session.commit()
143 notify(
144 session,
145 user_id=user.id,
146 topic_action="gender:change",
147 data=notification_data_pb2.GenderChange(
148 gender=request.gender,
149 ),
150 )
152 return _user_to_details(session, user)
154 def ChangeUserBirthdate(self, request, context, session):
155 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
156 if not user:
157 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
158 user.birthdate = parse_date(request.birthdate)
159 session.commit()
161 notify(
162 session,
163 user_id=user.id,
164 topic_action="birthdate:change",
165 data=notification_data_pb2.BirthdateChange(
166 birthdate=request.birthdate,
167 ),
168 )
170 return _user_to_details(session, user)
172 def AddBadge(self, request, context, session):
173 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
174 if not user:
175 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
177 badge = get_badge_dict().get(request.badge_id)
178 if not badge:
179 context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)
181 if not badge["admin_editable"]:
182 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)
184 if badge["id"] in [b.badge_id for b in user.badges]:
185 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_HAS_BADGE)
187 user_add_badge(session, user.id, request.badge_id)
189 return _user_to_details(session, user)
191 def RemoveBadge(self, request, context, session):
192 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
193 if not user:
194 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
196 badge = get_badge_dict().get(request.badge_id)
197 if not badge:
198 context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)
200 if not badge["admin_editable"]:
201 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)
203 user_badge = session.execute(
204 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge["id"])
205 ).scalar_one_or_none()
206 if not user_badge:
207 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_DOES_NOT_HAVE_BADGE)
209 user_remove_badge(session, user.id, request.badge_id)
211 return _user_to_details(session, user)
213 def SetPassportSexGenderException(self, request, context, session):
214 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
215 if not user:
216 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
217 user.has_passport_sex_gender_exception = request.passport_sex_gender_exception
218 return _user_to_details(session, user)
220 def BanUser(self, request, context, session):
221 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
222 if not user:
223 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
224 append_admin_note(session, context, user, request.admin_note)
225 user.is_banned = True
226 return _user_to_details(session, user)
228 def UnbanUser(self, request, context, session):
229 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
230 if not user:
231 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
232 append_admin_note(session, context, user, request.admin_note)
233 user.is_banned = False
234 return _user_to_details(session, user)
236 def AddAdminNote(self, request, context, session):
237 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
238 if not user:
239 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
240 append_admin_note(session, context, user, request.admin_note)
241 return _user_to_details(session, user)
243 def GetContentReport(self, request, context, session):
244 content_report = session.execute(
245 select(ContentReport).where(ContentReport.id == request.content_report_id)
246 ).scalar_one_or_none()
247 if not content_report:
248 context.abort(grpc.StatusCode.NOT_FOUND, errors.CONTENT_REPORT_NOT_FOUND)
249 return admin_pb2.GetContentReportRes(
250 content_report=_content_report_to_pb(content_report),
251 )
253 def GetContentReportsForAuthor(self, request, context, session):
254 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
255 if not user:
256 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
257 content_reports = (
258 session.execute(
259 select(ContentReport).where(ContentReport.author_user_id == user.id).order_by(ContentReport.id.desc())
260 )
261 .scalars()
262 .all()
263 )
264 return admin_pb2.GetContentReportsForAuthorRes(
265 content_reports=[_content_report_to_pb(content_report) for content_report in content_reports],
266 )
268 def SendModNote(self, request, context, session):
269 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
270 if not user:
271 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
272 session.add(
273 ModNote(
274 user_id=user.id,
275 internal_id=request.internal_id,
276 creator_user_id=context.user_id,
277 note_content=request.content,
278 )
279 )
280 session.flush()
282 if not request.do_not_notify:
283 notify(
284 session,
285 user_id=user.id,
286 topic_action="modnote:create",
287 )
289 return _user_to_details(session, user)
291 def MarkUserNeedsLocationUpdate(self, request, context, session):
292 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
293 if not user:
294 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
295 user.needs_to_update_location = True
296 return _user_to_details(session, user)
298 def DeleteUser(self, request, context, session):
299 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
300 if not user:
301 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
302 user.is_deleted = True
303 return _user_to_details(session, user)
305 def CreateApiKey(self, request, context, session):
306 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
307 if not user:
308 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
309 token, expiry = create_session(
310 context, session, user, long_lived=True, is_api_key=True, duration=timedelta(days=365), set_cookie=False
311 )
313 notify(
314 session,
315 user_id=user.id,
316 topic_action="api_key:create",
317 data=notification_data_pb2.ApiKeyCreate(
318 api_key=token,
319 expiry=Timestamp_from_datetime(expiry),
320 ),
321 )
323 return _user_to_details(session, user)
325 def CreateCommunity(self, request, context, session):
326 geom = load_community_geom(request.geojson, context)
328 parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None
329 node = create_node(session, geom, parent_node_id)
330 create_cluster(session, node.id, request.name, request.description, context.user_id, request.admin_ids, True)
332 return community_to_pb(session, node, context)
334 def UpdateCommunity(self, request, context, session):
335 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
336 if not node:
337 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
338 cluster = node.official_cluster
340 if request.name:
341 cluster.name = request.name
343 if request.description:
344 cluster.description = request.description
346 if request.geojson:
347 geom = load_community_geom(request.geojson, context)
349 node.geom = from_shape(geom)
351 if request.parent_node_id != 0:
352 node.parent_node_id = request.parent_node_id
354 session.flush()
356 return community_to_pb(session, cluster.parent_node, context)
358 def GetChats(self, request, context, session):
359 def format_user(user):
360 return f"{user.name} ({user.username}, {user.id})"
362 def format_conversation(conversation_id):
363 out = ""
364 messages = (
365 session.execute(
366 select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
367 )
368 .scalars()
369 .all()
370 )
371 for message in messages:
372 out += f"Message {message.id} by {format_user(message.author)} at {message.time}\nType={message.message_type}, host_req_status_change={message.host_request_status_target}\n\n"
373 out += str(message.text)
374 out += "\n\n-----\n"
375 out += "\n\n\n\n"
376 return out
378 def format_host_request(host_request_id):
379 out = ""
380 host_request = session.execute(
381 select(HostRequest).where(HostRequest.conversation_id == host_request_id)
382 ).scalar_one()
383 out += "==============================\n"
384 out += f"Host request {host_request.conversation_id} from {format_user(host_request.surfer)} to {format_user(host_request.host)}.\nCurrent state = {host_request.status}\n\nMessages:\n"
385 out += format_conversation(host_request.conversation_id)
386 out += "\n\n\n\n"
387 return out
389 def format_group_chat(group_chat_id):
390 out = ""
391 group_chat = session.execute(
392 select(GroupChat).where(GroupChat.conversation_id == group_chat_id)
393 ).scalar_one()
394 out += "==============================\n"
395 out += f"Group chat {group_chat.conversation_id}. Created by {format_user(group_chat.creator)}, is_dm={group_chat.is_dm}\nName: {group_chat.title}\nMembers:\n"
396 subs = (
397 session.execute(
398 select(GroupChatSubscription)
399 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
400 .order_by(GroupChatSubscription.joined.asc())
401 )
402 .scalars()
403 .all()
404 )
405 for sub in subs:
406 out += f"{format_user(sub.user)} joined at {sub.joined} (left at {sub.left}), role={sub.role}\n"
407 out += "\n\nMessages:\n"
408 out += format_conversation(group_chat.conversation_id)
409 out += "\n\n\n\n"
410 return out
412 def format_all_chats_for_user(user_id):
413 out = ""
414 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
415 out += f"Chats for user {format_user(user)}\n"
416 host_request_ids = (
417 session.execute(
418 select(HostRequest.conversation_id)
419 .where(or_(HostRequest.host_user_id == user_id, HostRequest.surfer_user_id == user_id))
420 .order_by(HostRequest.conversation_id.desc())
421 )
422 .scalars()
423 .all()
424 )
425 out += f"************************************* Requests ({len(host_request_ids)})\n"
426 for host_request in host_request_ids:
427 out += format_host_request(host_request)
428 group_chat_ids = (
429 session.execute(
430 select(GroupChatSubscription.group_chat_id)
431 .where(GroupChatSubscription.user_id == user_id)
432 .order_by(GroupChatSubscription.joined.desc())
433 )
434 .scalars()
435 .all()
436 )
437 out += f"************************************* Group chats ({len(group_chat_ids)})\n"
438 for group_chat_id in group_chat_ids:
439 out += format_group_chat(group_chat_id)
440 return out
442 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
443 if not user:
444 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
446 return admin_pb2.GetChatsRes(response=format_all_chats_for_user(user.id))
448 def ListEventCommunityInviteRequests(self, request, context, session):
449 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
450 next_request_id = int(request.page_token) if request.page_token else 0
451 requests = (
452 session.execute(
453 select(EventCommunityInviteRequest)
454 .where(EventCommunityInviteRequest.approved.is_(None))
455 .where(EventCommunityInviteRequest.id >= next_request_id)
456 .order_by(EventCommunityInviteRequest.id)
457 .limit(page_size + 1)
458 )
459 .scalars()
460 .all()
461 )
463 def _request_to_pb(request):
464 users_to_notify, node_id = get_users_to_notify_for_new_event(session, request.occurrence)
465 return admin_pb2.EventCommunityInviteRequest(
466 event_community_invite_request_id=request.id,
467 user_id=request.user_id,
468 event_url=urls.event_link(occurrence_id=request.occurrence.id, slug=request.occurrence.event.slug),
469 approx_users_to_notify=len(users_to_notify),
470 community_id=node_id,
471 )
473 return admin_pb2.ListEventCommunityInviteRequestsRes(
474 requests=[_request_to_pb(request) for request in requests[:page_size]],
475 next_page_token=str(requests[-1].id) if len(requests) > page_size else None,
476 )
478 def DecideEventCommunityInviteRequest(self, request, context, session):
479 req = session.execute(
480 select(EventCommunityInviteRequest).where(
481 EventCommunityInviteRequest.id == request.event_community_invite_request_id
482 )
483 ).scalar_one_or_none()
485 if not req:
486 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_COMMUNITY_INVITE_NOT_FOUND)
488 if req.decided:
489 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_DECIDED)
491 decided = now()
492 req.decided = decided
493 req.decided_by_user_id = context.user_id
494 req.approved = request.approve
496 # deny other reqs for the same event
497 if request.approve:
498 session.execute(
499 update(EventCommunityInviteRequest)
500 .where(EventCommunityInviteRequest.occurrence_id == req.occurrence_id)
501 .where(EventCommunityInviteRequest.decided.is_(None))
502 .values(decided=decided, decided_by_user_id=context.user_id, approved=False)
503 )
505 session.flush()
507 if request.approve:
508 queue_job(
509 session,
510 "generate_event_create_notifications",
511 payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
512 inviting_user_id=req.user_id,
513 occurrence_id=req.occurrence_id,
514 approved=True,
515 ),
516 )
518 return admin_pb2.DecideEventCommunityInviteRequestRes()
520 def DeleteEvent(self, request, context, session):
521 res = session.execute(
522 select(Event, EventOccurrence)
523 .where(EventOccurrence.id == request.event_id)
524 .where(EventOccurrence.event_id == Event.id)
525 .where(~EventOccurrence.is_deleted)
526 ).one_or_none()
528 if not res:
529 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
531 event, occurrence = res
533 occurrence.is_deleted = True
535 queue_job(
536 session,
537 "generate_event_delete_notifications",
538 payload=jobs_pb2.GenerateEventDeleteNotificationsPayload(
539 occurrence_id=occurrence.id,
540 ),
541 )
543 return empty_pb2.Empty()
545 def ListUserIds(self, request, context, session):
546 start_date = request.start_time.ToDatetime()
547 end_date = request.end_time.ToDatetime()
549 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
550 next_user_id = int(request.page_token) if request.page_token else 0
552 user_ids = (
553 session.execute(
554 select(User.id)
555 .where(User.id >= next_user_id)
556 .where(User.joined >= start_date)
557 .where(User.joined <= end_date)
558 .order_by(User.joined.desc())
559 .limit(page_size + 1)
560 )
561 .scalars()
562 .all()
563 )
565 return admin_pb2.ListUserIdsRes(
566 user_ids=user_ids[:page_size],
567 next_page_token=str(user_ids[-1]) if len(user_ids) > page_size else None,
568 )
570 def EditReferenceText(self, request, context, session):
571 reference = session.execute(select(Reference).where(Reference.id == request.reference_id)).scalar_one_or_none()
573 if reference is None:
574 context.abort(grpc.StatusCode.NOT_FOUND, errors.REFERENCE_NOT_FOUND)
576 if not request.new_text.strip():
577 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.REFERENCE_NO_TEXT)
579 reference.text = request.new_text.strip()
580 return empty_pb2.Empty()
582 def DeleteReference(self, request, context, session):
583 reference = session.execute(select(Reference).where(Reference.id == request.reference_id)).scalar_one_or_none()
585 if reference is None:
586 context.abort(grpc.StatusCode.NOT_FOUND, errors.REFERENCE_NOT_FOUND)
588 reference.is_deleted = True
589 return empty_pb2.Empty()
591 def EditDiscussion(self, request, context, session):
592 discussion = session.execute(
593 select(Discussion).where(Discussion.id == request.discussion_id)
594 ).scalar_one_or_none()
595 if request.new_title:
596 discussion.title = request.new_title.strip()
597 if request.new_content:
598 discussion.content = request.new_content.strip()
599 return empty_pb2.Empty()
601 def EditReply(self, request, context, session):
602 database_id, depth = unpack_thread_id(request.reply_id)
603 if depth == 1:
604 obj = session.execute(select(Comment).where(Comment.id == database_id)).scalar_one_or_none()
605 elif depth == 2:
606 obj = session.execute(select(Reply).where(Reply.id == database_id)).scalar_one_or_none()
607 if not obj:
608 context.abort(grpc.StatusCode.NOT_FOUND, errors.OBJECT_NOT_FOUND)
609 obj.content = request.new_content.strip()
610 return empty_pb2.Empty()
612 def CreateAccountDeletionLink(self, request, context, session):
613 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
614 if not user:
615 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
616 expiry_days = request.expiry_days or 7
617 token = AccountDeletionToken(token=urlsafe_secure_token(), user=user, expiry=now() + timedelta(hours=2))
618 session.add(token)
619 return admin_pb2.CreateAccountDeletionLinkRes(
620 account_deletion_confirm_url=urls.delete_account_link(account_deletion_token=token.token)
621 )
623 def AccessStats(self, request, context, session):
624 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
625 if not user:
626 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
628 start_time = to_aware_datetime(request.start_time) if request.start_time else now() - timedelta(days=90)
629 end_time = to_aware_datetime(request.end_time) if request.end_time else now()
631 user_activity = session.execute(
632 select(
633 UserActivity.ip_address,
634 UserActivity.user_agent,
635 func.sum(UserActivity.api_calls),
636 func.count(UserActivity.period),
637 func.min(UserActivity.period),
638 func.max(UserActivity.period),
639 )
640 .where(UserActivity.user_id == user.id)
641 .where(UserActivity.period >= start_time)
642 .where(UserActivity.period >= end_time)
643 .order_by(func.max(UserActivity.period).desc())
644 .group_by(UserActivity.ip_address, UserActivity.user_agent)
645 ).all()
647 out = admin_pb2.AccessStatsRes()
649 for ip_address, user_agent, api_call_count, periods_count, first_seen, last_seen in user_activity:
650 user_agent_data = user_agents_parse(user_agent or "")
651 asn = geoip_asn(ip_address)
652 out.stats.append(
653 admin_pb2.AccessStat(
654 ip_address=ip_address,
655 asn=str(asn[0]) if asn else None,
656 asorg=str(asn[1]) if asn else None,
657 asnetwork=str(asn[2]) if asn else None,
658 user_agent=user_agent,
659 operating_system=user_agent_data.os.family,
660 browser=user_agent_data.browser.family,
661 device=user_agent_data.device.family,
662 approximate_location=geoip_approximate_location(ip_address) or "Unknown",
663 api_call_count=api_call_count,
664 periods_count=periods_count,
665 first_seen=Timestamp_from_datetime(first_seen),
666 last_seen=Timestamp_from_datetime(last_seen),
667 )
668 )
670 return out
672 def SendBlogPostNotification(self, request, context, session):
673 if len(request.title) > 50:
674 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_BLOG_TITLE_TOO_LONG)
675 if len(request.blurb) > 100:
676 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_BLOG_BLURB_TOO_LONG)
677 queue_job(
678 session,
679 "generate_new_blog_post_notifications",
680 payload=jobs_pb2.GenerateNewBlogPostNotificationsPayload(
681 url=request.url,
682 title=request.title,
683 blurb=request.blurb,
684 ),
685 )
686 return empty_pb2.Empty()