Coverage for src/couchers/servicers/admin.py: 79%

242 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-12-20 18:03 +0000

1import json 

2import logging 

3from datetime import timedelta 

4 

5import grpc 

6from google.protobuf import empty_pb2 

7from shapely.geometry import shape 

8from sqlalchemy.sql import or_, select, update 

9 

10from couchers import errors, urls 

11from couchers.helpers.badges import user_add_badge, user_remove_badge 

12from couchers.helpers.clusters import create_cluster, create_node 

13from couchers.jobs.enqueue import queue_job 

14from couchers.models import ( 

15 ContentReport, 

16 Event, 

17 EventCommunityInviteRequest, 

18 EventOccurrence, 

19 GroupChat, 

20 GroupChatSubscription, 

21 HostRequest, 

22 Message, 

23 ModNote, 

24 User, 

25 UserBadge, 

26) 

27from couchers.notifications.notify import notify 

28from couchers.resources import get_badge_dict 

29from couchers.servicers.api import get_strong_verification_fields, user_model_to_pb 

30from couchers.servicers.auth import create_session 

31from couchers.servicers.communities import community_to_pb 

32from couchers.servicers.events import get_users_to_notify_for_new_event 

33from couchers.sql import couchers_select as select 

34from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date 

35from proto import admin_pb2, admin_pb2_grpc, notification_data_pb2 

36from proto.internal import jobs_pb2 

37 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Default and maximum page size used by the paginated listing endpoints below.
MAX_PAGINATION_LENGTH = 250

41 

42 

def _user_to_details(session, user):
    """Build the admin-facing ``admin_pb2.UserDetails`` message for ``user``.

    Besides the plain user attributes, the message carries the user's badge
    ids, the fields returned by ``get_strong_verification_fields`` and the
    number of pending / non-pending mod notes.
    """
    badge_ids = [user_badge.badge_id for user_badge in user.badges]
    verification_fields = get_strong_verification_fields(session, user)
    pending_count = user.mod_notes.where(ModNote.is_pending).count()
    acknowledged_count = user.mod_notes.where(~ModNote.is_pending).count()

    return admin_pb2.UserDetails(
        user_id=user.id,
        username=user.username,
        name=user.name,
        email=user.email,
        gender=user.gender,
        birthdate=date_to_api(user.birthdate),
        banned=user.is_banned,
        deleted=user.is_deleted,
        do_not_email=user.do_not_email,
        badges=badge_ids,
        has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
        admin_note=user.admin_note,
        pending_mod_notes_count=pending_count,
        acknowledged_mod_notes_count=acknowledged_count,
        **verification_fields,
    )

61 

62 

def _content_report_to_pb(content_report: ContentReport):
    """Convert a ``ContentReport`` row into an ``admin_pb2.ContentReport`` message."""
    report = content_report
    reported_at = Timestamp_from_datetime(report.time)
    return admin_pb2.ContentReport(
        content_report_id=report.id,
        time=reported_at,
        reporting_user_id=report.reporting_user_id,
        author_user_id=report.author_user_id,
        reason=report.reason,
        description=report.description,
        content_ref=report.content_ref,
        user_agent=report.user_agent,
        page=report.page,
    )

75 

76 

def append_admin_note(session, context, user, note):
    """Append a timestamped, attributed entry to ``user.admin_note``.

    Aborts with INVALID_ARGUMENT when ``note`` is empty or only whitespace.
    The entry records the current time plus the id and username of the
    calling admin (looked up via ``context.user_id``).
    """
    stripped = note.strip()
    if not stripped:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ADMIN_NOTE_CANT_BE_EMPTY)

    admin_query = select(User).where(User.id == context.user_id)
    admin = session.execute(admin_query).scalar_one()

    # The original (unstripped) note text is what gets stored.
    entry = f"\n[{now().isoformat()}] (id: {admin.id}, username: {admin.username}) {note}\n"
    user.admin_note += entry

82 

83 

84class Admin(admin_pb2_grpc.AdminServicer): 

def GetUserDetails(self, request, context, session):
    """Return admin details for the user matching ``request.user``.

    Aborts with NOT_FOUND when no user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    return _user_to_details(session, user)

90 

def GetUser(self, request, context, session):
    """Return the ``user_model_to_pb`` representation of the user matching ``request.user``.

    Aborts with NOT_FOUND when no user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    return user_model_to_pb(user, session, context)

