Coverage for src/couchers/servicers/admin.py: 78%

250 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-07-22 16:44 +0000

1import json 

2import logging 

3from datetime import timedelta 

4 

5import grpc 

6from google.protobuf import empty_pb2 

7from shapely.geometry import shape 

8from sqlalchemy.sql import or_, select, update 

9 

10from couchers import errors, urls 

11from couchers.db import session_scope 

12from couchers.helpers.badges import user_add_badge, user_remove_badge 

13from couchers.helpers.clusters import create_cluster, create_node 

14from couchers.jobs.enqueue import queue_job 

15from couchers.models import ( 

16 ContentReport, 

17 Event, 

18 EventCommunityInviteRequest, 

19 EventOccurrence, 

20 GroupChat, 

21 GroupChatSubscription, 

22 HostRequest, 

23 Message, 

24 User, 

25 UserBadge, 

26) 

27from couchers.notifications.notify import notify 

28from couchers.resources import get_badge_dict 

29from couchers.servicers.api import get_strong_verification_fields, user_model_to_pb 

30from couchers.servicers.auth import create_session 

31from couchers.servicers.communities import community_to_pb 

32from couchers.servicers.events import get_users_to_notify_for_new_event 

33from couchers.sql import couchers_select as select 

34from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date 

35from proto import admin_pb2, admin_pb2_grpc, notification_data_pb2 

36from proto.internal import jobs_pb2 

37 

# Logger for this module, named after the module's import path.
logger = logging.getLogger(__name__)

# Largest page size a paginated endpoint in this module will serve; it is also
# the size used when the caller does not ask for one.
MAX_PAGINATION_LENGTH = 250

41 

42 

def _user_to_details(session, user):
    """
    Build the ``admin_pb2.UserDetails`` message describing ``user``.

    ``session`` is only forwarded to ``get_strong_verification_fields``; the mapping that helper returns is
    expanded into the message as additional keyword fields.
    """
    badge_ids = [user_badge.badge_id for user_badge in user.badges]
    verification_fields = get_strong_verification_fields(session, user)
    return admin_pb2.UserDetails(
        user_id=user.id,
        username=user.username,
        name=user.name,
        email=user.email,
        gender=user.gender,
        birthdate=date_to_api(user.birthdate),
        banned=user.is_banned,
        deleted=user.is_deleted,
        do_not_email=user.do_not_email,
        badges=badge_ids,
        has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
        admin_note=user.admin_note,
        **verification_fields,
    )

59 

60 

def _content_report_to_pb(content_report: ContentReport):
    """Convert a ``ContentReport`` row into the matching ``admin_pb2.ContentReport`` message."""
    # Attributes that are copied to a protobuf field of exactly the same name.
    same_name_fields = (
        "reporting_user_id",
        "author_user_id",
        "reason",
        "description",
        "content_ref",
        "user_agent",
        "page",
    )
    copied = {field_name: getattr(content_report, field_name) for field_name in same_name_fields}
    return admin_pb2.ContentReport(
        content_report_id=content_report.id,
        time=Timestamp_from_datetime(content_report.time),
        **copied,
    )

73 

74 

def append_admin_note(session, context, user, note):
    """
    Append a timestamped, attributed entry to ``user.admin_note``.

    The entry records the id and username of the calling admin (the user whose id is ``context.user_id``).
    Aborts with ``INVALID_ARGUMENT`` when ``note`` is empty or only whitespace.
    """
    stripped = note.strip()
    if not stripped:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ADMIN_NOTE_CANT_BE_EMPTY)

    author_query = select(User).where(User.id == context.user_id)
    author = session.execute(author_query).scalar_one()

    # The original (unstripped) note text is what gets stored.
    entry = f"\n[{now().isoformat()}] (id: {author.id}, username: {author.username}) {note}\n"
    user.admin_note = user.admin_note + entry

80 

81 

82class Admin(admin_pb2_grpc.AdminServicer): 

def GetUserDetails(self, request, context):
    """
    Return the admin ``UserDetails`` view of the user selected by ``request.user``.

    Aborts with ``NOT_FOUND`` when the lookup matches nobody.
    """
    with session_scope() as session:
        lookup = select(User).where_username_or_email_or_id(request.user)
        target = session.execute(lookup).scalar_one_or_none()
        if not target:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
        return _user_to_details(session, target)

89 

def GetUser(self, request, context):
    """
    Return the regular API representation (``user_model_to_pb``) of the user selected by ``request.user``.

    Aborts with ``NOT_FOUND`` when the lookup matches nobody.
    """
    with session_scope() as session:
        lookup = select(User).where_username_or_email_or_id(request.user)
        target = session.execute(lookup).scalar_one_or_none()
        if not target:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
        return user_model_to_pb(target, session, context)

