Coverage for src/couchers/servicers/admin.py: 80%

276 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-03-24 14:08 +0000

1import json 

2import logging 

3from datetime import timedelta 

4 

5import grpc 

6from geoalchemy2.shape import from_shape 

7from google.protobuf import empty_pb2 

8from shapely.geometry import shape 

9from sqlalchemy.sql import or_, select, update 

10 

11from couchers import errors, urls 

12from couchers.helpers.badges import user_add_badge, user_remove_badge 

13from couchers.helpers.clusters import create_cluster, create_node 

14from couchers.jobs.enqueue import queue_job 

15from couchers.models import ( 

16 ContentReport, 

17 Event, 

18 EventCommunityInviteRequest, 

19 EventOccurrence, 

20 GroupChat, 

21 GroupChatSubscription, 

22 HostRequest, 

23 Message, 

24 ModNote, 

25 Node, 

26 Reference, 

27 User, 

28 UserBadge, 

29) 

30from couchers.notifications.notify import notify 

31from couchers.resources import get_badge_dict 

32from couchers.servicers.api import get_strong_verification_fields, user_model_to_pb 

33from couchers.servicers.auth import create_session 

34from couchers.servicers.communities import community_to_pb 

35from couchers.servicers.events import get_users_to_notify_for_new_event 

36from couchers.sql import couchers_select as select 

37from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date 

38from proto import admin_pb2, admin_pb2_grpc, notification_data_pb2 

39from proto.internal import jobs_pb2 

40 

41logger = logging.getLogger(__name__) 

42 

43MAX_PAGINATION_LENGTH = 250 

44 

45 

def _user_to_details(session, user):
    """Build the admin-facing `admin_pb2.UserDetails` message for `user`.

    Combines plain attributes of the user with the ids of the user's badges,
    the keyword fields returned by `get_strong_verification_fields`, and the
    counts of the user's pending and non-pending mod notes.
    """
    birthdate = date_to_api(user.birthdate)
    badge_ids = [user_badge.badge_id for user_badge in user.badges]
    verification_fields = get_strong_verification_fields(session, user)
    pending_notes = user.mod_notes.where(ModNote.is_pending).count()
    acknowledged_notes = user.mod_notes.where(~ModNote.is_pending).count()

    return admin_pb2.UserDetails(
        user_id=user.id,
        username=user.username,
        name=user.name,
        email=user.email,
        gender=user.gender,
        birthdate=birthdate,
        banned=user.is_banned,
        deleted=user.is_deleted,
        do_not_email=user.do_not_email,
        badges=badge_ids,
        has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
        admin_note=user.admin_note,
        pending_mod_notes_count=pending_notes,
        acknowledged_mod_notes_count=acknowledged_notes,
        **verification_fields,
    )

64 

65 

def _content_report_to_pb(content_report: ContentReport):
    """Convert a `ContentReport` row into an `admin_pb2.ContentReport` message."""
    report = content_report
    reported_at = Timestamp_from_datetime(report.time)
    return admin_pb2.ContentReport(
        content_report_id=report.id,
        time=reported_at,
        reporting_user_id=report.reporting_user_id,
        author_user_id=report.author_user_id,
        reason=report.reason,
        description=report.description,
        content_ref=report.content_ref,
        user_agent=report.user_agent,
        page=report.page,
    )

78 

79 

def append_admin_note(session, context, user, note):
    """Append a timestamped, attributed entry to `user.admin_note`.

    Aborts with INVALID_ARGUMENT when `note` is empty or whitespace-only.
    The entry names the acting admin, who is loaded from `context.user_id`.
    """
    if note.strip() == "":
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ADMIN_NOTE_CANT_BE_EMPTY)

    admin_query = select(User).where(User.id == context.user_id)
    admin = session.execute(admin_query).scalar_one()

    timestamp = now().isoformat()
    entry = f"\n[{timestamp}] (id: {admin.id}, username: {admin.username}) {note}\n"
    user.admin_note += entry

85 

86 

