Coverage for src/couchers/servicers/admin.py: 69%
347 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-08 11:38 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-08 11:38 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy.sql import and_, func, or_, select
7from user_agents import parse as user_agents_parse
9from couchers import urls
10from couchers.crypto import urlsafe_secure_token
11from couchers.helpers.badges import user_add_badge, user_remove_badge
12from couchers.helpers.geoip import geoip_approximate_location, geoip_asn
13from couchers.helpers.strong_verification import get_strong_verification_fields
14from couchers.jobs.enqueue import queue_job
15from couchers.models import (
16 AccountDeletionToken,
17 Comment,
18 ContentReport,
19 Discussion,
20 Event,
21 EventOccurrence,
22 GroupChat,
23 GroupChatSubscription,
24 HostRequest,
25 LanguageAbility,
26 Message,
27 ModerationUserList,
28 ModNote,
29 Reference,
30 Reply,
31 User,
32 UserActivity,
33 UserBadge,
34)
35from couchers.notifications.notify import notify
36from couchers.proto import admin_pb2, admin_pb2_grpc, notification_data_pb2
37from couchers.proto.internal import jobs_pb2
38from couchers.resources import get_badge_dict
39from couchers.servicers.api import user_model_to_pb
40from couchers.servicers.auth import create_session
41from couchers.servicers.threads import unpack_thread_id
42from couchers.sql import couchers_select as select
43from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date, to_aware_datetime
45logger = logging.getLogger(__name__)
47MAX_PAGINATION_LENGTH = 250
50def _user_to_details(session, user):
51 return admin_pb2.UserDetails(
52 user_id=user.id,
53 username=user.username,
54 name=user.name,
55 email=user.email,
56 gender=user.gender,
57 birthdate=date_to_api(user.birthdate),
58 banned=user.is_banned,
59 deleted=user.is_deleted,
60 do_not_email=user.do_not_email,
61 badges=[badge.badge_id for badge in user.badges],
62 **get_strong_verification_fields(session, user),
63 has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
64 admin_note=user.admin_note,
65 pending_mod_notes_count=user.mod_notes.where(ModNote.is_pending).count(),
66 acknowledged_mod_notes_count=user.mod_notes.where(~ModNote.is_pending).count(),
67 )
70def _content_report_to_pb(content_report: ContentReport):
71 return admin_pb2.ContentReport(
72 content_report_id=content_report.id,
73 time=Timestamp_from_datetime(content_report.time),
74 reporting_user_id=content_report.reporting_user_id,
75 author_user_id=content_report.author_user_id,
76 reason=content_report.reason,
77 description=content_report.description,
78 content_ref=content_report.content_ref,
79 user_agent=content_report.user_agent,
80 page=content_report.page,
81 )
84def append_admin_note(session, context, user, note):
85 if not note.strip():
86 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin_note_cant_be_empty")
87 admin = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
88 user.admin_note += f"\n[{now().isoformat()}] (id: {admin.id}, username: {admin.username}) {note}\n"
91class Admin(admin_pb2_grpc.AdminServicer):
92 def GetUserDetails(self, request, context, session):
93 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
94 if not user:
95 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
96 return _user_to_details(session, user)
98 def GetUser(self, request, context, session):
99 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
100 if not user:
101 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
102 return user_model_to_pb(user, session, context, is_admin_see_ghosts=True)
104 def SearchUsers(self, request, context, session):
105 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
106 next_user_id = int(request.page_token) if request.page_token else 0
107 statement = select(User)
108 if request.username:
109 statement = statement.where(User.username.ilike(request.username))
110 if request.email:
111 statement = statement.where(User.email.ilike(request.email))
112 if request.name:
113 statement = statement.where(User.name.ilike(request.name))
114 if request.admin_note:
115 statement = statement.where(User.admin_note.ilike(request.admin_note))
116 if request.city:
117 statement = statement.where(User.city.ilike(request.city))
118 if request.min_user_id:
119 statement = statement.where(User.id >= request.min_user_id)
120 if request.max_user_id:
121 statement = statement.where(User.id <= request.max_user_id)
122 if request.min_birthdate:
123 statement = statement.where(User.birthdate >= parse_date(request.min_birthdate))
124 if request.max_birthdate:
125 statement = statement.where(User.birthdate <= parse_date(request.max_birthdate))
126 if request.genders:
127 statement = statement.where(User.gender.in_(request.genders))
128 if request.min_joined_date:
129 statement = statement.where(User.joined >= parse_date(request.min_joined_date))
130 if request.max_joined_date:
131 statement = statement.where(User.joined <= parse_date(request.max_joined_date))
132 if request.min_last_active_date:
133 statement = statement.where(User.last_active >= parse_date(request.min_last_active_date))
134 if request.max_last_active_date:
135 statement = statement.where(User.last_active <= parse_date(request.max_last_active_date))
136 if request.genders:
137 statement = statement.where(User.gender.in_(request.genders))
138 if request.language_codes:
139 statement = statement.join(
140 LanguageAbility,
141 and_(LanguageAbility.user_id == User.id, LanguageAbility.language_code.in_(request.language_codes)),
142 )
143 if request.HasField("is_deleted"):
144 statement = statement.where(User.is_deleted == request.is_deleted.value)
145 if request.HasField("is_banned"):
146 statement = statement.where(User.is_banned == request.is_banned.value)
147 if request.HasField("has_avatar"):
148 if request.has_avatar.value:
149 statement = statement.where(User.avatar_key != None)
150 else:
151 statement = statement.where(User.avatar_key == None)
152 users = (
153 session.execute(statement.where(User.id >= next_user_id).order_by(User.id).limit(page_size + 1))
154 .scalars()
155 .all()
156 )
157 logger.info(users)
158 return admin_pb2.SearchUsersRes(
159 users=[_user_to_details(session, user) for user in users[:page_size]],
160 next_page_token=str(users[-1].id) if len(users) > page_size else None,
161 )
163 def ChangeUserGender(self, request, context, session):
164 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
165 if not user:
166 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
167 user.gender = request.gender
168 session.commit()
170 notify(
171 session,
172 user_id=user.id,
173 topic_action="gender:change",
174 data=notification_data_pb2.GenderChange(
175 gender=request.gender,
176 ),
177 )
179 return _user_to_details(session, user)
181 def ChangeUserBirthdate(self, request, context, session):
182 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
183 if not user:
184 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
185 user.birthdate = parse_date(request.birthdate)
186 session.commit()
188 notify(
189 session,
190 user_id=user.id,
191 topic_action="birthdate:change",
192 data=notification_data_pb2.BirthdateChange(
193 birthdate=request.birthdate,
194 ),
195 )
197 return _user_to_details(session, user)
199 def AddBadge(self, request, context, session):
200 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
201 if not user:
202 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
204 badge = get_badge_dict().get(request.badge_id)
205 if not badge:
206 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "badge_not_found")
208 if not badge["admin_editable"]:
209 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_cannot_edit_badge")
211 if badge["id"] in [b.badge_id for b in user.badges]:
212 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_has_badge")
214 user_add_badge(session, user.id, request.badge_id)
216 return _user_to_details(session, user)
218 def RemoveBadge(self, request, context, session):
219 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
220 if not user:
221 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
223 badge = get_badge_dict().get(request.badge_id)
224 if not badge:
225 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "badge_not_found")
227 if not badge["admin_editable"]:
228 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_cannot_edit_badge")
230 user_badge = session.execute(
231 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge["id"])
232 ).scalar_one_or_none()
233 if not user_badge:
234 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_does_not_have_badge")
236 user_remove_badge(session, user.id, request.badge_id)
238 return _user_to_details(session, user)
240 def SetPassportSexGenderException(self, request, context, session):
241 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
242 if not user:
243 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
244 user.has_passport_sex_gender_exception = request.passport_sex_gender_exception
245 return _user_to_details(session, user)
247 def BanUser(self, request, context, session):
248 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
249 if not user:
250 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
251 append_admin_note(session, context, user, request.admin_note)
252 user.is_banned = True
253 return _user_to_details(session, user)
255 def UnbanUser(self, request, context, session):
256 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
257 if not user:
258 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
259 append_admin_note(session, context, user, request.admin_note)
260 user.is_banned = False
261 return _user_to_details(session, user)
263 def AddAdminNote(self, request, context, session):
264 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
265 if not user:
266 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
267 append_admin_note(session, context, user, request.admin_note)
268 return _user_to_details(session, user)
270 def GetContentReport(self, request, context, session):
271 content_report = session.execute(
272 select(ContentReport).where(ContentReport.id == request.content_report_id)
273 ).scalar_one_or_none()
274 if not content_report:
275 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "content_report_not_found")
276 return admin_pb2.GetContentReportRes(
277 content_report=_content_report_to_pb(content_report),
278 )
280 def GetContentReportsForAuthor(self, request, context, session):
281 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
282 if not user:
283 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
284 content_reports = (
285 session.execute(
286 select(ContentReport).where(ContentReport.author_user_id == user.id).order_by(ContentReport.id.desc())
287 )
288 .scalars()
289 .all()
290 )
291 return admin_pb2.GetContentReportsForAuthorRes(
292 content_reports=[_content_report_to_pb(content_report) for content_report in content_reports],
293 )
295 def SendModNote(self, request, context, session):
296 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
297 if not user:
298 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
299 session.add(
300 ModNote(
301 user_id=user.id,
302 internal_id=request.internal_id,
303 creator_user_id=context.user_id,
304 note_content=request.content,
305 )
306 )
307 session.flush()
309 if not request.do_not_notify:
310 notify(
311 session,
312 user_id=user.id,
313 topic_action="modnote:create",
314 )
316 return _user_to_details(session, user)
318 def MarkUserNeedsLocationUpdate(self, request, context, session):
319 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
320 if not user:
321 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
322 user.needs_to_update_location = True
323 return _user_to_details(session, user)
325 def DeleteUser(self, request, context, session):
326 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
327 if not user:
328 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
329 user.is_deleted = True
330 return _user_to_details(session, user)
332 def RecoverDeletedUser(self, request, context, session):
333 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
334 if not user:
335 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
336 user.is_deleted = False
337 return _user_to_details(session, user)
339 def CreateApiKey(self, request, context, session):
340 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
341 if not user:
342 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
343 token, expiry = create_session(
344 context, session, user, long_lived=True, is_api_key=True, duration=timedelta(days=365), set_cookie=False
345 )
347 notify(
348 session,
349 user_id=user.id,
350 topic_action="api_key:create",
351 data=notification_data_pb2.ApiKeyCreate(
352 api_key=token,
353 expiry=Timestamp_from_datetime(expiry),
354 ),
355 )
357 return _user_to_details(session, user)
359 def GetChats(self, request, context, session):
360 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
361 if not user:
362 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
364 # Cache for UserDetails to avoid recomputing for the same user
365 user_details_cache = {}
367 def get_user_details(user_id):
368 if user_id not in user_details_cache:
369 u = session.execute(select(User).where(User.id == user_id)).scalar_one()
370 user_details_cache[user_id] = _user_to_details(session, u)
371 return user_details_cache[user_id]
373 def message_to_pb(message):
374 return admin_pb2.ChatMessage(
375 message_id=message.id,
376 author=get_user_details(message.author_id),
377 time=Timestamp_from_datetime(message.time),
378 message_type=message.message_type.name if message.message_type else "",
379 text=message.text or "",
380 host_request_status_target=(
381 message.host_request_status_target.name if message.host_request_status_target else ""
382 ),
383 target=get_user_details(message.target_id) if message.target_id else None,
384 )
386 def get_messages_for_conversation(conversation_id):
387 messages = (
388 session.execute(
389 select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
390 )
391 .scalars()
392 .all()
393 )
394 return [message_to_pb(msg) for msg in messages]
396 def get_host_request_pb(host_request):
397 return admin_pb2.AdminHostRequest(
398 host_request_id=host_request.conversation_id,
399 surfer=get_user_details(host_request.surfer_user_id),
400 host=get_user_details(host_request.host_user_id),
401 status=host_request.status.name if host_request.status else "",
402 from_date=date_to_api(host_request.from_date),
403 to_date=date_to_api(host_request.to_date),
404 created=Timestamp_from_datetime(host_request.conversation.created),
405 messages=get_messages_for_conversation(host_request.conversation_id),
406 )
408 def get_group_chat_pb(group_chat):
409 subs = (
410 session.execute(
411 select(GroupChatSubscription)
412 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
413 .order_by(GroupChatSubscription.joined.asc())
414 )
415 .scalars()
416 .all()
417 )
418 members = [
419 admin_pb2.GroupChatMember(
420 user=get_user_details(sub.user_id),
421 joined=Timestamp_from_datetime(sub.joined),
422 left=Timestamp_from_datetime(sub.left) if sub.left else None,
423 role=sub.role.name if sub.role else "",
424 )
425 for sub in subs
426 ]
427 return admin_pb2.AdminGroupChat(
428 group_chat_id=group_chat.conversation_id,
429 title=group_chat.title or "",
430 is_dm=group_chat.is_dm,
431 creator=get_user_details(group_chat.creator_id),
432 members=members,
433 messages=get_messages_for_conversation(group_chat.conversation_id),
434 )
436 # Get all host requests for the user
437 host_requests = (
438 session.execute(
439 select(HostRequest)
440 .where(or_(HostRequest.host_user_id == user.id, HostRequest.surfer_user_id == user.id))
441 .order_by(HostRequest.conversation_id.desc())
442 )
443 .scalars()
444 .all()
445 )
447 # Get all group chats for the user
448 group_chat_ids = (
449 session.execute(
450 select(GroupChatSubscription.group_chat_id)
451 .where(GroupChatSubscription.user_id == user.id)
452 .order_by(GroupChatSubscription.joined.desc())
453 )
454 .scalars()
455 .all()
456 )
457 group_chats = (
458 session.execute(select(GroupChat).where(GroupChat.conversation_id.in_(group_chat_ids))).scalars().all()
459 )
461 return admin_pb2.GetChatsRes(
462 user=get_user_details(user.id),
463 host_requests=[get_host_request_pb(hr) for hr in host_requests],
464 group_chats=[get_group_chat_pb(gc) for gc in group_chats],
465 )
467 def DeleteEvent(self, request, context, session):
468 res = session.execute(
469 select(Event, EventOccurrence)
470 .where(EventOccurrence.id == request.event_id)
471 .where(EventOccurrence.event_id == Event.id)
472 .where(~EventOccurrence.is_deleted)
473 ).one_or_none()
475 if not res:
476 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
478 event, occurrence = res
480 occurrence.is_deleted = True
482 queue_job(
483 session,
484 "generate_event_delete_notifications",
485 payload=jobs_pb2.GenerateEventDeleteNotificationsPayload(
486 occurrence_id=occurrence.id,
487 ),
488 )
490 return empty_pb2.Empty()
492 def ListUserIds(self, request, context, session):
493 start_date = to_aware_datetime(request.start_time)
494 end_date = to_aware_datetime(request.end_time)
496 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
497 next_user_id = int(request.page_token) if request.page_token else 0
499 user_ids = (
500 session.execute(
501 select(User.id)
502 .where(or_(User.id <= next_user_id, next_user_id == 0))
503 .where(User.joined >= start_date)
504 .where(User.joined <= end_date)
505 .order_by(User.id.desc())
506 .limit(page_size + 1)
507 )
508 .scalars()
509 .all()
510 )
512 return admin_pb2.ListUserIdsRes(
513 user_ids=user_ids[:page_size],
514 next_page_token=str(user_ids[-1]) if len(user_ids) > page_size else None,
515 )
517 def EditReferenceText(self, request, context, session):
518 reference = session.execute(select(Reference).where(Reference.id == request.reference_id)).scalar_one_or_none()
520 if reference is None:
521 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "reference_not_found")
523 if not request.new_text.strip():
524 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "reference_no_text")
526 reference.text = request.new_text.strip()
527 return empty_pb2.Empty()
529 def DeleteReference(self, request, context, session):
530 reference = session.execute(select(Reference).where(Reference.id == request.reference_id)).scalar_one_or_none()
532 if reference is None:
533 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "reference_not_found")
535 reference.is_deleted = True
536 return empty_pb2.Empty()
538 def EditDiscussion(self, request, context, session):
539 discussion = session.execute(
540 select(Discussion).where(Discussion.id == request.discussion_id)
541 ).scalar_one_or_none()
542 if not discussion:
543 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "discussion_not_found")
544 if request.new_title:
545 discussion.title = request.new_title.strip()
546 if request.new_content:
547 discussion.content = request.new_content.strip()
548 return empty_pb2.Empty()
550 def EditReply(self, request, context, session):
551 database_id, depth = unpack_thread_id(request.reply_id)
552 if depth == 1:
553 obj = session.execute(select(Comment).where(Comment.id == database_id)).scalar_one_or_none()
554 elif depth == 2:
555 obj = session.execute(select(Reply).where(Reply.id == database_id)).scalar_one_or_none()
556 if not obj:
557 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "object_not_found")
558 obj.content = request.new_content.strip()
559 return empty_pb2.Empty()
561 def AddUsersToModerationUserList(self, request, context, session):
562 """Add multiple users to a moderation user list. If no moderation list is provided, a new one is created.
563 Id of the moderation list is returned."""
564 req_users = request.users
565 users = []
567 for req_user in req_users:
568 user = session.execute(select(User).where_username_or_email_or_id(req_user)).scalar_one_or_none()
569 if not user:
570 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
571 users.append(user)
573 # Create a new moderation user list if no one is provided
574 if not request.moderation_list_id:
575 moderation_user_list = ModerationUserList()
576 session.add(moderation_user_list)
577 session.flush()
578 else:
579 moderation_user_list = session.get(ModerationUserList, request.moderation_list_id)
580 if not moderation_user_list:
581 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_user_list_not_found")
583 # Add users to the moderation list only if not already in it
584 for user in users:
585 if user not in moderation_user_list.users:
586 moderation_user_list.users.append(user)
588 return admin_pb2.AddUsersToModerationUserListRes(moderation_list_id=moderation_user_list.id)
590 def ListModerationUserLists(self, request, context, session):
591 """Lists all moderation user lists for a user."""
592 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
593 if not user:
594 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
596 moderation_lists = [
597 admin_pb2.ModerationList(moderation_list_id=ml.id, member_ids=[u.id for u in ml.users])
598 for ml in user.moderation_user_lists
599 ]
600 return admin_pb2.ListModerationUserListsRes(moderation_lists=moderation_lists)
602 def RemoveUserFromModerationUserList(self, request, context, session):
603 """Removes a user from a provided moderation user list."""
604 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
605 if not user:
606 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
607 if not request.moderation_list_id:
608 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_moderation_user_list_id")
610 moderation_user_list = session.get(ModerationUserList, request.moderation_list_id)
611 if not moderation_user_list:
612 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_user_list_not_found")
613 if user not in moderation_user_list.users:
614 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_the_moderation_user_list")
616 moderation_user_list.users.remove(user)
618 if len(moderation_user_list.users) == 0:
619 session.delete(moderation_user_list)
621 return empty_pb2.Empty()
623 def CreateAccountDeletionLink(self, request, context, session):
624 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
625 if not user:
626 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
627 expiry_days = request.expiry_days or 7
628 token = AccountDeletionToken(token=urlsafe_secure_token(), user=user, expiry=now() + timedelta(hours=2))
629 session.add(token)
630 return admin_pb2.CreateAccountDeletionLinkRes(
631 account_deletion_confirm_url=urls.delete_account_link(account_deletion_token=token.token)
632 )
634 def AccessStats(self, request, context, session):
635 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
636 if not user:
637 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
639 start_time = to_aware_datetime(request.start_time) if request.start_time else now() - timedelta(days=90)
640 end_time = to_aware_datetime(request.end_time) if request.end_time else now()
642 user_activity = session.execute(
643 select(
644 UserActivity.ip_address,
645 UserActivity.user_agent,
646 func.sum(UserActivity.api_calls),
647 func.count(UserActivity.period),
648 func.min(UserActivity.period),
649 func.max(UserActivity.period),
650 )
651 .where(UserActivity.user_id == user.id)
652 .where(UserActivity.period >= start_time)
653 .where(UserActivity.period >= end_time)
654 .order_by(func.max(UserActivity.period).desc())
655 .group_by(UserActivity.ip_address, UserActivity.user_agent)
656 ).all()
658 out = admin_pb2.AccessStatsRes()
660 for ip_address, user_agent, api_call_count, periods_count, first_seen, last_seen in user_activity:
661 user_agent_data = user_agents_parse(user_agent or "")
662 asn = geoip_asn(ip_address)
663 out.stats.append(
664 admin_pb2.AccessStat(
665 ip_address=ip_address,
666 asn=str(asn[0]) if asn else None,
667 asorg=str(asn[1]) if asn else None,
668 asnetwork=str(asn[2]) if asn else None,
669 user_agent=user_agent,
670 operating_system=user_agent_data.os.family,
671 browser=user_agent_data.browser.family,
672 device=user_agent_data.device.family,
673 approximate_location=geoip_approximate_location(ip_address) or "Unknown",
674 api_call_count=api_call_count,
675 periods_count=periods_count,
676 first_seen=Timestamp_from_datetime(first_seen),
677 last_seen=Timestamp_from_datetime(last_seen),
678 )
679 )
681 return out
683 def SetLastDonated(self, request, context, session):
684 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
685 if not user:
686 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
688 if request.HasField("last_donated"):
689 user.last_donated = to_aware_datetime(request.last_donated)
690 else:
691 user.last_donated = None
693 return _user_to_details(session, user)