96 

def ChangeUserGender(self, request, context):
    """
    Set the gender of the user selected by ``request.user`` and notify them of the change.

    Aborts with ``NOT_FOUND`` when the lookup matches nobody. Returns the updated ``UserDetails``.
    """
    with session_scope() as session:
        lookup = select(User).where_username_or_email_or_id(request.user)
        target = session.execute(lookup).scalar_one_or_none()
        if not target:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        target.gender = request.gender
        # The change is committed before the notification is generated.
        session.commit()

        change = notification_data_pb2.GenderChange(gender=request.gender)
        notify(user_id=target.id, topic_action="gender:change", data=change)

        return _user_to_details(session, target)

114 

def ChangeUserBirthdate(self, request, context):
    """
    Set the birthdate of the user selected by ``request.user`` and notify them of the change.

    ``request.birthdate`` is run through ``parse_date`` before being stored; the notification carries the
    string exactly as it was sent. Aborts with ``NOT_FOUND`` when the lookup matches nobody. Returns the
    updated ``UserDetails``.
    """
    with session_scope() as session:
        lookup = select(User).where_username_or_email_or_id(request.user)
        target = session.execute(lookup).scalar_one_or_none()
        if not target:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        target.birthdate = parse_date(request.birthdate)
        # The change is committed before the notification is generated.
        session.commit()

        change = notification_data_pb2.BirthdateChange(birthdate=request.birthdate)
        notify(user_id=target.id, topic_action="birthdate:change", data=change)

        return _user_to_details(session, target)

132 

def AddBadge(self, request, context):
    """
    Give the badge ``request.badge_id`` to the user selected by ``request.user``.

    Aborts with:
      * ``NOT_FOUND`` if the user or the badge does not exist;
      * ``FAILED_PRECONDITION`` if the badge is not admin-editable or the user already holds it.

    Returns the updated ``UserDetails``.
    """
    with session_scope() as session:
        lookup = select(User).where_username_or_email_or_id(request.user)
        target = session.execute(lookup).scalar_one_or_none()
        if not target:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        badge = get_badge_dict().get(request.badge_id)
        if not badge:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)

        if not badge["admin_editable"]:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)

        already_held = any(held.badge_id == badge["id"] for held in target.badges)
        if already_held:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_HAS_BADGE)

        user_add_badge(session, target.id, request.badge_id)

        return _user_to_details(session, target)

152 

def RemoveBadge(self, request, context):
    """
    Take the badge ``request.badge_id`` away from the user selected by ``request.user``.

    Aborts with:
      * ``NOT_FOUND`` if the user or the badge does not exist;
      * ``FAILED_PRECONDITION`` if the badge is not admin-editable or the user does not hold it.

    Returns the updated ``UserDetails``.
    """
    with session_scope() as session:
        lookup = select(User).where_username_or_email_or_id(request.user)
        target = session.execute(lookup).scalar_one_or_none()
        if not target:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        badge = get_badge_dict().get(request.badge_id)
        if not badge:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)

        if not badge["admin_editable"]:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)

        held_query = select(UserBadge).where(UserBadge.user_id == target.id, UserBadge.badge_id == badge["id"])
        held_badge = session.execute(held_query).scalar_one_or_none()
        if not held_badge:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_DOES_NOT_HAVE_BADGE)

        user_remove_badge(session, target.id, request.badge_id)

        return _user_to_details(session, target)

175 

def SetPassportSexGenderException(self, request, context):
    """
    Store ``request.passport_sex_gender_exception`` on the user selected by ``request.user``.

    Aborts with ``NOT_FOUND`` when the lookup matches nobody. Returns the updated ``UserDetails``.
    """
    with session_scope() as session:
        lookup = select(User).where_username_or_email_or_id(request.user)
        target = session.execute(lookup).scalar_one_or_none()
        if not target:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        target.has_passport_sex_gender_exception = request.passport_sex_gender_exception
        return _user_to_details(session, target)

183 

def BanUser(self, request, context):
    """
    Ban the user selected by ``request.user``, recording ``request.admin_note`` in their admin notes.

    Aborts with ``NOT_FOUND`` when the lookup matches nobody; ``append_admin_note`` additionally aborts with
    ``INVALID_ARGUMENT`` on a blank note. Returns the updated ``UserDetails``.
    """
    with session_scope() as session:
        lookup = select(User).where_username_or_email_or_id(request.user)
        target = session.execute(lookup).scalar_one_or_none()
        if not target:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        # The note is validated and appended before the flag is flipped.
        append_admin_note(session, context, target, request.admin_note)
        target.is_banned = True
        return _user_to_details(session, target)