def load_community_geom(geojson, context):
    """Parse a GeoJSON string into a shapely geometry and require a MultiPolygon.

    Aborts with INVALID_ARGUMENT when the parsed geometry's `geom_type` is
    anything other than "MultiPolygon"; otherwise returns the geometry.
    """
    parsed = json.loads(geojson)
    geometry = shape(parsed)

    if geometry.geom_type == "MultiPolygon":
        return geometry

    context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_MULTIPOLYGON)
    return geometry

94 

95 

96class Admin(admin_pb2_grpc.AdminServicer): 

def GetUserDetails(self, request, context, session):
    """Return the admin `UserDetails` for the user identified by `request.user`.

    The user is matched through `where_username_or_email_or_id`; aborts with
    NOT_FOUND when nothing matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
    return _user_to_details(session, target)

102 

def GetUser(self, request, context, session):
    """Return the regular API user message for the user in `request.user`.

    The message is produced by `user_model_to_pb`; aborts with NOT_FOUND when
    no user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
    return user_model_to_pb(target, session, context)

108 

def ChangeUserGender(self, request, context, session):
    """Set a user's gender, commit the change, then notify the user.

    Aborts with NOT_FOUND when no user matches `request.user`. Returns the
    updated admin `UserDetails`.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    target.gender = request.gender
    # The change is committed before the notification is generated.
    session.commit()

    change = notification_data_pb2.GenderChange(gender=request.gender)
    notify(session, user_id=target.id, topic_action="gender:change", data=change)

    return _user_to_details(session, target)

126 

def ChangeUserBirthdate(self, request, context, session):
    """Set a user's birthdate, commit the change, then notify the user.

    `request.birthdate` is converted with `parse_date` before being stored;
    the notification carries the raw request value. Aborts with NOT_FOUND
    when no user matches `request.user`.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    target.birthdate = parse_date(request.birthdate)
    # The change is committed before the notification is generated.
    session.commit()

    change = notification_data_pb2.BirthdateChange(birthdate=request.birthdate)
    notify(session, user_id=target.id, topic_action="birthdate:change", data=change)

    return _user_to_details(session, target)

144 

def AddBadge(self, request, context, session):
    """Grant an admin-editable badge to a user.

    Aborts with NOT_FOUND for an unknown user or badge id, and with
    FAILED_PRECONDITION when the badge is not admin-editable or the user
    already holds it.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    badge = get_badge_dict().get(request.badge_id)
    if not badge:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)

    admin_editable = badge["admin_editable"]
    if not admin_editable:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)

    held_badge_ids = [held.badge_id for held in target.badges]
    already_held = badge["id"] in held_badge_ids
    if already_held:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_HAS_BADGE)

    user_add_badge(session, target.id, request.badge_id)

    return _user_to_details(session, target)

163 

def RemoveBadge(self, request, context, session):
    """Remove an admin-editable badge from a user.

    Aborts with NOT_FOUND for an unknown user or badge id, and with
    FAILED_PRECONDITION when the badge is not admin-editable or the user
    has no `UserBadge` row for it.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    badge = get_badge_dict().get(request.badge_id)
    if not badge:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)

    admin_editable = badge["admin_editable"]
    if not admin_editable:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)

    held_query = select(UserBadge).where(UserBadge.user_id == target.id, UserBadge.badge_id == badge["id"])
    held_badge = session.execute(held_query).scalar_one_or_none()
    if not held_badge:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_DOES_NOT_HAVE_BADGE)

    user_remove_badge(session, target.id, request.badge_id)

    return _user_to_details(session, target)

185 

def SetPassportSexGenderException(self, request, context, session):
    """Set the `has_passport_sex_gender_exception` flag on a user.

    Aborts with NOT_FOUND when no user matches `request.user`. Returns the
    updated admin `UserDetails`.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    target.has_passport_sex_gender_exception = request.passport_sex_gender_exception
    return _user_to_details(session, target)

192 

def BanUser(self, request, context, session):
    """Ban a user and record `request.admin_note` in their admin note.

    Aborts with NOT_FOUND when no user matches `request.user`. The note is
    appended via `append_admin_note` before the ban flag is set.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    append_admin_note(session, context, target, request.admin_note)
    target.is_banned = True
    return _user_to_details(session, target)