96 

def ChangeUserGender(self, request, context, session):
    """Set the user's gender to ``request.gender`` and notify them.

    Aborts with NOT_FOUND when no user matches ``request.user``. The change
    is committed before the "gender:change" notification is sent.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    user.gender = request.gender
    session.commit()

    notification_data = notification_data_pb2.GenderChange(gender=request.gender)
    notify(
        session,
        user_id=user.id,
        topic_action="gender:change",
        data=notification_data,
    )

    return _user_to_details(session, user)

114 

def ChangeUserBirthdate(self, request, context, session):
    """Set the user's birthdate from ``request.birthdate`` and notify them.

    Aborts with NOT_FOUND when no user matches ``request.user``. The value
    is parsed with ``parse_date`` and committed before the
    "birthdate:change" notification is sent.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    user.birthdate = parse_date(request.birthdate)
    session.commit()

    # The notification carries the birthdate exactly as it was sent in the request.
    notification_data = notification_data_pb2.BirthdateChange(birthdate=request.birthdate)
    notify(
        session,
        user_id=user.id,
        topic_action="birthdate:change",
        data=notification_data,
    )

    return _user_to_details(session, user)

132 

def AddBadge(self, request, context, session):
    """Give the badge ``request.badge_id`` to the user matching ``request.user``.

    Aborts with NOT_FOUND for an unknown user or badge, and with
    FAILED_PRECONDITION when the badge is not admin-editable or the user
    already has it.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    badge = get_badge_dict().get(request.badge_id)
    if not badge:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)

    if not badge["admin_editable"]:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)

    already_has_badge = any(existing.badge_id == badge["id"] for existing in user.badges)
    if already_has_badge:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_HAS_BADGE)

    user_add_badge(session, user.id, request.badge_id)

    return _user_to_details(session, user)

151 

def RemoveBadge(self, request, context, session):
    """Take the badge ``request.badge_id`` away from the user matching ``request.user``.

    Aborts with NOT_FOUND for an unknown user or badge, and with
    FAILED_PRECONDITION when the badge is not admin-editable or the user
    does not currently hold it.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    badge = get_badge_dict().get(request.badge_id)
    if not badge:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)

    if not badge["admin_editable"]:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)

    held_badge_query = select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge["id"])
    held_badge = session.execute(held_badge_query).scalar_one_or_none()
    if not held_badge:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_DOES_NOT_HAVE_BADGE)

    user_remove_badge(session, user.id, request.badge_id)

    return _user_to_details(session, user)

173 

def SetPassportSexGenderException(self, request, context, session):
    """Set ``has_passport_sex_gender_exception`` on the user matching ``request.user``.

    Aborts with NOT_FOUND when no user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    user.has_passport_sex_gender_exception = request.passport_sex_gender_exception

    return _user_to_details(session, user)

180 

def BanUser(self, request, context, session):
    """Ban the user matching ``request.user``.

    Aborts with NOT_FOUND when no user matches. ``request.admin_note`` is
    recorded through ``append_admin_note`` before the ban flag is set.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    append_admin_note(session, context, user, request.admin_note)
    user.is_banned = True

    return _user_to_details(session, user)

188 

def UnbanUser(self, request, context, session):
    """Lift the ban on the user matching ``request.user``.

    Aborts with NOT_FOUND when no user matches. ``request.admin_note`` is
    recorded through ``append_admin_note`` before the ban flag is cleared.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    append_admin_note(session, context, user, request.admin_note)
    user.is_banned = False

    return _user_to_details(session, user)

196 

def AddAdminNote(self, request, context, session):
    """Append ``request.admin_note`` to the admin note of the user matching ``request.user``.

    Aborts with NOT_FOUND when no user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    append_admin_note(session, context, user, request.admin_note)

    return _user_to_details(session, user)

203 

def GetContentReport(self, request, context, session):
    """Return the content report with id ``request.content_report_id``.

    Aborts with NOT_FOUND when no such report exists.
    """
    report_query = select(ContentReport).where(ContentReport.id == request.content_report_id)
    content_report = session.execute(report_query).scalar_one_or_none()
    if not content_report:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CONTENT_REPORT_NOT_FOUND)

    report_pb = _content_report_to_pb(content_report)
    return admin_pb2.GetContentReportRes(content_report=report_pb)