192 

def UnbanUser(self, request, context):
    """
    Lift the ban on the user selected by ``request.user``, recording ``request.admin_note`` in their admin notes.

    Aborts with ``NOT_FOUND`` when the lookup matches nobody; ``append_admin_note`` additionally aborts with
    ``INVALID_ARGUMENT`` on a blank note. Returns the updated ``UserDetails``.
    """
    with session_scope() as session:
        lookup = select(User).where_username_or_email_or_id(request.user)
        target = session.execute(lookup).scalar_one_or_none()
        if not target:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        # The note is validated and appended before the flag is cleared.
        append_admin_note(session, context, target, request.admin_note)
        target.is_banned = False
        return _user_to_details(session, target)

201 

def AddAdminNote(self, request, context):
    """
    Append ``request.admin_note`` to the admin notes of the user selected by ``request.user``.

    Aborts with ``NOT_FOUND`` when the lookup matches nobody; ``append_admin_note`` additionally aborts with
    ``INVALID_ARGUMENT`` on a blank note. Returns the updated ``UserDetails``.
    """
    with session_scope() as session:
        lookup = select(User).where_username_or_email_or_id(request.user)
        target = session.execute(lookup).scalar_one_or_none()
        if not target:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        append_admin_note(session, context, target, request.admin_note)
        return _user_to_details(session, target)

209 

def GetContentReport(self, request, context):
    """
    Return the content report whose id is ``request.content_report_id``.

    Aborts with ``NOT_FOUND`` when no such report exists.
    """
    with session_scope() as session:
        by_id = select(ContentReport).where(ContentReport.id == request.content_report_id)
        report = session.execute(by_id).scalar_one_or_none()
        if not report:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.CONTENT_REPORT_NOT_FOUND)

        report_pb = _content_report_to_pb(report)
        return admin_pb2.GetContentReportRes(content_report=report_pb)

220 

def GetContentReportsForAuthor(self, request, context):
    """
    Return every content report whose author is the user selected by ``request.user``.

    Reports are ordered by descending report id. Aborts with ``NOT_FOUND`` when the lookup matches nobody.
    """
    with session_scope() as session:
        lookup = select(User).where_username_or_email_or_id(request.user)
        author = session.execute(lookup).scalar_one_or_none()
        if not author:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        reports_query = (
            select(ContentReport)
            .where(ContentReport.author_user_id == author.id)
            .order_by(ContentReport.id.desc())
        )
        reports = session.execute(reports_query).scalars().all()

        report_pbs = []
        for report in reports:
            report_pbs.append(_content_report_to_pb(report))
        return admin_pb2.GetContentReportsForAuthorRes(content_reports=report_pbs)

238 

def DeleteUser(self, request, context):
    """
    Flag the user selected by ``request.user`` as deleted by setting ``is_deleted``.

    Aborts with ``NOT_FOUND`` when the lookup matches nobody. Returns the updated ``UserDetails``.
    """
    with session_scope() as session:
        lookup = select(User).where_username_or_email_or_id(request.user)
        target = session.execute(lookup).scalar_one_or_none()
        if not target:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        target.is_deleted = True
        return _user_to_details(session, target)

246 

def CreateApiKey(self, request, context):
    """
    Create an API key for the user selected by ``request.user`` and send it to them in a notification.

    The key is a session created with ``long_lived=True``, ``is_api_key=True``, a 365-day duration and no
    cookie. The token and its expiry are delivered through the ``api_key:create`` notification. Aborts with
    ``NOT_FOUND`` when the lookup matches nobody. Returns the user's ``UserDetails``.
    """
    with session_scope() as session:
        lookup = select(User).where_username_or_email_or_id(request.user)
        target = session.execute(lookup).scalar_one_or_none()
        if not target:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        one_year = timedelta(days=365)
        token, expiry = create_session(
            context, session, target, long_lived=True, is_api_key=True, duration=one_year, set_cookie=False
        )

        key_info = notification_data_pb2.ApiKeyCreate(
            api_key=token,
            expiry=Timestamp_from_datetime(expiry),
        )
        notify(user_id=target.id, topic_action="api_key:create", data=key_info)

        return _user_to_details(session, target)

266 