200 

def UnbanUser(self, request, context, session):
    """Lift a user's ban and record `request.admin_note` in their admin note.

    Aborts with NOT_FOUND when no user matches `request.user`. The note is
    appended via `append_admin_note` before the ban flag is cleared.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    append_admin_note(session, context, target, request.admin_note)
    target.is_banned = False
    return _user_to_details(session, target)

208 

def AddAdminNote(self, request, context, session):
    """Append `request.admin_note` to a user's admin note.

    Aborts with NOT_FOUND when no user matches `request.user`. Returns the
    updated admin `UserDetails`.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    append_admin_note(session, context, target, request.admin_note)
    return _user_to_details(session, target)

215 

def GetContentReport(self, request, context, session):
    """Fetch a single content report by `request.content_report_id`.

    Aborts with NOT_FOUND when no report has that id.
    """
    lookup = select(ContentReport).where(ContentReport.id == request.content_report_id)
    report = session.execute(lookup).scalar_one_or_none()
    if not report:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CONTENT_REPORT_NOT_FOUND)

    report_pb = _content_report_to_pb(report)
    return admin_pb2.GetContentReportRes(content_report=report_pb)

225 

def GetContentReportsForAuthor(self, request, context, session):
    """List content reports whose author is the given user, highest id first.

    Aborts with NOT_FOUND when no user matches `request.user`.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    author = session.execute(lookup).scalar_one_or_none()
    if not author:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    reports_query = (
        select(ContentReport).where(ContentReport.author_user_id == author.id).order_by(ContentReport.id.desc())
    )
    reports = session.execute(reports_query).scalars().all()

    return admin_pb2.GetContentReportsForAuthorRes(
        content_reports=list(map(_content_report_to_pb, reports)),
    )

240 

def SendModNote(self, request, context, session):
    """Create a mod note for a user and, unless suppressed, notify them.

    The note is attributed to `context.user_id`. The notification is skipped
    when `request.do_not_notify` is set. Aborts with NOT_FOUND when no user
    matches `request.user`.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    mod_note = ModNote(
        user_id=target.id,
        internal_id=request.internal_id,
        creator_user_id=context.user_id,
        note_content=request.content,
    )
    session.add(mod_note)
    session.flush()

    should_notify = not request.do_not_notify
    if should_notify:
        notify(session, user_id=target.id, topic_action="modnote:create")

    return _user_to_details(session, target)

263 

def DeleteUser(self, request, context, session):
    """Mark a user as deleted by setting `is_deleted`.

    Aborts with NOT_FOUND when no user matches `request.user`. Returns the
    updated admin `UserDetails`.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    target.is_deleted = True
    return _user_to_details(session, target)

270 

def CreateApiKey(self, request, context, session):
    """Create a 365-day API-key session for a user and notify them of it.

    The token and expiry returned by `create_session` are sent to the user in
    the "api_key:create" notification. Aborts with NOT_FOUND when no user
    matches `request.user`.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    token, expiry = create_session(
        context,
        session,
        target,
        long_lived=True,
        is_api_key=True,
        duration=timedelta(days=365),
        set_cookie=False,
    )

    key_data = notification_data_pb2.ApiKeyCreate(
        api_key=token,
        expiry=Timestamp_from_datetime(expiry),
    )
    notify(session, user_id=target.id, topic_action="api_key:create", data=key_data)

    return _user_to_details(session, target)

290 

