Coverage for src / couchers / servicers / admin.py: 63%

362 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-29 02:10 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy import select 

7from sqlalchemy.orm import Session 

8from sqlalchemy.sql import and_, func, or_ 

9from user_agents import parse as user_agents_parse 

10 

11from couchers import urls 

12from couchers.context import CouchersContext 

13from couchers.crypto import urlsafe_secure_token 

14from couchers.helpers.badges import user_add_badge, user_remove_badge 

15from couchers.helpers.geoip import geoip_approximate_location, geoip_asn 

16from couchers.helpers.strong_verification import get_strong_verification_fields 

17from couchers.jobs.enqueue import queue_job 

18from couchers.models import ( 

19 AccountDeletionToken, 

20 Comment, 

21 ContentReport, 

22 Discussion, 

23 Event, 

24 EventOccurrence, 

25 GroupChat, 

26 GroupChatSubscription, 

27 HostRequest, 

28 LanguageAbility, 

29 Message, 

30 ModerationUserList, 

31 ModNote, 

32 Reference, 

33 Reply, 

34 User, 

35 UserActivity, 

36 UserBadge, 

37) 

38from couchers.models.notifications import NotificationTopicAction 

39from couchers.models.uploads import has_avatar_photo_expression 

40from couchers.notifications.notify import notify 

41from couchers.proto import admin_pb2, admin_pb2_grpc, api_pb2, notification_data_pb2 

42from couchers.proto.internal import jobs_pb2 

43from couchers.resources import get_badge_dict 

44from couchers.servicers.api import user_model_to_pb 

45from couchers.servicers.auth import create_session 

46from couchers.servicers.events import generate_event_delete_notifications 

47from couchers.servicers.threads import unpack_thread_id 

48from couchers.sql import to_bool, username_or_email_or_id 

49from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date, to_aware_datetime 

50 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Largest page size the paginated endpoints in this module will serve; also
# used as the page size when the request leaves page_size unset (0).
MAX_PAGINATION_LENGTH = 250

54 

55 

def _user_to_details(session: Session, user: User) -> admin_pb2.UserDetails:
    """Build the admin-facing ``UserDetails`` message for ``user``.

    Besides plain profile columns, the message carries the user's badge ids,
    the fields produced by ``get_strong_verification_fields`` and two mod note
    counters: notes that are still pending and notes that are not.
    """
    badge_ids = [user_badge.badge_id for user_badge in user.badges]
    verification_fields = get_strong_verification_fields(session, user)
    pending_notes = user.mod_notes.where(ModNote.is_pending).count()
    non_pending_notes = user.mod_notes.where(~ModNote.is_pending).count()

    return admin_pb2.UserDetails(
        user_id=user.id,
        username=user.username,
        name=user.name,
        email=user.email,
        gender=user.gender,
        birthdate=date_to_api(user.birthdate),
        banned=user.is_banned,
        deleted=user.is_deleted,
        do_not_email=user.do_not_email,
        badges=badge_ids,
        has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
        admin_note=user.admin_note,
        pending_mod_notes_count=pending_notes,
        acknowledged_mod_notes_count=non_pending_notes,
        **verification_fields,
    )

74 

75 

def _content_report_to_pb(content_report: ContentReport) -> admin_pb2.ContentReport:
    """Convert a ``ContentReport`` row into its admin protobuf message."""
    report = content_report
    reported_at = Timestamp_from_datetime(report.time)
    return admin_pb2.ContentReport(
        content_report_id=report.id,
        time=reported_at,
        reporting_user_id=report.reporting_user_id,
        author_user_id=report.author_user_id,
        reason=report.reason,
        description=report.description,
        content_ref=report.content_ref,
        user_agent=report.user_agent,
        page=report.page,
    )

88 

89 

def _reference_to_pb(reference: Reference) -> admin_pb2.AdminReference:
    """Convert a ``Reference`` row into its admin protobuf message.

    A falsy ``private_text`` is sent as ``""`` and a falsy ``host_request_id``
    as ``0``.
    """
    private_text = reference.private_text if reference.private_text else ""
    host_request_id = reference.host_request_id if reference.host_request_id else 0
    written_at = Timestamp_from_datetime(reference.time)
    return admin_pb2.AdminReference(
        reference_id=reference.id,
        from_user_id=reference.from_user_id,
        to_user_id=reference.to_user_id,
        reference_type=reference.reference_type.name,
        text=reference.text,
        private_text=private_text,
        time=written_at,
        host_request_id=host_request_id,
        rating=reference.rating,
        was_appropriate=reference.was_appropriate,
        is_deleted=reference.is_deleted,
    )