213 

def GetContentReportsForAuthor(self, request, context, session):
    """Return all content reports whose author is the user matching ``request.user``.

    Reports are ordered by descending id. Aborts with NOT_FOUND when no
    user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    reports_query = (
        select(ContentReport).where(ContentReport.author_user_id == user.id).order_by(ContentReport.id.desc())
    )
    reports = session.execute(reports_query).scalars().all()

    report_pbs = []
    for report in reports:
        report_pbs.append(_content_report_to_pb(report))

    return admin_pb2.GetContentReportsForAuthorRes(content_reports=report_pbs)

228 

def SendModNote(self, request, context, session):
    """Create a mod note for the user matching ``request.user``.

    Aborts with NOT_FOUND when no user matches. The note is flushed to the
    session, and a "modnote:create" notification is sent unless
    ``request.do_not_notify`` is set.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    mod_note = ModNote(
        user_id=user.id,
        internal_id=request.internal_id,
        creator_user_id=context.user_id,
        note_content=request.content,
    )
    session.add(mod_note)
    session.flush()

    should_notify = not request.do_not_notify
    if should_notify:
        notify(
            session,
            user_id=user.id,
            topic_action="modnote:create",
        )

    return _user_to_details(session, user)

251 

def DeleteUser(self, request, context, session):
    """Flag the user matching ``request.user`` as deleted (sets ``is_deleted``).

    Aborts with NOT_FOUND when no user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    user.is_deleted = True

    return _user_to_details(session, user)

258 

def CreateApiKey(self, request, context, session):
    """Create an API key for the user matching ``request.user`` and notify them.

    Aborts with NOT_FOUND when no user matches. The key is created through
    ``create_session`` as a long-lived API-key session lasting 365 days,
    without setting a cookie; the token and its expiry are delivered in an
    "api_key:create" notification.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    token, expiry = create_session(
        context,
        session,
        user,
        long_lived=True,
        is_api_key=True,
        duration=timedelta(days=365),
        set_cookie=False,
    )

    notification_data = notification_data_pb2.ApiKeyCreate(
        api_key=token,
        expiry=Timestamp_from_datetime(expiry),
    )
    notify(
        session,
        user_id=user.id,
        topic_action="api_key:create",
        data=notification_data,
    )

    return _user_to_details(session, user)

278 

def CreateCommunity(self, request, context, session):
    """Create a community (a node plus its cluster) from a GeoJSON MultiPolygon.

    ``request.geojson`` is parsed as JSON and turned into a Shapely geometry.
    Aborts with INVALID_ARGUMENT when the geometry is not a MultiPolygon.
    A ``request.parent_node_id`` of 0 is treated as "no parent".

    Returns the ``community_to_pb`` representation of the new node.
    """
    geom = shape(json.loads(request.geojson))

    # `geom_type` is the supported spelling; the `type` attribute is
    # deprecated in Shapely 2.0.
    if geom.geom_type != "MultiPolygon":
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_MULTIPOLYGON)

    parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None
    node = create_node(session, geom, parent_node_id)
    create_cluster(session, node.id, request.name, request.description, context.user_id, request.admin_ids, True)

    return community_to_pb(session, node, context)

290 

