Coverage for src/couchers/servicers/admin.py: 81%

262 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-22 06:42 +0000

1import json 

2import logging 

3from datetime import timedelta 

4 

5import grpc 

6from geoalchemy2.shape import from_shape 

7from google.protobuf import empty_pb2 

8from shapely.geometry import shape 

9from sqlalchemy.sql import or_, select, update 

10 

11from couchers import errors, urls 

12from couchers.helpers.badges import user_add_badge, user_remove_badge 

13from couchers.helpers.clusters import create_cluster, create_node 

14from couchers.jobs.enqueue import queue_job 

15from couchers.models import ( 

16 ContentReport, 

17 Event, 

18 EventCommunityInviteRequest, 

19 EventOccurrence, 

20 GroupChat, 

21 GroupChatSubscription, 

22 HostRequest, 

23 Message, 

24 ModNote, 

25 Node, 

26 User, 

27 UserBadge, 

28) 

29from couchers.notifications.notify import notify 

30from couchers.resources import get_badge_dict 

31from couchers.servicers.api import get_strong_verification_fields, user_model_to_pb 

32from couchers.servicers.auth import create_session 

33from couchers.servicers.communities import community_to_pb 

34from couchers.servicers.events import get_users_to_notify_for_new_event 

35from couchers.sql import couchers_select as select 

36from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date 

37from proto import admin_pb2, admin_pb2_grpc, notification_data_pb2 

38from proto.internal import jobs_pb2 

39 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Page size used by the paginated list RPCs in this module: it is both the
# default (when the request gives no page_size) and the upper bound.
MAX_PAGINATION_LENGTH = 250

43 

44 

def _user_to_details(session, user):
    """Build the ``admin_pb2.UserDetails`` message describing ``user``.

    Besides plain column values this includes the user's badge ids, the
    strong-verification fields from ``get_strong_verification_fields`` and
    the number of pending / non-pending mod notes.
    """
    badge_ids = [user_badge.badge_id for user_badge in user.badges]
    strong_verification = get_strong_verification_fields(session, user)
    pending_notes = user.mod_notes.where(ModNote.is_pending).count()
    acknowledged_notes = user.mod_notes.where(~ModNote.is_pending).count()

    details = {
        "user_id": user.id,
        "username": user.username,
        "name": user.name,
        "email": user.email,
        "gender": user.gender,
        "birthdate": date_to_api(user.birthdate),
        "banned": user.is_banned,
        "deleted": user.is_deleted,
        "do_not_email": user.do_not_email,
        "badges": badge_ids,
        "has_passport_sex_gender_exception": user.has_passport_sex_gender_exception,
        "admin_note": user.admin_note,
        "pending_mod_notes_count": pending_notes,
        "acknowledged_mod_notes_count": acknowledged_notes,
    }
    # Passing both mappings as keyword arguments keeps the original behaviour
    # of raising if a strong-verification key ever collides with one above.
    return admin_pb2.UserDetails(**details, **strong_verification)

63 

64 

def _content_report_to_pb(content_report: ContentReport):
    """Convert a ``ContentReport`` row into an ``admin_pb2.ContentReport`` message."""
    # These fields have the same name on the model and on the message.
    same_named = (
        "reporting_user_id",
        "author_user_id",
        "reason",
        "description",
        "content_ref",
        "user_agent",
        "page",
    )
    copied = {field: getattr(content_report, field) for field in same_named}
    return admin_pb2.ContentReport(
        content_report_id=content_report.id,
        time=Timestamp_from_datetime(content_report.time),
        **copied,
    )

77 

78 

def append_admin_note(session, context, user, note):
    """Append a timestamped, attributed entry to ``user.admin_note``.

    Aborts the RPC with ``INVALID_ARGUMENT`` when ``note`` is empty or only
    whitespace. The entry records the id and username of the calling admin
    (looked up from ``context.user_id``).
    """
    stripped = note.strip()
    if len(stripped) == 0:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ADMIN_NOTE_CANT_BE_EMPTY)

    author_query = select(User).where(User.id == context.user_id)
    admin = session.execute(author_query).scalar_one()

    timestamp = now().isoformat()
    entry = f"\n[{timestamp}] (id: {admin.id}, username: {admin.username}) {note}\n"
    user.admin_note += entry