def CreateCommunity(self, request, context, session):
    """Create a community node with a cluster from the request.

    `request.geojson` must parse to a MultiPolygon (checked by
    `load_community_geom`). A `parent_node_id` of 0 means "no parent".
    """
    geometry = load_community_geom(request.geojson, context)

    # 0 is the unset value for parent_node_id; pass None in that case.
    parent_node_id = None
    if request.parent_node_id != 0:
        parent_node_id = request.parent_node_id

    node = create_node(session, geometry, parent_node_id)
    # NOTE(review): the trailing positional True is whatever create_cluster's
    # last parameter is — confirm its meaning there.
    create_cluster(
        session,
        node.id,
        request.name,
        request.description,
        context.user_id,
        request.admin_ids,
        True,
    )

    return community_to_pb(session, node, context)

299 

def UpdateCommunity(self, request, context, session):
    """Update name, description, geometry and/or parent of a community.

    Only truthy request fields (non-zero for `parent_node_id`) are applied;
    the rest are left unchanged. Aborts with NOT_FOUND when no node has id
    `request.community_id`.
    """
    lookup = select(Node).where(Node.id == request.community_id)
    node = session.execute(lookup).scalar_one_or_none()
    if not node:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
    cluster = node.official_cluster

    # Name and description live on the official cluster, not on the node.
    for field_name in ("name", "description"):
        new_value = getattr(request, field_name)
        if new_value:
            setattr(cluster, field_name, new_value)

    if request.geojson:
        geometry = load_community_geom(request.geojson, context)
        node.geom = from_shape(geometry)

    if request.parent_node_id != 0:
        node.parent_node_id = request.parent_node_id

    session.flush()

    return community_to_pb(session, cluster.parent_node, context)

323 