104 

105 

def append_admin_note(session: Session, context: CouchersContext, user: User, note: str) -> None:
    """Append a timestamped, attributed entry to ``user.admin_note``.

    Aborts with INVALID_ARGUMENT when ``note`` is empty or whitespace-only.
    The entry records the id and username of the acting admin, who is looked
    up from ``context.user_id``.
    """
    note_is_blank = not note.strip()
    if note_is_blank:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin_note_cant_be_empty")

    acting_admin = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
    entry = f"\n[{now().isoformat()}] (id: {acting_admin.id}, username: {acting_admin.username}) {note}\n"
    user.admin_note += entry

111 

112 

113class Admin(admin_pb2_grpc.AdminServicer): 

def GetUserDetails(
    self, request: admin_pb2.GetUserDetailsReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Return admin details for the user identified by ``request.user``.

    Aborts with NOT_FOUND if no user matches.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    found = session.execute(lookup).scalar_one_or_none()
    if not found:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
    return _user_to_details(session, found)

121 

def GetUser(self, request: admin_pb2.GetUserReq, context: CouchersContext, session: Session) -> api_pb2.User:
    """Return the regular API ``User`` message for ``request.user``.

    The message is built by ``user_model_to_pb`` with ``is_admin_see_ghosts=True``.
    Aborts with NOT_FOUND if no user matches.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    found = session.execute(lookup).scalar_one_or_none()
    if not found:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
    return user_model_to_pb(found, session, context, is_admin_see_ghosts=True)

127 

def SearchUsers(
    self, request: admin_pb2.SearchUsersReq, context: CouchersContext, session: Session
) -> admin_pb2.SearchUsersRes:
    """Search users with the optional filters set on the request.

    All filters are AND-ed together; unset (falsy) filters are ignored.
    Text filters are matched with ``ILIKE``, so the caller supplies any
    wildcards. Results are ordered by ascending user id and paginated:
    ``page_token`` is the id of the first user of the page, and the returned
    ``next_page_token`` is the id of the first user of the following page
    (unset when there are no more results).
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    next_user_id = int(request.page_token) if request.page_token else 0

    statement = select(User)

    # Case-insensitive pattern filters.
    text_filters = (
        (User.username, request.username),
        (User.email, request.email),
        (User.name, request.name),
        (User.admin_note, request.admin_note),
        (User.city, request.city),
    )
    for column, pattern in text_filters:
        if pattern:
            statement = statement.where(column.ilike(pattern))

    # Inclusive id range.
    if request.min_user_id:
        statement = statement.where(User.id >= request.min_user_id)
    if request.max_user_id:
        statement = statement.where(User.id <= request.max_user_id)

    # Inclusive date ranges; bounds arrive as strings and go through parse_date.
    date_ranges = (
        (User.birthdate, request.min_birthdate, request.max_birthdate),
        (User.joined, request.min_joined_date, request.max_joined_date),
        (User.last_active, request.min_last_active_date, request.max_last_active_date),
    )
    for column, lower, upper in date_ranges:
        if lower:
            statement = statement.where(column >= parse_date(lower))
        if upper:
            statement = statement.where(column <= parse_date(upper))

    if request.genders:
        statement = statement.where(User.gender.in_(request.genders))

    if request.language_codes:
        # Filter through a subquery rather than a JOIN: with a JOIN, a user
        # who has several of the requested languages is returned once per
        # matching language, duplicating results and breaking pagination.
        users_with_language = select(LanguageAbility.user_id).where(
            LanguageAbility.language_code.in_(request.language_codes)
        )
        statement = statement.where(User.id.in_(users_with_language))

    # Wrapped booleans: only filter when the field was explicitly set.
    if request.HasField("is_deleted"):
        statement = statement.where(User.is_deleted == request.is_deleted.value)
    if request.HasField("is_banned"):
        statement = statement.where(User.is_banned == request.is_banned.value)
    if request.HasField("has_avatar"):
        statement = statement.where(has_avatar_photo_expression(User) == request.has_avatar.value)

    # Fetch one row beyond the page to learn whether another page exists.
    users = (
        session.execute(statement.where(User.id >= next_user_id).order_by(User.id).limit(page_size + 1))
        .scalars()
        .all()
    )
    logger.info(users)

    has_more = len(users) > page_size
    return admin_pb2.SearchUsersRes(
        users=[_user_to_details(session, user) for user in users[:page_size]],
        next_page_token=str(users[-1].id) if has_more else None,
    )

185 

def ChangeUserGender(
    self, request: admin_pb2.ChangeUserGenderReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Set the user's gender, commit, and notify the user of the change.

    Aborts with NOT_FOUND if no user matches ``request.user``.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    target.gender = request.gender
    session.commit()

    payload = notification_data_pb2.GenderChange(gender=request.gender)
    notify(
        session,
        user_id=target.id,
        topic_action=NotificationTopicAction.gender__change,
        key="",
        data=payload,
    )

    return _user_to_details(session, target)

206 

def ChangeUserBirthdate(
    self, request: admin_pb2.ChangeUserBirthdateReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Set the user's birthdate, commit, and notify the user of the change.

    Aborts with NOT_FOUND if no user matches ``request.user`` and with
    INVALID_ARGUMENT if ``parse_date`` yields a falsy result for
    ``request.birthdate``.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    new_birthdate = parse_date(request.birthdate)
    if not new_birthdate:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_birthdate")

    target.birthdate = new_birthdate
    session.commit()

    # The notification carries the birthdate exactly as it was sent in the request.
    payload = notification_data_pb2.BirthdateChange(birthdate=request.birthdate)
    notify(
        session,
        user_id=target.id,
        topic_action=NotificationTopicAction.birthdate__change,
        key="",
        data=payload,
    )

    return _user_to_details(session, target)

230 

def AddBadge(
    self, request: admin_pb2.AddBadgeReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Give the user the badge ``request.badge_id``.

    Aborts with NOT_FOUND if the user or the badge is unknown, and with
    FAILED_PRECONDITION if the badge is not admin-editable or the user
    already has it.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    badge = get_badge_dict().get(request.badge_id)
    if not badge:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "badge_not_found")

    if not badge.admin_editable:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_cannot_edit_badge")

    already_held = any(held.badge_id == badge.id for held in target.badges)
    if already_held:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_has_badge")

    user_add_badge(session, target.id, request.badge_id)

    return _user_to_details(session, target)

251 

def RemoveBadge(
    self, request: admin_pb2.RemoveBadgeReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Take the badge ``request.badge_id`` away from the user.

    Aborts with NOT_FOUND if the user or the badge is unknown, and with
    FAILED_PRECONDITION if the badge is not admin-editable or the user does
    not have it.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    badge = get_badge_dict().get(request.badge_id)
    if not badge:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "badge_not_found")

    if not badge.admin_editable:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_cannot_edit_badge")

    held_badge_query = select(UserBadge).where(UserBadge.user_id == target.id, UserBadge.badge_id == badge.id)
    held_badge = session.execute(held_badge_query).scalar_one_or_none()
    if not held_badge:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_does_not_have_badge")

    user_remove_badge(session, target.id, request.badge_id)

    return _user_to_details(session, target)

275 

def SetPassportSexGenderException(
    self, request: admin_pb2.SetPassportSexGenderExceptionReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Set the user's ``has_passport_sex_gender_exception`` flag from the request.

    Aborts with NOT_FOUND if no user matches ``request.user``.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    target.has_passport_sex_gender_exception = request.passport_sex_gender_exception
    return _user_to_details(session, target)

284 

def BanUser(
    self, request: admin_pb2.BanUserReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Ban the user, recording ``request.admin_note`` on their admin note.

    Aborts with NOT_FOUND if no user matches ``request.user``. The note is
    appended first, so an empty note aborts before the ban flag is set.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    append_admin_note(session, context, target, request.admin_note)
    target.is_banned = True
    return _user_to_details(session, target)

294 

def UnbanUser(
    self, request: admin_pb2.UnbanUserReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Lift the user's ban, recording ``request.admin_note`` on their admin note.

    Aborts with NOT_FOUND if no user matches ``request.user``. The note is
    appended first, so an empty note aborts before the ban flag is cleared.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    append_admin_note(session, context, target, request.admin_note)
    target.is_banned = False
    return _user_to_details(session, target)

304 

def AddAdminNote(
    self, request: admin_pb2.AddAdminNoteReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Append ``request.admin_note`` to the user's admin note.

    Aborts with NOT_FOUND if no user matches ``request.user``.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    append_admin_note(session, context, target, request.admin_note)
    return _user_to_details(session, target)

313 

def GetContentReport(
    self, request: admin_pb2.GetContentReportReq, context: CouchersContext, session: Session
) -> admin_pb2.GetContentReportRes:
    """Return the content report with id ``request.content_report_id``.

    Aborts with NOT_FOUND if there is no such report.
    """
    lookup = select(ContentReport).where(ContentReport.id == request.content_report_id)
    report = session.execute(lookup).scalar_one_or_none()
    if not report:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "content_report_not_found")

    report_pb = _content_report_to_pb(report)
    return admin_pb2.GetContentReportRes(content_report=report_pb)

325 

def GetContentReportsForAuthor(
    self, request: admin_pb2.GetContentReportsForAuthorReq, context: CouchersContext, session: Session
) -> admin_pb2.GetContentReportsForAuthorRes:
    """Return all content reports whose author is the given user.

    Reports are ordered by descending report id. Aborts with NOT_FOUND if no
    user matches ``request.user``.
    """
    author_lookup = select(User).where(username_or_email_or_id(request.user))
    author = session.execute(author_lookup).scalar_one_or_none()
    if not author:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    reports_query = (
        select(ContentReport).where(ContentReport.author_user_id == author.id).order_by(ContentReport.id.desc())
    )
    reports = session.execute(reports_query).scalars().all()

    return admin_pb2.GetContentReportsForAuthorRes(
        content_reports=[_content_report_to_pb(report) for report in reports],
    )

342 

def SendModNote(
    self, request: admin_pb2.SendModNoteReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Create a mod note for the user and, unless suppressed, notify them.

    The note is created by ``context.user_id``. The notification is skipped
    when ``request.do_not_notify`` is set. Aborts with NOT_FOUND if no user
    matches ``request.user``.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    recipient = session.execute(lookup).scalar_one_or_none()
    if not recipient:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    mod_note = ModNote(
        user_id=recipient.id,
        internal_id=request.internal_id,
        creator_user_id=context.user_id,
        note_content=request.content,
    )
    session.add(mod_note)
    session.flush()

    should_notify = not request.do_not_notify
    if should_notify:
        notify(
            session,
            user_id=recipient.id,
            topic_action=NotificationTopicAction.modnote__create,
            key="",
        )

    return _user_to_details(session, recipient)

368 

def MarkUserNeedsLocationUpdate(
    self, request: admin_pb2.MarkUserNeedsLocationUpdateReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Set the user's ``needs_to_update_location`` flag.

    Aborts with NOT_FOUND if no user matches ``request.user``.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    target.needs_to_update_location = True
    return _user_to_details(session, target)

377 

def DeleteUser(
    self, request: admin_pb2.DeleteUserReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Mark the user as deleted by setting ``is_deleted``.

    Aborts with NOT_FOUND if no user matches ``request.user``.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    target.is_deleted = True
    return _user_to_details(session, target)

386 

def RecoverDeletedUser(
    self, request: admin_pb2.RecoverDeletedUserReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Undo a deletion by clearing the user's ``is_deleted`` flag.

    Aborts with NOT_FOUND if no user matches ``request.user``.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    target.is_deleted = False
    return _user_to_details(session, target)

395 

def CreateApiKey(
    self, request: admin_pb2.CreateApiKeyReq, context: CouchersContext, session: Session
) -> admin_pb2.CreateApiKeyRes:
    """Create a 365-day API key session for the user and send it to them.

    The token is delivered to the user through a notification; the response
    itself is empty. Aborts with NOT_FOUND if no user matches ``request.user``.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    key_owner = session.execute(lookup).scalar_one_or_none()
    if not key_owner:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    api_key, expires_at = create_session(
        context,
        session,
        key_owner,
        long_lived=True,
        is_api_key=True,
        duration=timedelta(days=365),
        set_cookie=False,
    )

    payload = notification_data_pb2.ApiKeyCreate(
        api_key=api_key,
        expiry=Timestamp_from_datetime(expires_at),
    )
    notify(
        session,
        user_id=key_owner.id,
        topic_action=NotificationTopicAction.api_key__create,
        key="",
        data=payload,
    )

    return admin_pb2.CreateApiKeyRes()

418 

def GetChats(
    self, request: admin_pb2.GetChatsReq, context: CouchersContext, session: Session
) -> admin_pb2.GetChatsRes:
    """Return every host request and group chat involving a user, with messages.

    Host requests (where the user is host or surfer) are ordered by
    descending conversation id. Group chats are ordered by the user's
    subscription join time, most recent first. Each conversation includes
    its messages in ascending message id order.

    Aborts with NOT_FOUND if no user matches ``request.user``.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # Memoise UserDetails per user id: the same users recur across messages
    # and member lists, and building the details runs extra queries each time.
    user_details_cache: dict[int, admin_pb2.UserDetails] = {}

    def get_user_details(user_id: int) -> admin_pb2.UserDetails:
        if user_id not in user_details_cache:
            u = session.execute(select(User).where(User.id == user_id)).scalar_one()
            user_details_cache[user_id] = _user_to_details(session, u)
        return user_details_cache[user_id]

    def message_to_pb(message: Message) -> admin_pb2.ChatMessage:
        return admin_pb2.ChatMessage(
            message_id=message.id,
            author=get_user_details(message.author_id),
            time=Timestamp_from_datetime(message.time),
            message_type=message.message_type.name if message.message_type else "",
            text=message.text or "",
            host_request_status_target=(
                message.host_request_status_target.name if message.host_request_status_target else ""
            ),
            target=get_user_details(message.target_id) if message.target_id else None,
        )

    def get_messages_for_conversation(conversation_id: int) -> list[admin_pb2.ChatMessage]:
        messages = (
            session.execute(
                select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
            )
            .scalars()
            .all()
        )
        return [message_to_pb(msg) for msg in messages]

    def get_host_request_pb(host_request: HostRequest) -> admin_pb2.AdminHostRequest:
        return admin_pb2.AdminHostRequest(
            host_request_id=host_request.conversation_id,
            surfer=get_user_details(host_request.surfer_user_id),
            host=get_user_details(host_request.host_user_id),
            status=host_request.status.name if host_request.status else "",
            from_date=date_to_api(host_request.from_date),
            to_date=date_to_api(host_request.to_date),
            created=Timestamp_from_datetime(host_request.conversation.created),
            messages=get_messages_for_conversation(host_request.conversation_id),
        )

    def get_group_chat_pb(group_chat: GroupChat) -> admin_pb2.AdminGroupChat:
        subs = (
            session.execute(
                select(GroupChatSubscription)
                .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
                .order_by(GroupChatSubscription.joined.asc())
            )
            .scalars()
            .all()
        )
        members = [
            admin_pb2.GroupChatMember(
                user=get_user_details(sub.user_id),
                joined=Timestamp_from_datetime(sub.joined),
                left=Timestamp_from_datetime(sub.left) if sub.left else None,
                role=sub.role.name if sub.role else "",
            )
            for sub in subs
        ]
        return admin_pb2.AdminGroupChat(
            group_chat_id=group_chat.conversation_id,
            title=group_chat.title or "",
            is_dm=group_chat.is_dm,
            creator=get_user_details(group_chat.creator_id),
            members=members,
            messages=get_messages_for_conversation(group_chat.conversation_id),
        )

    # Get all host requests for the user
    host_requests = (
        session.execute(
            select(HostRequest)
            .where(or_(HostRequest.host_user_id == user.id, HostRequest.surfer_user_id == user.id))
            .order_by(HostRequest.conversation_id.desc())
        )
        .scalars()
        .all()
    )

    # Get all group chats for the user, most recently joined first
    group_chat_ids = (
        session.execute(
            select(GroupChatSubscription.group_chat_id)
            .where(GroupChatSubscription.user_id == user.id)
            .order_by(GroupChatSubscription.joined.desc())
        )
        .scalars()
        .all()
    )
    chats_by_id = {
        chat.conversation_id: chat
        for chat in session.execute(select(GroupChat).where(GroupChat.conversation_id.in_(group_chat_ids)))
        .scalars()
        .all()
    }
    # An IN (...) query returns rows in no particular order, so re-apply the
    # subscription order here. dict.fromkeys drops repeated ids (several
    # subscriptions to one chat) while keeping the first, most recent, position.
    group_chats = [chats_by_id[chat_id] for chat_id in dict.fromkeys(group_chat_ids) if chat_id in chats_by_id]

    return admin_pb2.GetChatsRes(
        user=get_user_details(user.id),
        host_requests=[get_host_request_pb(hr) for hr in host_requests],
        group_chats=[get_group_chat_pb(gc) for gc in group_chats],
    )

528 

def DeleteEvent(
    self, request: admin_pb2.DeleteEventReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Mark an event occurrence as deleted and queue its delete notifications.

    ``request.event_id`` is matched against ``EventOccurrence.id``; only
    occurrences not already deleted are considered. Aborts with NOT_FOUND
    otherwise.
    """
    lookup = select(Event, EventOccurrence).where(
        EventOccurrence.id == request.event_id,
        EventOccurrence.event_id == Event.id,
        ~EventOccurrence.is_deleted,
    )
    row = session.execute(lookup).one_or_none()

    if not row:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    _event, occurrence = row
    occurrence.is_deleted = True

    job_payload = jobs_pb2.GenerateEventDeleteNotificationsPayload(occurrence_id=occurrence.id)
    queue_job(
        session,
        job=generate_event_delete_notifications,
        payload=job_payload,
    )

    return empty_pb2.Empty()

555 

def ListUserIds(
    self, request: admin_pb2.ListUserIdsReq, context: CouchersContext, session: Session
) -> admin_pb2.ListUserIdsRes:
    """List ids of users who joined between ``start_time`` and ``end_time`` (inclusive).

    Ids are returned in descending order. ``page_token`` is the id at which
    the page starts; the returned ``next_page_token`` is the first id of the
    following page and is unset when there are no further results.
    """
    joined_from = to_aware_datetime(request.start_time)
    joined_until = to_aware_datetime(request.end_time)

    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    next_user_id = int(request.page_token) if request.page_token else 0

    # A token of 0 means "first page": in that case the id bound is disabled.
    within_page = or_(User.id <= next_user_id, to_bool(next_user_id == 0))
    id_query = (
        select(User.id)
        .where(within_page)
        .where(User.joined >= joined_from)
        .where(User.joined <= joined_until)
        .order_by(User.id.desc())
        .limit(page_size + 1)  # one extra row reveals whether another page exists
    )
    user_ids = session.execute(id_query).scalars().all()

    has_more = len(user_ids) > page_size
    return admin_pb2.ListUserIdsRes(
        user_ids=user_ids[:page_size],
        next_page_token=str(user_ids[-1]) if has_more else None,
    )

582 

def EditReferenceText(
    self, request: admin_pb2.EditReferenceTextReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Replace a reference's text with the stripped ``request.new_text``.

    Aborts with NOT_FOUND if the reference does not exist and with
    INVALID_ARGUMENT if the new text is empty after stripping.
    """
    lookup = select(Reference).where(Reference.id == request.reference_id)
    reference = session.execute(lookup).scalar_one_or_none()

    if reference is None:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "reference_not_found")

    stripped_text = request.new_text.strip()
    if not stripped_text:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "reference_no_text")

    reference.text = stripped_text
    return empty_pb2.Empty()

596 

def DeleteReference(
    self, request: admin_pb2.DeleteReferenceReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Mark the reference ``request.reference_id`` as deleted.

    Aborts with NOT_FOUND if the reference does not exist.
    """
    lookup = select(Reference).where(Reference.id == request.reference_id)
    reference = session.execute(lookup).scalar_one_or_none()

    if reference is None:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "reference_not_found")

    reference.is_deleted = True
    return empty_pb2.Empty()

607 

def GetUserReferences(
    self, request: admin_pb2.GetUserReferencesReq, context: CouchersContext, session: Session
) -> admin_pb2.GetUserReferencesRes:
    """Return all references written by and written about a user.

    Both lists are ordered by descending reference id. Aborts with NOT_FOUND
    if no user matches ``request.user``.
    """
    user_lookup = select(User).where(username_or_email_or_id(request.user))
    subject = session.execute(user_lookup).scalar_one_or_none()
    if not subject:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    def references_where(user_id_column):
        """Fetch references whose given user-id column equals the subject, newest id first."""
        query = select(Reference).where(user_id_column == subject.id).order_by(Reference.id.desc())
        return session.execute(query).scalars().all()

    written = references_where(Reference.from_user_id)
    received = references_where(Reference.to_user_id)

    return admin_pb2.GetUserReferencesRes(
        references_from=[_reference_to_pb(reference) for reference in written],
        references_to=[_reference_to_pb(reference) for reference in received],
    )

631 

def EditDiscussion(
    self, request: admin_pb2.EditDiscussionReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Overwrite a discussion's title and/or content.

    Each field is updated only when its new value is set on the request; the
    stored value is the stripped text. Aborts with NOT_FOUND if the
    discussion does not exist.
    """
    lookup = select(Discussion).where(Discussion.id == request.discussion_id)
    discussion = session.execute(lookup).scalar_one_or_none()
    if not discussion:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "discussion_not_found")

    replacement_title = request.new_title
    if replacement_title:
        discussion.title = replacement_title.strip()

    replacement_content = request.new_content
    if replacement_content:
        discussion.content = replacement_content.strip()

    return empty_pb2.Empty()

645 

def EditReply(self, request: admin_pb2.EditReplyReq, context: CouchersContext, session: Session) -> empty_pb2.Empty:
    """Overwrite the content of a comment or reply with the stripped ``request.new_content``.

    ``request.reply_id`` is unpacked into a database id and a depth: depth 1
    addresses a ``Comment`` and depth 2 a ``Reply``. Aborts with NOT_FOUND for
    any other depth or when no matching row exists.
    """
    database_id, depth = unpack_thread_id(request.reply_id)

    # Pick the model for this depth; unknown depths have no model.
    model = {1: Comment, 2: Reply}.get(depth)
    obj: Comment | Reply | None = None
    if model is not None:
        obj = session.execute(select(model).where(model.id == database_id)).scalar_one_or_none()

    if not obj:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "object_not_found")

    obj.content = request.new_content.strip()
    return empty_pb2.Empty()

661 

def AddUsersToModerationUserList(
    self, request: admin_pb2.AddUsersToModerationUserListReq, context: CouchersContext, session: Session
) -> admin_pb2.AddUsersToModerationUserListRes:
    """Add several users to a moderation user list and return the list's id.

    When the request names no list, a new one is created. All users are
    resolved before anything is modified; an unknown user or an unknown list
    id aborts with NOT_FOUND.
    """
    resolved_users = []
    for identifier in request.users:
        match = session.execute(select(User).where(username_or_email_or_id(identifier))).scalar_one_or_none()
        if not match:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
        resolved_users.append(match)

    if not request.moderation_list_id:
        # No list given: start a fresh one and flush so it is persisted before use.
        moderation_user_list = ModerationUserList()
        session.add(moderation_user_list)
        session.flush()
    else:
        moderation_user_list = session.get(ModerationUserList, request.moderation_list_id)
        if not moderation_user_list:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_user_list_not_found")

    # Skip users that are already members of the list.
    for candidate in resolved_users:
        if candidate in moderation_user_list.users:
            continue
        moderation_user_list.users.append(candidate)

    return admin_pb2.AddUsersToModerationUserListRes(moderation_list_id=moderation_user_list.id)

692 

def ListModerationUserLists(
    self, request: admin_pb2.ListModerationUserListsReq, context: CouchersContext, session: Session
) -> admin_pb2.ListModerationUserListsRes:
    """Return every moderation user list the user belongs to, with member ids.

    Aborts with NOT_FOUND if no user matches ``request.user``.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    member = session.execute(lookup).scalar_one_or_none()
    if not member:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    list_pbs = []
    for moderation_list in member.moderation_user_lists:
        member_ids = [list_member.id for list_member in moderation_list.users]
        list_pbs.append(admin_pb2.ModerationList(moderation_list_id=moderation_list.id, member_ids=member_ids))

    return admin_pb2.ListModerationUserListsRes(moderation_lists=list_pbs)

706 

def RemoveUserFromModerationUserList(
    self, request: admin_pb2.RemoveUserFromModerationUserListReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Remove a user from the given moderation user list.

    Aborts with:
      * NOT_FOUND / "user_not_found" if ``request.user`` matches no user,
      * INVALID_ARGUMENT / "missing_moderation_user_list_id" if no list id was given,
      * NOT_FOUND / "moderation_user_list_not_found" if the list does not exist,
      * FAILED_PRECONDITION / "user_not_in_the_moderation_user_list" if the user
        is not a member of the list.

    A list left without members after the removal is deleted.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    list_id = request.moderation_list_id
    if not list_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_moderation_user_list_id")

    mod_list = session.get(ModerationUserList, list_id)
    if not mod_list:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_user_list_not_found")

    if target not in mod_list.users:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_the_moderation_user_list")

    mod_list.users.remove(target)

    # Do not keep empty lists around.
    if not mod_list.users:
        session.delete(mod_list)

    return empty_pb2.Empty()

729 

def CreateAccountDeletionLink(
    self, request: admin_pb2.CreateAccountDeletionLinkReq, context: CouchersContext, session: Session
) -> admin_pb2.CreateAccountDeletionLinkRes:
    """Create an account deletion token for a user and return the matching confirmation URL.

    The token is stored with an expiry two hours from now. Aborts with
    NOT_FOUND / "user_not_found" if ``request.user`` matches no user.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    deletion_token = AccountDeletionToken(
        token=urlsafe_secure_token(),
        user_id=target.id,
        expiry=now() + timedelta(hours=2),
    )
    session.add(deletion_token)

    confirm_url = urls.delete_account_link(account_deletion_token=deletion_token.token)
    return admin_pb2.CreateAccountDeletionLinkRes(account_deletion_confirm_url=confirm_url)

741 

def AccessStats(
    self, request: admin_pb2.AccessStatsReq, context: CouchersContext, session: Session
) -> admin_pb2.AccessStatsRes:
    """Summarise a user's recorded activity per (IP address, user agent) pair.

    Only ``UserActivity`` rows whose ``period`` lies between ``request.start_time``
    and ``request.end_time`` (both inclusive) are considered. When the start is
    not set it defaults to 90 days ago; when the end is not set it defaults to now.

    For every (ip_address, user_agent) group the response contains the summed
    API call count, the number of periods, the first and last period seen, the
    parsed user agent (OS / browser / device family) and GeoIP ASN / approximate
    location data. Groups are ordered by most recent period first.

    Aborts with NOT_FOUND / "user_not_found" if ``request.user`` matches no user.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # Check field presence explicitly (same approach as SetLastDonated) rather than
    # relying on the truthiness of the timestamp sub-message, so the defaults below
    # are actually applied when the caller leaves a bound unset.
    start_time = to_aware_datetime(request.start_time) if request.HasField("start_time") else now() - timedelta(days=90)
    end_time = to_aware_datetime(request.end_time) if request.HasField("end_time") else now()

    user_activity = session.execute(
        select(
            UserActivity.ip_address,
            UserActivity.user_agent,
            func.sum(UserActivity.api_calls),
            func.count(UserActivity.period),
            func.min(UserActivity.period),
            func.max(UserActivity.period),
        )
        .where(UserActivity.user_id == user.id)
        .where(UserActivity.period >= start_time)
        # Upper bound of the window: previously written as ">=", which selected
        # rows at or after the end of the window instead of those inside it.
        .where(UserActivity.period <= end_time)
        .order_by(func.max(UserActivity.period).desc())
        .group_by(UserActivity.ip_address, UserActivity.user_agent)
    ).all()

    out = admin_pb2.AccessStatsRes()

    for ip_address, user_agent, api_call_count, periods_count, first_seen, last_seen in user_activity:
        # The user agent may be missing; parse an empty string in that case.
        user_agent_data = user_agents_parse(user_agent or "")
        # When present, asn is indexed as (number, organisation, network).
        asn = geoip_asn(ip_address)
        out.stats.append(
            admin_pb2.AccessStat(
                ip_address=ip_address,
                asn=str(asn[0]) if asn else None,
                asorg=str(asn[1]) if asn else None,
                asnetwork=str(asn[2]) if asn else None,
                user_agent=user_agent,
                operating_system=user_agent_data.os.family,
                browser=user_agent_data.browser.family,
                device=user_agent_data.device.family,
                approximate_location=geoip_approximate_location(ip_address) or "Unknown",
                api_call_count=api_call_count,
                periods_count=periods_count,
                first_seen=Timestamp_from_datetime(first_seen),
                last_seen=Timestamp_from_datetime(last_seen),
            )
        )

    return out

792 

def SetLastDonated(
    self, request: admin_pb2.SetLastDonatedReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """Set or clear a user's ``last_donated`` timestamp and return the user's details.

    If ``last_donated`` is present on the request it is converted with
    ``to_aware_datetime`` and stored; otherwise the stored value is reset to
    ``None``. Aborts with NOT_FOUND / "user_not_found" if ``request.user``
    matches no user.
    """
    lookup = select(User).where(username_or_email_or_id(request.user))
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    target.last_donated = to_aware_datetime(request.last_donated) if request.HasField("last_donated") else None

    return _user_to_details(session, target)