def CreateCommunity(self, request, context):
    """
    Create a community: a node built from ``request.geojson`` plus a cluster attached to that node.

    ``request.geojson`` is parsed as JSON and turned into a Shapely geometry, which must be a
    ``MultiPolygon``; otherwise the call aborts with ``INVALID_ARGUMENT``. A ``request.parent_node_id`` of
    ``0`` means the new node has no parent.

    Returns the new node converted with ``community_to_pb``.
    """
    with session_scope() as session:
        geom = shape(json.loads(request.geojson))

        # `geom_type` is the supported attribute; the `type` alias is deprecated as of Shapely 2.0.
        if geom.geom_type != "MultiPolygon":
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_MULTIPOLYGON)

        # 0 stands for "no parent node".
        parent_node_id = request.parent_node_id or None
        node = create_node(session, geom, parent_node_id)
        # NOTE(review): the trailing positional `True` presumably marks the cluster as the node's official
        # community cluster; confirm against create_cluster's signature.
        create_cluster(
            session, node.id, request.name, request.description, context.user_id, request.admin_ids, True
        )

        return community_to_pb(node, context)

281 

def GetChats(self, request, context):
    """
    Produce a plain-text transcript of every host request and group chat involving a user.

    The user is selected with ``where_username_or_email_or_id(request.user)``; the call aborts with
    ``NOT_FOUND`` when nobody matches. The transcript is returned in ``GetChatsRes.response``.

    All helpers read through the single session opened here instead of each opening a session of their own,
    so one request holds one session no matter how many conversations are rendered.
    """
    with session_scope() as session:

        def format_user(user):
            # One-line identification used throughout the transcript.
            return f"{user.name} ({user.username}, {user.id})"

        def format_conversation(conversation_id):
            # Messages of one conversation, oldest first (ascending message id).
            messages = (
                session.execute(
                    select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
                )
                .scalars()
                .all()
            )
            parts = []
            for message in messages:
                parts.append(
                    f"Message {message.id} by {format_user(message.author)} at {message.time}\nType={message.message_type}, host_req_status_change={message.host_request_status_target}\n\n"
                )
                parts.append(str(message.text))
                parts.append("\n\n-----\n")
            parts.append("\n\n\n\n")
            return "".join(parts)

        def format_host_request(host_request_id):
            # Header for one host request followed by its conversation.
            host_request = session.execute(
                select(HostRequest).where(HostRequest.conversation_id == host_request_id)
            ).scalar_one()
            parts = [
                "==============================\n",
                f"Host request {host_request.conversation_id} from {format_user(host_request.surfer)} to {format_user(host_request.host)}.\nCurrent state = {host_request.status}\n\nMessages:\n",
                format_conversation(host_request.conversation_id),
                "\n\n\n\n",
            ]
            return "".join(parts)

        def format_group_chat(group_chat_id):
            # Header and member list for one group chat followed by its conversation.
            group_chat = session.execute(
                select(GroupChat).where(GroupChat.conversation_id == group_chat_id)
            ).scalar_one()
            parts = [
                "==============================\n",
                f"Group chat {group_chat.conversation_id}. Created by {format_user(group_chat.creator)}, is_dm={group_chat.is_dm}\nName: {group_chat.title}\nMembers:\n",
            ]
            subs = (
                session.execute(
                    select(GroupChatSubscription)
                    .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
                    .order_by(GroupChatSubscription.joined.asc())
                )
                .scalars()
                .all()
            )
            parts.extend(
                f"{format_user(sub.user)} joined at {sub.joined} (left at {sub.left}), role={sub.role}\n"
                for sub in subs
            )
            parts.append("\n\nMessages:\n")
            parts.append(format_conversation(group_chat.conversation_id))
            parts.append("\n\n\n\n")
            return "".join(parts)

        def format_all_chats_for_user(user):
            # Host requests first (newest conversation id first), then group chats (most recently joined first).
            parts = [f"Chats for user {format_user(user)}\n"]
            host_request_ids = (
                session.execute(
                    select(HostRequest.conversation_id)
                    .where(or_(HostRequest.host_user_id == user.id, HostRequest.surfer_user_id == user.id))
                    .order_by(HostRequest.conversation_id.desc())
                )
                .scalars()
                .all()
            )
            parts.append(f"************************************* Requests ({len(host_request_ids)})\n")
            parts.extend(format_host_request(host_request_id) for host_request_id in host_request_ids)
            group_chat_ids = (
                session.execute(
                    select(GroupChatSubscription.group_chat_id)
                    .where(GroupChatSubscription.user_id == user.id)
                    .order_by(GroupChatSubscription.joined.desc())
                )
                .scalars()
                .all()
            )
            parts.append(f"************************************* Group chats ({len(group_chat_ids)})\n")
            parts.extend(format_group_chat(group_chat_id) for group_chat_id in group_chat_ids)
            return "".join(parts)

        user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
        if not user:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        return admin_pb2.GetChatsRes(response=format_all_chats_for_user(user))