def GetChats(self, request, context, session):
    """Return a plain-text dump of all host requests and group chats of a user.

    The text lists the user's host requests (as host or surfer, highest
    conversation id first) and then the group chats they are subscribed to
    (latest joined first), each followed by its messages in ascending id
    order. Aborts with NOT_FOUND when no user matches `request.user`.
    """

    def describe_user(person):
        # One-line identification used throughout the dump.
        return f"{person.name} ({person.username}, {person.id})"

    def render_messages(conversation_id):
        # All messages of one conversation, oldest id first.
        messages_query = (
            select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
        )
        chunks = []
        for msg in session.execute(messages_query).scalars().all():
            chunks.append(
                f"Message {msg.id} by {describe_user(msg.author)} at {msg.time}\nType={msg.message_type}, host_req_status_change={msg.host_request_status_target}\n\n"
            )
            chunks.append(str(msg.text))
            chunks.append("\n\n-----\n")
        chunks.append("\n\n\n\n")
        return "".join(chunks)

    def render_host_request(host_request_id):
        # Header for one host request followed by its conversation.
        host_request = session.execute(
            select(HostRequest).where(HostRequest.conversation_id == host_request_id)
        ).scalar_one()
        chunks = [
            "==============================\n",
            f"Host request {host_request.conversation_id} from {describe_user(host_request.surfer)} to {describe_user(host_request.host)}.\nCurrent state = {host_request.status}\n\nMessages:\n",
        ]
        chunks.append(render_messages(host_request.conversation_id))
        chunks.append("\n\n\n\n")
        return "".join(chunks)

    def render_group_chat(group_chat_id):
        # Header and member list for one group chat followed by its conversation.
        group_chat = session.execute(
            select(GroupChat).where(GroupChat.conversation_id == group_chat_id)
        ).scalar_one()
        chunks = [
            "==============================\n",
            f"Group chat {group_chat.conversation_id}. Created by {describe_user(group_chat.creator)}, is_dm={group_chat.is_dm}\nName: {group_chat.title}\nMembers:\n",
        ]
        memberships_query = (
            select(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
            .order_by(GroupChatSubscription.joined.asc())
        )
        memberships = session.execute(memberships_query).scalars().all()
        for membership in memberships:
            chunks.append(
                f"{describe_user(membership.user)} joined at {membership.joined} (left at {membership.left}), role={membership.role}\n"
            )
        chunks.append("\n\nMessages:\n")
        chunks.append(render_messages(group_chat.conversation_id))
        chunks.append("\n\n\n\n")
        return "".join(chunks)

    def render_all_chats(user_id):
        # Full dump: host requests first, then group chats.
        owner = session.execute(select(User).where(User.id == user_id)).scalar_one()
        chunks = [f"Chats for user {describe_user(owner)}\n"]

        host_request_ids_query = (
            select(HostRequest.conversation_id)
            .where(or_(HostRequest.host_user_id == user_id, HostRequest.surfer_user_id == user_id))
            .order_by(HostRequest.conversation_id.desc())
        )
        host_request_ids = session.execute(host_request_ids_query).scalars().all()
        chunks.append(f"************************************* Requests ({len(host_request_ids)})\n")
        for host_request_id in host_request_ids:
            chunks.append(render_host_request(host_request_id))

        group_chat_ids_query = (
            select(GroupChatSubscription.group_chat_id)
            .where(GroupChatSubscription.user_id == user_id)
            .order_by(GroupChatSubscription.joined.desc())
        )
        group_chat_ids = session.execute(group_chat_ids_query).scalars().all()
        chunks.append(f"************************************* Group chats ({len(group_chat_ids)})\n")
        for group_chat_id in group_chat_ids:
            chunks.append(render_group_chat(group_chat_id))

        return "".join(chunks)

    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if not target:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    return admin_pb2.GetChatsRes(response=render_all_chats(target.id))

413 

def ListEventCommunityInviteRequests(self, request, context, session):
    """List event community invite requests whose `approved` is still NULL.

    Paginated by ascending request id: `page_token` is the id of the first
    row of the page, and one extra row is fetched to detect a following page.
    The page size is capped at `MAX_PAGINATION_LENGTH`.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    first_id = int(request.page_token) if request.page_token else 0

    pending_query = (
        select(EventCommunityInviteRequest)
        .where(EventCommunityInviteRequest.approved.is_(None))
        .where(EventCommunityInviteRequest.id >= first_id)
        .order_by(EventCommunityInviteRequest.id)
        .limit(page_size + 1)
    )
    pending = session.execute(pending_query).scalars().all()

    def _invite_to_pb(invite):
        # Also reports how many users an approval would notify, and for which community.
        users_to_notify, node_id = get_users_to_notify_for_new_event(session, invite.occurrence)
        event_url = urls.event_link(occurrence_id=invite.occurrence.id, slug=invite.occurrence.event.slug)
        return admin_pb2.EventCommunityInviteRequest(
            event_community_invite_request_id=invite.id,
            user_id=invite.user_id,
            event_url=event_url,
            approx_users_to_notify=len(users_to_notify),
            community_id=node_id,
        )

    page = [_invite_to_pb(invite) for invite in pending[:page_size]]
    # The extra row, when present, is the first row of the next page.
    has_more = len(pending) > page_size
    return admin_pb2.ListEventCommunityInviteRequestsRes(
        requests=page,
        next_page_token=str(pending[-1].id) if has_more else None,
    )

443 

def DecideEventCommunityInviteRequest(self, request, context, session):
    """Record an approve/deny decision on an event community invite request.

    Aborts with NOT_FOUND for an unknown request id and FAILED_PRECONDITION
    when the request already has a decision time. On approval, the remaining
    undecided requests for the same occurrence are denied and a
    "generate_event_create_notifications" job is queued.
    """
    lookup = select(EventCommunityInviteRequest).where(
        EventCommunityInviteRequest.id == request.event_community_invite_request_id
    )
    invite = session.execute(lookup).scalar_one_or_none()

    if not invite:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_COMMUNITY_INVITE_NOT_FOUND)

    if invite.decided:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_DECIDED)

    decided_at = now()
    invite.decided = decided_at
    invite.decided_by_user_id = context.user_id
    invite.approved = request.approve

    if not request.approve:
        session.flush()
        return admin_pb2.DecideEventCommunityInviteRequestRes()

    # Approved: deny whatever is still undecided for the same occurrence,
    # stamping those rows with the same time and admin.
    deny_others = (
        update(EventCommunityInviteRequest)
        .where(EventCommunityInviteRequest.occurrence_id == invite.occurrence_id)
        .where(EventCommunityInviteRequest.decided.is_(None))
        .values(decided=decided_at, decided_by_user_id=context.user_id, approved=False)
    )
    session.execute(deny_others)

    session.flush()

    payload = jobs_pb2.GenerateEventCreateNotificationsPayload(
        inviting_user_id=invite.user_id,
        occurrence_id=invite.occurrence_id,
        approved=True,
    )
    queue_job(session, "generate_event_create_notifications", payload=payload)

    return admin_pb2.DecideEventCommunityInviteRequestRes()

485 

def DeleteEvent(self, request, context, session):
    """Soft-delete an event occurrence and queue its deletion notifications.

    `request.event_id` is matched against `EventOccurrence.id`. Occurrences
    already marked deleted are treated as missing. Aborts with NOT_FOUND when
    nothing matches.
    """
    lookup = (
        select(Event, EventOccurrence)
        .where(EventOccurrence.id == request.event_id)
        .where(EventOccurrence.event_id == Event.id)
        .where(~EventOccurrence.is_deleted)
    )
    row = session.execute(lookup).one_or_none()

    if not row:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    # Only the occurrence is modified; the event row is fetched but unused.
    _event, occurrence = row
    occurrence.is_deleted = True

    payload = jobs_pb2.GenerateEventDeleteNotificationsPayload(occurrence_id=occurrence.id)
    queue_job(session, "generate_event_delete_notifications", payload=payload)

    return empty_pb2.Empty()

510 

def ListUserIds(self, request, context, session):
    """List ids of users who joined between `start_time` and `end_time`.

    Results are returned highest user id first and paginated by user id:
    `page_token` is the id of the first row of the requested page, and one
    extra row is fetched to detect a following page. The page size is capped
    at `MAX_PAGINATION_LENGTH`.

    The previous version filtered with `User.id >= token` while ordering by
    `User.joined.desc()`. The token was the oldest row of the page, so the
    next request returned the same newest rows again. Ordering and filtering
    now use the same column so pages advance.
    """
    start_date = request.start_time.ToDatetime()
    end_date = request.end_time.ToDatetime()

    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

    query = select(User.id).where(User.joined >= start_date).where(User.joined <= end_date)
    if request.page_token:
        # Descending order, so the next page holds ids at or below the token.
        query = query.where(User.id <= int(request.page_token))
    # NOTE(review): assumes ids grow with join time, so id-descending keeps the
    # newest-first order callers saw on the first page — confirm.
    query = query.order_by(User.id.desc()).limit(page_size + 1)

    user_ids = session.execute(query).scalars().all()

    # The extra row, when present, is the first row of the next page.
    has_more = len(user_ids) > page_size
    return admin_pb2.ListUserIdsRes(
        user_ids=user_ids[:page_size],
        next_page_token=str(user_ids[-1]) if has_more else None,
    )

535 

def EditReferenceText(self, request, context, session):
    """Replace the text of a reference with the stripped `request.new_text`.

    Aborts with NOT_FOUND when no reference has id `request.reference_id`,
    and with INVALID_ARGUMENT when the new text is empty after stripping.
    """
    lookup = select(Reference).where(Reference.id == request.reference_id)
    reference = session.execute(lookup).scalar_one_or_none()

    if reference is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.REFERENCE_NOT_FOUND)

    stripped_text = request.new_text.strip()
    if not stripped_text:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.REFERENCE_NO_TEXT)

    reference.text = stripped_text
    return empty_pb2.Empty()

547 

def DeleteReference(self, request, context, session):
    """Soft-delete a reference by setting `is_deleted`.

    Aborts with NOT_FOUND when no reference has id `request.reference_id`.
    """
    lookup = select(Reference).where(Reference.id == request.reference_id)
    reference = session.execute(lookup).scalar_one_or_none()

    if reference is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.REFERENCE_NOT_FOUND)

    reference.is_deleted = True
    return empty_pb2.Empty()