84 

85 

def load_community_geom(geojson, context):
    """Parse a GeoJSON string into a shapely geometry and require a MultiPolygon.

    Aborts the RPC with ``INVALID_ARGUMENT`` for any other geometry type.
    """
    parsed = json.loads(geojson)
    geometry = shape(parsed)

    is_multipolygon = geometry.geom_type == "MultiPolygon"
    if not is_multipolygon:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_MULTIPOLYGON)

    return geometry

93 

94 

class Admin(admin_pb2_grpc.AdminServicer):
    """gRPC servicer for the admin API: user moderation, communities, chats and events.

    Every RPC receives the request message, the call ``context`` and a
    database ``session``.
    """

    @staticmethod
    def _get_user_or_abort(request, context, session):
        """Return the user identified by ``request.user``.

        The lookup goes through ``where_username_or_email_or_id``. Aborts the
        RPC with ``NOT_FOUND`` when nothing matches.
        """
        user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
        if not user:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
        return user

    def GetUserDetails(self, request, context, session):
        """Return the admin-facing details of the requested user."""
        user = self._get_user_or_abort(request, context, session)
        return _user_to_details(session, user)

    def GetUser(self, request, context, session):
        """Return the requested user rendered with ``user_model_to_pb``."""
        user = self._get_user_or_abort(request, context, session)
        return user_model_to_pb(user, session, context)

    def ChangeUserGender(self, request, context, session):
        """Set the user's gender, commit, and send a ``gender:change`` notification."""
        user = self._get_user_or_abort(request, context, session)
        user.gender = request.gender
        session.commit()

        notify(
            session,
            user_id=user.id,
            topic_action="gender:change",
            data=notification_data_pb2.GenderChange(
                gender=request.gender,
            ),
        )

        return _user_to_details(session, user)

    def ChangeUserBirthdate(self, request, context, session):
        """Set the user's birthdate, commit, and send a ``birthdate:change`` notification."""
        user = self._get_user_or_abort(request, context, session)
        user.birthdate = parse_date(request.birthdate)
        session.commit()

        notify(
            session,
            user_id=user.id,
            topic_action="birthdate:change",
            data=notification_data_pb2.BirthdateChange(
                birthdate=request.birthdate,
            ),
        )

        return _user_to_details(session, user)

    def AddBadge(self, request, context, session):
        """Give the user an admin-editable badge.

        Aborts with ``NOT_FOUND`` for an unknown user or badge, and with
        ``FAILED_PRECONDITION`` when the badge is not admin-editable or the
        user already has it.
        """
        user = self._get_user_or_abort(request, context, session)

        badge = get_badge_dict().get(request.badge_id)
        if not badge:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)

        if not badge["admin_editable"]:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)

        if any(user_badge.badge_id == badge["id"] for user_badge in user.badges):
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_HAS_BADGE)

        user_add_badge(session, user.id, request.badge_id)

        return _user_to_details(session, user)

    def RemoveBadge(self, request, context, session):
        """Take an admin-editable badge away from the user.

        Aborts with ``NOT_FOUND`` for an unknown user or badge, and with
        ``FAILED_PRECONDITION`` when the badge is not admin-editable or the
        user does not have it.
        """
        user = self._get_user_or_abort(request, context, session)

        badge = get_badge_dict().get(request.badge_id)
        if not badge:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)

        if not badge["admin_editable"]:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)

        user_badge = session.execute(
            select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge["id"])
        ).scalar_one_or_none()
        if not user_badge:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_DOES_NOT_HAVE_BADGE)

        user_remove_badge(session, user.id, request.badge_id)

        return _user_to_details(session, user)

    def SetPassportSexGenderException(self, request, context, session):
        """Set the user's ``has_passport_sex_gender_exception`` flag from the request."""
        user = self._get_user_or_abort(request, context, session)
        user.has_passport_sex_gender_exception = request.passport_sex_gender_exception
        return _user_to_details(session, user)

    def BanUser(self, request, context, session):
        """Ban the user; a non-empty admin note is required and appended first."""
        user = self._get_user_or_abort(request, context, session)
        append_admin_note(session, context, user, request.admin_note)
        user.is_banned = True
        return _user_to_details(session, user)

    def UnbanUser(self, request, context, session):
        """Lift the user's ban; a non-empty admin note is required and appended first."""
        user = self._get_user_or_abort(request, context, session)
        append_admin_note(session, context, user, request.admin_note)
        user.is_banned = False
        return _user_to_details(session, user)

    def AddAdminNote(self, request, context, session):
        """Append a non-empty admin note to the user."""
        user = self._get_user_or_abort(request, context, session)
        append_admin_note(session, context, user, request.admin_note)
        return _user_to_details(session, user)

    def GetContentReport(self, request, context, session):
        """Return one content report by id, or abort with ``NOT_FOUND``."""
        content_report = session.execute(
            select(ContentReport).where(ContentReport.id == request.content_report_id)
        ).scalar_one_or_none()
        if not content_report:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CONTENT_REPORT_NOT_FOUND)
        return admin_pb2.GetContentReportRes(
            content_report=_content_report_to_pb(content_report),
        )

    def GetContentReportsForAuthor(self, request, context, session):
        """Return all content reports whose author is the given user, highest report id first."""
        user = self._get_user_or_abort(request, context, session)
        content_reports = (
            session.execute(
                select(ContentReport).where(ContentReport.author_user_id == user.id).order_by(ContentReport.id.desc())
            )
            .scalars()
            .all()
        )
        return admin_pb2.GetContentReportsForAuthorRes(
            content_reports=[_content_report_to_pb(content_report) for content_report in content_reports],
        )

    def SendModNote(self, request, context, session):
        """Create a mod note for the user and, unless ``do_not_notify`` is set, notify them."""
        user = self._get_user_or_abort(request, context, session)
        session.add(
            ModNote(
                user_id=user.id,
                internal_id=request.internal_id,
                creator_user_id=context.user_id,
                note_content=request.content,
            )
        )
        session.flush()

        if not request.do_not_notify:
            notify(
                session,
                user_id=user.id,
                topic_action="modnote:create",
            )

        return _user_to_details(session, user)

    def DeleteUser(self, request, context, session):
        """Mark the user as deleted by setting ``is_deleted``."""
        user = self._get_user_or_abort(request, context, session)
        user.is_deleted = True
        return _user_to_details(session, user)

    def CreateApiKey(self, request, context, session):
        """Create a 365-day API-key session for the user and send them the key via notification."""
        user = self._get_user_or_abort(request, context, session)
        token, expiry = create_session(
            context, session, user, long_lived=True, is_api_key=True, duration=timedelta(days=365), set_cookie=False
        )

        notify(
            session,
            user_id=user.id,
            topic_action="api_key:create",
            data=notification_data_pb2.ApiKeyCreate(
                api_key=token,
                expiry=Timestamp_from_datetime(expiry),
            ),
        )

        return _user_to_details(session, user)

    def CreateCommunity(self, request, context, session):
        """Create a node from the request's MultiPolygon GeoJSON plus its cluster."""
        geom = load_community_geom(request.geojson, context)

        # A parent_node_id of 0 means "no parent".
        parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None
        node = create_node(session, geom, parent_node_id)
        create_cluster(session, node.id, request.name, request.description, context.user_id, request.admin_ids, True)

        return community_to_pb(session, node, context)

    def UpdateCommunity(self, request, context, session):
        """Update a community's name, description, geometry and/or parent node.

        Only non-empty ``name`` / ``description`` / ``geojson`` and a non-zero
        ``parent_node_id`` are applied; everything else is left as it is.
        Aborts with ``NOT_FOUND`` when the community does not exist.
        """
        node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
        if not node:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
        cluster = node.official_cluster

        if request.name:
            cluster.name = request.name

        if request.description:
            cluster.description = request.description

        if request.geojson:
            geom = load_community_geom(request.geojson, context)

            node.geom = from_shape(geom)

        if request.parent_node_id != 0:
            node.parent_node_id = request.parent_node_id

        session.flush()

        return community_to_pb(session, cluster.parent_node, context)

    def GetChats(self, request, context, session):
        """Return a plain-text dump of the user's host requests and group chats.

        Host requests are those where the user is host or surfer; group chats
        are those the user has a subscription to. Each is followed by its
        messages in ascending message id order.
        """

        def format_user(user):
            return f"{user.name} ({user.username}, {user.id})"

        def format_conversation(conversation_id):
            messages = (
                session.execute(
                    select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
                )
                .scalars()
                .all()
            )
            # Collect the pieces and join once instead of growing a string with +=.
            parts = []
            for message in messages:
                parts.append(
                    f"Message {message.id} by {format_user(message.author)} at {message.time}\nType={message.message_type}, host_req_status_change={message.host_request_status_target}\n\n"
                )
                parts.append(str(message.text))
                parts.append("\n\n-----\n")
            parts.append("\n\n\n\n")
            return "".join(parts)

        def format_host_request(host_request_id):
            host_request = session.execute(
                select(HostRequest).where(HostRequest.conversation_id == host_request_id)
            ).scalar_one()
            parts = [
                "==============================\n",
                f"Host request {host_request.conversation_id} from {format_user(host_request.surfer)} to {format_user(host_request.host)}.\nCurrent state = {host_request.status}\n\nMessages:\n",
                format_conversation(host_request.conversation_id),
                "\n\n\n\n",
            ]
            return "".join(parts)

        def format_group_chat(group_chat_id):
            group_chat = session.execute(
                select(GroupChat).where(GroupChat.conversation_id == group_chat_id)
            ).scalar_one()
            parts = [
                "==============================\n",
                f"Group chat {group_chat.conversation_id}. Created by {format_user(group_chat.creator)}, is_dm={group_chat.is_dm}\nName: {group_chat.title}\nMembers:\n",
            ]
            subs = (
                session.execute(
                    select(GroupChatSubscription)
                    .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
                    .order_by(GroupChatSubscription.joined.asc())
                )
                .scalars()
                .all()
            )
            parts.extend(
                f"{format_user(sub.user)} joined at {sub.joined} (left at {sub.left}), role={sub.role}\n"
                for sub in subs
            )
            parts.append("\n\nMessages:\n")
            parts.append(format_conversation(group_chat.conversation_id))
            parts.append("\n\n\n\n")
            return "".join(parts)

        def format_all_chats_for_user(user_id):
            user = session.execute(select(User).where(User.id == user_id)).scalar_one()
            parts = [f"Chats for user {format_user(user)}\n"]

            host_request_ids = (
                session.execute(
                    select(HostRequest.conversation_id)
                    .where(or_(HostRequest.host_user_id == user_id, HostRequest.surfer_user_id == user_id))
                    .order_by(HostRequest.conversation_id.desc())
                )
                .scalars()
                .all()
            )
            parts.append(f"************************************* Requests ({len(host_request_ids)})\n")
            for host_request_id in host_request_ids:
                parts.append(format_host_request(host_request_id))

            group_chat_ids = (
                session.execute(
                    select(GroupChatSubscription.group_chat_id)
                    .where(GroupChatSubscription.user_id == user_id)
                    .order_by(GroupChatSubscription.joined.desc())
                )
                .scalars()
                .all()
            )
            parts.append(f"************************************* Group chats ({len(group_chat_ids)})\n")
            for group_chat_id in group_chat_ids:
                parts.append(format_group_chat(group_chat_id))
            return "".join(parts)

        user = self._get_user_or_abort(request, context, session)

        return admin_pb2.GetChatsRes(response=format_all_chats_for_user(user.id))

    def ListEventCommunityInviteRequests(self, request, context, session):
        """List undecided event community invite requests, paginated by ascending id.

        One row more than ``page_size`` is fetched; when it exists, its id
        becomes ``next_page_token`` and is the first row of the next page.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        next_request_id = int(request.page_token) if request.page_token else 0
        requests = (
            session.execute(
                select(EventCommunityInviteRequest)
                .where(EventCommunityInviteRequest.approved.is_(None))
                .where(EventCommunityInviteRequest.id >= next_request_id)
                .order_by(EventCommunityInviteRequest.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )

        # The parameter is deliberately not called `request` so that it does
        # not shadow the RPC request message.
        def _invite_request_to_pb(invite_request):
            users_to_notify, node_id = get_users_to_notify_for_new_event(session, invite_request.occurrence)
            return admin_pb2.EventCommunityInviteRequest(
                event_community_invite_request_id=invite_request.id,
                user_id=invite_request.user_id,
                event_url=urls.event_link(
                    occurrence_id=invite_request.occurrence.id, slug=invite_request.occurrence.event.slug
                ),
                approx_users_to_notify=len(users_to_notify),
                community_id=node_id,
            )

        return admin_pb2.ListEventCommunityInviteRequestsRes(
            requests=[_invite_request_to_pb(invite_request) for invite_request in requests[:page_size]],
            next_page_token=str(requests[-1].id) if len(requests) > page_size else None,
        )

    def DecideEventCommunityInviteRequest(self, request, context, session):
        """Approve or deny an event community invite request.

        Aborts with ``NOT_FOUND`` for an unknown request and with
        ``FAILED_PRECONDITION`` when it was already decided. On approval, all
        other undecided requests for the same occurrence are denied and a
        ``generate_event_create_notifications`` job is queued.
        """
        req = session.execute(
            select(EventCommunityInviteRequest).where(
                EventCommunityInviteRequest.id == request.event_community_invite_request_id
            )
        ).scalar_one_or_none()

        if not req:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_COMMUNITY_INVITE_NOT_FOUND)

        if req.decided:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_DECIDED)

        decided = now()
        req.decided = decided
        req.decided_by_user_id = context.user_id
        req.approved = request.approve

        # deny other reqs for the same event
        if request.approve:
            session.execute(
                update(EventCommunityInviteRequest)
                .where(EventCommunityInviteRequest.occurrence_id == req.occurrence_id)
                .where(EventCommunityInviteRequest.decided.is_(None))
                .values(decided=decided, decided_by_user_id=context.user_id, approved=False)
            )

        session.flush()

        if request.approve:
            queue_job(
                session,
                "generate_event_create_notifications",
                payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
                    inviting_user_id=req.user_id,
                    occurrence_id=req.occurrence_id,
                    approved=True,
                ),
            )

        return admin_pb2.DecideEventCommunityInviteRequestRes()

    def DeleteEvent(self, request, context, session):
        """Mark an event occurrence as deleted and queue its delete notifications.

        ``request.event_id`` is matched against ``EventOccurrence.id``. Aborts
        with ``NOT_FOUND`` when no non-deleted occurrence matches.
        """
        res = session.execute(
            select(Event, EventOccurrence)
            .where(EventOccurrence.id == request.event_id)
            .where(EventOccurrence.event_id == Event.id)
            .where(~EventOccurrence.is_deleted)
        ).one_or_none()

        if not res:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

        _event, occurrence = res

        occurrence.is_deleted = True

        queue_job(
            session,
            "generate_event_delete_notifications",
            payload=jobs_pb2.GenerateEventDeleteNotificationsPayload(
                occurrence_id=occurrence.id,
            ),
        )

        return empty_pb2.Empty()

    def ListUserIds(self, request, context, session):
        """List ids of users who joined within ``[start_time, end_time]``, highest id first.

        Pagination is keyset-based on ``User.id``: one row more than
        ``page_size`` is fetched and, when it exists, its id becomes
        ``next_page_token`` and is the first row of the next page.
        """
        # NOTE(review): Timestamp.ToDatetime() without a tzinfo yields naive
        # datetimes - confirm they compare as intended against User.joined.
        start_date = request.start_time.ToDatetime()
        end_date = request.end_time.ToDatetime()

        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

        query = select(User.id).where(User.joined >= start_date).where(User.joined <= end_date)
        if request.page_token:
            # The cursor and the sort order must use the same key. Previously
            # rows were filtered by `id >= token` but sorted by `joined`
            # descending, so every following page started again from the
            # newest users and pagination never advanced.
            query = query.where(User.id <= int(request.page_token))

        user_ids = session.execute(query.order_by(User.id.desc()).limit(page_size + 1)).scalars().all()

        return admin_pb2.ListUserIdsRes(
            user_ids=user_ids[:page_size],
            next_page_token=str(user_ids[-1]) if len(user_ids) > page_size else None,
        )