377 

def ListEventCommunityInviteRequests(self, request, context):
    """
    Return one page of event community invite requests that have not been approved or denied yet.

    Requests are ordered by id. ``request.page_token``, when set, is the id the page starts at;
    ``request.page_size`` is capped at ``MAX_PAGINATION_LENGTH``, which is also used when it is unset.
    ``next_page_token`` is the id of the first request of the following page, or unset on the last page.
    """
    with session_scope() as session:
        requested_size = request.page_size or MAX_PAGINATION_LENGTH
        page_size = min(MAX_PAGINATION_LENGTH, requested_size)
        if request.page_token:
            first_id = int(request.page_token)
        else:
            first_id = 0

        # One row more than the page size is fetched to learn whether another page follows.
        pending_query = (
            select(EventCommunityInviteRequest)
            .where(EventCommunityInviteRequest.approved.is_(None))
            .where(EventCommunityInviteRequest.id >= first_id)
            .order_by(EventCommunityInviteRequest.id)
            .limit(page_size + 1)
        )
        pending = session.execute(pending_query).scalars().all()

        def _invite_to_pb(invite):
            users_to_notify, node_id = get_users_to_notify_for_new_event(session, invite.occurrence)
            event_url = urls.event_link(occurrence_id=invite.occurrence.id, slug=invite.occurrence.event.slug)
            return admin_pb2.EventCommunityInviteRequest(
                event_community_invite_request_id=invite.id,
                user_id=invite.user_id,
                event_url=event_url,
                approx_users_to_notify=len(users_to_notify),
                community_id=node_id,
            )

        has_more = len(pending) > page_size
        return admin_pb2.ListEventCommunityInviteRequestsRes(
            requests=[_invite_to_pb(invite) for invite in pending[:page_size]],
            next_page_token=str(pending[-1].id) if has_more else None,
        )

408 

def DecideEventCommunityInviteRequest(self, request, context):
    """
    Approve or deny the event community invite request ``request.event_community_invite_request_id``.

    The decision time and the deciding admin (``context.user_id``) are recorded on the request. On approval,
    every other undecided request for the same occurrence is denied with the same timestamp and admin, and a
    ``generate_event_create_notifications`` job is queued.

    Aborts with ``NOT_FOUND`` if the request does not exist and ``FAILED_PRECONDITION`` if it was already
    decided.
    """
    with session_scope() as session:
        invite_query = select(EventCommunityInviteRequest).where(
            EventCommunityInviteRequest.id == request.event_community_invite_request_id
        )
        req = session.execute(invite_query).scalar_one_or_none()

        if not req:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_COMMUNITY_INVITE_NOT_FOUND)

        if req.decided:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_DECIDED)

        decided_at = now()
        req.decided = decided_at
        req.decided_by_user_id = context.user_id
        req.approved = request.approve

        if request.approve:
            # Deny the other still-undecided requests for the same occurrence.
            deny_others = (
                update(EventCommunityInviteRequest)
                .where(EventCommunityInviteRequest.occurrence_id == req.occurrence_id)
                .where(EventCommunityInviteRequest.decided.is_(None))
                .values(decided=decided_at, decided_by_user_id=context.user_id, approved=False)
            )
            session.execute(deny_others)

        session.flush()

        if request.approve:
            job_payload = jobs_pb2.GenerateEventCreateNotificationsPayload(
                inviting_user_id=req.user_id,
                occurrence_id=req.occurrence_id,
                approved=True,
            )
            queue_job("generate_event_create_notifications", payload=job_payload)

        return admin_pb2.DecideEventCommunityInviteRequestRes()

450 

def DeleteEvent(self, request, context):
    """
    Mark an event occurrence as deleted and queue the job that generates the deletion notifications.

    ``request.event_id`` is matched against the occurrence id. Aborts with ``NOT_FOUND`` when there is no
    such occurrence or it is already marked deleted.
    """
    with session_scope() as session:
        live_occurrence_query = (
            select(Event, EventOccurrence)
            .where(EventOccurrence.id == request.event_id)
            .where(EventOccurrence.event_id == Event.id)
            .where(~EventOccurrence.is_deleted)
        )
        res = session.execute(live_occurrence_query).one_or_none()

        if not res:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

        _event, occurrence = res

        occurrence.is_deleted = True

        job_payload = jobs_pb2.GenerateEventDeleteNotificationsPayload(occurrence_id=occurrence.id)
        queue_job("generate_event_delete_notifications", payload=job_payload)

        return empty_pb2.Empty()

472 ) 

473 

474 return empty_pb2.Empty()