def GetChats(self, request, context, session):
    """Return a plain-text dump of every chat of the user matching ``request.user``.

    The dump lists the host requests where the user is host or surfer
    (newest conversation id first), then the group chats the user has a
    subscription to (most recently joined first), each with its messages
    in ascending id order. Aborts with NOT_FOUND when no user matches.
    """

    def format_user(user):
        # One-line identification of a user: name, username and id.
        return f"{user.name} ({user.username}, {user.id})"

    def format_conversation(conversation_id):
        # Render all messages of one conversation, oldest first.
        message_query = select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
        messages = session.execute(message_query).scalars().all()

        parts = []
        for message in messages:
            parts.append(
                f"Message {message.id} by {format_user(message.author)} at {message.time}\nType={message.message_type}, host_req_status_change={message.host_request_status_target}\n\n"
            )
            parts.append(str(message.text))
            parts.append("\n\n-----\n")
        parts.append("\n\n\n\n")
        return "".join(parts)

    def format_host_request(host_request_id):
        # Render the header and messages of one host request.
        host_request = session.execute(
            select(HostRequest).where(HostRequest.conversation_id == host_request_id)
        ).scalar_one()

        parts = [
            "==============================\n",
            f"Host request {host_request.conversation_id} from {format_user(host_request.surfer)} to {format_user(host_request.host)}.\nCurrent state = {host_request.status}\n\nMessages:\n",
            format_conversation(host_request.conversation_id),
            "\n\n\n\n",
        ]
        return "".join(parts)

    def format_group_chat(group_chat_id):
        # Render the header, member list and messages of one group chat.
        group_chat = session.execute(
            select(GroupChat).where(GroupChat.conversation_id == group_chat_id)
        ).scalar_one()

        parts = [
            "==============================\n",
            f"Group chat {group_chat.conversation_id}. Created by {format_user(group_chat.creator)}, is_dm={group_chat.is_dm}\nName: {group_chat.title}\nMembers:\n",
        ]

        subscription_query = (
            select(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
            .order_by(GroupChatSubscription.joined.asc())
        )
        for sub in session.execute(subscription_query).scalars().all():
            parts.append(f"{format_user(sub.user)} joined at {sub.joined} (left at {sub.left}), role={sub.role}\n")

        parts.append("\n\nMessages:\n")
        parts.append(format_conversation(group_chat.conversation_id))
        parts.append("\n\n\n\n")
        return "".join(parts)

    def format_all_chats_for_user(user_id):
        # Render every host request and group chat the user takes part in.
        chat_owner = session.execute(select(User).where(User.id == user_id)).scalar_one()
        parts = [f"Chats for user {format_user(chat_owner)}\n"]

        host_request_query = (
            select(HostRequest.conversation_id)
            .where(or_(HostRequest.host_user_id == user_id, HostRequest.surfer_user_id == user_id))
            .order_by(HostRequest.conversation_id.desc())
        )
        host_request_ids = session.execute(host_request_query).scalars().all()
        parts.append(f"************************************* Requests ({len(host_request_ids)})\n")
        for host_request_id in host_request_ids:
            parts.append(format_host_request(host_request_id))

        group_chat_query = (
            select(GroupChatSubscription.group_chat_id)
            .where(GroupChatSubscription.user_id == user_id)
            .order_by(GroupChatSubscription.joined.desc())
        )
        group_chat_ids = session.execute(group_chat_query).scalars().all()
        parts.append(f"************************************* Group chats ({len(group_chat_ids)})\n")
        for group_chat_id in group_chat_ids:
            parts.append(format_group_chat(group_chat_id))

        return "".join(parts)

    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    return admin_pb2.GetChatsRes(response=format_all_chats_for_user(user.id))

380 

def ListEventCommunityInviteRequests(self, request, context, session):
    """List undecided event community invite requests, paginated by ascending id.

    ``request.page_size`` defaults to, and is capped at,
    ``MAX_PAGINATION_LENGTH``. ``request.page_token`` is the id at which the
    page starts (0 when absent). One extra row is fetched to detect whether
    another page exists; its id becomes ``next_page_token``.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    next_request_id = int(request.page_token) if request.page_token else 0

    pending_query = (
        select(EventCommunityInviteRequest)
        .where(EventCommunityInviteRequest.approved.is_(None))
        .where(EventCommunityInviteRequest.id >= next_request_id)
        .order_by(EventCommunityInviteRequest.id)
        .limit(page_size + 1)
    )
    requests = session.execute(pending_query).scalars().all()

    def _request_to_pb(invite_request):
        occurrence = invite_request.occurrence
        users_to_notify, node_id = get_users_to_notify_for_new_event(session, occurrence)
        return admin_pb2.EventCommunityInviteRequest(
            event_community_invite_request_id=invite_request.id,
            user_id=invite_request.user_id,
            event_url=urls.event_link(occurrence_id=invite_request.occurrence.id, slug=invite_request.occurrence.event.slug),
            approx_users_to_notify=len(users_to_notify),
            community_id=node_id,
        )

    request_pbs = [_request_to_pb(invite_request) for invite_request in requests[:page_size]]
    has_more = len(requests) > page_size

    return admin_pb2.ListEventCommunityInviteRequestsRes(
        requests=request_pbs,
        next_page_token=str(requests[-1].id) if has_more else None,
    )

410 

def DecideEventCommunityInviteRequest(self, request, context, session):
    """Approve or reject an event community invite request.

    Aborts with NOT_FOUND when the request does not exist and with
    FAILED_PRECONDITION when it already has a decision. On approval, all
    other undecided requests for the same occurrence are marked as rejected
    and a "generate_event_create_notifications" job is queued.
    """
    invite_query = select(EventCommunityInviteRequest).where(
        EventCommunityInviteRequest.id == request.event_community_invite_request_id
    )
    invite_request = session.execute(invite_query).scalar_one_or_none()

    if not invite_request:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_COMMUNITY_INVITE_NOT_FOUND)

    if invite_request.decided:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_DECIDED)

    decided_at = now()
    invite_request.decided = decided_at
    invite_request.decided_by_user_id = context.user_id
    invite_request.approved = request.approve

    # deny other reqs for the same event
    if request.approve:
        reject_others = (
            update(EventCommunityInviteRequest)
            .where(EventCommunityInviteRequest.occurrence_id == invite_request.occurrence_id)
            .where(EventCommunityInviteRequest.decided.is_(None))
            .values(decided=decided_at, decided_by_user_id=context.user_id, approved=False)
        )
        session.execute(reject_others)

    session.flush()

    if request.approve:
        job_payload = jobs_pb2.GenerateEventCreateNotificationsPayload(
            inviting_user_id=invite_request.user_id,
            occurrence_id=invite_request.occurrence_id,
            approved=True,
        )
        queue_job(
            session,
            "generate_event_create_notifications",
            payload=job_payload,
        )

    return admin_pb2.DecideEventCommunityInviteRequestRes()

452 

def DeleteEvent(self, request, context, session):
    """Mark an event occurrence as deleted and queue the delete notifications.

    ``request.event_id`` is matched against ``EventOccurrence.id``; only
    occurrences not already deleted are considered. Aborts with NOT_FOUND
    when nothing matches.
    """
    occurrence_query = (
        select(Event, EventOccurrence)
        .where(EventOccurrence.id == request.event_id)
        .where(EventOccurrence.event_id == Event.id)
        .where(~EventOccurrence.is_deleted)
    )
    row = session.execute(occurrence_query).one_or_none()

    if not row:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    # Only the occurrence is needed below; the event is selected as part of the join.
    _event, occurrence = row

    occurrence.is_deleted = True

    job_payload = jobs_pb2.GenerateEventDeleteNotificationsPayload(occurrence_id=occurrence.id)
    queue_job(
        session,
        "generate_event_delete_notifications",
        payload=job_payload,
    )

    return empty_pb2.Empty()

477 

def ListUserIds(self, request, context, session):
    """List ids of users who joined between ``request.start_time`` and ``request.end_time``.

    Both bounds are inclusive. Results are paginated by descending user id:
    ``request.page_size`` defaults to, and is capped at,
    ``MAX_PAGINATION_LENGTH``, and ``request.page_token`` (when given) is the
    id of the first user of the requested page. One extra row is fetched to
    detect whether another page exists; its id becomes ``next_page_token``.
    """
    start_date = request.start_time.ToDatetime()
    end_date = request.end_time.ToDatetime()

    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

    query = select(User.id).where(User.joined >= start_date).where(User.joined <= end_date)

    # The cursor must filter on the same key, in the same direction, as the
    # sort. Previously rows were sorted by `joined` descending while the
    # cursor filtered `id >= token`, so every follow-up page started again
    # from the newest users instead of continuing after the previous page.
    if request.page_token:
        query = query.where(User.id <= int(request.page_token))

    # NOTE(review): sorting by id descending assumes ids are assigned in join
    # order, which keeps the listing newest-first — confirm.
    user_ids = session.execute(query.order_by(User.id.desc()).limit(page_size + 1)).scalars().all()

    has_more = len(user_ids) > page_size
    return admin_pb2.ListUserIdsRes(
        user_ids=user_ids[:page_size],
        next_page_token=str(user_ids[-1]) if has_more else None,
    )

499 user_ids=user_ids[:page_size], 

500 next_page_token=str(user_ids[-1]) if len(user_ids) > page_size else None, 

501 )