Coverage for app / backend / src / couchers / servicers / admin.py: 75%
440 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy import select
7from sqlalchemy.orm import Session
8from sqlalchemy.sql import and_, func, or_
9from user_agents import parse as user_agents_parse
11from couchers import urls
12from couchers.context import CouchersContext
13from couchers.crypto import urlsafe_secure_token
14from couchers.helpers.badges import user_add_badge, user_remove_badge
15from couchers.helpers.geoip import geoip_approximate_location, geoip_asn
16from couchers.helpers.strong_verification import get_strong_verification_fields
17from couchers.jobs.enqueue import queue_job
18from couchers.models import (
19 AccountDeletionToken,
20 AdminAction,
21 AdminActionLevel,
22 AdminTag,
23 Comment,
24 ContentReport,
25 Discussion,
26 Event,
27 EventOccurrence,
28 GroupChat,
29 GroupChatSubscription,
30 HostRequest,
31 LanguageAbility,
32 Message,
33 ModerationUserList,
34 ModNote,
35 Reference,
36 Reply,
37 User,
38 UserActivity,
39 UserAdminTag,
40 UserBadge,
41)
42from couchers.models.notifications import NotificationTopicAction
43from couchers.models.uploads import has_avatar_photo_expression
44from couchers.notifications.notify import notify
45from couchers.proto import admin_pb2, admin_pb2_grpc, api_pb2, notification_data_pb2
46from couchers.proto.internal import jobs_pb2
47from couchers.resources import get_badge_dict
48from couchers.servicers.api import user_model_to_pb
49from couchers.servicers.auth import create_session
50from couchers.servicers.events import generate_event_delete_notifications
51from couchers.servicers.threads import unpack_thread_id
52from couchers.sql import to_bool, username_or_email_or_id
53from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date, to_aware_datetime
55logger = logging.getLogger(__name__)
57MAX_PAGINATION_LENGTH = 250
60adminactionlevel2api = {
61 AdminActionLevel.debug: admin_pb2.ADMIN_ACTION_LEVEL_DEBUG,
62 AdminActionLevel.normal: admin_pb2.ADMIN_ACTION_LEVEL_NORMAL,
63 AdminActionLevel.high: admin_pb2.ADMIN_ACTION_LEVEL_HIGH,
64}
66api2adminactionlevel = {
67 admin_pb2.ADMIN_ACTION_LEVEL_DEBUG: AdminActionLevel.debug,
68 admin_pb2.ADMIN_ACTION_LEVEL_NORMAL: AdminActionLevel.normal,
69 admin_pb2.ADMIN_ACTION_LEVEL_HIGH: AdminActionLevel.high,
70}
73def log_admin_action(
74 session: Session,
75 context: CouchersContext,
76 target_user: User,
77 action_type: str,
78 note: str | None = None,
79 tag: str | None = None,
80 level: AdminActionLevel = AdminActionLevel.normal,
81) -> AdminAction:
82 action = AdminAction(
83 admin_user_id=context.user_id,
84 target_user_id=target_user.id,
85 action_type=action_type,
86 level=level,
87 note=note,
88 tag=tag,
89 )
90 session.add(action)
91 session.flush()
92 return action
95def _user_to_details(session: Session, user: User) -> admin_pb2.UserDetails:
96 # Query admin actions for this user
97 actions = session.execute(
98 select(AdminAction, User.username)
99 .join(User, AdminAction.admin_user_id == User.id)
100 .where(AdminAction.target_user_id == user.id)
101 .order_by(AdminAction.created.asc())
102 ).all()
104 action_pbs = []
105 for action, admin_username in actions:
106 action_pbs.append(
107 admin_pb2.AdminActionLog(
108 admin_action_id=action.id,
109 created=Timestamp_from_datetime(action.created),
110 admin_user_id=action.admin_user_id,
111 admin_username=admin_username,
112 action_type=action.action_type,
113 level=adminactionlevel2api[action.level],
114 note=action.note or "",
115 tag=action.tag or "",
116 )
117 )
119 # Query admin tags
120 admin_tags = (
121 session.execute(
122 select(AdminTag.tag)
123 .join(UserAdminTag, UserAdminTag.admin_tag_id == AdminTag.id)
124 .where(UserAdminTag.user_id == user.id)
125 .order_by(AdminTag.tag)
126 )
127 .scalars()
128 .all()
129 )
131 return admin_pb2.UserDetails(
132 user_id=user.id,
133 username=user.username,
134 name=user.name,
135 email=user.email,
136 gender=user.gender,
137 birthdate=date_to_api(user.birthdate),
138 banned=user.banned_at is not None,
139 deleted=user.deleted_at is not None,
140 do_not_email=user.do_not_email,
141 badges=[badge.badge_id for badge in user.badges],
142 **get_strong_verification_fields(session, user),
143 has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
144 pending_mod_notes_count=user.mod_notes.where(ModNote.is_pending).count(),
145 acknowledged_mod_notes_count=user.mod_notes.where(~ModNote.is_pending).count(),
146 admin_actions=action_pbs,
147 admin_tags=list(admin_tags),
148 )
151def _content_report_to_pb(content_report: ContentReport) -> admin_pb2.ContentReport:
152 return admin_pb2.ContentReport(
153 content_report_id=content_report.id,
154 time=Timestamp_from_datetime(content_report.time),
155 reporting_user_id=content_report.reporting_user_id,
156 author_user_id=content_report.author_user_id,
157 reason=content_report.reason,
158 description=content_report.description,
159 content_ref=content_report.content_ref,
160 user_agent=content_report.user_agent,
161 page=content_report.page,
162 )
165def _reference_to_pb(reference: Reference) -> admin_pb2.AdminReference:
166 return admin_pb2.AdminReference(
167 reference_id=reference.id,
168 from_user_id=reference.from_user_id,
169 to_user_id=reference.to_user_id,
170 reference_type=reference.reference_type.name,
171 text=reference.text,
172 private_text=reference.private_text or "",
173 time=Timestamp_from_datetime(reference.time),
174 host_request_id=reference.host_request_id or 0,
175 rating=reference.rating,
176 was_appropriate=reference.was_appropriate,
177 is_deleted=reference.is_deleted,
178 )
181class Admin(admin_pb2_grpc.AdminServicer):
182 def GetUserDetails(
183 self, request: admin_pb2.GetUserDetailsReq, context: CouchersContext, session: Session
184 ) -> admin_pb2.UserDetails:
185 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
186 if not user: 186 ↛ 187line 186 didn't jump to line 187 because the condition on line 186 was never true
187 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
188 return _user_to_details(session, user)
190 def GetUser(self, request: admin_pb2.GetUserReq, context: CouchersContext, session: Session) -> api_pb2.User:
191 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
192 if not user: 192 ↛ 193line 192 didn't jump to line 193 because the condition on line 192 was never true
193 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
194 return user_model_to_pb(user, session, context, is_admin_see_ghosts=True)
196 def SearchUsers(
197 self, request: admin_pb2.SearchUsersReq, context: CouchersContext, session: Session
198 ) -> admin_pb2.SearchUsersRes:
199 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
200 next_user_id = int(request.page_token) if request.page_token else 0
201 statement = select(User)
202 if request.username: 202 ↛ 203line 202 didn't jump to line 203 because the condition on line 202 was never true
203 statement = statement.where(User.username.ilike(request.username))
204 if request.email: 204 ↛ 205line 204 didn't jump to line 205 because the condition on line 204 was never true
205 statement = statement.where(User.email.ilike(request.email))
206 if request.name: 206 ↛ 207line 206 didn't jump to line 207 because the condition on line 206 was never true
207 statement = statement.where(User.name.ilike(request.name))
208 if request.admin_action_log:
209 statement = statement.where(
210 User.id.in_(select(AdminAction.target_user_id).where(AdminAction.note.ilike(request.admin_action_log)))
211 )
212 if request.city: 212 ↛ 213line 212 didn't jump to line 213 because the condition on line 212 was never true
213 statement = statement.where(User.city.ilike(request.city))
214 if request.min_user_id: 214 ↛ 215line 214 didn't jump to line 215 because the condition on line 214 was never true
215 statement = statement.where(User.id >= request.min_user_id)
216 if request.max_user_id: 216 ↛ 217line 216 didn't jump to line 217 because the condition on line 216 was never true
217 statement = statement.where(User.id <= request.max_user_id)
218 if request.min_birthdate: 218 ↛ 219line 218 didn't jump to line 219 because the condition on line 218 was never true
219 statement = statement.where(User.birthdate >= parse_date(request.min_birthdate))
220 if request.max_birthdate: 220 ↛ 221line 220 didn't jump to line 221 because the condition on line 220 was never true
221 statement = statement.where(User.birthdate <= parse_date(request.max_birthdate))
222 if request.genders: 222 ↛ 223line 222 didn't jump to line 223 because the condition on line 222 was never true
223 statement = statement.where(User.gender.in_(request.genders))
224 if request.min_joined_date: 224 ↛ 225line 224 didn't jump to line 225 because the condition on line 224 was never true
225 statement = statement.where(User.joined >= parse_date(request.min_joined_date))
226 if request.max_joined_date: 226 ↛ 227line 226 didn't jump to line 227 because the condition on line 226 was never true
227 statement = statement.where(User.joined <= parse_date(request.max_joined_date))
228 if request.min_last_active_date: 228 ↛ 229line 228 didn't jump to line 229 because the condition on line 228 was never true
229 statement = statement.where(User.last_active >= parse_date(request.min_last_active_date))
230 if request.max_last_active_date: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true
231 statement = statement.where(User.last_active <= parse_date(request.max_last_active_date))
232 if request.genders: 232 ↛ 233line 232 didn't jump to line 233 because the condition on line 232 was never true
233 statement = statement.where(User.gender.in_(request.genders))
234 if request.language_codes: 234 ↛ 235line 234 didn't jump to line 235 because the condition on line 234 was never true
235 statement = statement.join(
236 LanguageAbility,
237 and_(LanguageAbility.user_id == User.id, LanguageAbility.language_code.in_(request.language_codes)),
238 )
239 if request.HasField("is_deleted"): 239 ↛ 240line 239 didn't jump to line 240 because the condition on line 239 was never true
240 statement = statement.where((User.deleted_at != None) == request.is_deleted.value)
241 if request.HasField("is_banned"): 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true
242 statement = statement.where((User.banned_at != None) == request.is_banned.value)
243 if request.HasField("has_avatar"): 243 ↛ 244line 243 didn't jump to line 244 because the condition on line 243 was never true
244 statement = statement.where(has_avatar_photo_expression(User) == request.has_avatar.value)
245 if request.admin_tags:
246 for tag_name in request.admin_tags:
247 statement = statement.where(
248 User.id.in_(
249 select(UserAdminTag.user_id)
250 .join(AdminTag, UserAdminTag.admin_tag_id == AdminTag.id)
251 .where(AdminTag.tag == tag_name)
252 )
253 )
254 users = (
255 session.execute(statement.where(User.id >= next_user_id).order_by(User.id).limit(page_size + 1))
256 .scalars()
257 .all()
258 )
259 logger.info(users)
260 return admin_pb2.SearchUsersRes(
261 users=[_user_to_details(session, user) for user in users[:page_size]],
262 next_page_token=str(users[-1].id) if len(users) > page_size else None,
263 )
265 def ChangeUserGender(
266 self, request: admin_pb2.ChangeUserGenderReq, context: CouchersContext, session: Session
267 ) -> admin_pb2.UserDetails:
268 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
269 if not user: 269 ↛ 270line 269 didn't jump to line 270 because the condition on line 269 was never true
270 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
271 user.gender = request.gender
272 log_admin_action(session, context, user, "change_gender", note=f"Changed to {request.gender}")
273 session.commit()
275 notify(
276 session,
277 user_id=user.id,
278 topic_action=NotificationTopicAction.gender__change,
279 key="",
280 data=notification_data_pb2.GenderChange(
281 gender=request.gender,
282 ),
283 )
285 return _user_to_details(session, user)
287 def ChangeUserBirthdate(
288 self, request: admin_pb2.ChangeUserBirthdateReq, context: CouchersContext, session: Session
289 ) -> admin_pb2.UserDetails:
290 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
291 if not user: 291 ↛ 292line 291 didn't jump to line 292 because the condition on line 291 was never true
292 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
293 if not (birthdate := parse_date(request.birthdate)): 293 ↛ 294line 293 didn't jump to line 294 because the condition on line 293 was never true
294 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_birthdate")
296 user.birthdate = birthdate
297 log_admin_action(session, context, user, "change_birthdate", note=f"Changed to {request.birthdate}")
298 session.commit()
300 notify(
301 session,
302 user_id=user.id,
303 topic_action=NotificationTopicAction.birthdate__change,
304 key="",
305 data=notification_data_pb2.BirthdateChange(
306 birthdate=request.birthdate,
307 ),
308 )
310 return _user_to_details(session, user)
312 def AddBadge(
313 self, request: admin_pb2.AddBadgeReq, context: CouchersContext, session: Session
314 ) -> admin_pb2.UserDetails:
315 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
316 if not user: 316 ↛ 317line 316 didn't jump to line 317 because the condition on line 316 was never true
317 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
319 badge = get_badge_dict().get(request.badge_id)
320 if not badge:
321 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "badge_not_found")
323 if not badge.admin_editable:
324 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_cannot_edit_badge")
326 if badge.id in [b.badge_id for b in user.badges]:
327 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_has_badge")
329 user_add_badge(session, user.id, request.badge_id)
330 log_admin_action(session, context, user, "add_badge", note=f"Added badge {request.badge_id}")
332 return _user_to_details(session, user)
334 def RemoveBadge(
335 self, request: admin_pb2.RemoveBadgeReq, context: CouchersContext, session: Session
336 ) -> admin_pb2.UserDetails:
337 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
338 if not user: 338 ↛ 339line 338 didn't jump to line 339 because the condition on line 338 was never true
339 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
341 badge = get_badge_dict().get(request.badge_id)
342 if not badge: 342 ↛ 343line 342 didn't jump to line 343 because the condition on line 342 was never true
343 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "badge_not_found")
345 if not badge.admin_editable: 345 ↛ 346line 345 didn't jump to line 346 because the condition on line 345 was never true
346 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_cannot_edit_badge")
348 user_badge = session.execute(
349 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge.id)
350 ).scalar_one_or_none()
351 if not user_badge:
352 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_does_not_have_badge")
354 user_remove_badge(session, user.id, request.badge_id)
355 log_admin_action(session, context, user, "remove_badge", note=f"Removed badge {request.badge_id}")
357 return _user_to_details(session, user)
359 def SetPassportSexGenderException(
360 self, request: admin_pb2.SetPassportSexGenderExceptionReq, context: CouchersContext, session: Session
361 ) -> admin_pb2.UserDetails:
362 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
363 if not user: 363 ↛ 364line 363 didn't jump to line 364 because the condition on line 363 was never true
364 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
365 user.has_passport_sex_gender_exception = request.passport_sex_gender_exception
366 log_admin_action(session, context, user, "set_passport_sex_gender_exception")
367 return _user_to_details(session, user)
369 def BanUser(
370 self, request: admin_pb2.BanUserReq, context: CouchersContext, session: Session
371 ) -> admin_pb2.UserDetails:
372 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
373 if not user: 373 ↛ 374line 373 didn't jump to line 374 because the condition on line 373 was never true
374 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
375 if not request.admin_note.strip(): 375 ↛ 376line 375 didn't jump to line 376 because the condition on line 375 was never true
376 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin_note_cant_be_empty")
377 log_admin_action(session, context, user, "ban", note=request.admin_note, level=AdminActionLevel.high)
378 user.banned_at = now()
379 return _user_to_details(session, user)
381 def UnbanUser(
382 self, request: admin_pb2.UnbanUserReq, context: CouchersContext, session: Session
383 ) -> admin_pb2.UserDetails:
384 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
385 if not user: 385 ↛ 386line 385 didn't jump to line 386 because the condition on line 385 was never true
386 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
387 if not request.admin_note.strip(): 387 ↛ 388line 387 didn't jump to line 388 because the condition on line 387 was never true
388 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin_note_cant_be_empty")
389 log_admin_action(session, context, user, "unban", note=request.admin_note, level=AdminActionLevel.high)
390 user.banned_at = None
391 return _user_to_details(session, user)
393 def AddAdminNote(
394 self, request: admin_pb2.AddAdminNoteReq, context: CouchersContext, session: Session
395 ) -> admin_pb2.UserDetails:
396 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
397 if not user: 397 ↛ 398line 397 didn't jump to line 398 because the condition on line 397 was never true
398 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
399 if not request.admin_note.strip():
400 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin_note_cant_be_empty")
401 level = api2adminactionlevel.get(request.level, AdminActionLevel.normal)
402 log_admin_action(session, context, user, "note", note=request.admin_note, level=level)
403 return _user_to_details(session, user)
405 def GetContentReport(
406 self, request: admin_pb2.GetContentReportReq, context: CouchersContext, session: Session
407 ) -> admin_pb2.GetContentReportRes:
408 content_report = session.execute(
409 select(ContentReport).where(ContentReport.id == request.content_report_id)
410 ).scalar_one_or_none()
411 if not content_report:
412 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "content_report_not_found")
413 return admin_pb2.GetContentReportRes(
414 content_report=_content_report_to_pb(content_report),
415 )
417 def GetContentReportsForAuthor(
418 self, request: admin_pb2.GetContentReportsForAuthorReq, context: CouchersContext, session: Session
419 ) -> admin_pb2.GetContentReportsForAuthorRes:
420 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
421 if not user: 421 ↛ 422line 421 didn't jump to line 422 because the condition on line 421 was never true
422 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
423 content_reports = (
424 session.execute(
425 select(ContentReport).where(ContentReport.author_user_id == user.id).order_by(ContentReport.id.desc())
426 )
427 .scalars()
428 .all()
429 )
430 return admin_pb2.GetContentReportsForAuthorRes(
431 content_reports=[_content_report_to_pb(content_report) for content_report in content_reports],
432 )
434 def SendModNote(
435 self, request: admin_pb2.SendModNoteReq, context: CouchersContext, session: Session
436 ) -> admin_pb2.UserDetails:
437 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
438 if not user: 438 ↛ 439line 438 didn't jump to line 439 because the condition on line 438 was never true
439 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
440 session.add(
441 ModNote(
442 user_id=user.id,
443 internal_id=request.internal_id,
444 creator_user_id=context.user_id,
445 note_content=request.content,
446 )
447 )
448 session.flush()
449 log_admin_action(session, context, user, "send_mod_note", note=request.content)
451 if not request.do_not_notify:
452 notify(
453 session,
454 user_id=user.id,
455 topic_action=NotificationTopicAction.modnote__create,
456 key="",
457 )
459 return _user_to_details(session, user)
461 def MarkUserNeedsLocationUpdate(
462 self, request: admin_pb2.MarkUserNeedsLocationUpdateReq, context: CouchersContext, session: Session
463 ) -> admin_pb2.UserDetails:
464 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
465 if not user: 465 ↛ 466line 465 didn't jump to line 466 because the condition on line 465 was never true
466 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
467 user.needs_to_update_location = True
468 log_admin_action(session, context, user, "mark_needs_location_update")
469 return _user_to_details(session, user)
471 def DeleteUser(
472 self, request: admin_pb2.DeleteUserReq, context: CouchersContext, session: Session
473 ) -> admin_pb2.UserDetails:
474 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
475 if not user: 475 ↛ 476line 475 didn't jump to line 476 because the condition on line 475 was never true
476 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
477 user.deleted_at = now()
478 log_admin_action(session, context, user, "delete_user", level=AdminActionLevel.high)
479 return _user_to_details(session, user)
481 def RecoverDeletedUser(
482 self, request: admin_pb2.RecoverDeletedUserReq, context: CouchersContext, session: Session
483 ) -> admin_pb2.UserDetails:
484 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
485 if not user: 485 ↛ 486line 485 didn't jump to line 486 because the condition on line 485 was never true
486 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
487 user.deleted_at = None
488 user.undelete_token = None
489 user.undelete_until = None
490 log_admin_action(session, context, user, "recover_user", level=AdminActionLevel.high)
491 return _user_to_details(session, user)
493 def CreateApiKey(
494 self, request: admin_pb2.CreateApiKeyReq, context: CouchersContext, session: Session
495 ) -> admin_pb2.CreateApiKeyRes:
496 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
497 if not user: 497 ↛ 498line 497 didn't jump to line 498 because the condition on line 497 was never true
498 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
499 token, expiry = create_session(
500 context, session, user, long_lived=True, is_api_key=True, duration=timedelta(days=365), set_cookie=False
501 )
502 log_admin_action(session, context, user, "create_api_key")
504 notify(
505 session,
506 user_id=user.id,
507 topic_action=NotificationTopicAction.api_key__create,
508 key="",
509 data=notification_data_pb2.ApiKeyCreate(
510 api_key=token,
511 expiry=Timestamp_from_datetime(expiry),
512 ),
513 )
515 return admin_pb2.CreateApiKeyRes()
517 def GetChats(
518 self, request: admin_pb2.GetChatsReq, context: CouchersContext, session: Session
519 ) -> admin_pb2.GetChatsRes:
520 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
521 if not user: 521 ↛ 522line 521 didn't jump to line 522 because the condition on line 521 was never true
522 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
524 # Cache for ChatUserInfo to avoid recomputing for the same user
525 user_info_cache = {}
527 def get_chat_user_info(user_id: int) -> admin_pb2.ChatUserInfo:
528 if user_id not in user_info_cache: 528 ↛ 537line 528 didn't jump to line 537 because the condition on line 528 was always true
529 u = session.execute(select(User).where(User.id == user_id)).scalar_one()
530 user_info_cache[user_id] = admin_pb2.ChatUserInfo(
531 user_id=u.id,
532 username=u.username,
533 name=u.name,
534 birthdate=date_to_api(u.birthdate),
535 gender=u.gender,
536 )
537 return user_info_cache[user_id]
539 def message_to_pb(message: Message) -> admin_pb2.ChatMessage:
540 return admin_pb2.ChatMessage(
541 message_id=message.id,
542 author=get_chat_user_info(message.author_id),
543 time=Timestamp_from_datetime(message.time),
544 message_type=message.message_type.name if message.message_type else "",
545 text=message.text or "",
546 host_request_status_target=(
547 message.host_request_status_target.name if message.host_request_status_target else ""
548 ),
549 target=get_chat_user_info(message.target_id) if message.target_id else None,
550 )
552 def get_messages_for_conversation(conversation_id: int) -> list[admin_pb2.ChatMessage]:
553 messages = (
554 session.execute(
555 select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
556 )
557 .scalars()
558 .all()
559 )
560 return [message_to_pb(msg) for msg in messages]
562 def get_host_request_pb(host_request: HostRequest) -> admin_pb2.AdminHostRequest:
563 return admin_pb2.AdminHostRequest(
564 host_request_id=host_request.conversation_id,
565 surfer=get_chat_user_info(host_request.surfer_user_id),
566 host=get_chat_user_info(host_request.host_user_id),
567 status=host_request.status.name if host_request.status else "",
568 from_date=date_to_api(host_request.from_date),
569 to_date=date_to_api(host_request.to_date),
570 created=Timestamp_from_datetime(host_request.conversation.created),
571 messages=get_messages_for_conversation(host_request.conversation_id),
572 )
574 def get_group_chat_pb(group_chat: GroupChat) -> admin_pb2.AdminGroupChat:
575 subs = (
576 session.execute(
577 select(GroupChatSubscription)
578 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
579 .order_by(GroupChatSubscription.joined.asc())
580 )
581 .scalars()
582 .all()
583 )
584 members = [
585 admin_pb2.GroupChatMember(
586 user=get_chat_user_info(sub.user_id),
587 joined=Timestamp_from_datetime(sub.joined),
588 left=Timestamp_from_datetime(sub.left) if sub.left else None,
589 role=sub.role.name if sub.role else "",
590 )
591 for sub in subs
592 ]
593 return admin_pb2.AdminGroupChat(
594 group_chat_id=group_chat.conversation_id,
595 title=group_chat.title or "",
596 is_dm=group_chat.is_dm,
597 creator=get_chat_user_info(group_chat.creator_id),
598 members=members,
599 messages=get_messages_for_conversation(group_chat.conversation_id),
600 )
602 # Get all host requests for the user
603 host_requests = (
604 session.execute(
605 select(HostRequest)
606 .where(or_(HostRequest.host_user_id == user.id, HostRequest.surfer_user_id == user.id))
607 .order_by(HostRequest.conversation_id.desc())
608 )
609 .scalars()
610 .all()
611 )
613 # Get all group chats for the user
614 group_chat_ids = (
615 session.execute(
616 select(GroupChatSubscription.group_chat_id)
617 .where(GroupChatSubscription.user_id == user.id)
618 .order_by(GroupChatSubscription.joined.desc())
619 )
620 .scalars()
621 .all()
622 )
623 group_chats = (
624 session.execute(select(GroupChat).where(GroupChat.conversation_id.in_(group_chat_ids))).scalars().all()
625 )
627 return admin_pb2.GetChatsRes(
628 user=get_chat_user_info(user.id),
629 host_requests=[get_host_request_pb(hr) for hr in host_requests],
630 group_chats=[get_group_chat_pb(gc) for gc in group_chats],
631 )
633 def DeleteEvent(
634 self, request: admin_pb2.DeleteEventReq, context: CouchersContext, session: Session
635 ) -> empty_pb2.Empty:
636 res = session.execute(
637 select(Event, EventOccurrence)
638 .where(EventOccurrence.id == request.event_id)
639 .where(EventOccurrence.event_id == Event.id)
640 .where(~EventOccurrence.is_deleted)
641 ).one_or_none()
643 if not res: 643 ↛ 644line 643 didn't jump to line 644 because the condition on line 643 was never true
644 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
646 event, occurrence = res
648 occurrence.is_deleted = True
650 queue_job(
651 session,
652 job=generate_event_delete_notifications,
653 payload=jobs_pb2.GenerateEventDeleteNotificationsPayload(
654 occurrence_id=occurrence.id,
655 ),
656 )
658 return empty_pb2.Empty()
660 def ListUserIds(
661 self, request: admin_pb2.ListUserIdsReq, context: CouchersContext, session: Session
662 ) -> admin_pb2.ListUserIdsRes:
663 start_date = to_aware_datetime(request.start_time)
664 end_date = to_aware_datetime(request.end_time)
666 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
667 next_user_id = int(request.page_token) if request.page_token else 0
669 user_ids = (
670 session.execute(
671 select(User.id)
672 .where(or_(User.id <= next_user_id, to_bool(next_user_id == 0)))
673 .where(User.joined >= start_date)
674 .where(User.joined <= end_date)
675 .order_by(User.id.desc())
676 .limit(page_size + 1)
677 )
678 .scalars()
679 .all()
680 )
682 return admin_pb2.ListUserIdsRes(
683 user_ids=user_ids[:page_size],
684 next_page_token=str(user_ids[-1]) if len(user_ids) > page_size else None,
685 )
687 def EditReferenceText(
688 self, request: admin_pb2.EditReferenceTextReq, context: CouchersContext, session: Session
689 ) -> empty_pb2.Empty:
690 reference = session.execute(select(Reference).where(Reference.id == request.reference_id)).scalar_one_or_none()
692 if reference is None: 692 ↛ 693line 692 didn't jump to line 693 because the condition on line 692 was never true
693 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "reference_not_found")
695 if not request.new_text.strip(): 695 ↛ 696line 695 didn't jump to line 696 because the condition on line 695 was never true
696 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "reference_no_text")
698 reference.text = request.new_text.strip()
699 # Log action against the reference author
700 author = session.execute(select(User).where(User.id == reference.from_user_id)).scalar_one()
701 log_admin_action(session, context, author, "edit_reference", note=f"Edited reference {reference.id}")
702 return empty_pb2.Empty()
704 def DeleteReference(
705 self, request: admin_pb2.DeleteReferenceReq, context: CouchersContext, session: Session
706 ) -> empty_pb2.Empty:
707 reference = session.execute(select(Reference).where(Reference.id == request.reference_id)).scalar_one_or_none()
709 if reference is None: 709 ↛ 710line 709 didn't jump to line 710 because the condition on line 709 was never true
710 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "reference_not_found")
712 reference.is_deleted = True
713 # Log action against the reference author
714 author = session.execute(select(User).where(User.id == reference.from_user_id)).scalar_one()
715 log_admin_action(session, context, author, "delete_reference", note=f"Deleted reference {reference.id}")
716 return empty_pb2.Empty()
718 def GetUserReferences(
719 self, request: admin_pb2.GetUserReferencesReq, context: CouchersContext, session: Session
720 ) -> admin_pb2.GetUserReferencesRes:
721 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
722 if not user:
723 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
725 references_from = (
726 session.execute(select(Reference).where(Reference.from_user_id == user.id).order_by(Reference.id.desc()))
727 .scalars()
728 .all()
729 )
731 references_to = (
732 session.execute(select(Reference).where(Reference.to_user_id == user.id).order_by(Reference.id.desc()))
733 .scalars()
734 .all()
735 )
737 return admin_pb2.GetUserReferencesRes(
738 references_from=[_reference_to_pb(ref) for ref in references_from],
739 references_to=[_reference_to_pb(ref) for ref in references_to],
740 )
742 def EditDiscussion(
743 self, request: admin_pb2.EditDiscussionReq, context: CouchersContext, session: Session
744 ) -> empty_pb2.Empty:
745 discussion = session.execute(
746 select(Discussion).where(Discussion.id == request.discussion_id)
747 ).scalar_one_or_none()
748 if not discussion:
749 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "discussion_not_found")
750 if request.new_title:
751 discussion.title = request.new_title.strip()
752 if request.new_content:
753 discussion.content = request.new_content.strip()
754 return empty_pb2.Empty()
756 def EditReply(self, request: admin_pb2.EditReplyReq, context: CouchersContext, session: Session) -> empty_pb2.Empty:
757 database_id, depth = unpack_thread_id(request.reply_id)
758 if depth == 1:
759 obj: Comment | Reply | None = session.execute(
760 select(Comment).where(Comment.id == database_id)
761 ).scalar_one_or_none()
762 elif depth == 2:
763 obj = session.execute(select(Reply).where(Reply.id == database_id)).scalar_one_or_none()
764 else:
765 obj = None
767 if not obj:
768 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "object_not_found")
769 obj.content = request.new_content.strip()
770 return empty_pb2.Empty()
772 def AddUsersToModerationUserList(
773 self, request: admin_pb2.AddUsersToModerationUserListReq, context: CouchersContext, session: Session
774 ) -> admin_pb2.AddUsersToModerationUserListRes:
775 """Add multiple users to a moderation user list. If no moderation list is provided, a new one is created.
776 Id of the moderation list is returned."""
777 req_users = request.users
778 users = []
780 for req_user in req_users:
781 user = session.execute(select(User).where(username_or_email_or_id(req_user))).scalar_one_or_none()
782 if not user:
783 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
784 users.append(user)
786 if request.moderation_list_id:
787 moderation_user_list = session.get(ModerationUserList, request.moderation_list_id)
788 if not moderation_user_list:
789 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_user_list_not_found")
790 # Create a new moderation user list if no one is provided
791 else:
792 moderation_user_list = ModerationUserList()
793 session.add(moderation_user_list)
794 session.flush()
796 # Add users to the moderation list only if not already in it
797 for user in users:
798 if user not in moderation_user_list.users: 798 ↛ 800line 798 didn't jump to line 800 because the condition on line 798 was always true
799 moderation_user_list.users.append(user)
800 log_admin_action(session, context, user, "add_to_moderation_list")
802 return admin_pb2.AddUsersToModerationUserListRes(moderation_list_id=moderation_user_list.id)
804 def ListModerationUserLists(
805 self, request: admin_pb2.ListModerationUserListsReq, context: CouchersContext, session: Session
806 ) -> admin_pb2.ListModerationUserListsRes:
807 """Lists all moderation user lists for a user."""
808 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
809 if not user: 809 ↛ 810line 809 didn't jump to line 810 because the condition on line 809 was never true
810 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
812 moderation_lists = [
813 admin_pb2.ModerationList(moderation_list_id=ml.id, member_ids=[u.id for u in ml.users])
814 for ml in user.moderation_user_lists
815 ]
816 return admin_pb2.ListModerationUserListsRes(moderation_lists=moderation_lists)
818 def RemoveUserFromModerationUserList(
819 self, request: admin_pb2.RemoveUserFromModerationUserListReq, context: CouchersContext, session: Session
820 ) -> empty_pb2.Empty:
821 """Removes a user from a provided moderation user list."""
822 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
823 if not user:
824 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
825 if not request.moderation_list_id:
826 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_moderation_user_list_id")
828 moderation_user_list = session.get(ModerationUserList, request.moderation_list_id)
829 if not moderation_user_list: 829 ↛ 830line 829 didn't jump to line 830 because the condition on line 829 was never true
830 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_user_list_not_found")
831 if user not in moderation_user_list.users:
832 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_the_moderation_user_list")
834 moderation_user_list.users.remove(user)
835 log_admin_action(session, context, user, "remove_from_moderation_list")
837 if len(moderation_user_list.users) == 0:
838 session.delete(moderation_user_list)
840 return empty_pb2.Empty()
842 def CreateAccountDeletionLink(
843 self, request: admin_pb2.CreateAccountDeletionLinkReq, context: CouchersContext, session: Session
844 ) -> admin_pb2.CreateAccountDeletionLinkRes:
845 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
846 if not user: 846 ↛ 847line 846 didn't jump to line 847 because the condition on line 846 was never true
847 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
848 token = AccountDeletionToken(token=urlsafe_secure_token(), user_id=user.id, expiry=now() + timedelta(hours=2))
849 session.add(token)
850 log_admin_action(session, context, user, "create_account_deletion_link", level=AdminActionLevel.high)
851 return admin_pb2.CreateAccountDeletionLinkRes(
852 account_deletion_confirm_url=urls.delete_account_link(account_deletion_token=token.token)
853 )
855 def AccessStats(
856 self, request: admin_pb2.AccessStatsReq, context: CouchersContext, session: Session
857 ) -> admin_pb2.AccessStatsRes:
858 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
859 if not user:
860 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
862 start_time = to_aware_datetime(request.start_time) if request.start_time else now() - timedelta(days=90)
863 end_time = to_aware_datetime(request.end_time) if request.end_time else now()
865 user_activity = session.execute(
866 select(
867 UserActivity.ip_address,
868 UserActivity.user_agent,
869 func.sum(UserActivity.api_calls),
870 func.count(UserActivity.period),
871 func.min(UserActivity.period),
872 func.max(UserActivity.period),
873 )
874 .where(UserActivity.user_id == user.id)
875 .where(UserActivity.period >= start_time)
876 .where(UserActivity.period >= end_time)
877 .order_by(func.max(UserActivity.period).desc())
878 .group_by(UserActivity.ip_address, UserActivity.user_agent)
879 ).all()
881 out = admin_pb2.AccessStatsRes()
883 for ip_address, user_agent, api_call_count, periods_count, first_seen, last_seen in user_activity:
884 user_agent_data = user_agents_parse(user_agent or "")
885 asn = geoip_asn(ip_address)
886 out.stats.append(
887 admin_pb2.AccessStat(
888 ip_address=ip_address,
889 asn=str(asn[0]) if asn else None,
890 asorg=str(asn[1]) if asn else None,
891 asnetwork=str(asn[2]) if asn else None,
892 user_agent=user_agent,
893 operating_system=user_agent_data.os.family,
894 browser=user_agent_data.browser.family,
895 device=user_agent_data.device.family,
896 approximate_location=geoip_approximate_location(ip_address) or "Unknown",
897 api_call_count=api_call_count,
898 periods_count=periods_count,
899 first_seen=Timestamp_from_datetime(first_seen),
900 last_seen=Timestamp_from_datetime(last_seen),
901 )
902 )
904 return out
906 def SetLastDonated(
907 self, request: admin_pb2.SetLastDonatedReq, context: CouchersContext, session: Session
908 ) -> admin_pb2.UserDetails:
909 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
910 if not user:
911 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
913 if request.HasField("last_donated"):
914 user.last_donated = to_aware_datetime(request.last_donated)
915 else:
916 user.last_donated = None
918 log_admin_action(session, context, user, "set_last_donated")
919 return _user_to_details(session, user)
921 def CreateAdminTag(
922 self, request: admin_pb2.CreateAdminTagReq, context: CouchersContext, session: Session
923 ) -> admin_pb2.AdminTagInfo:
924 if not request.tag.strip():
925 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin_tag_cant_be_empty")
926 existing = session.execute(select(AdminTag).where(AdminTag.tag == request.tag.strip())).scalar_one_or_none()
927 if existing:
928 context.abort_with_error_code(grpc.StatusCode.ALREADY_EXISTS, "admin_tag_already_exists")
929 admin_tag = AdminTag(tag=request.tag.strip())
930 session.add(admin_tag)
931 session.flush()
932 return admin_pb2.AdminTagInfo(admin_tag_id=admin_tag.id, tag=admin_tag.tag)
934 def ListAdminTags(
935 self, request: admin_pb2.ListAdminTagsReq, context: CouchersContext, session: Session
936 ) -> admin_pb2.ListAdminTagsRes:
937 tags = session.execute(select(AdminTag).order_by(AdminTag.tag)).scalars().all()
938 return admin_pb2.ListAdminTagsRes(
939 tags=[admin_pb2.AdminTagInfo(admin_tag_id=tag.id, tag=tag.tag) for tag in tags]
940 )
942 def AddAdminTagToUser(
943 self, request: admin_pb2.AddAdminTagToUserReq, context: CouchersContext, session: Session
944 ) -> admin_pb2.UserDetails:
945 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
946 if not user: 946 ↛ 947line 946 didn't jump to line 947 because the condition on line 946 was never true
947 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
948 admin_tag = session.execute(select(AdminTag).where(AdminTag.tag == request.tag)).scalar_one_or_none()
949 if not admin_tag:
950 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin_tag_not_found")
951 existing = session.execute(
952 select(UserAdminTag).where(UserAdminTag.user_id == user.id, UserAdminTag.admin_tag_id == admin_tag.id)
953 ).scalar_one_or_none()
954 if existing:
955 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_has_admin_tag")
956 session.add(UserAdminTag(user_id=user.id, admin_tag_id=admin_tag.id))
957 session.flush()
958 log_admin_action(session, context, user, "add_tag", tag=request.tag)
959 return _user_to_details(session, user)
961 def RemoveAdminTagFromUser(
962 self, request: admin_pb2.RemoveAdminTagFromUserReq, context: CouchersContext, session: Session
963 ) -> admin_pb2.UserDetails:
964 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
965 if not user: 965 ↛ 966line 965 didn't jump to line 966 because the condition on line 965 was never true
966 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
967 admin_tag = session.execute(select(AdminTag).where(AdminTag.tag == request.tag)).scalar_one_or_none()
968 if not admin_tag: 968 ↛ 969line 968 didn't jump to line 969 because the condition on line 968 was never true
969 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin_tag_not_found")
970 user_admin_tag = session.execute(
971 select(UserAdminTag).where(UserAdminTag.user_id == user.id, UserAdminTag.admin_tag_id == admin_tag.id)
972 ).scalar_one_or_none()
973 if not user_admin_tag:
974 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_does_not_have_admin_tag")
975 session.delete(user_admin_tag)
976 session.flush()
977 log_admin_action(session, context, user, "remove_tag", tag=request.tag)
978 return _user_to_details(session, user)